import shutil
import tempfile
import unittest
from pathlib import Path
from unittest import mock
import random
import string
from uuid import uuid4
from one.webclient import AlyxClient
from one.api import ONE
import ibllib.tests.fixtures.utils as fu
from ibllib.pipes import misc, local_server
from ibllib.pipes.misc import sleepless
from ibllib.tests import TEST_DB
import ibllib.pipes.scan_fix_passive_files as fix
from ibllib.pipes.base_tasks import RegisterRawDataTask
from ibllib.pipes.ephys_tasks import SpikeSorting
[docs]
class TestLocalServer(unittest.TestCase):
    """Tests for the ibllib.pipes.local_server module."""

    def setUp(self):
        """Create two flagged fake sessions, each with its own task protocol, in a temporary directory."""
        tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(tempdir.cleanup)
        self.tmpdir = Path(tempdir.name)
        # (relative session path, extra kwargs for the fixture function, protocol written to the settings)
        sessions = (
            ('subject/2020-01-01/001', {'task': 'ephys'}, '_iblrig_ephysChoiceWorld5.2.1'),
            ('subject/2020-01-01/002', {}, 'ephys_optoChoiceWorld6.0.1'),
        )
        for relative_path, folder_kwargs, protocol in sessions:
            behaviour_folder = fu.create_fake_raw_behavior_data_folder(self.tmpdir / relative_path, **folder_kwargs)
            # The flag file lives in the session folder, i.e. the parent of the behaviour data folder
            (behaviour_folder.parent / 'raw_session.flag').touch()
            fu.populate_task_settings(behaviour_folder, patch={'PYBPOD_PROTOCOL': protocol})

    @mock.patch('ibllib.pipes.local_server.get_local_data_repository')
    def test_task_queue(self, lab_repo_mock):
        """Test ibllib.pipes.local_server.task_queue function."""
        lab_repo_mock.return_value = 'foo_repo'
        tasks = [
            {'executable': 'ibllib.pipes.mesoscope_tasks.MesoscopePreprocess', 'priority': 80},
            {'executable': 'ibllib.pipes.ephys_tasks.SpikeSorting', 'priority': SpikeSorting.priority},  # 60
            {'executable': 'ibllib.pipes.base_tasks.RegisterRawDataTask', 'priority': RegisterRawDataTask.priority}  # 100
        ]
        alyx = mock.Mock(spec=AlyxClient)
        alyx.rest.return_value = tasks

        queue = local_server.task_queue(lab='foolab', alyx=alyx)

        # The REST query should request waiting tasks, filtered by both lab and data repository
        alyx.rest.assert_called()
        query = alyx.rest.call_args.kwargs
        django_filter = query.get('django', '')
        self.assertEqual('Waiting', query.get('status'))
        for expected_term in ('foolab', 'foo_repo'):
            self.assertIn(expected_term, django_filter)

        # With the default env only the register task is expected back
        self.assertEqual([tasks[2]], queue)

        # With the suite2p and iblsorter envs, expect the mesoscope then the spike sorting task
        queue = local_server.task_queue(lab='foolab', alyx=alyx, env=('suite2p', 'iblsorter'))
        self.assertEqual([tasks[0], tasks[1]], queue)

        # Expect no tasks as mesoscope task is a large job
        queue = local_server.task_queue(mode='small', lab='foolab', alyx=alyx, env=('suite2p',))
        self.assertEqual([], queue)

        # Expect only register task as it's the only small job
        queue = local_server.task_queue(mode='small', lab='foolab', alyx=alyx)
        self.assertEqual([tasks[2]], queue)
[docs]
class TestPipesMisc(unittest.TestCase):
    """Tests for the ibllib.pipes.misc module."""

    def setUp(self):
        """Create two fake rig sessions, one with 3A probe data and one with 3B probe data."""
        # Data emulating local rig data
        self.root_test_folder = tempfile.TemporaryDirectory()
        # Register the cleanup straight away: unlike tearDown, cleanups also run when setUp itself fails
        self.addCleanup(self.root_test_folder.cleanup)
        # NB: only the 3B session is given a raw behaviour data folder
        self.session_path_3A = fu.create_fake_session_folder(self.root_test_folder.name)
        self.session_path_3B = fu.create_fake_session_folder(self.root_test_folder.name)
        fu.create_fake_raw_behavior_data_folder(self.session_path_3B)
        # Make some files
        fu.populate_raw_spikeglx(self.session_path_3B / 'raw_ephys_data', '3B', n_probes=3)
        ephys_folder = self.session_path_3A / 'raw_ephys_data'
        fu.populate_raw_spikeglx(ephys_folder, '3A', legacy=True, n_probes=1)
        # IBL protocol is for users to copy data to the right probe folder
        shutil.move(ephys_folder.joinpath('raw_ephys_folder'),
                    ephys_folder.joinpath('my_run_probe00'))

    @staticmethod
    def _random_label(length=5):
        """Return a random ASCII-letter string to use as a probe label."""
        return ''.join(random.choices(string.ascii_letters, k=length))

    def test_get_new_filename(self):
        """Test misc.get_new_filename on imec and nidq file names, and on names expected to be rejected."""
        different_gt = 'ignoreThisPart_g1_t2.imec.ap.meta'
        nidaq = 'foobar_g0_t0.nidq.cbin'
        for suf in ('.ap.bin', '0.ap.bin', '1.ap.meta'):
            newfname = misc.get_new_filename(f'ignoreThisPart_g0_t0.imec{suf}')
            self.assertEqual(f'_spikeglx_ephysData_g0_t0.imec{suf}', newfname)
        # The gate and trigger numbers of the input should be carried over to the new name
        newfname = misc.get_new_filename(different_gt)
        self.assertEqual('_spikeglx_ephysData_g1_t2.imec.ap.meta', newfname)
        self.assertEqual('_spikeglx_ephysData_g0_t0.nidq.cbin', misc.get_new_filename(nidaq))
        # Test errors
        for invalid in ('_spikeglx_ephysData_g0_t0.wiring', '_spikeglx_ephysData.meta.cbin'):
            with self.assertRaises(ValueError):
                misc.get_new_filename(invalid)

    def _test_rename_ephys_files(self, path, expected_n):
        """Test SpikeGLX output files are correctly renamed.

        Parameters
        ----------
        path : pathlib.Path
            The folder passed to misc.rename_ephys_files and then searched recursively.
        expected_n : int
            The expected number of ap, lf and nidq files found under `path` after renaming.
        """
        misc.rename_ephys_files(path)
        spikeglx_files = [
            f for f in path.rglob('*.*.*') if any(x in f.name for x in ('.ap.', '.lf.', '.nidq.'))
        ]
        for f in spikeglx_files:
            self.assertTrue(f.name.startswith('_spikeglx_ephysData_g'), f'unexpected file name: {f.name}')
        self.assertEqual(expected_n, len(spikeglx_files))

    def test_rename_and_move(self):
        """Test renaming ephys files then moving them into probe folders, for 3A and 3B sessions."""
        # Test for 3A
        self._test_rename_ephys_files(self.session_path_3A, 4)
        misc.move_ephys_files(self.session_path_3A)
        probe_folders = list(self.session_path_3A.rglob('*probe*'))
        # Expect a single folder named 'probe00'
        self.assertEqual(['probe00'], [x.parts[-1] for x in probe_folders])
        expected = [
            '_spikeglx_ephysData_g0_t0.imec.ap.bin',
            '_spikeglx_ephysData_g0_t0.imec.ap.meta',
            '_spikeglx_ephysData_g0_t0.imec.lf.bin',
            '_spikeglx_ephysData_g0_t0.imec.lf.meta'
        ]
        self.assertCountEqual(expected, [x.name for x in probe_folders[0].glob('*')])

        # Test for 3B
        self._test_rename_ephys_files(self.session_path_3B, 14)
        misc.move_ephys_files(self.session_path_3B)
        # Sorted so that the folder index matches the imec number checked below
        probe_folders = sorted(self.session_path_3B.rglob('*probe*'))
        # Check moved into 'probexx' folders
        self.assertEqual(3, len(probe_folders))
        self.assertCountEqual([f'probe0{x}' for x in range(3)],
                              [x.parts[-1] for x in probe_folders])
        for i, probe_folder in enumerate(probe_folders):
            expected = [
                f'_spikeglx_ephysData_g0_t0.imec{i}.ap.bin',
                f'_spikeglx_ephysData_g0_t0.imec{i}.ap.meta',
                f'_spikeglx_ephysData_g0_t0.imec{i}.lf.bin',
                f'_spikeglx_ephysData_g0_t0.imec{i}.lf.meta'
            ]
            self.assertCountEqual(expected, [x.name for x in probe_folder.glob('*')])
        # The nidq files are expected directly in the raw_ephys_data folder
        nidq_files = self.session_path_3B.joinpath('raw_ephys_data').glob('*.nidq.*')
        expected = ['_spikeglx_ephysData_g0_t0.nidq.bin', '_spikeglx_ephysData_g0_t0.nidq.meta']
        self.assertCountEqual(expected, [x.name for x in nidq_files])

    def _check_and_delete_insertions(self, one, insertions, model, labels):
        """Check the first two insertion records match `model` and `labels`, then delete them from Alyx.

        Parameters
        ----------
        one : one.api.OneAlyx
            The ONE instance used to delete the insertion records.
        insertions : list of dict
            Insertion records as returned by the 'insertions' list endpoint.
        model : str
            The expected probe model of each record.
        labels : list of str
            The labels that each record's name must belong to.
        """
        # Fail with a clear assertion instead of an IndexError when records are missing
        self.assertGreaterEqual(len(insertions), 2)
        created = insertions[:2]
        for insertion in created:
            self.assertEqual(model, insertion['model'])
            self.assertIn(insertion['name'], labels)
        # Cleanup DB
        for insertion in created:
            one.alyx.rest('insertions', 'delete', id=insertion['id'])

    def test_create_alyx_probe_insertions(self):
        """Test misc.create_alyx_probe_insertions with 3A (forced) and 3B2 probe models."""
        # Connect to test DB
        one = ONE(**TEST_DB)
        # Create new session on database with a random date to avoid race conditions
        _, eid = fu.register_new_session(one, subject='ZM_1150')
        eid = str(eid)
        # Register the deletion before any further REST call so a failure below cannot leave the session behind
        self.addCleanup(one.alyx.rest, 'sessions', 'delete', id=eid)  # Delete after test
        # Currently the task protocol of a session must contain 'ephys' in order to create an insertion!
        one.alyx.rest('sessions', 'partial_update', id=eid, data={'task_protocol': 'ephys'})

        # Force probe insertion 3A
        labels = [self._random_label(), self._random_label()]
        misc.create_alyx_probe_insertions(eid, one=one, model='3A', labels=labels, force=True)
        # Verify it's been inserted
        alyx_insertion = one.alyx.rest('insertions', 'list', session=eid, no_cache=True)
        alyx_insertion = [x for x in alyx_insertion if x['model'] == '3A']
        self._check_and_delete_insertions(one, alyx_insertion, '3A', labels)

        # Force probe insertion 3B
        labels = [self._random_label(), self._random_label()]
        misc.create_alyx_probe_insertions(eid, one=one, model='3B2', labels=labels)
        # Verify it's been inserted
        alyx_insertion = one.alyx.rest('insertions', 'list', session=eid, no_cache=True)
        self._check_and_delete_insertions(one, alyx_insertion, '3B2', labels)

    def test_probe_names_from_session_path(self):
        """Test misc.probe_labels_from_session_path against a set of sample SpikeGLX meta files."""
        expected_pnames = ['probe00', 'probe01', 'probe03', 'probe02a', 'probe02b', 'probe02c', 'probe02d', 'probe04']
        fixtures_path = Path(__file__).parent.joinpath('fixtures/pipes')
        nidq_file = fixtures_path.joinpath('sample3B_g0_t0.nidq.meta')
        # Map of probe folder name to the sample meta file copied into it
        meta_files = {
            'probe00': fixtures_path.joinpath('sample3A_g0_t0.imec.ap.meta'),
            'probe01': fixtures_path.joinpath('sample3B_g0_t0.imec1.ap.meta'),
            'probe04': fixtures_path.joinpath('sampleNP2.1_g0_t0.imec.ap.meta'),
            'probe03': fixtures_path.joinpath('sampleNP2.4_1shank_g0_t0.imec.ap.meta'),
            'probe02': fixtures_path.joinpath('sampleNP2.4_4shanks_g0_t0.imec.ap.meta'),
        }
        with tempfile.TemporaryDirectory() as tdir:
            session_path = Path(tdir).joinpath('Algernon', '2021-02-12', '001')
            raw_ephys_path = session_path.joinpath('raw_ephys_data')
            raw_ephys_path.mkdir(parents=True, exist_ok=True)
            shutil.copy(nidq_file, raw_ephys_path.joinpath('_spikeglx_ephysData_g0_t0.nidq.meta'))
            for pname, meta_file in meta_files.items():
                probe_path = raw_ephys_path.joinpath(pname)
                probe_path.mkdir()
                shutil.copy(meta_file, probe_path.joinpath('_spikeglx_ephysData_g0_t0.imec0.ap.meta'))
                # An extra meta file in a nested folder; the expected labels contain no entry for it
                probe_path.joinpath('nested_folder').mkdir()
                probe_path.joinpath('nested_folder', 'toto.ap.meta').touch()
            self.assertEqual(set(misc.probe_labels_from_session_path(session_path)), set(expected_pnames))
[docs]
class TestScanFixPassiveFiles(unittest.TestCase):
    """Tests for the ibllib.pipes.scan_fix_passive_files module."""

    def setUp(self):
        """Create a badly transferred ephys/passive session pair plus one complete ephys session."""
        self.tmp_dir = tempfile.TemporaryDirectory()
        # Register the cleanup straight away: unlike tearDown, cleanups also run when setUp itself fails
        self.addCleanup(self.tmp_dir.cleanup)
        # Session 001 and 002 are 2 sessions with ephys and passive badly transferred
        (
            self.session_path,
            self.passive_session_path,
        ) = fu.create_fake_ephys_recording_bad_passive_transfer_sessions(
            self.tmp_dir.name, lab="fakelab", mouse="fakemouse", date="1900-01-01", num="001"
        )
        # Create another complete ephys session same mouse and date
        self.other_good_session = fu.create_fake_complete_ephys_session(
            self.tmp_dir.name, lab="fakelab", mouse="fakemouse", date="1900-01-01", increment=True
        )

    def test_scan_fix(self):
        """Test fix.execute in dry and real mode, before and after the files have been moved."""
        # The runs are order dependent: the second (non-dry) run performs the move, after which
        # there should be nothing left to do.
        # (dry, expected number of pairs found, expected number of successful moves)
        runs = (
            (True, 1, 0),
            (False, 1, 1),
            (True, 0, 0),
            (False, 0, 0),
        )
        for dry, n_pairs, n_moved in runs:
            from_to_pairs, moved_ok = fix.execute(self.tmp_dir.name, dry=dry)
            self.assertEqual(n_pairs, len(from_to_pairs))
            self.assertEqual(n_moved, sum(moved_ok))

    def test_find_pairs(self):
        """Test fix.find_pairs returns a pair going from session 002 to session 001."""
        from_to_pairs = fix.find_pairs(self.tmp_dir.name)
        # Fail with a clear assertion instead of an IndexError when nothing was found
        self.assertGreater(len(from_to_pairs), 0)
        source, destination = from_to_pairs[0][0], from_to_pairs[0][1]
        from_path_parts = {'fakelab', 'Subjects', 'fakemouse', '1900-01-01', '002'}
        self.assertLessEqual(from_path_parts, set(Path(source).parts))
        to_path_parts = {'fakelab', 'Subjects', 'fakemouse', '1900-01-01', '001'}
        self.assertLessEqual(to_path_parts, set(Path(destination).parts))

    def test_move_rename_pairs(self):
        """Test fix.move_rename_pairs with no pairs, with bad paths and with the pairs from find_pairs."""
        # Test all outputs of find function
        from_to_pairs = []
        moved_ok = fix.move_rename_pairs(from_to_pairs)
        self.assertFalse(moved_ok)
        # Bad paths
        from_to_pairs = [("bla", "ble")]
        moved_ok = fix.move_rename_pairs(from_to_pairs)
        self.assertEqual(0, sum(moved_ok))
        # Same as execute
        from_to_pairs = fix.find_pairs(self.tmp_dir.name)
        moved_ok = fix.move_rename_pairs(from_to_pairs)
        self.assertEqual(1, sum(moved_ok))
[docs]
class TestRegisterRawDataTask(unittest.TestCase):
    """Tests for ibllib.pipes.base_tasks.RegisterRawDataTask."""

    def setUp(self) -> None:
        """Instantiate ONE with the test DB parameters and create an empty session folder."""
        self.one = ONE(**TEST_DB)
        self.tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)
        self.session_path = Path(self.tempdir.name).joinpath('subject', '2023-01-01', '001')
        self.session_path.mkdir(parents=True)

    def test_rename_files(self):
        """Test upload of snapshots.

        Another test for this exists in ibllib.tests.test_base_tasks.TestRegisterRawDataTask.
        This test uses placeholder files instead of real images, and the Alyx REST calls, the eid
        lookup and the image conversion are all mocked. Note that setUp still instantiates ONE
        with the TEST_DB parameters.
        """
        # Add base dir snapshot
        folder = self.session_path.joinpath('snapshots')
        folder.mkdir()
        folder.joinpath('snap.PNG').touch()
        # One snapshot folder per numbered collection, each holding a single image
        collection = 'raw_task_data'
        for i, ext in enumerate(['tif', 'jpg', 'gif']):
            snapshots = self.session_path.joinpath(f'{collection}_{i:02}', 'snapshots')
            snapshots.mkdir(parents=True)
            snapshots.joinpath(f'snapshot.{ext}').touch()
        # Stuff with text note
        p = self.session_path.joinpath(f'{collection}_00', 'snapshots', 'pic.jpeg')
        p.write_bytes('foo'.encode())
        p.with_name('pic.txt').write_text('bar')

        task = RegisterRawDataTask(self.session_path, one=self.one)

        def as_png_side_effect(x):
            """Stand in for _save_as_png: create an empty file with a .png suffix and return its path."""
            png = x.with_suffix('.png')
            png.touch()
            return png

        # _is_animated_gif is mocked to return true for any GIF file
        with mock.patch.object(self.one.alyx, 'rest') as rest, \
                mock.patch.object(self.one, 'path2eid', return_value=str(uuid4())), \
                mock.patch.object(task, '_save_as_png', side_effect=as_png_side_effect), \
                mock.patch.object(task, '_is_animated_gif', side_effect=lambda x: x.suffix == '.gif'):
            task.register_snapshots(collection=['', f'{collection}*'])
            self.assertEqual(5, rest.call_count)
            files = []
            for args, kwargs in rest.call_args_list:
                self.assertEqual(('notes', 'create'), args)
                files.append(Path(kwargs['files']['image'].name).name)
                width = kwargs['data'].get('width')
                # Test that original size passed as width only for gif file
                if files[-1].endswith('gif'):
                    self.assertEqual('orig', width)
                else:
                    self.assertIsNone(width)
        # NB: a 'snapshot.png' is expected in place of the tif file
        expected = ('snap.PNG', 'pic.jpeg', 'snapshot.png', 'snapshot.jpg', 'snapshot.gif')
        self.assertCountEqual(expected, files)
[docs]
class TestSleeplessDecorator(unittest.TestCase):
    """Tests for the sleepless decorator imported from ibllib.pipes.misc."""

    def test_decorator_argument_passing(self):
        """Check the decorated function keeps its name and is called with the arguments given."""
        @sleepless
        def dummy_function(arg1, arg2):
            return arg1, arg2

        # The wrapper should expose the name of the function it wraps
        self.assertEqual(dummy_function.__name__, 'dummy_function')
        # Positional arguments should reach the wrapped function and its return value be passed back
        returned = dummy_function("test1", "test2")
        self.assertEqual(returned, ("test1", "test2"))
if __name__ == '__main__':
    # Run the tests verbosely when executed as a script; exit=False stops unittest calling sys.exit
    unittest.main(verbosity=2, exit=False)