Source code for ibllib.tests.test_tasks

import shutil
import tempfile
import unittest
from unittest import mock
from pathlib import Path
from collections import OrderedDict
import numpy as np

import ibllib.pipes.tasks
from ibllib.pipes.base_tasks import ExperimentDescriptionRegisterRaw
from ibllib.pipes.video_tasks import VideoConvert
from import session_params
from one.api import ONE
from one.webclient import no_cache
from ibllib.tests import TEST_DB

one = ONE(**TEST_DB)
SUBJECT_NAME = 'algernon'
USER_NAME = 'test_user'

ses_dict = {
    'subject': SUBJECT_NAME,
    'start_time': '2018-04-01T12:48:26.795526',
    'number': 1,
    'users': [USER_NAME]

desired_statuses = {
    'Task00': 'Complete',
    'Task01_void': 'Empty',
    'Task02_error': 'Errored',
    'Task10': 'Complete',
    'Task11': 'Held',
    'TaskIncomplete': 'Incomplete',
    'TaskGpuLock': 'Waiting'

desired_datasets = ['spikes.times.npy', 'spikes.amps.npy', 'spikes.clusters.npy']
desired_versions = {'spikes.times.npy': 'custom_job00',
                    'spikes.amps.npy': ibllib.__version__,
                    'spikes.clusters.npy': ibllib.__version__}
desired_logs = 'Running on machine: testmachine'
desired_logs_rerun = {
    'Task00': 1,
    'Task01_void': 2,
    'Task02_error': 1,
    'Task10': 1,
    'Task11': 1,
    'TaskIncomplete': 1,
    'TaskGpuLock': 2

#  job to output a single file (pathlib.Path)
class Task00(ibllib.pipes.tasks.Task):
    version = 'custom_job00'

    def _run(self, overwrite=False):
        out_files = self.session_path.joinpath('alf', 'spikes.times.npy')
        out_files.touch()
        return out_files
# job that outputs nothing
class Task01_void(ibllib.pipes.tasks.Task):

    def _run(self, overwrite=False):
        out_files = None
        return out_files
# job that raises an error on first run
class Task02_error(ibllib.pipes.tasks.Task):
    run_count = 0

    def _run(self, overwrite=False):
        Task02_error.run_count += 1
        if Task02_error.run_count == 1:
            raise Exception('Something dumb happened')
        out_files = self.session_path.joinpath('alf', 'spikes.templates.npy')
        out_files.touch()
        return out_files
# job that outputs a list of files
class Task10(ibllib.pipes.tasks.Task):
    level = 1

    def _run(self, overwrite=False):
        out_files = [
            self.session_path.joinpath('alf', 'spikes.amps.npy'),
            self.session_path.joinpath('alf', 'spikes.clusters.npy')]
        [f.touch() for f in out_files]
        return out_files
# job to output a single file (pathlib.Path)
class Task11(ibllib.pipes.tasks.Task):
    level = 1

    def _run(self, overwrite=False):
        out_files = self.session_path.joinpath('alf', 'spikes.samples.npy')
        out_files.touch()
        return out_files
# Job that encounters a GPU lock and is set to Waiting
class TaskGpuLock(ibllib.pipes.tasks.Task):
    gpu = 1

    # Overwrite setUp to create a lock file before running the task and remove it after
def setUp(self):
        self.make_lock_file()
        self.data_handler = self.get_data_handler()
        return True
def _run(self, overwrite=False): pass
# Job that encounters a GPU lock and is set to Waiting
class TaskIncomplete(ibllib.pipes.tasks.Task):

    def _run(self, overwrite=False):
        self.status = -3
class SomePipeline(ibllib.pipes.tasks.Pipeline):

    def __init__(self, session_path=None, **kwargs):
        super(SomePipeline, self).__init__(session_path, **kwargs)
        tasks = OrderedDict()
        self.session_path = session_path
        # level 0
        tasks['Task00'] = Task00(self.session_path)
        tasks['Task01_void'] = Task01_void(self.session_path)
        tasks['Task02_error'] = Task02_error(self.session_path)
        tasks['TaskGpuLock'] = TaskGpuLock(self.session_path)
        tasks['TaskIncomplete'] = TaskIncomplete(self.session_path)
        tasks['Task10'] = Task10(self.session_path, parents=[tasks['Task00']])
        # When both its parents Complete, this task should be set to Waiting and should finally complete
        tasks['Task11'] = Task11(self.session_path, parents=[tasks['Task02_error'], tasks['Task00']])
        self.tasks = tasks
class TestPipelineAlyx(unittest.TestCase):
def setUp(self) -> None:
 = tempfile.TemporaryDirectory()
        # ses ='sessions', 'list', subject=ses_dict['subject'],
        #                     date_range=[ses_dict['start_time'][:10]] * 2,
        #                     number=ses_dict['number'], no_cache=True)
        # if len(ses):
        #'sessions', 'delete', ses[0]['url'][-36:])
        # randomise number
        ses_dict['number'] = np.random.randint(1, 30)
        ses ='sessions', 'create', data=ses_dict)
        session_path = Path(
            ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))
        session_path.joinpath('alf').mkdir(exist_ok=True, parents=True)
        self.session_path = session_path
        self.eid = ses['url'][-36:]
def tearDown(self) -> None:
'sessions', 'delete', id=self.eid)
@mock.patch('ibllib.pipes.tasks.get_lab')
    def test_pipeline_alyx(self, mock_ep):
        mock_ep.return_value = 'cortexlab'
        eid = self.eid
        pipeline = SomePipeline(self.session_path, one=one, eid=eid)
        # prepare by deleting all jobs/tasks related
        tasks ='tasks', 'list', session=eid, no_cache=True)
        self.assertEqual(len(tasks), 0)
        # create tasks and jobs from scratch
        NTASKS = len(pipeline.tasks)
        pipeline.make_graph(show=False)
        alyx_tasks = pipeline.create_alyx_tasks()
        self.assertTrue(len(alyx_tasks) == NTASKS)
        # get the pending jobs from alyx
        tasks ='tasks', 'list', session=eid, status='Waiting', no_cache=True)
        self.assertTrue(len(tasks) == NTASKS)
        # run them and make sure their statuses got updated appropriately
        with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                               return_value=Path(self.session_path).joinpath('.gpu_lock')):
            task_deck, datasets ='testmachine')
        check_statuses = (desired_statuses[t['name']] == t['status'] for t in task_deck)
        # [(t['name'], t['status'], desired_statuses[t['name']]) for t in task_deck]
        self.assertTrue(all(check_statuses))
        self.assertEqual(set(d['name'] for d in datasets), set(desired_datasets))
        # check logs
        check_logs = (desired_logs in t['log'] if t['log'] else True for t in task_deck)
        self.assertTrue(all(check_logs))
        # also checks that the datasets have been labeled with the proper version
        dsets ='datasets', 'list', session=eid, no_cache=True)
        check_versions = (desired_versions[d['name']] == d['version'] for d in dsets)
        self.assertTrue(all(check_versions))
        # make sure that re-running the make job by default doesn't change complete jobs
        pipeline.create_alyx_tasks()
        task_deck ='tasks', 'list', session=eid, no_cache=True)
        check_statuses = (desired_statuses[t['name']] == t['status'] for t in task_deck)
        self.assertTrue(all(check_statuses))
        # test the rerun option
        with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                               return_value=Path('.gpu_lock')):
            task_deck, dsets = pipeline.rerun_failed(machine='testmachine')
        task_02 = next(t for t in task_deck if t['name'] == 'Task02_error')
        self.assertEqual('Complete', task_02['status'])
        dep_task = next(x for x in task_deck if task_02['id'] in x['parents'])
        assert dep_task['name'] == 'Task11'
        self.assertEqual('Complete', dep_task['status'],
                         'Failed to set dependent task from "Held" to "Waiting"')
        # check that logs were correctly overwritten
        check_logs = (t['log'].count(desired_logs) == 1 if t['log'] else True for t in task_deck)
        check_rerun = ('===RERUN===' not in t['log'] if t['log'] else True for t in task_deck)
        self.assertTrue(all(check_logs))
        self.assertTrue(all(check_rerun))
        # Rerun without clobber and check that logs are not overwritten
        with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                               return_value=Path('.gpu_lock')):
            task_deck, dsets = pipeline.rerun_failed(machine='testmachine', clobber=False)
        check_logs = (t['log'].count(desired_logs) == desired_logs_rerun[t['name']]
                      if t['log'] else t['log'] == desired_logs_rerun[t['name']]
                      for t in task_deck)
        check_rerun = ('===RERUN===' in t['log'] if desired_logs_rerun[t['name']] == 2 else True
                       for t in task_deck)
        self.assertTrue(all(check_logs))
        self.assertTrue(all(check_rerun))
class GpuTask(ibllib.pipes.tasks.Task):
    gpu = 1

    def _run(self, overwrite=False):
        out_files = self.session_path.joinpath('alf', 'gpu.times.npy')
        out_files.touch()
        return out_files
class TestLocks(unittest.TestCase):
def test_gpu_lock_and_local_data_handler(self) -> None:
        # Remove any existing locks first
        with tempfile.TemporaryDirectory() as td:
            session_path = Path(td).joinpath('algernon', '2021/02/12', '001')
            session_path.joinpath('alf').mkdir(parents=True)
            task = GpuTask(session_path, one=None, location='local')
            # Patch _lock_file_path method to point to different lock file location
            with mock.patch.object(ibllib.pipes.tasks.Task, '_lock_file_path',
                                   return_value=Path(td).joinpath('.gpu_lock')):
                self.assertFalse(task.is_locked())
                self.assertEqual(0, task.status)
                self.assertFalse(task.is_locked())
                # then make a lock file and make sure it fails and is still locked afterwards
                task._make_lock_file()
                self.assertEqual(-2, task.status)
                self.assertTrue(task.is_locked())
                # test the time out feature
                task.time_out_secs = - 1
                task._make_lock_file()
                self.assertFalse(task.is_locked())
                self.assertEqual(0, task.status)
class TestExperimentDescriptionRegisterRaw(unittest.TestCase):
def setUp(self) -> None:
        self.tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)
        session_path = Path(
            SUBJECT_NAME, ses_dict['start_time'][:10], str(ses_dict['number']).zfill(3))
        session_path.mkdir(parents=True)
        # Check for session on Alyx
        with no_cache(one.alyx):
            # If the session exists, ensure sign_off_checklist key not in JSON
            if eid := one.path2eid(session_path, query_type='remote'):
                json_field = one.get_details(eid, full=True).get('json') or {}
                if json_field.pop('sign_off_checklist', False):
                    one.alyx.json_field_remove_key('sessions', eid, key='sign_off_checklist')
            else:
                # Create a new session and add cleanup hook
                ses ='sessions', 'create', data=ses_dict)
                self.addCleanup(, 'sessions', 'delete', id=eid)
                eid = ses['id']
        fixture = Path(__file__).parent.joinpath(
            'fixtures', 'io', '_ibl_experiment.description.yaml')
        shutil.copy(fixture, session_path.joinpath('_ibl_experiment.description.yaml'))
        self.session_path, self.eid = session_path, eid
def test_experiment_description_registration(self):
        task = ExperimentDescriptionRegisterRaw(self.session_path, one=one)
        # Add a custom sign off key
        task.sign_off_categories['microphone'] = ['foo', 'bar']
        # Check that description file was registered
        ses ='sessions', 'read', id=self.eid, no_cache=True)
        # Check keys added to JSON
        expected = {'_widefield': None,
                    '_microphone_foo': None,
                    '_microphone_bar': None,
                    '_neuropixel_raw_probe00': None,
                    '_neuropixel_spike_sorting_probe00': None,
                    '_neuropixel_alignment_probe00': None,
                    '_neuropixel_raw_probe01': None,
                    '_neuropixel_spike_sorting_probe01': None,
                    '_neuropixel_alignment_probe01': None,
                    '_ephysChoiceWorld_01': None,
                    '_passiveChoiceWorld_00': None}
        self.assertDictEqual(expected, ses['json'].get('sign_off_checklist', {}))
        # Run again without a custom sign off for neuropixels
        one.alyx.json_field_remove_key('sessions', self.eid, key='sign_off_checklist')
        task.sign_off_categories.pop('neuropixel')
        ses ='sessions', 'read', id=self.eid, no_cache=True)
        expected = {'_widefield': None,
                    '_microphone_foo': None,
                    '_microphone_bar': None,
                    '_neuropixel_probe00': None,
                    '_neuropixel_probe01': None,
                    '_ephysChoiceWorld_01': None,
                    '_passiveChoiceWorld_00': None}
        self.assertDictEqual(expected, ses['json'].get('sign_off_checklist', {}))
class TestDynamicTask(unittest.TestCase):
def setUp(self) -> None:
        self.tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)
        self.task = VideoConvert(, ['left'])
def test_get_device_collection(self):
        """Test for DynamicTask.get_device_collection method"""
        device = 'probe00'
        collection = self.task.
if __name__ == '__main__': unittest.main(exit=False, verbosity=2)