Source code for ibllib.pipes.mesoscope_tasks

"""The mesoscope data extraction pipeline.

The mesoscope pipeline currently comprises raw image registration and timestamps extraction.  In
the future there will be compression (and potential cropping), FOV metadata extraction, and ROI
extraction.

Pipeline:
    1. Register reference images and upload snapshots and notes to Alyx
    2. Run ROI cell detection
    3. Calculate the pixel and ROI brain locations and register fields of view to Alyx
    4. Compress the raw imaging data
    5. Extract the imaging times from the main DAQ
"""
import json
import logging
import subprocess
import shutil
import uuid
from pathlib import Path
from itertools import chain, groupby
from collections import Counter
from fnmatch import fnmatch
import enum
import re
import time

import numba as nb
import numpy as np
import pandas as pd
import sparse
from scipy.io import loadmat
from scipy.interpolate import interpn
import one.alf.io as alfio
from one.alf.spec import to_alf
from one.alf.path import filename_parts, session_path_parts
import one.alf.exceptions as alferr
from iblutil.util import flatten, ensure_list
from iblatlas.atlas import ALLEN_CCF_LANDMARKS_MLAPDV_UM, MRITorontoAtlas

from ibllib.pipes import base_tasks
from ibllib.oneibl.data_handlers import ExpectedDataset, dataset_from_name
from ibllib.io.extractors import mesoscope


_logger = logging.getLogger(__name__)
Provenance = enum.Enum('Provenance', ['ESTIMATE', 'FUNCTIONAL', 'LANDMARK', 'HISTOLOGY'])  # py3.11 make StrEnum


[docs] class MesoscopeRegisterSnapshots(base_tasks.MesoscopeTask, base_tasks.RegisterRawDataTask): """Upload snapshots as Alyx notes and register the 2P reference image(s).""" priority = 100 job_size = 'small' @property def signature(self): I = ExpectedDataset.input # noqa signature = { 'input_files': [I('referenceImage.raw.tif', f'{self.device_collection}/reference', False, register=True), I('referenceImage.stack.tif', f'{self.device_collection}/reference', False, register=True), I('referenceImage.meta.json', f'{self.device_collection}/reference', False, register=True)], 'output_files': [] } return signature def __init__(self, session_path, **kwargs): super().__init__(session_path, **kwargs) self.device_collection = self.get_device_collection('mesoscope', kwargs.get('device_collection', 'raw_imaging_data_??')) def _run(self): """ Assert one reference image per collection and rename it. Register snapshots. Returns ------- list of pathlib.Path containing renamed reference image. """ # Assert that only one tif file exists per collection dsets = dataset_from_name('referenceImage.raw.tif', self.input_files) reference_images = list(chain.from_iterable(map(lambda x: x.find_files(self.session_path)[1], dsets))) assert len(set(x.parent for x in reference_images)) == len(reference_images) # Rename the reference images out_files = super()._run() # Register snapshots in base session folder and raw_imaging_data folders self.register_snapshots(collection=[self.device_collection, '']) return out_files
[docs] class MesoscopeCompress(base_tasks.MesoscopeTask): """ Tar compress raw 2p tif files, optionally remove uncompressed data.""" priority = 90 job_size = 'large' _log_level = None @property def signature(self): signature = { 'input_files': [('*.tif', self.device_collection, True)], 'output_files': [('imaging.frames.tar.bz2', self.device_collection, True)] } return signature
[docs] def setUp(self, **kwargs): """Run at higher log level""" self._log_level = _logger.level _logger.setLevel(logging.DEBUG) return super().setUp(**kwargs)
[docs] def tearDown(self): _logger.setLevel(self._log_level or logging.INFO) return super().tearDown()
def _run(self, remove_uncompressed=False, verify_output=True, overwrite=False, **kwargs): """ Run tar compression on all tif files in the device collection. Parameters ---------- remove_uncompressed: bool Whether to remove the original, uncompressed data. Default is False. verify_output: bool Whether to check that the compressed tar file can be uncompressed without errors. Default is True. Returns ------- list of pathlib.Path Path to compressed tar file. """ outfiles = [] # should be one per raw_imaging_data folder assert not any(x.operator for x in self.input_files), 'input datasets should not be nested' _, all_tifs, _ = zip(*(x.find_files(self.session_path) for x in self.input_files)) # A list of tifs, grouped by raw imaging data collection input_files = groupby(chain.from_iterable(all_tifs), key=lambda x: x.parent) *_, outfile_name = self.output_files[0].identifiers for in_dir, infiles in input_files: infiles = list(infiles) outfile = in_dir / outfile_name if outfile.exists() and not overwrite: _logger.info('%s already exists; skipping...', outfile.relative_to(self.session_path)) continue if not infiles: _logger.info('No image files found in %s', in_dir.relative_to(self.session_path)) continue _logger.debug( 'Input files:\n\t%s', '\n\t'.join(map(Path.as_posix, (x.relative_to(self.session_path) for x in infiles))) ) uncompressed_size = sum(x.stat().st_size for x in infiles) _logger.info('Compressing %i file(s)', len(infiles)) cmd = 'tar -cjvf "{output}" "{input}"'.format( output=outfile.relative_to(in_dir), input='" "'.join(str(x.relative_to(in_dir)) for x in infiles)) _logger.debug(cmd) process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=in_dir) info, error = process.communicate() # b'2023-02-17_2_test_2P_00001_00001.tif\n' _logger.debug(info.decode()) assert process.returncode == 0, f'compression failed: {error.decode()}' # Check the output assert outfile.exists(), 'output file missing' outfiles.append(outfile) compressed_size = outfile.stat().st_size min_size = kwargs.pop('verify_min_size', 1024) assert compressed_size > int(min_size), f'Compressed file < {min_size / 1024:.0f}KB' _logger.info('Compression ratio = %.3f, saving %.2f pct (%.2f MB)', uncompressed_size / compressed_size, round((1 - (compressed_size / uncompressed_size)) * 10000) / 100, (uncompressed_size - compressed_size) / 1024 / 1024) if verify_output: # Test bzip cmd = f'bzip2 -tv {outfile.relative_to(in_dir)}' process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=in_dir) info, error = process.communicate() _logger.debug(info.decode()) assert process.returncode == 0, f'bzip compression test failed: {error}' # Check tar cmd = f'bunzip2 -dc {outfile.relative_to(in_dir)} | tar -tvf -' process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=in_dir) info, error = process.communicate() _logger.debug(info.decode()) assert process.returncode == 0, 'tarball decompression test failed' compressed_files = set(x.split()[-1] for x in filter(None, info.decode().split('\n'))) assert compressed_files == set(x.name for x in infiles) if remove_uncompressed: _logger.info(f'Removing input files for {in_dir.relative_to(self.session_path)}') for file in infiles: file.unlink() return outfiles
[docs] class MesoscopePreprocess(base_tasks.MesoscopeTask): """Run suite2p preprocessing on tif files.""" priority = 80 cpu = -1 job_size = 'large' env = 'suite2p' def __init__(self, *args, **kwargs): self._teardown_files = [] self.overwrite = False super().__init__(*args, **kwargs)
[docs] def setUp(self, **kwargs): """Set up task. This will check the local filesystem for the raw tif files and if not present, will assume they have been compressed and deleted, in which case the signature will be replaced with the compressed input. Note: this will not work correctly if only some collections have compressed tifs. """ self.overwrite = kwargs.get('overwrite', False) all_files_present = super().setUp(**kwargs) # Ensure files present bin_sig = dataset_from_name('data.bin', self.input_files) if not self.overwrite and all(x.find_files(self.session_path)[0] for x in bin_sig): return all_files_present # We have local bin files; no need to extract tifs tif_sig = dataset_from_name('*.tif', self.input_files) if not tif_sig: return all_files_present # No tifs in the signature; just return tif_sig = tif_sig[0] tifs_present, *_ = tif_sig.find_files(self.session_path) if tifs_present or not all_files_present: return all_files_present # Tifs present on disk; no need to decompress # Decompress imaging files tif_sigs = dataset_from_name('imaging.frames.tar.bz2', self.input_files) present, files, _ = zip(*(x.find_files(self.session_path) for x in tif_sigs)) if not all(present): return False # Compressed files missing; return files = flatten(files) _logger.info('Decompressing %i file(s)', len(files)) for file in files: cmd = 'tar -xvjf "{input}"'.format(input=file.name) _logger.debug(cmd) process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=file.parent) stdout, _ = process.communicate() # b'x 2023-02-17_2_test_2P_00001_00001.tif\n' _logger.debug(stdout.decode()) tifs = [file.parent.joinpath(x.split()[-1]) for x in stdout.decode().splitlines() if x.endswith('.tif')] assert process.returncode == 0 and len(tifs) > 0 assert all(map(Path.exists, tifs)) self._teardown_files.extend(tifs) return all_files_present
[docs] def tearDown(self): """Tear down task. This removes any decompressed tif files. """ for file in self._teardown_files: _logger.debug('Removing %s', file) file.unlink() return super().tearDown()
@property def signature(self): # The number of in and outputs will be dependent on the number of input raw imaging folders and output FOVs I = ExpectedDataset.input # noqa signature = { 'input_files': [('_ibl_rawImagingData.meta.json', self.device_collection, True), I('*.tif', self.device_collection, True) | I('imaging.frames.tar.bz2', self.device_collection, True, unique=False), ('exptQC.mat', self.device_collection, False)], 'output_files': [('mpci.ROIActivityF.npy', 'alf/FOV*', True), ('mpci.ROINeuropilActivityF.npy', 'alf/FOV*', True), ('mpci.ROIActivityDeconvolved.npy', 'alf/FOV*', True), ('mpci.badFrames.npy', 'alf/FOV*', True), ('mpci.mpciFrameQC.npy', 'alf/FOV*', True), ('mpciFrameQC.names.tsv', 'alf/FOV*', True), ('mpciMeanImage.images.npy', 'alf/FOV*', True), ('mpciROIs.stackPos.npy', 'alf/FOV*', True), ('mpciROIs.mpciROITypes.npy', 'alf/FOV*', True), ('mpciROIs.cellClassifier.npy', 'alf/FOV*', True), ('mpciROIs.uuids.csv', 'alf/FOV*', True), ('mpciROITypes.names.tsv', 'alf/FOV*', True), ('mpciROIs.masks.npy', 'alf/FOV*', True), ('mpciROIs.neuropilMasks.npy', 'alf/FOV*', True), ('_suite2p_ROIData.raw.zip', self.device_collection, False)] } if not self.overwrite: # If not forcing re-registration, check whether bin files already exist on disk # Including the data.bin in the expected signature ensures raw data files are not needlessly re-downloaded # and/or uncompressed during task setup as the local data.bin may be used instead registered_bin = I('data.bin', 'raw_bin_files/plane*', True, unique=False) signature['input_files'][1] = registered_bin | signature['input_files'][1] return signature @staticmethod def _masks2sparse(stat, ops): """ Extract 3D sparse mask arrays from suit2p output. Parameters ---------- stat : numpy.array The loaded stat.npy file. A structured array with fields ('lam', 'ypix', 'xpix', 'neuropil_mask'). ops : numpy.array The loaded ops.npy file. A structured array with fields ('Ly', 'Lx'). Returns ------- sparse.GCXS A pydata sparse array of type float32, representing the ROI masks. sparse.GCXS A pydata sparse array of type float32, representing the neuropil ROI masks. Notes ----- Save using sparse.save_npz. """ shape = (stat.shape[0], ops['Ly'], ops['Lx']) npx = np.prod(shape[1:]) # Number of pixels per time point coords = [[], [], []] data = [] pil_coords = [] for i, s in enumerate(stat): coords[0].append(np.full(s['ypix'].shape, i)) coords[1].append(s['ypix']) coords[2].append(s['xpix']) data.append(s['lam']) pil_coords.append(s['neuropil_mask'] + i * npx) roi_mask_sp = sparse.COO(list(map(np.concatenate, coords)), np.concatenate(data), shape=shape) pil_mask_sp = sparse.COO(np.unravel_index(np.concatenate(pil_coords), shape), True, shape=shape) return sparse.GCXS.from_coo(roi_mask_sp), sparse.GCXS.from_coo(pil_mask_sp) def _rename_outputs(self, suite2p_dir, frameQC_names, frameQC, rename_dict=None): """ Convert suite2p output files to ALF datasets. This also moves any data.bin and ops.npy files to raw_bin_files for quicker re-runs. Parameters ---------- suite2p_dir : pathlib.Path The location of the suite2p output (typically session_path/suite2p). rename_dict : dict or None The suite2p output filenames and the corresponding ALF name. NB: These files are saved after transposition. Default is None, i.e. using the default mapping hardcoded in the function below. Returns ------- list of pathlib.Path All paths found in FOV folders. """ if rename_dict is None: rename_dict = { 'F.npy': 'mpci.ROIActivityF.npy', 'spks.npy': 'mpci.ROIActivityDeconvolved.npy', 'Fneu.npy': 'mpci.ROINeuropilActivityF.npy' } for plane_dir in self._get_plane_paths(suite2p_dir): # Move bin file(s) out of the way bin_files = list(plane_dir.glob('data*.bin')) # e.g. data.bin, data_raw.bin, data_chan2_raw.bin if any(bin_files): (bin_files_dir := self.session_path.joinpath('raw_bin_files', plane_dir.name)).mkdir(parents=True, exist_ok=True) _logger.debug('Moving bin file(s) to %s', bin_files_dir.relative_to(self.session_path)) for bin_file in bin_files: dst = bin_files_dir.joinpath(bin_file.name) bin_file.replace(dst) # copy ops file for lazy re-runs shutil.copy(plane_dir.joinpath('ops.npy'), bin_files_dir.joinpath('ops.npy')) # Archive the raw suite2p output before renaming n = int(plane_dir.name.split('plane')[1]) fov_dir = self.session_path.joinpath('alf', f'FOV_{n:02}') if fov_dir.exists(): _logger.debug('Removing old folder %s', fov_dir.relative_to(self.session_path)) shutil.rmtree(str(fov_dir), ignore_errors=False, onerror=None) prev_level = _logger.level _logger.setLevel(logging.WARNING) shutil.make_archive(str(fov_dir / '_suite2p_ROIData.raw'), 'zip', plane_dir, logger=_logger) _logger.setLevel(prev_level) # save frameQC in each dir (for now, maybe there will be fov specific frame QC eventually) if frameQC is not None and len(frameQC) > 0: np.save(fov_dir.joinpath('mpci.mpciFrameQC.npy'), frameQC) frameQC_names.to_csv(fov_dir.joinpath('mpciFrameQC.names.tsv'), sep='\t', index=False) # extract some other data from suite2p outputs ops = np.load(plane_dir.joinpath('ops.npy'), allow_pickle=True).item() stat = np.load(plane_dir.joinpath('stat.npy'), allow_pickle=True) iscell = np.load(plane_dir.joinpath('iscell.npy')) # Save suite2p ROI activity outputs in transposed from (n_frames, n_ROI) for k, v in rename_dict.items(): np.save(fov_dir.joinpath(v), np.load(plane_dir.joinpath(k)).T) np.save(fov_dir.joinpath('mpci.badFrames.npy'), np.asarray(ops['badframes'], dtype=bool)) np.save(fov_dir.joinpath('mpciMeanImage.images.npy'), np.asarray(ops['meanImg'], dtype=float)) np.save(fov_dir.joinpath('mpciROIs.stackPos.npy'), np.asarray([(*s['med'], 0) for s in stat], dtype=int)) np.save(fov_dir.joinpath('mpciROIs.cellClassifier.npy'), np.asarray(iscell[:, 1], dtype=float)) np.save(fov_dir.joinpath('mpciROIs.mpciROITypes.npy'), np.asarray(iscell[:, 0], dtype=np.int16)) # clusters uuids uuid_list = ['uuids'] + list(map(str, [uuid.uuid4() for _ in range(len(iscell))])) with open(fov_dir.joinpath('mpciROIs.uuids.csv'), 'w+') as fid: fid.write('\n'.join(uuid_list)) (pd.DataFrame([(0, 'no cell'), (1, 'cell')], columns=['roi_values', 'roi_labels']) .to_csv(fov_dir.joinpath('mpciROITypes.names.tsv'), sep='\t', index=False)) # ROI and neuropil masks roi_mask, pil_mask = self._masks2sparse(stat, ops) with open(fov_dir.joinpath('mpciROIs.masks.sparse_npz'), 'wb') as fp: sparse.save_npz(fp, roi_mask) with open(fov_dir.joinpath('mpciROIs.neuropilMasks.sparse_npz'), 'wb') as fp: sparse.save_npz(fp, pil_mask) # Remove old suite2p files shutil.rmtree(str(suite2p_dir), ignore_errors=False, onerror=None) # Collect all files in those directories return sorted(self.session_path.joinpath('alf').rglob('FOV_??/*.*.*'))
[docs] def load_meta_files(self): """Load the extracted imaging metadata files. Loads and consolidates the imaging data metadata from rawImagingData.meta.json files. These files contain ScanImage metadata extracted from the raw tiff headers by the function `mesoscopeMetadataExtraction.m` in iblscripts/deploy/mesoscope. Returns ------- dict Single, consolidated dictionary containing metadata. list of dict The meta data for each individual imaging bout. """ # Load metadata and make sure all metadata is consistent across FOVs meta_files = sorted(self.session_path.glob(f'{self.device_collection}/*rawImagingData.meta.*')) collections = sorted(set(f.parts[-2] for f in meta_files)) # Check there is exactly 1 meta file per collection assert len(meta_files) == len(list(self.session_path.glob(self.device_collection))) == len(collections) raw_meta = map(alfio.load_file_content, meta_files) all_meta = list(map(mesoscope.patch_imaging_meta, raw_meta)) return self._consolidate_metadata(all_meta) if len(all_meta) > 1 else all_meta[0], all_meta
@staticmethod def _consolidate_metadata(meta_data_all: list) -> dict: """ Check that the metadata is consistent across all raw imaging folders. Parameters ---------- meta_data_all: list of dicts List of metadata dictionaries to be checked for consistency. Returns ------- dict Single, consolidated dictionary containing metadata. """ # Ignore the things we don't expect to match ignore = ('acquisitionStartTime', 'nFrames') ignore_sub = {'rawScanImageMeta': ('ImageDescription', 'Software')} def equal_dicts(a, b, skip=None): ka = set(a).difference(skip or ()) kb = set(b).difference(skip or ()) return ka == kb and all(a[key] == b[key] for key in ka) # Compare each dict with the first one in the list for meta in meta_data_all[1:]: if meta != meta_data_all[0]: # compare entire object first for k, v in meta_data_all[0].items(): # check key by key if not (equal_dicts(v, meta[k], ignore_sub[k]) # compare sub-dicts... if k in ignore_sub else # ... if we have keys to ignore in test not (k in ignore or v == meta[k])): _logger.warning(f'Mismatch in meta data between raw_imaging_data folders for key {k}. ' f'Using meta_data from first folder!') return meta_data_all[0] @staticmethod def _consolidate_exptQC(exptQC): """ Consolidate exptQC.mat files into a single file. Parameters ---------- exptQC : list of pandas.DataFrame The loaded 'exptQC.mat' files as squeezed and simplified data frames, with columns {'frameQC_frames', 'frameQC_names'}. Returns ------- numpy.array An array of uint8 where 0 indicates good frames, and other values correspond to experimenter-defined issues (in 'qc_values' column of output data frame). pandas.DataFrame A data frame with columns {'qc_values', 'qc_labels'}, the former an unsigned int corresponding to a QC code; the latter a human-readable QC explanation. numpy.array An array of frame indices where QC code != 0. """ # Create a new enumeration combining all unique QC labels. # 'ok' will always have an enum of 0, the rest are determined by order alone qc_labels = ['ok'] frame_qc = [] for e in exptQC: assert e.keys() >= set(['frameQC_names', 'frameQC_frames']) # Initialize an NaN array the same size of frameQC_frames to fill with new enum values frames = np.full(e['frameQC_frames'].shape, fill_value=np.nan) # May be numpy array of str or a single str, in both cases we cast to list of str names = list(ensure_list(e['frameQC_names'])) # For each label for the old enum, populate initialized array with the new one for name in names: i_old = names.index(name) # old enumeration name = name if len(name) else 'unknown' # handle empty array and empty str try: i_new = qc_labels.index(name) except ValueError: i_new = len(qc_labels) qc_labels.append(name) frames[e['frameQC_frames'] == i_old] = i_new frame_qc.append(frames) # Concatenate frames frame_qc = np.concatenate(frame_qc) # If any NaNs left over, assign 'unknown' label if (missing_name := np.isnan(frame_qc)).any(): try: i = qc_labels.index('unknown') except ValueError: i = len(qc_labels) qc_labels.append('unknown') frame_qc[missing_name] = i frame_qc = frame_qc.astype(np.uint32) # case to uint bad_frames, = np.where(frame_qc != 0) # Convert labels to value -> label data frame frame_qc_names = pd.DataFrame(list(enumerate(qc_labels)), columns=['qc_values', 'qc_labels']) return frame_qc, frame_qc_names, bad_frames
[docs] def get_default_tau(self): """ Determine the tau (fluorescence decay) from the subject's genotype. Returns ------- float The tau value to use. See Also -------- https://suite2p.readthedocs.io/en/latest/settings.html """ # These settings are from the suite2P documentation TAU_MAP = {'G6s': 1.5, 'G6m': 1., 'G6f': .7, 'default': 1.5} _, subject, *_ = session_path_parts(self.session_path) genotype = self.one.alyx.rest('subjects', 'read', id=subject)['genotype'] match = next(filter(None, (re.match(r'.+-(G\d[fms])$', g['allele']) for g in genotype)), None) key = match.groups()[0] if match else 'default' return TAU_MAP.get(key, TAU_MAP['default'])
def _meta2ops(self, meta): """ Create the ops dictionary for suite2p based on metadata. Parameters ---------- meta: dict Imaging metadata. Returns ------- dict Inputs to suite2p run that deviate from default parameters. """ # Computing dx and dy cXY = np.array([fov['Deg']['topLeft'] for fov in meta['FOV']]) cXY -= np.min(cXY, axis=0) nXnYnZ = np.array([fov['nXnYnZ'] for fov in meta['FOV']]) # Currently supporting z-stacks but not supporting dual plane / volumetric imaging, assert that this is not the case if np.any(nXnYnZ[:, 2] > 1): raise NotImplementedError('Dual-plane imaging not yet supported, data seems to more than one plane per FOV') sW = np.sqrt(np.sum((np.array([fov['Deg']['topRight'] for fov in meta['FOV']]) - np.array( [fov['Deg']['topLeft'] for fov in meta['FOV']])) ** 2, axis=1)) sH = np.sqrt(np.sum((np.array([fov['Deg']['bottomLeft'] for fov in meta['FOV']]) - np.array( [fov['Deg']['topLeft'] for fov in meta['FOV']])) ** 2, axis=1)) pixSizeX = nXnYnZ[:, 0] / sW pixSizeY = nXnYnZ[:, 1] / sH dx = np.round(cXY[:, 0] * pixSizeX).astype(dtype=np.int32) dy = np.round(cXY[:, 1] * pixSizeY).astype(dtype=np.int32) nchannels = len(meta['channelSaved']) if isinstance(meta['channelSaved'], list) else 1 # Computing number of unique z-planes (slices in tiff) # FIXME this should work if all FOVs are discrete or if all FOVs are continuous, but may not work for combination of both slice_ids = [fov['slice_id'] for fov in meta['FOV']] nplanes = len(set(slice_ids)) # Figuring out how many SI Rois we have (one unique ROI may have several FOVs) # FIXME currently unused # roiUUIDs = np.array([fov['roiUUID'] for fov in meta['FOV']]) # nrois = len(np.unique(roiUUIDs)) db = { 'data_path': sorted(map(str, self.session_path.glob(f'{self.device_collection}'))), 'save_path0': str(self.session_path), 'fast_disk': '', # TODO 'look_one_level_down': False, # don't look in the children folders as that is where the reference data is 'num_workers': self.cpu, # this selects number of cores to parallelize over for the registration step 'num_workers_roi': -1, # for parallelization over FOVs during cell detection, for now don't 'keep_movie_raw': False, 'delete_bin': False, # TODO: delete this on the long run 'batch_size': 500, # SP reduced this from 1000 'nimg_init': 400, 'combined': False, 'nonrigid': True, 'maxregshift': 0.05, # default = 1 'denoise': 1, # whether binned movie should be denoised before cell detection 'block_size': [128, 128], 'save_mat': True, # save the data to Fall.mat 'move_bin': True, # move the binary file to save_path 'mesoscan': True, 'nplanes': nplanes, 'nrois': len(meta['FOV']), 'nchannels': nchannels, 'fs': meta['scanImageParams']['hRoiManager']['scanVolumeRate'], 'lines': [list(np.asarray(fov['lineIdx']) - 1) for fov in meta['FOV']], # subtracting 1 to make 0-based 'slices': slice_ids, # this tells us which FOV corresponds to which tiff slices 'tau': self.get_default_tau(), # deduce the GCamp used from Alyx mouse line (defaults to 1.5; that of GCaMP6s) 'functional_chan': 1, # for now, eventually find(ismember(meta.channelSaved == meta.channelID.green)) 'align_by_chan': 1, # for now, eventually find(ismember(meta.channelSaved == meta.channelID.red)) 'dx': dx.tolist(), 'dy': dy.tolist() } return db @staticmethod def _get_plane_paths(path): """Return list of sorted suite2p plane folder paths. Parameters ---------- path : pathlib.Path The path containing plane folders. Returns ------- list of pathlib.Path The plane folder paths, ordered by number. """ pattern = re.compile(r'(?<=^plane)\d+$') return sorted(path.glob('plane?*'), key=lambda x: int(pattern.search(x.name).group()))
[docs] def bin_per_plane(self, metadata, **kwargs): """ Extracts a binary data file of imaging data per imaging plane. Parameters ---------- metadata : dict A dictionary of extracted metadata. save_path0 : str, pathlib.Path The root path of the suite2p bin output. save_folder : str The subfolder within `save_path0` to save the suite2p bin output. kwargs Other optional arguments to overwrite the defaults for. Returns ------- list of pathlib.Path Ordered list of output plane folders containing binary data and ops. dict Suite2p's modified options. """ import suite2p.io options = ('nplanes', 'data_path', 'save_path0', 'save_folder', 'fast_disk', 'batch_size', 'nchannels', 'keep_movie_raw', 'look_one_level_down', 'lines', 'dx', 'dy', 'force_sktiff', 'do_registration') ops = self._meta2ops(metadata) ops['force_sktiff'] = False ops['do_registration'] = True ops = {k: v for k, v in ops.items() if k in options} ops.update(kwargs) ops['save_path0'] = str(ops['save_path0']) # Path objs must be str for suite2p # Update the task kwargs attribute as it will be stored in the arguments json field in alyx self.kwargs = ops.copy() ret = suite2p.io.mesoscan_to_binary(ops.copy()) # Get ordered list of plane folders out_path = Path(ret['save_path0'], ret['save_folder']) assert out_path.exists() planes = self._get_plane_paths(out_path) assert len(planes) == ret['nplanes'] return planes, ret
[docs] def image_motion_registration(self, ops): """Perform motion registration. Parameters ---------- ops : dict A dict of suite2p options. Returns ------- dict A dictionary of registration metrics: "regDX" is nPC x 3, where X[:,0] is rigid, X[:,1] is average nonrigid, X[:,2] is max nonrigid shifts; "regPC" is average of top and bottom frames for each PC; "tPC" is PC across time frames; "reg_metrics_avg" is the average of "regDX"; "reg_metrics_max" is the maximum of "regDX". """ import suite2p ops['do_registration'] = True ops['do_regmetrics'] = True ops['roidetect'] = False ret = suite2p.run_plane(ops) metrics = {k: ret.get(k, None) for k in ('regDX', 'regPC', 'tPC')} has_metrics = ops['do_regmetrics'] and metrics['regDX'] is not None metrics['reg_metrics_avg'] = np.mean(metrics['regDX'], axis=0) if has_metrics else None metrics['reg_metrics_max'] = np.max(metrics['regDX'], axis=0) if has_metrics else None return metrics
[docs] def roi_detection(self, ops): """Perform ROI detection. Parameters ---------- ops : dict A dict of suite2p options. Returns ------- dict An updated copy of the ops after running ROI detection. """ import suite2p ops['do_registration'] = False ops['roidetect'] = True ret = suite2p.run_plane(ops) return ret
def _run(self, rename_files=True, use_badframes=True, **kwargs): """ Process inputs, run suite2p and make outputs alf compatible. The suite2p processing takes place in a 'suite2p' folder within the session path. After running, the data.bin files are moved to 'raw_bin_files' and the rest of the folder is zipped up and moved to 'alf/ Parameters ---------- rename_files: bool Whether to rename and reorganize the suite2p outputs to be alf compatible. Defaults is True. use_badframes: bool Whether to exclude bad frames indicated by the experimenter in badframes.mat. overwrite : bool Whether to re-perform extraction and motion registration. Returns ------- list of pathlib.Path All files created by the task. """ """ Metadata and parameters """ overwrite = kwargs.pop('overwrite', self.overwrite) # Load and consolidate the image metadata from JSON files metadata, all_meta = self.load_meta_files() # Create suite2p output folder in raw imaging data folder raw_image_collections = sorted(self.session_path.glob(f'{self.device_collection}')) save_path = self.session_path.joinpath(save_folder := 'suite2p') # Check for previous intermediate files plane_folders = self._get_plane_paths(save_path) if len(plane_folders) == 0 and self.session_path.joinpath('raw_bin_files').exists(): self.session_path.joinpath('raw_bin_files').replace(save_path) plane_folders = self._get_plane_paths(save_path) if len(plane_folders) == 0 or overwrite: _logger.info('Extracting tif data per plane') # Ingest tiff files plane_folders, _ = self.bin_per_plane(metadata, save_folder=save_folder, save_path0=self.session_path) """ Bad frames """ # exptQC.mat contains experimenter QC values that may not affect ROI detection (e.g. noises, pauses) qc_datasets = dataset_from_name('exptQC.mat', self.input_files) qc_paths = [next(self.session_path.glob(d.glob_pattern), None) for d in qc_datasets] qc_paths = sorted(map(str, filter(None, qc_paths))) exptQC = [loadmat(p, squeeze_me=True, simplify_cells=True) for p in qc_paths] if len(exptQC) > 0: frameQC, frameQC_names, _ = self._consolidate_exptQC(exptQC) else: _logger.warning('No frame QC (exptQC.mat) files found.') frameQC = np.array([], dtype='u1') frameQC_names = pd.DataFrame(columns=['qc_values', 'qc_labels']) # If applicable, save as bad_frames.npy in first raw_imaging_folder for suite2p # badframes.mat contains QC values that do affect ROI detection (e.g. no PMT, lens artefacts) badframes = np.array([], dtype='uint32') total_frames = 0 # Ensure all indices are relative to total cumulative frames for m, collection in zip(all_meta, raw_image_collections): badframes_path = self.session_path.joinpath(collection, 'badframes.mat') if badframes_path.exists(): raw_mat = loadmat(badframes_path, squeeze_me=True, simplify_cells=True) badframes = np.r_[badframes, raw_mat['badframes'].astype('uint32') + total_frames] total_frames += m['nFrames'] if len(badframes) > 0 and use_badframes is True: # The badframes array should always be a subset of the frameQC array assert np.max(badframes) < frameQC.size and np.all(frameQC[badframes]) np.save(raw_image_collections[0].joinpath('bad_frames.npy'), badframes) """ Suite2p """ # Create alf if is doesn't exist self.session_path.joinpath('alf').mkdir(exist_ok=True) # Perform registration _logger.info('Performing registration') for plane in plane_folders: ops = np.load(plane.joinpath('ops.npy'), allow_pickle=True).item() ops.update(kwargs) # (ops['do_registration'], ops['reg_file'], ops['meanImg']) _ = self.image_motion_registration(ops) # TODO Handle metrics and QC here # ROI detection _logger.info('Performing ROI detection') for plane in plane_folders: ops = np.load(plane.joinpath('ops.npy'), allow_pickle=True).item() ops.update(kwargs) self.roi_detection(ops) """ Outputs """ # Save and rename other outputs if rename_files: self._rename_outputs(save_path, frameQC_names, frameQC) # Only return output file that are in the signature (for registration) out_files = chain.from_iterable(map(lambda x: x.find_files(self.session_path)[1], self.output_files)) else: out_files = save_path.rglob('*.*') return list(out_files)
[docs] class MesoscopeSync(base_tasks.MesoscopeTask): """Extract the frame times from the main DAQ.""" priority = 40 job_size = 'small' @property def signature(self): signature = { 'input_files': [(f'_{self.sync_namespace}_DAQdata.raw.npy', self.sync_collection, True), (f'_{self.sync_namespace}_DAQdata.timestamps.npy', self.sync_collection, True), (f'_{self.sync_namespace}_DAQdata.meta.json', self.sync_collection, True), ('_ibl_rawImagingData.meta.json', self.device_collection, True), ('rawImagingData.times_scanImage.npy', self.device_collection, True, True), # register raw (f'_{self.sync_namespace}_softwareEvents.log.htsv', self.sync_collection, False), ], 'output_files': [('mpci.times.npy', 'alf/mesoscope/FOV*', True), ('mpciStack.timeshift.npy', 'alf/mesoscope/FOV*', True),] } return signature def _run(self): """ Extract the imaging times for all FOVs. Returns ------- list of pathlib.Path Files containing frame timestamps for individual FOVs and time offsets for each line scan. """ # TODO function to determine nFOVs try: alf_path = self.session_path / self.sync_collection events = alfio.load_object(alf_path, 'softwareEvents').get('log') except alferr.ALFObjectNotFound: events = None if events is None or events.empty: _logger.debug('No software events found for session %s', self.session_path) all_collections = flatten(map(lambda x: x.identifiers, self.input_files))[::3] collections = set(filter(lambda x: fnmatch(x, self.device_collection), all_collections)) # Load first meta data file to determine the number of FOVs # Changing FOV between imaging bouts is not supported currently! self.rawImagingData = alfio.load_object(self.session_path / next(iter(collections)), 'rawImagingData') self.rawImagingData['meta'] = mesoscope.patch_imaging_meta(self.rawImagingData['meta']) n_FOVs = len(self.rawImagingData['meta']['FOV']) sync, chmap = self.load_sync() # Extract sync data from raw DAQ data mesosync = mesoscope.MesoscopeSyncTimeline(self.session_path, n_FOVs) _, out_files = mesosync.extract( save=True, sync=sync, chmap=chmap, device_collection=collections, events=events) return out_files
[docs] class MesoscopeFOV(base_tasks.MesoscopeTask): """Create FOV and FOV location objects in Alyx from metadata.""" priority = 40 job_size = 'small' @property def signature(self): signature = { 'input_files': [('_ibl_rawImagingData.meta.json', self.device_collection, True), ('mpciROIs.stackPos.npy', 'alf/FOV*', True)], 'output_files': [('mpciMeanImage.brainLocationIds*.npy', 'alf/FOV_*', True), ('mpciMeanImage.mlapdv*.npy', 'alf/FOV_*', True), ('mpciROIs.mlapdv*.npy', 'alf/FOV_*', True), ('mpciROIs.brainLocationIds*.npy', 'alf/FOV_*', True), ('_ibl_rawImagingData.meta.json', self.device_collection, True)] } return signature def _run(self, *args, provenance=Provenance.ESTIMATE, **kwargs): """ Register fields of view (FOV) to Alyx and extract the coordinates and IDs of each ROI. Steps: 1. Save the mpciMeanImage.brainLocationIds_estimate and mlapdv datasets. 2. Use mean image coordinates and ROI stack position datasets to extract brain location of each ROI. 3. Register the location of each FOV in Alyx. Parameters ---------- provenance : Provenance The provenance of the coordinates in the meta file. For all but 'HISTOLOGY', the provenance is added as a dataset suffix. Defaults to ESTIMATE. Returns ------- dict The newly created FOV Alyx record. list The newly created FOV location Alyx records. Notes ----- - Once the FOVs have been registered they cannot be updated with this task. Rerunning this task will result in an error. - This task modifies the first meta JSON file. All meta files are registered by this task. """ # Load necessary data (filename, collection, _), *_ = self.signature['input_files'] meta_files = sorted(self.session_path.glob(f'{collection}/{filename}')) meta = mesoscope.patch_imaging_meta(alfio.load_file_content(meta_files[0]) or {}) nFOV = len(meta.get('FOV', [])) suffix = None if provenance is Provenance.HISTOLOGY else provenance.name.lower() _logger.info('Extracting %s MLAPDV datasets', suffix or 'final') # Extract mean image MLAPDV coordinates and brain location IDs mean_image_mlapdv, mean_image_ids = self.project_mlapdv(meta) # Save the meta data file with new coordinate fields with open(meta_files[0], 'w') as fp: json.dump(meta, fp) # Save the mean image datasets mean_image_files = [] assert set(mean_image_mlapdv.keys()) == set(mean_image_ids.keys()) and len(mean_image_ids) == nFOV for i in range(nFOV): alf_path = self.session_path.joinpath('alf', f'FOV_{i:02}') for attr, arr, sfx in (('mlapdv', mean_image_mlapdv[i], suffix), ('brainLocationIds', mean_image_ids[i], ('ccf', '2017', suffix))): mean_image_files.append(alf_path / to_alf('mpciMeanImage', attr, 'npy', timescale=sfx)) mean_image_files[-1].parent.mkdir(parents=True, exist_ok=True) np.save(mean_image_files[-1], arr) # Extract ROI MLAPDV coordinates and brain location IDs roi_mlapdv, roi_brain_ids = self.roi_mlapdv(nFOV, suffix=suffix) # Write MLAPDV + brain location ID of ROIs to disk roi_files = [] assert set(roi_mlapdv.keys()) == set(roi_brain_ids.keys()) and len(roi_mlapdv) == nFOV for i in range(nFOV): alf_path = self.session_path.joinpath('alf', f'FOV_{i:02}') for attr, arr, sfx in (('mlapdv', roi_mlapdv[i], suffix), ('brainLocationIds', roi_brain_ids[i], ('ccf', '2017', suffix))): roi_files.append(alf_path / to_alf('mpciROIs', attr, 'npy', timescale=sfx)) np.save(roi_files[-1], arr) # Register FOVs in Alyx self.register_fov(meta, suffix) return sorted([*meta_files, *roi_files, *mean_image_files])
[docs] def update_surgery_json(self, meta, normal_vector): """ Update surgery JSON with surface normal vector. Adds the key 'surface_normal_unit_vector' to the most recent surgery JSON, containing the provided three element vector. The recorded craniotomy center must match the coordinates in the provided meta file. Parameters ---------- meta : dict The imaging meta data file containing the 'centerMM' key. normal_vector : array_like A three element unit vector normal to the surface of the craniotomy center. Returns ------- dict The updated surgery record, or None if no surgeries found. """ if not self.one or self.one.offline: _logger.warning('failed to update surgery JSON: ONE offline') return # Update subject JSON with unit normal vector of craniotomy centre (used in histology) subject = self.one.path2ref(self.session_path, parse=False)['subject'] surgeries = self.one.alyx.rest('surgeries', 'list', subject=subject, procedure='craniotomy') if not surgeries: _logger.error(f'Surgery not found for subject "{subject}"') return surgery = surgeries[0] # Check most recent surgery in list center = (meta['centerMM']['ML'], meta['centerMM']['AP']) match = (k for k, v in (surgery['json'] or {}).items() if str(k).startswith('craniotomy') and np.allclose(v['center'], center)) if (key := next(match, None)) is None: _logger.error('Failed to update surgery JSON: no matching craniotomy found') return surgery data = {key: {**surgery['json'][key], 'surface_normal_unit_vector': tuple(normal_vector)}} surgery['json'] = self.one.alyx.json_field_update('subjects', subject, data=data) return surgery
[docs] def roi_mlapdv(self, nFOV: int, suffix=None): """ Extract ROI MLAPDV coordinates and brain location IDs. MLAPDV coordinates are in um relative to bregma. Location IDs are from the 2017 Allen common coordinate framework atlas. Parameters ---------- nFOV : int The number of fields of view acquired. suffix : {None, 'estimate'} The attribute suffix of the mpciMeanImage datasets to load. If generating from estimates, the suffix should be 'estimate'. Returns ------- dict of int : numpy.array A map of field of view to ROI MLAPDV coordinates. dict of int : numpy.array A map of field of view to ROI brain location IDs. """ all_mlapdv = {} all_brain_ids = {} for n in range(nFOV): alf_path = self.session_path.joinpath('alf', f'FOV_{n:02}') # Load neuron centroids in pixel space stack_pos_file = next(alf_path.glob('mpciROIs.stackPos*'), None) if not stack_pos_file: raise FileNotFoundError(alf_path / 'mpci.stackPos*') stack_pos = alfio.load_file_content(stack_pos_file) # Load MLAPDV + brain location ID maps of pixels mpciMeanImage = alfio.load_object( alf_path, 'mpciMeanImage', attribute=['mlapdv', 'brainLocationIds']) # Get centroid MLAPDV + brainID by indexing pixel-map with centroid locations mlapdv = np.full(stack_pos.shape, np.nan) brain_ids = np.full(stack_pos.shape[0], np.nan) for i in np.arange(stack_pos.shape[0]): idx = (stack_pos[i, 0], stack_pos[i, 1]) sfx = f'_{suffix}' if suffix else '' mlapdv[i, :] = mpciMeanImage['mlapdv' + sfx][idx] brain_ids[i] = mpciMeanImage['brainLocationIds_ccf_2017' + sfx][idx] assert ~np.isnan(brain_ids).any() all_brain_ids[n] = brain_ids.astype(int) all_mlapdv[n] = mlapdv return all_mlapdv, all_brain_ids
[docs] @staticmethod def get_provenance(filename): """ Get the field of view provenance from a mpciMeanImage or mpciROIs dataset. Parameters ---------- filename : str, pathlib.Path A filename to get the provenance from. Returns ------- Provenance The provenance of the file. """ filename = Path(filename).name timescale = (filename_parts(filename)[3] or '').split('_') provenances = [i.name.lower() for i in Provenance] provenance = (Provenance[x.upper()] for x in timescale if x in provenances) return next(provenance, None) or Provenance.HISTOLOGY
[docs] def register_fov(self, meta: dict, suffix: str = None) -> (list, list): """ Create FOV on Alyx. Assumes field of view recorded perpendicular to objective. Assumes field of view is plane (negligible volume). Required Alyx fixtures: - experiments.ImagingType(name='mesoscope') - experiments.CoordinateSystem(name='IBL-Allen') Parameters ---------- meta : dict The raw imaging meta data from _ibl_rawImagingData.meta.json. suffix : str The file attribute suffixes to load from the mpciMeanImage object. Either 'estimate' or None. No suffix means the FOV location provenance will be L (Landmark). Returns ------- list of dict A list registered of field of view entries from Alyx. TODO Determine dual plane ID for JSON field """ dry = self.one is None or self.one.offline alyx_fovs = [] # Count the number of slices per stack ID: only register stacks that contain more than one slice. slice_counts = Counter(f['roiUUID'] for f in meta.get('FOV', [])) # Create a new stack in Alyx for all stacks containing more than one slice. # Map of ScanImage ROI UUID to Alyx ImageStack UUID. if dry: stack_ids = {i: uuid.uuid4() for i in slice_counts if slice_counts[i] > 1} else: stack_ids = {i: self.one.alyx.rest('imaging-stack', 'create', data={'name': i})['id'] for i in slice_counts if slice_counts[i] > 1} for i, fov in enumerate(meta.get('FOV', [])): assert set(fov.keys()) >= {'MLAPDV', 'nXnYnZ', 'roiUUID'} # Field of view alyx_FOV = { 'session': self.session_path.as_posix() if dry else self.path2eid(), 'imaging_type': 'mesoscope', 'name': f'FOV_{i:02}', 'stack': stack_ids.get(fov['roiUUID']) } if dry: print(alyx_FOV) alyx_FOV['location'] = [] alyx_fovs.append(alyx_FOV) else: alyx_fovs.append(self.one.alyx.rest('fields-of-view', 'create', data=alyx_FOV)) # Field of view location data = {'field_of_view': alyx_fovs[-1].get('id'), 'default_provenance': True, 'coordinate_system': 'IBL-Allen', 'n_xyz': fov['nXnYnZ']} if suffix: data['provenance'] = suffix[0].upper() # Convert coordinates to 4 x 3 array (n corners by n dimensions) # x1 = top left ml, y1 = top left ap, y2 = top right ap, etc. coords = [fov['MLAPDV'][key] for key in ('topLeft', 'topRight', 'bottomLeft', 'bottomRight')] coords = np.vstack(coords).T data.update({k: arr.tolist() for k, arr in zip('xyz', coords)}) # Load MLAPDV + brain location ID maps of pixels filename = 'mpciMeanImage.brainLocationIds_ccf_2017' + (f'_{suffix}' if suffix else '') + '.npy' filepath = self.session_path.joinpath('alf', f'FOV_{i:02}', filename) mean_image_ids = alfio.load_file_content(filepath) data['brain_region'] = np.unique(mean_image_ids).astype(int).tolist() if dry: print(data) alyx_FOV['location'].append(data) else: alyx_fovs[-1]['location'].append(self.one.alyx.rest('fov-location', 'create', data=data)) return alyx_fovs
[docs] def load_triangulation(self): """ Load the surface triangulation file. A triangle mesh of the smoothed convex hull of the dorsal surface of the mouse brain, generated from the 2017 Allen 10um annotation volume. This triangulation was generated in MATLAB. Returns ------- points : numpy.array An N by 3 float array of x-y vertices, defining all points of the triangle mesh. These are in um relative to the IBL bregma coordinates. connectivity_list : numpy.array An N by 3 integer array of vertex indices defining all points that form a triangle. """ fixture_path = Path(mesoscope.__file__).parent.joinpath('mesoscope') surface_triangulation = np.load(fixture_path / 'surface_triangulation.npz') points = surface_triangulation['points'].astype('f8') connectivity_list = surface_triangulation['connectivity_list'] surface_triangulation.close() return points, connectivity_list
[docs] def project_mlapdv(self, meta, atlas=None): """ Calculate the mean image pixel locations in MLAPDV coordinates and determine the brain location IDs. MLAPDV coordinates are in um relative to bregma. Location IDs are from the 2017 Allen common coordinate framework atlas. Parameters ---------- meta : dict The raw imaging data meta file, containing coordinates for the centre of each field of view. atlas : ibllib.atlas.Atlas An atlas instance. Returns ------- dict A map of FOV number (int) to mean image MLAPDV coordinates as a 2D numpy array. dict A map of FOV number (int) to mean image brain location IDs as a 2D numpy int array. """ mlapdv = {} location_id = {} # Use the MRI atlas as this applies scaling, particularly along the DV axis to (hopefully) # more accurately represent the living brain. atlas = atlas or MRITorontoAtlas(res_um=10) # The centre of the craniotomy / imaging window coord_ml = meta['centerMM']['ML'] * 1e3 # mm -> um coord_ap = meta['centerMM']['AP'] * 1e3 # mm -> um pt = np.array([coord_ml, coord_ap]) points, connectivity_list = self.load_triangulation() # Only keep faces that have normals pointing up (positive DV value). # Calculate the normal vector pointing out of the convex hull. triangles = points[connectivity_list, :] normals = surface_normal(triangles) up_faces, = np.where(normals[:, -1] > 0) # only keep triangles that have normal vector with positive DV component dorsal_connectivity_list = connectivity_list[up_faces, :] # Flatten triangulation by dropping the dorsal coordinates and find the location of the # window center (we convert mm -> um here) face_ind = find_triangle(pt * 1e-3, points[:, :2] * 1e-3, dorsal_connectivity_list.astype(np.intp)) assert face_ind != -1 dorsal_triangle = points[dorsal_connectivity_list[face_ind, :], :] # Get the surface normal unit vector of dorsal triangle normal_vector = surface_normal(dorsal_triangle) # Update the surgery JSON field with normal unit vector, for use in histology alignment self.update_surgery_json(meta, normal_vector) # find the coordDV that sits on the triangular face and had [coordML, coordAP] coordinates; # the three vertices defining the triangle face_vertices = points[dorsal_connectivity_list[face_ind, :], :] # all the vertices should be on the plane ax + by + cz = 1, so we can find # the abc coefficients by inverting the three equations for the three vertices abc, *_ = np.linalg.lstsq(face_vertices, np.ones(3), rcond=None) # and then find a point on that plane that corresponds to a given x-y # coordinate (which is ML-AP coordinate) coord_dv = (1 - pt @ abc[:2]) / abc[2] # We should not use the actual surface of the brain for this, as it might be in one of the sulci # DO NOT USE THIS: # coordDV = interp2(axisMLmm, axisAPmm, surfaceDV, coordML, coordAP) # Now we need to span the plane of the coverslip with two orthogonal unit vectors. # We start with vY, because the order is important and we usually have less # tilt along AP (pitch), which will cause less deviation in vX from pure ML. vY = np.array([0, normal_vector[2], -normal_vector[1]]) # orthogonal to the normal of the plane vX = np.cross(vY, normal_vector) # orthogonal to n and to vY # normalize and flip the sign if necessary vX = vX / np.sqrt(vX @ vX) * np.sign(vX[0]) # np.sqrt(vY @ vY) == LR norm of vX vY = vY / np.sqrt(vY @ vY) * np.sign(vY[1]) # what are the dimensions of the data arrays (ap, ml, dv) (nAP, nML, nDV) = atlas.image.shape # Let's shift the coordinates relative to bregma voxel_size = atlas.res_um # [um] resolution of the atlas bregma_coords = ALLEN_CCF_LANDMARKS_MLAPDV_UM['bregma'] / voxel_size # (ml, ap, dv) axis_ml_um = (np.arange(nML) - bregma_coords[0]) * voxel_size axis_ap_um = (np.arange(nAP) - bregma_coords[1]) * voxel_size * -1. axis_dv_um = (np.arange(nDV) - bregma_coords[2]) * voxel_size * -1. # projection of FOVs on the brain surface to get ML-AP-DV coordinates _logger.info('Projecting in 3D') for i, fov in enumerate(meta['FOV']): # i, fov = next(enumerate(meta['FOV'])) start_time = time.time() _logger.info(f'FOV {i + 1}/{len(meta["FOV"])}') y_px_idx, x_px_idx = np.mgrid[0:fov['nXnYnZ'][0], 0:fov['nXnYnZ'][1]] # xx and yy are in mm in coverslip space points = ((0, fov['nXnYnZ'][0] - 1), (0, fov['nXnYnZ'][1] - 1)) # The four corners of the FOV, determined by taking the center of the craniotomy in MM, # the x-y coordinates of the imaging window center (from the tiled reference image) in # galvanometer units, and the x-y coordinates of the FOV center in galvanometer units. values = [[fov['MM']['topLeft'][0], fov['MM']['topRight'][0]], [fov['MM']['bottomLeft'][0], fov['MM']['bottomRight'][0]]] values = np.array(values) * 1e3 # mm -> um xx = interpn(points, values, (y_px_idx, x_px_idx)) values = [[fov['MM']['topLeft'][1], fov['MM']['topRight'][1]], [fov['MM']['bottomLeft'][1], fov['MM']['bottomRight'][1]]] values = np.array(values) * 1e3 # mm -> um yy = interpn(points, values, (y_px_idx, x_px_idx)) xx = xx.flatten() - coord_ml yy = yy.flatten() - coord_ap # rotate xx and yy in 3D # the coords are still on the coverslip, but now have 3D values coords = np.outer(xx, vX) + np.outer(yy, vY) # (vX * xx) + (vY * yy) coords = coords + [coord_ml, coord_ap, coord_dv] # for each point of the FOV create a line parametrization (trajectory normal to the coverslip plane). # start just above the coverslip and go 3 mm down, should be enough to 'meet' the brain t = np.arange(-voxel_size, 3e3, voxel_size) # Find the MLAPDV atlas coordinate and brain location of each pixel. MLAPDV, annotation = _update_points( t, normal_vector, coords, axis_ml_um, axis_ap_um, axis_dv_um, atlas.label) annotation = atlas.regions.index2id(annotation) # convert annotation indices to IDs if np.any(np.isnan(MLAPDV)): _logger.warning('Areas of FOV lie outside the brain') _logger.info(f'done ({time.time() - start_time:3.1f} seconds)\n') MLAPDV = np.reshape(MLAPDV, [*x_px_idx.shape, 3]) annotation = np.reshape(annotation, x_px_idx.shape) fov['MLAPDV'] = { 'topLeft': MLAPDV[0, 0, :].tolist(), 'topRight': MLAPDV[0, -1, :].tolist(), 'bottomLeft': MLAPDV[-1, 0, :].tolist(), 'bottomRight': MLAPDV[-1, -1, :].tolist(), 'center': MLAPDV[round(x_px_idx.shape[0] / 2) - 1, round(x_px_idx.shape[1] / 2) - 1, :].tolist() } # Save the brain regions of the corners/centers of FOV (annotation field) fov['brainLocationIds'] = { 'topLeft': int(annotation[0, 0]), 'topRight': int(annotation[0, -1]), 'bottomLeft': int(annotation[-1, 0]), 'bottomRight': int(annotation[-1, -1]), 'center': int(annotation[round(x_px_idx.shape[0] / 2) - 1, round(x_px_idx.shape[1] / 2) - 1]) } mlapdv[i] = MLAPDV location_id[i] = annotation return mlapdv, location_id
[docs] def surface_normal(triangle): """ Calculate the surface normal unit vector of one or more triangles. Parameters ---------- triangle : numpy.array An array of shape (n_triangles, 3, 3) representing (Px Py Pz). Returns ------- numpy.array The surface normal unit vector(s). """ if triangle.shape == (3, 3): triangle = triangle[np.newaxis, :, :] if triangle.shape[1:] != (3, 3): raise ValueError('expected array of shape (3, 3); 3 coordinates in x, y, and z') V = triangle[:, 1, :] - triangle[:, 0, :] # V = P2 - P1 W = triangle[:, 2, :] - triangle[:, 0, :] # W = P3 - P1 Nx = (V[:, 1] * W[:, 2]) - (V[:, 2] * W[:, 1]) # Nx = (Vy * Wz) - (Vz * Wy) Ny = (V[:, 2] * W[:, 0]) - (V[:, 0] * W[:, 2]) # Ny = (Vz * Wx) - (Vx * Wz) Nz = (V[:, 0] * W[:, 1]) - (V[:, 1] * W[:, 0]) # Nz = (Vx * Wy) - (Vy * Wx) N = np.c_[Nx, Ny, Nz] # Calculate unit vector. Transpose allows vectorized operation. A = N / np.sqrt((Nx ** 2) + (Ny ** 2) + (Nz ** 2))[np.newaxis].T return A.squeeze()
[docs] @nb.njit('b1(f8[:,:], f8[:])') def in_triangle(triangle, point): """ Check whether `point` lies within `triangle`. Parameters ---------- triangle : numpy.array A (2 x 3) array of x-y coordinates; A(x1, y1), B(x2, y2) and C(x3, y3). point : numpy.array A point, P(x, y). Returns ------- bool True if coordinate lies within triangle. """ def area(x1, y1, x2, y2, x3, y3): """Calculate the area of a triangle, given its vertices.""" return abs((x1 * (y2 - y3) + x2 * (y3 - y1) + x3 * (y1 - y2)) / 2.) x1, y1, x2, y2, x3, y3 = triangle.flat x, y = point A = area(x1, y1, x2, y2, x3, y3) # area of triangle ABC A1 = area(x, y, x2, y2, x3, y3) # area of triangle PBC A2 = area(x1, y1, x, y, x3, y3) # area of triangle PAC A3 = area(x1, y1, x2, y2, x, y) # area of triangle PAB # Check if sum of A1, A2 and A3 equals that of A diff = np.abs((A1 + A2 + A3) - A) REL_TOL = 1e-9 return diff <= np.abs(REL_TOL * A) # isclose not yet implemented in numba 0.57
[docs] @nb.njit('i8(f8[:], f8[:,:], intp[:,:])', nogil=True) def find_triangle(point, vertices, connectivity_list): """ Find which vertices contain a given point. Currently O(n) but could take advantage of connectivity order to be quicker. Parameters ---------- point : numpy.array The (x, y) coordinate of a point to locate within one of the triangles. vertices : numpy.array An N x 3 array of vertices representing a triangle mesh. connectivity_list : numpy.array An N x 3 array of indices representing the connectivity of `points`. Returns ------- int The index of the vertices containing `point`, or -1 if not within any triangle. """ face_ind = -1 for i in nb.prange(connectivity_list.shape[0]): triangle = vertices[connectivity_list[i, :], :] if in_triangle(triangle, point): face_ind = i break return face_ind
@nb.njit('Tuple((f8[:], intp[:]))(f8[:], f8[:])', nogil=True) def _nearest_neighbour_1d(x, x_new): """ Nearest neighbour interpolation with extrapolation. This was adapted from scipy.interpolate.interp1d but returns the indices of each nearest x value. Assumes x is not sorted. Parameters ---------- x : (N,) array_like A 1-D array of real values. x_new : (N,) array_like A 1D array of values to apply function to. Returns ------- numpy.array A 1D array of interpolated values. numpy.array A 1D array of indices. """ SIDE = 'left' # use 'right' to round up to nearest int instead of rounding down # Sort values ind = np.argsort(x, kind='mergesort') x = x[ind] x_bds = x / 2.0 # Do division before addition to prevent possible integer overflow x_bds = x_bds[1:] + x_bds[:-1] # Find where in the averaged data the values to interpolate would be inserted. x_new_indices = np.searchsorted(x_bds, x_new, side=SIDE) # Clip x_new_indices so that they are within the range of x indices. x_new_indices = x_new_indices.clip(0, len(x) - 1).astype(np.intp) # Calculate the actual value for each entry in x_new. y_new = x[x_new_indices] return y_new, ind[x_new_indices] @nb.njit('Tuple((f8[:,:], u2[:]))(f8[:], f8[:], f8[:,:], f8[:], f8[:], f8[:], u2[:,:,:])', nogil=True) def _update_points(t, normal_vector, coords, axis_ml_um, axis_ap_um, axis_dv_um, atlas_labels): """ Determine the MLAPDV coordinate and brain location index for each of the given coordinates. This has been optimized in numba. The majority of the time savings come from replacing iterp1d and ismember with _nearest_neighbour_1d which were extremely slow. Parallel iteration further halved the time it took per 512x512 FOV. Parameters ---------- t : numpy.array An N x 3 evenly spaced set of coordinates representing points going down from the coverslip towards the brain. normal_vector : numpy.array The unit vector of the face normal to the center of the window. coords : numpy.array A set of N x 3 coordinates representing the MLAPDV coordinates of each pixel relative to the center of the window, in micrometers (um). axis_ml_um : numpy.array An evenly spaced array of medio-lateral brain coordinates relative to bregma in um, at the resolution of the atlas image used. axis_ap_um : numpy.array An evenly spaced array of anterio-posterior brain coordinates relative to bregma in um, at the resolution of the atlas image used. axis_dv_um : numpy.array An evenly spaced array of dorso-ventral brain coordinates relative to bregma in um, at the resolution of the atlas image used. atlas_labels : numpy.array A 3D array of integers representing the brain location index of each voxel of a given atlas. The shape is expected to be (nAP, nML, nDV). Returns ------- numpy.array An N by 3 array containing the MLAPDV coordinates in um of each pixel coordinate. Coordinates outside of the brain are NaN. numpy.array A 1D array of atlas label indices the length of `coordinates`. """ # passing through the center of the craniotomy/coverslip traj_coords_centered = np.outer(t, -normal_vector) MLAPDV = np.full_like(coords, np.nan) annotation = np.zeros(coords.shape[0], dtype=np.uint16) n_points = coords.shape[0] for p in nb.prange(n_points): # Shifted to the correct point on the coverslip, in true ML-AP-DV coords traj_coords = traj_coords_centered + coords[p, :] # Find intersection coordinate with the brain. # Only use coordinates that exist in the atlas (kind of nearest neighbour interpolation) ml, ml_idx = _nearest_neighbour_1d(axis_ml_um, traj_coords[:, 0]) ap, ap_idx = _nearest_neighbour_1d(axis_ap_um, traj_coords[:, 1]) dv, dv_idx = _nearest_neighbour_1d(axis_dv_um, traj_coords[:, 2]) # Iterate over coordinates to find the first (if any) that is within the brain ind = -1 area = 0 # 0 = void; 1 = root for i in nb.prange(traj_coords.shape[0]): anno = atlas_labels[ap_idx[i], ml_idx[i], dv_idx[i]] if anno > 0: # first coordinate in the brain ind = i area = anno if area > 1: # non-root brain area; we're done break if area > 1: point = traj_coords[ind, :] MLAPDV[p, :] = point # in um annotation[p] = area else: MLAPDV[p, :] = np.nan annotation[p] = area # root or void return MLAPDV, annotation