Source code for ibllib.tests.test_tasks

import shutil
import tempfile
import unittest
from unittest import mock
from pathlib import Path
from collections import OrderedDict
import numpy as np

import ibllib.pipes.tasks
from ibllib.pipes.base_tasks import ExperimentDescriptionRegisterRaw
from ibllib.pipes.video_tasks import VideoConvert
from ibllib.io import session_params
from one.api import ONE
from one.webclient import no_cache
from ibllib.tests import TEST_DB

one = ONE(**TEST_DB)
SUBJECT_NAME = 'algernon'
USER_NAME = 'test_user'

ses_dict = {
    'subject': SUBJECT_NAME,
    'start_time': '2018-04-01T12:48:26.795526',
    'number': 1,
    'users': [USER_NAME]
}

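# Expected end state of each task after the first pipeline run. The string
# statuses shown on Alyx correspond to the integer statuses set by the tasks
# below (mapping assumed from the behaviour under test, not from the Task
# source): 0 -> 'Complete', -2 -> 'Waiting' (GPU lock), -3 -> 'Incomplete';
# 'Errored' means _run raised and 'Held' means a parent has not yet completed.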
desired_statuses = {
    'Task00': 'Complete',
    'Task01_void': 'Empty',
    'Task02_error': 'Errored',
    'Task10': 'Complete',
    'Task11': 'Held',
    'TaskIncomplete': 'Incomplete',
    'TaskGpuLock': 'Waiting'
}

desired_datasets = ['spikes.times.npy', 'spikes.amps.npy', 'spikes.clusters.npy']
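# Task00 overrides its class-level `version` attribute, so its output dataset
# should be registered with 'custom_job00'; the other datasets fall back to
# the ibllib package version.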
desired_versions = {'spikes.times.npy': 'custom_job00',
                    'spikes.amps.npy': ibllib.__version__,
                    'spikes.clusters.npy': ibllib.__version__}
desired_logs = 'Running on machine: testmachine'
desired_logs_rerun = {
    'Task00': 1,
    'Task01_void': 2,
    'Task02_error': 1,
    'Task10': 1,
    'Task11': 1,
    'TaskIncomplete': 1,
    'TaskGpuLock': 2
}


# job to output a single file (pathlib.Path)
class Task00(ibllib.pipes.tasks.Task):
    version = 'custom_job00'

    def _run(self, overwrite=False):
        out_files = self.session_path.joinpath('alf', 'spikes.times.npy')
        out_files.touch()
        return out_files

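# A minimal sketch (illustrative only, not executed by the test suite) of
# running one of these tasks standalone, assuming a session folder with an
# 'alf' subfolder already exists; the `one=None, location='local'` arguments
# mirror their use in TestLocks below:
#
#     task = Task00('/data/algernon/2018-04-01/001', one=None, location='local')
#     task.run()                # calls setUp, _run and tearDown
#     assert task.status == 0   # 0 is assumed to map to the 'Complete' status
#     print(task.outputs)       # file(s) returned by _run (attribute name assumed)
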
# job that outputs nothing
class Task01_void(ibllib.pipes.tasks.Task):

    def _run(self, overwrite=False):
        out_files = None
        return out_files

# job that raises an error on first run
class Task02_error(ibllib.pipes.tasks.Task):
    run_count = 0

    def _run(self, overwrite=False):
        Task02_error.run_count += 1
        # raise only on the first run so that rerun_failed can recover the task
        if Task02_error.run_count == 1:
            raise Exception('Something dumb happened')
        out_files = self.session_path.joinpath('alf', 'spikes.templates.npy')
        out_files.touch()
        return out_files

# job that outputs a list of files
class Task10(ibllib.pipes.tasks.Task):
    level = 1

    def _run(self, overwrite=False):
        out_files = [
            self.session_path.joinpath('alf', 'spikes.amps.npy'),
            self.session_path.joinpath('alf', 'spikes.clusters.npy')]
        for f in out_files:
            f.touch()
        return out_files

# job to output a single file (pathlib.Path)
class Task11(ibllib.pipes.tasks.Task):
    level = 1

    def _run(self, overwrite=False):
        out_files = self.session_path.joinpath('alf', 'spikes.samples.npy')
        out_files.touch()
        return out_files

# Job that encounters a GPU lock and is set to Waiting
class TaskGpuLock(ibllib.pipes.tasks.Task):
    gpu = 1

    # Overwrite setUp to create a lock file before running the task
    def setUp(self):
        self._make_lock_file()
        self.data_handler = self.get_data_handler()
        return True

    def _run(self, overwrite=False):
        pass

# Job that finishes with an incomplete status
class TaskIncomplete(ibllib.pipes.tasks.Task):

    def _run(self, overwrite=False):
        self.status = -3

class SomePipeline(ibllib.pipes.tasks.Pipeline):

    def __init__(self, session_path=None, **kwargs):
        super(SomePipeline, self).__init__(session_path, **kwargs)
        tasks = OrderedDict()
        self.session_path = session_path
        # level 0
        tasks['Task00'] = Task00(self.session_path)
        tasks['Task01_void'] = Task01_void(self.session_path)
        tasks['Task02_error'] = Task02_error(self.session_path)
        tasks['TaskGpuLock'] = TaskGpuLock(self.session_path)
        tasks['TaskIncomplete'] = TaskIncomplete(self.session_path)
        # level 1
        tasks['Task10'] = Task10(self.session_path, parents=[tasks['Task00']])
        # when both of its parents complete, this task should be set from Held
        # to Waiting and should finally complete
        tasks['Task11'] = Task11(self.session_path,
                                 parents=[tasks['Task02_error'], tasks['Task00']])
        self.tasks = tasks

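# A hedged sketch (illustrative only, using just the methods exercised in
# TestPipelineAlyx below) of building this pipeline's dependency graph without
# touching Alyx:
#
#     pipeline = SomePipeline('/data/algernon/2018-04-01/001')
#     pipeline.make_graph(show=False)  # assemble the task DAG without plotting it
#     print(list(pipeline.tasks))      # task names in insertion (dependency) order
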
class TestPipelineAlyx(unittest.TestCase):

    def setUp(self) -> None:
        self.td = tempfile.TemporaryDirectory()
        # ses = one.alyx.rest('sessions', 'list', subject=ses_dict['subject'],
        #                     date_range=[ses_dict['start_time'][:10]] * 2,
        #                     number=ses_dict['number'], no_cache=True)
        # if len(ses):
        #     one.alyx.rest('sessions', 'delete', ses[0]['url'][-36:])
        # randomise number
        ses_dict['number'] = np.random.randint(1, 30)
        ses = one.alyx.rest('sessions', 'create', data=ses_dict)
        session_path = Path(self.td.name).joinpath(
            ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))
        session_path.joinpath('alf').mkdir(exist_ok=True, parents=True)
        self.session_path = session_path
        self.eid = ses['url'][-36:]

    def tearDown(self) -> None:
        self.td.cleanup()
        one.alyx.rest('sessions', 'delete', id=self.eid)

    @mock.patch('ibllib.pipes.tasks.get_lab')
    def test_pipeline_alyx(self, mock_ep):
        mock_ep.return_value = 'cortexlab'
        eid = self.eid
        pipeline = SomePipeline(self.session_path, one=one, eid=eid)
        # check that there are no related tasks on Alyx to begin with
        tasks = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
        self.assertEqual(len(tasks), 0)

        # create tasks and jobs from scratch
        NTASKS = len(pipeline.tasks)
        pipeline.make_graph(show=False)
        alyx_tasks = pipeline.create_alyx_tasks()
        self.assertTrue(len(alyx_tasks) == NTASKS)
        # get the pending jobs from Alyx
        tasks = one.alyx.rest('tasks', 'list', session=eid, status='Waiting', no_cache=True)
        self.assertTrue(len(tasks) == NTASKS)

        # run them and make sure their statuses are updated appropriately
        with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                               return_value=Path(self.session_path).joinpath('.gpu_lock')):
            task_deck, datasets = pipeline.run(machine='testmachine')
        check_statuses = (desired_statuses[t['name']] == t['status'] for t in task_deck)
        # [(t['name'], t['status'], desired_statuses[t['name']]) for t in task_deck]
        self.assertTrue(all(check_statuses))
        self.assertEqual(set(d['name'] for d in datasets), set(desired_datasets))

        # check the logs
        check_logs = (desired_logs in t['log'] if t['log'] else True for t in task_deck)
        self.assertTrue(all(check_logs))

        # also check that the datasets have been labelled with the proper version
        dsets = one.alyx.rest('datasets', 'list', session=eid, no_cache=True)
        check_versions = (desired_versions[d['name']] == d['version'] for d in dsets)
        self.assertTrue(all(check_versions))

        # make sure that re-running create_alyx_tasks by default doesn't change complete jobs
        pipeline.create_alyx_tasks()
        task_deck = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
        check_statuses = (desired_statuses[t['name']] == t['status'] for t in task_deck)
        self.assertTrue(all(check_statuses))

        # test the rerun option
        with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                               return_value=Path(self.td.name).joinpath('.gpu_lock')):
            task_deck, dsets = pipeline.rerun_failed(machine='testmachine')
        task_02 = next(t for t in task_deck if t['name'] == 'Task02_error')
        self.assertEqual('Complete', task_02['status'])
        dep_task = next(x for x in task_deck if task_02['id'] in x['parents'])
        assert dep_task['name'] == 'Task11'
        self.assertEqual('Complete', dep_task['status'],
                         'Failed to set dependent task from "Held" to "Waiting"')
        # check that the logs were correctly overwritten
        check_logs = (t['log'].count(desired_logs) == 1 if t['log'] else True for t in task_deck)
        check_rerun = ('===RERUN===' not in t['log'] if t['log'] else True for t in task_deck)
        self.assertTrue(all(check_logs))
        self.assertTrue(all(check_rerun))

        # rerun without clobber and check that the logs are appended to rather than overwritten
        with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                               return_value=Path(self.td.name).joinpath('.gpu_lock')):
            task_deck, dsets = pipeline.rerun_failed(machine='testmachine', clobber=False)
        check_logs = (t['log'].count(desired_logs) == desired_logs_rerun[t['name']]
                      if t['log'] else t['log'] == desired_logs_rerun[t['name']]
                      for t in task_deck)
        check_rerun = ('===RERUN===' in t['log'] if desired_logs_rerun[t['name']] == 2
                       else True for t in task_deck)
        self.assertTrue(all(check_logs))
        self.assertTrue(all(check_rerun))

class GpuTask(ibllib.pipes.tasks.Task):
    gpu = 1

    def _run(self, overwrite=False):
        out_files = self.session_path.joinpath('alf', 'gpu.times.npy')
        out_files.touch()
        return out_files

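# Note on the lock protocol exercised below (inferred from the assertions in
# TestLocks rather than from the Task implementation): a task with gpu >= 1
# refuses to run while a lock file exists, finishing with status -2 (Waiting),
# and a negative time_out_secs makes any existing lock count as expired, so
# is_locked() clears it and the task can run again.
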
class TestLocks(unittest.TestCase):

    def test_gpu_lock_and_local_data_handler(self) -> None:
        # Use a fresh temporary directory so that no pre-existing lock files interfere
        with tempfile.TemporaryDirectory() as td:
            session_path = Path(td).joinpath('algernon', '2021-02-12', '001')
            session_path.joinpath('alf').mkdir(parents=True)
            task = GpuTask(session_path, one=None, location='local')
            # Patch the _lock_file_path method to point to a lock file inside the temp dir
            with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                                   return_value=Path(td).joinpath('.gpu_lock')):
                self.assertFalse(task.is_locked())
                task.run()
                self.assertEqual(0, task.status)
                self.assertFalse(task.is_locked())
                # then make a lock file and make sure the task fails and is still locked afterwards
                task._make_lock_file()
                task.run()
                self.assertEqual(-2, task.status)
                self.assertTrue(task.is_locked())
                # test the time-out feature: an expired lock is removed
                task.time_out_secs = -1
                task._make_lock_file()
                self.assertFalse(task.is_locked())
                task.run()
                self.assertEqual(0, task.status)

class TestExperimentDescriptionRegisterRaw(unittest.TestCase):

    def setUp(self) -> None:
        self.tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)
        session_path = Path(self.tempdir.name).joinpath(
            SUBJECT_NAME, ses_dict['start_time'][:10], str(ses_dict['number']).zfill(3))
        session_path.mkdir(parents=True)
        # Check for the session on Alyx
        with no_cache(one.alyx):
            # If the session exists, ensure the sign_off_checklist key is not in the JSON field
            if eid := one.path2eid(session_path, query_type='remote'):
                json_field = one.get_details(eid, full=True).get('json') or {}
                if json_field.pop('sign_off_checklist', False):
                    one.alyx.json_field_remove_key('sessions', eid, key='sign_off_checklist')
            else:
                # Create a new session and add a cleanup hook; assign eid before
                # registering the cleanup so the correct id is passed to the delete call
                ses = one.alyx.rest('sessions', 'create', data=ses_dict)
                eid = ses['id']
                self.addCleanup(one.alyx.rest, 'sessions', 'delete', id=eid)
        fixture = Path(__file__).parent.joinpath(
            'fixtures', 'io', '_ibl_experiment.description.yaml')
        shutil.copy(fixture, session_path.joinpath('_ibl_experiment.description.yaml'))
        self.session_path, self.eid = session_path, eid

    def test_experiment_description_registration(self):
        task = ExperimentDescriptionRegisterRaw(self.session_path, one=one)
        # Add a custom sign-off key
        task.sign_off_categories['microphone'] = ['foo', 'bar']
        task.run()

        # Check that the description file was registered and the expected keys
        # were added to the session JSON
        ses = one.alyx.rest('sessions', 'read', id=self.eid, no_cache=True)
        expected = {'_widefield': None,
                    '_microphone_foo': None,
                    '_microphone_bar': None,
                    '_neuropixel_raw_probe00': None,
                    '_neuropixel_spike_sorting_probe00': None,
                    '_neuropixel_alignment_probe00': None,
                    '_neuropixel_raw_probe01': None,
                    '_neuropixel_spike_sorting_probe01': None,
                    '_neuropixel_alignment_probe01': None,
                    '_ephysChoiceWorld_01': None,
                    '_passiveChoiceWorld_00': None}
        self.assertDictEqual(expected, ses['json'].get('sign_off_checklist', {}))

        # Run again without a custom sign-off for neuropixels
        one.alyx.json_field_remove_key('sessions', self.eid, key='sign_off_checklist')
        task.sign_off_categories.pop('neuropixel')
        task.run()
        ses = one.alyx.rest('sessions', 'read', id=self.eid, no_cache=True)
        expected = {'_widefield': None,
                    '_microphone_foo': None,
                    '_microphone_bar': None,
                    '_neuropixel_probe00': None,
                    '_neuropixel_probe01': None,
                    '_ephysChoiceWorld_01': None,
                    '_passiveChoiceWorld_00': None}
        self.assertDictEqual(expected, ses['json'].get('sign_off_checklist', {}))

class TestDynamicTask(unittest.TestCase):

    def setUp(self) -> None:
        self.tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)
        self.task = VideoConvert(self.tempdir.name, ['left'])

    def test_get_device_collection(self):
        """Test for the DynamicTask.get_device_collection method."""
        device = 'probe00'
        collection = self.task.get_device_collection(device, 'raw_ephys_data')
        self.assertEqual('raw_ephys_data', collection)
        fixture = Path(__file__).parent.joinpath(
            'fixtures', 'io', '_ibl_experiment.description.yaml')
        assert fixture.exists()
        self.task.session_params = session_params.read_params(fixture)
        collection = self.task.get_device_collection(device)
        self.assertEqual('raw_ephys_data/probe00', collection)

if __name__ == '__main__':
    unittest.main(exit=False, verbosity=2)