Source code for ibllib.tests.test_io

import unittest
from unittest.mock import patch
import os
import uuid
import tempfile
from pathlib import Path
import logging
import json

import numpy as np
from one.api import ONE
from iblutil.io import params
import yaml

from ibllib.tests import TEST_DB
from ibllib.io import flags, misc, video, session_params
import ibllib.io.raw_data_loaders as raw
import ibllib.io.raw_daq_loaders as raw_daq


[docs] class TestsParams(unittest.TestCase):
[docs] def setUp(self): self.par_dict = {'A': 'tata', 'O': 'toto', 'I': 'titi', 'num': 15, 'liste': [1, 'turlu'], 'apath': Path('/gna/gna/gna')} self.parstr = 'toto' params.write(self.parstr, self.par_dict) params.write(self.parstr, params.from_dict(self.par_dict))
[docs] def test_params(self): # first go to and from dictionary par_dict = self.par_dict par = params.from_dict(par_dict) self.assertEqual(params.as_dict(par), par_dict) # next go to and from dictionary via json par2 = params.read(self.parstr) self.assertEqual(par, par2)
[docs] def test_param_get_file(self): home_dir = Path(params.getfile(self.parstr)).parent # straight case the file is .{str} in the home directory assert home_dir.joinpath(".toto") == Path(params.getfile(self.parstr)) # straight case the file is .{str} in the home directory assert home_dir.joinpath(".toto") == Path(params.getfile(".toto")) # subfolder case assert home_dir.joinpath(".toto", ".titi") == Path(params.getfile("toto/titi"))
[docs] def test_new_default_param(self): # in this case an updated version of the codes brings in a new parameter default = {'A': 'tata2', 'O': 'toto2', 'I': 'titi2', 'E': 'tete2', 'num': 15, 'liste': [1, 'turlu']} expected_result = {'A': 'tata', 'O': 'toto', 'I': 'titi', 'num': 15, 'liste': [1, 'turlu'], 'apath': str(Path('/gna/gna/gna')), 'E': 'tete2', } par2 = params.read(self.parstr, default=default) self.assertCountEqual(par2.as_dict(), expected_result) # on the next path the parameter has been added to the param file par2 = params.read(self.parstr, default=default) self.assertCountEqual(par2.as_dict(), expected_result) # check that it doesn't break if a named tuple is given instead of a dict par3 = params.read(self.parstr, default=par2) self.assertEqual(par2, par3) # check that a non-existing par file raises error Path(params.getfile(self.parstr)).unlink() with self.assertRaises(FileNotFoundError): params.read(self.parstr) # check that a non-existing par file with default returns default par = params.read(self.parstr, default=default) self.assertCountEqual(par, params.from_dict(default)) # even if this default is a Params named tuple par = params.read(self.parstr, default=par) self.assertEqual(par, params.from_dict(default)) # check default empty dict Path(params.getfile(self.parstr)).unlink() filename = Path(params.getfile(self.parstr)) self.assertFalse(filename.exists()) par = params.read(self.parstr, default={}) self.assertIsNone(par) self.assertTrue(filename.exists())
[docs] def tearDown(self): # at last delete the param file Path(params.getfile('toto')).unlink(missing_ok=True)
[docs] class TestsRawDataLoaders(unittest.TestCase):
[docs] def setUp(self): self.tempfile = tempfile.NamedTemporaryFile(delete=False) self.bin_session_path = Path(__file__).parent.joinpath( 'fixtures', 'io', 'data_loaders', "_iblrig_test_mouse_2020-01-01_001")
[docs] def testFlagFileRead(self): # empty file should return True self.assertEqual(flags.read_flag_file(self.tempfile.name), True) # test with 2 lines and a trailing with open(self.tempfile.name, 'w+') as fid: fid.write('file1\nfile2\n') self.assertEqual(set(flags.read_flag_file(self.tempfile.name)), set(['file1', 'file2'])) # test with 2 lines and a trailing, Windows convention with open(self.tempfile.name, 'w+') as fid: fid.write('file1\r\nfile2\r\n') self.assertEqual(set(flags.read_flag_file(self.tempfile.name)), set(['file1', 'file2']))
[docs] def testAppendFlagFile(self): # DO NOT CHANGE THE ORDER OF TESTS BELOW # prepare a file with 3 dataset types file_list = ['_ibl_extraRewards.times', '_ibl_lickPiezo.raw', '_ibl_lickPiezo.timestamps'] with open(self.tempfile.name, 'w+') as fid: fid.write('\n'.join(file_list)) self.assertEqual(set(flags.read_flag_file(self.tempfile.name)), set(file_list)) # with an existing file containing files, writing more files append to it file_list_2 = ['turltu'] # also makes sure that if a string is provided it works flags.write_flag_file(self.tempfile.name, file_list_2[0]) self.assertEqual(set(flags.read_flag_file(self.tempfile.name)), set(file_list + file_list_2)) # writing again keeps unique file values flags.write_flag_file(self.tempfile.name, file_list_2[0]) n = sum([1 for f in flags.read_flag_file(self.tempfile.name) if f == file_list_2[0]]) self.assertEqual(n, 1) # with an existing file containing files, writing empty filelist returns True for all files flags.write_flag_file(self.tempfile.name, None) self.assertEqual(flags.read_flag_file(self.tempfile.name), True) # with an existing empty file, writing filelist returns True for all files flags.write_flag_file(self.tempfile.name, ['file1', 'file2']) self.assertEqual(flags.read_flag_file(self.tempfile.name), True) # makes sure that read after write empty list also returns True flags.write_flag_file(self.tempfile.name, []) self.assertEqual(flags.read_flag_file(self.tempfile.name), True) # with an existing empty file, writing filelist returns the list if clobber flags.write_flag_file(self.tempfile.name, ['file1', 'file2', 'file3'], clobber=True) self.assertEqual(set(flags.read_flag_file(self.tempfile.name)), set(['file1', 'file2', 'file3'])) # test the removal of a file within the list flags.excise_flag_file(self.tempfile.name, removed_files='file1') self.assertEqual(sorted(flags.read_flag_file(self.tempfile.name)), ['file2', 'file3']) # if file-list is True it means all files and file_list should be empty after read flags.write_flag_file(self.tempfile.name, file_list=True) self.assertEqual(flags.read_flag_file(self.tempfile.name), True)
[docs] def test_load_encoder_trial_info(self): self.session = Path(__file__).parent.joinpath('extractors', 'data', 'session_biased_ge5') data = raw.load_encoder_trial_info(self.session) self.assertTrue(data is not None) self.assertIsNone(raw.load_encoder_trial_info(self.session.with_name('empty'))) self.assertIsNone(raw.load_encoder_trial_info(None))
[docs] def test_load_camera_ssv_times(self): session = Path(__file__).parent.joinpath('extractors', 'data', 'session_ephys') with self.assertRaises(ValueError): raw.load_camera_ssv_times(session, 'tail') with self.assertRaises(FileNotFoundError): raw.load_camera_ssv_times(session.with_name('foobar'), 'body') bonsai, camera = raw.load_camera_ssv_times(session, 'body') self.assertTrue(bonsai.size == camera.size == 6001) self.assertEqual(bonsai.dtype.str, '<M8[ns]') self.assertEqual(str(bonsai[0]), '2020-08-19T16:42:57.790361600') expected = np.array([69.466875, 69.5, 69.533, 69.566125, 69.59925]) np.testing.assert_array_equal(expected, camera[:5]) # Many sessions have the columns in the wrong order. Here we write 5 lines from the # fixture file to another file in a temporary folder, with the columns swapped. from_file = session.joinpath('raw_video_data', '_iblrig_bodyCamera.timestamps.ssv') with tempfile.TemporaryDirectory() as tempdir: # New file with columns swapped (also checks loading files with UUID in name) filename = f'_iblrig_leftCamera.timestamps.{uuid.uuid4()}.ssv' to_file = Path(tempdir).joinpath('raw_video_data', filename) to_file.parent.mkdir(exist_ok=True) with open(from_file, 'r') as a, open(to_file, 'w') as b: for i in range(5): # Read line from fixture file and write into file in swapped order b.write('{1} {0} {2}'.format(*a.readline().split(' '))) assert to_file.exists(), 'failed to write test file' bonsai, camera = raw.load_camera_ssv_times(to_file.parents[1], 'left') # Verify that values returned in the same order as before self.assertEqual(bonsai.dtype.str, '<M8[ns]') self.assertEqual(camera.dtype.str, '<f8') self.assertAlmostEqual(69.466875, camera[0])
[docs] def test_load_camera_gpio(self): """ Embedded frame data comes from 057e25ef-3f80-42e8-aa9f-e259df8bc9ad, left camera :return: """ session = Path(__file__).parent.joinpath('extractors', 'data', 'session_ephys') session2 = Path(__file__).parent.joinpath( 'fixtures', 'io', 'data_loaders', '_iblrig_test_mouse_2020-01-01_001' ) gpio = raw.load_camera_gpio(session, 'body', as_dicts=True) gpio2 = raw.load_camera_gpio(session2, 'left', as_dicts=True) self.assertEqual(len(gpio), 4) # One dict per pin self.assertEqual(len(gpio2), 4) # One dict per pin *gpio_, gpio_4 = gpio # Check last dict; pin 4 should have one pulse self.assertTrue(all(k in ('indices', 'polarities') for k in gpio_4.keys())) np.testing.assert_array_equal(gpio_4['indices'], np.array([166, 172], dtype=np.int64)) np.testing.assert_array_equal(gpio_4['polarities'], np.array([1, -1])) # Test raw flag gpio = raw.load_camera_gpio(session, 'body', as_dicts=False) self.assertEqual(gpio.dtype, bool) self.assertEqual(gpio.shape, (510, 4)) # Test empty / None self.assertIsNone(raw.load_camera_gpio(None, 'body')) self.assertIsNone(raw.load_camera_gpio(session, 'right')) [self.assertIsNone(x) for x in raw.load_camera_gpio(session, 'right', as_dicts=True)] # Test noisy GPIO data side = 'right' with tempfile.TemporaryDirectory() as tdir: session_path = Path(tdir).joinpath('mouse', '2020-06-01', '001') session_path.joinpath('raw_video_data').mkdir(parents=True) # Test loads file with UUID did = uuid.uuid4() # Random uuid filename = session_path / 'raw_video_data' / f'_iblrig_{side}Camera.GPIO.{did}.bin' np.full(1000, 1.87904819e+09, dtype=np.float64).tofile(filename) with self.assertLogs('ibllib', level='WARNING'): raw.load_camera_gpio(session_path, side, as_dicts=True) # Test dead pin array np.zeros(3000, dtype=np.float64).tofile(filename) with self.assertLogs('ibllib', level='ERROR'): gpio = raw.load_camera_gpio(session_path, side, as_dicts=True) [self.assertIsNone(x) for x in gpio]
[docs] def test_load_camera_frame_count(self): """ Embedded frame data comes from 057e25ef-3f80-42e8-aa9f-e259df8bc9ad, left camera :return: """ session = Path(__file__).parent.joinpath('extractors', 'data', 'session_ephys') count = raw.load_camera_frame_count(session, 'body', raw=False) np.testing.assert_array_equal(count, np.arange(510, dtype=np.int32)) self.assertEqual(count.dtype, int) # Test raw flag count = raw.load_camera_frame_count(session, 'body', raw=True) self.assertEqual(count[0], int(16696704)) # Test empty / None self.assertIsNone(raw.load_camera_frame_count(None, 'body')) self.assertIsNone(raw.load_camera_frame_count(session, 'right'))
[docs] def test_load_embedded_frame_data(self): session = Path(__file__).parent.joinpath('extractors', 'data', 'session_ephys') count, gpio = raw.load_embedded_frame_data(session, 'body') self.assertEqual(count[0], 0) self.assertIsInstance(gpio[-1], dict) count, gpio = raw.load_embedded_frame_data(session, 'body', raw=True) self.assertNotEqual(count[0], 0) self.assertIsInstance(gpio, np.ndarray)
[docs] def test_load_camera_frameData(self): import pandas as pd fd_raw = raw.load_camera_frameData(self.bin_session_path, raw=True) fd = raw.load_camera_frameData(self.bin_session_path) # Wrong camera input file not found with self.assertRaises(AssertionError): raw.load_camera_frameData(self.bin_session_path, camera='right') # Shape self.assertTrue(fd.shape[1] == 4) self.assertTrue(fd_raw.shape[1] == 4) # Type self.assertTrue(isinstance(fd, pd.DataFrame)) self.assertTrue(isinstance(fd_raw, pd.DataFrame)) # Column names df_cols = ["Timestamp", "embeddedTimeStamp", "embeddedFrameCounter", "embeddedGPIOPinState"] self.assertTrue(np.all([x in fd.columns for x in df_cols])) self.assertTrue(np.all([x in fd_raw.columns for x in df_cols])) # Column types parsed_dtypes = { "Timestamp": np.float64, "embeddedTimeStamp": np.float64, "embeddedFrameCounter": np.int64, "embeddedGPIOPinState": object } self.assertTrue(fd.dtypes.to_dict() == parsed_dtypes) self.assertTrue(all([x == np.int64 for x in fd_raw.dtypes]))
[docs] def test_load_settings(self): main_path = Path(__file__).parent.joinpath('extractors', 'data') self.training_ge5 = main_path / 'session_training_ge5' settings = raw.load_settings(self.training_ge5) self.assertIsInstance(settings, dict) self.assertEqual(144, len(settings)) with self.assertLogs(raw._logger, level=20): self.assertIsNone(raw.load_settings(None)) # Should return None when path empty with self.assertLogs(raw._logger, level=20): self.assertIsNone(raw.load_settings(self.training_ge5, 'raw_task_data_00'))
[docs] def tearDown(self): self.tempfile.close() os.unlink(self.tempfile.name)
[docs] class TestsMisc(unittest.TestCase):
[docs] def setUp(self): self._tdir = tempfile.TemporaryDirectory() # self.addClassCleanup(tmpdir.cleanup) # py3.8 self.tempdir = Path(self._tdir.name) self.subdirs = [ self.tempdir / 'test_empty_parent', self.tempdir / 'test_empty_parent' / 'test_empty', self.tempdir / 'test_empty', self.tempdir / 'test_full', ] self.file = self.tempdir / 'test_full' / 'file.txt' _ = [x.mkdir() for x in self.subdirs] self.file.touch()
[docs] def tearDown(self) -> None: self._tdir.cleanup()
def _resetup_folders(self): self.file.unlink() (self.tempdir / 'test_full').rmdir() _ = [x.rmdir() for x in self.subdirs if x.exists()] _ = [x.mkdir() for x in self.subdirs] self.file.touch()
[docs] def test_delete_empty_folders(self): pre = [x.exists() for x in self.subdirs] pre_expected = [True, True, True, True] self.assertTrue(all([x == y for x, y in zip(pre, pre_expected)])) # Test dry run pos_expected = None pos = misc.delete_empty_folders(self.tempdir) self.assertTrue(pos == pos_expected) # Test dry=False, non recursive pos_expected = [True, False, False, True] misc.delete_empty_folders(self.tempdir, dry=False) pos = [x.exists() for x in self.subdirs] self.assertTrue(all([x == y for x, y in zip(pos, pos_expected)])) self._resetup_folders() # Test recursive pos_expected = [False, False, False, True] misc.delete_empty_folders(self.tempdir, dry=False, recursive=True) pos = [x.exists() for x in self.subdirs] self.assertTrue(all([x == y for x, y in zip(pos, pos_expected)]))
[docs] class TestVideo(unittest.TestCase):
[docs] @classmethod def setUpClass(cls) -> None: cls.one = ONE(**TEST_DB) if 'public' in cls.one.alyx._par.HTTP_DATA_SERVER: cls.one.alyx._par = cls.one.alyx._par.set( 'HTTP_DATA_SERVER', cls.one.alyx._par.HTTP_DATA_SERVER.rsplit('/', 1)[0])
[docs] def setUp(self) -> None: self.eid = '8dd0fcb0-1151-4c97-ae35-2e2421695ad7' self.url = ('https://ibl.flatironinstitute.org/mainenlab/' 'Subjects/ZM_1743/2019-06-14/001/raw_video_data/' '_iblrig_leftCamera.raw.71cfeef2-2aa5-46b5-b88f-ca07e3d92474.mp4')
[docs] def test_label_from_path(self): # Test file path session_path = self.one.eid2path(self.eid) video_path = session_path / 'raw_video_data' / '_iblrig_bodyCamera.raw.mp4' label = video.label_from_path(video_path) self.assertEqual('body', label) # Test URL label = video.label_from_path(self.url) self.assertEqual('left', label) # Test file name label = video.label_from_path('_iblrig_rightCamera.raw.mp4') self.assertEqual('right', label) # Test wrong file label = video.label_from_path('_iblrig_taskSettings.raw.json') self.assertIsNone(label)
[docs] def test_url_from_eid(self): assert self.one.mode != 'remote' actual = video.url_from_eid(self.eid, 'left', self.one) self.assertEqual(self.url, actual) actual = video.url_from_eid(self.eid, one=self.one) expected = {'left': self.url} self.assertEqual(expected, actual) actual = video.url_from_eid(self.eid, label=('left', 'right'), one=self.one) expected = {'left': self.url, 'right': None} self.assertEqual(expected, actual) # Test remote mode old_mode = self.one.mode self.one.mode = 'remote' actual = video.url_from_eid(self.eid, label='left', one=self.one) self.assertEqual(self.url, actual) self.one.mode = old_mode # Test arg checks with self.assertRaises(ValueError): video.url_from_eid(self.eid, 'back')
[docs] def test_assert_valid_label(self): with self.assertRaises(ValueError): video.assert_valid_label('tail') label = video.assert_valid_label('LEFT') self.assertEqual(label, 'left') # Verify works with lists labels = video.assert_valid_label(['Right', 'body']) self.assertEqual(labels, ('right', 'body')) with self.assertRaises(TypeError): video.assert_valid_label(None)
[docs] class TestSessionParams(unittest.TestCase): """Tests for ibllib.io.session_params module."""
[docs] def setUp(self) -> None: self.tmpdir = tempfile.TemporaryDirectory() self.addCleanup(self.tmpdir.cleanup) # load yaml fixture self.fixture_path = Path(__file__).parent.joinpath('fixtures', 'io', '_ibl_experiment.description.yaml') with open(self.fixture_path, 'r') as fp: self.fixture = yaml.safe_load(fp) # save as individual files self.devices_path = Path(self.tmpdir.name).joinpath('_devices') # a sync that's different to widefield and ephys sync = {**self.fixture['sync']['nidq'].copy(), 'collection': 'raw_sync_data'} computers_descriptions = { 'widefield': dict(devices={'widefield': self.fixture['devices']['widefield']}), 'video': '', 'ephys': dict(devices={'neuropixel': self.fixture['devices']['neuropixel']}), 'behaviour': dict(devices={'microphone': self.fixture['devices']['microphone']}), 'sync': dict(sync={'nidq': sync}) } # the behaviour computer contains the task, project and procedure keys for k in filter(lambda x: x != 'devices', self.fixture): computers_descriptions['behaviour'][k] = self.fixture[k] # the ephys computer contains another identical sync key computers_descriptions['ephys']['sync'] = self.fixture['sync'] for label, data in computers_descriptions.items(): file_device = self.devices_path.joinpath(f'{label}.yaml') session_params.write_yaml(file_device, data)
[docs] @patch('iblutil.io.params.time.sleep') def test_aggregate(self, sleep_mock): """A test for both aggregate_device and merge_params.""" fullfile = self.devices_path.parent.joinpath('_ibl_experiment.description.yaml') file_lock = fullfile.with_suffix('.lock') # Test deals with file lock file_lock.touch() device = 'widefield' file_device = self.devices_path.joinpath(f'{device}.yaml') session_params.aggregate_device(file_device, fullfile) self.assertFalse(file_lock.exists(), 'failed to delete lock file') self.assertTrue(fullfile.exists(), 'failed to create aggregate file') sleep_mock.assert_called() with open(fullfile, 'r') as fp: data = yaml.safe_load(fp) self.assertCountEqual(('devices', 'version'), data.keys()) self.assertCountEqual((device,), data['devices'].keys()) self.assertEqual(data['devices'][device], self.fixture['devices'][device]) # A device with extra keys device = 'behaviour' file_device = self.devices_path.joinpath(f'{device}.yaml') session_params.aggregate_device(file_device, fullfile, unlink=True) self.assertFalse(file_lock.exists(), 'failed to delete lock file') self.assertFalse(file_device.exists(), 'failed to delete device file') with open(fullfile, 'r') as fp: data = yaml.safe_load(fp) expected_keys = ('devices', 'procedures', 'projects', 'sync', 'tasks', 'version') self.assertCountEqual(data.keys(), expected_keys) self.assertTrue(len(data['devices'].keys()) > 1) # A device with another identical sync key file_device = self.devices_path.joinpath('ephys.yaml') session_params.aggregate_device(file_device, fullfile, unlink=True) # A device with a different sync file_device = self.devices_path.joinpath('sync.yaml') with self.assertRaises(AssertionError): session_params.aggregate_device(file_device, fullfile, unlink=True) # An empty device file_device = self.devices_path.joinpath('video.yaml') with self.assertLogs(session_params.__name__, logging.WARNING): session_params.aggregate_device(file_device, fullfile, unlink=True)
[docs] @patch(session_params.__name__ + '.SPEC_VERSION', '999') def test_read_yaml(self): label = 'widefield' data = session_params.read_params(self.devices_path.joinpath(f'{label}.yaml')) self.assertEqual('999', data.pop('version'), 'failed to patch file') self.assertEqual(data['devices'][label], self.fixture['devices'][label]) # Check loads from directory path data_keys = session_params.read_params(self.fixture_path.parent).keys() self.assertCountEqual(self.fixture.keys(), data_keys)
[docs] def test_patch_data(self): """Test for session_params._patch_file function.""" with patch(session_params.__name__ + '.SPEC_VERSION', '1.0.0'), \ self.assertLogs(session_params.__name__, logging.WARNING): data = session_params._patch_file({'version': '1.1.0'}) self.assertEqual(data, {'version': '1.0.0'}) # Check tasks dicts separated into lists unpatched = {'version': '0.0.1', 'tasks': { 'fooChoiceWorld': {1: '1'}, 'barChoiceWorld': {2: '2'}}} data = session_params._patch_file(unpatched) self.assertIsInstance(data['tasks'], list) self.assertEqual([['fooChoiceWorld'], ['barChoiceWorld']], list(map(list, data['tasks']))) # Check patching list of dicts with some containing more than 1 key unpatched = {'tasks': [{'foo': {1: '1'}}, {'bar': {2: '2'}, 'baz': {3: '3'}}]} data = session_params._patch_file(unpatched) self.assertEqual(3, len(data['tasks'])) self.assertEqual([['foo'], ['bar'], ['baz']], list(map(list, data['tasks'])))
[docs] def test_get_collections(self): collections = session_params.get_collections(self.fixture) expected = { 'widefield': 'raw_widefield_data', 'microphone': 'raw_behavior_data', 'probe00': 'raw_ephys_data/probe00', 'probe01': 'raw_ephys_data/probe01', 'nidq': 'raw_ephys_data', 'passiveChoiceWorld': 'raw_passive_data', 'ephysChoiceWorld': 'raw_behavior_data' } self.assertCountEqual(expected, collections)
[docs] def test_get_collections_repeat_protocols(self): tasks = dict(tasks=[ {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}, {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}}, {'passiveChoiceWorld': {'collection': 'raw_passive_data_bis'}}]) collections = session_params.get_collections(tasks) self.assertEqual(set(collections['passiveChoiceWorld']), set(['raw_passive_data_bis', 'raw_passive_data'])) collections = session_params.get_collections(tasks, flat=True) self.assertEqual(collections, {'raw_passive_data_bis', 'raw_passive_data', 'raw_behavior_data'})
[docs] def test_merge_params(self): """Test for ibllib.io.session_params.merge_params functions.""" a = self.fixture b = {'procedures': ['Imaging', 'Injection'], 'tasks': [{'fooChoiceWorld': {'collection': 'bar'}}]} c = session_params.merge_params(a, b, copy=True) self.assertCountEqual(['Imaging', 'Behavior training/tasks', 'Injection'], c['procedures']) self.assertEqual(['passiveChoiceWorld', 'ephysChoiceWorld', 'fooChoiceWorld'], [list(x)[0] for x in c['tasks']]) # Ensure a and b not modified self.assertNotEqual(set(c['procedures']), set(a['procedures'])) self.assertNotEqual(set(a['procedures']), set(b['procedures'])) # Test duplicate tasks skipped while order kept constant d = {'tasks': [a['tasks'][1], {'ephysChoiceWorld': {'collection': 'raw_task_data_02', 'sync_label': 'nidq'}}]} e = session_params.merge_params(c, d, copy=True) expected = ['passiveChoiceWorld', 'ephysChoiceWorld', 'fooChoiceWorld', 'ephysChoiceWorld'] self.assertEqual(expected, [list(x)[0] for x in e['tasks']]) self.assertDictEqual({'collection': 'raw_task_data_02', 'sync_label': 'nidq'}, e['tasks'][-1]['ephysChoiceWorld']) # Test without copy session_params.merge_params(a, b, copy=False) self.assertCountEqual(['Imaging', 'Behavior training/tasks', 'Injection'], a['procedures']) # Test assertion on duplicate sync b['sync'] = {'foodaq': {'collection': 'raw_sync_data'}} self.assertRaises(AssertionError, session_params.merge_params, a, b) # Test how it handles the extractors key, which is an unhashable list f = {'tasks': [{'fooChoiceWorld': {'collection': 'bar', 'sync_label': 'bpod', 'extractors': ['a', 'b']}}]} g = session_params.merge_params(a, f, copy=True) self.assertCountEqual(['devices', 'procedures', 'projects', 'sync', 'tasks', 'version'], g.keys()) self.assertEqual(4, len(g['tasks'])) self.assertDictEqual(f['tasks'][0], g['tasks'][-1])
[docs] def test_get_protocol_number(self): """Test ibllib.io.session_params.get_task_protocol_number function.""" self.assertIsNone(session_params.get_task_protocol_number(self.fixture)) self.assertIsNone(session_params.get_task_protocol_number(self.fixture, 'passiveChoiceWorld')) self.assertIsNone(session_params.get_task_protocol_number(self.fixture, 'fooChoiceWorld')) for i, task in enumerate(self.fixture['tasks']): next(iter(task.values()))['protocol_number'] = str(i) self.assertEqual(0, session_params.get_task_protocol_number(self.fixture, 'passiveChoiceWorld')) self.assertEqual([0, 1], session_params.get_task_protocol_number(self.fixture))
[docs] class TestRawDaqLoaders(unittest.TestCase): """Tests for raw_daq_loaders module"""
[docs] def setUp(self): self.tmpdir = tempfile.TemporaryDirectory() self.addCleanup(self.tmpdir.cleanup) # Create some toy DAQ data N = 3000 Fs = 1 a0_clean = np.zeros(N) self.n_ttl = 6 pulse_width = int(np.floor(50 * Fs)) for i in np.arange(1, N, int(np.floor(N / self.n_ttl))): a0_clean[i:i + pulse_width] = 1 a0 = (a0_clean * np.full(N, 5)) + np.random.rand(N) + 1 # 0 -> 5V w/ noise and 1V DC offset ctr0 = np.cumsum(a0_clean) # Counter channel, e.g. [0, 0, 0, 1, 1, 2, 3, 3, 3, 3, [...] n] ctr1 = np.cumsum(a0_clean * np.random.choice([1, -1], N)) # Position channel e.g. [0, 1, 2, 1, ...] self.timeline = {'timestamps': np.arange(0, N, Fs), 'raw': np.vstack([a0, ctr0, ctr1]).T} self.meta = {'daqSampleRate': Fs, 'inputs': [ {'name': 'bpod', 'arrayColumn': 1, 'measurement': 'Voltage', 'daqChannelID': 'ai0'}, {'name': 'neuralFrames', 'arrayColumn': 2, 'measurement': 'EdgeCount', 'daqChannelID': 'ctr0'}, {'name': 'rotaryEncoder', 'arrayColumn': 3, 'measurement': 'Position', 'daqChannelID': 'ctr1'} ]} # FIXME Because of non-standard ALF naming we cannot use save_object_npy for this purpose # alfio.save_object_npy(self.tmpdir.name, self.timeline, 'DAQ data', namespace='timeline') for k, v in self.timeline.items(): np.save(self.tmpdir.name + f'/_timeline_DAQdata.{k}.npy', v) with open(self.tmpdir.name + '/_timeline_DAQdata.meta.json', 'w') as fp: json.dump(self.meta, fp)
[docs] def test_extract_sync_timeline(self): """Test for extract_sync_timeline function.""" chmap = {'bpod': 0, 'neuralFrames': 1, 'rotaryEncoder': 3} sync = raw_daq.extract_sync_timeline(self.tmpdir.name, chmap) self.assertCountEqual(('times', 'channels', 'polarities'), sync.keys()) # Should be sorted by times self.assertTrue(np.all(np.diff(sync['times']) >= 0)) # Number of detected fronts should be correct self.assertEqual(len(sync['times'][sync['channels'] == 0]), self.n_ttl * 2) # Check polarities fronts = sync['polarities'][sync['channels'] == 0] self.assertEqual(1, fronts[0]) # Check polarities alternate between 1 and -1 self.assertTrue( np.all(np.unique(np.cumsum(fronts)) == [0, 1]) and np.all(np.unique(fronts) == [-1, 1]) ) # Check edge count channel sync fronts = sync['polarities'][sync['channels'] == 1] # Check a few timestamps times = sync['times'][sync['channels'] == 1] np.testing.assert_array_almost_equal(times[:5], np.arange(5) + 1.) # Because of the way we made the data, the number of fronts should == pulse_width * n_ttl # Minus one from unique values because one of those values will be zero self.assertEqual(len(np.unique(self.timeline['raw'][:, 1])) - 1, len(fronts)) self.assertTrue(np.all(fronts == 1)) # Check position channel sync fronts = sync['polarities'][sync['channels'] == 3] self.assertEqual(len(np.unique(self.timeline['raw'][:, 1])) - 1, len(fronts)) self.assertTrue(np.all(np.unique(fronts) == [-1, 1])) # Check for missing channel warnings chmap['unknown'] = 2 # Add channel that's not in meta file with self.assertLogs(logging.getLogger('ibllib.io.raw_daq_loaders'), logging.WARNING) as log: raw_daq.extract_sync_timeline(self.tmpdir.name, chmap) record, = log.records self.assertIn('unknown', record.message) # Check measurement type validation self.meta['inputs'][0]['measurement'] = 'FooBar' with open(self.tmpdir.name + '/_timeline_DAQdata.meta.json', 'w') as fp: json.dump(self.meta, fp) self.assertRaises(NotImplementedError, raw_daq.extract_sync_timeline, self.tmpdir.name)
[docs] def test_timeline_meta2wiring(self): """Test for timeline_meta2wiring function.""" wiring = raw_daq.timeline_meta2wiring(self.tmpdir.name, save=False) expected = { 'SYSTEM': 'timeline', 'SYNC_WIRING_ANALOG': {'ai0': 'bpod'}, 'SYNC_WIRING_DIGITAL': {'ctr0': 'neuralFrames', 'ctr1': 'rotaryEncoder'} } self.assertDictEqual(expected, wiring) wiring, outpath = raw_daq.timeline_meta2wiring(self.tmpdir.name, save=True) expected_path = Path(self.tmpdir.name, '_timeline_DAQData.wiring.json') self.assertEqual(expected_path, outpath) self.assertTrue(outpath.exists())
[docs] def test_timeline_meta2chmap(self): """Test for timeline_meta2chmap function.""" chmap = raw_daq.timeline_meta2chmap(self.meta) expected = {'bpod': 1, 'neuralFrames': 2, 'rotaryEncoder': 3} self.assertDictEqual(expected, chmap) chmap = raw_daq.timeline_meta2chmap(self.meta, exclude_channels=('bpod', 'rotaryEncoder')) self.assertDictEqual({'neuralFrames': expected.pop('neuralFrames')}, chmap) chmap = raw_daq.timeline_meta2chmap(self.meta, include_channels=('bpod', 'rotaryEncoder')) self.assertDictEqual(expected, chmap)
if __name__ == '__main__': unittest.main(exit=False, verbosity=2)