ibllib.pipes.tasks

The abstract Pipeline and Task superclasses and concrete task runner.

Functions

run_alyx_task

Runs a single Alyx job and registers output datasets.

Classes

Pipeline

Pipeline class: collection of related and potentially interdependent tasks

Task

class Task(session_path, parents=None, taskid=None, one=None, machine=None, clobber=True, location='server', **kwargs)[source]

Bases: ABC

log = ''
cpu = 1
gpu = 0
io_charge = 5
priority = 30
ram = 4
outputs = None
time_elapsed_secs = None
time_out_secs = 7200
version = '2.27'
signature = {'input_files': [], 'output_files': []}
force = False
job_size = 'small'
one = None
level = 0
property name
path2eid()[source]

Fetch the experiment UUID from the Task session path, without using the REST cache.

This method ensures that the eid will be returned for newly created sessions.

Returns:

The experiment UUID corresponding to the session path.

Return type:

str

run(**kwargs)[source]

Do not overload this method; see _run() below. Wraps the _run() method with:
  • error management

  • logging to a variable

  • writing a lock file if the GPU is used

  • labelling the status property of the object

The status value is labeled as:
  • 0: Complete

  • -1: Errored

  • -2: Didn’t run as a lock was encountered

  • -3: Incomplete
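For illustration, a concrete task overloads _run() and is executed through run(). The following is a minimal sketch; the task name, session path, and output file are hypothetical, and depending on the location argument, setUp() may need an instance of ONE to fetch input data:

    from pathlib import Path

    from ibllib.pipes.tasks import Task

    class DummyTask(Task):
        """Minimal sketch of a concrete task; overload _run(), not run()."""
        cpu = 1
        gpu = 0
        job_size = 'small'

        def _run(self):
            # Do the actual work, then return the list of output files.
            out_file = Path(self.session_path).joinpath('alf', 'dummy.txt')
            out_file.parent.mkdir(parents=True, exist_ok=True)
            out_file.touch()
            return [out_file]

    task = DummyTask('/mnt/data/subject/2023-01-01/001')
    status = task.run()  # 0: Complete, -1: Errored, -2: locked, -3: Incomplete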

register_datasets(one=None, **kwargs)[source]

Register output datasets from the task to Alyx.

Parameters:
  • one – An instance of ONE.

  • jobid – The ID of the Alyx task.

  • kwargs – Directly passed to the register_dataset function.

Returns:

The registered datasets.

register_images(**kwargs)[source]

Registers images to the Alyx database.

rerun()[source]
get_signatures(**kwargs)[source]

This is the default method and should be overridden by each task.
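Continuing the DummyTask sketch above, a task usually declares its expected files on the class-level signature attribute, which get_signatures() draws from by default. The file names and collections below are hypothetical, and the (filename, collection, required) tuple form is an assumption about the signature convention:

    class TrialsTask(DummyTask):
        # Hypothetical file signatures: (filename, collection, required) tuples.
        signature = {
            'input_files': [('_iblrig_taskData.raw.jsonable', 'raw_behavior_data', True)],
            'output_files': [('_ibl_trials.table.pqt', 'alf', True)],
        }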

setUp(**kwargs)[source]

Set-up method to get the data handler and ensure all the data required to run the task are available locally.

Parameters:

kwargs

Returns:

tearDown()[source]

Function run after run(). Does not run if a lock was encountered by the task (status -2).

cleanUp()[source]

Function to optionally overload to clean up after the task.

assert_expected_outputs(raise_error=True)[source]

After a run, asserts that all signature files are present at least once in the output files. Mainly useful for integration tests.

assert_expected_inputs(raise_error=True)[source]

Before running a task, checks that all the files necessary to run it have been downloaded or are already on the local file system.

assert_expected(expected_files, silent=False)[source]
get_data_handler(location=None)[source]

Gets the relevant data handler based on the location argument.

static make_lock_file(taskname='', time_out_secs=7200)[source]

Creates a GPU lock file with a timeout of time_out_secs seconds (7200 by default).

is_locked()[source]

Checks whether there is a lock file for this task.
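As a sketch of the locking machinery (run() handles this automatically when gpu > 0, returning status -2 if a lock is encountered), continuing the DummyTask sketch above:

    # Static call: write a lock file that times out after one hour.
    Task.make_lock_file(taskname='DummyTask', time_out_secs=3600)

    # True while the lock file exists and has not timed out.
    locked = task.is_locked()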

class Pipeline(session_path=None, one=None, eid=None)[source]

Bases: ABC

Pipeline class: collection of related and potentially interdependent tasks

tasks = {}
one = None
make_graph(out_dir=None, show=True)[source]
create_alyx_tasks(rerun__status__in=None, tasks_list=None)[source]

Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session. If the jobs already exist, they are left untouched. The re-run parameter will re-initialize the jobs by emptying the log and setting the status to Waiting.

Parameters:
  • rerun__status__in (list, str) – To re-run tasks if they already exist, specify one or more status strings of tasks that will be re-run, or ‘__all__’ to re-run all tasks.

  • tasks_list (list) – The list of tasks to create on Alyx. If None, uses self.tasks.

Returns:

List of Alyx task dictionaries (existing and/or created).

Return type:

list
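For illustration, a pipeline is typically subclassed to populate self.tasks with task instances, after which create_alyx_tasks() registers the jobs. The pipeline class and session path below are hypothetical and continue the DummyTask sketch above:

    from collections import OrderedDict

    from one.api import ONE
    from ibllib.pipes.tasks import Pipeline

    class DummyPipeline(Pipeline):
        def __init__(self, session_path, **kwargs):
            super().__init__(session_path, **kwargs)
            tasks = OrderedDict()
            tasks['DummyTask'] = DummyTask(self.session_path)
            self.tasks = tasks

    one = ONE()
    pipe = DummyPipeline('/mnt/data/subject/2023-01-01/001', one=one)
    alyx_tasks = pipe.create_alyx_tasks()  # existing jobs are left untouched
    # Re-initialize all existing jobs (log emptied, status set to Waiting):
    alyx_tasks = pipe.create_alyx_tasks(rerun__status__in='__all__')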

create_tasks_list_from_pipeline()[source]

From a pipeline with tasks, creates a list of dictionaries containing task descriptions that can be used to create the tasks on Alyx.

run(status__in=('Waiting',), machine=None, clobber=True, **kwargs)[source]

Get all the session-related jobs from Alyx and run them.

Parameters:
  • status__in (list) – A list of job status strings to run, from [‘Waiting’, ‘Started’, ‘Errored’, ‘Empty’, ‘Complete’].

  • machine (str, optional) – A string identifying the machine the task is run on.

  • clobber (bool, default=True) – If True, any existing logs are overwritten.

  • kwargs – Arguments passed downstream to run_alyx_task.

Returns:
  • jalyx – List of REST dictionaries of the job endpoints.

  • job_deck – List of REST dictionaries of the jobs endpoints.

  • all_datasets – List of REST dictionaries of the dataset endpoints.
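Continuing the DummyPipeline sketch above, a call might look as follows; the return names mirror the docstring:

    # Run the Waiting and Errored jobs for this session.
    jalyx, job_deck, all_datasets = pipe.run(status__in=['Waiting', 'Errored'])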

rerun_failed(**kwargs)[source]
rerun(**kwargs)[source]
property name
run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None, max_md5_size=None, machine=None, clobber=True, location='server', mode='log')[source]

Runs a single Alyx job and registers output datasets.

Parameters:
  • tdict (dict) – An Alyx task dictionary to instantiate and run.

  • session_path (str, pathlib.Path) – A session path containing the task input data.

  • one (one.api.OneAlyx) – An instance of ONE.

  • job_deck (list of dict, optional) – A list of all tasks in the same pipeline. If None, queries Alyx to get this.

  • max_md5_size (int, optional) – An optional maximum file size in bytes. Files with sizes larger than this will not have their MD5 checksum calculated to save time.

  • machine (str, optional) – A string identifying the machine the task is run on.

  • clobber (bool, default=True) – If True, any existing logs are overwritten on Alyx.

  • location ({'remote', 'server', 'sdsc', 'aws'}) – Where the task is being run: ‘server’ (local lab server), ‘remote’ (any compute node/computer), ‘sdsc’ (Flatiron compute node), or ‘aws’ (using data from an AWS S3 node).

  • mode ({'log', 'raise'}, default='log') – Behaviour to adopt if an error occurs. If ‘raise’, the error is raised at the very end of this function (i.e. after the tasks have been labeled).

Returns:

  • Task – The instantiated task object that was run.

  • list of pathlib.Path – A list of registered datasets.
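For illustration, a single job can also be run outside a Pipeline. The session path is hypothetical, and fetching the task dictionary via the REST client is one plausible approach rather than a prescribed one:

    from one.api import ONE
    from ibllib.pipes.tasks import run_alyx_task

    one = ONE()
    session_path = '/mnt/data/subject/2023-01-01/001'
    eid = one.path2eid(session_path)
    # One plausible way to fetch the Alyx task record for this session:
    tdict = one.alyx.rest('tasks', 'list', session=eid, name='DummyTask')[0]
    task, datasets = run_alyx_task(tdict=tdict, session_path=session_path, one=one)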