Parallelization
PyFlowReg provides multiple parallelization backends for efficient batch processing. Backends are managed through a runtime registry and register themselves on import.
Parallelization Backends
Sequential
Single-threaded processing; the most memory-efficient option.
```python
from pyflowreg.motion_correction import compensate_recording, OFOptions
from pyflowreg.motion_correction.compensate_recording import RegistrationConfig

options = OFOptions(input_file="video.h5", output_path="results/")
config = RegistrationConfig(parallelization="sequential")
compensate_recording(options, config=config)
```
When to use:
Debugging (easier to trace errors)
Limited memory systems
Small datasets
Pros:
Lowest memory footprint
Deterministic execution
Simple error handling
Cons:
Slowest processing
No CPU parallelization
Threading
Parallel processing using Python threads.
```python
config = RegistrationConfig(
    parallelization="threading",
    n_jobs=8,  # Number of worker threads
)
```
When to use:
I/O-bound workloads
Limited by disk/network speed
Moderate CPU resources
Pros:
Lower memory overhead than multiprocessing
Good for I/O-bound operations
Shared memory between threads
Cons:
Limited by Python GIL for CPU-bound tasks
Slower than multiprocessing for pure computation
Multiprocessing
Parallel processing using separate worker processes with shared memory. This is the backend auto-selection picks by default when it is available.
```python
config = RegistrationConfig(
    parallelization="multiprocessing",
    n_jobs=-1,  # Use all available CPUs
)
```
When to use:
CPU-bound workloads (recommended)
Large multi-core systems
Production processing
Pros:
True parallel execution (bypasses GIL)
Best performance for CPU-intensive tasks
Shared memory for efficient data transfer
Cons:
Higher memory overhead
Slightly more complex error handling
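To make the difference between the executors concrete, here is a minimal, standard-library-only sketch of running the same per-batch work sequentially and on a thread pool. `register_batch`, `run_sequential`, and `run_threaded` are hypothetical names, not PyFlowReg functions; the sketch illustrates the general pattern only and is not how PyFlowReg schedules batches internally.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def register_batch(batch: Sequence[int]) -> List[int]:
    """Hypothetical stand-in for per-batch registration: doubles each 'frame'."""
    return [frame * 2 for frame in batch]


def run_sequential(batches: Sequence[Sequence[int]], work: Callable) -> list:
    # One batch at a time in the calling thread: deterministic and easy to trace.
    return [work(batch) for batch in batches]


def run_threaded(batches: Sequence[Sequence[int]], work: Callable, n_jobs: int) -> list:
    # Batches are spread over worker threads; executor.map returns results in
    # input order. Pure-Python work is serialized by the GIL, but waiting on
    # I/O (or code that releases the GIL) can overlap.
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        return list(executor.map(work, batches))


# A process-based variant would use concurrent.futures.ProcessPoolExecutor instead;
# `work` must then be picklable (a module-level function) and the call should sit
# under `if __name__ == "__main__":` on platforms that start processes via spawn.

batches = [[0, 1], [2, 3], [4, 5]]
sequential = run_sequential(batches, register_batch)
threaded = run_threaded(batches, register_batch, n_jobs=2)
print(sequential == threaded)  # True
```

Both strategies return identical, ordered results; only how the work is scheduled differs, which is why sequential mode is a safe fallback for debugging.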
Choosing a Backend
By default, PyFlowReg auto-selects the best available backend:
```python
# Auto-selection order: multiprocessing → threading → sequential
compensate_recording(options)  # Uses multiprocessing if available
```
Manual selection overrides auto-detection:
```python
config = RegistrationConfig(parallelization="threading")  # Force threading
```
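The auto-selection order described above can be sketched as a small function. This is an illustrative helper under stated assumptions, not PyFlowReg's implementation: `select_parallelization` and `PREFERENCE` are hypothetical names, and `available` stands in for whatever set of registered backends PyFlowReg finds at runtime.

```python
from typing import Iterable, Optional

# Hypothetical: preference order taken from the auto-selection comment above.
PREFERENCE = ("multiprocessing", "threading", "sequential")


def select_parallelization(requested: Optional[str], available: Iterable[str]) -> str:
    """Return the requested backend, or the first available one in preference order."""
    if requested is not None:
        return requested  # manual selection overrides auto-detection
    pool = set(available)
    for name in PREFERENCE:
        if name in pool:
            return name
    raise RuntimeError("no parallelization backend is available")


print(select_parallelization(None, {"sequential", "threading", "multiprocessing"}))  # multiprocessing
print(select_parallelization(None, {"sequential", "threading"}))  # threading
print(select_parallelization("threading", {"sequential", "threading", "multiprocessing"}))  # threading
```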
Backend Compatibility
Different optical flow backends support different parallelization modes due to their architecture and Python package compatibility.
CPU Backend (flowreg)
The default NumPy/Numba backend supports all parallelization modes:
```python
options = OFOptions(flow_backend="flowreg")

# All of these work:
config = RegistrationConfig(parallelization="sequential")
config = RegistrationConfig(parallelization="threading")
config = RegistrationConfig(parallelization="multiprocessing")
```
GPU Backends (flowreg_torch, flowreg_cuda)
GPU backends support only sequential execution due to package compatibility constraints:
```python
from pyflowreg.motion_correction import OFOptions
from pyflowreg.motion_correction.compensate_recording import RegistrationConfig

options = OFOptions(
    flow_backend="flowreg_torch",
    backend_params={"device": "cuda"},
)
# GPU backends require sequential execution
config = RegistrationConfig(parallelization="sequential")
```
Automatic fallback: if you request multiprocessing or threading with a GPU backend, PyFlowReg automatically falls back to sequential execution and emits a warning:
```python
options = OFOptions(flow_backend="flowreg_cuda")

# Requesting multiprocessing with a GPU backend
config = RegistrationConfig(parallelization="multiprocessing")
# Warning: Backend 'flowreg_cuda' does not support 'multiprocessing' executor.
# Supported executors: ['sequential']. Falling back to 'sequential'.
compensate_recording(options, config=config)  # Runs sequentially
```
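The fallback rule amounts to a lookup plus a warning. The sketch below is a hypothetical reconstruction from the behavior described above: `SUPPORTED_EXECUTORS` and `resolve_executor` are not PyFlowReg names, and the table simply restates this page's compatibility notes. It emits the warning message quoted above through Python's `warnings` module, which may differ from how PyFlowReg actually reports it.

```python
import warnings
from typing import Dict, List

# Hypothetical table restating the compatibility notes on this page.
SUPPORTED_EXECUTORS: Dict[str, List[str]] = {
    "flowreg": ["sequential", "threading", "multiprocessing"],
    "flowreg_torch": ["sequential"],
    "flowreg_cuda": ["sequential"],
}


def resolve_executor(flow_backend: str, requested: str) -> str:
    """Return `requested` if the flow backend supports it, else fall back with a warning."""
    supported = SUPPORTED_EXECUTORS[flow_backend]
    if requested in supported:
        return requested
    fallback = supported[0]  # the GPU entries list only 'sequential'
    warnings.warn(
        f"Backend '{flow_backend}' does not support '{requested}' executor. "
        f"Supported executors: {supported}. Falling back to '{fallback}'."
    )
    return fallback


print(resolve_executor("flowreg", "multiprocessing"))       # multiprocessing
print(resolve_executor("flowreg_cuda", "multiprocessing"))  # sequential, plus a UserWarning
```

Warning instead of raising keeps existing scripts working when only the flow backend is switched to a GPU one.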
Configuration
RegistrationConfig Parameters
```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class RegistrationConfig:
    n_jobs: int = -1  # -1 = all cores, N = use N workers
    verbose: bool = False  # Verbose logging
    parallelization: Optional[str] = None  # None = auto, 'sequential', 'threading', or 'multiprocessing'
```
Number of Workers
```python
config = RegistrationConfig(
    n_jobs=-1,  # Use all CPU cores
)

# Or specify an exact number:
config = RegistrationConfig(
    n_jobs=4,  # Use 4 workers
)
```
Guidelines:
n_jobs=-1: use all available CPUs (recommended)
n_jobs=N: use N worker processes or threads
Sequential execution ignores n_jobs (always 1 worker)
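The guidelines above reduce to a small resolution rule. A minimal sketch, assuming `-1` is the only special value (how PyFlowReg treats other negative values is not described on this page, so the sketch simply rejects them); `resolve_n_jobs` is a hypothetical helper, not part of PyFlowReg:

```python
import os


def resolve_n_jobs(n_jobs: int, parallelization: str) -> int:
    """Hypothetical helper: turn an n_jobs setting into a concrete worker count."""
    if parallelization == "sequential":
        return 1  # sequential execution ignores n_jobs
    if n_jobs == -1:
        return os.cpu_count() or 1  # os.cpu_count() may return None
    if n_jobs < 1:
        raise ValueError("n_jobs must be -1 or a positive integer")
    return n_jobs


print(resolve_n_jobs(-1, "multiprocessing"))  # number of CPUs on this machine
print(resolve_n_jobs(4, "threading"))         # 4
print(resolve_n_jobs(8, "sequential"))        # 1
```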
Buffer Size
The buffer size controls how many frames are read and processed per batch. It is configured in OFOptions, not RegistrationConfig:
```python
from pyflowreg.motion_correction import OFOptions

options = OFOptions(
    buffer_size=400,  # Frames per batch (default: 400)
    # ... other options
)
```
Buffer size tradeoffs:
Larger buffers (500-1000):
Fewer I/O operations
Higher memory usage
Better for fast storage (SSD)
Smaller buffers (50-100):
More frequent I/O
Lower memory usage
Better for limited RAM or slow storage (HDD)
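Because each batch holds buffer_size frames in memory, the tradeoff can be estimated with simple arithmetic. The sketch below assumes a hypothetical 10,000-frame recording of 512 x 512 single-channel frames stored as float32 (4 bytes per value) and counts only the raw input buffer; actual peak memory is likely to be higher because intermediate results also occupy memory. `n_batches` and `buffer_bytes` are illustrative helpers, not PyFlowReg functions.

```python
import math


def n_batches(n_frames: int, buffer_size: int) -> int:
    # Number of read/process cycles needed to cover the whole recording.
    return math.ceil(n_frames / buffer_size)


def buffer_bytes(buffer_size: int, height: int, width: int,
                 channels: int = 1, itemsize: int = 4) -> int:
    # Raw size of one batch of frames (itemsize=4 assumes float32).
    return buffer_size * height * width * channels * itemsize


MiB = 1024 ** 2
for size in (50, 100, 400, 1000):
    print(f"buffer_size={size:>4}: {n_batches(10_000, size):>3} batches, "
          f"{buffer_bytes(size, 512, 512) / MiB:.0f} MiB per batch")
```

Under these assumptions each frame is exactly 1 MiB, so buffer_size=400 means 25 batches of about 400 MiB each, while buffer_size=50 means 200 smaller batches of about 50 MiB, trading memory for more I/O round trips.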
Complete Configuration
```python
from pyflowreg.motion_correction import compensate_recording, OFOptions
from pyflowreg.motion_correction.compensate_recording import RegistrationConfig

options = OFOptions(
    input_file="large_video.h5",
    output_path="results/",
    quality_setting="balanced",
    buffer_size=400,  # Frames per batch (same default as the MATLAB implementation)
)

config = RegistrationConfig(
    parallelization="multiprocessing",  # Backend selection
    n_jobs=-1,  # All CPU cores
)

compensate_recording(options, config=config)
```
Default Behavior
If no config is provided, defaults are used:
```python
# These are equivalent:
compensate_recording(options)
compensate_recording(options, config=RegistrationConfig())
```
Default values:
n_jobs=-1 (all cores)
verbose=False
parallelization=None (auto-select)
Performance Tips
Memory Management
For systems with limited RAM:
```python
# Reduce memory usage
config = RegistrationConfig(
    parallelization="threading",  # Lower memory than multiprocessing
    n_jobs=4,  # Limit workers
)
options = OFOptions(
    buffer_size=50,  # Smaller batches
)
```
For high-RAM systems:
```python
# Maximize throughput
config = RegistrationConfig(
    parallelization="multiprocessing",
    n_jobs=-1,  # All cores
)
options = OFOptions(
    buffer_size=500,  # Larger batches
)
```
CPU Utilization
Check CPU usage during processing:
Sequential: 100% on a single core
Threading: distributed across cores but limited by the GIL
Multiprocessing: near 100% on all cores (best)
Reserve cores for other tasks:
```python
import os

# os.cpu_count() can return None; always keep at least one worker
n_cores = os.cpu_count() or 1
config = RegistrationConfig(n_jobs=max(1, n_cores - 2))  # Leave 2 cores free
```
I/O Optimization
For SSD/fast storage:
```python
# Multiprocessing with large buffers
config = RegistrationConfig(parallelization="multiprocessing")
options = OFOptions(buffer_size=500)
```
For HDD/slow storage:
```python
# Threading to avoid I/O contention
config = RegistrationConfig(parallelization="threading")
options = OFOptions(buffer_size=100)
```
Examples
Fast Preview Processing
```python
# Quick preview with minimal resources
options = OFOptions(
    input_file="video.h5",
    quality_setting="fast",
    buffer_size=50,
)
config = RegistrationConfig(parallelization="sequential")
compensate_recording(options, config=config)
```
Production Processing
```python
# Maximum performance for production
options = OFOptions(
    input_file="large_dataset.h5",
    quality_setting="balanced",
    buffer_size=500,
)
config = RegistrationConfig(
    parallelization="multiprocessing",
    n_jobs=-1,
)
compensate_recording(options, config=config)
```
Memory-Constrained System
```python
# Optimize for limited RAM
options = OFOptions(
    input_file="video.h5",
    quality_setting="balanced",
    buffer_size=100,
)
config = RegistrationConfig(
    parallelization="threading",
    n_jobs=4,
)
compensate_recording(options, config=config)
```
Backend Registration
PyFlowReg uses a runtime registry for parallelization backends. Backends register automatically on import:
```python
from pyflowreg._runtime import RuntimeContext

# Check available backends
available = RuntimeContext.get("available_parallelization", set())
print(f"Available backends: {sorted(available)}")
```
Troubleshooting
High Memory Usage
Problem: Memory usage exceeds available RAM
Solutions:
Reduce buffer_size in OFOptions
Switch to threading or sequential
Reduce n_jobs
```python
# Low-memory configuration
config = RegistrationConfig(
    parallelization="sequential",
    n_jobs=1,
)
options = OFOptions(buffer_size=50)
```
Slow Processing
Problem: Processing is slower than expected
Solutions:
Use multiprocessing instead of threading
Increase buffer_size if RAM allows
Increase n_jobs
Use quality_setting="fast" for previews
```python
# Fast processing configuration
config = RegistrationConfig(
    parallelization="multiprocessing",
    n_jobs=-1,
)
options = OFOptions(
    quality_setting="fast",
    buffer_size=500,
)
```
Process Hangs or Crashes
Problem: Processing hangs or workers crash
Solutions:
Switch to sequential for debugging
Reduce n_jobs to avoid resource contention
Check for memory issues
Verify input file integrity
```python
# Debug configuration
config = RegistrationConfig(
    parallelization="sequential",  # Easier to debug
)
```