"""Test ibllib.pipes.tasks module and Task class."""
import sys
import re
import shutil
import tempfile
import unittest
from unittest import mock
from pathlib import Path
from collections import OrderedDict
import numpy as np

import ibllib.pipes.tasks
from ibllib.pipes.base_tasks import ExperimentDescriptionRegisterRaw
from ibllib.pipes.video_tasks import VideoConvert
from ibllib.io import session_params
from ibllib.oneibl.data_handlers import ExpectedDataset
from one.api import ONE
from one.webclient import no_cache
from ibllib.tests import TEST_DB

one = ONE(**TEST_DB)
SUBJECT_NAME = 'algernon'
USER_NAME = 'test_user'

ses_dict = {
    'subject': SUBJECT_NAME,
    'start_time': '2018-04-01T12:48:26.795526',
    'number': 1,
    'users': [USER_NAME]
}

desired_statuses = {
    'Task00': 'Complete',
    'Task01_void': 'Empty',
    'Task02_error': 'Errored',
    'Task10': 'Complete',
    'Task11': 'Held',
    'TaskIncomplete': 'Incomplete',
    'TaskGpuLock': 'Waiting'
}

desired_datasets = ['spikes.times.npy', 'spikes.amps.npy', 'spikes.clusters.npy']
desired_versions = {'spikes.times.npy': 'custom_job00',
                    'spikes.amps.npy': ibllib.__version__,
                    'spikes.clusters.npy': ibllib.__version__}
desired_logs = 'Running on machine: testmachine'
desired_logs_rerun = {
    'Task00': 1,
    'Task01_void': 2,
    'Task02_error': 1,
    'Task10': 1,
    'Task11': 1,
    'TaskIncomplete': 1,
    'TaskGpuLock': 2
}
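
# NB: these fixtures encode the expected end state of TestPipelineAlyx below:
# `desired_statuses` gives the Alyx status each task should hold after one
# pipeline run; `desired_versions` reflects that Task00 overrides the class
# `version` attribute while the other datasets default to ibllib.__version__;
# `desired_logs_rerun` counts how many times the `desired_logs` line should
# appear in each task log after rerunning failed tasks with clobber=False.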


# Job to output a single file (pathlib.Path)
class Task00(ibllib.pipes.tasks.Task):
    version = 'custom_job00'

    def _run(self, overwrite=False):
        out_files = self.session_path.joinpath('alf', 'spikes.times.npy')
        out_files.touch()
        return out_files


# Job that outputs nothing
class Task01_void(ibllib.pipes.tasks.Task):
    def _run(self, overwrite=False):
        out_files = None
        return out_files


# Job that raises an error on first run
class Task02_error(ibllib.pipes.tasks.Task):
    run_count = 0

    def _run(self, overwrite=False):
        Task02_error.run_count += 1
        if Task02_error.run_count == 1:
            raise Exception('Something dumb happened')
        out_files = self.session_path.joinpath('alf', 'spikes.templates.npy')
        out_files.touch()
        return out_files


# Job that outputs a list of files
class Task10(ibllib.pipes.tasks.Task):
    level = 1

    def _run(self, overwrite=False):
        out_files = [
            self.session_path.joinpath('alf', 'spikes.amps.npy'),
            self.session_path.joinpath('alf', 'spikes.clusters.npy')]
        [f.touch() for f in out_files]
        return out_files


# Job to output a single file (pathlib.Path)
class Task11(ibllib.pipes.tasks.Task):
    level = 1

    def _run(self, overwrite=False):
        out_files = self.session_path.joinpath('alf', 'spikes.samples.npy')
        out_files.touch()
        return out_files


# Job that encounters a GPU lock and is set to Waiting
class TaskGpuLock(ibllib.pipes.tasks.Task):
    gpu = 1

    # Override setUp to create a lock file before running the task and remove it after
    def setUp(self):
        self._make_lock_file()
        self.data_handler = self.get_data_handler()
        self.input_files = []
        return True

    def _run(self, overwrite=False):
        pass


# Job that exits with an Incomplete status
class TaskIncomplete(ibllib.pipes.tasks.Task):
    def _run(self, overwrite=False):
        self.status = -3


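# NB: as exercised by these tests, a task's integer status maps onto the Alyx
# statuses in `desired_statuses` above: a _run that returns normally gives
# 'Complete' (status 0), an uncaught exception gives 'Errored', status -2
# (a held GPU lock, see TestLocks below) gives 'Waiting' and status -3 gives
# 'Incomplete'.

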
class SomePipeline(ibllib.pipes.tasks.Pipeline):

    def __init__(self, session_path=None, **kwargs):
        super(SomePipeline, self).__init__(session_path, **kwargs)
        tasks = OrderedDict()
        self.session_path = session_path
        # level 0
        tasks['Task00'] = Task00(self.session_path)
        tasks['Task01_void'] = Task01_void(self.session_path)
        tasks['Task02_error'] = Task02_error(self.session_path)
        tasks['TaskGpuLock'] = TaskGpuLock(self.session_path)
        tasks['TaskIncomplete'] = TaskIncomplete(self.session_path)
        tasks['Task10'] = Task10(self.session_path, parents=[tasks['Task00']])
        # When both its parents Complete, this task should be set to Waiting and should finally complete
        tasks['Task11'] = Task11(self.session_path, parents=[tasks['Task02_error'], tasks['Task00']])
        self.tasks = tasks


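# A minimal usage sketch for the pipeline above (the full Alyx-backed flow is
# exercised in TestPipelineAlyx below):
#
#     pipeline = SomePipeline(session_path, one=one, eid=eid)
#     pipeline.create_alyx_tasks()          # one Alyx task per entry in pipeline.tasks
#     task_deck, datasets = pipeline.run()  # runs jobs in dependency (level) order

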
class TestPipelineAlyx(unittest.TestCase):

    def setUp(self) -> None:
        self.td = tempfile.TemporaryDirectory()
        self.ses_dict = ses_dict.copy()
        self.ses_dict['number'] = np.random.randint(1, 999)
        ses = one.alyx.rest('sessions', 'create', data=self.ses_dict)
        session_path = Path(self.td.name).joinpath(
            ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))
        session_path.joinpath('alf').mkdir(exist_ok=True, parents=True)
        self.session_path = session_path
        self.eid = ses['url'][-36:]

    def tearDown(self) -> None:
        self.td.cleanup()
        one.alyx.rest('sessions', 'delete', id=self.eid)

    @mock.patch('ibllib.pipes.tasks.get_lab')
    def test_pipeline_alyx(self, mock_ep):
        mock_ep.return_value = 'cortexlab'
        eid = self.eid
        pipeline = SomePipeline(self.session_path, one=one, eid=eid)

        # prepare by deleting all jobs/tasks related
        tasks = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
        self.assertEqual(len(tasks), 0)

        # create tasks and jobs from scratch
        NTASKS = len(pipeline.tasks)
        pipeline.make_graph(show=False)
        alyx_tasks = pipeline.create_alyx_tasks()
        self.assertTrue(len(alyx_tasks) == NTASKS)

        # get the pending jobs from alyx
        tasks = one.alyx.rest('tasks', 'list', session=eid, status='Waiting', no_cache=True)
        self.assertTrue(len(tasks) == NTASKS)

        # run them and make sure their statuses got updated appropriately
        with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                               return_value=Path(self.session_path).joinpath('.gpu_lock')):
            task_deck, datasets = pipeline.run(machine='testmachine')
        self.assertCountEqual(desired_statuses.items(), [(t['name'], t['status']) for t in task_deck])
        self.assertEqual(set(d['name'] for d in datasets), set(desired_datasets))

        # check logs
        check_logs = (desired_logs in t['log'] if t['log'] else True for t in task_deck)
        self.assertTrue(all(check_logs))

        # also checks that the datasets have been labeled with the proper version
        dsets = one.alyx.rest('datasets', 'list', session=eid, no_cache=True)
        check_versions = (desired_versions[d['name']] == d['version'] for d in dsets)
        self.assertTrue(all(check_versions))

        # make sure that re-running the make job by default doesn't change complete jobs
        pipeline.create_alyx_tasks()
        task_deck = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
        check_statuses = (desired_statuses[t['name']] == t['status'] for t in task_deck)
        self.assertTrue(all(check_statuses))

        # test the rerun option
        with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                               return_value=Path(self.td.name).joinpath('.gpu_lock')):
            task_deck, dsets = pipeline.rerun_failed(machine='testmachine')
        task_02 = next(t for t in task_deck if t['name'] == 'Task02_error')
        self.assertEqual('Complete', task_02['status'])
        dep_task = next(x for x in task_deck if task_02['id'] in x['parents'])
        assert dep_task['name'] == 'Task11'
        self.assertEqual('Complete', dep_task['status'],
                         'Failed to set dependent task from "Held" to "Waiting"')

        # check that logs were correctly overwritten
        check_logs = (t['log'].count(desired_logs) == 1 if t['log'] else True for t in task_deck)
        check_rerun = ('===RERUN===' not in t['log'] if t['log'] else True for t in task_deck)
        self.assertTrue(all(check_logs))
        self.assertTrue(all(check_rerun))

        # Rerun without clobber and check that logs are not overwritten
        with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                               return_value=Path(self.td.name).joinpath('.gpu_lock')):
            task_deck, dsets = pipeline.rerun_failed(machine='testmachine', clobber=False)
        check_logs = (t['log'].count(desired_logs) == desired_logs_rerun[t['name']]
                      if t['log'] else t['log'] == desired_logs_rerun[t['name']]
                      for t in task_deck)
        check_rerun = ('===RERUN===' in t['log'] if desired_logs_rerun[t['name']] == 2 else True
                       for t in task_deck)
        self.assertTrue(all(check_logs))
        self.assertTrue(all(check_rerun))


class GpuTask(ibllib.pipes.tasks.Task):
    gpu = 1

    def _run(self, overwrite=False):
        out_files = self.session_path.joinpath('alf', 'gpu.times.npy')
        out_files.touch()
        return out_files


class TestLocks(unittest.TestCase):

    def test_gpu_lock_and_local_data_handler(self) -> None:
        # Remove any existing locks first
        with tempfile.TemporaryDirectory() as td:
            session_path = Path(td).joinpath('algernon', '2021/02/12', '001')
            session_path.joinpath('alf').mkdir(parents=True)
            task = GpuTask(session_path, one=None, location='local')
            # Patch _lock_file_path method to point to different lock file location
            with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                                   return_value=Path(td).joinpath('.gpu_lock')):
                self.assertFalse(task.is_locked())
                task.run()
                self.assertEqual(0, task.status)
                self.assertFalse(task.is_locked())
                # then make a lock file and make sure it fails and is still locked afterwards
                task._make_lock_file()
                task.run()
                self.assertEqual(-2, task.status)
                self.assertTrue(task.is_locked())
                # test the time out feature
                task.time_out_secs = -1
                task._make_lock_file()
                self.assertFalse(task.is_locked())
                task.run()
                self.assertEqual(0, task.status)


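# NB: the lock behaviour exercised above: a task with gpu >= 1 will not run
# while the lock file at _lock_file_path() exists and instead gets status -2
# ('Waiting'); a lock older than time_out_secs is treated as stale, so a
# negative time-out makes even a freshly created lock expire immediately.

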
class TestExperimentDescriptionRegisterRaw(unittest.TestCase):

    def setUp(self) -> None:
        self.tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)
        session_path = Path(self.tempdir.name).joinpath(
            SUBJECT_NAME, ses_dict['start_time'][:10], str(ses_dict['number']).zfill(3))
        session_path.mkdir(parents=True)
        # Check for session on Alyx
        with no_cache(one.alyx):
            # If the session exists, ensure sign_off_checklist key not in JSON
            if eid := one.path2eid(session_path, query_type='remote'):
                json_field = one.get_details(eid, full=True).get('json') or {}
                if json_field.pop('sign_off_checklist', False):
                    one.alyx.json_field_remove_key('sessions', eid, key='sign_off_checklist')
            else:  # Create a new session and add cleanup hook
                ses = one.alyx.rest('sessions', 'create', data=ses_dict)
                eid = ses['id']
                self.addCleanup(one.alyx.rest, 'sessions', 'delete', id=eid)
        fixture = Path(__file__).parent.joinpath(
            'fixtures', 'io', '_ibl_experiment.description.yaml')
        shutil.copy(fixture, session_path.joinpath('_ibl_experiment.description.yaml'))
        self.session_path, self.eid = session_path, eid

    def test_experiment_description_registration(self):
        task = ExperimentDescriptionRegisterRaw(self.session_path, one=one)
        # Add a custom sign off key
        task.sign_off_categories['microphone'] = ['foo', 'bar']
        task.run()

        # Check that description file was registered
        ses = one.alyx.rest('sessions', 'read', id=self.eid, no_cache=True)
        # Check keys added to JSON
        expected = {'_widefield': None,
                    '_microphone_foo': None,
                    '_microphone_bar': None,
                    '_neuropixel_raw_probe00': None,
                    '_neuropixel_spike_sorting_probe00': None,
                    '_neuropixel_alignment_probe00': None,
                    '_neuropixel_raw_probe01': None,
                    '_neuropixel_spike_sorting_probe01': None,
                    '_neuropixel_alignment_probe01': None,
                    '_ephysChoiceWorld_01': None,
                    '_passiveChoiceWorld_00': None}
        self.assertDictEqual(expected, ses['json'].get('sign_off_checklist', {}))

        # Run again without a custom sign off for neuropixels
        one.alyx.json_field_remove_key('sessions', self.eid, key='sign_off_checklist')
        task.sign_off_categories.pop('neuropixel')
        task.run()
        ses = one.alyx.rest('sessions', 'read', id=self.eid, no_cache=True)
        expected = {'_widefield': None,
                    '_microphone_foo': None,
                    '_microphone_bar': None,
                    '_neuropixel_probe00': None,
                    '_neuropixel_probe01': None,
                    '_ephysChoiceWorld_01': None,
                    '_passiveChoiceWorld_00': None}
        self.assertDictEqual(expected, ses['json'].get('sign_off_checklist', {}))


class TestDynamicTask(unittest.TestCase):

    def setUp(self) -> None:
        self.tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)
        self.task = VideoConvert(self.tempdir.name, ['left'])

    def test_get_device_collection(self):
        """Test for DynamicTask.get_device_collection method"""
        device = 'probe00'
        collection = self.task.get_device_collection(device, 'raw_ephys_data')
        self.assertEqual('raw_ephys_data', collection)
        fixture = Path(__file__).parent.joinpath('fixtures', 'io', '_ibl_experiment.description.yaml')
        assert fixture.exists()
        self.task.session_params = session_params.read_params(fixture)
        collection = self.task.get_device_collection(device)
        self.assertEqual('raw_ephys_data/probe00', collection)


class TestTask(unittest.TestCase):

    def setUp(self):
        tmpdir = tempfile.TemporaryDirectory()
        self.addCleanup(tmpdir.cleanup)
        self.tmpdir = Path(tmpdir.name)
        self.session_path = self.tmpdir.joinpath('subject', '1900-01-01', '001')
        self.session_path.mkdir(parents=True)
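
    # NB: in the test below, ExpectedDataset.input is given positional
    # arguments (glob pattern, collection, required, register); only inputs
    # declared with register=True are expected back from
    # _input_files_to_register (inferred from the expected file lists).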
    def test_input_files_to_register(self):
        """Test for Task._input_files_to_register method."""
        task = Task00(self.session_path)
        self.assertRaises(RuntimeError, task._input_files_to_register)
        I = ExpectedDataset.input  # noqa
        task.input_files = [
            I('register.optional.ext', 'alf', False, True),
            I('register.optional_foo.ext', 'alf', False, True),
            I('register.required.ext', 'alf', True, True),
            I('ignore.required.ext', 'alf', True),
            I('ignore.optional.ext', 'alf', False)]
        self.session_path.joinpath('alf').mkdir()
        for f in task.input_files:
            self.session_path.joinpath(f.glob_pattern).touch()
        files = task._input_files_to_register(assert_all_exist=True)
        expected = [self.session_path.joinpath('alf', 'register.required.ext'),
                    self.session_path.joinpath('alf', 'register.optional.ext'),
                    self.session_path.joinpath('alf', 'register.optional_foo.ext')]
        self.assertCountEqual(files, expected)
        expected[2].unlink()
        # assertNoLogs added in py 3.10
        if sys.version_info.minor >= 10:  # py3.10
            with self.assertNoLogs(ibllib.pipes.tasks.__name__, level='ERROR'):
                files = task._input_files_to_register(assert_all_exist=True)
            self.assertCountEqual(files, expected[:2])
        expected[0].unlink()
        with self.assertLogs(ibllib.pipes.tasks.__name__, level='ERROR'):
            files = task._input_files_to_register(assert_all_exist=False)
        self.assertEqual(files, expected[1:2])
        self.assertRaises(AssertionError, task._input_files_to_register, assert_all_exist=True)
        # Test with wildcards
        task.input_files = [
            I('foo.bar.*', 'alf', True, True),
            I('bar.baz.npy', 'alf', True, True) | I('baz.foo.npy', 'alf', True, True)]
        with self.assertLogs(ibllib.pipes.tasks.__name__, level='ERROR') as cm:
            self.assertFalse(task._input_files_to_register(assert_all_exist=False))
        for f in ('alf/foo.bar.*', 'alf/bar.baz.npy', 'alf/baz.foo.npy'):
            self.assertRegex(cm.output[-1], re.escape(f))


class TestMisc(unittest.TestCase):
    """Tests for misc functions in ibllib.pipes.tasks module."""

    def test_str2class(self):
        """Test ibllib.pipes.tasks.str2class function."""
        task_str = 'ibllib.pipes.base_tasks.ExperimentDescriptionRegisterRaw'
        self.assertIs(ibllib.pipes.tasks.str2class(task_str), ExperimentDescriptionRegisterRaw)
        self.assertRaises(AttributeError, ibllib.pipes.tasks.str2class, 'ibllib.pipes.base_tasks.Foo')


if __name__ == '__main__':
    unittest.main(exit=False, verbosity=2)