Source code for ibllib.pipes.neurophotometrics

"""Extraction tasks for fibrephotometry"""

import logging
import numpy as np
import pandas as pd

import ibldsp.utils
import ibllib.io.session_params
from ibllib.pipes import base_tasks
from iblutil.io import jsonable

_logger = logging.getLogger('ibllib')


"""
Neurophotometrics FP3002 specific information.
The light source map refers to the available LEDs on the system.
The flags refers to the byte encoding of led states in the system.
"""
LIGHT_SOURCE_MAP = {
    'color': ['None', 'Violet', 'Blue', 'Green'],
    'wavelength': [0, 415, 470, 560],
    'name': ['None', 'Isosbestic', 'GCaMP', 'RCaMP'],
}

LED_STATES = {
    'Condition': {
        0: 'No additional signal',
        1: 'Output 1 signal HIGH',
        2: 'Output 0 signal HIGH',
        3: 'Stimulation ON',
        4: 'GPIO Line 2 HIGH',
        5: 'GPIO Line 3 HIGH',
        6: 'Input 1 HIGH',
        7: 'Input 0 HIGH',
        8: 'Output 0 signal HIGH + Stimulation',
        9: 'Output 0 signal HIGH + Input 0 signal HIGH',
        10: 'Input 0 signal HIGH + Stimulation',
        11: 'Output 0 HIGH + Input 0 HIGH + Stimulation',
    },
    'No LED ON': {0: 0, 1: 8, 2: 16, 3: 32, 4: 64, 5: 128, 6: 256, 7: 512, 8: 48, 9: 528, 10: 544, 11: 560},
    'L415': {0: 1, 1: 9, 2: 17, 3: 33, 4: 65, 5: 129, 6: 257, 7: 513, 8: 49, 9: 529, 10: 545, 11: 561},
    'L470': {0: 2, 1: 10, 2: 18, 3: 34, 4: 66, 5: 130, 6: 258, 7: 514, 8: 50, 9: 530, 10: 546, 11: 562},
    'L560': {0: 4, 1: 12, 2: 20, 3: 36, 4: 68, 5: 132, 6: 260, 7: 516, 8: 52, 9: 532, 10: 548, 11: 564}
}


def _channel_meta(light_source_map=None):
    """
    Return table of light source wavelengths and corresponding colour labels.

    Parameters
    ----------
    light_source_map : dict
        An optional map of light source wavelengths (nm) used and their corresponding colour name.

    Returns
    -------
    pandas.DataFrame
        A sorted table of wavelength and colour name.
    """
    light_source_map = light_source_map or LIGHT_SOURCE_MAP
    meta = pd.DataFrame.from_dict(light_source_map)
    meta.index.rename('channel_id', inplace=True)
    return meta


[docs] class FibrePhotometrySync(base_tasks.DynamicTask): priority = 90 job_size = 'small' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.device_collection = self.get_device_collection( 'neurophotometrics', device_collection='raw_photometry_data') # we will work with the first protocol here for task in self.session_params['tasks']: self.task_protocol = next(k for k in task) self.task_collection = ibllib.io.session_params.get_task_collection(self.session_params, self.task_protocol) break @property def signature(self): signature = { 'input_files': [('_neurophotometrics_fpData.raw.pqt', self.device_collection, True, True), ('_iblrig_taskData.raw.jsonable', self.task_collection, True, True), ('_neurophotometrics_fpData.channels.csv', self.device_collection, True, True), ('_neurophotometrics_fpData.digitalIntputs.pqt', self.device_collection, True)], 'output_files': [('photometry.signal.pqt', 'alf/photometry', True), ('photometryROI.locations.pqt', 'alf/photometry', True)] } return signature def _sync_bpod_neurophotometrics(self): """ Perform the linear clock correction between bpod and neurophotometrics timestamps. :return: interpolation function that outputs bpod timestamsp from neurophotometrics timestamps """ folder_raw_photometry = self.session_path.joinpath(self.device_collection) df_digital_inputs = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.digitalIntputs.pqt')) # normally we should disregard the states and use the sync label. But bpod doesn't log TTL outs, # only the states. This will change in the future but for now we are stuck with this. if 'habituation' in self.task_protocol: sync_states_names = ['iti', 'reward'] else: sync_states_names = ['trial_start', 'reward', 'exit_state'] # read in the raw behaviour data for syncing file_jsonable = self.session_path.joinpath(self.task_collection, '_iblrig_taskData.raw.jsonable') trials_table, bpod_data = jsonable.load_task_jsonable(file_jsonable) # we get the timestamps of the states from the bpod data tbpod = [] for sname in sync_states_names: tbpod.append(np.array( [bd['States timestamps'][sname][0][0] + bd['Trial start timestamp'] for bd in bpod_data if sname in bd['States timestamps']])) tbpod = np.sort(np.concatenate(tbpod)) tbpod = tbpod[~np.isnan(tbpod)] # we get the timestamps for the photometry data tph = df_digital_inputs['SystemTimestamp'].values[df_digital_inputs['Channel'] == self.kwargs['sync_channel']] tph = tph[15:] # TODO: we may want to detect the spacers before removing it, especially for successive sessions # sync the behaviour events to the photometry timestamps fcn_nph_to_bpod_times, drift_ppm, iph, ibpod = ibldsp.utils.sync_timestamps( tph, tbpod, return_indices=True, linear=True) # then we check the alignment, should be less than the screen refresh rate tcheck = fcn_nph_to_bpod_times(tph[iph]) - tbpod[ibpod] _logger.info( f'sync: n trials {len(bpod_data)}, n bpod sync {len(tbpod)}, n photometry {len(tph)}, n match {len(iph)}') assert np.all(np.abs(tcheck) < 1 / 60), 'Sync issue detected, residual above 1/60s' assert len(iph) / len(tbpod) > 0.95, 'Sync issue detected, less than 95% of the bpod events matched' valid_bounds = [bpod_data[0]['Trial start timestamp'] - 2, bpod_data[-1]['Trial end timestamp'] + 2] return fcn_nph_to_bpod_times, valid_bounds def _run(self, **kwargs): """ Extract photometry data from the raw neurophotometrics data in parquet The extraction has 3 main steps: 1. Synchronise the bpod and neurophotometrics timestamps. 2. Extract the photometry data from the raw neurophotometrics data. 3. Label the fibers correspondance with brain regions in a small table :param kwargs: :return: """ # 1) sync: we check the synchronisation, right now we only have bpod but soon the daq will be used match list(self.session_params['sync'].keys())[0]: case 'bpod': fcn_nph_to_bpod_times, valid_bounds = self._sync_bpod_neurophotometrics() case _: raise NotImplementedError('Syncing with daq is not supported yet.') # 2) reformat the raw data with wavelengths and meta-data folder_raw_photometry = self.session_path.joinpath(self.device_collection) fp_data = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.raw.pqt')) # Load channels and wavelength information channel_meta_map = _channel_meta() if (fn := folder_raw_photometry.joinpath('_neurophotometrics_fpData.channels.csv')).exists(): led_states = pd.read_csv(fn) else: led_states = pd.DataFrame(LED_STATES) led_states = led_states.set_index('Condition') # Extract signal columns into 2D array rois = list(self.kwargs['fibers'].keys()) out_df = fp_data.filter(items=rois, axis=1).sort_index(axis=1) out_df['times'] = fcn_nph_to_bpod_times(fp_data['SystemTimestamp']) out_df['valid'] = np.logical_and(out_df['times'] >= valid_bounds[0], out_df['times'] <= valid_bounds[1]) out_df['wavelength'] = np.nan out_df['name'] = '' out_df['color'] = '' # Extract channel index states = fp_data.get('LedState', fp_data.get('Flags', None)) for state in states.unique(): ir, ic = np.where(led_states == state) if ic.size == 0: continue for cn in ['name', 'color', 'wavelength']: out_df.loc[states == state, cn] = channel_meta_map.iloc[ic[0]][cn] # 3) label the brain regions rois = [] c = 0 for k, v in self.kwargs['fibers'].items(): rois.append({'ROI': k, 'fiber': f'fiber{c:02d}', 'brain_region': v['location']}) df_rois = pd.DataFrame(rois).set_index('ROI') # to finish we write the dataframes to disk out_path = self.session_path.joinpath('alf', 'photometry') out_path.mkdir(parents=True, exist_ok=True) out_df.to_parquet(file_signal := out_path.joinpath('photometry.signal.pqt')) df_rois.to_parquet(file_locations := out_path.joinpath('photometryROI.locations.pqt')) return file_signal, file_locations