Source code for pyflowreg.util.io._arr

"""
Private module for in-memory array I/O wrappers.
Allows numpy arrays to be processed through the same pipeline as video files.
"""

from typing import Union, List, Optional
import numpy as np

from ._base import VideoReader, VideoWriter


[docs] class ArrayReader(VideoReader): """ Wraps numpy arrays to provide VideoReader interface. Enables batch processing and binning for in-memory arrays. """
[docs] def __init__( self, array: np.ndarray, buffer_size: int = 100, bin_size: int = 1, inplace: bool = False, ): """ Initialize array reader. Args: array: Input array with shape (T,H,W,C) or (H,W,C) or (T,H,W) buffer_size: Number of frames per batch bin_size: Temporal binning factor inplace: If True, return views for memory efficiency (no copy). If False (default), return copies for safety with multiprocessing. """ super().__init__() # Handle different input shapes - store reference, don't copy the whole array if array.ndim == 2: # (H,W) self._array = array[np.newaxis, :, :, np.newaxis] # -> (1,H,W,1) elif array.ndim == 3: # Could be (T,H,W) or (H,W,C) # Assume (H,W,C) if last dimension is small (<=4 channels typical) if array.shape[-1] <= 4: self._array = array[np.newaxis, ...] # (H,W,C) -> (1,H,W,C) else: self._array = array[..., np.newaxis] # (T,H,W) -> (T,H,W,1) elif array.ndim == 4: self._array = array # Already (T,H,W,C) else: raise ValueError(f"Array must be 2D, 3D or 4D, got {array.ndim}D") self.buffer_size = buffer_size self.bin_size = bin_size self._inplace = inplace # Initialize immediately self._initialize()
def _initialize(self): """Set VideoReader properties from array shape.""" self.frame_count, self.height, self.width, self.n_channels = self._array.shape self.dtype = self._array.dtype self._initialized = True def _read_raw_frames(self, frame_indices: Union[slice, List[int]]) -> np.ndarray: """ Read frames from array. Args: frame_indices: Slice or list of frame indices Returns: Array with shape (T,H,W,C), copy by default unless inplace=True """ if isinstance(frame_indices, list): if len(frame_indices) == 0: return np.empty( (0, self.height, self.width, self.n_channels), dtype=self.dtype ) result = self._array[frame_indices] else: # slice result = self._array[frame_indices] # Return copy by default for safety with multiprocessing # Only return view if explicitly requested with inplace=True return result if self._inplace else result.copy()
[docs] def close(self): """No-op for array reader.""" pass
[docs] class ArrayWriter(VideoWriter): """ Accumulates frames in memory instead of writing to file. Provides VideoWriter interface for array output. """
[docs] def __init__(self): """Initialize array writer.""" super().__init__() self._vid = [] # Accumulated video frames
[docs] def init(self, first_frame_batch: np.ndarray): """ Initialize writer from first batch following base class pattern. Args: first_frame_batch: First batch with shape (T,H,W,C) or (H,W,C) or even (H,W) """ shape = first_frame_batch.shape if len(shape) == 2: # Single channel single frame (H,W) self.height = shape[0] self.width = shape[1] self.n_channels = 1 elif len(shape) == 3: # Single frame (H,W,C) self.height = shape[0] self.width = shape[1] self.n_channels = shape[2] if len(shape) > 2 else 1 elif len(shape) == 4: # Batch (T,H,W,C) - use first frame dimensions self.height = shape[1] self.width = shape[2] self.n_channels = shape[3] else: raise ValueError( f"Expected 2D, 3D or 4D array, got {first_frame_batch.ndim}D" ) self.dtype = first_frame_batch.dtype self.bit_depth = self.dtype.itemsize * 8 self.initialized = True
[docs] def write_frames(self, frames: np.ndarray): """ Accumulate frames in memory. Args: frames: Array with shape (T,H,W,C), (H,W,C), or (H,W) """ if not self.initialized: self.init(frames) # Handle different input dimensions if frames.ndim == 2: # Single channel single frame (H,W) -> (1,H,W,1) frames = frames[np.newaxis, :, :, np.newaxis] elif frames.ndim == 3: # Single frame (H,W,C) -> (1,H,W,C) frames = frames[np.newaxis, ...] elif frames.ndim == 4: # Already batched (T,H,W,C) pass else: raise ValueError(f"Expected 2D, 3D or 4D array, got {frames.ndim}D") # Always copy to prevent external modifications self._vid.append(frames.copy())
[docs] def get_array(self) -> Optional[np.ndarray]: """ Fetch accumulated frames as single array. Returns: Concatenated frames or None if empty """ if not self._vid: return None return np.concatenate(self._vid, axis=0)
[docs] def close(self): """No-op for array writer.""" pass
def __repr__(self): n_frames = sum(f.shape[0] for f in self._vid) if self._vid else 0 return f"ArrayWriter(frames={n_frames}, shape=({self.height},{self.width},{self.n_channels}))"