Source code for ibllib.pipes.tasks

"""The abstract Pipeline and Task superclasses and concrete task runner.

Examples
--------

1. Running a task on your local computer.
|  Download: via ONE.
|  Upload: N/A.

>>> task = VideoSyncQcBpod(session_path, one=one, location='remote', sync='bpod')
>>> task.run()

2. Running a task on the local server that belongs to a given subject (e.g SWC054 on floferlab).
|  Download: all data expected to be present.
|  Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer
   jobs on Alyx transfer the data.

>>> from ibllib.pipes.video_tasks import VideoSyncQcBpod
>>> session_path = '/mnt/ibl/s0/Data/Subjects/SWC054/2023-01-01/001'
>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))

3. Running a task on the local server that belongs to that subject and forcing redownload of
missing data.
|  Download: via Globus (TODO we should change this to use boto3 as globus is slow).
|  Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer
   jobs on Alyx transfer the data.

>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.force = True
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
>>> task.cleanUp()  # Delete the files that have been downloaded

4. Running a task on the local server that doesn't belongs to that subject
(e.g SWC054 on angelakilab).
|  Download: via boto3, the AWS file records must exist and be set to exists = True.
|  Upload: via globus, automatically uploads the datasets directly to FlatIron via globus.
   Creates FlatIron filerecords and sets these to True once the globus task has completed.

>>> task = VideoSyncQcBpod(session_path, one=one, location='AWS', sync='bpod')
>>> task.run()
>>> task.register_datasets()
>>> task.cleanUp()  # Delete the files that have been downloaded

5. Running a task on SDSC.
|  Download: via creating symlink to relevant datasets on SDSC.
|  Upload: via copying files to relevant location on SDSC.

>>> task = VideoSyncQcBpod(session_path, one=one, location='SDSC', sync='bpod')
>>> task.run()
>>> response = task.register_datasets()
>>> # Here we just make sure filerecords are all correct
>>> for resp in response:
...    fi = next((fr for fr in resp['file_records'] if 'flatiron' in fr['data_repository']), None)
...    if fi is not None:
...        if not fi['exists']:
...            one.alyx.rest('files', 'partial_update', id=fi['id'], data={'exists': True})
...
...     aws = next((fr for fr in resp['file_records'] if 'aws' in fr['data_repository']), None)
...     if aws is not None:
...         one.alyx.rest('files', 'partial_update', id=aws['id'], data={'exists': False})
...
...     sr = next((fr for fr in resp['file_records'] if 'SR' in fr['data_repository']), None)
...     if sr is not None:
...         one.alyx.rest('files', 'partial_update', id=sr['id'], data={'exists': False})
... # Finally remove symlinks once the task has completed
... task.cleanUp()

"""
from pathlib import Path
import abc
import logging
import io
import importlib
import time
from collections import OrderedDict
import traceback
import json
from typing import List, Dict

from graphviz import Digraph

import ibllib
from ibllib.oneibl import data_handlers
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.oneibl.registration import get_lab
from iblutil.util import Bunch, flatten, ensure_list
import one.params
from one.api import ONE
from one import webclient
import one.alf.io as alfio

_logger = logging.getLogger(__name__)
TASK_STATUS_SET = {'Waiting', 'Held', 'Started', 'Errored', 'Empty', 'Complete', 'Incomplete', 'Abandoned'}


[docs] class Task(abc.ABC): log = '' # placeholder to keep the log of the task for registration cpu = 1 # CPU resource gpu = 0 # GPU resources: as of now, either 0 or 1 io_charge = 5 # integer percentage priority = 30 # integer percentage, 100 means highest priority ram = 4 # RAM needed to run (GB) one = None # one instance (optional) level = 0 # level in the pipeline hierarchy: level 0 means there is no parent task outputs = None # placeholder for a list of Path containing output files time_elapsed_secs = None time_out_secs = 3600 * 2 # time-out after which a task is considered dead version = ibllib.__version__ force = False # whether to re-download missing input files on local server if not present job_size = 'small' # either 'small' or 'large', defines whether task should be run as part of the large or small job services env = None # the environment name within which to run the task (NB: the env is not activated automatically!) def __init__(self, session_path, parents=None, taskid=None, one=None, machine=None, clobber=True, location='server', scratch_folder=None, **kwargs): """ Base task class :param session_path: session path :param parents: parents :param taskid: alyx task id :param one: one instance :param machine: :param clobber: whether or not to overwrite log on rerun :param location: location where task is run. Options are 'server' (lab local servers'), 'remote' (remote compute node, data required for task downloaded via one), 'AWS' (remote compute node, data required for task downloaded via AWS), or 'SDSC' (SDSC flatiron compute node) :param scratch_folder: optional: Path where to write intermediate temporary data :param args: running arguments """ self.taskid = taskid self.one = one self.session_path = session_path self.register_kwargs = {} if parents: self.parents = parents self.level = max(p.level for p in self.parents) + 1 else: self.parents = [] self.machine = machine self.clobber = clobber self.location = location self.plot_tasks = [] # Plotting task/ tasks to create plot outputs during the task self.scratch_folder = scratch_folder self.kwargs = kwargs @property def signature(self) -> Dict[str, List]: """ The signature of the task specifies inputs and outputs for the given task. For some tasks it is dynamic and calculated. The legacy code specifies those as tuples. The preferred way is to use the ExpectedDataset input and output constructors. I = ExpectedDataset.input O = ExpectedDataset.output signature = { 'input_files': [ I(name='extract.me.npy', collection='raw_data', required=True, register=False, unique=False), ], 'output_files': [ O(name='look.atme.npy', collection='shiny_data', required=True, register=True, unique=False) ]} is equivalent to: signature = { 'input_files': [('extract.me.npy', 'raw_data', True, True)], 'output_files': [('look.atme.npy', 'shiny_data', True)], } :return: """ return {'input_files': [], 'output_files': []} @property def name(self): return self.__class__.__name__
[docs] def path2eid(self): """ Fetch the experiment UUID from the Task session path, without using the REST cache. This method ensures that the eid will be returned for newly created sessions. Returns ------- str The experiment UUID corresponding to the session path. """ assert self.session_path and self.one and not self.one.offline with webclient.no_cache(self.one.alyx): return self.one.path2eid(self.session_path, query_type='remote')
[docs] def run(self, **kwargs): """ --- do not overload, see _run() below--- wraps the _run() method with - error management - logging to variable - writing a lock file if the GPU is used - labels the status property of the object. The status value is labeled as: 0: Complete -1: Errored -2: Didn't run as a lock was encountered -3: Incomplete Notes ----- - The `run_alyx_task` will update the Alyx Task status depending on both status and outputs (i.e. the output of subclassed `_run` method): Assuming a return value of 0... if Task.outputs is None, the status will be Empty; if Task.outputs is a list (empty or otherwise), the status will be Complete. """ # if task id of one properties are not available, local run only without alyx use_alyx = self.one is not None and self.taskid is not None if use_alyx: # check that alyx user is logged in if not self.one.alyx.is_logged_in: self.one.alyx.authenticate() tdict = self.one.alyx.rest('tasks', 'partial_update', id=self.taskid, data={'status': 'Started'}) self.log = ('' if not tdict['log'] else tdict['log'] + '\n\n=============================RERUN=============================\n') # Setup the console handler with a StringIO object logger_level = _logger.level log_capture_string = io.StringIO() ch = logging.StreamHandler(log_capture_string) str_format = '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s' ch.setFormatter(logging.Formatter(str_format)) _logger.parent.addHandler(ch) _logger.parent.setLevel(logging.INFO) _logger.info(f'Starting job {self.__class__}') if self.machine: _logger.info(f'Running on machine: {self.machine}') _logger.info(f'running ibllib version {ibllib.__version__}') # setup start_time = time.time() try: setup = self.setUp(**kwargs) self.outputs = self._input_files_to_register() _logger.info(f'Setup value is: {setup}') self.status = 0 if not setup: # case where outputs are present but don't have input files locally to rerun task # label task as complete _, outputs = self.assert_expected_outputs() else: # run task if self.gpu >= 1: if not self._creates_lock(): self.status = -2 _logger.info(f'Job {self.__class__} exited as a lock was found at {self._lock_file_path()}') new_log = log_capture_string.getvalue() self.log = new_log if self.clobber else self.log + new_log _logger.removeHandler(ch) ch.close() return self.status outputs = self._run(**kwargs) _logger.info(f'Job {self.__class__} complete') if outputs is None: # If run method returns None and no raw input files were registered, self.outputs # should be None, meaning task will have an 'Empty' status. If run method returns # a list, the status will be 'Complete' regardless of whether there are output files. self.outputs = outputs if not self.outputs else self.outputs # ensure None if no inputs registered else: self.outputs.extend(ensure_list(outputs)) # Add output files to list of inputs to register except Exception: _logger.error(traceback.format_exc()) _logger.info(f'Job {self.__class__} errored') self.status = -1 self.time_elapsed_secs = time.time() - start_time # log the outputs if isinstance(self.outputs, list): nout = len(self.outputs) elif self.outputs is None: nout = 0 else: nout = 1 _logger.info(f'N outputs: {nout}') _logger.info(f'--- {self.time_elapsed_secs} seconds run-time ---') # after the run, capture the log output, amend to any existing logs if not overwrite new_log = log_capture_string.getvalue() self.log = new_log if self.clobber else self.log + new_log _logger.removeHandler(ch) ch.close() _logger.setLevel(logger_level) # tear down self.tearDown() return self.status
[docs] def register_datasets(self, **kwargs): """ Register output datasets from the task to Alyx. Parameters ---------- kwargs Directly passed to the `DataHandler.upload_data` method. Returns ------- list The output of the `DataHandler.upload_data` method, e.g. a list of registered datasets. """ _ = self.register_images() return self.data_handler.uploadData(self.outputs, self.version, **kwargs)
def _input_files_to_register(self, assert_all_exist=False): """ Return input datasets to be registered to Alyx. These datasets are typically raw data files and are registered even if the task fails to complete. Parameters ---------- assert_all_exist Raise AssertionError if not all required input datasets exist on disk. Returns ------- list of pathlib.Path A list of input files to register. # TODO This method currently does not support wildcards """ I = data_handlers.ExpectedDataset.input # noqa try: # Ensure all input files are ExpectedDataset instances input_files = [I(*i) if isinstance(i, tuple) else i for i in self.input_files or []] except AttributeError: raise RuntimeError('Task.setUp must be run before calling this method.') to_register, missing = [], set() for dataset in input_files: ok, filepaths, _missing = dataset.find_files(self.session_path, register=True) to_register.extend(filepaths) if not ok and _missing is not None: missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing) if any(missing): # NB: These are either glob patterns that have no matches or match files that should be absent missing_str = ', '.join(missing) if assert_all_exist: raise AssertionError(f'Missing required input files: {missing_str}') else: _logger.error(f'Missing required input files: {missing_str}') return to_register
[docs] def register_images(self, **kwargs): """ Registers images to alyx database :return: """ if self.one and len(self.plot_tasks) > 0: for plot_task in self.plot_tasks: try: _ = plot_task.register_images(widths=['orig']) except Exception: _logger.error(traceback.format_exc()) continue
[docs] def rerun(self): self.run(overwrite=True)
[docs] def get_signatures(self, **kwargs): """ This is the default but should be overwritten for each task :return: """ signature = data_handlers._parse_signature(self.signature) self.input_files = signature['input_files'] self.output_files = signature['output_files']
@abc.abstractmethod def _run(self, overwrite=False): """Execute main task code. This method contains a task's principal data processing code and should implemented by each subclass. Parameters ---------- overwrite : bool When false (default), the task may re-use any intermediate data from a previous run. When true, the task will disregard or delete any intermediate files before running. Returns ------- pathlib.Path, list of pathlib.Path, None One or more files to be registered, or an empty list if no files are registered. Within the pipeline, there is a distinction between a job that returns an empty list and a job that returns None. If the function returns None, the job will be labeled as "empty" status in the database, otherwise, the job has an expected behaviour of not returning any dataset. """
[docs] def setUp(self, **kwargs): """Get the data handler and ensure all data is available locally to run task.""" if self.location == 'server': self.get_signatures(**kwargs) input_status, _ = self.assert_expected_inputs(raise_error=False) output_status, _ = self.assert_expected(self.output_files, silent=True) if input_status: self.data_handler = self.get_data_handler() _logger.info('All input files found: running task') return True if not self.force: self.data_handler = self.get_data_handler() _logger.warning('Not all input files found locally: will still attempt to rerun task') # TODO in the future once we are sure that input output task signatures work properly should return False # _logger.info('All output files found but input files required not available locally: task not rerun') return True else: # Attempts to download missing data using globus _logger.info('Not all input files found locally: attempting to re-download required files') self.data_handler = self.get_data_handler(location='serverglobus') self.data_handler.setUp(task=self) # Double check we now have the required files to run the task # TODO in future should raise error if even after downloading don't have the correct files self.assert_expected_inputs(raise_error=False) return True else: self.data_handler = self.get_data_handler() self.data_handler.setUp(task=self) self.get_signatures(**kwargs) self.assert_expected_inputs(raise_error=False) return True
[docs] def tearDown(self): """ Function after runs() Does not run if a lock is encountered by the task (status -2) """ if self.gpu >= 1: if self._lock_file_path().exists(): self._lock_file_path().unlink()
[docs] def cleanUp(self): """ Function to optionally overload to clean up :return: """ self.data_handler.cleanUp(task=self)
[docs] def assert_expected_outputs(self, raise_error=True): """ After a run, asserts that all signature files are present at least once in the output files Mainly useful for integration tests :return: """ assert self.status == 0 _logger.info('Checking output files') everything_is_fine, files = self.assert_expected(self.output_files) if not everything_is_fine: for out in self.outputs: _logger.error(f'{out}') if raise_error: raise FileNotFoundError('Missing outputs after task completion') return everything_is_fine, files
[docs] def assert_expected_inputs(self, raise_error=True, raise_ambiguous=False): """ Check that all the files necessary to run the task have been are present on disk. Parameters ---------- raise_error : bool If true, raise FileNotFoundError if required files are missing. Returns ------- bool True if all required files were found. list of pathlib.Path A list of file paths that exist on disk. """ _logger.info('Checking input files') everything_is_fine, files = self.assert_expected(self.input_files) if not everything_is_fine and raise_error: raise FileNotFoundError('Missing inputs to run task') # Check for duplicate datasets that may complicate extraction. # Some sessions may contain revisions and without ONE it's difficult to determine which # are the default datasets. Likewise SDSC may contain multiple datasets with different # UUIDs in the name after patching data. variant_datasets = alfio.find_variants(files, extra=False) if any(map(len, variant_datasets.values())): # Keep those with variants and make paths relative to session for logging purposes to_frag = lambda x: x.relative_to(self.session_path).as_posix() # noqa ambiguous = { to_frag(k): [to_frag(x) for x in v] for k, v in variant_datasets.items() if any(v)} _logger.error('Ambiguous input datasets found: %s', ambiguous) if raise_ambiguous or self.location == 'sdsc': # take no chances on SDSC # This could be mitigated if loading with data OneSDSC raise NotImplementedError( 'Multiple variant datasets found. Loading for these is undefined.') return everything_is_fine, files
[docs] def assert_expected(self, expected_files, silent=False): """ Assert that expected files are present. Parameters ---------- expected_files : list of ExpectedDataset A list of expected files in the form (file_pattern_str, collection_str, required_bool). silent : bool If true, log an error if any required files are not found. Returns ------- bool True if all required files were found. list of pathlib.Path A list of file paths that exist on disk. """ if not any(expected_files): return True, [] ok, actual_files, missing = zip(*(x.find_files(self.session_path) for x in expected_files)) everything_is_fine = all(ok) # For unknown reasons only the first file of each expected dataset was returned and this # behaviour was preserved after refactoring the code files = [file_list[0] for file_list in actual_files if len(file_list) > 0] if not everything_is_fine and not silent: for missing_pattern in filter(None, flatten(missing)): _logger.error('Signature file pattern %s not found', missing_pattern) return everything_is_fine, files
[docs] def get_data_handler(self, location=None): """ Gets the relevant data handler based on location argument :return: """ location = str.lower(location or self.location) if location == 'local': return data_handlers.LocalDataHandler(self.session_path, self.signature, one=self.one) self.one = self.one or ONE() if location == 'server': dhandler = data_handlers.ServerDataHandler(self.session_path, self.signature, one=self.one) elif location == 'serverglobus': dhandler = data_handlers.ServerGlobusDataHandler(self.session_path, self.signature, one=self.one) elif location == 'remote': dhandler = data_handlers.RemoteHttpDataHandler(self.session_path, self.signature, one=self.one) elif location == 'aws': dhandler = data_handlers.RemoteAwsDataHandler(self.session_path, self.signature, one=self.one) elif location == 'sdsc': dhandler = data_handlers.SDSCDataHandler(self.session_path, self.signature, one=self.one) elif location == 'popeye': dhandler = data_handlers.PopeyeDataHandler(self.session_path, self.signature, one=self.one) elif location == 'ec2': dhandler = data_handlers.RemoteEC2DataHandler(self.session_path, self.signature, one=self.one) else: raise ValueError(f'Unknown location "{location}"') return dhandler
[docs] @staticmethod def make_lock_file(taskname='', time_out_secs=7200): """Creates a GPU lock file with a timeout of""" d = {'start': time.time(), 'name': taskname, 'time_out_secs': time_out_secs} with open(Task._lock_file_path(), 'w+') as fid: json.dump(d, fid) return d
@staticmethod def _lock_file_path(): """the lock file is in ~/.one/gpu.lock""" folder = Path.home().joinpath('.one') folder.mkdir(exist_ok=True) return folder.joinpath('gpu.lock') def _make_lock_file(self): """creates a lock file with the current time""" return Task.make_lock_file(self.name, self.time_out_secs)
[docs] def is_locked(self): """Checks if there is a lock file for this given task""" lock_file = self._lock_file_path() if not lock_file.exists(): return False with open(lock_file) as fid: d = json.load(fid) now = time.time() if (now - d['start']) > d['time_out_secs']: lock_file.unlink() return False else: return True
def _creates_lock(self): if self.location == "popeye": return True if self.is_locked(): return False else: self._make_lock_file() return True
[docs] class Pipeline(abc.ABC): """ Pipeline class: collection of related and potentially interdependent tasks """ tasks = OrderedDict() one = None def __init__(self, session_path=None, one=None, eid=None): assert session_path or eid self.one = one if one and one.alyx.cache_mode and one.alyx.default_expiry.seconds > 1: _logger.warning('Alyx client REST cache active; this may cause issues with jobs') self.eid = eid if self.one and not self.one.offline: self.data_repo = get_local_data_repository(self.one.alyx) else: self.data_repo = None if session_path: self.session_path = session_path if not self.eid: # eID for newer sessions may not be in cache so use remote query self.eid = one.path2eid(session_path, query_type='remote') if self.one else None self.label = self.__module__ + '.' + type(self).__name__ @staticmethod def _get_exec_name(obj): """ For a class, get the executable name as it should be stored in Alyx. When the class is created dynamically using the type() built-in function, need to revert to the base class to be able to re-instantiate the class from the alyx dictionary on the client side :param obj: :return: string containing the full module plus class name """ if obj.__module__ == 'abc': exec_name = f'{obj.__class__.__base__.__module__}.{obj.__class__.__base__.__name__}' else: exec_name = f'{obj.__module__}.{obj.name}' return exec_name
[docs] def make_graph(self, out_dir=None, show=True): if not out_dir: out_dir = self.one.alyx.cache_dir if self.one else one.params.get().CACHE_DIR m = Digraph('G', filename=str(Path(out_dir).joinpath(self.__module__ + '_graphs.gv'))) m.attr(rankdir='TD') e = Digraph(name='cluster_' + self.label) e.attr('node', shape='box') e.node('root', label=self.label) e.attr('node', shape='ellipse') for k in self.tasks: j = self.tasks[k] if len(j.parents) == 0: e.edge('root', j.name) else: [e.edge(p.name, j.name) for p in j.parents] m.subgraph(e) m.attr(label=r'\n\Pre-processing\n') m.attr(fontsize='20') if show: m.view() return m
[docs] def create_alyx_tasks(self, rerun__status__in=None, tasks_list=None): """ Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session. If the jobs already exist, they are left untouched. The re-run parameter will re-init the job by emptying the log and set the status to Waiting. Parameters ---------- rerun__status__in : list, str To re-run tasks if they already exist, specify one or more statuses strings to will be re-run, or '__all__' to re-run all tasks. tasks_list : list The list of tasks to create on Alyx. If None, uses self.tasks. Returns ------- list List of Alyx task dictionaries (existing and/or created). """ rerun__status__in = ([rerun__status__in] if isinstance(rerun__status__in, str) else rerun__status__in or []) if '__all__' in rerun__status__in: rerun__status__in = [x for x in TASK_STATUS_SET if x != 'Abandoned'] assert self.eid if self.one is None: _logger.warning('No ONE instance found for Alyx connection, set the one property') return tasks_alyx_pre = self.one.alyx.rest('tasks', 'list', session=self.eid, graph=self.name, no_cache=True) tasks_alyx = [] # creates all the tasks by iterating through the ordered dict if tasks_list is not None: task_items = tasks_list # need to add in the session eid and the parents else: task_items = self.tasks.values() for t in task_items: # get the parents' alyx ids to reference in the database if isinstance(t, dict): t = Bunch(t) executable = t.executable arguments = t.arguments t['time_out_secs'] = t['time_out_sec'] if len(t.parents) > 0: pnames = t.parents else: executable = self._get_exec_name(t) arguments = t.kwargs if len(t.parents): pnames = [p.name for p in t.parents] if len(t.parents): parents_ids = [ta['id'] for ta in tasks_alyx if ta['name'] in pnames] else: parents_ids = [] task_dict = {'executable': executable, 'priority': t.priority, 'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu, 'ram': t.ram, 'module': self.label, 'parents': parents_ids, 'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid, 'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name, 'arguments': arguments} if self.data_repo: task_dict.update({'data_repository': self.data_repo}) # if the task already exists, patch it otherwise, create it talyx = next(filter(lambda x: x['name'] == t.name, tasks_alyx_pre), []) if len(talyx) == 0: talyx = self.one.alyx.rest('tasks', 'create', data=task_dict) elif talyx['status'] in rerun__status__in: talyx = self.one.alyx.rest('tasks', 'partial_update', id=talyx['id'], data=task_dict) tasks_alyx.append(talyx) return tasks_alyx
[docs] def create_tasks_list_from_pipeline(self): """ From a pipeline with tasks, creates a list of dictionaries containing task description that can be used to upload to create alyx tasks :return: """ tasks_list = [] for k, t in self.tasks.items(): # get the parents' alyx ids to reference in the database if len(t.parents): parent_names = [p.name for p in t.parents] else: parent_names = [] task_dict = {'executable': self._get_exec_name(t), 'priority': t.priority, 'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu, 'ram': t.ram, 'module': self.label, 'parents': parent_names, 'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid, 'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name, 'arguments': t.kwargs} if self.data_repo: task_dict.update({'data_repository': self.data_repo}) tasks_list.append(task_dict) return tasks_list
[docs] def run(self, status__in=('Waiting',), machine=None, clobber=True, **kwargs): """ Get all the session related jobs from alyx and run them :param status__in: lists of status strings to run in ['Waiting', 'Started', 'Errored', 'Empty', 'Complete'] :param machine: string identifying the machine the task is run on, optional :param clobber: bool, if True any existing logs are overwritten, default is True :param kwargs: arguments passed downstream to run_alyx_task :return: jalyx: list of REST dictionaries of the job endpoints :return: job_deck: list of REST dictionaries of the jobs endpoints :return: all_datasets: list of REST dictionaries of the dataset endpoints """ assert self.session_path, 'Pipeline object has to be declared with a session path to run' if self.one is None: _logger.warning('No ONE instance found for Alyx connection, set the one property') return task_deck = self.one.alyx.rest('tasks', 'list', session=self.eid, no_cache=True) # [(t['name'], t['level']) for t in task_deck] all_datasets = [] for i, j in enumerate(task_deck): if j['status'] not in status__in: continue # here we update the status in-place to avoid another hit to the database task_deck[i], dsets = run_alyx_task(tdict=j, session_path=self.session_path, one=self.one, job_deck=task_deck, machine=machine, clobber=clobber, **kwargs) if dsets is not None: all_datasets.extend(dsets) return task_deck, all_datasets
[docs] def rerun_failed(self, **kwargs): return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty'], **kwargs)
[docs] def rerun(self, **kwargs): return self.run(status__in=[x for x in TASK_STATUS_SET if x != 'Abandoned'], **kwargs)
@property def name(self): return self.__class__.__name__
[docs] def str2class(task_executable: str): """ Convert task name to class. Parameters ---------- task_executable : str A Task class name, e.g. 'ibllib.pipes.behavior_tasks.TrialRegisterRaw'. Returns ------- class The imported class. """ strmodule, strclass = task_executable.rsplit('.', 1) return getattr(importlib.import_module(strmodule), strclass)
[docs] def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None, max_md5_size=None, machine=None, clobber=True, location='server', mode='log'): """ Runs a single Alyx job and registers output datasets. Parameters ---------- tdict : dict An Alyx task dictionary to instantiate and run. session_path : str, pathlib.Path A session path containing the task input data. one : one.api.OneAlyx An instance of ONE. job_deck : list of dict, optional A list of all tasks in the same pipeline. If None, queries Alyx to get this. max_md5_size : int, optional An optional maximum file size in bytes. Files with sizes larger than this will not have their MD5 checksum calculated to save time. machine : str, optional A string identifying the machine the task is run on. clobber : bool, default=True If true any existing logs are overwritten on Alyx. location : {'remote', 'server', 'sdsc', 'aws'} Where you are running the task, 'server' - local lab server, 'remote' - any compute node/ computer, 'sdsc' - Flatiron compute node, 'aws' - using data from AWS S3 node. mode : {'log', 'raise}, default='log' Behaviour to adopt if an error occurred. If 'raise', it will raise the error at the very end of this function (i.e. after having labeled the tasks). Returns ------- dict The updated task dict. list of pathlib.Path A list of registered datasets. """ registered_dsets = [] # here we need to check parents' status, get the job_deck if not available if not job_deck: job_deck = one.alyx.rest('tasks', 'list', session=tdict['session'], no_cache=True) if len(tdict['parents']): # check the dependencies parent_tasks = filter(lambda x: x['id'] in tdict['parents'], job_deck) parent_statuses = [j['status'] for j in parent_tasks] # if any of the parent tasks is not complete, throw a warning if not set(parent_statuses) <= {'Complete', 'Incomplete'}: _logger.warning(f"{tdict['name']} has unmet dependencies") # if parents are waiting or failed, set the current task status to Held # once the parents ran, the descendent tasks will be set from Held to Waiting (see below) if set(parent_statuses).intersection({'Errored', 'Held', 'Empty', 'Waiting', 'Started', 'Abandoned'}): tdict = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Held'}) return tdict, registered_dsets # creates the job from the module name in the database classe = str2class(tdict['executable']) tkwargs = tdict.get('arguments') or {} # if the db field is null it returns None task = classe(session_path, one=one, taskid=tdict['id'], machine=machine, clobber=clobber, location=location, **tkwargs) # sets the status flag to started before running one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Started'}) status = task.run() patch_data = {'time_elapsed_secs': task.time_elapsed_secs, 'log': task.log, 'version': task.version} # if there is no data to register, set status to Empty if task.outputs is None: # NB: an empty list is still considered Complete. patch_data['status'] = 'Empty' # otherwise register data and set (provisional) status to Complete else: try: kwargs = dict(max_md5_size=max_md5_size) if location == 'server': # Explicitly pass lab as lab cannot be inferred from path (which the registration client tries to do). # To avoid making extra REST requests we can also set labs=None if using ONE v1.20.1. kwargs['labs'] = get_lab(session_path, one.alyx) registered_dsets = task.register_datasets(**kwargs) patch_data['status'] = 'Complete' except Exception: _logger.error(traceback.format_exc()) status = -1 # overwrite status to errored if status == -1: patch_data['status'] = 'Errored' # Status -2 means a lock was encountered during run, should be rerun elif status == -2: patch_data['status'] = 'Waiting' # Status -3 should be returned if a task is Incomplete elif status == -3: patch_data['status'] = 'Incomplete' # update task status on Alyx t = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data=patch_data) # check for dependent held tasks # NB: Assumes dependent tasks are all part of the same session! next(x for x in job_deck if x['id'] == t['id'])['status'] = t['status'] # Update status in job deck dependent_tasks = filter(lambda x: t['id'] in x['parents'] and x['status'] == 'Held', job_deck) for d in dependent_tasks: assert d['id'] != t['id'], 'task its own parent' # if all their parent tasks now complete, set to waiting parent_status = [next(x['status'] for x in job_deck if x['id'] == y) for y in d['parents']] if set(parent_status) <= {'Complete', 'Incomplete'}: one.alyx.rest('tasks', 'partial_update', id=d['id'], data={'status': 'Waiting'}) task.cleanUp() if mode == 'raise' and status != 0: raise ValueError(task.log) return t, registered_dsets