"""Test ibllib.pipes.tasks module and Task class."""
import sys
import re
import shutil
import tempfile
import unittest
from unittest import mock
from pathlib import Path
from collections import OrderedDict
import numpy as np
import ibllib.pipes.tasks
from ibllib.pipes.base_tasks import ExperimentDescriptionRegisterRaw
from ibllib.pipes.video_tasks import VideoConvert
from ibllib.io import session_params
from ibllib.oneibl.data_handlers import ExpectedDataset
from one.api import ONE
from one.webclient import no_cache
from ibllib.tests import TEST_DB
one = ONE(**TEST_DB)
SUBJECT_NAME = 'algernon'
USER_NAME = 'test_user'
ses_dict = {
'subject': SUBJECT_NAME,
'start_time': '2018-04-01T12:48:26.795526',
'number': 1,
'users': [USER_NAME]
}
desired_statuses = {
'Task00': 'Complete',
'Task01_void': 'Empty',
'Task02_error': 'Errored',
'Task10': 'Complete',
'Task11': 'Held',
'TaskIncomplete': 'Incomplete',
'TaskGpuLock': 'Waiting'
}
desired_datasets = ['spikes.times.npy', 'spikes.amps.npy', 'spikes.clusters.npy']
desired_versions = {'spikes.times.npy': 'custom_job00',
'spikes.amps.npy': ibllib.__version__,
'spikes.clusters.npy': ibllib.__version__}
desired_logs = 'Running on machine: testmachine'
desired_logs_rerun = {
'Task00': 1,
'Task01_void': 2,
'Task02_error': 1,
'Task10': 1,
'Task11': 1,
'TaskIncomplete': 1,
'TaskGpuLock': 2
}
# job to output a single file (pathlib.Path)
[docs]
class Task00(ibllib.pipes.tasks.Task):
version = 'custom_job00'
def _run(self, overwrite=False):
out_files = self.session_path.joinpath('alf', 'spikes.times.npy')
out_files.touch()
return out_files
# job that outputs nothing
[docs]
class Task01_void(ibllib.pipes.tasks.Task):
def _run(self, overwrite=False):
out_files = None
return out_files
# job that raises an error on first run
[docs]
class Task02_error(ibllib.pipes.tasks.Task):
run_count = 0
def _run(self, overwrite=False):
Task02_error.run_count += 1
if Task02_error.run_count == 1:
raise Exception('Something dumb happened')
out_files = self.session_path.joinpath('alf', 'spikes.templates.npy')
out_files.touch()
return out_files
# job that outputs a list of files
[docs]
class Task10(ibllib.pipes.tasks.Task):
level = 1
def _run(self, overwrite=False):
out_files = [
self.session_path.joinpath('alf', 'spikes.amps.npy'),
self.session_path.joinpath('alf', 'spikes.clusters.npy')]
[f.touch() for f in out_files]
return out_files
# job to output a single file (pathlib.Path)
[docs]
class Task11(ibllib.pipes.tasks.Task):
level = 1
def _run(self, overwrite=False):
out_files = self.session_path.joinpath('alf', 'spikes.samples.npy')
out_files.touch()
return out_files
# Job that encounters a GPU lock and is set to Waiting
[docs]
class TaskGpuLock(ibllib.pipes.tasks.Task):
gpu = 1
# Overwrite setUp to create a lock file before running the task and remove it after
[docs]
def setUp(self):
self.make_lock_file()
self.data_handler = self.get_data_handler()
self.input_files = []
return True
def _run(self, overwrite=False):
pass
# Job that encounters a GPU lock and is set to Waiting
[docs]
class TaskIncomplete(ibllib.pipes.tasks.Task):
def _run(self, overwrite=False):
self.status = -3
[docs]
class SomePipeline(ibllib.pipes.tasks.Pipeline):
def __init__(self, session_path=None, **kwargs):
super(SomePipeline, self).__init__(session_path, **kwargs)
tasks = OrderedDict()
self.session_path = session_path
# level 0
tasks['Task00'] = Task00(self.session_path)
tasks['Task01_void'] = Task01_void(self.session_path)
tasks['Task02_error'] = Task02_error(self.session_path)
tasks['TaskGpuLock'] = TaskGpuLock(self.session_path)
tasks['TaskIncomplete'] = TaskIncomplete(self.session_path)
tasks['Task10'] = Task10(self.session_path, parents=[tasks['Task00']])
# When both its parents Complete, this task should be set to Waiting and should finally complete
tasks['Task11'] = Task11(self.session_path, parents=[tasks['Task02_error'],
tasks['Task00']])
self.tasks = tasks
[docs]
class TestPipelineAlyx(unittest.TestCase):
[docs]
def setUp(self) -> None:
self.td = tempfile.TemporaryDirectory()
self.ses_dict = ses_dict.copy()
self.ses_dict['number'] = np.random.randint(1, 999)
ses = one.alyx.rest('sessions', 'create', data=self.ses_dict)
session_path = Path(self.td.name).joinpath(
ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))
session_path.joinpath('alf').mkdir(exist_ok=True, parents=True)
self.session_path = session_path
self.eid = ses['url'][-36:]
[docs]
def tearDown(self) -> None:
self.td.cleanup()
one.alyx.rest('sessions', 'delete', id=self.eid)
[docs]
@mock.patch('ibllib.pipes.tasks.get_lab')
def test_pipeline_alyx(self, mock_ep):
mock_ep.return_value = 'cortexlab'
eid = self.eid
pipeline = SomePipeline(self.session_path, one=one, eid=eid)
# prepare by deleting all jobs/tasks related
tasks = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
self.assertEqual(len(tasks), 0)
# create tasks and jobs from scratch
NTASKS = len(pipeline.tasks)
pipeline.make_graph(show=False)
alyx_tasks = pipeline.create_alyx_tasks()
self.assertTrue(len(alyx_tasks) == NTASKS)
# get the pending jobs from alyx
tasks = one.alyx.rest('tasks', 'list', session=eid, status='Waiting', no_cache=True)
self.assertTrue(len(tasks) == NTASKS)
# run them and make sure their statuses got updated appropriately
with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
return_value=Path(self.session_path).joinpath('.gpu_lock')):
task_deck, datasets = pipeline.run(machine='testmachine')
self.assertCountEqual(desired_statuses.items(), [(t['name'], t['status']) for t in task_deck])
self.assertEqual(set(d['name'] for d in datasets), set(desired_datasets))
# check logs
check_logs = (desired_logs in t['log'] if t['log'] else True for t in task_deck)
self.assertTrue(all(check_logs))
# also checks that the datasets have been labeled with the proper version
dsets = one.alyx.rest('datasets', 'list', session=eid, no_cache=True)
check_versions = (desired_versions[d['name']] == d['version'] for d in dsets)
self.assertTrue(all(check_versions))
# make sure that re-running the make job by default doesn't change complete jobs
pipeline.create_alyx_tasks()
task_deck = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
check_statuses = (desired_statuses[t['name']] == t['status'] for t in task_deck)
self.assertTrue(all(check_statuses))
# test the rerun option
with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
return_value=Path(self.td.name).joinpath('.gpu_lock')):
task_deck, dsets = pipeline.rerun_failed(machine='testmachine')
task_02 = next(t for t in task_deck if t['name'] == 'Task02_error')
self.assertEqual('Complete', task_02['status'])
dep_task = next(x for x in task_deck if task_02['id'] in x['parents'])
assert dep_task['name'] == 'Task11'
self.assertEqual('Complete', dep_task['status'], 'Failed to set dependent task from "Held" to "Waiting"')
# check that logs were correctly overwritten
check_logs = (t['log'].count(desired_logs) == 1 if t['log'] else True for t in task_deck)
check_rerun = ('===RERUN===' not in t['log'] if t['log'] else True for t in task_deck)
self.assertTrue(all(check_logs))
self.assertTrue(all(check_rerun))
# Rerun without clobber and check that logs are not overwritten
with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
return_value=Path(self.td.name).joinpath('.gpu_lock')):
task_deck, dsets = pipeline.rerun_failed(machine='testmachine', clobber=False)
check_logs = (t['log'].count(desired_logs) == desired_logs_rerun[t['name']] if t['log']
else t['log'] == desired_logs_rerun[t['name']] for t in task_deck)
check_rerun = ('===RERUN===' in t['log'] if desired_logs_rerun[t['name']] == 2
else True for t in task_deck)
self.assertTrue(all(check_logs))
self.assertTrue(all(check_rerun))
[docs]
class GpuTask(ibllib.pipes.tasks.Task):
gpu = 1
def _run(self, overwrite=False):
out_files = self.session_path.joinpath('alf', 'gpu.times.npy')
out_files.touch()
return out_files
[docs]
class TestLocks(unittest.TestCase):
[docs]
def test_gpu_lock_and_local_data_handler(self) -> None:
# Remove any existing locks first
with tempfile.TemporaryDirectory() as td:
session_path = Path(td).joinpath('algernon', '2021/02/12', '001')
session_path.joinpath('alf').mkdir(parents=True)
task = GpuTask(session_path, one=None, location='local')
# Patch _lock_file_path method to point to different lock file location
with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
return_value=Path(td).joinpath('.gpu_lock')):
self.assertFalse(task.is_locked())
task.run()
self.assertEqual(0, task.status)
self.assertFalse(task.is_locked())
# then make a lock file and make sure it fails and is still locked afterwards
task._make_lock_file()
task.run()
self.assertEqual(-2, task.status)
self.assertTrue(task.is_locked())
# test the time out feature
task.time_out_secs = - 1
task._make_lock_file()
self.assertFalse(task.is_locked())
task.run()
self.assertEqual(0, task.status)
[docs]
class TestExperimentDescriptionRegisterRaw(unittest.TestCase):
[docs]
def setUp(self) -> None:
self.tempdir = tempfile.TemporaryDirectory()
self.addCleanup(self.tempdir.cleanup)
session_path = Path(self.tempdir.name).joinpath(
SUBJECT_NAME, ses_dict['start_time'][:10], str(ses_dict['number']).zfill(3))
session_path.mkdir(parents=True)
# Check for session on Alyx
with no_cache(one.alyx):
# If the session exists, ensure sign_off_checklist key not in JSON
if eid := one.path2eid(session_path, query_type='remote'):
json_field = one.get_details(eid, full=True).get('json') or {}
if json_field.pop('sign_off_checklist', False):
one.alyx.json_field_remove_key('sessions', eid, key='sign_off_checklist')
else: # Create a new session and add cleanup hook
ses = one.alyx.rest('sessions', 'create', data=ses_dict)
self.addCleanup(one.alyx.rest, 'sessions', 'delete', id=eid)
eid = ses['id']
fixture = Path(__file__).parent.joinpath(
'fixtures', 'io', '_ibl_experiment.description.yaml')
shutil.copy(fixture, session_path.joinpath('_ibl_experiment.description.yaml'))
self.session_path, self.eid = session_path, eid
[docs]
def test_experiment_description_registration(self):
task = ExperimentDescriptionRegisterRaw(self.session_path, one=one)
# Add a custom sign off key
task.sign_off_categories['microphone'] = ['foo', 'bar']
task.run()
# Check that description file was registered
ses = one.alyx.rest('sessions', 'read', id=self.eid, no_cache=True)
# Check keys added to JSON
expected = {'_widefield': None,
'_microphone_foo': None,
'_microphone_bar': None,
'_neuropixel_raw_probe00': None,
'_neuropixel_spike_sorting_probe00': None,
'_neuropixel_alignment_probe00': None,
'_neuropixel_raw_probe01': None,
'_neuropixel_spike_sorting_probe01': None,
'_neuropixel_alignment_probe01': None,
'_ephysChoiceWorld_01': None,
'_passiveChoiceWorld_00': None}
self.assertDictEqual(expected, ses['json'].get('sign_off_checklist', {}))
# Run again without a custom sign off for neuropixels
one.alyx.json_field_remove_key('sessions', self.eid, key='sign_off_checklist')
task.sign_off_categories.pop('neuropixel')
task.run()
ses = one.alyx.rest('sessions', 'read', id=self.eid, no_cache=True)
expected = {'_widefield': None,
'_microphone_foo': None,
'_microphone_bar': None,
'_neuropixel_probe00': None,
'_neuropixel_probe01': None,
'_ephysChoiceWorld_01': None,
'_passiveChoiceWorld_00': None}
self.assertDictEqual(expected, ses['json'].get('sign_off_checklist', {}))
[docs]
class TestDynamicTask(unittest.TestCase):
[docs]
def setUp(self) -> None:
self.tempdir = tempfile.TemporaryDirectory()
self.addCleanup(self.tempdir.cleanup)
self.task = VideoConvert(self.tempdir.name, ['left'])
[docs]
def test_get_device_collection(self):
"""Test for DynamicTask.get_device_collection method"""
device = 'probe00'
collection = self.task.get_device_collection(device, 'raw_ephys_data')
self.assertEqual('raw_ephys_data', collection)
fixture = Path(__file__).parent.joinpath('fixtures', 'io', '_ibl_experiment.description.yaml')
assert fixture.exists()
self.task.session_params = session_params.read_params(fixture)
collection = self.task.get_device_collection(device)
self.assertEqual('raw_ephys_data/probe00', collection)
[docs]
class TestTask(unittest.TestCase):
[docs]
def setUp(self):
tmpdir = tempfile.TemporaryDirectory()
self.addCleanup(tmpdir.cleanup)
self.tmpdir = Path(tmpdir.name)
self.session_path = self.tmpdir.joinpath('subject', '1900-01-01', '001')
self.session_path.mkdir(parents=True)
[docs]
class TestMisc(unittest.TestCase):
"""Tests for misc functions in ibllib.pipes.tasks module."""
[docs]
def test_str2class(self):
"""Test ibllib.pipes.tasks.str2class function."""
task_str = 'ibllib.pipes.base_tasks.ExperimentDescriptionRegisterRaw'
self.assertIs(ibllib.pipes.tasks.str2class(task_str), ExperimentDescriptionRegisterRaw)
self.assertRaises(AttributeError, ibllib.pipes.tasks.str2class, 'ibllib.pipes.base_tasks.Foo')
if __name__ == '__main__':
unittest.main(exit=False, verbosity=2)