ibllib.pipes.tasks

The abstract Pipeline and Task superclasses and concrete task runner.

Examples

1. Running a task on your local computer. | Download: via ONE. | Upload: N/A.

>>> task = VideoSyncQcBpod(session_path, one=one, location='remote', sync='bpod')
>>> task.run()

2. Running a task on the local server that belongs to a given subject (e.g. SWC054 on floferlab). | Download: all data expected to be present. | Upload: datasets are registered the normal way; file records are created and bulk sync / bulk transfer jobs on Alyx transfer the data.

>>> from ibllib.pipes.video_tasks import VideoSyncQcBpod
>>> session_path = '/mnt/ibl/s0/Data/Subjects/SWC054/2023-01-01/001'
>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))

3. Running a task on the local server that belongs to that subject and forcing redownload of missing data. | Download: via Globus (TODO we should change this to use boto3 as Globus is slow). | Upload: datasets are registered the normal way; file records are created and bulk sync / bulk transfer jobs on Alyx transfer the data.

>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.force = True
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
>>> task.cleanUp()  # Delete the files that have been downloaded

4. Running a task on the local server that doesn't belong to that subject (e.g. SWC054 on angelakilab). | Download: via boto3; the AWS file records must exist and be set to exists = True. | Upload: via Globus, the datasets are uploaded directly to FlatIron. FlatIron file records are created and set to exists = True once the Globus task has completed.

>>> task = VideoSyncQcBpod(session_path, one=one, location='AWS', sync='bpod')
>>> task.run()
>>> task.register_datasets()
>>> task.cleanUp()  # Delete the files that have been downloaded

5. Running a task on SDSC. | Download: by creating symlinks to the relevant datasets on SDSC. | Upload: by copying files to the relevant location on SDSC.

>>> task = VideoSyncQcBpod(session_path, one=one, location='SDSC', sync='bpod')
>>> task.run()
>>> response = task.register_datasets()
>>> # Here we just make sure filerecords are all correct
>>> for resp in response:
...     fi = next((fr for fr in resp['file_records'] if 'flatiron' in fr['data_repository']), None)
...     if fi is not None and not fi['exists']:
...         one.alyx.rest('files', 'partial_update', id=fi['id'], data={'exists': True})
...     aws = next((fr for fr in resp['file_records'] if 'aws' in fr['data_repository']), None)
...     if aws is not None:
...         one.alyx.rest('files', 'partial_update', id=aws['id'], data={'exists': False})
...     sr = next((fr for fr in resp['file_records'] if 'SR' in fr['data_repository']), None)
...     if sr is not None:
...         one.alyx.rest('files', 'partial_update', id=sr['id'], data={'exists': False})
>>> # Finally remove the symlinks once the task has completed
>>> task.cleanUp()

Functions

run_alyx_task

Runs a single Alyx job and registers output datasets.

str2class

Convert task name to class.

Classes

Pipeline

Pipeline class: collection of related and potentially interdependent tasks

Task

class Task(session_path, parents=None, taskid=None, one=None, machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs)[source]

Bases: ABC

log = ''
cpu = 1
gpu = 0
io_charge = 5
priority = 30
ram = 4
outputs = None
time_elapsed_secs = None
time_out_secs = 7200
version = '3.1.0'
force = False
job_size = 'small'
env = None
on_error = 'continue'
one = None
level = 0
property signature: Dict[str, List]

The signature of the task specifies the inputs and outputs for the given task. For some tasks it is dynamic and calculated at runtime. Legacy code specifies these as tuples; the preferred way is to use the ExpectedDataset input and output constructors, for example:

    I = ExpectedDataset.input
    O = ExpectedDataset.output
    signature = {
        'input_files': [
            I(name='extract.me.npy', collection='raw_data', required=True, register=False, unique=False),
        ],
        'output_files': [
            O(name='look.atme.npy', collection='shiny_data', required=True, register=True, unique=False),
        ],
    }

is equivalent to:

    signature = {
        'input_files': [('extract.me.npy', 'raw_data', True, True)],
        'output_files': [('look.atme.npy', 'shiny_data', True)],
    }

Returns:

property name
path2eid()[source]

Fetch the experiment UUID from the Task session path, without using the REST cache.

This method ensures that the eid will be returned for newly created sessions.

Returns:

The experiment UUID corresponding to the session path.

Return type:

str

run(**kwargs)[source]

Do not overload; see _run() below. This method wraps the _run() method with:

  • error management

  • logging to a variable

  • writing a lock file if the GPU is used

  • labelling the status property of the object

The status value is set as follows:

  • 0: Complete

  • -1: Errored

  • -2: Didn't run as a lock was encountered

  • -3: Incomplete

Notes

  • run_alyx_task will update the Alyx task status depending on both status and outputs (i.e. the return value of the subclassed _run method): assuming a status of 0, if Task.outputs is None the Alyx status will be Empty; if Task.outputs is a list (empty or otherwise), the status will be Complete.
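As an illustration (not from the source docstring), a minimal sketch of checking the outcome after run(), assuming a VideoSyncQcBpod task as in the module examples above; the status codes are those listed for run():

>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.run()
>>> if task.status == 0:            # Complete
...     task.register_datasets(one=one)
... elif task.status == -2:         # a lock was encountered, the task did not run
...     print('GPU lock present, try again later')
... else:                           # -1 Errored or -3 Incomplete
...     print(task.log)             # run() captures the log to this attribute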

register_datasets(**kwargs)[source]

Register output datasets from the task to Alyx.

Parameters:

kwargs – Directly passed to the DataHandler.upload_data method.

Returns:

The output of the DataHandler.upload_data method, e.g. a list of registered datasets.

Return type:

list

register_images(**kwargs)[source]

Register images to the Alyx database.

rerun()[source]
get_signatures(**kwargs)[source]

This is the default method but should be overridden for each task.

setUp(**kwargs)[source]

Get the data handler and ensure all data is available locally to run the task.

tearDown()[source]

Function run after run(). Does not run if a lock is encountered by the task (status -2).

cleanUp()[source]

Function to optionally overload to clean up after the task (e.g. delete downloaded data or symlinks).

assert_expected_outputs(raise_error=True)[source]

After a run, asserts that all signature files are present at least once in the output files. Mainly useful for integration tests.

assert_expected_inputs(raise_error=True, raise_ambiguous=False)[source]

Check that all the files necessary to run the task are present on disk.

Parameters:

raise_error (bool) – If true, raise FileNotFoundError if required files are missing.

Returns:

  • bool – True if all required files were found.

  • list of pathlib.Path – A list of file paths that exist on disk.
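A short sketch (illustrative, not from the source) using the documented return values with raise_error=False to probe input availability without raising:

>>> all_present, files = task.assert_expected_inputs(raise_error=False)
>>> if not all_present:
...     task.setUp()  # let the data handler make the missing inputs available locally, see setUp()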

assert_expected(expected_files, silent=False)[source]

Assert that expected files are present.

Parameters:
  • expected_files (list of ExpectedDataset) – A list of expected files in the form (file_pattern_str, collection_str, required_bool).

  • silent (bool) – If true, log an error if any required files are not found.

Returns:

  • bool – True if all required files were found.

  • list of pathlib.Path – A list of file paths that exist on disk.

get_data_handler(location=None)[source]

Get the relevant data handler based on the location argument.

static make_lock_file(taskname='', time_out_secs=7200)[source]

Creates a GPU lock file with a timeout of time_out_secs seconds.

is_locked()[source]

Checks if there is a lock file for this given task
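As a rough, illustrative sketch (not from the source) of the lock machinery; note that run() already writes the GPU lock file itself when the task uses a GPU:

>>> if task.is_locked():
...     print('A lock file exists for this task; run() would set status to -2')
... else:
...     task.run()
>>> # a lock can also be created manually, e.g. before launching a GPU job outside run():
>>> Task.make_lock_file(taskname='VideoSyncQcBpod', time_out_secs=3600)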

class Pipeline(session_path=None, one=None, eid=None)[source]

Bases: ABC

Pipeline class: collection of related and potentially interdependent tasks

tasks = {}
one = None
make_graph(out_dir=None, show=True)[source]
create_alyx_tasks(rerun__status__in=None, tasks_list=None)[source]

Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.

If the jobs already exist, they are left untouched. The rerun__status__in parameter will re-initialize matching jobs by emptying their logs and setting their status to Waiting.

Parameters:
  • rerun__status__in (list, str) – To re-run tasks that already exist, specify one or more status strings of the tasks to be re-run, or '__all__' to re-run all tasks.

  • tasks_list (list) – The list of tasks to create on Alyx. If None, uses self.tasks.

Returns:

List of Alyx task dictionaries (existing and/or created).

Return type:

list

create_tasks_list_from_pipeline()[source]

From a pipeline with tasks, creates a list of dictionaries containing task descriptions that can be used to create Alyx tasks.

run(status__in=('Waiting',), machine=None, clobber=True, **kwargs)[source]

Get all the session-related jobs from Alyx and run them.

Parameters:
  • status__in (tuple of str) – One or more status strings of the tasks to run, from ['Waiting', 'Started', 'Errored', 'Empty', 'Complete'].

  • machine (str, optional) – A string identifying the machine the task is run on.

  • clobber (bool, default=True) – If True, any existing logs are overwritten.

  • kwargs – Arguments passed downstream to run_alyx_task.

Returns:

  • jalyx (list) – List of REST dictionaries of the job endpoints.

  • job_deck (list) – List of REST dictionaries of the jobs endpoints.

  • all_datasets (list) – List of REST dictionaries of the dataset endpoints.
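A hypothetical sketch of driving a concrete pipeline; MyPipeline and 'my-server' are placeholders, not names defined in this module, and the unpacking follows the documented return values:

>>> pipeline = MyPipeline(session_path, one=one)
>>> alyx_tasks = pipeline.create_alyx_tasks()  # create (or fetch) this session's tasks on Alyx
>>> jalyx, job_deck, all_datasets = pipeline.run(status__in=('Waiting', 'Errored'), machine='my-server')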

rerun_failed(**kwargs)[source]
rerun(**kwargs)[source]
property name
str2class(task_executable: str)[source]

Convert task name to class.

Parameters:

task_executable (str) – A Task class name, e.g. ‘ibllib.pipes.behavior_tasks.TrialRegisterRaw’.

Returns:

The imported class.

Return type:

class
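For example, using the class name from the parameter description above:

>>> from ibllib.pipes.tasks import str2class
>>> TaskClass = str2class('ibllib.pipes.behavior_tasks.TrialRegisterRaw')
>>> TaskClass.__name__
'TrialRegisterRaw'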

run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None, max_md5_size=None, machine=None, clobber=True, location='server', mode='log')[source]

Runs a single Alyx job and registers output datasets.

Parameters:
  • tdict (dict) – An Alyx task dictionary to instantiate and run.

  • session_path (str, pathlib.Path) – A session path containing the task input data.

  • one (one.api.OneAlyx) – An instance of ONE.

  • job_deck (list of dict, optional) – A list of all tasks in the same pipeline. If None, queries Alyx to get this.

  • max_md5_size (int, optional) – An optional maximum file size in bytes. Files with sizes larger than this will not have their MD5 checksum calculated to save time.

  • machine (str, optional) – A string identifying the machine the task is run on.

  • clobber (bool, default=True) – If true any existing logs are overwritten on Alyx.

  • location ({'remote', 'server', 'sdsc', 'aws'}) – Where you are running the task: 'server' - local lab server, 'remote' - any compute node/computer, 'sdsc' - FlatIron compute node, 'aws' - using data from the AWS S3 node.

  • mode ({'log', 'raise'}, default='log') – Behaviour to adopt if an error occurs. If 'raise', the error is raised at the very end of this function (i.e. after having labeled the tasks).

Returns:

  • dict – The updated task dict.

  • list of pathlib.Path – A list of registered datasets.
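As an end-to-end sketch (illustrative; the 'tasks' endpoint filters used below are an assumption, not documented here), fetching a session's Waiting tasks from Alyx and running them:

>>> from ibllib.pipes.tasks import run_alyx_task
>>> eid = one.path2eid(session_path)
>>> waiting_tasks = one.alyx.rest('tasks', 'list', session=eid, status='Waiting')  # assumed filters
>>> for tdict in waiting_tasks:
...     task_dict, datasets = run_alyx_task(tdict=tdict, session_path=session_path, one=one, location='server')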