Source code for pyflowreg.util.io.hdf5

import os
from typing import Union, List

import h5py
import numpy as np

from pyflowreg.util.io._base import VideoReader, VideoWriter
from pyflowreg.util.io._ds_io import DSFileReader, DSFileWriter


[docs] class HDF5FileReader(DSFileReader, VideoReader): """HDF5 video file reader with dataset discovery.""" def __init__( self, file_path: str, buffer_size: int = 500, bin_size: int = 1, **kwargs ): # Initialize parent classes DSFileReader.__init__(self) VideoReader.__init__(self) self.file_path = file_path self.buffer_size = buffer_size self.bin_size = bin_size self.h5file = None # Dataset-specific options self.dataset_names = kwargs.get("dataset_names") self.dimension_ordering = kwargs.get("dimension_ordering") def _initialize(self): """Open file and set up properties.""" try: self.h5file = h5py.File(self.file_path, "r") except Exception as e: raise IOError(f"Cannot open HDF5 file: {e}") # Use DSFileReader mixin to find datasets if not self.dataset_names: datasets_info = [] def visitor(name, obj): if isinstance(obj, h5py.Dataset): datasets_info.append((name, obj.shape)) self.h5file.visititems(visitor) self.dataset_names = self._find_datasets(datasets_info) if not self.dataset_names: raise ValueError("No suitable datasets found") # Set properties from first dataset first_ds = self.h5file[self.dataset_names[0]] shape = first_ds.shape # Detect dimension ordering (implementation specific) # For now assume it's already (T, H, W) or needs transposing if len(shape) == 3: self.frame_count, self.height, self.width = shape self.n_channels = len(self.dataset_names) elif len(shape) == 4: self.frame_count, self.height, self.width, self.n_channels = shape self.dtype = first_ds.dtype def _read_raw_frames(self, frame_indices: Union[slice, List[int]]) -> np.ndarray: """Read raw frames from HDF5 file.""" # Convert list to slice if contiguous if isinstance(frame_indices, list): if len(frame_indices) == 0: return np.empty( (0, self.height, self.width, self.n_channels), dtype=self.dtype ) # Check if contiguous if len(frame_indices) > 1: diffs = np.diff(frame_indices) if np.all(diffs == 1): frame_indices = slice(frame_indices[0], frame_indices[-1] + 1) # Allocate output if isinstance(frame_indices, slice): start, stop, step = frame_indices.indices(self.frame_count) indices = range(start, stop, step) else: indices = frame_indices n_frames = len(indices) output = np.zeros( (n_frames, self.height, self.width, self.n_channels), dtype=self.dtype ) # Read from each dataset/channel for ch_idx, ds_name in enumerate(self.dataset_names): dataset = self.h5file[ds_name] if isinstance(frame_indices, slice): # Efficient slicing for contiguous frames data = dataset[frame_indices, :, :] else: # Fancy indexing for non-contiguous data = dataset[indices, :, :] output[:, :, :, ch_idx] = data return output
[docs] def close(self): """Close HDF5 file.""" if self.h5file: self.h5file.close() self.h5file = None
[docs] class HDF5FileWriter(DSFileWriter, VideoWriter): """ HDF5 video file writer with MATLAB compatibility. Accepts frames in Python format (T, H, W, C) but stores them in MATLAB-compatible format as separate 3D datasets per channel with configurable dimension ordering. """
[docs] def __init__(self, file_path: str, **kwargs): """ Initialize HDF5 writer. Args: file_path: Output file path dataset_names: Optional dataset naming pattern or list Default: 'ch*' (produces ch1, ch2, etc.) dimension_ordering: Storage order for MATLAB compatibility Default: (0, 1, 2) for (H, W, T) storage compression: HDF5 compression ('gzip', 'lzf', or None) compression_level: Compression level for gzip (1-9) chunk_size: Chunk size for temporal dimension (default: 1) """ # Initialize parent classes DSFileWriter.__init__(self, **kwargs) VideoWriter.__init__(self) self.file_path = file_path self._h5file = None self._datasets = {} self._frame_counter = 0 # MATLAB compatibility options # Default (1, 2, 0) means store as (T, H, W) which MATLAB reads as (H, W, T) self.dimension_ordering = kwargs.get("dimension_ordering", (1, 2, 0)) # Compression options self.compression = kwargs.get("compression", None) self.compression_level = kwargs.get("compression_level", 4) self.chunk_temporal = kwargs.get("chunk_size", 1) # Dataset naming - default to MATLAB convention if not self.dataset_names: self.dataset_names = "ch*" # Will produce ch1, ch2, etc.
def _create_datasets(self): """Create HDF5 datasets for each channel.""" # Remove existing file if it exists if os.path.exists(self.file_path): os.remove(self.file_path) self._h5file = h5py.File(self.file_path, "w") # Define initial shape and max shape for expandable datasets # We store in MATLAB format: separate 3D datasets per channel initial_shape = [None, None, None] initial_shape[self.dimension_ordering[0]] = self.height initial_shape[self.dimension_ordering[1]] = self.width initial_shape[self.dimension_ordering[2]] = 0 # Start with 0 frames initial_shape = tuple(initial_shape) max_shape = [None, None, None] max_shape[self.dimension_ordering[0]] = self.height max_shape[self.dimension_ordering[1]] = self.width max_shape[self.dimension_ordering[2]] = None # Unlimited frames max_shape = tuple(max_shape) # Chunking for efficient I/O chunk_shape = [None, None, None] chunk_shape[self.dimension_ordering[0]] = self.height chunk_shape[self.dimension_ordering[1]] = self.width chunk_shape[self.dimension_ordering[2]] = self.chunk_temporal chunk_shape = tuple(chunk_shape) # Create a dataset for each channel for ch_idx in range(self.n_channels): ds_name = self.get_ds_name(ch_idx + 1, self.n_channels) # Create expandable dataset if self.compression: if self.compression == "gzip": ds = self._h5file.create_dataset( name=ds_name, shape=initial_shape, maxshape=max_shape, dtype=self.dtype, chunks=chunk_shape, compression="gzip", compression_opts=self.compression_level, ) else: ds = self._h5file.create_dataset( name=ds_name, shape=initial_shape, maxshape=max_shape, dtype=self.dtype, chunks=chunk_shape, compression=self.compression, ) else: ds = self._h5file.create_dataset( name=ds_name, shape=initial_shape, maxshape=max_shape, dtype=self.dtype, chunks=chunk_shape, ) self._datasets[ds_name] = ds # Add MATLAB-friendly attributes ds.attrs["dimension_ordering"] = self.dimension_ordering ds.attrs["original_shape_THWC"] = ( 0, self.height, self.width, self.n_channels, )
[docs] def write_frames(self, frames: np.ndarray): """ Write frames to HDF5 file. Args: frames: Array with shape (T, H, W, C) or (T, H, W) or (H, W) """ # Normalize input to 4D (T, H, W, C) if frames.ndim == 2: # Single frame, single channel (H, W) frames = frames[np.newaxis, :, :, np.newaxis] elif frames.ndim == 3: if frames.shape[0] == self.height and frames.shape[1] == self.width: # Single frame, multiple channels (H, W, C) frames = frames[np.newaxis, :, :, :] else: # Multiple frames, single channel (T, H, W) frames = frames[:, :, :, np.newaxis] elif frames.ndim != 4: raise ValueError(f"Expected 2D, 3D or 4D input, got {frames.ndim}D") # Initialize on first write if not self.initialized: T, H, W, C = frames.shape self.height = H self.width = W self.n_channels = C self.dtype = frames.dtype self.initialized = True self._create_datasets() # Validate shape T, H, W, C = frames.shape if H != self.height or W != self.width: raise ValueError( f"Frame size mismatch. Expected ({self.height}, {self.width}), " f"got ({H}, {W})" ) if C != self.n_channels: raise ValueError( f"Channel count mismatch. Expected {self.n_channels}, got {C}" ) # Write each channel separately for MATLAB compatibility for ch_idx in range(self.n_channels): ds_name = self.get_ds_name(ch_idx + 1, self.n_channels) dataset = self._datasets[ds_name] # Extract channel data: (T, H, W) channel_data = frames[:, :, :, ch_idx] # Transpose to MATLAB storage order # From (T, H, W) to dimension_ordering if self.dimension_ordering != (2, 0, 1): # Create mapping from current (T=0, H=1, W=2) to target ordering # Default MATLAB is (H=0, W=1, T=2) perm = [None, None, None] perm[self.dimension_ordering[0]] = 1 # H position perm[self.dimension_ordering[1]] = 2 # W position perm[self.dimension_ordering[2]] = 0 # T position channel_data = np.transpose(channel_data, perm) # Determine where to write in the dataset current_frames = dataset.shape[self.dimension_ordering[2]] new_total_frames = current_frames + T # Resize dataset along time dimension new_shape = list(dataset.shape) new_shape[self.dimension_ordering[2]] = new_total_frames dataset.resize(new_shape) # Create slice objects for writing slices = [slice(None), slice(None), slice(None)] slices[self.dimension_ordering[2]] = slice(current_frames, new_total_frames) # Write the data dataset[tuple(slices)] = channel_data # Update attributes dataset.attrs["original_shape_THWC"] = ( new_total_frames, H, W, self.n_channels, ) self._frame_counter = new_total_frames # Flush to ensure data is written if self._h5file: self._h5file.flush()
[docs] def close(self): """Close the HDF5 file.""" if self._h5file: # Write final metadata for MATLAB compatibility if self._datasets: # Add file-level attributes self._h5file.attrs["n_channels"] = self.n_channels self._h5file.attrs["frame_count"] = self._frame_counter self._h5file.attrs["height"] = self.height self._h5file.attrs["width"] = self.width self._h5file.attrs["dimension_ordering"] = self.dimension_ordering self._h5file.attrs["format"] = "pyflowreg_hdf5_v1" # Store dataset names as attribute for easy discovery dataset_names_list = list(self._datasets.keys()) self._h5file.attrs["dataset_names"] = dataset_names_list self._h5file.close() self._h5file = None self._datasets = {} print(f"HDF5 file written: {self.file_path}")
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def main(): import numpy as np from pathlib import Path from mdf import MDFFileReader import cv2 filename = r"D:\2025_OIST\Shinobu\RFPonly\190403_001.MDF" out_path = Path(filename + ".hdf") mdf = MDFFileReader(filename, buffer_size=500, bin_size=1) with HDF5FileWriter(str(out_path)) as w: # for i in range(5 * 8200, 5 * 9200): for i in range(5 * 8200, 5 * 8300): frame = mdf[i] w.write_frames(frame[np.newaxis]) h5 = HDF5FileReader(str(out_path), buffer_size=500, bin_size=5) h5_b5 = h5[0:20] h5.close() mdf.close() mdf2 = MDFFileReader(filename, buffer_size=500, bin_size=5) mdf_b5 = mdf2[8200 : 8200 + 20] mdf2.close() counter = 0 while True: frame = np.concatenate([h5_b5[counter], mdf_b5[counter]], axis=0) counter = (counter + 1) % h5_b5.shape[0] cv2.imshow( "Frame", cv2.normalize( frame[..., 0], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U ), ) key = cv2.waitKey(1) if key == 27: break if not np.array_equal(h5_b5, mdf_b5): d = h5_b5.astype(np.int64) - mdf_b5.astype(np.int64) print(int(np.abs(d).max())) print("Frames are not equal!") else: print(f"OK {out_path}")
[docs] def reader_main(): import cv2 filename = r"D:\2025_OIST\Shinobu\RFPonly\test.hdf" reader = HDF5FileReader(filename, buffer_size=500, bin_size=1) print(f"Number of frames: {len(reader)}") for i in range(len(reader)): frame = reader[i] cv2.imshow( "Frame", cv2.normalize( frame[:, :, 0], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U ), ) cv2.waitKey(1)
if __name__ == "__main__": main()