import unittest
from unittest.mock import patch
import os
import uuid
import tempfile
from pathlib import Path
import logging
import json
import numpy as np
from one.api import ONE
from iblutil.io import params
import yaml
from ibllib.tests import TEST_DB
from ibllib.io import flags, misc, video, session_params
import ibllib.io.raw_data_loaders as raw
import ibllib.io.raw_daq_loaders as raw_daq
class TestsParams(unittest.TestCase):
    """Tests for reading and writing parameter files through iblutil.io.params."""

    def setUp(self):
        """Write the reference parameter set to the 'toto' parameter file."""
        self.par_dict = {'A': 'tata',
                         'O': 'toto',
                         'I': 'titi',
                         'num': 15,
                         'liste': [1, 'turlu'],
                         'apath': Path('/gna/gna/gna')}
        self.parstr = 'toto'
        # Write twice: once from the plain dict, once from the object built by from_dict
        params.write(self.parstr, self.par_dict)
        params.write(self.parstr, params.from_dict(self.par_dict))

    def test_params(self):
        """Round-trip the parameters through a dictionary and through the parameter file."""
        # first go to and from dictionary
        par_dict = self.par_dict
        par = params.from_dict(par_dict)
        self.assertEqual(params.as_dict(par), par_dict)
        # next go to and from dictionary via json
        par2 = params.read(self.parstr)
        self.assertEqual(par, par2)

    def test_param_get_file(self):
        """Check the file location returned by params.getfile for several name forms."""
        home_dir = Path(params.getfile(self.parstr)).parent
        # straight case the file is .{str} in the home directory
        self.assertEqual(home_dir.joinpath('.toto'), Path(params.getfile(self.parstr)))
        # same location when the name is already given with its leading dot
        self.assertEqual(home_dir.joinpath('.toto'), Path(params.getfile('.toto')))
        # subfolder case
        self.assertEqual(home_dir.joinpath('.toto', '.titi'), Path(params.getfile('toto/titi')))

    def test_new_default_param(self):
        """Check how params.read combines an existing file with a set of defaults."""
        # in this case an updated version of the codes brings in a new parameter
        default = {'A': 'tata2',
                   'O': 'toto2',
                   'I': 'titi2',
                   'E': 'tete2',
                   'num': 15,
                   'liste': [1, 'turlu']}
        expected_result = {'A': 'tata',
                           'O': 'toto',
                           'I': 'titi',
                           'num': 15,
                           'liste': [1, 'turlu'],
                           'apath': str(Path('/gna/gna/gna')),
                           'E': 'tete2',
                           }
        # NB: assertCountEqual iterates the dicts, so only the keys are compared here
        par2 = params.read(self.parstr, default=default)
        self.assertCountEqual(par2.as_dict(), expected_result)
        # on the next path the parameter has been added to the param file
        par2 = params.read(self.parstr, default=default)
        self.assertCountEqual(par2.as_dict(), expected_result)
        # check that it doesn't break if a named tuple is given instead of a dict
        par3 = params.read(self.parstr, default=par2)
        self.assertEqual(par2, par3)
        # check that a non-existing par file raises error
        Path(params.getfile(self.parstr)).unlink()
        with self.assertRaises(FileNotFoundError):
            params.read(self.parstr)
        # check that a non-existing par file with default returns default
        par = params.read(self.parstr, default=default)
        self.assertCountEqual(par, params.from_dict(default))
        # even if this default is a Params named tuple
        par = params.read(self.parstr, default=par)
        self.assertEqual(par, params.from_dict(default))
        # check default empty dict
        Path(params.getfile(self.parstr)).unlink()
        filename = Path(params.getfile(self.parstr))
        self.assertFalse(filename.exists())
        par = params.read(self.parstr, default={})
        self.assertIsNone(par)
        self.assertTrue(filename.exists())

    def tearDown(self):
        """Delete the parameter file, whether or not the test left it in place."""
        # at last delete the param file
        Path(params.getfile('toto')).unlink(missing_ok=True)
class TestsRawDataLoaders(unittest.TestCase):
    """Tests for the flag file helpers and the raw data loader functions."""

    def setUp(self):
        """Create a scratch file and locate the binary session fixture."""
        # delete=False: the file is removed explicitly in tearDown
        self.tempfile = tempfile.NamedTemporaryFile(delete=False)
        self.bin_session_path = Path(__file__).parent.joinpath(
            'fixtures', 'io', 'data_loaders', '_iblrig_test_mouse_2020-01-01_001')

    def testFlagFileRead(self):
        """Read flag files that are empty or use Unix / Windows line endings."""
        # empty file should return True
        self.assertEqual(flags.read_flag_file(self.tempfile.name), True)
        # test with 2 lines and a trailing
        with open(self.tempfile.name, 'w+') as fid:
            fid.write('file1\nfile2\n')
        self.assertEqual(set(flags.read_flag_file(self.tempfile.name)), {'file1', 'file2'})
        # test with 2 lines and a trailing, Windows convention
        with open(self.tempfile.name, 'w+') as fid:
            fid.write('file1\r\nfile2\r\n')
        self.assertEqual(set(flags.read_flag_file(self.tempfile.name)), {'file1', 'file2'})

    def testAppendFlagFile(self):
        """Write, append to, clobber and excise entries of a flag file."""
        # DO NOT CHANGE THE ORDER OF TESTS BELOW
        # prepare a file with 3 dataset types
        file_list = ['_ibl_extraRewards.times', '_ibl_lickPiezo.raw', '_ibl_lickPiezo.timestamps']
        with open(self.tempfile.name, 'w+') as fid:
            fid.write('\n'.join(file_list))
        self.assertEqual(set(flags.read_flag_file(self.tempfile.name)), set(file_list))
        # with an existing file containing files, writing more files append to it
        file_list_2 = ['turltu']
        # also makes sure that if a string is provided it works
        flags.write_flag_file(self.tempfile.name, file_list_2[0])
        self.assertEqual(set(flags.read_flag_file(self.tempfile.name)),
                         set(file_list + file_list_2))
        # writing again keeps unique file values
        flags.write_flag_file(self.tempfile.name, file_list_2[0])
        n = sum(1 for f in flags.read_flag_file(self.tempfile.name) if f == file_list_2[0])
        self.assertEqual(n, 1)
        # with an existing file containing files, writing empty filelist returns True for all files
        flags.write_flag_file(self.tempfile.name, None)
        self.assertEqual(flags.read_flag_file(self.tempfile.name), True)
        # with an existing empty file, writing filelist returns True for all files
        flags.write_flag_file(self.tempfile.name, ['file1', 'file2'])
        self.assertEqual(flags.read_flag_file(self.tempfile.name), True)
        # makes sure that read after write empty list also returns True
        flags.write_flag_file(self.tempfile.name, [])
        self.assertEqual(flags.read_flag_file(self.tempfile.name), True)
        # with an existing empty file, writing filelist returns the list if clobber
        flags.write_flag_file(self.tempfile.name, ['file1', 'file2', 'file3'], clobber=True)
        self.assertEqual(set(flags.read_flag_file(self.tempfile.name)),
                         {'file1', 'file2', 'file3'})
        # test the removal of a file within the list
        flags.excise_flag_file(self.tempfile.name, removed_files='file1')
        self.assertEqual(sorted(flags.read_flag_file(self.tempfile.name)), ['file2', 'file3'])
        # if file-list is True it means all files and file_list should be empty after read
        flags.write_flag_file(self.tempfile.name, file_list=True)
        self.assertEqual(flags.read_flag_file(self.tempfile.name), True)

    def test_load_encoder_trial_info(self):
        """Load encoder trial info from a fixture session, a missing session and None."""
        self.session = Path(__file__).parent.joinpath('extractors', 'data', 'session_biased_ge5')
        data = raw.load_encoder_trial_info(self.session)
        self.assertIsNotNone(data)
        self.assertIsNone(raw.load_encoder_trial_info(self.session.with_name('empty')))
        self.assertIsNone(raw.load_encoder_trial_info(None))

    def test_load_camera_ssv_times(self):
        """Load camera ssv timestamps, including a file whose columns are swapped."""
        session = Path(__file__).parent.joinpath('extractors', 'data', 'session_ephys')
        with self.assertRaises(ValueError):
            raw.load_camera_ssv_times(session, 'tail')
        with self.assertRaises(FileNotFoundError):
            raw.load_camera_ssv_times(session.with_name('foobar'), 'body')
        bonsai, camera = raw.load_camera_ssv_times(session, 'body')
        self.assertTrue(bonsai.size == camera.size == 6001)
        self.assertEqual(bonsai.dtype.str, '<M8[ns]')
        self.assertEqual(str(bonsai[0]), '2020-08-19T16:42:57.790361600')
        expected = np.array([69.466875, 69.5, 69.533, 69.566125, 69.59925])
        np.testing.assert_array_equal(expected, camera[:5])
        # Many sessions have the columns in the wrong order.  Here we write 5 lines from the
        # fixture file to another file in a temporary folder, with the columns swapped.
        from_file = session.joinpath('raw_video_data', '_iblrig_bodyCamera.timestamps.ssv')
        with tempfile.TemporaryDirectory() as tempdir:
            # New file with columns swapped (also checks loading files with UUID in name)
            filename = f'_iblrig_leftCamera.timestamps.{uuid.uuid4()}.ssv'
            to_file = Path(tempdir).joinpath('raw_video_data', filename)
            to_file.parent.mkdir(exist_ok=True)
            with open(from_file, 'r') as a, open(to_file, 'w') as b:
                for _ in range(5):
                    # Read line from fixture file and write into file in swapped order
                    b.write('{1} {0} {2}'.format(*a.readline().split(' ')))
            self.assertTrue(to_file.exists(), 'failed to write test file')
            bonsai, camera = raw.load_camera_ssv_times(to_file.parents[1], 'left')
            # Verify that values returned in the same order as before
            self.assertEqual(bonsai.dtype.str, '<M8[ns]')
            self.assertEqual(camera.dtype.str, '<f8')
            self.assertAlmostEqual(69.466875, camera[0])

    def test_load_camera_gpio(self):
        """
        Embedded frame data comes from 057e25ef-3f80-42e8-aa9f-e259df8bc9ad, left camera
        :return:
        """
        session = Path(__file__).parent.joinpath('extractors', 'data', 'session_ephys')
        session2 = Path(__file__).parent.joinpath(
            'fixtures', 'io', 'data_loaders', '_iblrig_test_mouse_2020-01-01_001'
        )
        gpio = raw.load_camera_gpio(session, 'body', as_dicts=True)
        gpio2 = raw.load_camera_gpio(session2, 'left', as_dicts=True)
        self.assertEqual(len(gpio), 4)  # One dict per pin
        self.assertEqual(len(gpio2), 4)  # One dict per pin
        *gpio_, gpio_4 = gpio  # Check last dict; pin 4 should have one pulse
        self.assertTrue(all(k in ('indices', 'polarities') for k in gpio_4.keys()))
        np.testing.assert_array_equal(gpio_4['indices'], np.array([166, 172], dtype=np.int64))
        np.testing.assert_array_equal(gpio_4['polarities'], np.array([1, -1]))

        # Test raw flag
        gpio = raw.load_camera_gpio(session, 'body', as_dicts=False)
        self.assertEqual(gpio.dtype, bool)
        self.assertEqual(gpio.shape, (510, 4))

        # Test empty / None
        self.assertIsNone(raw.load_camera_gpio(None, 'body'))
        self.assertIsNone(raw.load_camera_gpio(session, 'right'))
        for pin in raw.load_camera_gpio(session, 'right', as_dicts=True):
            self.assertIsNone(pin)

        # Test noisy GPIO data
        side = 'right'
        with tempfile.TemporaryDirectory() as tdir:
            session_path = Path(tdir).joinpath('mouse', '2020-06-01', '001')
            session_path.joinpath('raw_video_data').mkdir(parents=True)
            # Test loads file with UUID
            did = uuid.uuid4()  # Random uuid
            filename = session_path / 'raw_video_data' / f'_iblrig_{side}Camera.GPIO.{did}.bin'
            np.full(1000, 1.87904819e+09, dtype=np.float64).tofile(filename)
            with self.assertLogs('ibllib', level='WARNING'):
                raw.load_camera_gpio(session_path, side, as_dicts=True)

            # Test dead pin array
            np.zeros(3000, dtype=np.float64).tofile(filename)
            with self.assertLogs('ibllib', level='ERROR'):
                gpio = raw.load_camera_gpio(session_path, side, as_dicts=True)
            for pin in gpio:
                self.assertIsNone(pin)

    def test_load_camera_frame_count(self):
        """
        Embedded frame data comes from 057e25ef-3f80-42e8-aa9f-e259df8bc9ad, left camera
        :return:
        """
        session = Path(__file__).parent.joinpath('extractors', 'data', 'session_ephys')
        count = raw.load_camera_frame_count(session, 'body', raw=False)
        np.testing.assert_array_equal(count, np.arange(510, dtype=np.int32))
        self.assertEqual(count.dtype, int)

        # Test raw flag
        count = raw.load_camera_frame_count(session, 'body', raw=True)
        self.assertEqual(count[0], 16696704)

        # Test empty / None
        self.assertIsNone(raw.load_camera_frame_count(None, 'body'))
        self.assertIsNone(raw.load_camera_frame_count(session, 'right'))

    def test_load_embedded_frame_data(self):
        """Load frame count and GPIO together, with and without the raw flag."""
        session = Path(__file__).parent.joinpath('extractors', 'data', 'session_ephys')
        count, gpio = raw.load_embedded_frame_data(session, 'body')
        self.assertEqual(count[0], 0)
        self.assertIsInstance(gpio[-1], dict)
        count, gpio = raw.load_embedded_frame_data(session, 'body', raw=True)
        self.assertNotEqual(count[0], 0)
        self.assertIsInstance(gpio, np.ndarray)

    def test_load_camera_frameData(self):
        """Check the shape, type, column names and dtypes of the loaded frame data."""
        import pandas as pd
        fd_raw = raw.load_camera_frameData(self.bin_session_path, raw=True)
        fd = raw.load_camera_frameData(self.bin_session_path)
        # Wrong camera input file not found
        with self.assertRaises(AssertionError):
            raw.load_camera_frameData(self.bin_session_path, camera='right')
        # Shape
        self.assertEqual(fd.shape[1], 4)
        self.assertEqual(fd_raw.shape[1], 4)
        # Type
        self.assertIsInstance(fd, pd.DataFrame)
        self.assertIsInstance(fd_raw, pd.DataFrame)
        # Column names
        df_cols = ['Timestamp', 'embeddedTimeStamp',
                   'embeddedFrameCounter', 'embeddedGPIOPinState']
        self.assertTrue(all(x in fd.columns for x in df_cols))
        self.assertTrue(all(x in fd_raw.columns for x in df_cols))
        # Column types
        parsed_dtypes = {
            'Timestamp': np.float64,
            'embeddedTimeStamp': np.float64,
            'embeddedFrameCounter': np.int64,
            'embeddedGPIOPinState': object
        }
        self.assertEqual(fd.dtypes.to_dict(), parsed_dtypes)
        self.assertTrue(all(x == np.int64 for x in fd_raw.dtypes))

    def test_load_settings(self):
        """Load task settings from a fixture session, from None and from an empty collection."""
        main_path = Path(__file__).parent.joinpath('extractors', 'data')
        self.training_ge5 = main_path / 'session_training_ge5'
        settings = raw.load_settings(self.training_ge5)
        self.assertIsInstance(settings, dict)
        self.assertEqual(144, len(settings))
        with self.assertLogs(raw._logger, level=logging.INFO):
            self.assertIsNone(raw.load_settings(None))
        # Should return None when path empty
        with self.assertLogs(raw._logger, level=logging.INFO):
            self.assertIsNone(raw.load_settings(self.training_ge5, 'raw_task_data_00'))

    def tearDown(self):
        """Close and delete the scratch file created in setUp."""
        self.tempfile.close()
        os.unlink(self.tempfile.name)
class TestsMisc(unittest.TestCase):
    """Tests for ibllib.io.misc, run against a small temporary folder tree."""

    def setUp(self):
        """Create three empty folders (one nested) and one folder holding a file."""
        self._tdir = tempfile.TemporaryDirectory()
        self.tempdir = Path(self._tdir.name)
        # Order matters: parents are listed before their children so mkdir succeeds
        self.subdirs = [
            self.tempdir / 'test_empty_parent',
            self.tempdir / 'test_empty_parent' / 'test_empty',
            self.tempdir / 'test_empty',
            self.tempdir / 'test_full',
        ]
        self.file = self.tempdir / 'test_full' / 'file.txt'
        for folder in self.subdirs:
            folder.mkdir()
        self.file.touch()

    def tearDown(self) -> None:
        """Remove the temporary folder tree."""
        self._tdir.cleanup()

    def _resetup_folders(self):
        """Restore the folder tree to the state created by setUp."""
        self.file.unlink()
        (self.tempdir / 'test_full').rmdir()
        # Remove whatever the previous step left behind before recreating everything
        for folder in self.subdirs:
            if folder.exists():
                folder.rmdir()
        for folder in self.subdirs:
            folder.mkdir()
        self.file.touch()

    def test_delete_empty_folders(self):
        """Check delete_empty_folders in dry-run, non-recursive and recursive modes."""
        pre = [x.exists() for x in self.subdirs]
        pre_expected = [True, True, True, True]
        self.assertEqual(pre_expected, pre)

        # Test dry run
        pos = misc.delete_empty_folders(self.tempdir)
        self.assertIsNone(pos)

        # Test dry=False, non recursive
        pos_expected = [True, False, False, True]
        misc.delete_empty_folders(self.tempdir, dry=False)
        pos = [x.exists() for x in self.subdirs]
        self.assertEqual(pos_expected, pos)

        self._resetup_folders()

        # Test recursive
        pos_expected = [False, False, False, True]
        misc.delete_empty_folders(self.tempdir, dry=False, recursive=True)
        pos = [x.exists() for x in self.subdirs]
        self.assertEqual(pos_expected, pos)
class TestVideo(unittest.TestCase):
    """Tests for the path, URL and label helpers of ibllib.io.video."""

    @classmethod
    def setUpClass(cls) -> None:
        """Create the ONE instance shared by all tests of this class."""
        cls.one = ONE(**TEST_DB)
        # When the data server URL contains 'public', drop its last path component
        if 'public' in cls.one.alyx._par.HTTP_DATA_SERVER:
            cls.one.alyx._par = cls.one.alyx._par.set(
                'HTTP_DATA_SERVER', cls.one.alyx._par.HTTP_DATA_SERVER.rsplit('/', 1)[0])

    def setUp(self) -> None:
        """Define the session ID and the left camera video URL expected for it."""
        self.eid = '8dd0fcb0-1151-4c97-ae35-2e2421695ad7'
        self.url = ('https://ibl.flatironinstitute.org/mainenlab/'
                    'Subjects/ZM_1743/2019-06-14/001/raw_video_data/'
                    '_iblrig_leftCamera.raw.71cfeef2-2aa5-46b5-b88f-ca07e3d92474.mp4')

    def test_label_from_path(self):
        """Extract the camera label from a file path, a URL and a bare file name."""
        # Test file path
        session_path = self.one.eid2path(self.eid)
        video_path = session_path / 'raw_video_data' / '_iblrig_bodyCamera.raw.mp4'
        label = video.label_from_path(video_path)
        self.assertEqual('body', label)
        # Test URL
        label = video.label_from_path(self.url)
        self.assertEqual('left', label)
        # Test file name
        label = video.label_from_path('_iblrig_rightCamera.raw.mp4')
        self.assertEqual('right', label)
        # Test wrong file
        label = video.label_from_path('_iblrig_taskSettings.raw.json')
        self.assertIsNone(label)

    def test_url_from_eid(self):
        """Look up video URLs for one label, all labels and several labels."""
        self.assertNotEqual('remote', self.one.mode)
        actual = video.url_from_eid(self.eid, 'left', self.one)
        self.assertEqual(self.url, actual)
        actual = video.url_from_eid(self.eid, one=self.one)
        expected = {'left': self.url}
        self.assertEqual(expected, actual)
        actual = video.url_from_eid(self.eid, label=('left', 'right'), one=self.one)
        expected = {'left': self.url, 'right': None}
        self.assertEqual(expected, actual)

        # Test remote mode
        old_mode = self.one.mode
        self.one.mode = 'remote'
        try:
            actual = video.url_from_eid(self.eid, label='left', one=self.one)
            self.assertEqual(self.url, actual)
        finally:
            # self.one is shared by the whole class: always restore the mode, even on failure
            self.one.mode = old_mode

        # Test arg checks
        with self.assertRaises(ValueError):
            video.url_from_eid(self.eid, 'back')

    def test_assert_valid_label(self):
        """Validate single labels and label lists, including invalid input."""
        with self.assertRaises(ValueError):
            video.assert_valid_label('tail')
        label = video.assert_valid_label('LEFT')
        self.assertEqual(label, 'left')
        # Verify works with lists
        labels = video.assert_valid_label(['Right', 'body'])
        self.assertEqual(labels, ('right', 'body'))
        with self.assertRaises(TypeError):
            video.assert_valid_label(None)
class TestSessionParams(unittest.TestCase):
    """Tests for ibllib.io.session_params module."""

    def setUp(self) -> None:
        """Load the experiment description fixture and split it into per-computer files."""
        self.tmpdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tmpdir.cleanup)
        # load yaml fixture
        self.fixture_path = Path(__file__).parent.joinpath(
            'fixtures', 'io', '_ibl_experiment.description.yaml')
        with open(self.fixture_path, 'r') as fp:
            self.fixture = yaml.safe_load(fp)

        # save as individual files
        self.devices_path = Path(self.tmpdir.name).joinpath('_devices')

        # a sync that's different to widefield and ephys
        # (dict unpacking already builds a new dict, so the fixture is not modified)
        sync = {**self.fixture['sync']['nidq'], 'collection': 'raw_sync_data'}

        computers_descriptions = {
            'widefield': dict(devices={'widefield': self.fixture['devices']['widefield']}),
            'video': '',
            'ephys': dict(devices={'neuropixel': self.fixture['devices']['neuropixel']}),
            'behaviour': dict(devices={'microphone': self.fixture['devices']['microphone']}),
            'sync': dict(sync={'nidq': sync})
        }

        # the behaviour computer contains the task, project and procedure keys
        for key, value in self.fixture.items():
            if key != 'devices':
                computers_descriptions['behaviour'][key] = value
        # the ephys computer contains another identical sync key
        computers_descriptions['ephys']['sync'] = self.fixture['sync']

        for label, data in computers_descriptions.items():
            file_device = self.devices_path.joinpath(f'{label}.yaml')
            session_params.write_yaml(file_device, data)

    @patch('iblutil.io.params.time.sleep')
    def test_aggregate(self, sleep_mock):
        """A test for both aggregate_device and merge_params."""
        fullfile = self.devices_path.parent.joinpath('_ibl_experiment.description.yaml')
        file_lock = fullfile.with_suffix('.lock')

        # Test deals with file lock
        file_lock.touch()

        device = 'widefield'
        file_device = self.devices_path.joinpath(f'{device}.yaml')
        session_params.aggregate_device(file_device, fullfile)
        self.assertFalse(file_lock.exists(), 'failed to delete lock file')
        self.assertTrue(fullfile.exists(), 'failed to create aggregate file')
        sleep_mock.assert_called()
        with open(fullfile, 'r') as fp:
            data = yaml.safe_load(fp)
        self.assertCountEqual(('devices', 'version'), data.keys())
        self.assertCountEqual((device,), data['devices'].keys())
        self.assertEqual(data['devices'][device], self.fixture['devices'][device])

        # A device with extra keys
        device = 'behaviour'
        file_device = self.devices_path.joinpath(f'{device}.yaml')
        session_params.aggregate_device(file_device, fullfile, unlink=True)
        self.assertFalse(file_lock.exists(), 'failed to delete lock file')
        self.assertFalse(file_device.exists(), 'failed to delete device file')
        with open(fullfile, 'r') as fp:
            data = yaml.safe_load(fp)
        expected_keys = ('devices', 'procedures', 'projects', 'sync', 'tasks', 'version')
        self.assertCountEqual(data.keys(), expected_keys)
        self.assertTrue(len(data['devices'].keys()) > 1)

        # A device with another identical sync key
        file_device = self.devices_path.joinpath('ephys.yaml')
        session_params.aggregate_device(file_device, fullfile, unlink=True)

        # A device with a different sync
        file_device = self.devices_path.joinpath('sync.yaml')
        with self.assertRaises(AssertionError):
            session_params.aggregate_device(file_device, fullfile, unlink=True)

        # An empty device
        file_device = self.devices_path.joinpath('video.yaml')
        with self.assertLogs(session_params.__name__, logging.WARNING):
            session_params.aggregate_device(file_device, fullfile, unlink=True)

    @patch(session_params.__name__ + '.SPEC_VERSION', '999')
    def test_read_yaml(self):
        """Read parameters from a file path and from a directory path."""
        label = 'widefield'
        data = session_params.read_params(self.devices_path.joinpath(f'{label}.yaml'))
        self.assertEqual('999', data.pop('version'), 'failed to patch file')
        self.assertEqual(data['devices'][label], self.fixture['devices'][label])

        # Check loads from directory path
        data_keys = session_params.read_params(self.fixture_path.parent).keys()
        self.assertCountEqual(self.fixture.keys(), data_keys)

    def test_patch_data(self):
        """Test for session_params._patch_file function."""
        with patch(session_params.__name__ + '.SPEC_VERSION', '1.0.0'), \
                self.assertLogs(session_params.__name__, logging.WARNING):
            data = session_params._patch_file({'version': '1.1.0'})
        self.assertEqual(data, {'version': '1.0.0'})
        # Check tasks dicts separated into lists
        unpatched = {'version': '0.0.1', 'tasks': {
            'fooChoiceWorld': {1: '1'}, 'barChoiceWorld': {2: '2'}}}
        data = session_params._patch_file(unpatched)
        self.assertIsInstance(data['tasks'], list)
        self.assertEqual([['fooChoiceWorld'], ['barChoiceWorld']], list(map(list, data['tasks'])))
        # Check patching list of dicts with some containing more than 1 key
        unpatched = {'tasks': [{'foo': {1: '1'}}, {'bar': {2: '2'}, 'baz': {3: '3'}}]}
        data = session_params._patch_file(unpatched)
        self.assertEqual(3, len(data['tasks']))
        self.assertEqual([['foo'], ['bar'], ['baz']], list(map(list, data['tasks'])))

    def test_get_collections(self):
        """Check the device / task names returned by get_collections for the fixture."""
        collections = session_params.get_collections(self.fixture)
        expected = {
            'widefield': 'raw_widefield_data',
            'microphone': 'raw_behavior_data',
            'probe00': 'raw_ephys_data/probe00',
            'probe01': 'raw_ephys_data/probe01',
            'nidq': 'raw_ephys_data',
            'passiveChoiceWorld': 'raw_passive_data',
            'ephysChoiceWorld': 'raw_behavior_data'
        }
        # NB: assertCountEqual iterates the dicts, so only the keys are compared here
        self.assertCountEqual(expected, collections)

    def test_get_collections_repeat_protocols(self):
        """Check get_collections when the same protocol appears more than once."""
        tasks = dict(tasks=[
            {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}},
            {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
            {'passiveChoiceWorld': {'collection': 'raw_passive_data_bis'}}])
        collections = session_params.get_collections(tasks)
        self.assertEqual(set(collections['passiveChoiceWorld']),
                         {'raw_passive_data_bis', 'raw_passive_data'})
        collections = session_params.get_collections(tasks, flat=True)
        self.assertEqual(collections,
                         {'raw_passive_data_bis', 'raw_passive_data', 'raw_behavior_data'})

    def test_merge_params(self):
        """Test for ibllib.io.session_params.merge_params functions."""
        a = self.fixture
        b = {'procedures': ['Imaging', 'Injection'],
             'tasks': [{'fooChoiceWorld': {'collection': 'bar'}}]}
        c = session_params.merge_params(a, b, copy=True)
        self.assertCountEqual(['Imaging', 'Behavior training/tasks', 'Injection'], c['procedures'])
        self.assertEqual(['passiveChoiceWorld', 'ephysChoiceWorld', 'fooChoiceWorld'],
                         [list(x)[0] for x in c['tasks']])
        # Ensure a and b not modified
        self.assertNotEqual(set(c['procedures']), set(a['procedures']))
        self.assertNotEqual(set(a['procedures']), set(b['procedures']))
        # Test duplicate tasks skipped while order kept constant
        d = {'tasks': [
            a['tasks'][1],
            {'ephysChoiceWorld': {'collection': 'raw_task_data_02', 'sync_label': 'nidq'}}]}
        e = session_params.merge_params(c, d, copy=True)
        expected = ['passiveChoiceWorld', 'ephysChoiceWorld', 'fooChoiceWorld', 'ephysChoiceWorld']
        self.assertEqual(expected, [list(x)[0] for x in e['tasks']])
        self.assertDictEqual({'collection': 'raw_task_data_02', 'sync_label': 'nidq'},
                             e['tasks'][-1]['ephysChoiceWorld'])
        # Test without copy
        session_params.merge_params(a, b, copy=False)
        self.assertCountEqual(['Imaging', 'Behavior training/tasks', 'Injection'], a['procedures'])
        # Test assertion on duplicate sync
        b['sync'] = {'foodaq': {'collection': 'raw_sync_data'}}
        self.assertRaises(AssertionError, session_params.merge_params, a, b)
        # Test how it handles the extractors key, which is an unhashable list
        f = {'tasks': [{'fooChoiceWorld': {
            'collection': 'bar', 'sync_label': 'bpod', 'extractors': ['a', 'b']}}]}
        g = session_params.merge_params(a, f, copy=True)
        self.assertCountEqual(
            ['devices', 'procedures', 'projects', 'sync', 'tasks', 'version'], g.keys())
        self.assertEqual(4, len(g['tasks']))
        self.assertDictEqual(f['tasks'][0], g['tasks'][-1])

    def test_get_protocol_number(self):
        """Test ibllib.io.session_params.get_task_protocol_number function."""
        self.assertIsNone(session_params.get_task_protocol_number(self.fixture))
        self.assertIsNone(
            session_params.get_task_protocol_number(self.fixture, 'passiveChoiceWorld'))
        self.assertIsNone(session_params.get_task_protocol_number(self.fixture, 'fooChoiceWorld'))
        # Add a protocol number (as a string) to the settings of each task in the fixture
        for i, task in enumerate(self.fixture['tasks']):
            next(iter(task.values()))['protocol_number'] = str(i)
        self.assertEqual(
            0, session_params.get_task_protocol_number(self.fixture, 'passiveChoiceWorld'))
        self.assertEqual([0, 1], session_params.get_task_protocol_number(self.fixture))
class TestRawDaqLoaders(unittest.TestCase):
    """Tests for raw_daq_loaders module"""

    def setUp(self):
        """Generate toy DAQ data and save it, with its meta data, to a temporary folder.

        The data consist of three channels: a noisy analogue trace containing ``self.n_ttl``
        pulses, a counter channel and a position channel.
        """
        self.tmpdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tmpdir.cleanup)
        # Create some toy DAQ data
        n_samples = 3000
        fs = 1
        a0_clean = np.zeros(n_samples)
        self.n_ttl = 6
        pulse_width = int(np.floor(50 * fs))
        for i in np.arange(1, n_samples, n_samples // self.n_ttl):
            a0_clean[i:i + pulse_width] = 1
        # 0 -> 5V w/ noise and 1V DC offset
        a0 = (a0_clean * np.full(n_samples, 5)) + np.random.rand(n_samples) + 1
        # Counter channel, e.g. [0, 0, 0, 1, 1, 2, 3, 3, 3, 3, [...] n]
        ctr0 = np.cumsum(a0_clean)
        # Position channel e.g. [0, 1, 2, 1, ...]
        ctr1 = np.cumsum(a0_clean * np.random.choice([1, -1], n_samples))

        self.timeline = {
            'timestamps': np.arange(0, n_samples, fs),
            'raw': np.vstack([a0, ctr0, ctr1]).T
        }
        self.meta = {'daqSampleRate': fs, 'inputs': [
            {'name': 'bpod', 'arrayColumn': 1, 'measurement': 'Voltage', 'daqChannelID': 'ai0'},
            {'name': 'neuralFrames', 'arrayColumn': 2, 'measurement': 'EdgeCount',
             'daqChannelID': 'ctr0'},
            {'name': 'rotaryEncoder', 'arrayColumn': 3, 'measurement': 'Position',
             'daqChannelID': 'ctr1'}
        ]}
        # FIXME Because of non-standard ALF naming we cannot use save_object_npy for this purpose
        # alfio.save_object_npy(self.tmpdir.name, self.timeline, 'DAQ data', namespace='timeline')
        folder = Path(self.tmpdir.name)
        for k, v in self.timeline.items():
            np.save(folder / f'_timeline_DAQdata.{k}.npy', v)
        with open(folder / '_timeline_DAQdata.meta.json', 'w') as fp:
            json.dump(self.meta, fp)
if __name__ == '__main__':
    # exit=False: return to the caller after the run instead of calling sys.exit
    unittest.main(exit=False, verbosity=2)