import logging
import re
from abc import ABC, abstractmethod
from pathlib import Path
import numpy as np
import requests
import SimpleITK as sitk # noqa: N813
from iblatlas.atlas import AllenAtlas
from iblutil.util import Bunch
from one import params
from one.webclient import http_download_file
logger = logging.getLogger(__name__)
[docs]
class SliceLoader(ABC):
"""
Abstract base class for loading histology slices.
Subclasses must implement the `get_paths` and `load_volume` methods.
Parameters
----------
file_path : Path
Directory containing histology files.
brain_atlas : AllenAtlas
Reference brain atlas.
"""
def __init__(self, file_path: Path, brain_atlas: AllenAtlas):
self.file_path: Path = file_path
self.brain_atlas: AllenAtlas = brain_atlas
self.hist_paths: dict[str, Path] = {}
self.get_paths()
[docs]
@abstractmethod
def get_paths(self) -> None:
"""Locate and store relevant histology file paths in self.hist_paths."""
[docs]
@abstractmethod
def load_volume(self, vol_path: Path) -> np.ndarray:
"""
Load a 3D volume from a file.
Parameters
----------
vol_path : Path
A path to the volume file.
Returns
-------
np.ndarray
Loaded 3D image volume.
"""
[docs]
def get_slices(self, xyz: np.ndarray) -> dict[str, dict]:
"""
Generate slice images for CCF, annotation, and loaded histology.
Parameters
----------
xyz : np.ndarray
n x 3 array of xyz coordinates.
Returns
-------
slices: dict[str, dict]
A dictionary of dictionaries of image slices and metadata for each image type.
"""
slices = Bunch(
{
'CCF': self.get_slice(xyz, self.brain_atlas.image),
'Annotation': self.get_slice(xyz, self.brain_atlas.label, annotation=True),
}
)
slices['Annotation']['label'] = True
for key, vol_path in self.hist_paths.items():
try:
vol = self.load_volume(vol_path)
slices[key] = self.get_slice(xyz, vol)
except Exception as e:
logger.error(f'Failed to load {key} volume at {vol_path}: {e}')
return slices
[docs]
def get_slice(
self, xyz: np.ndarray, vol: np.ndarray, annotation: bool = False
) -> dict[str, np.ndarray]:
"""
Extract a slice from a 3D volume using given coordinates.
Parameters
----------
xyz : np.ndarray
Nx3 array of XYZ coordinates.
vol : np.ndarray
3D volume from which to extract a slice.
Returns
-------
dict[str, np.ndarray]
A dictionary containing the 2D slice, scale, and offset.
"""
index = self.brain_atlas.bc.xyz2i(xyz)[:, self.brain_atlas.xyz2dims]
width = [self.brain_atlas.bc.i2x(0), self.brain_atlas.bc.i2x(456)]
height = [self.brain_atlas.bc.i2z(index[0, 2]), self.brain_atlas.bc.i2z(index[-1, 2])]
hist_slice = vol[index[:, 0], :, index[:, 2]]
if annotation:
hist_slice = self.brain_atlas._label2rgb(hist_slice)
hist_slice = np.swapaxes(hist_slice, 0, 1)
return Bunch(
{
'slice': hist_slice,
'scale': np.array(
[
(width[-1] - width[0]) / hist_slice.shape[0],
(height[-1] - height[0]) / hist_slice.shape[1],
]
),
'offset': np.array([width[0], height[0]]),
}
)
[docs]
class NrrdSliceLoader(SliceLoader):
"""
SliceLoader for histology in the NRRD format.
Parameters
----------
file_path : Path
Directory containing .nrrd files.
brain_atlas : AllenAtlas
Brain atlas for alignment.
"""
def __init__(self, file_path: Path, brain_atlas: AllenAtlas):
super().__init__(file_path, brain_atlas)
[docs]
def get_paths(self) -> None:
"""Load histology file paths with predefined color channel suffixes."""
col_map = {'red': 'RD', 'green': 'GR'}
files = list(self.file_path.glob('*.nrrd'))
for color, abbrev in col_map.items():
match = next((f for f in files if abbrev in f.name), None)
if match:
self.hist_paths[f'Histology {color}'] = match
[docs]
def load_volume(self, vol_path: Path) -> np.ndarray:
"""
Load a volume using AllenAtlas.
Parameters
----------
vol_path : Path
A path to histology volume.
Returns
-------
np.ndarray
Loaded image volume.
"""
return AllenAtlas._read_volume(vol_path)
[docs]
def download_histology_data(
subject: str, laboratory: str
) -> tuple[list[Path], Path] | tuple[None, Path]:
"""
Download histology data from flatiron server if not already cached locally.
Parameters
----------
subject: str
Subject name
laboratory: str
Laboratory name
Returns
-------
path_to_files: list[Path] or None
List of paths to downloaded or cached nrrd files, or None if not found.
cache_dir: Path
Directory where files are cached.
"""
# If we detect >= 2 nrrd file we assume the histology data already exists
cache_dir = params.get_cache_dir().joinpath(laboratory, 'Subjects', subject, 'histology')
expected_files = list(cache_dir.glob('*.nrrd'))
if len(expected_files) >= 2:
return expected_files, cache_dir
# Otherwise we attempt to download files
lab_hist = 'mrsicflogellab' if laboratory == 'hoferlab' else laboratory
par = params.get()
def _find_histology_folder(subj: str, lab: str):
flatiron_path = Path('histology', lab, subj, 'downsampledStacks_25', 'sample2ARA')
url = f'{par.HTTP_DATA_SERVER}/{"/".join(flatiron_path.parts)}/'
try:
response = requests.get(
url, auth=(par.HTTP_DATA_SERVER_LOGIN, par.HTTP_DATA_SERVER_PWD)
)
response.raise_for_status()
return flatiron_path, response.text
except Exception as e:
logger.warning(f'Failed to find path for lab={lab}, subject={subj}: {e}')
return None
attempts = [(subject, lab_hist), (subject.replace('_', ''), lab_hist)]
if lab_hist == 'churchlandlab_ucla':
attempts.append((subject, 'churchlandlab'))
histology_folder = None
for subj, lab in attempts:
histology_folder = _find_histology_folder(subj, lab)
if histology_folder:
break
if not histology_folder:
logger.error(f'Could not find histology folder for subject={subject}, lab={laboratory}')
return None, cache_dir
rel_path, html_text = histology_folder
base_url = f'{par.HTTP_DATA_SERVER}/{"/".join(rel_path.parts)}'
tif_files = [match + '.tif' for match in re.findall(r'href="(.*).tif"', html_text)]
cache_dir.mkdir(exist_ok=True, parents=True)
path_to_files = []
for file in tif_files:
img_path = Path(cache_dir, file)
if not img_path.exists():
file_url = f'{base_url}/{file}'
http_download_file(
file_url,
target_dir=cache_dir,
username=par.HTTP_DATA_SERVER_LOGIN,
password=par.HTTP_DATA_SERVER_PWD,
)
path_to_files.append(tif2nrrd(img_path))
if len(path_to_files) > 3:
path_to_files = path_to_files[1:3]
return path_to_files, cache_dir
[docs]
def tif2nrrd(path_to_image: str | Path) -> Path:
"""
Convert a tif image to nrrd format if the nrrd does not already exist.
Parameters
----------
path_to_image: str or Path
The path to the tif file.
Returns
-------
path_to_nrrd: Path
The path to the nrrd file.
"""
path_to_nrrd = Path(path_to_image).with_suffix('.nrrd')
if not path_to_nrrd.exists():
reader = sitk.ImageFileReader()
reader.SetImageIO('TIFFImageIO')
reader.SetFileName(str(path_to_image))
img = reader.Execute()
new_img = sitk.PermuteAxes(img, [2, 1, 0])
new_img = sitk.Flip(new_img, [True, False, False])
new_img.SetSpacing([1, 1, 1])
writer = sitk.ImageFileWriter()
writer.SetImageIO('NrrdImageIO')
writer.SetFileName(str(path_to_nrrd))
writer.Execute(new_img)
return path_to_nrrd