"""Task pipeline creation from an acquisition description.
The principal function here is `make_pipeline` which reads an `_ibl_experiment.description.yaml`
file and determines the set of tasks required to preprocess the session.
In the experiment description file there is a 'tasks' key that defines each task protocol and the
location of the raw data (i.e. task collection). The protocol subkey may contain an 'extractors'
field that should contain a list of dynamic pipeline task class names for extracting the task data.
These must be subclasses of the :class:`ibllib.pipes.base_tasks.DynamicTask` class. If the
extractors key is absent or empty, the tasks are chosen based on the sync label and protocol name.
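
For example, a tasks entry with an explicit extractors list may look like the following
(an illustrative sketch; the protocol name is hypothetical, while the extractor classes are
real ones from :mod:`ibllib.pipes.behavior_tasks`)::

    tasks:
      - my_customChoiceWorld:
          collection: raw_task_data_00
          sync_label: bpod
          extractors: [TrialRegisterRaw, ChoiceWorldTrialsBpod]
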
NB: The standard behaviour extraction task classes (e.g.
:class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsBpod` and
:class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsNidq`)
handle the clock synchronization, behaviour plots and QC. This is typically independent of the Bpod
trials extraction (i.e. extraction of trials data from the Bpod raw data, in Bpod time). The Bpod
trials extractor class is determined by the :func:`ibllib.io.extractors.base.protocol2extractor`
map. IBL protocols may be added to the ibllib.io.extractors.task_extractor_map.json file, while
non-IBL ones should be added to the projects.base.task_extractor_map.json file located in the personal
projects repo. The Bpod trials extractor class must be a subclass of the
:class:`ibllib.io.extractors.base.BaseBpodTrialsExtractor` class, and located in either the
personal projects repo or in :py:mod:`ibllib.io.extractors.bpod_trials` module.
"""
import logging
import re
from fnmatch import fnmatch
from collections import OrderedDict
from pathlib import Path
from itertools import chain
import yaml
import spikeglx
import ibllib.io.raw_data_loaders as rawio
import ibllib.io.session_params as sess_params
import ibllib.pipes.tasks as mtasks
import ibllib.pipes.base_tasks as bstasks
import ibllib.pipes.widefield_tasks as wtasks
import ibllib.pipes.mesoscope_tasks as mscope_tasks
import ibllib.pipes.sync_tasks as stasks
import ibllib.pipes.behavior_tasks as btasks
import ibllib.pipes.video_tasks as vtasks
import ibllib.pipes.ephys_tasks as etasks
import ibllib.pipes.audio_tasks as atasks
import ibllib.pipes.neurophotometrics as ptasks
_logger = logging.getLogger(__name__)
def acquisition_description_legacy_session(session_path, save=False):
"""
From a legacy session create a dictionary corresponding to the acquisition description.
Parameters
----------
session_path : str, pathlib.Path
A path to a session to describe.
save : bool
If true, saves the acquisition description file to _ibl_experiment.description.yaml.
Returns
-------
dict
The legacy acquisition description.
"""
settings = rawio.load_settings(session_path)
protocol = settings.get('PYBPOD_PROTOCOL', 'UNKNOWN')
dict_ad = get_acquisition_description(protocol)
if save:
sess_params.write_params(session_path=session_path, data=dict_ad)
return dict_ad
def get_acquisition_description(protocol):
""""
This is a set of example acquisition descriptions for experiments
- choice_world_recording
- choice_world_biased
- choice_world_training
- choice_world_habituation
- choice_world_passive
That are part of the IBL pipeline
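
    Examples
    --------
    An example description for a training session (the full protocol string is illustrative):

    >>> description = get_acquisition_description('_iblrig_tasks_trainingChoiceWorld6.4.2')
    >>> description['tasks']
    [{'trainingChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}}]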
"""
if 'ephys' in protocol: # canonical ephys
devices = {
'cameras': {
'right': {'collection': 'raw_video_data', 'sync_label': 'audio'},
'body': {'collection': 'raw_video_data', 'sync_label': 'audio'},
'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
},
'neuropixel': {
'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'},
'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'}
},
'microphone': {
'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
},
}
acquisition_description = { # this is the current ephys pipeline description
'devices': devices,
'tasks': [
{'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
{'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}
],
'sync': {
'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}
},
'procedures': ['Ephys recording with acute probe(s)'],
'projects': ['ibl_neuropixel_brainwide_01']
}
else:
devices = {
'cameras': {
'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
},
'microphone': {
'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
},
}
        acquisition_description = {  # this is the current behaviour training pipeline description
'devices': devices,
'sync': {'bpod': {'collection': 'raw_behavior_data'}},
'procedures': ['Behavior training/tasks'],
'projects': ['ibl_neuropixel_brainwide_01']
}
if 'biased' in protocol:
key = 'biasedChoiceWorld'
elif 'training' in protocol:
key = 'trainingChoiceWorld'
elif 'habituation' in protocol:
key = 'habituationChoiceWorld'
else:
raise ValueError(f'Unknown protocol "{protocol}"')
acquisition_description['tasks'] = [{key: {
'collection': 'raw_behavior_data',
'sync_label': 'bpod'
}}]
acquisition_description['version'] = '1.0.0'
return acquisition_description
def _sync_label(sync, acquisition_software=None, **_):
"""
Returns the sync label based on the sync type and acquisition software.
The 'sync' usually refers to the DAQ type, e.g. 'nidq', 'tdms', 'bpod'.
The 'acquisition_software' refers to the software used to acquire the data, e.g.
for an NI DAQ, options include 'spikeglx' and 'timeline'. Both of these affect
how the data are loaded and extracted, and therefore which tasks to use.
The naming convention here is not ideal, and may be changed in the future.
Parameters
----------
sync : str
The sync type, e.g. 'nidq', 'tdms', 'bpod'.
acquisition_software : str
The acquisition software used to acquire the sync data.
Returns
-------
str
The sync label for determining the extractor tasks.
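
    Examples
    --------
    The label only differs from the sync type for non-spikeglx NI DAQ acquisitions:

    >>> _sync_label('bpod'), _sync_label('nidq'), _sync_label('tdms')
    ('bpod', 'nidq', 'tdms')
    >>> _sync_label('nidq', acquisition_software='spikeglx')
    'nidq'
    >>> _sync_label('nidq', acquisition_software='timeline')
    'timeline'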
"""
return acquisition_software if (sync == 'nidq' and acquisition_software not in ('spikeglx', None)) else sync
def _load_acquisition_description(session_path):
"""
Load a session's acquisition description.
Attempts to load from the session path and upon failure, attempts to generate one based on the
task protocol (this only works for legacy pipeline sessions).
Parameters
----------
session_path : str, pathlib.Path
A session path.
Returns
-------
dict
The acquisition description file.
"""
acquisition_description = sess_params.read_params(session_path)
if not acquisition_description:
try:
# v7 sessions used a different folder name for task data;
# v8 sessions should always have a description file
            assert Path(session_path).joinpath('raw_behavior_data').exists()
acquisition_description = acquisition_description_legacy_session(session_path)
assert acquisition_description
except (AssertionError, ValueError):
raise ValueError('Experiment description file not found or is empty')
return acquisition_description
def _get_trials_tasks(session_path, acquisition_description=None, sync_tasks=None, one=None):
"""
Generate behaviour tasks from acquisition description.
This returns all behaviour related tasks including TrialsRegisterRaw and TrainingStatus objects.
Parameters
----------
session_path : str, pathlib.Path
A session path.
acquisition_description : dict
An acquisition description.
sync_tasks : list
A list of sync tasks to use as behaviour task parents.
one : One
An instance of ONE to pass to each task.
Returns
-------
dict[str, ibllib.pipes.tasks.Task]
A map of Alyx task name to behaviour task object.
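
    Examples
    --------
    For a session with a single trainingChoiceWorld protocol, the returned map would contain
    keys of the following form (built from the task class, protocol and protocol number):

    >>> tasks = _get_trials_tasks(session_path)  # doctest: +SKIP
    >>> list(tasks)  # doctest: +SKIP
    ['RegisterRaw_trainingChoiceWorld_00', 'Trials_trainingChoiceWorld_00',
     'TrainingStatus_trainingChoiceWorld_00']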
"""
if not acquisition_description:
acquisition_description = _load_acquisition_description(session_path)
tasks = OrderedDict()
sync_tasks = sync_tasks or []
kwargs = {'session_path': session_path, 'one': one}
# Syncing tasks
(sync, sync_args), = acquisition_description['sync'].items()
sync_label = _sync_label(sync, **sync_args) # get the format of the DAQ data. This informs the extractor task
sync_args['sync_collection'] = sync_args.pop('collection') # rename the key so it matches task run arguments
sync_args['sync_ext'] = sync_args.pop('extension', None)
sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
sync_kwargs = {'sync': sync, **sync_args}
# Behavior tasks
task_protocols = acquisition_description.get('tasks', [])
for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))):
collection = task_info.get('collection', f'raw_task_data_{i:02}')
task_kwargs = {'protocol': protocol, 'collection': collection}
# For now the order of protocols in the list will take precedence. If collections are numbered,
# check that the numbers match the order. This may change in the future.
if re.match(r'^raw_task_data_\d{2}$', collection):
task_kwargs['protocol_number'] = i
if int(collection.split('_')[-1]) != i:
_logger.warning('Number in collection name does not match task order')
if extractors := task_info.get('extractors', False):
extractors = (extractors,) if isinstance(extractors, str) else extractors
task_name = None # to avoid unbound variable issue in the first round
for j, extractor in enumerate(extractors):
# Assume previous task in the list is a parent
parents = [] if j == 0 else [tasks[task_name]]
# Make sure extractor and sync task don't collide
for sync_option in ('nidq', 'bpod', 'timeline'):
if sync_option in extractor.lower() and not sync_label == sync_option:
raise ValueError(f'Extractor "{extractor}" and sync "{sync_label}" do not match')
# Look for the extractor in the behavior extractors module
if hasattr(btasks, extractor):
task = getattr(btasks, extractor)
                # The extractor may be tied to a specific sync task, e.g. ChoiceWorldTrialsBpod
elif hasattr(btasks, extractor + sync_label.capitalize()):
task = getattr(btasks, extractor + sync_label.capitalize())
else:
                    # look up the extractor class in the personal projects repo
                    import projects.extraction_tasks
                    if hasattr(projects.extraction_tasks, extractor):
                        task = getattr(projects.extraction_tasks, extractor)
                    elif hasattr(projects.extraction_tasks, extractor + sync_label.capitalize()):
                        task = getattr(projects.extraction_tasks, extractor + sync_label.capitalize())
else:
raise NotImplementedError(
f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects')
_logger.debug('%s (protocol #%i, task #%i) = %s.%s',
protocol, i, j, task.__module__, task.__name__)
# Rename the class to something more informative
task_name = f'{task.__name__}_{i:02}'
if not (task.__name__.startswith('TrainingStatus') or task.__name__.endswith('RegisterRaw')):
task_name = f'Trials_{task_name}'
# For now we assume that the second task in the list is always the trials extractor, which is dependent
# on the sync task and sync arguments
if j == 1:
tasks[task_name] = type(task_name, (task,), {})(
**kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks
)
else:
tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents)
# For the next task, we assume that the previous task is the parent
else: # Legacy block to handle sessions without defined extractors
            # - choice_world_recording
            # - choice_world_biased
            # - choice_world_training
            # - choice_world_habituation
            # - choice_world_passive
if 'passiveChoiceWorld' in protocol:
registration_class = btasks.PassiveRegisterRaw
try:
behaviour_class = getattr(btasks, 'PassiveTask' + sync_label.capitalize())
except AttributeError:
raise NotImplementedError(f'No passive task available for sync namespace "{sync_label}"')
compute_status = False
elif 'habituation' in protocol:
registration_class = btasks.HabituationRegisterRaw
behaviour_class = getattr(btasks, 'HabituationTrials' + sync_label.capitalize())
compute_status = False
else:
registration_class = btasks.TrialRegisterRaw
try:
behaviour_class = getattr(btasks, 'ChoiceWorldTrials' + sync_label.capitalize())
except AttributeError:
raise NotImplementedError(f'No trials task available for sync namespace "{sync_label}"')
compute_status = True
tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})(
**kwargs, **task_kwargs)
parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks
tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})(
**kwargs, **sync_kwargs, **task_kwargs, parents=parents)
if compute_status:
tasks[f'TrainingStatus_{protocol}_{i:02}'] = type(f'TrainingStatus_{protocol}_{i:02}', (
btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']])
return tasks
def get_trials_tasks(session_path, one=None, bpod_only=False):
"""
Return a list of pipeline trials extractor task objects for a given session.
This function supports both legacy and dynamic pipeline sessions. Dynamic tasks are returned
for both recent and legacy sessions. Only Trials tasks are returned, not the training status
or raw registration tasks.
Parameters
----------
session_path : str, pathlib.Path
An absolute path to a session.
one : one.api.One
An ONE instance.
bpod_only : bool
If true, extract trials from Bpod clock instead of the main DAQ's.
Returns
-------
list of pipes.tasks.Task
A list of task objects for the provided session.
Examples
--------
Return the tasks for active choice world extraction
>>> tasks = list(filter(is_active_trials_task, get_trials_tasks(session_path)))
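
    Extract trials using the Bpod clock instead of the main DAQ's (`one` is assumed to be an
    instance of :class:`one.api.One`):

    >>> tasks = get_trials_tasks(session_path, one=one, bpod_only=True)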
"""
# Check for an experiment.description file; ensure downloaded if possible
if one and one.to_eid(session_path): # to_eid returns None if session not registered
one.load_datasets(session_path, ['_ibl_experiment.description'], download_only=True, assert_present=False)
acquisition_description = _load_acquisition_description(session_path)
if bpod_only and acquisition_description:
acquisition_description['sync'] = {'bpod': {'collection': 'raw_task_data_*'}}
try:
trials_tasks = _get_trials_tasks(session_path, acquisition_description, one=one)
return [v for k, v in trials_tasks.items() if k.startswith('Trials_')]
except NotImplementedError as ex:
_logger.warning('Failed to get trials tasks: %s', ex)
return []
def is_active_trials_task(task) -> bool:
"""
Check if task is for active choice world extraction.
Parameters
----------
task : ibllib.pipes.tasks.Task
A task instance to test.
Returns
-------
bool
True if the task name starts with 'Trials_' and outputs a trials.table dataset.
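
    Examples
    --------
    Filter the trials tasks of a session for those extracting an active choice world task:

    >>> active_tasks = [t for t in get_trials_tasks(session_path) if is_active_trials_task(t)]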
"""
trials_task = task.name.lower().startswith('trials_')
output_names = [x[0] for x in task.signature.get('output_files', [])]
return trials_task and any(fnmatch('_ibl_trials.table.pqt', pat) for pat in output_names)
def make_pipeline(session_path, **pkwargs):
"""
Creates a pipeline of extractor tasks from a session's experiment description file.
Parameters
----------
session_path : str, Path
The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'.
pkwargs
Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor.
Returns
-------
ibllib.pipes.tasks.Pipeline
A task pipeline object.
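
    Examples
    --------
    Create a pipeline from a local session (the path here is illustrative):

    >>> pipeline = make_pipeline('/data/subject/2023-01-01/001')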
"""
    # NB: the pattern below is used throughout for dynamic class creation, e.g.
    # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path)
if not session_path or not (session_path := Path(session_path)).exists():
raise ValueError('Session path does not exist')
tasks = OrderedDict()
acquisition_description = _load_acquisition_description(session_path)
devices = acquisition_description.get('devices', {})
kwargs = {'session_path': session_path, 'one': pkwargs.get('one')}
# Registers the experiment description file
tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw',
(bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs)
# Syncing tasks
(sync, sync_args), = acquisition_description['sync'].items()
sync_args = sync_args.copy() # ensure acquisition_description unchanged
sync_label = _sync_label(sync, **sync_args) # get the format of the DAQ data. This informs the extractor task
sync_args['sync_collection'] = sync_args.pop('collection') # rename the key so it matches task run arguments
sync_args['sync_ext'] = sync_args.pop('extension', None)
sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
sync_kwargs = {'sync': sync, **sync_args}
sync_tasks = []
if sync_label == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data':
tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})(
**kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
sync_tasks = [tasks[f'SyncPulses_{sync}']]
elif sync_label == 'timeline':
tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
elif sync_label == 'nidq':
tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs)
tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})(
**kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
sync_tasks = [tasks[f'SyncPulses_{sync}']]
elif sync_label == 'tdms':
tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
elif sync_label == 'bpod':
pass # ATM we don't have anything for this; it may not be needed in the future
# Behavior tasks
tasks.update(
_get_trials_tasks(session_path, acquisition_description, sync_tasks=sync_tasks, one=pkwargs.get('one'))
)
# Ephys tasks
if 'neuropixel' in devices:
ephys_kwargs = {'device_collection': 'raw_ephys_data'}
tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs)
all_probes = []
register_tasks = []
for pname, probe_info in devices['neuropixel'].items():
# Glob to support collections such as _00a, _00b. This doesn't fix the issue of NP2.4
# extractions, however.
probe_collection = next(session_path.glob(probe_info['collection'] + '*'))
            meta_file = spikeglx.glob_ephys_files(probe_collection, ext='meta')
            meta_file = meta_file[0].get('ap')
            meta = spikeglx.read_meta_data(meta_file)
            nptype = spikeglx._get_neuropixel_version_from_meta(meta)
            nshanks = spikeglx._get_nshanks_from_meta(meta)
if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1):
                tasks[f'EphysCompressNP21_{pname}'] = type(f'EphysCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                all_probes.append(pname)
                register_tasks.append(tasks[f'EphysCompressNP21_{pname}'])
elif nptype == 'NP2.4' and nshanks > 1:
                tasks[f'EphysCompressNP24_{pname}'] = type(f'EphysCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks)
                register_tasks.append(tasks[f'EphysCompressNP24_{pname}'])
all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)]
else:
                tasks[f'EphysCompressNP1_{pname}'] = type(f'EphysCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})(
**kwargs, **ephys_kwargs, pname=pname)
register_tasks.append(tasks[f'EphysCompressNP1_{pname}'])
all_probes.append(pname)
if nptype == '3A':
tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})(
**kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks)
for pname in all_probes:
register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name]
if nptype != '3A':
tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})(
**kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks)
tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
**kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']])
else:
tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
**kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']])
tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})(
**kwargs, **ephys_kwargs, pname=pname, parents=register_task)
# Video tasks
if 'cameras' in devices:
cams = list(devices['cameras'].keys())
subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')]
video_kwargs = {'device_collection': 'raw_video_data', 'cameras': cams}
video_compressed = sess_params.get_video_compressed(acquisition_description)
if video_compressed:
            # This is for the widefield case where the video is already compressed
tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs)
dlc_parent_task = tasks['VideoConvert']
tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(
**kwargs, **video_kwargs, **sync_kwargs)
else:
tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(
**kwargs, **video_kwargs)
tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(
**kwargs, **video_kwargs, **sync_kwargs)
dlc_parent_task = tasks['VideoCompress']
if sync == 'bpod':
tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})(
**kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']])
elif sync == 'nidq':
                # Here we restrict to videos that we support (left, right, body or belly)
video_kwargs['cameras'] = subset_cams
tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})(
**kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks)
if sync_kwargs['sync'] != 'bpod':
# Here we restrict to videos that we support (left, right or body)
# Currently there is no plan to run DLC on the belly cam
subset_cams = [c for c in cams if c in ('left', 'right', 'body')]
video_kwargs['cameras'] = subset_cams
tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})(
**kwargs, **video_kwargs, parents=[dlc_parent_task])
# The PostDLC plots require a trials object for QC
# Find the first task that outputs a trials.table dataset
trials_task = (
t for t in tasks.values() if any('trials.table' in f[0] for f in t.signature.get('output_files', []))
)
if trials_task := next(trials_task, None):
parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}'], trials_task]
trials_collection = getattr(trials_task, 'output_collection', 'alf')
else:
parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]
trials_collection = 'alf'
tasks[tn] = type((tn := 'PostDLC'), (vtasks.EphysPostDLC,), {})(
**kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents)
# Audio tasks
if 'microphone' in devices:
(microphone, micro_kwargs), = devices['microphone'].items()
micro_kwargs['device_collection'] = micro_kwargs.pop('collection')
if sync_kwargs['sync'] == 'bpod':
tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})(
**kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'])
elif sync_kwargs['sync'] == 'nidq':
tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs)
# Widefield tasks
if 'widefield' in devices:
(_, wfield_kwargs), = devices['widefield'].items()
wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection')
        tasks['WidefieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})(
**kwargs, **wfield_kwargs)
tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldRegisterRaw']])
tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})(
**kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']])
tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})(
**kwargs, **wfield_kwargs, **sync_kwargs,
            parents=[tasks['WidefieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks)
tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})(
**kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']])
# Mesoscope tasks
if 'mesoscope' in devices:
(_, mscope_kwargs), = devices['mesoscope'].items()
mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection')
tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})(
**kwargs, **mscope_kwargs)
tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})(
**kwargs, **mscope_kwargs)
tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})(
**kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])
tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})(
**kwargs, **mscope_kwargs, **sync_kwargs)
tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})(
**kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])
if 'neurophotometrics' in devices:
# {'collection': 'raw_photometry_data', 'datetime': '2024-09-18T16:43:55.207000',
# 'fibers': {'G0': {'location': 'NBM'}, 'G1': {'location': 'SI'}}, 'sync_channel': 1}
photometry_kwargs = devices['neurophotometrics']
tasks['FibrePhotometrySync'] = type('FibrePhotometrySync', (
ptasks.FibrePhotometrySync,), {})(**kwargs, **photometry_kwargs)
p = mtasks.Pipeline(session_path=session_path, **pkwargs)
p.tasks = tasks
return p
def make_pipeline_dict(pipeline, save=True):
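    """
    Convert a pipeline object to a list of task dictionaries, optionally saving them to file.

    Parameters
    ----------
    pipeline : ibllib.pipes.tasks.Pipeline
        A task pipeline object.
    save : bool
        If true, the task list is saved to a pipeline_tasks.yaml file in the session path.

    Returns
    -------
    list of dict
        The pipeline tasks as a list of dictionaries.
    """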
task_dicts = pipeline.create_tasks_list_from_pipeline()
# TODO better name
if save:
with open(Path(pipeline.session_path).joinpath('pipeline_tasks.yaml'), 'w') as file:
_ = yaml.dump(task_dicts, file)
return task_dicts
def load_pipeline_dict(path):
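    """
    Load the task list from a session's pipeline_tasks.yaml file.

    Parameters
    ----------
    path : str, pathlib.Path
        A session path containing a pipeline_tasks.yaml file.

    Returns
    -------
    list of dict
        The pipeline tasks as a list of dictionaries.
    """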
with open(Path(path).joinpath('pipeline_tasks.yaml'), 'r') as file:
task_list = yaml.full_load(file)
return task_list