ibllib.pipes.tasks
The abstract Pipeline and Task superclasses and concrete task runner.
Examples
1. Running a task on your local computer. | Download: via ONE. | Upload: N/A.
>>> task = VideoSyncQcBpod(session_path, one=one, location='remote', sync='bpod')
>>> task.run()
2. Running a task on the local server that belongs to a given subject (e.g. SWC054 on floferlab). | Download: all data expected to be present. | Upload: the normal way of registering datasets; filerecords are created, and bulk sync / bulk transfer jobs on Alyx transfer the data.
>>> from ibllib.pipes.video_tasks import VideoSyncQcBpod
>>> session_path = '/mnt/ibl/s0/Data/Subjects/SWC054/2023-01-01/001'
>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
3. Running a task on the local server that belongs to that subject and forcing redownload of missing data. | Download: via Globus (TODO we should change this to use boto3 as Globus is slow). | Upload: the normal way of registering datasets; filerecords are created, and bulk sync / bulk transfer jobs on Alyx transfer the data.
>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
>>> task.force = True
>>> task.run()
>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
>>> task.cleanUp() # Delete the files that have been downloaded
4. Running a task on the local server that doesn't belong to that subject (e.g. SWC054 on angelakilab). | Download: via boto3; the AWS file records must exist and be set to exists = True. | Upload: via Globus, which uploads the datasets directly to FlatIron; FlatIron filerecords are created and set to True once the Globus task has completed.
>>> task = VideoSyncQcBpod(session_path, one=one, location='AWS', sync='bpod')
>>> task.run()
>>> task.register_datasets()
>>> task.cleanUp() # Delete the files that have been downloaded
5. Running a task on SDSC. | Download: by creating symlinks to the relevant datasets on SDSC. | Upload: by copying files to the relevant location on SDSC.
>>> task = VideoSyncQcBpod(session_path, one=one, location='SDSC', sync='bpod')
>>> task.run()
>>> response = task.register_datasets()
>>> # Here we just make sure filerecords are all correct
>>> for resp in response:
...     fi = next((fr for fr in resp['file_records'] if 'flatiron' in fr['data_repository']), None)
...     if fi is not None:
...         if not fi['exists']:
...             one.alyx.rest('files', 'partial_update', id=fi['id'], data={'exists': True})
...     aws = next((fr for fr in resp['file_records'] if 'aws' in fr['data_repository']), None)
...     if aws is not None:
...         one.alyx.rest('files', 'partial_update', id=aws['id'], data={'exists': False})
...     sr = next((fr for fr in resp['file_records'] if 'SR' in fr['data_repository']), None)
...     if sr is not None:
...         one.alyx.rest('files', 'partial_update', id=sr['id'], data={'exists': False})
>>> # Finally remove symlinks once the task has completed
>>> task.cleanUp()
Functions
- run_alyx_task – Runs a single Alyx job and registers output datasets.
- str2class – Convert task name to class.
Classes
- Pipeline – Pipeline class: collection of related and potentially interdependent tasks.
- Task
- class Task(session_path, parents=None, taskid=None, one=None, machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs)[source]
Bases:
ABC
- log = ''
- cpu = 1
- gpu = 0
- io_charge = 5
- priority = 30
- ram = 4
- outputs = None
- time_elapsed_secs = None
- time_out_secs = 7200
- version = '3.1.0'
- force = False
- job_size = 'small'
- env = None
- on_error = 'continue'
- one = None
- level = 0
- property signature: Dict[str, List]
The signature of the task specifies inputs and outputs for the given task. For some tasks it is dynamic and calculated. The legacy code specifies those as tuples. The preferred way is to use the ExpectedDataset input and output constructors.
I = ExpectedDataset.input
O = ExpectedDataset.output
signature = {
    'input_files': [
        I(name='extract.me.npy', collection='raw_data', required=True, register=False, unique=False),
    ],
    'output_files': [
        O(name='look.atme.npy', collection='shiny_data', required=True, register=True, unique=False)
    ]}
is equivalent to:
signature = {
    'input_files': [('extract.me.npy', 'raw_data', True, True)],
    'output_files': [('look.atme.npy', 'shiny_data', True)],
}
- Returns:
A dict with 'input_files' and 'output_files' keys, each a list of expected datasets.
- property name
- path2eid()[source]
Fetch the experiment UUID from the Task session path, without using the REST cache.
This method ensures that the eid will be returned for newly created sessions.
- Returns:
The experiment UUID corresponding to the session path.
- Return type:
str
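A one-line usage sketch (task as constructed in the examples above):
>>> eid = task.path2eid()  # experiment UUID string for the task's session path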
- run(**kwargs)[source]
Do not overload; see _run() below. Wraps the _run() method with:
- error management
- logging to variable
- writing a lock file if the GPU is used
- labelling the status property of the object. The status value is labelled as:
0: Complete
-1: Errored
-2: Didn't run as a lock was encountered
-3: Incomplete
Notes
run_alyx_task will update the Alyx Task status depending on both status and outputs (i.e. the output of the subclassed _run method): assuming a return value of 0, if Task.outputs is None the status will be Empty; if Task.outputs is a list (empty or otherwise), the status will be Complete.
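For orientation, a minimal sketch of a subclass (MyTask, session_path and one are placeholders; real tasks declare a meaningful signature and implement _run to do actual work):
>>> from ibllib.pipes.tasks import Task
>>> class MyTask(Task):
...     signature = {'input_files': [], 'output_files': []}
...     def _run(self):
...         # do the work here and return the list of output files
...         # (None -> Empty on Alyx; a list, even empty -> Complete)
...         return []
>>> task = MyTask(session_path, one=one)
>>> task.run()
>>> task.status  # 0: Complete, -1: Errored, -2: lock encountered, -3: Incomplete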
- register_datasets(**kwargs)[source]
Register output datasets from the task to Alyx.
- Parameters:
kwargs – Directly passed to the DataHandler.upload_data method.
- Returns:
The output of the DataHandler.upload_data method, e.g. a list of registered datasets.
- Return type:
list
- get_signatures(**kwargs)[source]
This is the default implementation but should be overwritten for each task.
- tearDown()[source]
Function run after run(). Does not run if a lock was encountered by the task (status -2).
- assert_expected_outputs(raise_error=True)[source]
After a run, asserts that all signature files are present at least once in the output files. Mainly useful for integration tests.
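A short sketch (assuming a task instance that has already run):
>>> task.run()
>>> task.assert_expected_outputs(raise_error=True)  # raises if a signature output file was not produced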
- assert_expected_inputs(raise_error=True, raise_ambiguous=False)[source]
Check that all the files necessary to run the task are present on disk.
- Parameters:
raise_error (bool) – If true, raise FileNotFoundError if required files are missing.
- Returns:
bool – True if all required files were found.
list of pathlib.Path – A list of file paths that exist on disk.
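A short sketch, assuming the two documented return values come back as a (bool, list) pair:
>>> present, files = task.assert_expected_inputs(raise_error=False)
>>> if not present:
...     print('required input files are missing; not running the task')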
- assert_expected(expected_files, silent=False)[source]
Assert that expected files are present.
- Parameters:
expected_files (list of ExpectedDataset) – A list of expected files in the form (file_pattern_str, collection_str, required_bool).
silent (bool) – If true, log an error if any required files are not found.
- Returns:
bool – True if all required files were found.
list of pathlib.Path – A list of file paths that exist on disk.
- get_data_handler(location=None)[source]
Gets the relevant data handler based on the location argument.
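A sketch; the fallback behaviour when location is omitted is an assumption here (presumably the task's own location attribute):
>>> handler = task.get_data_handler()        # assumed to use the task's configured location
>>> handler = task.get_data_handler('SDSC')  # or request a specific handler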
- class Pipeline(session_path=None, one=None, eid=None)[source]
Bases:
ABC
Pipeline class: collection of related and potentially interdependent tasks
- tasks = {}
- one = None
- create_alyx_tasks(rerun__status__in=None, tasks_list=None)[source]
Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.
If the jobs already exist, they are left untouched. The re-run parameter will re-init the jobs by emptying the log and setting the status to Waiting.
- Parameters:
rerun__status__in (list, str) – To re-run tasks that already exist, specify one or more status strings to be re-run, or '__all__' to re-run all tasks.
tasks_list (list) – The list of tasks to create on Alyx. If None, uses self.tasks.
- Returns:
List of Alyx task dictionaries (existing and/or created).
- Return type:
list
- create_tasks_list_from_pipeline()[source]
From a pipeline with tasks, creates a list of dictionaries containing the task descriptions that can be used to create Alyx tasks.
- run(status__in=('Waiting',), machine=None, clobber=True, **kwargs)[source]
Get all the session-related jobs from Alyx and run them.
- Parameters:
status__in (list, tuple) – Status strings of the tasks to run, from ['Waiting', 'Started', 'Errored', 'Empty', 'Complete'].
machine (str, optional) – A string identifying the machine the task is run on.
clobber (bool, default=True) – If True, any existing logs are overwritten.
kwargs – Arguments passed downstream to run_alyx_task.
- Returns:
jalyx – A list of REST dictionaries of the job endpoints.
job_deck – A list of REST dictionaries of the jobs endpoints.
all_datasets – A list of REST dictionaries of the dataset endpoints.
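A sketch tying create_alyx_tasks and run together (MyPipeline is a hypothetical Pipeline subclass with a populated tasks attribute; session_path and one are placeholders):
>>> pipe = MyPipeline(session_path=session_path, one=one)
>>> pipe.create_alyx_tasks()  # create the task records on Alyx (existing ones are left untouched)
>>> jalyx, job_deck, all_datasets = pipe.run(status__in=('Waiting',))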
- property name
- str2class(task_executable: str)[source]
Convert task name to class.
- Parameters:
task_executable (str) – A Task class name, e.g. ‘ibllib.pipes.behavior_tasks.TrialRegisterRaw’.
- Returns:
The imported class.
- Return type:
class
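A usage sketch (session_path and one are placeholders; the class name is the example given above):
>>> from ibllib.pipes.tasks import str2class
>>> TrialRegisterRaw = str2class('ibllib.pipes.behavior_tasks.TrialRegisterRaw')
>>> task = TrialRegisterRaw(session_path, one=one)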
- run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None, max_md5_size=None, machine=None, clobber=True, location='server', mode='log')[source]
Runs a single Alyx job and registers output datasets.
- Parameters:
tdict (dict) – An Alyx task dictionary to instantiate and run.
session_path (str, pathlib.Path) – A session path containing the task input data.
one (one.api.OneAlyx) – An instance of ONE.
job_deck (list of dict, optional) – A list of all tasks in the same pipeline. If None, queries Alyx to get this.
max_md5_size (int, optional) – An optional maximum file size in bytes. Files with sizes larger than this will not have their MD5 checksum calculated to save time.
machine (str, optional) – A string identifying the machine the task is run on.
clobber (bool, default=True) – If true any existing logs are overwritten on Alyx.
location ({'remote', 'server', 'sdsc', 'aws'}) – Where you are running the task: 'server' - local lab server, 'remote' - any compute node/computer, 'sdsc' - FlatIron compute node, 'aws' - using data from an AWS S3 node.
mode ({'log', 'raise'}, default='log') – Behaviour to adopt if an error occurs. If 'raise', the error is raised at the very end of this function (i.e. after having labelled the tasks).
- Returns:
dict – The updated task dict.
list of pathlib.Path – A list of registered datasets.
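A sketch of running a single Alyx task dictionary (the REST query used to fetch tdict is illustrative; any task dictionary for the session will do, and eid/session_path/one are placeholders):
>>> from ibllib.pipes.tasks import run_alyx_task
>>> tdict = one.alyx.rest('tasks', 'list', session=eid, status='Waiting')[0]  # hypothetical query
>>> task_dict, datasets = run_alyx_task(tdict=tdict, session_path=session_path, one=one, location='server')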