Motion Correction#
High-level APIs for applying motion correction and motion analysis to microscopy videos.
Core Concepts#
Output Formats#
PyFlowReg supports flexible output handling through the output_format parameter for file-based workflows (compensate_recording):
- OutputFormat.ARRAY - Accumulate in memory, return as array
- OutputFormat.NULL - Discard output, callback-only processing (no storage overhead)
- OutputFormat.HDF5 - HDF5 file storage
- OutputFormat.TIFF - TIFF stack output
- OutputFormat.MAT - MATLAB-compatible files
Callback System#
All motion correction functions support real-time data access through callbacks:
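| Callback | Signature | Description |
|---|---|---|
| progress_callback | (current_frame, total_frames) | Progress updates |
| w_callback | (w_batch, start_idx, end_idx) | Access displacement fields during processing |
| registered_callback | (batch, start_idx, end_idx) | Access corrected frames during processing |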
Callbacks enable:
- Real-time visualization without waiting for completion
- Motion tracking and analysis during processing
- Memory-efficient file-based processing with OutputFormat.NULL
- Integration with visualization tools like napari
Array-Based Workflow#
The primary function for in-memory motion correction with callback support:
compensate_arr(
    c1: np.ndarray,                        # Video to correct
    c_ref: np.ndarray,                     # Reference frame
    options: OFOptions = None,             # Configuration
    progress_callback: Callable = None,    # Progress updates
    w_callback: Callable = None,           # Displacement access
    registered_callback: Callable = None   # Corrected frame access
) -> Tuple[np.ndarray, np.ndarray]         # Returns (registered, w)
Example with Callbacks#
import numpy as np
from pyflowreg.motion_correction import compensate_arr
from pyflowreg.motion_correction.OF_options import OFOptions

def track_motion(w_batch, start_idx, end_idx):
    """Process displacement fields as they're computed."""
    for i in range(w_batch.shape[0]):
        magnitude = np.sqrt(w_batch[i, :, :, 0]**2 + w_batch[i, :, :, 1]**2)
        print(f"Frame {start_idx + i}: mean motion = {np.mean(magnitude):.2f}")

# Placeholder input: replace with your own (T, H, W, C) recording
video = np.random.rand(100, 256, 256, 2)
reference = np.mean(video[:10], axis=0)

# Configure array workflow
options = OFOptions(
    quality_setting="balanced",
    buffer_size=20  # Process 20 frames at a time
)

# Run with callbacks
registered, w = compensate_arr(
    video, reference, options,
    w_callback=track_motion
)
compensate_arr always returns arrays in memory and ignores options.output_format.
Use compensate_recording(..., OFOptions(output_format=OutputFormat.NULL)) for true callback-only/no-storage processing.
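The callback-only pattern described above can be sketched as follows. This is a minimal illustration, not the library's reference usage: `BatchLog` and `run_callback_only` are hypothetical helper names, the import paths and `register_w_callback` are taken from the File-Based Workflow section of this page, and the dry run uses nested lists as stand-ins for real displacement batches.

```python
class BatchLog:
    """Records which frame ranges were delivered to a callback."""

    def __init__(self):
        self.ranges = []       # (start_idx, end_idx) per batch
        self.frames_seen = 0   # total frames delivered so far

    def __call__(self, w_batch, start_idx, end_idx):
        # len() gives the first dimension (T) for an ndarray batch
        self.ranges.append((start_idx, end_idx))
        self.frames_seen += len(w_batch)


def run_callback_only(input_file, on_displacements):
    """Run file-based correction without storing output (needs PyFlowReg)."""
    # Imported lazily so the logger above can be tried without PyFlowReg.
    from pyflowreg.motion_correction.compensate_recording import BatchMotionCorrector
    from pyflowreg.motion_correction.OF_options import OFOptions, OutputFormat

    options = OFOptions(input_file=input_file, output_format=OutputFormat.NULL)
    corrector = BatchMotionCorrector(options)
    corrector.register_w_callback(on_displacements)
    corrector.run()


# Dry run with stand-in batches (nested lists instead of real flow fields)
log = BatchLog()
log([[0.0], [0.0], [0.0]], 0, 3)
log([[0.0], [0.0]], 3, 5)
print(log.frames_seen, log.ranges)
```

With PyFlowReg installed, `run_callback_only("recording.h5", log)` would feed real batches to the same logger. Whether `end_idx` is inclusive or exclusive is not stated on this page, so the stand-in ranges above are only illustrative.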
- pyflowreg.motion_correction.compensate_arr(c1, c_ref, options=None, progress_callback=None, *, flow_backend=None, backend_params=None, get_displacement=None, get_displacement_factory=None, w_callback=None, registered_callback=None, registration_config=None)[source]#
Process arrays in memory matching MATLAB compensate_inplace functionality.
This function provides the same motion compensation as compensate_recording but operates on in-memory arrays instead of files. It uses the same batching and flow initialization logic to ensure algorithmic consistency.
- Parameters:
c1 – Input array to register, shape (T,H,W,C), (H,W,C), or (T,H,W). For single-channel 3D arrays, assumes (T,H,W) if T > 4, else (H,W,C).
c_ref – Reference frame, shape (H,W,C) or (H,W)
options – OF_options configuration. If None, uses defaults.
progress_callback – Optional callback function that receives (current_frame, total_frames) for progress updates. Note: For multiprocessing executor, updates are batch-wise.
flow_backend – Backend name override (e.g., ‘diso’, ‘flowreg’)
backend_params – Backend-specific parameters override
get_displacement – Direct displacement callable override
get_displacement_factory – Factory function override for creating displacement callable
w_callback – Optional callback for displacement field batches, receives (w_batch, start_idx, end_idx)
registered_callback – Optional callback for registered frame batches, receives (batch, start_idx, end_idx)
registration_config – Optional execution config (executor type/worker count).
- Return type:
Tuple[np.ndarray, np.ndarray]
- Returns:
Tuple of (c_reg, w):
  - c_reg: Registered array with the same shape as the input
  - w: Displacement fields, shape (T,H,W,2) with [u,v] components
Example
>>> import numpy as np
>>> from pyflowreg.motion_correction import compensate_arr
>>>
>>> # Create test data
>>> video = np.random.rand(100, 256, 256, 2)  # 100 frames, 2 channels
>>> reference = np.mean(video[:10], axis=0)
>>>
>>> # Register with progress callback
>>> def progress(current, total):
...     print(f"Progress: {current}/{total} ({100*current/total:.1f}%)")
>>> registered, flow = compensate_arr(video, reference, progress_callback=progress)
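A frame-wise progress callback can flood the console on long recordings. The sketch below shows one way to throttle it while keeping the documented `(current_frame, total_frames)` signature. `ProgressReporter` and its `step` parameter are hypothetical names introduced for illustration, and the loop only simulates the calls that a correction run would make.

```python
class ProgressReporter:
    """Progress callback that prints at most once per `step` percent."""

    def __init__(self, step=10.0):
        self.step = step
        self.last_reported = -step  # ensures the first call is reported
        self.messages = []

    def __call__(self, current, total):
        percent = 100.0 * current / total if total else 100.0
        # Report when enough progress has accumulated, and always at the end
        if percent - self.last_reported >= self.step or current == total:
            self.last_reported = percent
            message = f"Progress: {current}/{total} ({percent:.1f}%)"
            self.messages.append(message)
            print(message)


reporter = ProgressReporter(step=25.0)
for frame in range(1, 101):   # simulate frame-wise updates for 100 frames
    reporter(frame, 100)
```

An instance can be passed wherever a progress callback is accepted, for example `compensate_arr(video, reference, progress_callback=reporter)`.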
File-Based Workflow#
File-based processing with callback support through BatchMotionCorrector:
from pyflowreg.motion_correction.compensate_recording import BatchMotionCorrector
from pyflowreg.motion_correction.OF_options import OFOptions, OutputFormat

class ProcessingMonitor:
    def __init__(self):
        self.batch_count = 0

    def on_batch_complete(self, batch, start_idx, end_idx):
        self.batch_count += 1
        print(f"Batch {self.batch_count} complete: frames {start_idx}-{end_idx}")

monitor = ProcessingMonitor()
options = OFOptions(
    input_file="recording.h5",
    output_format=OutputFormat.HDF5,
    output_path="results/",
    save_w=True
)
compensator = BatchMotionCorrector(options)
compensator.register_registered_callback(monitor.on_batch_complete)
compensator.run()
- pyflowreg.motion_correction.compensate_recording(options, reference_frame=None, config=None)[source]#
Main entry point matching MATLAB API.
- Parameters:
options – OF_options object with parameters
reference_frame – Optional pre-computed reference
config – Optional registration configuration
- Return type:
- Returns:
The reference frame used
- class pyflowreg.motion_correction.compensate_recording.RegistrationConfig(n_jobs=-1, verbose=False, parallelization=None)[source]#
Bases: object
Simplified configuration.
- class pyflowreg.motion_correction.compensate_recording.BatchMotionCorrector(options, config=None)[source]#
Bases: object
Main registration pipeline.
- register_progress_callback(callback)[source]#
Register a progress callback function.
- Parameters:
callback – Function that receives (current_frame, total_frames) as arguments. For multiprocessing, updates are batch-wise rather than frame-wise.
- Return type:
- register_w_callback(callback)[source]#
Register a callback for displacement field batches.
- Parameters:
callback – Function receiving (w_batch, batch_start_idx, batch_end_idx) where w_batch has shape (T, H, W, 2) containing [u,v] components. batch_start_idx and batch_end_idx indicate the frame indices in the original video.
- Return type:
- register_registered_callback(callback)[source]#
Register a callback for registered/compensated frame batches.
- Parameters:
callback – Function receiving (registered_batch, batch_start_idx, batch_end_idx) where registered_batch has shape (T, H, W, C). batch_start_idx and batch_end_idx indicate the frame indices in the original video.
- Return type:
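Because the batch callbacks receive frame indices relative to the original video, a full-length result can be reassembled from batch-wise deliveries. The sketch below imitates that delivery with plain Python lists. `batch_ranges`, `deliver`, and `store` are hypothetical helpers, and treating `end_idx` as exclusive is an assumption, since the convention is not stated here.

```python
def batch_ranges(n_frames, buffer_size):
    """Yield (start_idx, end_idx) pairs covering n_frames in order.

    Assumption: end_idx is exclusive, so end_idx - start_idx is the batch length.
    """
    for start in range(0, n_frames, buffer_size):
        yield start, min(start + buffer_size, n_frames)


def deliver(frames, buffer_size, callback):
    """Call callback(batch, start_idx, end_idx) once per batch."""
    for start, end in batch_ranges(len(frames), buffer_size):
        callback(frames[start:end], start, end)


# Reassemble a full-length result from batch-wise deliveries
frames = list(range(50))          # stand-in for 50 video frames
collected = [None] * len(frames)


def store(batch, start_idx, end_idx):
    collected[start_idx:end_idx] = batch


deliver(frames, buffer_size=20, callback=store)
print(list(batch_ranges(50, 20)))
```

The last batch is shorter than `buffer_size` when the frame count is not a multiple of it, so a callback should rely on the indices it receives instead of assuming a fixed batch length.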
Real-Time Processing#
- class pyflowreg.motion_correction.FlowRegLive(options=None, reference_buffer_size=50, reference_update_interval=20, reference_update_weight=0.2, truncate=4.0, **kwargs)[source]#
Bases: object
Real-time motion compensation with adaptive reference updating.
Optimized for speed using:
- Fast quality setting for optical flow
- Half kernel temporal filtering
- Circular buffer for temporal history
- Weighted reference updates every 20 frames
Initialize FlowRegLive for real-time motion compensation.
- Parameters:
options – OFOptions for configuration. If None, uses fast defaults.
reference_buffer_size – Size of buffer for reference initialization
reference_update_interval – Update reference every N frames
reference_update_weight – Weight for mixing new frames into reference
truncate – Truncate filter at this many standard deviations
**kwargs – Additional parameters to override OFOptions
- set_reference(frames=None)[source]#
Initialize reference from frames or buffer.
- Parameters:
frames – Optional array of frames (T,H,W,C). If None, uses buffer.
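The interplay of `reference_update_interval` and `reference_update_weight` can be pictured as a periodic weighted blend. The sketch below assumes the update is a convex combination of the old reference and the newest frame. That formula is an assumption, not taken from the FlowRegLive source, and `blend_reference` and `run_updates` are hypothetical names. Scalars stand in for frames, and the same arithmetic applies element-wise to NumPy arrays.

```python
def blend_reference(reference, frame, weight=0.2):
    """Mix a new frame into the reference (works for floats or NumPy arrays)."""
    return (1.0 - weight) * reference + weight * frame


def run_updates(frames, reference, update_interval=20, weight=0.2):
    """Blend every `update_interval`-th frame into the reference."""
    for index, frame in enumerate(frames, start=1):
        if index % update_interval == 0:
            reference = blend_reference(reference, frame, weight)
    return reference


# Scalar stand-ins for frames: brightness jumps from 10.0 to 20.0
frames = [10.0] * 19 + [20.0] * 21
final_reference = run_updates(frames, reference=10.0)
print(final_reference)
```

With a weight of 0.2, the reference moves only a fifth of the way toward the new frame at each update, so a brief artifact has limited influence while a slow drift is still followed.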
Configuration#
OutputFormat Enum#
- class pyflowreg.motion_correction.OF_options.OutputFormat(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
- TIFF = 'TIFF'#
- HDF5 = 'HDF5'#
- MAT = 'MAT'#
- MULTIFILE_TIFF = 'MULTIFILE_TIFF'#
- MULTIFILE_MAT = 'MULTIFILE_MAT'#
- MULTIFILE_HDF5 = 'MULTIFILE_HDF5'#
- CAIMAN_HDF5 = 'CAIMAN_HDF5'#
- BEGONIA = 'BEGONIA'#
- SUITE2P_TIFF = 'SUITE2P_TIFF'#
- ARRAY = 'ARRAY'#
- NULL = 'NULL'#
Key output formats:
- OutputFormat.NULL - Discards output, ideal for callback-only processing
- OutputFormat.ARRAY - Returns in-memory arrays (default for compensate_arr)
- OutputFormat.HDF5 - Efficient storage for large datasets
- OutputFormat.TIFF - Standard microscopy format
- OutputFormat.MAT - MATLAB compatibility
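Since each member's value equals its name, a format can be looked up from a plain string. The sketch below uses a stand-in enum with a subset of the members listed above, so it runs without PyFlowReg. `OutputFormatSketch`, `SUFFIX_TO_FORMAT`, and `format_for` are hypothetical, and the suffix table is an illustrative convention, not something the library is known to provide.

```python
from enum import Enum
from pathlib import Path


class OutputFormatSketch(str, Enum):
    """Stand-in with a subset of the members listed above (not the real class)."""
    TIFF = "TIFF"
    HDF5 = "HDF5"
    MAT = "MAT"
    ARRAY = "ARRAY"
    NULL = "NULL"


# Hypothetical suffix table; PyFlowReg itself may resolve formats differently
SUFFIX_TO_FORMAT = {
    ".tif": OutputFormatSketch.TIFF,
    ".tiff": OutputFormatSketch.TIFF,
    ".h5": OutputFormatSketch.HDF5,
    ".hdf5": OutputFormatSketch.HDF5,
    ".mat": OutputFormatSketch.MAT,
}


def format_for(target=None):
    """Pick a format: NULL when no target is given, else by file suffix."""
    if target is None:
        return OutputFormatSketch.NULL
    suffix = Path(target).suffix.lower()
    if suffix not in SUFFIX_TO_FORMAT:
        raise ValueError(f"No output format known for suffix {suffix!r}")
    return SUFFIX_TO_FORMAT[suffix]


print(format_for("results/corrected.H5").value)
print(OutputFormatSketch("TIFF") is OutputFormatSketch.TIFF)  # lookup by value
```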
OFOptions Class#
- class pyflowreg.motion_correction.OFOptions(**data)[source]#
Bases: BaseModel
Python port of MATLAB OF_options class.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- input_file: Optional[Union[str, Path, np.ndarray, VideoReader]]#
- output_path: Path#
- output_format: OutputFormat#
- output_file_name: Optional[str]#
- channel_idx: Optional[List[int]]#
- alpha: Union[float, Tuple[float, float]]#
- weight: Union[List[float], np.ndarray]#
- levels: StrictInt#
- min_level: StrictInt#
- quality_setting: QualitySetting#
- eta: float#
- update_lag: StrictInt#
- iterations: StrictInt#
- a_smooth: float#
- a_data: float#
- gnc_schedule: Optional[Tuple[float, ...]]#
- warping_steps: Optional[StrictInt]#
- sigma: Any#
- bin_size: StrictInt#
- buffer_size: StrictInt#
- reference_frames: Union[List[int], str, Path, np.ndarray]#
- update_reference: bool#
- n_references: StrictInt#
- min_frames_per_reference: StrictInt#
- verbose: bool#
- save_meta_info: bool#
- save_w: bool#
- save_valid_mask: bool#
- save_valid_idx: bool#
- output_typename: Optional[str]#
- channel_normalization: ChannelNormalization#
- interpolation_method: InterpolationMethod#
- cc_initialization: bool#
- cc_hw: Union[int, Tuple[int, int]]#
- cc_up: int#
- update_initialization_w: bool#
- naming_convention: NamingConvention#
- constancy_assumption: ConstancyAssumption#
- flow_backend: str#
- backend_params: Dict[str, Any]#
- preproc_funct: Optional[Callable]#
- get_displacement_impl: Optional[Callable]#
- get_displacement_factory: Optional[Callable[..., Callable]]#
- classmethod normalize_weight(v)[source]#
Normalize weight values to sum to 1.
Accepts:
- List [1, 2]: normalized to [0.33, 0.67]
- 1D numpy array: normalized and converted to list
- 2D numpy array (H, W): spatial weight map for single channel
- 3D numpy array (H, W, C): spatial weight maps from preregistration
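For the list case, the normalization amounts to dividing each weight by the total. The sketch below reproduces the documented `[1, 2]` example. `normalize_channel_weights` is a hypothetical stand-alone helper, not the validator itself, and it does not cover the spatial weight maps.

```python
def normalize_channel_weights(weights):
    """Scale channel weights so they sum to 1."""
    values = [float(w) for w in weights]
    total = sum(values)
    if total <= 0:
        raise ValueError("weights must have a positive sum")
    return [v / total for v in values]


normalized = normalize_channel_weights([1, 2])
print([round(v, 2) for v in normalized])
```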
- classmethod normalize_gnc_schedule(v)[source]#
Normalize and validate an optional GNC stage schedule.
- classmethod normalize_constancy_assumption(v)[source]#
Normalize constancy assumption aliases to serialized option values.
- get_video_reader()[source]#
Get or create video reader (mirrors MATLAB get_video_file_reader).
- Return type:
- get_video_writer()[source]#
Get or create video writer (mirrors MATLAB get_video_writer).
- Return type:
- get_reference_frame(video_reader=None, registration_config=None)[source]#
Get reference frame(s), with optional preregistration.
- save_options(filepath=None)[source]#
Save options to JSON with MATLAB-compatible header.
- Return type:
- classmethod load_options(filepath)[source]#
Load options from JSON (MATLAB or Python format).
- Return type:
- resolve_get_displacement()[source]#
Resolve the displacement computation function based on configuration.
Priority order:
1. get_displacement_impl (direct callable)
2. get_displacement_factory with backend_params
3. flow_backend from registry with backend_params
- Return type:
- Returns:
Callable for computing optical flow
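The priority order can be sketched as a small fall-through function. This is an illustration under stated assumptions, not the method's actual code: `resolve_displacement`, `make_scaled_flow`, and the dictionary registry are hypothetical, and passing `backend_params` to factories as keyword arguments is assumed.

```python
def resolve_displacement(impl=None, factory=None, backend="flowreg",
                         backend_params=None, registry=None):
    """Return a flow function following the documented priority order."""
    params = dict(backend_params or {})
    if impl is not None:                 # 1. direct callable wins
        return impl
    if factory is not None:              # 2. factory built with backend_params
        return factory(**params)
    if registry is None or backend not in registry:
        raise KeyError(f"Unknown flow backend: {backend!r}")
    return registry[backend](**params)   # 3. named backend from the registry


# Hypothetical factory standing in for a real optical-flow backend
def make_scaled_flow(scale=1.0):
    def flow(fixed, moving):
        return (moving - fixed) * scale
    return flow


def direct(fixed, moving):
    return 0.0


registry = {"flowreg": make_scaled_flow}

by_impl = resolve_displacement(impl=direct, factory=make_scaled_flow, registry=registry)
by_factory = resolve_displacement(factory=make_scaled_flow, backend_params={"scale": 2.0})
by_name = resolve_displacement(backend="flowreg", registry=registry)
print(by_impl(1.0, 3.0), by_factory(1.0, 3.0), by_name(1.0, 3.0))
```

In the first call both a direct callable and a factory are supplied, and the direct callable is returned, which mirrors item 1 taking precedence over item 2.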
Parallelization#
PyFlowReg provides multiple parallelization backends for batch processing.
Parallelization executors for motion correction batch processing.
- class pyflowreg.motion_correction.parallelization.BaseExecutor(n_workers=None)[source]#
Bases: ABC
Abstract base class for parallelization executors.
All executors must implement the process_batch method, which takes:
- Batch of frames to process
- Preprocessed batch
- Reference frames (raw and preprocessed)
- Initial flow field
- Options and parameters
And returns:
- Registered frames
- Computed flow fields
Initialize the executor.
- Parameters:
n_workers – Number of workers to use. If None, uses RuntimeContext default.
- abstract process_batch(batch, batch_proc, reference_raw, reference_proc, w_init, get_displacement_func, imregister_func, interpolation_method='cubic', progress_callback=None, **kwargs)[source]#
Process a batch of frames for motion correction.
- Parameters:
batch – Raw frames to register, shape (T, H, W, C)
batch_proc – Preprocessed frames for flow computation, shape (T, H, W, C)
reference_raw – Raw reference frame, shape (H, W, C)
reference_proc – Preprocessed reference frame, shape (H, W, C)
w_init – Initial flow field, shape (H, W, 2)
get_displacement_func – Function to compute optical flow
imregister_func – Function to apply flow field for registration
interpolation_method – Interpolation method for registration
progress_callback – Optional callback for per-frame progress (frames_completed)
**kwargs – Additional parameters
- Return type:
- Returns:
Tuple of (registered_frames, flow_fields), where:
- registered_frames: shape (T, H, W, C)
- flow_fields: shape (T, H, W, 2)
Sequential Executor#
- class pyflowreg.motion_correction.parallelization.SequentialExecutor(n_workers=1)[source]#
Bases: BaseExecutor
Sequential executor that processes frames one at a time.
This is the simplest executor and serves as a reference implementation. It’s also the most memory-efficient as it only processes one frame at a time.
Initialize sequential executor.
- Parameters:
n_workers – Ignored for sequential executor, always uses 1.
- process_batch(batch, batch_proc, reference_raw, reference_proc, w_init, get_displacement_func, imregister_func, interpolation_method='cubic', progress_callback=None, **kwargs)[source]#
Process frames sequentially.
- Parameters:
batch – Raw frames to register, shape (T, H, W, C)
batch_proc – Preprocessed frames for flow computation, shape (T, H, W, C)
reference_raw – Raw reference frame, shape (H, W, C)
reference_proc – Preprocessed reference frame, shape (H, W, C)
w_init – Initial flow field, shape (H, W, 2)
get_displacement_func – Function to compute optical flow
imregister_func – Function to apply flow field for registration
interpolation_method – Interpolation method for registration
**kwargs – Additional parameters including ‘flow_params’ dict
- Return type:
- Returns:
Tuple of (registered_frames, flow_fields)
Threading Executor#
- class pyflowreg.motion_correction.parallelization.ThreadingExecutor(n_workers=None)[source]#
Bases: BaseExecutor
Threading executor that processes frames in parallel using threads.
Good for I/O-bound operations or when the GIL is released (e.g., NumPy operations). Less efficient than multiprocessing for pure Python CPU-bound operations.
Initialize threading executor.
- Parameters:
n_workers – Number of worker threads. If None, uses RuntimeContext default.
- process_batch(batch, batch_proc, reference_raw, reference_proc, w_init, get_displacement_func, imregister_func, interpolation_method='cubic', progress_callback=None, **kwargs)[source]#
Process frames in parallel using threads.
- Parameters:
batch – Raw frames to register, shape (T, H, W, C)
batch_proc – Preprocessed frames for flow computation, shape (T, H, W, C)
reference_raw – Raw reference frame, shape (H, W, C)
reference_proc – Preprocessed reference frame, shape (H, W, C)
w_init – Initial flow field, shape (H, W, 2)
get_displacement_func – Function to compute optical flow
imregister_func – Function to apply flow field for registration
interpolation_method – Interpolation method for registration
**kwargs – Additional parameters including ‘flow_params’ dict
- Return type:
- Returns:
Tuple of (registered_frames, flow_fields)
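The general idea of thread-based batch processing can be sketched with the standard library alone. This is not ThreadingExecutor's implementation: `process_batch_threaded` and `register_frame` are hypothetical, scalars stand in for frames, and the toy "registration" simply removes each frame's offset from the reference.

```python
from concurrent.futures import ThreadPoolExecutor


def process_batch_threaded(batch, reference, register_frame, n_workers=4):
    """Register every frame of a batch against one reference, in order."""
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # map() yields results in submission order, so frame order is preserved
        results = list(pool.map(lambda frame: register_frame(frame, reference), batch))
    registered = [r[0] for r in results]
    flows = [r[1] for r in results]
    return registered, flows


# Stand-in "registration": the flow is the offset from the reference
def register_frame(frame, reference):
    flow = frame - reference
    return frame - flow, flow


registered, flows = process_batch_threaded([3.0, 5.0, 8.0], 5.0, register_frame)
print(registered, flows)
```

Threads only give a speedup when the per-frame work releases the GIL, as noted above for NumPy operations. For pure-Python work the frames would effectively run one after another.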
Multiprocessing Executor#
- class pyflowreg.motion_correction.parallelization.MultiprocessingExecutor(n_workers=None)[source]#
Bases: BaseExecutor
Multiprocessing executor using shared memory for zero-copy data sharing.
This is the most efficient executor for CPU-bound operations as it:
1. Uses multiple CPU cores in parallel
2. Avoids data serialization overhead with shared memory
3. Bypasses the GIL completely
Initialize multiprocessing executor.
- Parameters:
n_workers – Number of worker processes. If None, uses RuntimeContext default.
- process_batch(batch, batch_proc, reference_raw, reference_proc, w_init, get_displacement_func, imregister_func, interpolation_method='cubic', progress_callback=None, **kwargs)[source]#
Process frames in parallel using multiprocessing with shared memory.
- Parameters:
batch – Raw frames to register, shape (T, H, W, C)
batch_proc – Preprocessed frames for flow computation, shape (T, H, W, C)
reference_raw – Raw reference frame, shape (H, W, C)
reference_proc – Preprocessed reference frame, shape (H, W, C)
w_init – Initial flow field, shape (H, W, 2)
get_displacement_func – Ignored (functions imported in worker)
imregister_func – Ignored (functions imported in worker)
interpolation_method – Interpolation method for registration
**kwargs – Additional parameters including ‘flow_params’ dict
- Return type:
- Returns:
Tuple of (registered_frames, flow_fields)