"""Standard task protocol extractor dynamic pipeline tasks."""
import logging
import traceback
from packaging import version
import one.alf.io as alfio
from one.alf.path import session_path_parts
from one.api import ONE
from ibllib.oneibl.registration import get_lab
from ibllib.oneibl.data_handlers import ServerDataHandler, ExpectedDataset
from ibllib.pipes import base_tasks
from ibllib.io.raw_data_loaders import load_settings, load_bpod_fronts
from ibllib.qc.task_extractors import TaskQCExtractor
from ibllib.qc.task_metrics import HabituationQC, TaskQC, update_dataset_qc
from ibllib.io.extractors.ephys_passive import PassiveChoiceWorld
from ibllib.io.extractors.bpod_trials import get_bpod_extractor
from ibllib.io.extractors.ephys_fpga import FpgaTrials, FpgaTrialsHabituation, get_sync_and_chn_map
from ibllib.io.extractors.mesoscope import TimelineTrials
from ibllib.pipes import training_status
from ibllib.plots.figures import BehaviourPlots
_logger = logging.getLogger('ibllib')
[docs]
class HabituationRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask):
priority = 100
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [],
'output_files': [
('_iblrig_taskData.raw.*', self.collection, True),
('_iblrig_taskSettings.raw.*', self.collection, True),
('_iblrig_encoderEvents.raw*', self.collection, False),
('_iblrig_encoderPositions.raw*', self.collection, False),
('_iblrig_encoderTrialInfo.raw*', self.collection, False),
('_iblrig_stimPositionScreen.raw*', self.collection, False),
('_iblrig_syncSquareUpdate.raw*', self.collection, False),
('_iblrig_ambientSensorData.raw*', self.collection, False)
]
}
return signature
[docs]
class HabituationTrialsBpod(base_tasks.BehaviourTask):
priority = 90
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [
('_iblrig_taskData.raw.*', self.collection, True),
('_iblrig_taskSettings.raw.*', self.collection, True),
],
'output_files': [
('*trials.contrastLeft.npy', self.output_collection, True),
('*trials.contrastRight.npy', self.output_collection, True),
('*trials.feedback_times.npy', self.output_collection, True),
('*trials.feedbackType.npy', self.output_collection, True),
('*trials.goCue_times.npy', self.output_collection, True),
('*trials.goCueTrigger_times.npy', self.output_collection, True),
('*trials.intervals.npy', self.output_collection, True),
('*trials.rewardVolume.npy', self.output_collection, True),
('*trials.stimOff_times.npy', self.output_collection, True),
('*trials.stimOn_times.npy', self.output_collection, True),
('*trials.stimOffTrigger_times.npy', self.output_collection, False),
('*trials.stimOnTrigger_times.npy', self.output_collection, False),
]
}
return signature
def _run(self, update=True, save=True):
"""Extracts an iblrig training session."""
trials, output_files = self.extract_behaviour(save=save)
if trials is None:
return None
if self.one is None or self.one.offline:
return output_files
# Run the task QC
self.run_qc(trials, update=update)
return output_files
[docs]
def run_qc(self, trials_data=None, update=True):
trials_data = self._assert_trials_data(trials_data) # validate trials data
# Compile task data for QC
qc = HabituationQC(self.session_path, one=self.one)
qc.extractor = TaskQCExtractor(self.session_path)
# Update extractor fields
qc.extractor.data = qc.extractor.rename_data(trials_data.copy())
qc.extractor.frame_ttls = self.extractor.frame2ttl # used in iblapps QC viewer
qc.extractor.audio_ttls = self.extractor.audio # used in iblapps QC viewer
qc.extractor.settings = self.extractor.settings
namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}'
qc.run(update=update, namespace=namespace)
return qc
[docs]
class HabituationTrialsNidq(HabituationTrialsBpod):
priority = 90
job_size = 'small'
@property
def signature(self):
signature = super().signature
signature['input_files'] = [
('_iblrig_taskData.raw.*', self.collection, True),
('_iblrig_taskSettings.raw.*', self.collection, True),
(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
(f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
(f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
('*wiring.json', self.sync_collection, False),
('*.meta', self.sync_collection, True)]
return signature
[docs]
def run_qc(self, trials_data=None, update=True, **_):
"""Run and update QC.
This adds the bpod TTLs to the QC object *after* the QC is run in the super call method.
The raw Bpod TTLs are not used by the QC however they are used in the iblapps QC plot.
"""
qc = super().run_qc(trials_data=trials_data, update=update)
qc.extractor.bpod_ttls = self.extractor.bpod
return qc
[docs]
class TrialRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask):
priority = 100
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [],
'output_files': [
('_iblrig_taskData.raw.*', self.collection, True),
('_iblrig_taskSettings.raw.*', self.collection, True),
('_iblrig_encoderEvents.raw*', self.collection, False),
('_iblrig_encoderPositions.raw*', self.collection, False),
('_iblrig_encoderTrialInfo.raw*', self.collection, False),
('_iblrig_stimPositionScreen.raw*', self.collection, False),
('_iblrig_syncSquareUpdate.raw*', self.collection, False),
('_iblrig_ambientSensorData.raw*', self.collection, False)
]
}
return signature
[docs]
class PassiveRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask):
priority = 100
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [],
'output_files': [('_iblrig_taskSettings.raw.*', self.collection, True),
('_iblrig_encoderEvents.raw*', self.collection, False),
('_iblrig_encoderPositions.raw*', self.collection, False),
('_iblrig_encoderTrialInfo.raw*', self.collection, False),
('_iblrig_stimPositionScreen.raw*', self.collection, False),
('_iblrig_syncSquareUpdate.raw*', self.collection, False),
('_iblrig_RFMapStim.raw*', self.collection, True)]
}
return signature
[docs]
class PassiveTaskNidq(base_tasks.BehaviourTask):
priority = 90
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [('_iblrig_taskSettings.raw*', self.collection, True),
('_iblrig_RFMapStim.raw*', self.collection, True),
(f'_{self.sync_namespace}_sync.channels.*', self.sync_collection, True),
(f'_{self.sync_namespace}_sync.polarities.*', self.sync_collection, True),
(f'_{self.sync_namespace}_sync.times.*', self.sync_collection, True),
('*.wiring.json', self.sync_collection, False),
('*.meta', self.sync_collection, False)],
'output_files': [('_ibl_passiveGabor.table.csv', self.output_collection, False),
('_ibl_passivePeriods.intervalsTable.csv', self.output_collection, True),
('_ibl_passiveRFM.times.npy', self.output_collection, True),
('_ibl_passiveStims.table.csv', self.output_collection, False)]
}
return signature
def _run(self, **kwargs):
"""returns a list of pathlib.Paths. """
data, paths = PassiveChoiceWorld(self.session_path).extract(
sync_collection=self.sync_collection, task_collection=self.collection, save=True,
path_out=self.session_path.joinpath(self.output_collection), protocol_number=self.protocol_number)
if any(x is None for x in paths):
self.status = -1
return paths
[docs]
class PassiveTaskTimeline(base_tasks.BehaviourTask, base_tasks.MesoscopeTask):
"""TODO should be mesoscope invariant, using wiring file"""
priority = 90
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [('_iblrig_taskSettings.raw*', self.collection, True),
('_iblrig_RFMapStim.raw*', self.collection, True),
(f'_{self.sync_namespace}_sync.channels.*', self.sync_collection, False),
(f'_{self.sync_namespace}_sync.polarities.*', self.sync_collection, False),
(f'_{self.sync_namespace}_sync.times.*', self.sync_collection, False)],
'output_files': [('_ibl_passiveGabor.table.csv', self.output_collection, False),
('_ibl_passivePeriods.intervalsTable.csv', self.output_collection, True),
('_ibl_passiveRFM.times.npy', self.output_collection, True),
('_ibl_passiveStims.table.csv', self.output_collection, False)]
}
return signature
def _run(self, **kwargs):
"""returns a list of pathlib.Paths.
This class exists to load the sync file and set the protocol_number to None
"""
settings = load_settings(self.session_path, self.collection)
ver = settings.get('IBLRIG_VERSION') or '100.0.0'
if ver == '100.0.0' or version.parse(ver) <= version.parse('7.1.0'):
_logger.warning('Protocol spacers not supported; setting protocol_number to None')
self.protocol_number = None
sync, chmap = self.load_sync()
data, paths = PassiveChoiceWorld(self.session_path).extract(
sync_collection=self.sync_collection, task_collection=self.collection, save=True,
path_out=self.session_path.joinpath(self.output_collection),
protocol_number=self.protocol_number, sync=sync, sync_map=chmap)
if any(x is None for x in paths):
self.status = -1
return paths
[docs]
class ChoiceWorldTrialsBpod(base_tasks.BehaviourTask):
priority = 90
job_size = 'small'
extractor = None
"""ibllib.io.extractors.base.BaseBpodTrialsExtractor: An instance of the Bpod trials extractor."""
@property
def signature(self):
signature = {
'input_files': [
('_iblrig_taskData.raw.*', self.collection, True),
('_iblrig_taskSettings.raw.*', self.collection, True),
('_iblrig_encoderEvents.raw*', self.collection, True),
('_iblrig_encoderPositions.raw*', self.collection, True)],
'output_files': [
('*trials.goCueTrigger_times.npy', self.output_collection, True),
('*trials.stimOffTrigger_times.npy', self.output_collection, False),
('*trials.stimOnTrigger_times.npy', self.output_collection, False),
('*trials.table.pqt', self.output_collection, True),
('*wheel.position.npy', self.output_collection, True),
('*wheel.timestamps.npy', self.output_collection, True),
('*wheelMoves.intervals.npy', self.output_collection, True),
('*wheelMoves.peakAmplitude.npy', self.output_collection, True)
]
}
return signature
def _run(self, update=True, save=True, **kwargs):
"""Extracts an iblrig training session."""
trials, output_files = self.extract_behaviour(save=save)
if trials is None:
return None
if self.one is None or self.one.offline:
return output_files
# Run the task QC
qc = self.run_qc(trials, update=update, **kwargs)
if update and not self.one.offline:
on_server = self.location == 'server' and isinstance(self.data_handler, ServerDataHandler)
if not on_server:
_logger.warning('Updating dataset QC only supported on local servers')
else:
labs = get_lab(self.session_path, self.one.alyx)
# registered_dsets = self.register_datasets(labs=labs)
datasets = self.data_handler.uploadData(output_files, self.version, labs=labs)
update_dataset_qc(qc, datasets, self.one)
return output_files
[docs]
def run_qc(self, trials_data=None, update=True, QC=None):
"""
Run the task QC.
Parameters
----------
trials_data : dict
The complete extracted task data.
update : bool
If True, updates the session QC fields on Alyx.
QC : ibllib.qc.task_metrics.TaskQC
An optional QC class to instantiate.
Returns
-------
ibllib.qc.task_metrics.TaskQC
The task QC object.
"""
trials_data = self._assert_trials_data(trials_data) # validate trials data
# Compile task data for QC
qc_extractor = TaskQCExtractor(self.session_path)
qc_extractor.data = qc_extractor.rename_data(trials_data)
if not QC:
QC = HabituationQC if type(self.extractor).__name__ == 'HabituationTrials' else TaskQC
_logger.debug('Running QC with %s.%s', QC.__module__, QC.__name__)
qc = QC(self.session_path, one=self.one, log=_logger)
if QC is not HabituationQC:
qc_extractor.wheel_encoding = 'X1'
qc_extractor.settings = self.extractor.settings
qc_extractor.frame_ttls, qc_extractor.audio_ttls = load_bpod_fronts(
self.session_path, task_collection=self.collection)
qc.extractor = qc_extractor
# Aggregate and update Alyx QC fields
namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}'
qc.run(update=update, namespace=namespace)
return qc
[docs]
class ChoiceWorldTrialsNidq(ChoiceWorldTrialsBpod):
priority = 90
job_size = 'small'
@property
def signature(self):
I = ExpectedDataset.input # noqa
ns = self.sync_namespace
# Neuropixels 3A sync data are kept in individual probe collections
v3A = (
I(f'_{ns}_sync.channels.probe??.npy', f'{self.sync_collection}/probe??', True, unique=False) &
I(f'_{ns}_sync.polarities.probe??.npy', f'{self.sync_collection}/probe??', True, unique=False) &
I(f'_{ns}_sync.times.probe??.npy', f'{self.sync_collection}/probe??', True, unique=False) &
I(f'_{ns}_*.ap.meta', f'{self.sync_collection}/probe??', True, unique=False) &
I(f'_{ns}_*wiring.json', f'{self.sync_collection}/probe??', False, unique=False)
)
# Neuropixels 3B sync data are kept in probe-independent datasets
v3B = (
I(f'_{ns}_sync.channels.npy', self.sync_collection, True) &
I(f'_{ns}_sync.polarities.npy', self.sync_collection, True) &
I(f'_{ns}_sync.times.npy', self.sync_collection, True) &
I(f'_{ns}_*.meta', self.sync_collection, True) &
I(f'_{ns}_*wiring.json', self.sync_collection, False)
)
signature = {
'input_files': [
('_iblrig_taskData.raw.*', self.collection, True),
('_iblrig_taskSettings.raw.*', self.collection, True),
('_iblrig_encoderEvents.raw*', self.collection, True),
('_iblrig_encoderPositions.raw*', self.collection, True),
v3B | (~v3B & v3A) # either 3B datasets OR 3A datasets must be present
],
'output_files': [
('*trials.goCueTrigger_times.npy', self.output_collection, True),
('*trials.intervals_bpod.npy', self.output_collection, False),
('*trials.stimOff_times.npy', self.output_collection, False),
('*trials.stimOffTrigger_times.npy', self.output_collection, False),
('*trials.stimOnTrigger_times.npy', self.output_collection, False),
('*trials.table.pqt', self.output_collection, True),
('*wheel.position.npy', self.output_collection, True),
('*wheel.timestamps.npy', self.output_collection, True),
('*wheelMoves.intervals.npy', self.output_collection, True),
('*wheelMoves.peakAmplitude.npy', self.output_collection, True)
]
}
return signature
def _behaviour_criterion(self, update=True, truncate_to_pass=True):
"""
Computes and update the behaviour criterion on Alyx
"""
from brainbox.behavior import training
trials = alfio.load_object(self.session_path.joinpath(self.output_collection), 'trials').to_df()
good_enough = training.criterion_delay(
n_trials=trials.shape[0],
perf_easy=training.compute_performance_easy(trials),
)
if truncate_to_pass and not good_enough:
n_trials = trials.shape[0]
while not good_enough and n_trials > 400:
n_trials -= 1
good_enough = training.criterion_delay(
n_trials=n_trials,
perf_easy=training.compute_performance_easy(trials[:n_trials]),
)
if update:
eid = self.one.path2eid(self.session_path, query_type='remote')
self.one.alyx.json_field_update(
"sessions", eid, "extended_qc", {"behavior": int(good_enough)}
)
[docs]
def run_qc(self, trials_data=None, update=False, plot_qc=False, QC=None):
trials_data = self._assert_trials_data(trials_data) # validate trials data
# Compile task data for QC
qc_extractor = TaskQCExtractor(self.session_path)
qc_extractor.data = qc_extractor.rename_data(trials_data.copy())
if not QC:
QC = HabituationQC if type(self.extractor).__name__ == 'HabituationTrials' else TaskQC
_logger.debug('Running QC with %s.%s', QC.__module__, QC.__name__)
qc = QC(self.session_path, one=self.one, log=_logger)
if QC is not HabituationQC:
# Add Bpod wheel data
wheel_ts_bpod = self.extractor.bpod2fpga(self.extractor.bpod_trials['wheel_timestamps'])
qc_extractor.data['wheel_timestamps_bpod'] = wheel_ts_bpod
qc_extractor.data['wheel_position_bpod'] = self.extractor.bpod_trials['wheel_position']
qc_extractor.wheel_encoding = 'X4'
qc_extractor.frame_ttls = self.extractor.frame2ttl
qc_extractor.audio_ttls = self.extractor.audio
qc_extractor.bpod_ttls = self.extractor.bpod
qc_extractor.settings = self.extractor.settings
qc.extractor = qc_extractor
# Aggregate and update Alyx QC fields
namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}'
qc.run(update=update, namespace=namespace)
if plot_qc:
_logger.info('Creating Trials QC plots')
try:
session_id = self.one.path2eid(self.session_path)
plot_task = BehaviourPlots(
session_id, self.session_path, one=self.one, task_collection=self.output_collection)
_ = plot_task.run()
self.plot_tasks.append(plot_task)
except Exception:
_logger.error('Could not create Trials QC Plot')
_logger.error(traceback.format_exc())
self.status = -1
return qc
def _run(self, update=True, plot_qc=True, save=True):
output_files = super()._run(update=update, save=save, plot_qc=plot_qc)
if update and not self.one.offline:
self._behaviour_criterion(update=update)
return output_files
[docs]
class ChoiceWorldTrialsTimeline(ChoiceWorldTrialsNidq):
"""Behaviour task extractor with DAQdata.raw NPY datasets."""
@property
def signature(self):
signature = super().signature
signature['input_files'] = [
('_iblrig_taskData.raw.*', self.collection, True),
('_iblrig_taskSettings.raw.*', self.collection, True),
('_iblrig_encoderEvents.raw*', self.collection, True),
('_iblrig_encoderPositions.raw*', self.collection, True),
(f'_{self.sync_namespace}_DAQdata.raw.npy', self.sync_collection, True),
(f'_{self.sync_namespace}_DAQdata.timestamps.npy', self.sync_collection, True),
(f'_{self.sync_namespace}_DAQdata.meta.json', self.sync_collection, True),
]
if self.protocol:
extractor = get_bpod_extractor(self.session_path, protocol=self.protocol)
if extractor.save_names:
signature['output_files'] = [(fn, self.output_collection, True)
for fn in filter(None, extractor.save_names)]
return signature
[docs]
class TrainingStatus(base_tasks.BehaviourTask):
priority = 90
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [
('_iblrig_taskData.raw.*', self.collection, True),
('_iblrig_taskSettings.raw.*', self.collection, True),
('*trials.table.pqt', self.output_collection, True)],
'output_files': []
}
return signature
def _run(self, upload=True):
"""
Extracts training status for subject.
"""
lab = get_lab(self.session_path, self.one.alyx)
if lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
_logger.info('Switching from cortexlab Alyx to IBL Alyx for training status queries.')
one = ONE(base_url='https://alyx.internationalbrainlab.org')
else:
one = self.one
df = training_status.get_latest_training_information(self.session_path, one)
if df is not None:
training_status.make_plots(
self.session_path, self.one, df=df, save=True, upload=upload, task_collection=self.collection)
# Update status map in JSON field of subjects endpoint
if self.one and not self.one.offline:
_logger.debug('Updating JSON field of subjects endpoint')
status = (df.set_index('date')[['training_status', 'session_path']].drop_duplicates(
subset='training_status', keep='first').to_dict())
date, sess = status.items()
data = {'trained_criteria': {v.replace(' ', '_'): (k, self.one.path2eid(sess[1][k]))
for k, v in date[1].items()}}
_, subject, *_ = session_path_parts(self.session_path)
self.one.alyx.json_field_update('subjects', subject, data=data)
output_files = []
return output_files