ibllib.pipes.tasks
The abstract Pipeline and Task superclasses and concrete task runner.
Functions
run_alyx_task: Runs a single Alyx job and registers output datasets.
Classes
Pipeline: collection of related and potentially interdependent tasks
Task
- class Task(session_path, parents=None, taskid=None, one=None, machine=None, clobber=True, location='server', **kwargs)[source]
Bases:
ABC
- log = ''
- cpu = 1
- gpu = 0
- io_charge = 5
- priority = 30
- ram = 4
- outputs = None
- time_elapsed_secs = None
- time_out_secs = 7200
- version = '2.27'
- signature = {'input_files': [], 'output_files': []}
- force = False
- job_size = 'small'
- one = None
- level = 0
- property name
- path2eid()[source]
Fetch the experiment UUID from the Task session path, without using the REST cache.
This method ensures that the eid will be returned for newly created sessions.
- Returns:
The experiment UUID corresponding to the session path.
- Return type:
str
- run(**kwargs)[source]
Do not overload; see _run() below. This method wraps _run() with:
- error management
- logging to a variable
- writing a lock file if the GPU is used
- labelling the status property of the object
The status value is set as follows:
0: Complete
-1: Errored
-2: Did not run as a lock was encountered
-3: Incomplete
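As an illustration of this pattern, here is a minimal, hypothetical Task subclass (not part of ibllib); the class name, session path and output file are made up, and only the idea of overloading _run() and reading status and outputs afterwards comes from this module:

from pathlib import Path
from ibllib.pipes.tasks import Task

class ExampleTask(Task):  # hypothetical task name
    priority = 60
    job_size = 'small'

    def _run(self):
        # Do the actual work here and return the list of files produced.
        out_file = Path(self.session_path).joinpath('alf', 'example.output.pqt')
        # ... compute results and write them to out_file ...
        return [out_file]

task = ExampleTask('/path/to/subject/2024-01-01/001')
task.run()           # wraps _run() with error handling, logging and the lock file
print(task.status)   # 0: Complete, -1: Errored, -2: lock encountered, -3: Incomplete
print(task.outputs)  # the files returned by _run()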
- register_datasets(one=None, **kwargs)[source]
Register output datasets from the task to Alyx.
- Parameters:
one –
jobid –
kwargs – directly passed to the register_dataset function
- Returns:
- get_signatures(**kwargs)[source]
This is the default method and should be overwritten for each task.
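For illustration, a common form of the signature dictionary in ibllib tasks lists each expected file as a (dataset name, collection, required) tuple; the dataset and collection names below are examples only, and some versions use richer objects instead of plain tuples:

signature = {
    'input_files': [
        ('_iblrig_taskData.raw.jsonable', 'raw_behavior_data', True),   # required input
        ('_iblrig_taskSettings.raw.json', 'raw_behavior_data', False),  # optional input
    ],
    'output_files': [
        ('_ibl_trials.table.pqt', 'alf', True),                         # expected output
    ],
}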
- setUp(**kwargs)[source]
Setup method to get the data handler and ensure all data required to run the task is available locally.
- Parameters:
kwargs –
- Returns:
- tearDown()[source]
Function run after run(). Does not run if a lock was encountered by the task (status -2).
- assert_expected_outputs(raise_error=True)[source]
After a run, asserts that all signature files are present at least once in the output files. Mainly useful for integration tests.
- assert_expected_inputs(raise_error=True)[source]
Before running a task, checks that all the files necessary to run the task have been downloaded or are already on the local file system.
- get_data_handler(location=None)[source]
Gets the relevant data handler based on the location argument.
- class Pipeline(session_path=None, one=None, eid=None)[source]
Bases:
ABC
Pipeline class: collection of related and potentially interdependent tasks
- tasks = {}
- one = None
- create_alyx_tasks(rerun__status__in=None, tasks_list=None)[source]
Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session. If the jobs already exist, they are left untouched. The re-run parameter will re-initialise the job by emptying the log and setting the status to Waiting.
- Parameters:
rerun__status__in (list, str) – To re-run tasks if they already exist, specify one or more status strings that will be re-run, or ‘__all__’ to re-run all tasks.
tasks_list (list) – The list of tasks to create on Alyx. If None, uses self.tasks.
- Returns:
List of Alyx task dictionaries (existing and/or created).
- Return type:
list
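A hedged usage sketch, assuming a hypothetical Pipeline subclass MyPipeline and an example session path:

from one.api import ONE

one = ONE()
pipe = MyPipeline(session_path='/path/to/subject/2024-01-01/001', one=one)
# Create this session's tasks on Alyx; re-initialise any that previously errored
alyx_tasks = pipe.create_alyx_tasks(rerun__status__in=['Errored'])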
- create_tasks_list_from_pipeline()[source]
From a pipeline with tasks, creates a list of dictionaries containing task descriptions that can be used to create Alyx tasks.
- run(status__in=('Waiting',), machine=None, clobber=True, **kwargs)[source]
Get all the session-related jobs from Alyx and run them.
- Parameters:
status__in – list of status strings to run, from [‘Waiting’, ‘Started’, ‘Errored’, ‘Empty’, ‘Complete’]
machine – string identifying the machine the task is run on, optional
clobber – bool, if True any existing logs are overwritten, default is True
kwargs – arguments passed downstream to run_alyx_task
- Returns:
jalyx – list of REST dictionaries of the job endpoints
job_deck – list of REST dictionaries of the jobs endpoints
all_datasets – list of REST dictionaries of the dataset endpoints
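Continuing the sketch above, running this session's jobs that are currently Waiting or Errored might look like this (the machine name is an example):

pipe.run(status__in=('Waiting', 'Errored'), machine='my-lab-server', clobber=True)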
- property name
- run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None, max_md5_size=None, machine=None, clobber=True, location='server', mode='log')[source]
Runs a single Alyx job and registers output datasets.
- Parameters:
tdict (dict) – An Alyx task dictionary to instantiate and run.
session_path (str, pathlib.Path) – A session path containing the task input data.
one (one.api.OneAlyx) – An instance of ONE.
job_deck (list of dict, optional) – A list of all tasks in the same pipeline. If None, queries Alyx to get this.
max_md5_size (int, optional) – An optional maximum file size in bytes. Files with sizes larger than this will not have their MD5 checksum calculated to save time.
machine (str, optional) – A string identifying the machine the task is run on.
clobber (bool, default=True) – If True, any existing logs are overwritten on Alyx.
location ({'remote', 'server', 'sdsc', 'aws'}) – Where you are running the task, ‘server’ - local lab server, ‘remote’ - any compute node/ computer, ‘sdsc’ - Flatiron compute node, ‘aws’ - using data from AWS S3 node.
mode ({'log', 'raise'}, default='log') – Behaviour to adopt if an error occurs. If ‘raise’, the error is raised at the very end of this function (i.e. after having labelled the tasks).
- Returns:
Task – The instantiated task object that was run.
list of pathlib.Path – A list of registered datasets.
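A hedged sketch of running a single task dictionary fetched from Alyx; the session path and the task name filter are assumptions for illustration:

from one.api import ONE
from ibllib.pipes.tasks import run_alyx_task

one = ONE()
session_path = '/path/to/subject/2024-01-01/001'
eid = one.path2eid(session_path)
# Fetch one of the session's task records from Alyx (the task name is hypothetical)
tdict = one.alyx.rest('tasks', 'list', session=eid, name='ExampleTask')[0]
task, datasets = run_alyx_task(tdict=tdict, session_path=session_path, one=one, location='server')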