Source code for ibllib.tests.test_pipes

import shutil
import tempfile
import unittest
from pathlib import Path
from unittest import mock
import random
import string
from uuid import uuid4

from one.webclient import AlyxClient
from one.api import ONE

import ibllib.tests.fixtures.utils as fu
from ibllib.pipes import misc, local_server
from ibllib.pipes.misc import sleepless
from ibllib.tests import TEST_DB
import ibllib.pipes.scan_fix_passive_files as fix
from ibllib.pipes.base_tasks import RegisterRawDataTask
from ibllib.pipes.ephys_tasks import SpikeSorting


[docs] class TestLocalServer(unittest.TestCase): """Tests for the ibllib.pipes.local_server module."""
[docs] def setUp(self): tmp = tempfile.TemporaryDirectory() self.tmpdir = Path(tmp.name) self.addCleanup(tmp.cleanup) raw_behaviour_data = fu.create_fake_raw_behavior_data_folder(self.tmpdir / 'subject/2020-01-01/001', task='ephys') raw_behaviour_data.parent.joinpath('raw_session.flag').touch() fu.populate_task_settings(raw_behaviour_data, patch={'PYBPOD_PROTOCOL': '_iblrig_ephysChoiceWorld5.2.1'}) raw_behaviour_data = fu.create_fake_raw_behavior_data_folder(self.tmpdir / 'subject/2020-01-01/002') raw_behaviour_data.parent.joinpath('raw_session.flag').touch() fu.populate_task_settings(raw_behaviour_data, patch={'PYBPOD_PROTOCOL': 'ephys_optoChoiceWorld6.0.1'})
[docs] @mock.patch('ibllib.pipes.local_server.get_local_data_repository') def test_task_queue(self, lab_repo_mock): """Test ibllib.pipes.local_server.task_queue function.""" lab_repo_mock.return_value = 'foo_repo' tasks = [ {'executable': 'ibllib.pipes.mesoscope_tasks.MesoscopePreprocess', 'priority': 80}, {'executable': 'ibllib.pipes.ephys_tasks.SpikeSorting', 'priority': SpikeSorting.priority}, # 60 {'executable': 'ibllib.pipes.base_tasks.RegisterRawDataTask', 'priority': RegisterRawDataTask.priority} # 100 ] alyx = mock.Mock(spec=AlyxClient) alyx.rest.return_value = tasks queue = local_server.task_queue(lab='foolab', alyx=alyx) alyx.rest.assert_called() self.assertEqual('Waiting', alyx.rest.call_args.kwargs.get('status')) self.assertIn('foolab', alyx.rest.call_args.kwargs.get('django', '')) self.assertIn('foo_repo', alyx.rest.call_args.kwargs.get('django', '')) # Expect to return tasks in descending priority order, without mesoscope task (different env) self.assertEqual([tasks[2]], queue) # Expect only mesoscope task returned when relevant env passed queue = local_server.task_queue(lab='foolab', alyx=alyx, env=('suite2p', 'iblsorter')) self.assertEqual([tasks[0], tasks[1]], queue) # Expect no tasks as mesoscope task is a large job queue = local_server.task_queue(mode='small', lab='foolab', alyx=alyx, env=('suite2p',)) self.assertEqual([], queue) # Expect only register task as it's the only small job queue = local_server.task_queue(mode='small', lab='foolab', alyx=alyx) self.assertEqual([tasks[2]], queue)
[docs] class TestPipesMisc(unittest.TestCase): """"""
[docs] def setUp(self): # Data emulating local rig data self.root_test_folder = tempfile.TemporaryDirectory() # Create two rig sessions, one with 3A probe data and one with 3B probe data self.session_path_3A = fu.create_fake_session_folder(self.root_test_folder.name) # fu.create_fake_raw_behavior_data_folder(self.session_path_3A) self.session_path_3B = fu.create_fake_session_folder(self.root_test_folder.name) fu.create_fake_raw_behavior_data_folder(self.session_path_3B) # Make some files fu.populate_raw_spikeglx(self.session_path_3B / 'raw_ephys_data', '3B', n_probes=3) ephys_folder = self.session_path_3A / 'raw_ephys_data' fu.populate_raw_spikeglx(ephys_folder, '3A', legacy=True, n_probes=1) # IBL protocol is for users to copy data to the right probe folder shutil.move(ephys_folder.joinpath('raw_ephys_folder'), ephys_folder.joinpath('my_run_probe00'))
[docs] def test_get_new_filename(self): different_gt = "ignoreThisPart_g1_t2.imec.ap.meta" nidaq = 'foobar_g0_t0.nidq.cbin' for suf in ('.ap.bin', '0.ap.bin', '1.ap.meta'): newfname = misc.get_new_filename(f'ignoreThisPart_g0_t0.imec{suf}') self.assertEqual(f'_spikeglx_ephysData_g0_t0.imec{suf}', newfname) newfname = misc.get_new_filename(different_gt) self.assertEqual('_spikeglx_ephysData_g1_t2.imec.ap.meta', newfname) self.assertEqual('_spikeglx_ephysData_g0_t0.nidq.cbin', misc.get_new_filename(nidaq)) # Test errors with self.assertRaises(ValueError): misc.get_new_filename('_spikeglx_ephysData_g0_t0.wiring') with self.assertRaises(ValueError): misc.get_new_filename('_spikeglx_ephysData.meta.cbin')
def _test_rename_ephys_files(self, path, expected_n): """Test SpikeGLX output files are correctly renamed""" misc.rename_ephys_files(path) n = 0 for f in path.rglob("*.*.*"): if any(x in f.name for x in ('.ap.', '.lf.', '.nidq.')): self.assertTrue(f.name.startswith('_spikeglx_ephysData_g')) n += 1 self.assertEqual(expected_n, n)
[docs] def test_rename_and_move(self): # Test for 3A self._test_rename_ephys_files(self.session_path_3A, 4) misc.move_ephys_files(self.session_path_3A) probe_folders = list(self.session_path_3A.rglob("*probe*")) self.assertTrue(len(probe_folders) == 1 and probe_folders[0].parts[-1] == 'probe00') expected = [ '_spikeglx_ephysData_g0_t0.imec.ap.bin', '_spikeglx_ephysData_g0_t0.imec.ap.meta', '_spikeglx_ephysData_g0_t0.imec.lf.bin', '_spikeglx_ephysData_g0_t0.imec.lf.meta' ] self.assertCountEqual(expected, [x.name for x in probe_folders[0].glob('*')]) # Test for 3B self._test_rename_ephys_files(self.session_path_3B, 14) misc.move_ephys_files(self.session_path_3B) probe_folders = sorted(self.session_path_3B.rglob("*probe*")) # Check moved into 'probexx' folders self.assertTrue(len(probe_folders) == 3) self.assertCountEqual((f'probe0{x}' for x in range(3)), [x.parts[-1] for x in probe_folders]) for i in range(3): expected = [ f'_spikeglx_ephysData_g0_t0.imec{i}.ap.bin', f'_spikeglx_ephysData_g0_t0.imec{i}.ap.meta', f'_spikeglx_ephysData_g0_t0.imec{i}.lf.bin', f'_spikeglx_ephysData_g0_t0.imec{i}.lf.meta' ] self.assertCountEqual(expected, [x.name for x in probe_folders[i].glob('*')]) nidq_files = self.session_path_3B.joinpath('raw_ephys_data').glob("*.nidq.*") expected = ['_spikeglx_ephysData_g0_t0.nidq.bin', '_spikeglx_ephysData_g0_t0.nidq.meta'] self.assertCountEqual(expected, [x.name for x in nidq_files])
[docs] def test_create_alyx_probe_insertions(self): # Connect to test DB one = ONE(**TEST_DB) # Create new session on database with a random date to avoid race conditions _, eid = fu.register_new_session(one, subject='ZM_1150') eid = str(eid) # Currently the task protocol of a session must contain 'ephys' in order to create an insertion! one.alyx.rest('sessions', 'partial_update', id=eid, data={'task_protocol': 'ephys'}) self.addCleanup(one.alyx.rest, 'sessions', 'delete', id=eid) # Delete after test # Force probe insertion 3A labels = [''.join(random.choices(string.ascii_letters, k=5)), ''.join(random.choices(string.ascii_letters, k=5))] misc.create_alyx_probe_insertions( eid, one=one, model="3A", labels=labels, force=True ) # Verify it's been inserted alyx_insertion = one.alyx.rest("insertions", "list", session=eid, no_cache=True) alyx_insertion = [x for x in alyx_insertion if x["model"] == "3A"] self.assertTrue(alyx_insertion[0]["model"] == "3A") self.assertTrue(alyx_insertion[0]["name"] in labels) self.assertTrue(alyx_insertion[1]["model"] == "3A") self.assertTrue(alyx_insertion[1]["name"] in labels) # Cleanup DB one.alyx.rest("insertions", "delete", id=alyx_insertion[0]["id"]) one.alyx.rest("insertions", "delete", id=alyx_insertion[1]["id"]) # Force probe insertion 3B labels = [''.join(random.choices(string.ascii_letters, k=5)), ''.join(random.choices(string.ascii_letters, k=5))] misc.create_alyx_probe_insertions(eid, one=one, model="3B2", labels=labels) # Verify it's been inserted alyx_insertion = one.alyx.rest("insertions", "list", session=eid, no_cache=True) self.assertTrue(alyx_insertion[0]["model"] == "3B2") self.assertTrue(alyx_insertion[0]["name"] in labels) self.assertTrue(alyx_insertion[1]["model"] == "3B2") self.assertTrue(alyx_insertion[1]["name"] in labels) # Cleanup DB one.alyx.rest("insertions", "delete", id=alyx_insertion[0]["id"]) one.alyx.rest("insertions", "delete", id=alyx_insertion[1]["id"])
[docs] def test_probe_names_from_session_path(self): expected_pnames = ['probe00', 'probe01', 'probe03', 'probe02a', 'probe02b', 'probe02c', 'probe02d', 'probe04'] nidq_file = Path(__file__).parent.joinpath("fixtures/pipes", "sample3B_g0_t0.nidq.meta") meta_files = { "probe00": Path(__file__).parent.joinpath("fixtures/pipes", "sample3A_g0_t0.imec.ap.meta"), "probe01": Path(__file__).parent.joinpath("fixtures/pipes", "sample3B_g0_t0.imec1.ap.meta"), "probe04": Path(__file__).parent.joinpath("fixtures/pipes", "sampleNP2.1_g0_t0.imec.ap.meta"), "probe03": Path(__file__).parent.joinpath("fixtures/pipes", "sampleNP2.4_1shank_g0_t0.imec.ap.meta"), "probe02": Path(__file__).parent.joinpath("fixtures/pipes", "sampleNP2.4_4shanks_g0_t0.imec.ap.meta"), } with tempfile.TemporaryDirectory() as tdir: session_path = Path(tdir).joinpath('Algernon', '2021-02-12', '001') raw_ephys_path = session_path.joinpath('raw_ephys_data') raw_ephys_path.mkdir(parents=True, exist_ok=True) shutil.copy(nidq_file, raw_ephys_path.joinpath("_spikeglx_ephysData_g0_t0.nidq.meta")) for pname, meta_file in meta_files.items(): probe_path = raw_ephys_path.joinpath(pname) probe_path.mkdir() shutil.copy(meta_file, probe_path.joinpath('_spikeglx_ephysData_g0_t0.imec0.ap.meta')) probe_path.joinpath('nested_folder').mkdir() probe_path.joinpath('nested_folder', 'toto.ap.meta').touch() self.assertEqual(set(misc.probe_labels_from_session_path(session_path)), set(expected_pnames))
[docs] def tearDown(self): self.root_test_folder.cleanup()
[docs] class TestScanFixPassiveFiles(unittest.TestCase): """"""
[docs] def setUp(self): self.tmp_dir = tempfile.TemporaryDirectory() # Session 001 and 002 are 2 sessions with ephys and passive badly transferred ( self.session_path, self.passive_session_path, ) = fu.create_fake_ephys_recording_bad_passive_transfer_sessions( self.tmp_dir.name, lab="fakelab", mouse="fakemouse", date="1900-01-01", num="001" ) # Create another complete ephys session same mouse and date self.other_good_session = fu.create_fake_complete_ephys_session( self.tmp_dir.name, lab="fakelab", mouse="fakemouse", date="1900-01-01", increment=True )
[docs] def test_scan_fix(self): from_to_pairs, moved_ok = fix.execute(self.tmp_dir.name, dry=True) self.assertTrue(len(from_to_pairs) == 1) self.assertTrue(sum(moved_ok) == 0) from_to_pairs, moved_ok = fix.execute(self.tmp_dir.name, dry=False) self.assertTrue(len(from_to_pairs) == 1) self.assertTrue(sum(moved_ok) == 1) # Second run Nothing to do from_to_pairs, moved_ok = fix.execute(self.tmp_dir.name, dry=True) self.assertTrue(len(from_to_pairs) == 0) self.assertTrue(sum(moved_ok) == 0) from_to_pairs, moved_ok = fix.execute(self.tmp_dir.name, dry=False) self.assertTrue(len(from_to_pairs) == 0) self.assertTrue(sum(moved_ok) == 0)
[docs] def test_find_pairs(self): from_to_pairs = fix.find_pairs(self.tmp_dir.name) from_path_parts = ['fakelab', 'Subjects', 'fakemouse', '1900-01-01', '002'] self.assertTrue(all(x in Path(from_to_pairs[0][0]).parts for x in from_path_parts)) to_path_parts = ['fakelab', 'Subjects', 'fakemouse', '1900-01-01', '001'] self.assertTrue(all(x in Path(from_to_pairs[0][1]).parts for x in to_path_parts))
[docs] def test_move_rename_pairs(self): # Test all outputs of find function from_to_pairs = [] moved_ok = fix.move_rename_pairs(from_to_pairs) self.assertTrue(not moved_ok) # Bad paths from_to_pairs = [("bla", "ble")] moved_ok = fix.move_rename_pairs(from_to_pairs) self.assertTrue(sum(moved_ok) == 0) # Same as execute from_to_pairs = fix.find_pairs(self.tmp_dir.name) moved_ok = fix.move_rename_pairs(from_to_pairs) self.assertTrue(sum(moved_ok) == 1)
[docs] def tearDown(self): self.tmp_dir.cleanup()
[docs] class TestRegisterRawDataTask(unittest.TestCase):
[docs] def setUp(self) -> None: self.one = ONE(**TEST_DB) self.tempdir = tempfile.TemporaryDirectory() self.addCleanup(self.tempdir.cleanup) self.session_path = Path(self.tempdir.name).joinpath('subject', '2023-01-01', '001') self.session_path.mkdir(parents=True)
[docs] def test_rename_files(self): """Test upload of snapshots. Another test for this exists in ibllib.tests.test_base_tasks.TestRegisterRawDataTask. This test does not work on real files and works without a test db. """ # Add base dir snapshot (folder := self.session_path.joinpath('snapshots')).mkdir() folder.joinpath('snap.PNG').touch() collection = 'raw_task_data' for i, ext in enumerate(['tif', 'jpg', 'gif']): (p := self.session_path.joinpath(f'{collection}_{i:02}', 'snapshots')).mkdir(parents=True) p.joinpath(f'snapshot.{ext}').touch() # Stuff with text note p = self.session_path.joinpath(f'{collection}_00', 'snapshots', 'pic.jpeg') with open(p, 'wb') as fp: fp.write('foo'.encode()) with open(p.with_name('pic.txt'), 'w') as fp: fp.write('bar') task = RegisterRawDataTask(self.session_path, one=self.one) # Mock the _is_animated_gif function to return true for any GIF file as_png_side_effect = lambda x: x.with_suffix('.png').touch() or x.with_suffix('.png') # noqa with mock.patch.object(self.one.alyx, 'rest') as rest, \ mock.patch.object(self.one, 'path2eid', return_value=str(uuid4())), \ mock.patch.object(task, '_save_as_png', side_effect=as_png_side_effect), \ mock.patch.object(task, '_is_animated_gif', side_effect=lambda x: x.suffix == '.gif'): task.register_snapshots(collection=['', f'{collection}*']) self.assertEqual(5, rest.call_count) files = [] for args, kwargs in rest.call_args_list: self.assertEqual(('notes', 'create'), args) files.append(Path(kwargs['files']['image'].name).name) width = kwargs['data'].get('width') # Test that original size passed as width only for gif file self.assertEqual('orig', width) if files[-1].endswith('gif') else self.assertIsNone(width) expected = ('snap.PNG', 'pic.jpeg', 'snapshot.png', 'snapshot.jpg', 'snapshot.gif') self.assertCountEqual(expected, files)
[docs] class TestSleeplessDecorator(unittest.TestCase):
[docs] def test_decorator_argument_passing(self): def dummy_function(arg1, arg2): return arg1, arg2 # Applying the decorator to the dummy function decorated_func = sleepless(dummy_function) # Check if the function name is maintained self.assertEqual(decorated_func.__name__, 'dummy_function') # Check if arguments are passed correctly result = decorated_func("test1", "test2") self.assertEqual(result, ("test1", "test2"))
if __name__ == '__main__': unittest.main(exit=False, verbosity=2)