"""Extraction tasks for fibrephotometry"""
import logging
import numpy as np
import pandas as pd
import ibldsp.utils
import ibllib.io.session_params
from ibllib.pipes import base_tasks
from iblutil.io import jsonable
_logger = logging.getLogger('ibllib')
"""
Neurophotometrics FP3002 specific information.
The light source map refers to the available LEDs on the system.
The flags refers to the byte encoding of led states in the system.
"""
LIGHT_SOURCE_MAP = {
'color': ['None', 'Violet', 'Blue', 'Green'],
'wavelength': [0, 415, 470, 560],
'name': ['None', 'Isosbestic', 'GCaMP', 'RCaMP'],
}
LED_STATES = {
'Condition': {
0: 'No additional signal',
1: 'Output 1 signal HIGH',
2: 'Output 0 signal HIGH',
3: 'Stimulation ON',
4: 'GPIO Line 2 HIGH',
5: 'GPIO Line 3 HIGH',
6: 'Input 1 HIGH',
7: 'Input 0 HIGH',
8: 'Output 0 signal HIGH + Stimulation',
9: 'Output 0 signal HIGH + Input 0 signal HIGH',
10: 'Input 0 signal HIGH + Stimulation',
11: 'Output 0 HIGH + Input 0 HIGH + Stimulation',
},
'No LED ON': {0: 0, 1: 8, 2: 16, 3: 32, 4: 64, 5: 128, 6: 256, 7: 512, 8: 48, 9: 528, 10: 544, 11: 560},
'L415': {0: 1, 1: 9, 2: 17, 3: 33, 4: 65, 5: 129, 6: 257, 7: 513, 8: 49, 9: 529, 10: 545, 11: 561},
'L470': {0: 2, 1: 10, 2: 18, 3: 34, 4: 66, 5: 130, 6: 258, 7: 514, 8: 50, 9: 530, 10: 546, 11: 562},
'L560': {0: 4, 1: 12, 2: 20, 3: 36, 4: 68, 5: 132, 6: 260, 7: 516, 8: 52, 9: 532, 10: 548, 11: 564}
}
def _channel_meta(light_source_map=None):
"""
Return table of light source wavelengths and corresponding colour labels.
Parameters
----------
light_source_map : dict
An optional map of light source wavelengths (nm) used and their corresponding colour name.
Returns
-------
pandas.DataFrame
A sorted table of wavelength and colour name.
"""
light_source_map = light_source_map or LIGHT_SOURCE_MAP
meta = pd.DataFrame.from_dict(light_source_map)
meta.index.rename('channel_id', inplace=True)
return meta
[docs]
class FibrePhotometrySync(base_tasks.DynamicTask):
priority = 90
job_size = 'small'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.device_collection = self.get_device_collection(
'neurophotometrics', device_collection='raw_photometry_data')
# we will work with the first protocol here
for task in self.session_params['tasks']:
self.task_protocol = next(k for k in task)
self.task_collection = ibllib.io.session_params.get_task_collection(self.session_params, self.task_protocol)
break
@property
def signature(self):
signature = {
'input_files': [('_neurophotometrics_fpData.raw.pqt', self.device_collection, True, True),
('_iblrig_taskData.raw.jsonable', self.task_collection, True, True),
('_neurophotometrics_fpData.channels.csv', self.device_collection, True, True),
('_neurophotometrics_fpData.digitalIntputs.pqt', self.device_collection, True)],
'output_files': [('photometry.signal.pqt', 'alf/photometry', True),
('photometryROI.locations.pqt', 'alf/photometry', True)]
}
return signature
def _sync_bpod_neurophotometrics(self):
"""
Perform the linear clock correction between bpod and neurophotometrics timestamps.
:return: interpolation function that outputs bpod timestamsp from neurophotometrics timestamps
"""
folder_raw_photometry = self.session_path.joinpath(self.device_collection)
df_digital_inputs = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.digitalIntputs.pqt'))
# normally we should disregard the states and use the sync label. But bpod doesn't log TTL outs,
# only the states. This will change in the future but for now we are stuck with this.
if 'habituation' in self.task_protocol:
sync_states_names = ['iti', 'reward']
else:
sync_states_names = ['trial_start', 'reward', 'exit_state']
# read in the raw behaviour data for syncing
file_jsonable = self.session_path.joinpath(self.task_collection, '_iblrig_taskData.raw.jsonable')
trials_table, bpod_data = jsonable.load_task_jsonable(file_jsonable)
# we get the timestamps of the states from the bpod data
tbpod = []
for sname in sync_states_names:
tbpod.append(np.array(
[bd['States timestamps'][sname][0][0] + bd['Trial start timestamp'] for bd in bpod_data if
sname in bd['States timestamps']]))
tbpod = np.sort(np.concatenate(tbpod))
tbpod = tbpod[~np.isnan(tbpod)]
# we get the timestamps for the photometry data
tph = df_digital_inputs['SystemTimestamp'].values[df_digital_inputs['Channel'] == self.kwargs['sync_channel']]
tph = tph[15:] # TODO: we may want to detect the spacers before removing it, especially for successive sessions
# sync the behaviour events to the photometry timestamps
fcn_nph_to_bpod_times, drift_ppm, iph, ibpod = ibldsp.utils.sync_timestamps(
tph, tbpod, return_indices=True, linear=True)
# then we check the alignment, should be less than the screen refresh rate
tcheck = fcn_nph_to_bpod_times(tph[iph]) - tbpod[ibpod]
_logger.info(
f'sync: n trials {len(bpod_data)}, n bpod sync {len(tbpod)}, n photometry {len(tph)}, n match {len(iph)}')
assert np.all(np.abs(tcheck) < 1 / 60), 'Sync issue detected, residual above 1/60s'
assert len(iph) / len(tbpod) > 0.95, 'Sync issue detected, less than 95% of the bpod events matched'
valid_bounds = [bpod_data[0]['Trial start timestamp'] - 2, bpod_data[-1]['Trial end timestamp'] + 2]
return fcn_nph_to_bpod_times, valid_bounds
def _run(self, **kwargs):
"""
Extract photometry data from the raw neurophotometrics data in parquet
The extraction has 3 main steps:
1. Synchronise the bpod and neurophotometrics timestamps.
2. Extract the photometry data from the raw neurophotometrics data.
3. Label the fibers correspondance with brain regions in a small table
:param kwargs:
:return:
"""
# 1) sync: we check the synchronisation, right now we only have bpod but soon the daq will be used
match list(self.session_params['sync'].keys())[0]:
case 'bpod':
fcn_nph_to_bpod_times, valid_bounds = self._sync_bpod_neurophotometrics()
case _:
raise NotImplementedError('Syncing with daq is not supported yet.')
# 2) reformat the raw data with wavelengths and meta-data
folder_raw_photometry = self.session_path.joinpath(self.device_collection)
fp_data = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.raw.pqt'))
# Load channels and wavelength information
channel_meta_map = _channel_meta()
if (fn := folder_raw_photometry.joinpath('_neurophotometrics_fpData.channels.csv')).exists():
led_states = pd.read_csv(fn)
else:
led_states = pd.DataFrame(LED_STATES)
led_states = led_states.set_index('Condition')
# Extract signal columns into 2D array
rois = list(self.kwargs['fibers'].keys())
out_df = fp_data.filter(items=rois, axis=1).sort_index(axis=1)
out_df['times'] = fcn_nph_to_bpod_times(fp_data['SystemTimestamp'])
out_df['valid'] = np.logical_and(out_df['times'] >= valid_bounds[0], out_df['times'] <= valid_bounds[1])
out_df['wavelength'] = np.nan
out_df['name'] = ''
out_df['color'] = ''
# Extract channel index
states = fp_data.get('LedState', fp_data.get('Flags', None))
for state in states.unique():
ir, ic = np.where(led_states == state)
if ic.size == 0:
continue
for cn in ['name', 'color', 'wavelength']:
out_df.loc[states == state, cn] = channel_meta_map.iloc[ic[0]][cn]
# 3) label the brain regions
rois = []
c = 0
for k, v in self.kwargs['fibers'].items():
rois.append({'ROI': k, 'fiber': f'fiber{c:02d}', 'brain_region': v['location']})
df_rois = pd.DataFrame(rois).set_index('ROI')
# to finish we write the dataframes to disk
out_path = self.session_path.joinpath('alf', 'photometry')
out_path.mkdir(parents=True, exist_ok=True)
out_df.to_parquet(file_signal := out_path.joinpath('photometry.signal.pqt'))
df_rois.to_parquet(file_locations := out_path.joinpath('photometryROI.locations.pqt'))
return file_signal, file_locations