Source code for ibllib.pipes.behavior_tasks

"""Standard task protocol extractor dynamic pipeline tasks."""
import logging
import traceback

from packaging import version
import one.alf.io as alfio
from one.alf.path import session_path_parts
from one.api import ONE

from ibllib.oneibl.registration import get_lab
from ibllib.oneibl.data_handlers import ServerDataHandler, ExpectedDataset
from ibllib.pipes import base_tasks
from ibllib.io.raw_data_loaders import load_settings, load_bpod_fronts
from ibllib.qc.task_extractors import TaskQCExtractor
from ibllib.qc.task_metrics import HabituationQC, TaskQC, update_dataset_qc
from ibllib.io.extractors.ephys_passive import PassiveChoiceWorld
from ibllib.io.extractors.bpod_trials import get_bpod_extractor
from ibllib.io.extractors.ephys_fpga import FpgaTrials, FpgaTrialsHabituation, get_sync_and_chn_map
from ibllib.io.extractors.mesoscope import TimelineTrials
from ibllib.pipes import training_status
from ibllib.plots.figures import BehaviourPlots

_logger = logging.getLogger('ibllib')


[docs] class HabituationRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask): priority = 100 job_size = 'small' @property def signature(self): signature = { 'input_files': [], 'output_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, False), ('_iblrig_encoderPositions.raw*', self.collection, False), ('_iblrig_encoderTrialInfo.raw*', self.collection, False), ('_iblrig_stimPositionScreen.raw*', self.collection, False), ('_iblrig_syncSquareUpdate.raw*', self.collection, False), ('_iblrig_ambientSensorData.raw*', self.collection, False) ] } return signature
[docs] class HabituationTrialsBpod(base_tasks.BehaviourTask): priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ], 'output_files': [ ('*trials.contrastLeft.npy', self.output_collection, True), ('*trials.contrastRight.npy', self.output_collection, True), ('*trials.feedback_times.npy', self.output_collection, True), ('*trials.feedbackType.npy', self.output_collection, True), ('*trials.goCue_times.npy', self.output_collection, True), ('*trials.goCueTrigger_times.npy', self.output_collection, True), ('*trials.intervals.npy', self.output_collection, True), ('*trials.rewardVolume.npy', self.output_collection, True), ('*trials.stimOff_times.npy', self.output_collection, True), ('*trials.stimOn_times.npy', self.output_collection, True), ('*trials.stimOffTrigger_times.npy', self.output_collection, False), ('*trials.stimOnTrigger_times.npy', self.output_collection, False), ] } return signature def _run(self, update=True, save=True): """Extracts an iblrig training session.""" trials, output_files = self.extract_behaviour(save=save) if trials is None: return None if self.one is None or self.one.offline: return output_files # Run the task QC self.run_qc(trials, update=update) return output_files
[docs] def extract_behaviour(self, **kwargs): settings = load_settings(self.session_path, self.collection) if version.parse(settings['IBLRIG_VERSION'] or '100.0.0') < version.parse('5.0.0'): _logger.error('No extraction of legacy habituation sessions') return None, None self.extractor = get_bpod_extractor(self.session_path, task_collection=self.collection) self.extractor.default_path = self.output_collection return self.extractor.extract(task_collection=self.collection, **kwargs)
[docs] def run_qc(self, trials_data=None, update=True): trials_data = self._assert_trials_data(trials_data) # validate trials data # Compile task data for QC qc = HabituationQC(self.session_path, one=self.one) qc.extractor = TaskQCExtractor(self.session_path) # Update extractor fields qc.extractor.data = qc.extractor.rename_data(trials_data.copy()) qc.extractor.frame_ttls = self.extractor.frame2ttl # used in iblapps QC viewer qc.extractor.audio_ttls = self.extractor.audio # used in iblapps QC viewer qc.extractor.settings = self.extractor.settings namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}' qc.run(update=update, namespace=namespace) return qc
[docs] class HabituationTrialsNidq(HabituationTrialsBpod): priority = 90 job_size = 'small' @property def signature(self): signature = super().signature signature['input_files'] = [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), ('*wiring.json', self.sync_collection, False), ('*.meta', self.sync_collection, True)] return signature
[docs] def extract_behaviour(self, save=True, **kwargs): """Extract the habituationChoiceWorld trial data using NI DAQ clock.""" # Extract Bpod trials bpod_trials, _ = super().extract_behaviour(save=False, **kwargs) # Sync Bpod trials to FPGA sync, chmap = get_sync_and_chn_map(self.session_path, self.sync_collection) self.extractor = FpgaTrialsHabituation( self.session_path, bpod_trials=bpod_trials, bpod_extractor=self.extractor) # NB: The stimOff times are called stimCenter times for habituation choice world outputs, files = self.extractor.extract( save=save, sync=sync, chmap=chmap, path_out=self.session_path.joinpath(self.output_collection), task_collection=self.collection, protocol_number=self.protocol_number, **kwargs) return outputs, files
[docs] def run_qc(self, trials_data=None, update=True, **_): """Run and update QC. This adds the bpod TTLs to the QC object *after* the QC is run in the super call method. The raw Bpod TTLs are not used by the QC however they are used in the iblapps QC plot. """ qc = super().run_qc(trials_data=trials_data, update=update) qc.extractor.bpod_ttls = self.extractor.bpod return qc
[docs] class TrialRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask): priority = 100 job_size = 'small' @property def signature(self): signature = { 'input_files': [], 'output_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, False), ('_iblrig_encoderPositions.raw*', self.collection, False), ('_iblrig_encoderTrialInfo.raw*', self.collection, False), ('_iblrig_stimPositionScreen.raw*', self.collection, False), ('_iblrig_syncSquareUpdate.raw*', self.collection, False), ('_iblrig_ambientSensorData.raw*', self.collection, False) ] } return signature
[docs] class PassiveRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask): priority = 100 job_size = 'small' @property def signature(self): signature = { 'input_files': [], 'output_files': [('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, False), ('_iblrig_encoderPositions.raw*', self.collection, False), ('_iblrig_encoderTrialInfo.raw*', self.collection, False), ('_iblrig_stimPositionScreen.raw*', self.collection, False), ('_iblrig_syncSquareUpdate.raw*', self.collection, False), ('_iblrig_RFMapStim.raw*', self.collection, True)] } return signature
[docs] class PassiveTaskNidq(base_tasks.BehaviourTask): priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [('_iblrig_taskSettings.raw*', self.collection, True), ('_iblrig_RFMapStim.raw*', self.collection, True), (f'_{self.sync_namespace}_sync.channels.*', self.sync_collection, True), (f'_{self.sync_namespace}_sync.polarities.*', self.sync_collection, True), (f'_{self.sync_namespace}_sync.times.*', self.sync_collection, True), ('*.wiring.json', self.sync_collection, False), ('*.meta', self.sync_collection, False)], 'output_files': [('_ibl_passiveGabor.table.csv', self.output_collection, False), ('_ibl_passivePeriods.intervalsTable.csv', self.output_collection, True), ('_ibl_passiveRFM.times.npy', self.output_collection, True), ('_ibl_passiveStims.table.csv', self.output_collection, False)] } return signature def _run(self, **kwargs): """returns a list of pathlib.Paths. """ data, paths = PassiveChoiceWorld(self.session_path).extract( sync_collection=self.sync_collection, task_collection=self.collection, save=True, path_out=self.session_path.joinpath(self.output_collection), protocol_number=self.protocol_number) if any(x is None for x in paths): self.status = -1 return paths
[docs] class PassiveTaskTimeline(base_tasks.BehaviourTask, base_tasks.MesoscopeTask): """TODO should be mesoscope invariant, using wiring file""" priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [('_iblrig_taskSettings.raw*', self.collection, True), ('_iblrig_RFMapStim.raw*', self.collection, True), (f'_{self.sync_namespace}_sync.channels.*', self.sync_collection, False), (f'_{self.sync_namespace}_sync.polarities.*', self.sync_collection, False), (f'_{self.sync_namespace}_sync.times.*', self.sync_collection, False)], 'output_files': [('_ibl_passiveGabor.table.csv', self.output_collection, False), ('_ibl_passivePeriods.intervalsTable.csv', self.output_collection, True), ('_ibl_passiveRFM.times.npy', self.output_collection, True), ('_ibl_passiveStims.table.csv', self.output_collection, False)] } return signature def _run(self, **kwargs): """returns a list of pathlib.Paths. This class exists to load the sync file and set the protocol_number to None """ settings = load_settings(self.session_path, self.collection) ver = settings.get('IBLRIG_VERSION') or '100.0.0' if ver == '100.0.0' or version.parse(ver) <= version.parse('7.1.0'): _logger.warning('Protocol spacers not supported; setting protocol_number to None') self.protocol_number = None sync, chmap = self.load_sync() data, paths = PassiveChoiceWorld(self.session_path).extract( sync_collection=self.sync_collection, task_collection=self.collection, save=True, path_out=self.session_path.joinpath(self.output_collection), protocol_number=self.protocol_number, sync=sync, sync_map=chmap) if any(x is None for x in paths): self.status = -1 return paths
[docs] class ChoiceWorldTrialsBpod(base_tasks.BehaviourTask): priority = 90 job_size = 'small' extractor = None """ibllib.io.extractors.base.BaseBpodTrialsExtractor: An instance of the Bpod trials extractor.""" @property def signature(self): signature = { 'input_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, True), ('_iblrig_encoderPositions.raw*', self.collection, True)], 'output_files': [ ('*trials.goCueTrigger_times.npy', self.output_collection, True), ('*trials.stimOffTrigger_times.npy', self.output_collection, False), ('*trials.stimOnTrigger_times.npy', self.output_collection, False), ('*trials.table.pqt', self.output_collection, True), ('*wheel.position.npy', self.output_collection, True), ('*wheel.timestamps.npy', self.output_collection, True), ('*wheelMoves.intervals.npy', self.output_collection, True), ('*wheelMoves.peakAmplitude.npy', self.output_collection, True) ] } return signature def _run(self, update=True, save=True, **kwargs): """Extracts an iblrig training session.""" trials, output_files = self.extract_behaviour(save=save) if trials is None: return None if self.one is None or self.one.offline: return output_files # Run the task QC qc = self.run_qc(trials, update=update, **kwargs) if update and not self.one.offline: on_server = self.location == 'server' and isinstance(self.data_handler, ServerDataHandler) if not on_server: _logger.warning('Updating dataset QC only supported on local servers') else: labs = get_lab(self.session_path, self.one.alyx) # registered_dsets = self.register_datasets(labs=labs) datasets = self.data_handler.uploadData(output_files, self.version, labs=labs) update_dataset_qc(qc, datasets, self.one) return output_files
[docs] def extract_behaviour(self, **kwargs): self.extractor = get_bpod_extractor(self.session_path, task_collection=self.collection) _logger.info('Bpod trials extractor: %s.%s', self.extractor.__module__, self.extractor.__class__.__name__) self.extractor.default_path = self.output_collection return self.extractor.extract(task_collection=self.collection, **kwargs)
[docs] def run_qc(self, trials_data=None, update=True, QC=None): """ Run the task QC. Parameters ---------- trials_data : dict The complete extracted task data. update : bool If True, updates the session QC fields on Alyx. QC : ibllib.qc.task_metrics.TaskQC An optional QC class to instantiate. Returns ------- ibllib.qc.task_metrics.TaskQC The task QC object. """ trials_data = self._assert_trials_data(trials_data) # validate trials data # Compile task data for QC qc_extractor = TaskQCExtractor(self.session_path) qc_extractor.data = qc_extractor.rename_data(trials_data) if not QC: QC = HabituationQC if type(self.extractor).__name__ == 'HabituationTrials' else TaskQC _logger.debug('Running QC with %s.%s', QC.__module__, QC.__name__) qc = QC(self.session_path, one=self.one, log=_logger) if QC is not HabituationQC: qc_extractor.wheel_encoding = 'X1' qc_extractor.settings = self.extractor.settings qc_extractor.frame_ttls, qc_extractor.audio_ttls = load_bpod_fronts( self.session_path, task_collection=self.collection) qc.extractor = qc_extractor # Aggregate and update Alyx QC fields namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}' qc.run(update=update, namespace=namespace) return qc
[docs] class ChoiceWorldTrialsNidq(ChoiceWorldTrialsBpod): priority = 90 job_size = 'small' @property def signature(self): I = ExpectedDataset.input # noqa ns = self.sync_namespace # Neuropixels 3A sync data are kept in individual probe collections v3A = ( I(f'_{ns}_sync.channels.probe??.npy', f'{self.sync_collection}/probe??', True, unique=False) & I(f'_{ns}_sync.polarities.probe??.npy', f'{self.sync_collection}/probe??', True, unique=False) & I(f'_{ns}_sync.times.probe??.npy', f'{self.sync_collection}/probe??', True, unique=False) & I(f'_{ns}_*.ap.meta', f'{self.sync_collection}/probe??', True, unique=False) & I(f'_{ns}_*wiring.json', f'{self.sync_collection}/probe??', False, unique=False) ) # Neuropixels 3B sync data are kept in probe-independent datasets v3B = ( I(f'_{ns}_sync.channels.npy', self.sync_collection, True) & I(f'_{ns}_sync.polarities.npy', self.sync_collection, True) & I(f'_{ns}_sync.times.npy', self.sync_collection, True) & I(f'_{ns}_*.meta', self.sync_collection, True) & I(f'_{ns}_*wiring.json', self.sync_collection, False) ) signature = { 'input_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, True), ('_iblrig_encoderPositions.raw*', self.collection, True), v3B | (~v3B & v3A) # either 3B datasets OR 3A datasets must be present ], 'output_files': [ ('*trials.goCueTrigger_times.npy', self.output_collection, True), ('*trials.intervals_bpod.npy', self.output_collection, False), ('*trials.stimOff_times.npy', self.output_collection, False), ('*trials.stimOffTrigger_times.npy', self.output_collection, False), ('*trials.stimOnTrigger_times.npy', self.output_collection, False), ('*trials.table.pqt', self.output_collection, True), ('*wheel.position.npy', self.output_collection, True), ('*wheel.timestamps.npy', self.output_collection, True), ('*wheelMoves.intervals.npy', self.output_collection, True), ('*wheelMoves.peakAmplitude.npy', self.output_collection, True) ] } return signature def _behaviour_criterion(self, update=True, truncate_to_pass=True): """ Computes and update the behaviour criterion on Alyx """ from brainbox.behavior import training trials = alfio.load_object(self.session_path.joinpath(self.output_collection), 'trials').to_df() good_enough = training.criterion_delay( n_trials=trials.shape[0], perf_easy=training.compute_performance_easy(trials), ) if truncate_to_pass and not good_enough: n_trials = trials.shape[0] while not good_enough and n_trials > 400: n_trials -= 1 good_enough = training.criterion_delay( n_trials=n_trials, perf_easy=training.compute_performance_easy(trials[:n_trials]), ) if update: eid = self.one.path2eid(self.session_path, query_type='remote') self.one.alyx.json_field_update( "sessions", eid, "extended_qc", {"behavior": int(good_enough)} )
[docs] def extract_behaviour(self, save=True, **kwargs): # Extract Bpod trials bpod_trials, _ = super().extract_behaviour(save=False, **kwargs) # Sync Bpod trials to FPGA sync, chmap = get_sync_and_chn_map(self.session_path, self.sync_collection) self.extractor = FpgaTrials(self.session_path, bpod_trials=bpod_trials, bpod_extractor=self.extractor) outputs, files = self.extractor.extract( save=save, sync=sync, chmap=chmap, path_out=self.session_path.joinpath(self.output_collection), task_collection=self.collection, protocol_number=self.protocol_number, **kwargs) return outputs, files
[docs] def run_qc(self, trials_data=None, update=False, plot_qc=False, QC=None): trials_data = self._assert_trials_data(trials_data) # validate trials data # Compile task data for QC qc_extractor = TaskQCExtractor(self.session_path) qc_extractor.data = qc_extractor.rename_data(trials_data.copy()) if not QC: QC = HabituationQC if type(self.extractor).__name__ == 'HabituationTrials' else TaskQC _logger.debug('Running QC with %s.%s', QC.__module__, QC.__name__) qc = QC(self.session_path, one=self.one, log=_logger) if QC is not HabituationQC: # Add Bpod wheel data wheel_ts_bpod = self.extractor.bpod2fpga(self.extractor.bpod_trials['wheel_timestamps']) qc_extractor.data['wheel_timestamps_bpod'] = wheel_ts_bpod qc_extractor.data['wheel_position_bpod'] = self.extractor.bpod_trials['wheel_position'] qc_extractor.wheel_encoding = 'X4' qc_extractor.frame_ttls = self.extractor.frame2ttl qc_extractor.audio_ttls = self.extractor.audio qc_extractor.bpod_ttls = self.extractor.bpod qc_extractor.settings = self.extractor.settings qc.extractor = qc_extractor # Aggregate and update Alyx QC fields namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}' qc.run(update=update, namespace=namespace) if plot_qc: _logger.info('Creating Trials QC plots') try: session_id = self.one.path2eid(self.session_path) plot_task = BehaviourPlots( session_id, self.session_path, one=self.one, task_collection=self.output_collection) _ = plot_task.run() self.plot_tasks.append(plot_task) except Exception: _logger.error('Could not create Trials QC Plot') _logger.error(traceback.format_exc()) self.status = -1 return qc
def _run(self, update=True, plot_qc=True, save=True): output_files = super()._run(update=update, save=save, plot_qc=plot_qc) if update and not self.one.offline: self._behaviour_criterion(update=update) return output_files
[docs] class ChoiceWorldTrialsTimeline(ChoiceWorldTrialsNidq): """Behaviour task extractor with DAQdata.raw NPY datasets.""" @property def signature(self): signature = super().signature signature['input_files'] = [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, True), ('_iblrig_encoderPositions.raw*', self.collection, True), (f'_{self.sync_namespace}_DAQdata.raw.npy', self.sync_collection, True), (f'_{self.sync_namespace}_DAQdata.timestamps.npy', self.sync_collection, True), (f'_{self.sync_namespace}_DAQdata.meta.json', self.sync_collection, True), ] if self.protocol: extractor = get_bpod_extractor(self.session_path, protocol=self.protocol) if extractor.save_names: signature['output_files'] = [(fn, self.output_collection, True) for fn in filter(None, extractor.save_names)] return signature
[docs] def extract_behaviour(self, save=True, **kwargs): """Extract the Bpod trials data and Timeline acquired signals.""" # First determine the extractor from the task protocol bpod_trials, _ = ChoiceWorldTrialsBpod.extract_behaviour(self, save=False, **kwargs) # Sync Bpod trials to DAQ self.extractor = TimelineTrials(self.session_path, bpod_trials=bpod_trials, bpod_extractor=self.extractor) save_path = self.session_path / self.output_collection if not self._spacer_support(self.extractor.settings): _logger.warning('Protocol spacers not supported; setting protocol_number to None') self.protocol_number = None dsets, out_files = self.extractor.extract( save=save, path_out=save_path, sync_collection=self.sync_collection, task_collection=self.collection, protocol_number=self.protocol_number, **kwargs) return dsets, out_files
[docs] class TrainingStatus(base_tasks.BehaviourTask): priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('*trials.table.pqt', self.output_collection, True)], 'output_files': [] } return signature def _run(self, upload=True): """ Extracts training status for subject. """ lab = get_lab(self.session_path, self.one.alyx) if lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url: _logger.info('Switching from cortexlab Alyx to IBL Alyx for training status queries.') one = ONE(base_url='https://alyx.internationalbrainlab.org') else: one = self.one df = training_status.get_latest_training_information(self.session_path, one) if df is not None: training_status.make_plots( self.session_path, self.one, df=df, save=True, upload=upload, task_collection=self.collection) # Update status map in JSON field of subjects endpoint if self.one and not self.one.offline: _logger.debug('Updating JSON field of subjects endpoint') status = (df.set_index('date')[['training_status', 'session_path']].drop_duplicates( subset='training_status', keep='first').to_dict()) date, sess = status.items() data = {'trained_criteria': {v.replace(' ', '_'): (k, self.one.path2eid(sess[1][k])) for k, v in date[1].items()}} _, subject, *_ = session_path_parts(self.session_path) self.one.alyx.json_field_update('subjects', subject, data=data) output_files = [] return output_files