import os
import warnings
from typing import Union, List
import numpy as np
# tifffile is an optional dependency. Record whether it could be imported so the
# reader/writer constructors can raise a clear ImportError instead of failing
# later with a NameError.
try:
    import tifffile

    TIFF_SUPPORTED = True
except ImportError:
    TIFF_SUPPORTED = False
    warnings.warn("tifffile not installed. TIFF support unavailable.")
from pyflowreg.util.io._base import VideoReader, VideoWriter
from pyflowreg.util.io._scanimage import parse_scanimage_metadata
class TIFFFileReader(VideoReader):
    """
    TIFF stack file reader with support for multi-page and multi-channel formats.

    Supports:
        - Multi-page TIFF stacks (standard format)
        - Single-page multi-sample TIFFs (channels as samples per pixel)
        - Deinterleaved reading for formats like Suite2p
        - Memory-mapped reading for large files
        - Various data types (uint8/16/32/64, int32/64, float32/64)

    The file is opened by ``_initialize()`` and frames are produced by
    ``_read_raw_frames()`` as ``(T, H, W, C)`` arrays.
    """

    def __init__(
        self, file_path: str, buffer_size: int = 500, bin_size: int = 1, **kwargs
    ):
        """
        Initialize TIFF reader.

        Args:
            file_path: Path to TIFF file
            buffer_size: Number of frames per batch
            bin_size: Temporal binning factor
            deinterleave: Channel deinterleaving factor (1=none, >1 for interleaved formats)
            use_memmap: Use memory mapping for large files (default: True)

        Raises:
            ImportError: If the tifffile package could not be imported.
            FileNotFoundError: If ``file_path`` is not an existing file.
        """
        if not TIFF_SUPPORTED:
            raise ImportError("tifffile library required for TIFF support")

        # Initialize parent classes
        VideoReader.__init__(self)

        self.file_path = file_path
        self.buffer_size = buffer_size
        self.bin_size = bin_size

        # TIFF-specific options
        self.deinterleave = kwargs.get("deinterleave", 1)
        self.use_memmap = kwargs.get("use_memmap", True)

        # Internal state
        self._tiff_file = None
        self._tiff_series = None
        self._sample_mode = False
        self._page_indices = None

        # ScanImage metadata
        self._scanimage_metadata = None
        self._is_scanimage = False
        self._z_stack_info = None

        # Validate file
        if not os.path.isfile(file_path):
            raise FileNotFoundError(f"TIFF file not found: {file_path}")

    def _initialize(self):
        """
        Open TIFF file and read metadata.

        Raises:
            IOError: If the file cannot be opened or its layout cannot be
                interpreted. The original exception is attached as the cause.
        """
        try:
            # Open with tifffile
            self._tiff_file = tifffile.TiffFile(self.file_path)

            # Check if it's a ScanImage file
            self._check_scanimage_metadata()

            # Check if it's an ImageJ or OME-TIFF with series
            if len(self._tiff_file.series) > 0:
                self._tiff_series = self._tiff_file.series[0]
                self._setup_from_series()
            else:
                # Fallback to page-based reading
                self._setup_from_pages()

            # Clamp frame count to actual page count (accounts for bad metadata)
            self._clamp_frame_count()
        except Exception as e:
            # Release the file handle instead of leaking it when setup fails
            # part-way, and keep the original traceback as the cause.
            self.close()
            raise IOError(f"Failed to open TIFF file: {e}") from e

    def _should_use_series_mode(self) -> bool:
        """
        Decide whether reads should come from tifffile series instead of pages.

        Keep this narrow: contiguous ImageJ-like stacks can have many series
        frames but only one TIFF page, while most existing paths (including
        ScanImage/deinterleave) should remain page-based for compatibility.
        """
        if self._tiff_file is None or self._tiff_series is None:
            return False
        if self._sample_mode or self.deinterleave > 1:
            return False
        if self._is_scanimage:
            return False

        axes = getattr(self._tiff_series, "axes", "")
        if "T" not in axes:
            return False

        # Fewer pages than frames means the pages cannot serve every frame.
        return len(self._tiff_file.pages) < self.frame_count

    def _series_frame_capacity(self) -> int:
        """
        Maximum safe frame count when reading through series mode.

        Uses the length of the T axis, else the Z axis, else the page count;
        returns 0 when no series/file is available.
        """
        if self._tiff_series is None:
            return 0

        shape = self._tiff_series.shape
        axes = self._tiff_series.axes
        if "T" in axes:
            return int(shape[axes.index("T")])
        if "Z" in axes:
            return int(shape[axes.index("Z")])
        if self._tiff_file is not None:
            return len(self._tiff_file.pages)
        return 0

    def _clamp_frame_count(self):
        """
        Clamp frame_count to what the active read path can safely provide.

        Emits a warning whenever the count is reduced.
        """
        if self._tiff_file is None:
            return

        actual_pages = len(self._tiff_file.pages)
        if self._should_use_series_mode():
            max_frames = self._series_frame_capacity()
            source_desc = (
                f"series_axes={self._tiff_series.axes}, pages={actual_pages}, "
                f"deinterleave={self.deinterleave}"
            )
        else:
            # Page mode: every frame consumes `deinterleave` consecutive pages.
            max_frames = actual_pages // max(1, self.deinterleave)
            source_desc = f"pages={actual_pages}, deinterleave={self.deinterleave}"

        if self.frame_count > max_frames:
            warnings.warn(
                f"Clamping TIFF frame_count from {self.frame_count} to {max_frames} "
                f"({source_desc})"
            )
            self.frame_count = max_frames

    def _check_scanimage_metadata(self):
        """
        Check if this is a ScanImage TIFF and parse metadata if so.

        May set ``_is_scanimage``, ``_z_stack_info``, ``_scanimage_metadata``
        and auto-adjust ``deinterleave``. Any parsing error is reported on
        stdout and the file is then treated as a regular TIFF.
        """
        try:
            # First check if tifffile directly identifies it as ScanImage
            self._is_scanimage = getattr(self._tiff_file, "is_scanimage", False)
            if not self._is_scanimage:
                return

            # Try to get metadata from tifffile first
            if hasattr(self._tiff_file, "scanimage_metadata"):
                si_metadata = self._tiff_file.scanimage_metadata

                # Parse framesPerSlice and numSlices
                if "SI" in si_metadata and "hStackManager" in si_metadata["SI"]:
                    stack = si_metadata["SI"]["hStackManager"]
                    slices = stack.get("actualNumSlices", stack.get("numSlices", 1))
                    frames_per_slice = stack.get("framesPerSlice", 1)
                    volumes = stack.get(
                        "actualNumVolumes", stack.get("numVolumes", 1)
                    )
                    z_step = stack.get(
                        "stackZStepSize", stack.get("actualStackZStepSize", None)
                    )

                    # Parse channel information
                    channels_saved = 1
                    if "hChannels" in si_metadata["SI"]:
                        chan_info = si_metadata["SI"]["hChannels"]
                        if "channelSave" in chan_info:
                            # channelSave may be a bare scalar rather than a
                            # list; len() would raise on it and the broad
                            # handler below would then disable ScanImage
                            # handling. np.size counts scalars as one channel.
                            channels_saved = int(np.size(chan_info["channelSave"]))

                    # Check if we need to auto-deinterleave for ScanImage
                    # ScanImage often stores channels as interleaved pages
                    if channels_saved > 1 and self.deinterleave == 1:
                        # Check if page count suggests interleaved channels
                        expected_pages = (
                            slices * frames_per_slice * volumes * channels_saved
                        )
                        actual_pages = len(self._tiff_file.pages)
                        if actual_pages == expected_pages:
                            print(
                                f"ScanImage channel interleaving detected: auto-setting deinterleave={channels_saved}"
                            )
                            self.deinterleave = channels_saved

                    if slices > 1 or frames_per_slice > 1:
                        total_frames = slices * frames_per_slice * volumes
                        self._z_stack_info = {
                            "volumes": volumes,
                            "slices_per_volume": slices,
                            "frames_per_slice": frames_per_slice,
                            "total_frames_flattened": total_frames,
                            "z_step": z_step,
                            "interpretation": "z_stack",
                            "channels_saved": channels_saved,
                        }
                        print("ScanImage Z-stack detected from metadata:")
                        print(
                            f" {slices} slices × {frames_per_slice} frames/slice × {volumes} volume(s)"
                        )
                        print(
                            f" = {slices * frames_per_slice * volumes} total frames"
                        )
                        print(f" Channels: {channels_saved}")
                        if z_step:
                            print(f" Z step: {z_step} µm")
            else:
                # Fallback to parsing from file
                self._scanimage_metadata = parse_scanimage_metadata(self.file_path)
                if self._scanimage_metadata.get("is_scanimage"):
                    slices = self._scanimage_metadata.get("slices_per_volume", 1)
                    volumes = self._scanimage_metadata.get("volumes", 1)
                    if slices > 1 or volumes > 1:
                        self._z_stack_info = {
                            "volumes": volumes,
                            "slices_per_volume": slices,
                            "total_frames_flattened": volumes * slices,
                            "z_step": self._scanimage_metadata.get("z_step"),
                            "interpretation": self._scanimage_metadata.get(
                                "interpretation", "z_stack"
                            ),
                        }
                        print(
                            f"ScanImage Z-stack detected: {volumes} volume(s), {slices} slice(s) per volume"
                        )
                        print(
                            f" Treating as {volumes * slices} individual frames for motion correction"
                        )
        except Exception as e:
            # If parsing fails, treat as regular TIFF
            print(f"Warning: Error parsing ScanImage metadata: {e}")
            self._is_scanimage = False
            self._scanimage_metadata = None

    def _setup_from_series(self):
        """
        Setup reader from tifffile series (standard multi-page format).

        Sets ``height``, ``width``, ``n_channels``, ``frame_count`` and
        ``dtype`` from the first series' shape and axes string.
        """
        shape = self._tiff_series.shape
        axes = self._tiff_series.axes

        # Get spatial dimensions first
        y_idx = axes.index("Y") if "Y" in axes else 0
        x_idx = axes.index("X") if "X" in axes else 1
        self.height = shape[y_idx]
        self.width = shape[x_idx]

        # Get channels
        if "C" in axes:
            self.n_channels = shape[axes.index("C")]
        elif "S" in axes:
            # Samples as channels
            self.n_channels = shape[axes.index("S")]
        else:
            self.n_channels = 1

        # For ScanImage files with channels, auto-enable deinterleaving
        # ScanImage stores multi-channel data as interleaved pages
        if self._is_scanimage and "C" in axes and shape[axes.index("C")] > 1:
            if self.deinterleave == 1:  # Only auto-set if not manually specified
                self.deinterleave = shape[axes.index("C")]
                print(
                    f"ScanImage multi-channel detected: auto-setting deinterleave={self.deinterleave}"
                )
            # When deinterleaving, we treat pages as interleaved channels
            self.n_channels = self.deinterleave

        # Handle frame count based on whether this is ScanImage with Z-stacks
        if self._is_scanimage and self._z_stack_info:
            # For ScanImage Z-stacks, flatten volumes * slices into frames
            self.frame_count = self._z_stack_info["total_frames_flattened"]
        elif "Z" in axes and "T" in axes:
            # Both Z and T present (like ScanImage ZTCYX)
            z_idx = axes.index("Z")
            t_idx = axes.index("T")
            if self._is_scanimage:
                # For ScanImage files, even without explicit metadata,
                # flatten Z*T as total frames
                self.frame_count = shape[z_idx] * shape[t_idx]
                print(
                    f"ScanImage ZTCYX detected: {shape[z_idx]} Z-slices × {shape[t_idx]} frames/slice = {self.frame_count} total frames"
                )
            else:
                # For non-ScanImage, T is primary time dimension
                self.frame_count = shape[t_idx]
        elif "T" in axes:
            # Time series only
            self.frame_count = shape[axes.index("T")]
        elif "Z" in axes:
            # Z-stack only, treated as time
            self.frame_count = shape[axes.index("Z")]
        else:
            # Single frame or special format
            self.frame_count = len(self._tiff_file.pages)

        # Apply deinterleaving semantics consistently with page-based setup:
        # deinterleave N means N interleaved single-channel pages form one frame.
        # NOTE(review): this division is also applied when frame_count came
        # from a T axis or ScanImage metadata that may already exclude
        # channels - confirm that is intended.
        if self.deinterleave > 1:
            self.n_channels = self.deinterleave
            self.frame_count = self.frame_count // self.deinterleave

        # Get data type
        self.dtype = self._tiff_series.dtype

    def _setup_from_pages(self):
        """
        Setup reader from individual pages (fallback method).

        A single page with several samples per pixel switches the reader into
        sample mode, where each sample is exposed as one single-channel frame.
        """
        pages = self._tiff_file.pages
        first_page = pages[0]

        # Check if single page with multiple samples (sample mode)
        if len(pages) == 1 and hasattr(first_page, "samplesperpixel"):
            samples = first_page.samplesperpixel
            if samples > 1:
                self._sample_mode = True
                self.frame_count = samples
            else:
                # Single page, single sample
                self.frame_count = 1
            self.n_channels = 1
            self.height = first_page.imagelength
            self.width = first_page.imagewidth
        else:
            # Multi-page format
            # For ScanImage with Z-stacks, override frame count
            if self._is_scanimage and self._z_stack_info:
                self.frame_count = self._z_stack_info["total_frames_flattened"]
            else:
                self.frame_count = len(pages)
            self.height = first_page.imagelength
            self.width = first_page.imagewidth

            # Determine channels from samples per pixel
            if hasattr(first_page, "samplesperpixel"):
                self.n_channels = first_page.samplesperpixel
            else:
                self.n_channels = (
                    1 if len(first_page.shape) == 2 else first_page.shape[2]
                )

            # Apply deinterleaving
            if self.deinterleave > 1:
                self.n_channels = self.deinterleave
                self.frame_count = self.frame_count // self.deinterleave

        # Get data type
        self.dtype = first_page.dtype

    def _read_raw_frames(self, frame_indices: Union[slice, List[int]]) -> np.ndarray:
        """
        Read raw frames from TIFF file.

        Args:
            frame_indices: 0-based indices (slice or list)

        Returns:
            Array with shape (T, H, W, C)
        """
        # Ensure frame_count does not exceed available pages
        self._clamp_frame_count()

        # Convert slice to list
        if isinstance(frame_indices, slice):
            start, stop, step = frame_indices.indices(self.frame_count)
            frame_indices = list(range(start, stop, step))

        n_frames = len(frame_indices)
        out_shape = (n_frames, self.height, self.width, self.n_channels)
        if n_frames == 0:
            return np.empty(out_shape, dtype=self.dtype)

        # Allocate output
        output = np.zeros(out_shape, dtype=self.dtype)

        if self._sample_mode:
            # Single page with multiple samples - read strips
            self._read_sample_mode(frame_indices, output)
        elif self._should_use_series_mode():
            # Contiguous ImageJ-like stacks: series carries full time axis.
            self._read_series_mode(frame_indices, output)
        else:
            # Page-based reading (default, includes deinterleave path).
            self._read_page_mode(frame_indices, output)
        return output

    def _read_sample_mode(self, frame_indices: List[int], output: np.ndarray):
        """Read frames in sample mode (single page, multiple samples)."""
        # Read the entire page once
        data = self._tiff_file.pages[0].asarray()

        # Extract requested samples (treating samples as frames)
        for i, frame_idx in enumerate(frame_indices):
            if data.ndim == 3:
                # Data is (H, W, Samples)
                output[i, :, :, 0] = data[:, :, frame_idx]
            else:
                # Data is (H, W)
                output[i, :, :, 0] = data

    def _read_series_mode(self, frame_indices: List[int], output: np.ndarray):
        """
        Read frames using tifffile series (efficient for large stacks).

        Args:
            frame_indices: 0-based frame indices along the series' T axis.
            output: Preallocated (T, H, W, C) array that is filled in place.
        """
        # Get full data array (may be memory-mapped)
        if self.use_memmap:
            data = self._tiff_series.asarray(out="memmap")
        else:
            data = self._tiff_series.asarray()

        axes = self._tiff_series.axes
        if "T" not in axes:
            # No time dimension, treat pages as frames
            for i, frame_idx in enumerate(frame_indices):
                output[i] = self._page_to_frame(data, frame_idx)
            return

        t_idx = axes.index("T")
        # Axes of one frame after the T axis has been indexed away. Axis
        # positions must be computed against this string, not the full one,
        # otherwise every axis after T is off by one.
        frame_axes = axes[:t_idx] + axes[t_idx + 1 :]
        # Samples ('S') are exposed as channels exactly like 'C'.
        has_channels = "C" in frame_axes or "S" in frame_axes
        leading = (slice(None),) * t_idx

        for i, frame_idx in enumerate(frame_indices):
            frame_data = data[leading + (frame_idx,)]
            if has_channels:
                output[i] = self._reshape_to_output(frame_data, frame_axes)
            else:
                # No channel dimension
                output[i, :, :, 0] = frame_data

    def _read_page_mode(self, frame_indices: List[int], output: np.ndarray):
        """
        Read frames page by page (with deinterleaving support).

        With ``deinterleave > 1`` frame ``k`` is assembled from pages
        ``k * deinterleave + ch``; pages past the end of the file are skipped,
        leaving that channel as zeros in ``output``.
        """
        pages = self._tiff_file.pages
        for i, frame_idx in enumerate(frame_indices):
            if self.deinterleave > 1:
                # Read multiple pages for deinterleaved channels
                base_page_idx = frame_idx * self.deinterleave
                for ch in range(self.deinterleave):
                    page_idx = base_page_idx + ch
                    if page_idx < len(pages):
                        page_data = pages[page_idx].asarray()
                        if page_data.ndim == 2:
                            output[i, :, :, ch] = page_data
                        else:
                            output[i, :, :, ch] = page_data[:, :, 0]
            else:
                # Standard page reading
                page_data = pages[frame_idx].asarray()
                if page_data.ndim == 2:
                    output[i, :, :, 0] = page_data
                elif page_data.ndim == 3:
                    # Multi-channel page
                    n_ch = min(page_data.shape[2], self.n_channels)
                    output[i, :, :, :n_ch] = page_data[:, :, :n_ch]

    def _reshape_to_output(self, data: np.ndarray, axes: str) -> np.ndarray:
        """
        Reshape data from arbitrary axes order to (H, W, C).

        Args:
            data: Array for a single frame.
            axes: Axes string describing ``data`` (one letter per dimension).

        Returns:
            ``data`` transposed so that Y, X and the channel axis ('C', else
            'S') come first; without a channel axis a new one is appended.
        """
        # Find indices
        y_idx = axes.index("Y") if "Y" in axes else 0
        x_idx = axes.index("X") if "X" in axes else 1
        if "C" in axes:
            c_idx = axes.index("C")
        elif "S" in axes:
            c_idx = axes.index("S")
        else:
            c_idx = None

        if c_idx is not None:
            # Transpose to (Y, X, C)
            perm = [y_idx, x_idx, c_idx]
            perm.extend(i for i in range(len(axes)) if i not in perm)
            result = np.transpose(data, perm)
            return result[:, :, : self.n_channels]

        # No channel dimension
        if data.ndim == 2:
            return data[:, :, np.newaxis]
        perm = [y_idx, x_idx]
        perm.extend(i for i in range(len(axes)) if i not in perm)
        result = np.transpose(data, perm)
        return result[:, :, np.newaxis]

    def _page_to_frame(self, data: np.ndarray, idx: int) -> np.ndarray:
        """Convert page data to frame format (H, W, C)."""
        if data.ndim == 2:
            return data[:, :, np.newaxis]
        elif data.ndim == 3:
            return data[idx] if data.shape[0] > idx else data
        else:
            return data

    def close(self):
        """Close TIFF file."""
        if self._tiff_file:
            self._tiff_file.close()
        self._tiff_file = None
        self._tiff_series = None
class TIFFFileWriter(VideoWriter):
    """
    TIFF stack file writer with multi-page and compression support.

    Features:
        - Multi-page TIFF writing
        - Suite2p format support (interleaved single-channel pages)
        - Various compression algorithms
        - ImageJ metadata compatibility
        - BigTIFF support for files >4GB (default: always enabled like MATLAB)

    The output file is reopened in append mode on every ``write_frames()``
    call after the first one.
    """

    def __init__(self, file_path: str, **kwargs):
        """
        Initialize TIFF writer.

        Args:
            file_path: Output file path
            format: 'default' or 'suite2p' (changes to interleaved single-channel pages)
            compression: Compression type ('none', 'lzw', 'zlib', 'jpeg')
            compression_level: Compression level for zlib (0-9)
            bigtiff: Use BigTIFF format (default: True, matching MATLAB 'w8')
            imagej: Write ImageJ-compatible metadata
            metadata: Additional metadata dict to include

        Raises:
            ImportError: If the tifffile package could not be imported.
        """
        if not TIFF_SUPPORTED:
            raise ImportError("tifffile library required for TIFF support")

        # Initialize parent classes
        VideoWriter.__init__(self)

        self.file_path = file_path
        self._frame_count = 0

        # Options
        self.format = kwargs.get("format", "default")
        self.compression = kwargs.get("compression", "none")
        self.compression_level = kwargs.get("compression_level", 6)
        self.bigtiff = kwargs.get("bigtiff", True)  # Default to True like MATLAB 'w8'
        self.imagej = kwargs.get("imagej", False)
        self.metadata = kwargs.get("metadata", {})

        # Compression mapping
        self._compression_map = {
            "none": None,
            "lzw": "lzw",
            "zlib": "zlib",
            "deflate": "zlib",
            "jpeg": "jpeg",
        }

        # Track axes for current write
        self._current_axes = None

    def write_frames(self, frames: np.ndarray):
        """
        Write frames to TIFF file.

        Args:
            frames: Array with shape (T, H, W, C) or (T, H, W) or (H, W)

        Raises:
            ValueError: If the input is not 2D-4D, or if its frame size or
                channel count differs from the first batch written.
        """
        # Normalize input to 4D (T, H, W, C)
        if frames.ndim == 2:  # Single frame, single channel
            frames = frames[np.newaxis, :, :, np.newaxis]
        elif frames.ndim == 3:
            if len(frames) == 1:
                # (1, H, W): one single-channel frame
                frames = frames[:, :, :, np.newaxis]
            elif (
                hasattr(self, "height")
                and frames.shape[0] == self.height
                and frames.shape[1] == self.width
            ):
                # Single frame, multiple channels (H, W, C)
                frames = frames[np.newaxis, :, :, :]
            else:
                # Multiple frames, single channel (T, H, W)
                frames = frames[:, :, :, np.newaxis]
        elif frames.ndim != 4:
            raise ValueError(f"Expected 2D, 3D or 4D input, got {frames.ndim}D")

        T, H, W, C = frames.shape

        # Apply format transformations BEFORE initialization
        if self.format == "suite2p" and C > 1:
            # Suite2p: interleave channels as single-channel pages
            frames = self._format_suite2p(frames)
            T, H, W, C = frames.shape

        # Now squeeze single channel dimension for proper axes
        if C == 1:
            frames = frames[:, :, :, 0]  # Remove channel dimension
            self._current_axes = "TYX"
        else:
            self._current_axes = "TCYX"
            frames = np.moveaxis(frames, -1, -3)

        # Initialize on first write (after format transforms)
        if not self.initialized:
            self.height = H
            self.width = W
            self.n_channels = C
            self.dtype = frames.dtype
            self.initialized = True
            self._create_file()

        # Validate shape consistency
        if H != self.height or W != self.width:
            raise ValueError(
                f"Frame size mismatch. Expected ({self.height}, {self.width}), "
                f"got ({H}, {W})"
            )
        # Compare against the channel count fixed by the first write; checking
        # the batch against itself could never detect a mismatch.
        if C != self.n_channels:
            raise ValueError(
                f"Channel count mismatch. Expected {self.n_channels}, got {C}"
            )

        # Write frames
        self._write_frames_to_file(frames)

        # T already counts the interleaved pages when suite2p formatting applied.
        self._frame_count += T

    def _create_file(self):
        """Remove any existing output file and prepare the write parameters."""
        # Remove existing file
        if os.path.exists(self.file_path):
            os.remove(self.file_path)

        # Set up file parameters
        self._setup_write_params()

    def _setup_write_params(self):
        """
        Set up parameters for writing.

        Populates ``_file_kwargs`` and ``_frame_kwargs``. An unrecognised
        compression name falls back to no compression and emits a warning.
        """
        # Set up compression
        compression_key = self.compression.lower()
        if compression_key not in self._compression_map:
            warnings.warn(
                f"Unknown TIFF compression '{self.compression}'; "
                "writing without compression."
            )
        compression = self._compression_map.get(compression_key)
        if compression == "zlib":
            compression = (compression, self.compression_level)

        # Copy so the caller's dict is never mutated; tolerate metadata=None.
        metadata = dict(self.metadata or {})
        metadata.update({"Software": "pyflowreg"})

        self._file_kwargs = {
            "bigtiff": self.bigtiff,
            "compression": compression,
            "metadata": metadata,
            "imagej": self.imagej,
        }
        self._frame_kwargs = {
            "compression": compression,
            "metadata": metadata,
        }

    def _format_suite2p(self, frames: np.ndarray) -> np.ndarray:
        """
        Convert frames to Suite2p format (interleaved single-channel pages).

        Args:
            frames: (T, H, W, C) array

        Returns:
            (T*C, H, W, 1) array with interleaved channels
        """
        T, H, W, C = frames.shape
        if C == 1:
            return frames

        # Interleave channels as separate pages
        # Order: frame0_ch0, frame0_ch1, ..., frame1_ch0, frame1_ch1, ...
        channel_major = np.ascontiguousarray(np.moveaxis(frames, -1, 1))
        return channel_major.reshape(T * C, H, W, 1)

    def _write_frames_to_file(self, frames: np.ndarray):
        """
        Write frame data to TIFF file.

        Args:
            frames: Array with shape (T, H, W) or (T, C, H, W)
        """
        # Append to the existing file on every call after the first one.
        append = self._frame_count > 0
        with tifffile.TiffWriter(
            self.file_path, append=append, bigtiff=self.bigtiff, imagej=self.imagej
        ) as tif:
            # Write each frame individually for append mode
            if frames.ndim == 3:  # (T, H, W)
                for frame in frames:
                    tif.write(frame[None, ...], **self._frame_kwargs)
            else:  # (T, C, H, W)
                for frame in frames:
                    frame = np.moveaxis(frame, 0, -1)  # Move C to the last axis
                    tif.write(
                        frame,
                        planarconfig="contig",
                        contiguous=True,
                        **self._frame_kwargs,
                    )

    def close(self):
        """
        Print a summary of the written file.

        Nothing is printed when no frames were written. The file itself is
        opened and closed inside each write call.
        """
        if self._frame_count > 0:
            print(f"TIFF file written: {self.file_path}")
            print(f" Frames: {self._frame_count}")
            print(f" Dimensions: {self.height}x{self.width}")
            print(f" Channels: {self.n_channels}")
            print(f" Format: {self.format}")
            print(f" Compression: {self.compression}")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
# Test functions
def test_basic_functionality():
    """
    Test basic TIFF reading and writing.

    Writes multi-channel, single-channel and Suite2p-formatted stacks into a
    temporary directory (removed afterwards) and reads the multi-channel
    stack back.
    """
    import tempfile

    import numpy as np

    print("Testing basic TIFF functionality...")

    # Create test data
    test_data = np.random.randint(0, 65535, (10, 256, 256, 2), dtype=np.uint16)

    # Keep artifacts out of the working directory and clean them up afterwards.
    with tempfile.TemporaryDirectory() as tmp_dir:
        multi_path = os.path.join(tmp_dir, "test_multichannel.tif")
        single_path = os.path.join(tmp_dir, "test_singlechannel.tif")
        suite2p_path = os.path.join(tmp_dir, "test_suite2p.tif")

        # Test multi-channel writing
        print("\nTesting multi-channel TIFF...")
        with TIFFFileWriter(multi_path, compression="lzw") as writer:
            writer.write_frames(test_data[:5])
            writer.write_frames(test_data[5:])

        # Test single-channel writing
        print("\nTesting single-channel TIFF...")
        single_channel = test_data[:, :, :, 0:1]
        with TIFFFileWriter(single_path, compression="lzw") as writer:
            writer.write_frames(single_channel)

        # Test Suite2p format
        print("\nTesting Suite2p format...")
        with TIFFFileWriter(suite2p_path, format="suite2p") as writer:
            writer.write_frames(test_data[:5])

        # Test reading back
        print("\nTesting reading...")
        reader = TIFFFileReader(multi_path)
        try:
            print(f" Shape: {reader.shape}")
            print(f" Dtype: {reader.dtype}")
            frame = reader[0]
            print(f" Single frame shape: {frame.shape}")
            frames = reader[0:3]
            print(f" Multiple frames shape: {frames.shape}")
        finally:
            # Release the handle even if a read fails, so the temporary
            # directory can be removed.
            reader.close()

    print("\nBasic tests completed successfully!")
def test_mdf_conversion(
    filename: str = r"D:\2025_OIST\Shinobu\RFPonly\190403_001.MDF",
):
    """
    Test MDF to TIFF conversion with proper binning.

    Args:
        filename: Path of the MDF recording to convert. The test is skipped
            when the MDF reader is not importable or this file does not exist.
    """
    from contextlib import closing
    from pathlib import Path

    import numpy as np

    try:
        from pyflowreg.util.io.mdf import MDFFileReader
    except ImportError:
        print("MDF reader not available, skipping MDF conversion test")
        return
    import cv2

    if not os.path.isfile(filename):
        print(f"MDF file not found, skipping MDF conversion test: {filename}")
        return

    bin_size = 5
    start_frame = bin_size * 8200
    end_frame = bin_size * 8300
    n_compare = 20  # number of binned frames compared between TIFF and MDF

    # Input and output paths
    source = Path(filename)
    out_path = source.with_suffix(".tif")

    def as_batch(frame):
        """Prepend a time axis unless the frame already has one of length 1."""
        if frame.ndim == 2:
            return frame[np.newaxis, :, :]
        if frame.ndim == 3 and frame.shape[0] != 1:
            return frame[np.newaxis, :, :, :]
        return frame

    print("Converting MDF to TIFF with proper binning...")
    print(f" Input: {filename}")
    print(f" Output: {out_path}")

    # Method 1: Write unbinned, then compare binned reads
    print("\nMethod 1: Write unbinned TIFF, bin during read")
    with closing(MDFFileReader(filename, buffer_size=500, bin_size=1)) as mdf_unbinned:
        # Write subset of unbinned frames
        with TIFFFileWriter(str(out_path), compression="lzw", bigtiff=True) as writer:
            print(f"Writing frames {start_frame} to {end_frame}...")
            for i in range(start_frame, end_frame):
                if (i - start_frame) % 20 == 0:
                    print(f" Frame {i - start_frame + 1}/{end_frame - start_frame}")
                writer.write_frames(as_batch(mdf_unbinned[i]))

    # Now compare binned versions
    print("\nComparing binned versions...")
    binned_start = start_frame // bin_size
    with closing(
        TIFFFileReader(str(out_path), buffer_size=500, bin_size=bin_size)
    ) as tiff_reader:
        with closing(
            MDFFileReader(filename, buffer_size=500, bin_size=bin_size)
        ) as mdf_binned:
            # First binned frames from the TIFF and the matching MDF frames
            tiff_frames = tiff_reader[0:n_compare]
            mdf_frames = mdf_binned[binned_start : binned_start + n_compare]

    # Check equality
    if np.allclose(tiff_frames, mdf_frames, rtol=1e-5, atol=1):
        print("✓ Frames match within tolerance (expected due to rounding)")
    else:
        max_diff = np.abs(tiff_frames.astype(float) - mdf_frames.astype(float)).max()
        print(f"⚠ Max difference: {max_diff:.3f}")

    # Visual comparison
    print("\nShowing visual comparison (press ESC to exit)...")
    try:
        for i in range(min(5, tiff_frames.shape[0])):
            tiff_norm = cv2.normalize(
                tiff_frames[i, :, :, 0], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U
            )
            mdf_norm = cv2.normalize(
                mdf_frames[i, :, :, 0], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U
            )
            comparison = np.hstack([tiff_norm, mdf_norm])
            cv2.putText(
                comparison, "TIFF", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, 255, 2
            )
            cv2.putText(
                comparison,
                "MDF",
                (tiff_norm.shape[1] + 10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                255,
                2,
            )
            cv2.imshow("TIFF vs MDF (binned)", comparison)
            cv2.waitKey(500)
    finally:
        cv2.destroyAllWindows()

    # Method 2: Pre-bin in float, then write
    print("\nMethod 2: Pre-bin in float domain, then write")
    # with_name keeps this working on Python versions without Path.with_stem.
    out_path_binned = source.with_name(source.stem + "_binned.tif")
    with closing(MDFFileReader(filename, buffer_size=500, bin_size=1)) as mdf_unbinned:
        with TIFFFileWriter(
            str(out_path_binned), compression="lzw", bigtiff=True
        ) as writer:
            print("Writing pre-binned frames...")
            # Process in chunks of bin_size
            for chunk_start in range(start_frame, end_frame, bin_size):
                chunk_end = min(chunk_start + bin_size, end_frame)
                # Read chunk and average in float
                chunk_frames = [
                    mdf_unbinned[i].astype(np.float32)
                    for i in range(chunk_start, chunk_end)
                ]
                if chunk_frames:
                    # Average and convert back to original dtype
                    avg_frame = np.mean(chunk_frames, axis=0)
                    avg_frame = np.round(avg_frame).astype(mdf_unbinned.dtype)
                    writer.write_frames(as_batch(avg_frame))
    print(f"\nPre-binned TIFF written: {out_path_binned}")

    print("\nConversion tests completed!")
def main2(filename: str = r"D:\2025_OIST\Shinobu\RFPonly\190403_001.MDF"):
    """
    Convert part of an MDF recording to TIFF and compare binned reads.

    Writes unbinned frames to ``<filename>.tiff``, reads them back with
    ``bin_size=5``, shows the TIFF and MDF frames stacked vertically until ESC
    is pressed, then reports whether both binned stacks are identical.

    Args:
        filename: Path of the MDF recording to convert.
    """
    from contextlib import closing
    from pathlib import Path

    import cv2
    import numpy as np

    # Same import path as test_mdf_conversion, so this works from any directory.
    from pyflowreg.util.io.mdf import MDFFileReader

    out_path = Path(filename + ".tiff")

    with closing(MDFFileReader(filename, buffer_size=500, bin_size=1)) as mdf:
        with TIFFFileWriter(str(out_path)) as w:
            for i in range(5 * 8200, 5 * 8221):
                frame = mdf[i]
                w.write_frames(frame[np.newaxis])

    with closing(
        TIFFFileReader(str(out_path), buffer_size=500, bin_size=5)
    ) as tiff_reader:
        tiff_b5 = tiff_reader[0:20]

    with closing(MDFFileReader(filename, buffer_size=500, bin_size=5)) as mdf2:
        mdf_b5 = mdf2[8200 : 8200 + 20]

    counter = 0
    try:
        while True:
            frame = np.concatenate([tiff_b5[counter], mdf_b5[counter]], axis=0)
            counter = (counter + 1) % tiff_b5.shape[0]
            cv2.imshow(
                "Frame",
                cv2.normalize(
                    frame[..., 0], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U
                ),
            )
            key = cv2.waitKey(1)
            if key == 27:  # ESC
                break
    finally:
        cv2.destroyAllWindows()

    if not np.array_equal(tiff_b5, mdf_b5):
        # Widen to int64 so the difference of unsigned frames cannot wrap.
        d = tiff_b5.astype(np.int64) - mdf_b5.astype(np.int64)
        print(int(np.abs(d).max()))
        print("Frames are not equal!")
    else:
        print(f"OK {out_path}")
def main3(
    input_file: str = "D:\\2024_OIST\\flow-registration\\2024\\MotionCorrection_2024\\FearConditioning_L6_somas_z1\\M231221_2_240207_002_001.TIF",
):
    """
    Display the first binned frames of a TIFF stack in a loop until ESC is pressed.

    Args:
        input_file: Path of the TIFF stack to display.
    """
    import cv2

    tif_reader = TIFFFileReader(input_file, bin_size=20)
    try:
        frames = tif_reader[0:500]
    finally:
        # The frames are already in memory; release the file handle right away.
        tif_reader.close()

    counter = 0
    try:
        while True:
            cv2.imshow(
                "Frame",
                cv2.normalize(
                    frames[counter], None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U
                ),
            )
            counter = (counter + 1) % frames.shape[0]
            key = cv2.waitKey(1)
            if key == 27:  # ESC
                break
    finally:
        cv2.destroyAllWindows()
# Script entry point: running this module directly starts the main3() frame viewer.
if __name__ == "__main__":
    main3()