Source code for ibllib.pipes.base_tasks

from ibllib.pipes.tasks import Task
import ibllib.io.session_params as sess_params
import logging

_logger = logging.getLogger(__name__)


[docs]class DynamicTask(Task): def __init__(self, session_path, **kwargs): super().__init__(session_path, **kwargs) self.session_params = self.read_params_file() # TODO Which should be default? # Sync collection self.sync_collection = self.get_sync_collection(kwargs.get('sync_collection', None)) # Sync type self.sync = self.get_sync(kwargs.get('sync', None)) # Sync extension self.sync_ext = self.get_sync_extension(kwargs.get('sync_ext', None)) # Sync namespace self.sync_namespace = self.get_sync_namespace(kwargs.get('sync_namespace', None))
[docs] def get_sync_collection(self, sync_collection=None): return sync_collection if sync_collection else sess_params.get_sync_collection(self.session_params)
[docs] def get_sync(self, sync=None): return sync if sync else sess_params.get_sync(self.session_params)
[docs] def get_sync_extension(self, sync_ext=None): return sync_ext if sync_ext else sess_params.get_sync_extension(self.session_params)
[docs] def get_sync_namespace(self, sync_namespace=None): return sync_namespace if sync_namespace else sess_params.get_sync_namespace(self.session_params)
[docs] def get_protocol(self, protocol=None, task_collection=None): return protocol if protocol else sess_params.get_task_protocol(self.session_params, task_collection)
[docs] def get_task_collection(self, collection=None): if not collection: collection = sess_params.get_task_collection(self.session_params) # If inferring the collection from the experiment description, assert only one returned assert collection is None or isinstance(collection, str) or len(collection) == 1 return collection
[docs] def get_device_collection(self, device, device_collection=None): return device_collection if device_collection else sess_params.get_device_collection(self.session_params, device)
[docs] def read_params_file(self): params = sess_params.read_params(self.session_path) if params is None: return {} # TODO figure out the best way # if params is None and self.one: # # Try to read params from alyx or try to download params file # params = self.one.load_dataset(self.one.path2eid(self.session_path), 'params.yml') # params = self.one.alyx.rest() return params
[docs]class VideoTask(DynamicTask): def __init__(self, session_path, cameras, **kwargs): super().__init__(session_path, cameras=cameras, **kwargs) self.cameras = cameras self.device_collection = self.get_device_collection('cameras', kwargs.get('device_collection', 'raw_video_data'))
[docs]class AudioTask(DynamicTask): def __init__(self, session_path, **kwargs): super().__init__(session_path, **kwargs) self.device_collection = self.get_device_collection('microphone', kwargs.get('device_collection', 'raw_behavior_data'))
[docs]class EphysTask(DynamicTask): def __init__(self, session_path, **kwargs): super().__init__(session_path, **kwargs) self.pname = self.get_pname(kwargs.get('pname', None)) self.nshanks, self.pextra = self.get_nshanks(kwargs.get('nshanks', None)) self.device_collection = self.get_device_collection('neuropixel', kwargs.get('device_collection', 'raw_ephys_data'))
[docs] def get_pname(self, pname): # pname can be a list or a string pname = self.kwargs.get('pname', pname) return pname
[docs] def get_nshanks(self, nshanks=None): nshanks = self.kwargs.get('nshanks', nshanks) if nshanks is not None: pextra = [chr(97 + int(shank)) for shank in range(nshanks)] else: pextra = [] return nshanks, pextra
[docs]class WidefieldTask(DynamicTask): def __init__(self, session_path, **kwargs): super().__init__(session_path, **kwargs) self.device_collection = self.get_device_collection('widefield', kwargs.get('device_collection', 'raw_widefield_data'))
[docs]class RegisterRawDataTask(DynamicTask): # TODO write test """ Base register raw task. To rename files 1. input and output must have the same length 2. output files must have full filename """ priority = 100 job_size = 'small'
[docs] def rename_files(self, symlink_old=False, **kwargs): # If no inputs are given, we don't do any renaming if len(self.input_files) == 0: return # Otherwise we need to make sure there is one to one correspondence for renaming files assert len(self.input_files) == len(self.output_files) for before, after in zip(self.input_files, self.output_files): old_file, old_collection, required = before old_path = self.session_path.joinpath(old_collection).glob(old_file) old_path = next(old_path, None) # if the file doesn't exist and it is not required we are okay to continue if not old_path and not required: continue new_file, new_collection, _ = after new_path = self.session_path.joinpath(new_collection, new_file) new_path.parent.mkdir(parents=True, exist_ok=True) old_path.replace(new_path) if symlink_old: old_path.symlink_to(new_path)
def _run(self, **kwargs): self.rename_files(**kwargs) out_files = [] n_required = 0 for file_sig in self.output_files: file_name, collection, required = file_sig n_required += required file_path = self.session_path.joinpath(collection).glob(file_name) file_path = next(file_path, None) if not file_path and not required: continue elif not file_path and required: _logger.error(f'expected {file_sig} missing') else: out_files.append(file_path) if len(out_files) < n_required: self.status = -1 return out_files
[docs]class ExperimentDescriptionRegisterRaw(RegisterRawDataTask): @property def signature(self): signature = { 'input_files': [], 'output_files': [('*experiment.description.yaml', '', True)] } return signature