"""Extraction tasks for fibrephotometry"""
import logging
import numpy as np
import pandas as pd
import ibldsp.utils
import ibllib.io.session_params
from ibllib.pipes import base_tasks
from iblutil.io import jsonable
_logger = logging.getLogger('ibllib')
Neurophotometrics FP3002 specific information.
The light source map refers to the available LEDs on the system.
The flags refers to the byte encoding of led states in the system.
'color': ['None', 'Violet', 'Blue', 'Green'],
'wavelength': [0, 415, 470, 560],
'name': ['None', 'Isosbestic', 'GCaMP', 'RCaMP'],
'Condition': {
0: 'No additional signal',
1: 'Output 1 signal HIGH',
2: 'Output 0 signal HIGH',
3: 'Stimulation ON',
4: 'GPIO Line 2 HIGH',
5: 'GPIO Line 3 HIGH',
6: 'Input 1 HIGH',
7: 'Input 0 HIGH',
8: 'Output 0 signal HIGH + Stimulation',
9: 'Output 0 signal HIGH + Input 0 signal HIGH',
10: 'Input 0 signal HIGH + Stimulation',
11: 'Output 0 HIGH + Input 0 HIGH + Stimulation',
'No LED ON': {0: 0, 1: 8, 2: 16, 3: 32, 4: 64, 5: 128, 6: 256, 7: 512, 8: 48, 9: 528, 10: 544, 11: 560},
'L415': {0: 1, 1: 9, 2: 17, 3: 33, 4: 65, 5: 129, 6: 257, 7: 513, 8: 49, 9: 529, 10: 545, 11: 561},
'L470': {0: 2, 1: 10, 2: 18, 3: 34, 4: 66, 5: 130, 6: 258, 7: 514, 8: 50, 9: 530, 10: 546, 11: 562},
'L560': {0: 4, 1: 12, 2: 20, 3: 36, 4: 68, 5: 132, 6: 260, 7: 516, 8: 52, 9: 532, 10: 548, 11: 564}
def _channel_meta(light_source_map=None):
Return table of light source wavelengths and corresponding colour labels.
light_source_map : dict
An optional map of light source wavelengths (nm) used and their corresponding colour name.
A sorted table of wavelength and colour name.
light_source_map = light_source_map or LIGHT_SOURCE_MAP
meta = pd.DataFrame.from_dict(light_source_map)
meta.index.rename('channel_id', inplace=True)
return meta
class FibrePhotometrySync(base_tasks.DynamicTask):
priority = 90
job_size = 'small'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.device_collection = self.get_device_collection(
'neurophotometrics', device_collection='raw_photometry_data')
# we will work with the first protocol here
for task in self.session_params['tasks']:
self.task_protocol = next(k for k in task)
self.task_collection = ibllib.io.session_params.get_task_collection(self.session_params, self.task_protocol)
def signature(self):
signature = {
'input_files': [('_neurophotometrics_fpData.raw.pqt', self.device_collection, True, True),
('_iblrig_taskData.raw.jsonable', self.task_collection, True, True),
('_neurophotometrics_fpData.channels.csv', self.device_collection, True, True),
('_neurophotometrics_fpData.digitalIntputs.pqt', self.device_collection, True)],
'output_files': [('photometry.signal.pqt', 'alf/photometry', True),
('photometryROI.locations.pqt', 'alf/photometry', True)]
return signature
def _sync_bpod_neurophotometrics(self):
Perform the linear clock correction between bpod and neurophotometrics timestamps.
:return: interpolation function that outputs bpod timestamsp from neurophotometrics timestamps
folder_raw_photometry = self.session_path.joinpath(self.device_collection)
df_digital_inputs = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.digitalIntputs.pqt'))
# normally we should disregard the states and use the sync label. But bpod doesn't log TTL outs,
# only the states. This will change in the future but for now we are stuck with this.
if 'habituation' in self.task_protocol:
sync_states_names = ['iti', 'reward']
sync_states_names = ['trial_start', 'reward', 'exit_state']
# read in the raw behaviour data for syncing
file_jsonable = self.session_path.joinpath(self.task_collection, '_iblrig_taskData.raw.jsonable')
trials_table, bpod_data = jsonable.load_task_jsonable(file_jsonable)
# we get the timestamps of the states from the bpod data
tbpod = []
for sname in sync_states_names:
[bd['States timestamps'][sname][0][0] + bd['Trial start timestamp'] for bd in bpod_data if
sname in bd['States timestamps']]))
tbpod = np.sort(np.concatenate(tbpod))
tbpod = tbpod[~np.isnan(tbpod)]
# we get the timestamps for the photometry data
tph = df_digital_inputs['SystemTimestamp'].values[df_digital_inputs['Channel'] == self.kwargs['sync_channel']]
tph = tph[15:] # TODO: we may want to detect the spacers before removing it, especially for successive sessions
# sync the behaviour events to the photometry timestamps
fcn_nph_to_bpod_times, drift_ppm, iph, ibpod = ibldsp.utils.sync_timestamps(
tph, tbpod, return_indices=True, linear=True)
# then we check the alignment, should be less than the screen refresh rate
tcheck = fcn_nph_to_bpod_times(tph[iph]) - tbpod[ibpod]
f'sync: n trials {len(bpod_data)}, n bpod sync {len(tbpod)}, n photometry {len(tph)}, n match {len(iph)}')
assert np.all(np.abs(tcheck) < 1 / 60), 'Sync issue detected, residual above 1/60s'
assert len(iph) / len(tbpod) > 0.95, 'Sync issue detected, less than 95% of the bpod events matched'
valid_bounds = [bpod_data[0]['Trial start timestamp'] - 2, bpod_data[-1]['Trial end timestamp'] + 2]
return fcn_nph_to_bpod_times, valid_bounds
def _run(self, **kwargs):
Extract photometry data from the raw neurophotometrics data in parquet
The extraction has 3 main steps:
1. Synchronise the bpod and neurophotometrics timestamps.
2. Extract the photometry data from the raw neurophotometrics data.
3. Label the fibers correspondance with brain regions in a small table
:param kwargs:
# 1) sync: we check the synchronisation, right now we only have bpod but soon the daq will be used
match list(self.session_params['sync'].keys())[0]:
case 'bpod':
fcn_nph_to_bpod_times, valid_bounds = self._sync_bpod_neurophotometrics()
case _:
raise NotImplementedError('Syncing with daq is not supported yet.')
# 2) reformat the raw data with wavelengths and meta-data
folder_raw_photometry = self.session_path.joinpath(self.device_collection)
fp_data = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.raw.pqt'))
# Load channels and wavelength information
channel_meta_map = _channel_meta()
if (fn := folder_raw_photometry.joinpath('_neurophotometrics_fpData.channels.csv')).exists():
led_states = pd.read_csv(fn)
led_states = pd.DataFrame(LED_STATES)
led_states = led_states.set_index('Condition')
# Extract signal columns into 2D array
rois = list(self.kwargs['fibers'].keys())
out_df = fp_data.filter(items=rois, axis=1).sort_index(axis=1)
out_df['times'] = fcn_nph_to_bpod_times(fp_data['SystemTimestamp'])
out_df['valid'] = np.logical_and(out_df['times'] >= valid_bounds[0], out_df['times'] <= valid_bounds[1])
out_df['wavelength'] = np.nan
out_df['name'] = ''
out_df['color'] = ''
# Extract channel index
states = fp_data.get('LedState', fp_data.get('Flags', None))
for state in states.unique():
ir, ic = np.where(led_states == state)
if ic.size == 0:
for cn in ['name', 'color', 'wavelength']:
out_df.loc[states == state, cn] = channel_meta_map.iloc[ic[0]][cn]
# 3) label the brain regions
rois = []
c = 0
for k, v in self.kwargs['fibers'].items():
rois.append({'ROI': k, 'fiber': f'fiber{c:02d}', 'brain_region': v['location']})
df_rois = pd.DataFrame(rois).set_index('ROI')
# to finish we write the dataframes to disk
out_path = self.session_path.joinpath('alf', 'photometry')
out_path.mkdir(parents=True, exist_ok=True)
out_df.to_parquet(file_signal := out_path.joinpath('photometry.signal.pqt'))
df_rois.to_parquet(file_locations := out_path.joinpath('photometryROI.locations.pqt'))
return file_signal, file_locations