Source code for ibllib.pipes.dynamic_pipeline

"""Task pipeline creation from an acquisition description.

The principal function here is `make_pipeline` which reads an `_ibl_experiment.description.yaml`
file and determines the set of tasks required to preprocess the session.

In the experiment description file there is a 'tasks' key that defines each task protocol and the
location of the raw data (i.e. task collection). The protocol subkey may contain an 'extractors'
field that should contain a list of dynamic pipeline task class names for extracting the task data.
These must be subclasses of the :class:`ibllib.pipes.base_tasks.DynamicTask` class. If the
extractors key is absent or empty, the tasks are chosen based on the sync label and protocol name.

NB: The standard behvaiour extraction task classes (e.g.
:class:`ibllib.pipes.behaviour_tasks.ChoiceWorldTrialsBpod` and :class:`ibllib.pipes.behaviour_tasks.ChoiceWorldTrialsNidq`)
handle the clock synchronization, behaviour plots and QC. This is typically independent of the Bpod
trials extraction (i.e. extraction of trials data from the Bpod raw data, in Bpod time). The Bpod
trials extractor class is determined by the :func:`ibllib.io.extractors.base.protocol2extractor`
map. IBL protocols may be added to the ibllib.io.extractors.task_extractor_map.json file, while
non-IBL ones should be in projects.base.task_extractor_map.json file located in the personal
projects repo. The Bpod trials extractor class must be a subclass of the
:class:`ibllib.io.extractors.base.BaseBpodTrialsExtractor` class, and located in either the
personal projects repo or in :py:mod:`ibllib.io.extractors.bpod_trials` module.
"""
import logging
import re
from fnmatch import fnmatch
from collections import OrderedDict
from pathlib import Path
from itertools import chain
import yaml

import spikeglx

import ibllib.io.raw_data_loaders as rawio
import ibllib.io.session_params as sess_params
import ibllib.pipes.tasks as mtasks
import ibllib.pipes.base_tasks as bstasks
import ibllib.pipes.widefield_tasks as wtasks
import ibllib.pipes.mesoscope_tasks as mscope_tasks
import ibllib.pipes.sync_tasks as stasks
import ibllib.pipes.behavior_tasks as btasks
import ibllib.pipes.video_tasks as vtasks
import ibllib.pipes.ephys_tasks as etasks
import ibllib.pipes.audio_tasks as atasks
import ibllib.pipes.neurophotometrics as ptasks

_logger = logging.getLogger(__name__)


[docs] def acquisition_description_legacy_session(session_path, save=False): """ From a legacy session create a dictionary corresponding to the acquisition description. Parameters ---------- session_path : str, pathlib.Path A path to a session to describe. save : bool If true, saves the acquisition description file to _ibl_experiment.description.yaml. Returns ------- dict The legacy acquisition description. """ settings = rawio.load_settings(session_path) protocol = settings.get('PYBPOD_PROTOCOL', 'UNKNOWN') dict_ad = get_acquisition_description(protocol) if save: sess_params.write_params(session_path=session_path, data=dict_ad) return dict_ad
[docs] def get_acquisition_description(protocol): """" This is a set of example acquisition descriptions for experiments - choice_world_recording - choice_world_biased - choice_world_training - choice_world_habituation - choice_world_passive That are part of the IBL pipeline """ if 'ephys' in protocol: # canonical ephys devices = { 'cameras': { 'right': {'collection': 'raw_video_data', 'sync_label': 'audio'}, 'body': {'collection': 'raw_video_data', 'sync_label': 'audio'}, 'left': {'collection': 'raw_video_data', 'sync_label': 'audio'}, }, 'neuropixel': { 'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'}, 'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'} }, 'microphone': { 'microphone': {'collection': 'raw_behavior_data', 'sync_label': None} }, } acquisition_description = { # this is the current ephys pipeline description 'devices': devices, 'tasks': [ {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}}, {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}} ], 'sync': { 'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'} }, 'procedures': ['Ephys recording with acute probe(s)'], 'projects': ['ibl_neuropixel_brainwide_01'] } else: devices = { 'cameras': { 'left': {'collection': 'raw_video_data', 'sync_label': 'audio'}, }, 'microphone': { 'microphone': {'collection': 'raw_behavior_data', 'sync_label': None} }, } acquisition_description = { # this is the current ephys pipeline description 'devices': devices, 'sync': {'bpod': {'collection': 'raw_behavior_data'}}, 'procedures': ['Behavior training/tasks'], 'projects': ['ibl_neuropixel_brainwide_01'] } if 'biased' in protocol: key = 'biasedChoiceWorld' elif 'training' in protocol: key = 'trainingChoiceWorld' elif 'habituation' in protocol: key = 'habituationChoiceWorld' else: raise ValueError(f'Unknown protocol "{protocol}"') acquisition_description['tasks'] = [{key: { 'collection': 'raw_behavior_data', 'sync_label': 'bpod' }}] acquisition_description['version'] = '1.0.0' return acquisition_description
def _sync_label(sync, acquisition_software=None, **_): """ Returns the sync label based on the sync type and acquisition software. The 'sync' usually refers to the DAQ type, e.g. 'nidq', 'tdms', 'bpod'. The 'acquisition_software' refers to the software used to acquire the data, e.g. for an NI DAQ, options include 'spikeglx' and 'timeline'. Both of these affect how the data are loaded and extracted, and therefore which tasks to use. The naming convention here is not ideal, and may be changed in the future. Parameters ---------- sync : str The sync type, e.g. 'nidq', 'tdms', 'bpod'. acquisition_software : str The acquisition software used to acquire the sync data. Returns ------- str The sync label for determining the extractor tasks. """ return acquisition_software if (sync == 'nidq' and acquisition_software not in ('spikeglx', None)) else sync def _load_acquisition_description(session_path): """ Load a session's acquisition description. Attempts to load from the session path and upon failure, attempts to generate one based on the task protocol (this only works for legacy pipeline sessions). Parameters ---------- session_path : str, pathlib.Path A session path. Returns ------- dict The acquisition description file. """ acquisition_description = sess_params.read_params(session_path) if not acquisition_description: try: # v7 sessions used a different folder name for task data; # v8 sessions should always have a description file assert session_path.joinpath('raw_behavior_data').exists() acquisition_description = acquisition_description_legacy_session(session_path) assert acquisition_description except (AssertionError, ValueError): raise ValueError('Experiment description file not found or is empty') return acquisition_description def _get_trials_tasks(session_path, acquisition_description=None, sync_tasks=None, one=None): """ Generate behaviour tasks from acquisition description. This returns all behaviour related tasks including TrialsRegisterRaw and TrainingStatus objects. Parameters ---------- session_path : str, pathlib.Path A session path. acquisition_description : dict An acquisition description. sync_tasks : list A list of sync tasks to use as behaviour task parents. one : One An instance of ONE to pass to each task. Returns ------- dict[str, ibllib.pipes.tasks.Task] A map of Alyx task name to behaviour task object. """ if not acquisition_description: acquisition_description = _load_acquisition_description(session_path) tasks = OrderedDict() sync_tasks = sync_tasks or [] kwargs = {'session_path': session_path, 'one': one} # Syncing tasks (sync, sync_args), = acquisition_description['sync'].items() sync_label = _sync_label(sync, **sync_args) # get the format of the DAQ data. This informs the extractor task sync_args['sync_collection'] = sync_args.pop('collection') # rename the key so it matches task run arguments sync_args['sync_ext'] = sync_args.pop('extension', None) sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None) sync_kwargs = {'sync': sync, **sync_args} # Behavior tasks task_protocols = acquisition_description.get('tasks', []) for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))): collection = task_info.get('collection', f'raw_task_data_{i:02}') task_kwargs = {'protocol': protocol, 'collection': collection} # For now the order of protocols in the list will take precedence. If collections are numbered, # check that the numbers match the order. This may change in the future. if re.match(r'^raw_task_data_\d{2}$', collection): task_kwargs['protocol_number'] = i if int(collection.split('_')[-1]) != i: _logger.warning('Number in collection name does not match task order') if extractors := task_info.get('extractors', False): extractors = (extractors,) if isinstance(extractors, str) else extractors task_name = None # to avoid unbound variable issue in the first round for j, extractor in enumerate(extractors): # Assume previous task in the list is a parent parents = [] if j == 0 else [tasks[task_name]] # Make sure extractor and sync task don't collide for sync_option in ('nidq', 'bpod', 'timeline'): if sync_option in extractor.lower() and not sync_label == sync_option: raise ValueError(f'Extractor "{extractor}" and sync "{sync_label}" do not match') # Look for the extractor in the behavior extractors module if hasattr(btasks, extractor): task = getattr(btasks, extractor) # This may happen that the extractor is tied to a specific sync task: look for TrialsChoiceWorldBpod for example elif hasattr(btasks, extractor + sync_label.capitalize()): task = getattr(btasks, extractor + sync_label.capitalize()) else: # lookup in the project extraction repo if we find an extractor class import projects.extraction_tasks if hasattr(projects.extraction_tasks, extractor): task = getattr(projects.extraction_tasks, extractor) elif hasattr(projects.extraction_tasks, extractor + sync_label.capitalize()): task = getattr(btasks, extractor + sync_label.capitalize()) else: raise NotImplementedError( f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects') _logger.debug('%s (protocol #%i, task #%i) = %s.%s', protocol, i, j, task.__module__, task.__name__) # Rename the class to something more informative task_name = f'{task.__name__}_{i:02}' if not (task.__name__.startswith('TrainingStatus') or task.__name__.endswith('RegisterRaw')): task_name = f'Trials_{task_name}' # For now we assume that the second task in the list is always the trials extractor, which is dependent # on the sync task and sync arguments if j == 1: tasks[task_name] = type(task_name, (task,), {})( **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks ) else: tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents) # For the next task, we assume that the previous task is the parent else: # Legacy block to handle sessions without defined extractors # - choice_world_recording # - choice_world_biased # - choice_world_training # - choice_world_habituation if 'passiveChoiceWorld' in protocol: registration_class = btasks.PassiveRegisterRaw try: behaviour_class = getattr(btasks, 'PassiveTask' + sync_label.capitalize()) except AttributeError: raise NotImplementedError(f'No passive task available for sync namespace "{sync_label}"') compute_status = False elif 'habituation' in protocol: registration_class = btasks.HabituationRegisterRaw behaviour_class = getattr(btasks, 'HabituationTrials' + sync_label.capitalize()) compute_status = False else: registration_class = btasks.TrialRegisterRaw try: behaviour_class = getattr(btasks, 'ChoiceWorldTrials' + sync_label.capitalize()) except AttributeError: raise NotImplementedError(f'No trials task available for sync namespace "{sync_label}"') compute_status = True tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})( **kwargs, **task_kwargs) parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})( **kwargs, **sync_kwargs, **task_kwargs, parents=parents) if compute_status: tasks[f'TrainingStatus_{protocol}_{i:02}'] = type(f'TrainingStatus_{protocol}_{i:02}', ( btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']]) return tasks
[docs] def get_trials_tasks(session_path, one=None, bpod_only=False): """ Return a list of pipeline trials extractor task objects for a given session. This function supports both legacy and dynamic pipeline sessions. Dynamic tasks are returned for both recent and legacy sessions. Only Trials tasks are returned, not the training status or raw registration tasks. Parameters ---------- session_path : str, pathlib.Path An absolute path to a session. one : one.api.One An ONE instance. bpod_only : bool If true, extract trials from Bpod clock instead of the main DAQ's. Returns ------- list of pipes.tasks.Task A list of task objects for the provided session. Examples -------- Return the tasks for active choice world extraction >>> tasks = list(filter(is_active_trials_task, get_trials_tasks(session_path))) """ # Check for an experiment.description file; ensure downloaded if possible if one and one.to_eid(session_path): # to_eid returns None if session not registered one.load_datasets(session_path, ['_ibl_experiment.description'], download_only=True, assert_present=False) acquisition_description = _load_acquisition_description(session_path) if bpod_only and acquisition_description: acquisition_description['sync'] = {'bpod': {'collection': 'raw_task_data_*'}} try: trials_tasks = _get_trials_tasks(session_path, acquisition_description, one=one) return [v for k, v in trials_tasks.items() if k.startswith('Trials_')] except NotImplementedError as ex: _logger.warning('Failed to get trials tasks: %s', ex) return []
[docs] def is_active_trials_task(task) -> bool: """ Check if task is for active choice world extraction. Parameters ---------- task : ibllib.pipes.tasks.Task A task instance to test. Returns ------- bool True if the task name starts with 'Trials_' and outputs a trials.table dataset. """ trials_task = task.name.lower().startswith('trials_') output_names = [x[0] for x in task.signature.get('output_files', [])] return trials_task and any(fnmatch('_ibl_trials.table.pqt', pat) for pat in output_names)
[docs] def make_pipeline(session_path, **pkwargs): """ Creates a pipeline of extractor tasks from a session's experiment description file. Parameters ---------- session_path : str, Path The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'. pkwargs Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor. Returns ------- ibllib.pipes.tasks.Pipeline A task pipeline object. """ # NB: this pattern is a pattern for dynamic class creation # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path) if not session_path or not (session_path := Path(session_path)).exists(): raise ValueError('Session path does not exist') tasks = OrderedDict() acquisition_description = _load_acquisition_description(session_path) devices = acquisition_description.get('devices', {}) kwargs = {'session_path': session_path, 'one': pkwargs.get('one')} # Registers the experiment description file tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw', (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs) # Syncing tasks (sync, sync_args), = acquisition_description['sync'].items() sync_args = sync_args.copy() # ensure acquisition_description unchanged sync_label = _sync_label(sync, **sync_args) # get the format of the DAQ data. This informs the extractor task sync_args['sync_collection'] = sync_args.pop('collection') # rename the key so it matches task run arguments sync_args['sync_ext'] = sync_args.pop('extension', None) sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None) sync_kwargs = {'sync': sync, **sync_args} sync_tasks = [] if sync_label == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs) tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})( **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']]) sync_tasks = [tasks[f'SyncPulses_{sync}']] elif sync_label == 'timeline': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs) elif sync_label == 'nidq': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs) tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})( **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']]) sync_tasks = [tasks[f'SyncPulses_{sync}']] elif sync_label == 'tdms': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs) elif sync_label == 'bpod': pass # ATM we don't have anything for this; it may not be needed in the future # Behavior tasks tasks.update( _get_trials_tasks(session_path, acquisition_description, sync_tasks=sync_tasks, one=pkwargs.get('one')) ) # Ephys tasks if 'neuropixel' in devices: ephys_kwargs = {'device_collection': 'raw_ephys_data'} tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs) all_probes = [] register_tasks = [] for pname, probe_info in devices['neuropixel'].items(): # Glob to support collections such as _00a, _00b. This doesn't fix the issue of NP2.4 # extractions, however. probe_collection = next(session_path.glob(probe_info['collection'] + '*')) meta_file = spikeglx.glob_ephys_files(probe_collection, ext='meta') meta_file = meta_file[0].get('ap') nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file)) nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file)) if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1): tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})( **kwargs, **ephys_kwargs, pname=pname) all_probes.append(pname) register_tasks.append(tasks[f'EphyCompressNP21_{pname}']) elif nptype == 'NP2.4' and nshanks > 1: tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})( **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks) register_tasks.append(tasks[f'EphyCompressNP24_{pname}']) all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)] else: tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})( **kwargs, **ephys_kwargs, pname=pname) register_tasks.append(tasks[f'EphysCompressNP1_{pname}']) all_probes.append(pname) if nptype == '3A': tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})( **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks) for pname in all_probes: register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name] if nptype != '3A': tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})( **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks) tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})( **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']]) else: tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})( **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']]) tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})( **kwargs, **ephys_kwargs, pname=pname, parents=register_task) # Video tasks if 'cameras' in devices: cams = list(devices['cameras'].keys()) subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')] video_kwargs = {'device_collection': 'raw_video_data', 'cameras': cams} video_compressed = sess_params.get_video_compressed(acquisition_description) if video_compressed: # This is for widefield case where the video is already compressed tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs) dlc_parent_task = tasks['VideoConvert'] tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})( **kwargs, **video_kwargs, **sync_kwargs) else: tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})( **kwargs, **video_kwargs) tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})( **kwargs, **video_kwargs, **sync_kwargs) dlc_parent_task = tasks['VideoCompress'] if sync == 'bpod': tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})( **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']]) elif sync == 'nidq': # Here we restrict to videos that we support (left, right or body) video_kwargs['cameras'] = subset_cams tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})( **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks) if sync_kwargs['sync'] != 'bpod': # Here we restrict to videos that we support (left, right or body) # Currently there is no plan to run DLC on the belly cam subset_cams = [c for c in cams if c in ('left', 'right', 'body')] video_kwargs['cameras'] = subset_cams tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})( **kwargs, **video_kwargs, parents=[dlc_parent_task]) # The PostDLC plots require a trials object for QC # Find the first task that outputs a trials.table dataset trials_task = ( t for t in tasks.values() if any('trials.table' in f[0] for f in t.signature.get('output_files', [])) ) if trials_task := next(trials_task, None): parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}'], trials_task] trials_collection = getattr(trials_task, 'output_collection', 'alf') else: parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}']] trials_collection = 'alf' tasks[tn] = type((tn := 'PostDLC'), (vtasks.EphysPostDLC,), {})( **kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents) # Audio tasks if 'microphone' in devices: (microphone, micro_kwargs), = devices['microphone'].items() micro_kwargs['device_collection'] = micro_kwargs.pop('collection') if sync_kwargs['sync'] == 'bpod': tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})( **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection']) elif sync_kwargs['sync'] == 'nidq': tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs) # Widefield tasks if 'widefield' in devices: (_, wfield_kwargs), = devices['widefield'].items() wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection') tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})( **kwargs, **wfield_kwargs) tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})( **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']]) tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})( **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']]) tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})( **kwargs, **wfield_kwargs, **sync_kwargs, parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks) tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})( **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']]) # Mesoscope tasks if 'mesoscope' in devices: (_, mscope_kwargs), = devices['mesoscope'].items() mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection') tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})( **kwargs, **mscope_kwargs) tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})( **kwargs, **mscope_kwargs) tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})( **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']]) tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})( **kwargs, **mscope_kwargs, **sync_kwargs) tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})( **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']]) if 'neurophotometrics' in devices: # {'collection': 'raw_photometry_data', 'datetime': '2024-09-18T16:43:55.207000', # 'fibers': {'G0': {'location': 'NBM'}, 'G1': {'location': 'SI'}}, 'sync_channel': 1} photometry_kwargs = devices['neurophotometrics'] tasks['FibrePhotometrySync'] = type('FibrePhotometrySync', ( ptasks.FibrePhotometrySync,), {})(**kwargs, **photometry_kwargs) p = mtasks.Pipeline(session_path=session_path, **pkwargs) p.tasks = tasks return p
[docs] def make_pipeline_dict(pipeline, save=True): task_dicts = pipeline.create_tasks_list_from_pipeline() # TODO better name if save: with open(Path(pipeline.session_path).joinpath('pipeline_tasks.yaml'), 'w') as file: _ = yaml.dump(task_dicts, file) return task_dicts
[docs] def load_pipeline_dict(path): with open(Path(path).joinpath('pipeline_tasks.yaml'), 'r') as file: task_list = yaml.full_load(file) return task_list