ibllib.pipes.tasks

The abstract Pipeline and Task superclasses and concrete task runner.

Examples

1. Running a task on your local computer. | Download: via ONE. | Upload: N/A.

>>> task = VideoSyncQcBpod(session_path, one=one, location='remote', sync='bpod')
>>> task.run()

2. Running a task on the local server that belongs to a given subject (e.g SWC054 on floferlab). | Download: all data expected to be present. | Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer

jobs on Alyx transfer the data.

>>> from ibllib.pipes.video_tasks import VideoSyncQcBpod
>>> session_path = '/mnt/ibl/s0/Data/Subjects/SWC054/2023-01-01/001'
>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))

3. Running a task on the local server that belongs to that subject and forcing redownload of missing data. | Download: via Globus (TODO we should change this to use boto3 as globus is slow). | Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer

jobs on Alyx transfer the data.

>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.force = True
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
>>> task.cleanUp()  # Delete the files that have been downloaded

4. Running a task on the local server that doesn’t belongs to that subject (e.g SWC054 on angelakilab). | Download: via boto3, the AWS file records must exist and be set to exists = True. | Upload: via globus, automatically uploads the datasets directly to FlatIron via globus.

Creates FlatIron filerecords and sets these to True once the globus task has completed.

>>> task = VideoSyncQcBpod(session_path, one=one, location='AWS', sync='bpod')
>>> task.run()
>>> task.register_datasets()
>>> task.cleanUp()  # Delete the files that have been downloaded

5. Running a task on SDSC. | Download: via creating symlink to relevant datasets on SDSC. | Upload: via copying files to relevant location on SDSC.

>>> task = VideoSyncQcBpod(session_path, one=one, location='SDSC', sync='bpod')
>>> task.run()
>>> response = task.register_datasets()
>>> # Here we just make sure filerecords are all correct
>>> for resp in response:
...    fi = next((fr for fr in resp['file_records'] if 'flatiron' in fr['data_repository']), None)
...    if fi is not None:
...        if not fi['exists']:
...            one.alyx.rest('files', 'partial_update', id=fi['id'], data={'exists': True})
...
...     aws = next((fr for fr in resp['file_records'] if 'aws' in fr['data_repository']), None)
...     if aws is not None:
...         one.alyx.rest('files', 'partial_update', id=aws['id'], data={'exists': False})
...
...     sr = next((fr for fr in resp['file_records'] if 'SR' in fr['data_repository']), None)
...     if sr is not None:
...         one.alyx.rest('files', 'partial_update', id=sr['id'], data={'exists': False})
... # Finally remove symlinks once the task has completed
... task.cleanUp()

Functions

run_alyx_task

Runs a single Alyx job and registers output datasets.

str2class

Convert task name to class.

Classes

Pipeline

Pipeline class: collection of related and potentially interdependent tasks

Task

class Task(session_path, parents=None, taskid=None, one=None, machine=None, clobber=True, location='server', **kwargs)[source]

Bases: ABC

log = ''
cpu = 1
gpu = 0
io_charge = 5
priority = 30
ram = 4
outputs = None
time_elapsed_secs = None
time_out_secs = 7200
version = '2.38.0'
signature = {'input_files': [], 'output_files': []}
force = False
job_size = 'small'
env = None
one = None
level = 0
property name
path2eid()[source]

Fetch the experiment UUID from the Task session path, without using the REST cache.

This method ensures that the eid will be returned for newly created sessions.

Returns:

The experiment UUID corresponding to the session path.

Return type:

str

run(**kwargs)[source]

— do not overload, see _run() below— wraps the _run() method with - error management - logging to variable - writing a lock file if the GPU is used - labels the status property of the object. The status value is labeled as:

0: Complete

-1: Errored -2: Didn’t run as a lock was encountered -3: Incomplete

Notes

  • The run_alyx_task will update the Alyx Task status depending on both status and outputs (i.e. the output of subclassed _run method): Assuming a return value of 0… if Task.outputs is None, the status will be Empty; if Task.outputs is a list (empty or otherwise), the status will be Complete.

register_datasets(**kwargs)[source]

Register output datasets from the task to Alyx.

Parameters:

kwargs – Directly passed to the DataHandler.upload_data method.

Returns:

The output of the DataHandler.upload_data method, e.g. a list of registered datasets.

Return type:

list

register_images(**kwargs)[source]

Registers images to alyx database :return:

rerun()[source]
get_signatures(**kwargs)[source]

This is the default but should be overwritten for each task :return:

setUp(**kwargs)[source]

Get the data handler and ensure all data is available locally to run task.

tearDown()[source]

Function after runs() Does not run if a lock is encountered by the task (status -2)

cleanUp()[source]

Function to optionally overload to clean up :return:

assert_expected_outputs(raise_error=True)[source]

After a run, asserts that all signature files are present at least once in the output files Mainly useful for integration tests :return:

assert_expected_inputs(raise_error=True)[source]

Before running a task, check that all the files necessary to run the task have been downloaded/ are on the local file system already :return:

assert_expected(expected_files, silent=False)[source]
get_data_handler(location=None)[source]

Gets the relevant data handler based on location argument :return:

static make_lock_file(taskname='', time_out_secs=7200)[source]

Creates a GPU lock file with a timeout of

is_locked()[source]

Checks if there is a lock file for this given task

class Pipeline(session_path=None, one=None, eid=None)[source]

Bases: ABC

Pipeline class: collection of related and potentially interdependent tasks

tasks = {}
one = None
make_graph(out_dir=None, show=True)[source]
create_alyx_tasks(rerun__status__in=None, tasks_list=None)[source]

Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.

If the jobs already exist, they are left untouched. The re-run parameter will re-init the job by emptying the log and set the status to Waiting.

Parameters:
  • rerun__status__in (list, str) – To re-run tasks if they already exist, specify one or more statuses strings to will be re-run, or ‘__all__’ to re-run all tasks.

  • tasks_list (list) – The list of tasks to create on Alyx. If None, uses self.tasks.

Returns:

List of Alyx task dictionaries (existing and/or created).

Return type:

list

create_tasks_list_from_pipeline()[source]

From a pipeline with tasks, creates a list of dictionaries containing task description that can be used to upload to create alyx tasks :return:

run(status__in=('Waiting',), machine=None, clobber=True, **kwargs)[source]

Get all the session related jobs from alyx and run them

Parameters:

status__in – lists of status strings to run in

[‘Waiting’, ‘Started’, ‘Errored’, ‘Empty’, ‘Complete’] :param machine: string identifying the machine the task is run on, optional :param clobber: bool, if True any existing logs are overwritten, default is True :param kwargs: arguments passed downstream to run_alyx_task :return: jalyx: list of REST dictionaries of the job endpoints :return: job_deck: list of REST dictionaries of the jobs endpoints :return: all_datasets: list of REST dictionaries of the dataset endpoints

rerun_failed(**kwargs)[source]
rerun(**kwargs)[source]
property name
str2class(task_executable: str)[source]

Convert task name to class.

Parameters:

task_executable (str) – A Task class name, e.g. ‘ibllib.pipes.behavior_tasks.TrialRegisterRaw’.

Returns:

The imported class.

Return type:

class

run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None, max_md5_size=None, machine=None, clobber=True, location='server', mode='log')[source]

Runs a single Alyx job and registers output datasets.

Parameters:
  • tdict (dict) – An Alyx task dictionary to instantiate and run.

  • session_path (str, pathlib.Path) – A session path containing the task input data.

  • one (one.api.OneAlyx) – An instance of ONE.

  • job_deck (list of dict, optional) – A list of all tasks in the same pipeline. If None, queries Alyx to get this.

  • max_md5_size (int, optional) – An optional maximum file size in bytes. Files with sizes larger than this will not have their MD5 checksum calculated to save time.

  • machine (str, optional) – A string identifying the machine the task is run on.

  • clobber (bool, default=True) – If true any existing logs are overwritten on Alyx.

  • location ({'remote', 'server', 'sdsc', 'aws'}) – Where you are running the task, ‘server’ - local lab server, ‘remote’ - any compute node/ computer, ‘sdsc’ - Flatiron compute node, ‘aws’ - using data from AWS S3 node.

  • mode ({'log', 'raise}, default='log') – Behaviour to adopt if an error occurred. If ‘raise’, it will raise the error at the very end of this function (i.e. after having labeled the tasks).

Returns:

  • dict – The updated task dict.

  • list of pathlib.Path – A list of registered datasets.