ibllib.pipes.tasks
The abstract Pipeline and Task superclasses and concrete task runner.
Examples
1. Running a task on your local computer. | Download: via ONE. | Upload: N/A.
>>> task = VideoSyncQcBpod(session_path, one=one, location='remote', sync='bpod')
>>> task.run()
2. Running a task on the local server that belongs to a given subject (e.g. SWC054 on floferlab). | Download: all data expected to be present. | Upload: datasets are registered in the normal way; filerecords are created and the bulk sync and bulk transfer jobs on Alyx transfer the data.
>>> from ibllib.pipes.video_tasks import VideoSyncQcBpod
>>> session_path = '/mnt/ibl/s0/Data/Subjects/SWC054/2023-01-01/001'
>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
3. Running a task on the local server that belongs to that subject, forcing re-download of missing data. | Download: via Globus (TODO: we should change this to use boto3 as Globus is slow). | Upload: datasets are registered in the normal way; filerecords are created and the bulk sync and bulk transfer jobs on Alyx transfer the data.
>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.force = True
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
>>> task.cleanUp() # Delete the files that have been downloaded
4. Running a task on the local server that doesn't belong to that subject (e.g. SWC054 on angelakilab). | Download: via boto3; the AWS file records must exist and be set to exists = True. | Upload: via Globus; the datasets are uploaded directly to FlatIron, and FlatIron filerecords are created and set to True once the Globus task has completed.
>>> task = VideoSyncQcBpod(session_path, one=one, location='AWS', sync='bpod')
>>> task.run()
>>> task.register_datasets()
>>> task.cleanUp() # Delete the files that have been downloaded
5. Running a task on SDSC. | Download: via creating symlinks to the relevant datasets on SDSC. | Upload: via copying files to the relevant location on SDSC.
>>> task = VideoSyncQcBpod(session_path, one=one, location='SDSC', sync='bpod')
>>> task.run()
>>> response = task.register_datasets()
>>> # Here we just make sure filerecords are all correct
>>> for resp in response:
...     fi = next((fr for fr in resp['file_records'] if 'flatiron' in fr['data_repository']), None)
...     if fi is not None:
...         if not fi['exists']:
...             one.alyx.rest('files', 'partial_update', id=fi['id'], data={'exists': True})
...
...     aws = next((fr for fr in resp['file_records'] if 'aws' in fr['data_repository']), None)
...     if aws is not None:
...         one.alyx.rest('files', 'partial_update', id=aws['id'], data={'exists': False})
...
...     sr = next((fr for fr in resp['file_records'] if 'SR' in fr['data_repository']), None)
...     if sr is not None:
...         one.alyx.rest('files', 'partial_update', id=sr['id'], data={'exists': False})
>>> # Finally remove the symlinks once the task has completed
>>> task.cleanUp()
Functions
- run_alyx_task: Runs a single Alyx job and registers output datasets.
- str2class: Convert task name to class.
Classes
- Pipeline: Pipeline class, a collection of related and potentially interdependent tasks.
- Task: Abstract task superclass.
- class Task(session_path, parents=None, taskid=None, one=None, machine=None, clobber=True, location='server', **kwargs)[source]
Bases:
ABC
- log = ''
- cpu = 1
- gpu = 0
- io_charge = 5
- priority = 30
- ram = 4
- outputs = None
- time_elapsed_secs = None
- time_out_secs = 7200
- version = '2.38.0'
- signature = {'input_files': [], 'output_files': []}
- force = False
- job_size = 'small'
- env = None
- one = None
- level = 0
- property name
- path2eid()[source]
Fetch the experiment UUID from the Task session path, without using the REST cache.
This method ensures that the eid will be returned for newly created sessions.
- Returns:
The experiment UUID corresponding to the session path.
- Return type:
str
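For example (assuming task is an instantiated Task for an existing session, as in the module examples above):
>>> eid = task.path2eid()  # experiment UUID string corresponding to task.session_path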
- run(**kwargs)[source]
Do not overload; see _run() below. Wraps the _run() method with:
- error management
- logging to a variable
- writing a lock file if the GPU is used
- labelling the status property of the object
The status value is labelled as:
- 0: Complete
- -1: Errored
- -2: Didn't run as a lock was encountered
- -3: Incomplete
Notes
The run_alyx_task function will update the Alyx Task status depending on both status and outputs (i.e. the output of the subclassed _run method). Assuming a return value of 0: if Task.outputs is None, the status will be Empty; if Task.outputs is a list (empty or otherwise), the status will be Complete.
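A minimal sketch of this convention (assuming task is an instantiated Task subclass, as in the module examples above):
>>> task.run()
>>> if task.status == 0 and task.outputs is not None:
...     # status 0 with a list of outputs will be registered as 'Complete' on Alyx
...     task.register_datasets(one=one)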
- register_datasets(**kwargs)[source]
Register output datasets from the task to Alyx.
- Parameters:
kwargs – Directly passed to the DataHandler.upload_data method.
- Returns:
The output of the DataHandler.upload_data method, e.g. a list of registered datasets.
- Return type:
list
- get_signatures(**kwargs)[source]
This is the default but should be overwritten for each task.
- tearDown()[source]
Function run after run(). Does not run if a lock is encountered by the task (status -2).
- assert_expected_outputs(raise_error=True)[source]
After a run, asserts that all signature files are present at least once in the output files. Mainly useful for integration tests.
- assert_expected_inputs(raise_error=True)[source]
Before running a task, checks that all the files necessary to run the task have been downloaded or are already on the local file system.
- get_data_handler(location=None)[source]
Gets the relevant data handler based on the location argument.
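To tie the above together, a hypothetical subclass might look roughly like the sketch below. The class name and file names are made up, and the (pattern, collection, required) tuple layout of the signature entries is an assumption; see the concrete task modules (e.g. ibllib.pipes.video_tasks) for the exact conventions.
>>> from pathlib import Path
>>> from ibllib.pipes.tasks import Task
>>> class MyCustomTask(Task):  # hypothetical example, not part of ibllib
...     priority = 60
...     job_size = 'small'
...     # assumed signature layout: (file name pattern, collection, required)
...     signature = {'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True)],
...                  'output_files': [('my_output.npy', 'alf', True)]}
...     def _run(self):
...         # compute something from the inputs and return the list of output files
...         out_file = Path(self.session_path).joinpath('alf', 'my_output.npy')
...         out_file.parent.mkdir(parents=True, exist_ok=True)
...         out_file.touch()
...         return [out_file]
>>> task = MyCustomTask(session_path, one=one, location='server')
>>> task.run()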
- class Pipeline(session_path=None, one=None, eid=None)[source]
Bases:
ABC
Pipeline class: collection of related and potentially interdependent tasks
- tasks = {}
- one = None
- create_alyx_tasks(rerun__status__in=None, tasks_list=None)[source]
Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.
If the jobs already exist, they are left untouched. The re-run parameter will re-init the job by emptying the log and setting the status to Waiting.
- Parameters:
rerun__status__in (list, str) – To re-run tasks that already exist, specify one or more status strings of the tasks to be re-run, or '__all__' to re-run all tasks.
tasks_list (list) – The list of tasks to create on Alyx. If None, uses self.tasks.
- Returns:
List of Alyx task dictionaries (existing and/or created).
- Return type:
list
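For example (a sketch assuming pipe is an instantiated Pipeline subclass for a registered session):
>>> alyx_tasks = pipe.create_alyx_tasks()  # existing jobs are left untouched
>>> alyx_tasks = pipe.create_alyx_tasks(rerun__status__in='__all__')  # re-init all jobs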
- create_tasks_list_from_pipeline()[source]
From a pipeline with tasks, creates a list of dictionaries containing the task descriptions that can be used to create the Alyx tasks.
- run(status__in=('Waiting',), machine=None, clobber=True, **kwargs)[source]
Get all the session-related jobs from Alyx and run them.
- Parameters:
status__in (tuple of str) – A list of status strings of the tasks to run, from ['Waiting', 'Started', 'Errored', 'Empty', 'Complete'].
machine (str, optional) – A string identifying the machine the task is run on.
clobber (bool, default=True) – If True, any existing logs are overwritten.
kwargs – Arguments passed downstream to run_alyx_task.
- Returns:
jalyx – A list of REST dictionaries of the job endpoints.
job_deck – A list of REST dictionaries of the jobs endpoints.
all_datasets – A list of REST dictionaries of the dataset endpoints.
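A usage sketch (again assuming pipe is an instantiated Pipeline subclass; the machine name is illustrative and the unpacking assumes the three documented return values come back as a tuple):
>>> jalyx, job_deck, all_datasets = pipe.run(status__in=('Waiting', 'Errored'), machine='my-server')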
- property name
- str2class(task_executable: str)[source]
Convert task name to class.
- Parameters:
task_executable (str) – A Task class name, e.g. ‘ibllib.pipes.behavior_tasks.TrialRegisterRaw’.
- Returns:
The imported class.
- Return type:
class
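For example:
>>> task_class = str2class('ibllib.pipes.behavior_tasks.TrialRegisterRaw')
>>> task_class.__name__
'TrialRegisterRaw'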
- run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None, max_md5_size=None, machine=None, clobber=True, location='server', mode='log')[source]
Runs a single Alyx job and registers output datasets.
- Parameters:
tdict (dict) – An Alyx task dictionary to instantiate and run.
session_path (str, pathlib.Path) – A session path containing the task input data.
one (one.api.OneAlyx) – An instance of ONE.
job_deck (list of dict, optional) – A list of all tasks in the same pipeline. If None, queries Alyx to get this.
max_md5_size (int, optional) – An optional maximum file size in bytes. Files with sizes larger than this will not have their MD5 checksum calculated to save time.
machine (str, optional) – A string identifying the machine the task is run on.
clobber (bool, default=True) – If True, any existing logs are overwritten on Alyx.
location ({'remote', 'server', 'sdsc', 'aws'}) – Where you are running the task, ‘server’ - local lab server, ‘remote’ - any compute node/ computer, ‘sdsc’ - Flatiron compute node, ‘aws’ - using data from AWS S3 node.
mode ({'log', 'raise'}, default='log') – Behaviour to adopt if an error occurs. If 'raise', the error is raised at the very end of this function (i.e. after having labelled the tasks).
- Returns:
dict – The updated task dict.
list of pathlib.Path – A list of registered datasets.
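A usage sketch (the Alyx query below is illustrative; any task record from the 'tasks' endpoint works, and the unpacking assumes the two documented return values come back as a tuple):
>>> from ibllib.pipes.tasks import run_alyx_task
>>> tdict = one.alyx.rest('tasks', 'list', session=eid, name='VideoSyncQcBpod')[0]  # assumed filter names
>>> task, datasets = run_alyx_task(tdict=tdict, session_path=session_path, one=one,
...                                location='server', mode='raise')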