Motion Correction#

High-level APIs for applying motion correction and motion analysis to microscopy videos.

Core Concepts#

Output Formats#

PyFlowReg supports flexible output handling through the output_format parameter for file-based workflows (compensate_recording):

  • OutputFormat.ARRAY - Accumulate in memory, return as array

  • OutputFormat.NULL - Discard output, callback-only processing (no storage overhead)

  • OutputFormat.HDF5 - HDF5 file storage

  • OutputFormat.TIFF - TIFF stack output

  • OutputFormat.MAT - MATLAB compatible files

Callback System#

All motion correction functions support real-time data access through callbacks:

Callback            | Signature                                                | Description
--------------------|----------------------------------------------------------|---------------------------------------------
progress_callback   | (current: int, total: int) -> None                       | Progress updates
w_callback          | (w_batch: ndarray, start_idx: int, end_idx: int) -> None | Access displacement fields during processing
registered_callback | (batch: ndarray, start_idx: int, end_idx: int) -> None   | Access corrected frames during processing

Callbacks enable:

  • Real-time visualization without waiting for completion

  • Motion tracking and analysis during processing

  • Memory-efficient file-based processing with OutputFormat.NULL

  • Integration with visualization tools like napari
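The documented callback signatures can be written down as type aliases. The sketch below is illustrative only: the alias names and the `ProgressLog` class are hypothetical helpers, not part of PyFlowReg, and the callback is driven by hand-made calls rather than a real registration run.

```python
from typing import Any, Callable, List, Tuple

# Hypothetical aliases mirroring the documented signatures.
ProgressCallback = Callable[[int, int], None]
BatchCallback = Callable[[Any, int, int], None]  # (batch, start_idx, end_idx)


class ProgressLog:
    """Collects progress updates and reports each new 10% step once."""

    def __init__(self) -> None:
        self.history: List[Tuple[int, int]] = []
        self.reported: List[int] = []

    def __call__(self, current: int, total: int) -> None:
        self.history.append((current, total))
        step = (10 * current) // total if total > 0 else 0  # 0..10
        if step not in self.reported:
            self.reported.append(step)
            print(f"Progress: {current}/{total} ({10 * step}%)")


log: ProgressCallback = ProgressLog()
for done in (20, 40, 45, 100):  # simulated batch-wise updates
    log(done, 100)
```

Because the object is callable with `(current, total)`, it could be passed wherever a `progress_callback` is accepted; updates that fall in an already reported 10% step are recorded but not printed again.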

Array-Based Workflow#

The primary function for in-memory motion correction with callback support:

compensate_arr(
    c1: np.ndarray,                    # Video to correct
    c_ref: np.ndarray,                  # Reference frame
    options: OFOptions = None,          # Configuration
    progress_callback: Callable = None, # Progress updates
    w_callback: Callable = None,        # Displacement access
    registered_callback: Callable = None # Corrected frame access
) -> Tuple[np.ndarray, np.ndarray]     # Returns (registered, w)

Example with Callbacks#

import numpy as np
from pyflowreg.motion_correction import compensate_arr
from pyflowreg.motion_correction.OF_options import OFOptions

def track_motion(w_batch, start_idx, end_idx):
    """Process displacement fields as they're computed."""
    for i in range(w_batch.shape[0]):
        magnitude = np.sqrt(w_batch[i, :, :, 0]**2 + w_batch[i, :, :, 1]**2)
        print(f"Frame {start_idx + i}: mean motion = {np.mean(magnitude):.2f}")

# Configure array workflow
options = OFOptions(
    quality_setting="balanced",
    buffer_size=20                     # Process 20 frames at a time
)

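# Synthetic stand-in data: 100 frames, 256x256 pixels, 2 channels
video = np.random.rand(100, 256, 256, 2)
reference = np.mean(video[:10], axis=0)

# Run with callbacks
registered, w = compensate_arr(
    video, reference, options,
    w_callback=track_motion
)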

compensate_arr always returns arrays in memory and ignores options.output_format. For true callback-only processing with no storage, use compensate_recording(OFOptions(..., output_format=OutputFormat.NULL)).
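With buffer_size=20, batch callbacks fire once per batch rather than once per frame. The following sketch shows how a video of a given length could be partitioned into index ranges. The helper `iter_batches` is hypothetical, and it assumes half-open `[start_idx, end_idx)` ranges, which this page does not specify.

```python
from typing import Iterator, Tuple


def iter_batches(n_frames: int, buffer_size: int) -> Iterator[Tuple[int, int]]:
    """Yield (start_idx, end_idx) pairs covering n_frames in order.

    Assumption: end_idx is exclusive; the last batch may be shorter.
    """
    if buffer_size <= 0:
        raise ValueError("buffer_size must be positive")
    for start in range(0, n_frames, buffer_size):
        yield start, min(start + buffer_size, n_frames)


batches = list(iter_batches(n_frames=50, buffer_size=20))
print(batches)
```

Under these assumptions a 50-frame video with a buffer of 20 produces three callback invocations, the last one covering only 10 frames.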

pyflowreg.motion_correction.compensate_arr(c1, c_ref, options=None, progress_callback=None, *, flow_backend=None, backend_params=None, get_displacement=None, get_displacement_factory=None, w_callback=None, registered_callback=None, registration_config=None)[source]#

Process arrays in memory matching MATLAB compensate_inplace functionality.

This function provides the same motion compensation as compensate_recording but operates on in-memory arrays instead of files. It uses the same batching and flow initialization logic to ensure algorithmic consistency.

Parameters:
  • c1 – Input array to register, shape (T,H,W,C), (H,W,C), or (T,H,W). For single-channel 3D arrays, assumes (T,H,W) if T > 4, else (H,W,C).

  • c_ref – Reference frame, shape (H,W,C) or (H,W)

  • options – OFOptions configuration. If None, uses defaults.

  • progress_callback – Optional callback function that receives (current_frame, total_frames) for progress updates. Note: For multiprocessing executor, updates are batch-wise.

  • flow_backend – Backend name override (e.g., ‘diso’, ‘flowreg’)

  • backend_params – Backend-specific parameters override

  • get_displacement – Direct displacement callable override

  • get_displacement_factory – Factory function override for creating displacement callable

  • w_callback – Optional callback for displacement field batches, receives (w_batch, start_idx, end_idx)

  • registered_callback – Optional callback for registered frame batches, receives (batch, start_idx, end_idx)

  • registration_config – Optional execution config (executor type/worker count).

Return type:

Tuple[ndarray, ndarray]

Returns:

Tuple of:

  • c_reg – Registered array with same shape as input

  • w – Displacement fields, shape (T,H,W,2) with [u,v] components

Example

>>> import numpy as np
>>> from pyflowreg.motion_correction import compensate_arr
>>>
>>> # Create test data
>>> video = np.random.rand(100, 256, 256, 2)  # 100 frames, 2 channels
>>> reference = np.mean(video[:10], axis=0)
>>>
>>> # Register with progress callback
>>> def progress(current, total):
...     print(f"Progress: {current}/{total} ({100*current/total:.1f}%)")
>>> registered, flow = compensate_arr(video, reference, progress_callback=progress)

File-Based Workflow#

File-based processing with callback support through BatchMotionCorrector:

from pyflowreg.motion_correction.compensate_recording import BatchMotionCorrector
from pyflowreg.motion_correction.OF_options import OFOptions, OutputFormat

class ProcessingMonitor:
    def __init__(self):
        self.batch_count = 0

    def on_batch_complete(self, batch, start_idx, end_idx):
        self.batch_count += 1
        print(f"Batch {self.batch_count} complete: frames {start_idx}-{end_idx}")

monitor = ProcessingMonitor()

options = OFOptions(
    input_file="recording.h5",
    output_format=OutputFormat.HDF5,
    output_path="results/",
    save_w=True
)

compensator = BatchMotionCorrector(options)
compensator.register_registered_callback(monitor.on_batch_complete)
compensator.run()

pyflowreg.motion_correction.compensate_recording(options, reference_frame=None, config=None)[source]#

Main entry point matching MATLAB API.

Parameters:
  • options – OFOptions object with parameters

  • reference_frame – Optional pre-computed reference

  • config – Optional registration configuration

Return type:

ndarray

Returns:

The reference frame used

class pyflowreg.motion_correction.compensate_recording.RegistrationConfig(n_jobs=-1, verbose=False, parallelization=None)[source]#

Bases: object

Simplified configuration.

n_jobs: int = -1#
verbose: bool = False#
parallelization: Optional[str] = None#

class pyflowreg.motion_correction.compensate_recording.BatchMotionCorrector(options, config=None)[source]#

Bases: object

Main registration pipeline.

register_progress_callback(callback)[source]#

Register a progress callback function.

Parameters:

callback – Function that receives (current_frame, total_frames) as arguments. For multiprocessing, updates are batch-wise rather than frame-wise.

Return type:

None

register_w_callback(callback)[source]#

Register a callback for displacement field batches.

Parameters:

callback – Function receiving (w_batch, batch_start_idx, batch_end_idx) where w_batch has shape (T, H, W, 2) containing [u,v] components. batch_start_idx and batch_end_idx indicate the frame indices in the original video.

Return type:

None

register_registered_callback(callback)[source]#

Register a callback for registered/compensated frame batches.

Parameters:

callback – Function receiving (registered_batch, batch_start_idx, batch_end_idx) where registered_batch has shape (T, H, W, C). batch_start_idx and batch_end_idx indicate the frame indices in the original video.

Return type:

None

run(reference_frame=None)[source]#

Run complete registration pipeline.

Return type:

ndarray
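The register_*_callback methods follow a familiar observer pattern. As a rough illustration (not BatchMotionCorrector's actual implementation), a hypothetical stand-in class `CallbackHub` might store callables and invoke each of them once per processed batch:

```python
from typing import Any, Callable, List

BatchCallback = Callable[[Any, int, int], None]


class CallbackHub:
    """Hypothetical stand-in that stores and dispatches batch callbacks."""

    def __init__(self) -> None:
        self._registered_callbacks: List[BatchCallback] = []

    def register_registered_callback(self, callback: BatchCallback) -> None:
        self._registered_callbacks.append(callback)

    def notify_registered(self, batch: Any, start_idx: int, end_idx: int) -> None:
        # Every registered callback sees the same batch and index range.
        for callback in self._registered_callbacks:
            callback(batch, start_idx, end_idx)


seen = []
hub = CallbackHub()
hub.register_registered_callback(lambda b, s, e: seen.append((len(b), s, e)))
hub.notify_registered(["frame0", "frame1"], 0, 2)
print(seen)
```

The method name `notify_registered` and the list-of-strings batch are placeholders chosen for the sketch; only the `(batch, start_idx, end_idx)` argument order is taken from the documentation above.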

Real-Time Processing#

class pyflowreg.motion_correction.FlowRegLive(options=None, reference_buffer_size=50, reference_update_interval=20, reference_update_weight=0.2, truncate=4.0, **kwargs)[source]#

Bases: object

Real-time motion compensation with adaptive reference updating.

Optimized for speed using:

  • Fast quality setting for optical flow

  • Half kernel temporal filtering

  • Circular buffer for temporal history

  • Weighted reference updates every 20 frames

Initialize FlowRegLive for real-time motion compensation.

Parameters:
  • options – OFOptions for configuration. If None, uses fast defaults.

  • reference_buffer_size – Size of buffer for reference initialization

  • reference_update_interval – Update reference every N frames

  • reference_update_weight – Weight for mixing new frames into reference

  • truncate – Truncate filter at this many standard deviations

  • **kwargs – Additional parameters to override OFOptions
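To make the three reference parameters concrete, here is a minimal sketch of a buffered, periodically updated reference. It is not FlowRegLive's code: the class `RunningReference` is hypothetical, frames are flat lists of floats instead of arrays, and the update rule `(1 - weight) * reference + weight * frame` is an assumed form of the weighted mixing.

```python
from collections import deque
from typing import Deque, List, Optional

Frame = List[float]  # flattened pixel values, standing in for an ndarray


class RunningReference:
    def __init__(self, buffer_size: int = 50, update_interval: int = 20,
                 update_weight: float = 0.2) -> None:
        self.buffer: Deque[Frame] = deque(maxlen=buffer_size)  # circular buffer
        self.update_interval = update_interval
        self.update_weight = update_weight
        self.reference: Optional[Frame] = None
        self.frames_seen = 0

    def set_reference(self) -> None:
        """Initialize the reference as the pixel-wise mean of the buffer."""
        if not self.buffer:
            raise ValueError("buffer is empty")
        n = len(self.buffer)
        self.reference = [sum(px) / n for px in zip(*self.buffer)]

    def push(self, frame: Frame) -> None:
        self.buffer.append(frame)
        self.frames_seen += 1
        if self.reference is None:
            return
        if self.frames_seen % self.update_interval == 0:
            # Assumed mixing rule: blend the new frame into the reference.
            w = self.update_weight
            self.reference = [(1 - w) * r + w * f
                              for r, f in zip(self.reference, frame)]


ref = RunningReference(buffer_size=2, update_interval=2, update_weight=0.5)
ref.push([0.0, 0.0])
ref.push([2.0, 4.0])
ref.set_reference()      # mean of the two buffered frames
ref.push([9.0, 9.0])     # frame 3: not an update step
ref.push([3.0, 6.0])     # frame 4: mixed in with weight 0.5
print(ref.reference)
```

A `deque` with `maxlen` gives the circular-buffer behaviour for free: once full, appending a frame silently drops the oldest one.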

set_reference(frames=None)[source]#

Initialize reference from frames or buffer.

Parameters:

frames – Optional array of frames (T,H,W,C). If None, uses buffer.

register_frames(frames)[source]#

Register multiple frames in sequence.

Parameters:

frames – Array of frames (T,H,W,C)

Return type:

Tuple[ndarray, ndarray]

Returns:

Tuple of (registered_frames, flow_fields)

reset_reference(new_reference)[source]#

Set new reference and reset flow.

Parameters:

new_reference – New reference frame (H,W,C)

get_current_flow()[source]#

Get current flow field state.

Return type:

Optional[ndarray]

set_flow_init(w_init)[source]#

Set flow field initialization for next frame.

Configuration#

OutputFormat Enum#

class pyflowreg.motion_correction.OF_options.OutputFormat(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

TIFF = 'TIFF'#
HDF5 = 'HDF5'#
MAT = 'MAT'#
MULTIFILE_TIFF = 'MULTIFILE_TIFF'#
MULTIFILE_MAT = 'MULTIFILE_MAT'#
MULTIFILE_HDF5 = 'MULTIFILE_HDF5'#
CAIMAN_HDF5 = 'CAIMAN_HDF5'#
BEGONIA = 'BEGONIA'#
SUITE2P_TIFF = 'SUITE2P_TIFF'#
ARRAY = 'ARRAY'#
NULL = 'NULL'#

Key output formats:

  • OutputFormat.NULL - Discards output, ideal for callback-only processing

  • OutputFormat.ARRAY - Returns in-memory arrays (compensate_arr always returns arrays, regardless of output_format)

  • OutputFormat.HDF5 - Efficient storage for large datasets

  • OutputFormat.TIFF - Standard microscopy format

  • OutputFormat.MAT - MATLAB compatibility
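Because OutputFormat derives from both str and Enum, members can be looked up from their string value and compare equal to plain strings. The stand-in enum below copies three members from the listing above so that the snippet runs without PyFlowReg installed; `Fmt` is a hypothetical name.

```python
from enum import Enum


class Fmt(str, Enum):
    """Stand-in mirroring a few OutputFormat members."""
    HDF5 = "HDF5"
    ARRAY = "ARRAY"
    NULL = "NULL"


fmt = Fmt("HDF5")             # lookup by value, e.g. from a config file
is_same = fmt is Fmt.HDF5     # lookup returns the singleton member
equals_str = fmt == "HDF5"    # the str mixin makes this comparison True
names = [m.value for m in Fmt]

try:
    Fmt("PNG")                # unknown values are rejected
    rejected = False
except ValueError:
    rejected = True
```

This is standard `enum` behaviour rather than anything specific to PyFlowReg, so the same lookups should apply to the real OutputFormat.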

OFOptions Class#

class pyflowreg.motion_correction.OFOptions(**data)[source]#

Bases: BaseModel

Python port of MATLAB OF_options class.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

input_file: Optional[Union[str, Path, np.ndarray, VideoReader]]#
output_path: Path#
output_format: OutputFormat#
output_file_name: Optional[str]#
channel_idx: Optional[List[int]]#
alpha: Union[float, Tuple[float, float]]#
weight: Union[List[float], np.ndarray]#
levels: StrictInt#
min_level: StrictInt#
quality_setting: QualitySetting#
eta: float#
update_lag: StrictInt#
iterations: StrictInt#
a_smooth: float#
a_data: float#
gnc_schedule: Optional[Tuple[float, ...]]#
warping_steps: Optional[StrictInt]#
sigma: Any#
bin_size: StrictInt#
buffer_size: StrictInt#
reference_frames: Union[List[int], str, Path, np.ndarray]#
update_reference: bool#
n_references: StrictInt#
min_frames_per_reference: StrictInt#
verbose: bool#
save_meta_info: bool#
save_w: bool#
save_valid_mask: bool#
save_valid_idx: bool#
output_typename: Optional[str]#
channel_normalization: ChannelNormalization#
interpolation_method: InterpolationMethod#
cc_initialization: bool#
cc_hw: Union[int, Tuple[int, int]]#
cc_up: int#
update_initialization_w: bool#
naming_convention: NamingConvention#
constancy_assumption: ConstancyAssumption#
flow_backend: str#
backend_params: Dict[str, Any]#
preproc_funct: Optional[Callable]#
get_displacement_impl: Optional[Callable]#
get_displacement_factory: Optional[Callable[..., Callable]]#

classmethod normalize_alpha(v)[source]#

Normalize alpha to always be a 2-tuple of positive floats.
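As an illustration of what such a normalization could look like, the standalone function below accepts a scalar or a pair. It is a sketch, not the validator's source: duplicating a scalar into both components and raising ValueError on non-positive input are assumptions.

```python
from typing import Sequence, Tuple, Union


def normalize_alpha(v: Union[float, Sequence[float]]) -> Tuple[float, float]:
    """Return alpha as a 2-tuple of positive floats.

    Assumption: a scalar is duplicated for both components.
    """
    if isinstance(v, (int, float)):
        alpha = (float(v), float(v))
    else:
        values = [float(x) for x in v]
        if len(values) != 2:
            raise ValueError("alpha must be a scalar or have exactly 2 elements")
        alpha = (values[0], values[1])
    if any(a <= 0 for a in alpha):
        raise ValueError("alpha components must be positive")
    return alpha


print(normalize_alpha(1.5))
print(normalize_alpha([2, 3]))
```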

classmethod normalize_weight(v)[source]#

Normalize weight values to sum to 1.

Accepts:

  • List [1, 2]: normalized to [0.33, 0.67]

  • 1D numpy array: normalized and converted to list

  • 2D numpy array (H, W): spatial weight map for single channel

  • 3D numpy array (H, W, C): spatial weight maps from preregistration
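The list case is plain sum-to-one scaling, which a few lines reproduce. The helper below is a hypothetical standalone function covering only flat lists of per-channel weights; the spatial weight-map cases are not modelled.

```python
from typing import List, Sequence


def normalize_weights(weights: Sequence[float]) -> List[float]:
    """Scale per-channel weights so that they sum to 1."""
    total = float(sum(weights))
    if total <= 0:
        raise ValueError("weights must have a positive sum")
    return [w / total for w in weights]


normalized = normalize_weights([1, 2])
rounded = [round(w, 2) for w in normalized]
print(rounded)
```

The rounded result matches the [0.33, 0.67] quoted in the docstring for the input [1, 2].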

classmethod normalize_sigma(v)[source]#

Normalize sigma to correct shape.

classmethod normalize_gnc_schedule(v)[source]#

Normalize and validate an optional GNC stage schedule.

classmethod normalize_constancy_assumption(v)[source]#

Normalize constancy assumption aliases to serialized option values.

validate_and_normalize()[source]#

Normalize fields and maintain MATLAB parity.

Return type:

OFOptions

property effective_min_level: int#

Get effective min_level based on quality setting.

get_sigma_at(i)[source]#

Get sigma for channel i (0-indexed).

Return type:

ndarray

get_weight_at(i, n_channels)[source]#

Get weight for channel i (0-indexed).

Return type:

Union[float, ndarray]

copy()[source]#

Create a deep copy (MATLAB copyable interface).

Return type:

OFOptions

get_video_reader()[source]#

Get or create video reader (mirrors MATLAB get_video_file_reader).

Return type:

VideoReader

get_video_writer()[source]#

Get or create video writer (mirrors MATLAB get_video_writer).

Return type:

VideoWriter

get_reference_frame(video_reader=None, registration_config=None)[source]#

Get reference frame(s), with optional preregistration.

Return type:

Union[ndarray, List[ndarray]]

save_options(filepath=None)[source]#

Save options to JSON with MATLAB-compatible header.

Return type:

None

classmethod load_options(filepath)[source]#

Load options from JSON (MATLAB or Python format).

Return type:

OFOptions

resolve_get_displacement()[source]#

Resolve the displacement computation function based on configuration.

Priority order:

  1. get_displacement_impl (direct callable)

  2. get_displacement_factory with backend_params

  3. flow_backend from registry with backend_params

Return type:

Callable

Returns:

Callable for computing optical flow
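The priority order can be expressed as a short chain of checks. The sketch below uses a hypothetical `BACKENDS` dictionary as a stand-in for the backend registry and a toy factory; none of these names come from PyFlowReg, and the real method reads its inputs from the OFOptions fields rather than from arguments.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical registry mapping backend names to factories.
BACKENDS: Dict[str, Callable[..., Callable]] = {}


def resolve_displacement(
    impl: Optional[Callable] = None,
    factory: Optional[Callable[..., Callable]] = None,
    flow_backend: str = "flowreg",
    backend_params: Optional[Dict[str, Any]] = None,
) -> Callable:
    params = backend_params or {}
    if impl is not None:          # 1. a direct callable wins
        return impl
    if factory is not None:       # 2. user factory, built with backend_params
        return factory(**params)
    if flow_backend not in BACKENDS:
        raise ValueError(f"unknown flow backend: {flow_backend!r}")
    return BACKENDS[flow_backend](**params)  # 3. named backend from registry


def make_constant_flow(value: float = 0.0) -> Callable:
    """Toy factory: returns a 'flow' function that ignores its inputs."""
    return lambda fixed, moving: value


BACKENDS["toy"] = make_constant_flow

direct = resolve_displacement(impl=lambda f, m: "direct", factory=make_constant_flow)
from_factory = resolve_displacement(factory=make_constant_flow,
                                    backend_params={"value": 2.0})
from_registry = resolve_displacement(flow_backend="toy")
```

Passing both `impl` and `factory` returns the direct callable, which is the behaviour the first priority rule describes.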

to_dict()[source]#

Get parameters dict for optical flow functions.

Return type:

dict

model_post_init(context, /)#

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

Return type:

None

Parallelization#

PyFlowReg provides multiple parallelization backends for batch processing.

Parallelization executors for motion correction batch processing.

class pyflowreg.motion_correction.parallelization.BaseExecutor(n_workers=None)[source]#

Bases: ABC

Abstract base class for parallelization executors.

All executors must implement the process_batch method, which takes:

  • Batch of frames to process

  • Preprocessed batch

  • Reference frames (raw and preprocessed)

  • Initial flow field

  • Options and parameters

And returns:

  • Registered frames

  • Computed flow fields

Initialize the executor.

Parameters:

n_workers – Number of workers to use. If None, uses RuntimeContext default.

abstract process_batch(batch, batch_proc, reference_raw, reference_proc, w_init, get_displacement_func, imregister_func, interpolation_method='cubic', progress_callback=None, **kwargs)[source]#

Process a batch of frames for motion correction.

Parameters:
  • batch – Raw frames to register, shape (T, H, W, C)

  • batch_proc – Preprocessed frames for flow computation, shape (T, H, W, C)

  • reference_raw – Raw reference frame, shape (H, W, C)

  • reference_proc – Preprocessed reference frame, shape (H, W, C)

  • w_init – Initial flow field, shape (H, W, 2)

  • get_displacement_func – Function to compute optical flow

  • imregister_func – Function to apply flow field for registration

  • interpolation_method – Interpolation method for registration

  • progress_callback – Optional callback for per-frame progress (frames_completed)

  • **kwargs – Additional parameters

Return type:

Tuple[ndarray, ndarray]

Returns:

Tuple of (registered_frames, flow_fields), where:

  • registered_frames – shape (T, H, W, C)

  • flow_fields – shape (T, H, W, 2)

setup()[source]#

Setup method called before processing. Override in subclasses if needed.

cleanup()[source]#

Cleanup method called after processing. Override in subclasses if needed.

get_info()[source]#

Get information about this executor.

Return type:

Dict[str, Any]

Returns:

Dictionary with executor information
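The executor contract (optional setup and cleanup hooks around an abstract process_batch) can be sketched with a reduced interface. `MiniExecutor` and `MiniSequential` are hypothetical names, the argument list is shortened compared with the real signature (no batch_proc, w_init or interpolation_method), and integers stand in for frames.

```python
from abc import ABC, abstractmethod
from typing import Callable, List, Sequence, Tuple


class MiniExecutor(ABC):
    """Hypothetical, reduced version of the executor interface."""

    def setup(self) -> None:      # optional hook before processing
        pass

    def cleanup(self) -> None:    # optional hook after processing
        pass

    @abstractmethod
    def process_batch(self, batch: Sequence, reference,
                      get_displacement: Callable,
                      imregister: Callable) -> Tuple[List, List]:
        """Return (registered_frames, flow_fields) for the batch."""


class MiniSequential(MiniExecutor):
    def process_batch(self, batch, reference, get_displacement, imregister):
        registered, flows = [], []
        for frame in batch:  # one frame at a time
            flow = get_displacement(reference, frame)
            registered.append(imregister(frame, flow))
            flows.append(flow)
        return registered, flows


# Toy 1-D "frames": the flow is the scalar offset to the reference.
executor = MiniSequential()
executor.setup()
try:
    reg, w = executor.process_batch(
        batch=[3, 5, 8], reference=5,
        get_displacement=lambda ref, frame: ref - frame,
        imregister=lambda frame, flow: frame + flow,
    )
finally:
    executor.cleanup()
print(reg, w)
```

Wrapping the call in try/finally mirrors the intent of the setup and cleanup hooks: resources acquired before processing are released even if a batch fails.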

class pyflowreg.motion_correction.parallelization.SequentialExecutor(n_workers=1)[source]#

Bases: BaseExecutor

Sequential executor that processes frames one at a time.

This is the simplest executor and serves as a reference implementation. It’s also the most memory-efficient as it only processes one frame at a time.

Initialize sequential executor.

Parameters:

n_workers – Ignored for sequential executor, always uses 1.

process_batch(batch, batch_proc, reference_raw, reference_proc, w_init, get_displacement_func, imregister_func, interpolation_method='cubic', progress_callback=None, **kwargs)[source]#

Process frames sequentially.

Parameters:
  • batch – Raw frames to register, shape (T, H, W, C)

  • batch_proc – Preprocessed frames for flow computation, shape (T, H, W, C)

  • reference_raw – Raw reference frame, shape (H, W, C)

  • reference_proc – Preprocessed reference frame, shape (H, W, C)

  • w_init – Initial flow field, shape (H, W, 2)

  • get_displacement_func – Function to compute optical flow

  • imregister_func – Function to apply flow field for registration

  • interpolation_method – Interpolation method for registration

  • **kwargs – Additional parameters including ‘flow_params’ dict

Return type:

Tuple[ndarray, ndarray]

Returns:

Tuple of (registered_frames, flow_fields)

get_info()[source]#

Get information about this executor.

Return type:

dict

class pyflowreg.motion_correction.parallelization.ThreadingExecutor(n_workers=None)[source]#

Bases: BaseExecutor

Threading executor that processes frames in parallel using threads.

Good for I/O-bound operations or when the GIL is released (e.g., NumPy operations). Less efficient than multiprocessing for pure Python CPU-bound operations.
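A thread-based variant of per-frame processing might look like the following sketch. It is not the ThreadingExecutor source: the function name and toy integer frames are hypothetical, and the pool is created per call here, whereas the class documented on this page creates it in setup().

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, Tuple


def process_batch_threaded(batch: Sequence, reference,
                           get_displacement: Callable,
                           imregister: Callable,
                           n_workers: int = 4) -> Tuple[List, List]:
    def register_one(frame):
        flow = get_displacement(reference, frame)
        return imregister(frame, flow), flow

    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # map() yields results in input order, whatever order threads finish in.
        results = list(pool.map(register_one, batch))

    registered = [r for r, _ in results]
    flows = [f for _, f in results]
    return registered, flows


reg, w = process_batch_threaded(
    batch=list(range(10)), reference=4,
    get_displacement=lambda ref, frame: ref - frame,
    imregister=lambda frame, flow: frame + flow,
)
print(reg, w)
```

Using `pool.map` rather than collecting futures as they complete keeps the output aligned with the frame order of the batch, which matters when results are written back by index.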

Initialize threading executor.

Parameters:

n_workers – Number of worker threads. If None, uses RuntimeContext default.

setup()[source]#

Create the thread pool executor.

cleanup()[source]#

Shutdown the thread pool executor.

process_batch(batch, batch_proc, reference_raw, reference_proc, w_init, get_displacement_func, imregister_func, interpolation_method='cubic', progress_callback=None, **kwargs)[source]#

Process frames in parallel using threads.

Parameters:
  • batch – Raw frames to register, shape (T, H, W, C)

  • batch_proc – Preprocessed frames for flow computation, shape (T, H, W, C)

  • reference_raw – Raw reference frame, shape (H, W, C)

  • reference_proc – Preprocessed reference frame, shape (H, W, C)

  • w_init – Initial flow field, shape (H, W, 2)

  • get_displacement_func – Function to compute optical flow

  • imregister_func – Function to apply flow field for registration

  • interpolation_method – Interpolation method for registration

  • **kwargs – Additional parameters including ‘flow_params’ dict

Return type:

Tuple[ndarray, ndarray]

Returns:

Tuple of (registered_frames, flow_fields)

get_info()[source]#

Get information about this executor.

Return type:

dict

class pyflowreg.motion_correction.parallelization.MultiprocessingExecutor(n_workers=None)[source]#

Bases: BaseExecutor

Multiprocessing executor using shared memory for zero-copy data sharing.

This is the most efficient executor for CPU-bound operations as it:

  1. Uses multiple CPU cores in parallel

  2. Avoids data serialization overhead with shared memory

  3. Bypasses the GIL completely

Initialize multiprocessing executor.

Parameters:

n_workers – Number of worker processes. If None, uses RuntimeContext default.

setup()[source]#

Create the process pool executor.

cleanup()[source]#

Cleanup shared memory and shutdown executor.

process_batch(batch, batch_proc, reference_raw, reference_proc, w_init, get_displacement_func, imregister_func, interpolation_method='cubic', progress_callback=None, **kwargs)[source]#

Process frames in parallel using multiprocessing with shared memory.

Parameters:
  • batch – Raw frames to register, shape (T, H, W, C)

  • batch_proc – Preprocessed frames for flow computation, shape (T, H, W, C)

  • reference_raw – Raw reference frame, shape (H, W, C)

  • reference_proc – Preprocessed reference frame, shape (H, W, C)

  • w_init – Initial flow field, shape (H, W, 2)

  • get_displacement_func – Ignored (functions imported in worker)

  • imregister_func – Ignored (functions imported in worker)

  • interpolation_method – Interpolation method for registration

  • **kwargs – Additional parameters including ‘flow_params’ dict

Return type:

Tuple[ndarray, ndarray]

Returns:

Tuple of (registered_frames, flow_fields)

get_info()[source]#

Get information about this executor.

Return type:

dict
