Source code for ibllib.tests.extractors.test_ephys_fpga
"""Tests for ephys FPGA sync and FPGA wheel extraction."""
import unittest
import tempfile
from pathlib import Path
import pickle
import numpy as np
import spikeglx
from ibllib.io.extractors import ephys_fpga
from ibllib.io.extractors.training_wheel import extract_first_movement_times, infer_wheel_units
from ibllib.io.extractors.training_wheel import extract_wheel_moves
import brainbox.behavior.wheel as wh
[docs]
class TestsFolderStructure(unittest.TestCase):
[docs]
def setUp(self):
self.dir = tempfile.TemporaryDirectory()
pl = Path(self.dir.name) / 'raw_ephys_data' / 'probe_left'
pr = Path(self.dir.name) / 'raw_ephys_data' / 'probe_right'
pl.mkdir(parents=True)
pr.mkdir(parents=True)
(pl / 'iblrig_ephysData.raw_g0_t0.imec.lf.bin').touch()
(pl / 'iblrig_ephysData.raw_g0_t0.imec.ap.bin').touch()
(pr / 'iblrig_ephysData.raw_g0_t0.imec.lf.bin').touch()
(pr / 'iblrig_ephysData.raw_g0_t0.imec.ap.bin').touch()
[docs]
def test_get_ephys_files(self):
# first test at the root directory level, with a string input
ephys_files = spikeglx.glob_ephys_files(self.dir.name)
for ef in ephys_files:
self.assertTrue(ef.label in ['probe_right', 'probe_left'])
self.assertTrue(ef.ap.exists() and ef.lf.exists())
# second test at the ephys directory level, with a pathlib.Path input
ephys_files = spikeglx.glob_ephys_files(Path(self.dir.name) / 'raw_ephys_data')
for ef in ephys_files:
self.assertTrue(ef.label in ['probe_right', 'probe_left'])
self.assertTrue(ef.ap.exists() and ef.lf.exists())
[docs]
class TestSyncExtraction(unittest.TestCase):
[docs]
def setUp(self):
self.workdir = Path(__file__).parents[1] / 'fixtures' / 'sync_ephys_fpga'
self.meta_files = list(Path.glob(self.workdir, '*.meta'))
[docs]
def test_sync_nidq(self):
self.sync_gen(fn='sample3B_g0_t0.nidq.meta', ns=32, nc=2, sync_depth=8)
[docs]
def test_sync_3B(self):
self.sync_gen(fn='sample3B_g0_t0.imec1.ap.meta', ns=32, nc=385, sync_depth=16)
[docs]
def test_sync_3A(self):
self.sync_gen(fn='sample3A_g0_t0.imec.ap.meta', ns=32, nc=385, sync_depth=16)
[docs]
def sync_gen(self, fn, ns, nc, sync_depth):
# nidq has 1 analog and 1 digital sync channels
with tempfile.TemporaryDirectory() as tdir:
ses_path = Path(tdir).joinpath('raw_ephys_data')
ses_path.mkdir(parents=True, exist_ok=True)
bin_file = ses_path.joinpath(fn).with_suffix('.bin')
nidq = spikeglx._mock_spikeglx_file(bin_file, self.workdir / fn,
ns=ns, nc=nc, sync_depth=sync_depth)
syncs, files = ephys_fpga.extract_sync(tdir)
self.assertTrue(np.all(syncs[0].channels[slice(1, None, 2)] ==
np.arange(0, nidq['sync_depth'])))
with self.assertLogs(level='INFO') as log:
ephys_fpga.extract_sync(tdir)
self.assertEqual(1, len(log.output))
self.assertIn('SGLX sync found', log.output[0])
[docs]
class TestIBLChannelMaps(unittest.TestCase):
[docs]
def test_ibl_sync_maps(self):
s = ephys_fpga.get_ibl_sync_map({'ap': 'toto', 'path': self.workdir}, '3A')
self.assertEqual(s, ephys_fpga.CHMAPS['3A']['ap'])
s = ephys_fpga.get_ibl_sync_map({'nidq': 'toto', 'path': self.workdir}, '3B')
self.assertEqual(s, ephys_fpga.CHMAPS['3B']['nidq'])
s = ephys_fpga.get_ibl_sync_map({'ap': 'toto', 'path': self.workdir}, '3B')
self.assertEqual(s, ephys_fpga.CHMAPS['3B']['ap'])
[docs]
class TestTTLsExtraction(unittest.TestCase):
[docs]
def test_audio_ttl_wiring_camera(self):
"""
Test ephys_fpga._clean_audio function.
Test removal of spurious TTLs due to a wrong wiring of the camera onto the soundcard
example eid: e349a2e7-50a3-47ca-bc45-20d1899854ec
"""
# on this example it's business as usual and the clean up should not change anything
audio = {
'times': np.array([1740.1032, 1740.20176667, 1741.0786, 1741.57713333, 1744.78716667, 1744.88573333]),
'polarities': np.array([1., -1., 1., -1., 1., -1.]),
}
audio_ = ephys_fpga._clean_audio(audio)
self.assertEqual(audio, audio_)
# on that example
audio = {'times': np.array([3399.4090251, 3399.4110249, 3399.4156911, 3399.4176909,
3399.42232377, 3399.42432357, 3399.42898977, 3399.43095624,
3399.43562244, 3399.43762224, 3399.44228844, 3399.44425491,
3399.44892111, 3399.45092091, 3399.45558711, 3399.45755358,
3399.46221978, 3399.46421958, 3399.46888578, 3399.47085225,
3399.47551845, 3399.47751825, 3399.48215112, 3399.48415092,
3399.48881712, 3399.49081692, 3399.49544979, 3399.49744959,
3399.50211579, 3399.50411559, 3400.306602, 3400.3086018,
3400.313268, 3400.3152678, 3400.31990067, 3400.32190047,
3400.32656667, 3400.32856647, 3400.33319934, 3400.33519914,
3400.33986534, 3400.34186514, 3400.34649801, 3400.34849781,
3400.35316401, 3400.35516381, 3400.35979668, 3400.36179648,
3400.36646268, 3400.36846248, 3400.37309535, 3400.37509515,
3400.37976135, 3400.38172782, 3400.38639402, 3400.38839382,
3400.39306002, 3400.39502649, 3400.39969269, 3400.40169249,
3400.40635869, 3400.40832516, 3400.41299136, 3400.41499116,
3400.41962403, 3400.42162383, 3400.42629003, 3400.42828983,
3400.4329227, 3400.4349225, 3400.4395887, 3400.4415885,
3400.44622137, 3400.44822117, 3400.45288737, 3400.45488717,
3400.45952004, 3400.46151984, 3400.46618604, 3400.46818584,
3400.47281871, 3400.47481851, 3400.47948471, 3400.48148451,
3400.48611738, 3400.48811718, 3400.49278338, 3400.49478318,
3400.49941605, 3400.50141585, 3400.50608205, 3400.50808185,
3400.51271472, 3400.51471452, 3400.51938072, 3400.52138052,
3400.52601339, 3400.52801319, 3400.53267939, 3400.53467919,
3400.53931206, 3400.54131186, 3400.54597806, 3400.54797786,
3400.55261073, 3400.55461053, 3400.55927673, 3400.5612432,
3400.5659094, 3400.5679092, 3400.5725754, 3400.57454187,
3400.57920807, 3400.58120787, 3400.58587407, 3400.58784054,
3400.59250674, 3400.59450654, 3400.59917274, 3400.60113921,
3400.60580541, 3400.60780521, 3400.61243808, 3400.61443788,
3400.61910408, 3400.62110388, 3400.62573675, 3400.62773655,
3400.63240275, 3400.63440255, 3400.63903542, 3400.64103522,
3400.64570142, 3400.64770122, 3400.65233409, 3400.65433389,
3400.65900009, 3400.66099989, 3400.66563276, 3400.66763256,
3400.67229876, 3400.67429856, 3400.67893143, 3400.68093123,
3400.68559743, 3400.68759723, 3400.6922301, 3400.6942299,
3400.6988961, 3400.7008959, 3400.70552877, 3400.70752857,
3400.71219477, 3400.71419457, 3400.71882744, 3400.72082724,
3400.72549344, 3400.72749324, 3400.73212611, 3400.73412591,
3400.73879211, 3400.74079191, 3400.74542478, 3400.74742458,
3400.75209078, 3400.75405725, 3400.75872345, 3400.76072325,
3400.76538945, 3400.76735592, 3400.77202212, 3400.77402192,
3400.77868812, 3400.78065459, 3400.78532079, 3400.78732059,
3400.79198679, 3400.79395326, 3400.79861946, 3400.80061926]),
'polarities': np.array([1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.])
}
audio_ = ephys_fpga._clean_audio(audio)
expected = {'times': np.array([3399.4090251, 3399.50411559, 3400.306602, 3400.80061926]),
'polarities': np.array([1., -1., 1., -1.])}
assert all(np.all(audio_[k] == expected[k]) for k in audio_)
[docs]
def test_audio_ttl_start_up_down(self):
"""
If the behaviour starts before FPGA, it is very unlikely but possible that the FPGA
starts on up state and that the first front of the audio is a downgoing one.
The extraction should handle both cases seamlessly: cf eid d839491f-55d8-4cbe-a298-7839208ba12b
"""
def _test_audio(audio, audio_intervals):
for tone, intervals in audio_intervals.items():
if tone == 'unassigned':
self.assertFalse(len(intervals))
else:
np.testing.assert_array_almost_equal(
intervals[:, 0], audio['times'][audio[tone]], err_msg=tone)
audio = {
'times': np.array([1740.1032, 1740.20176667, 1741.0786, 1741.57713333, 1744.78716667, 1744.88573333]),
'polarities': np.array([1., -1., 1., -1., 1., -1.]),
'error_tone': np.array([False, False, True, False, False, False]),
'ready_tone': np.array([True, False, False, False, True, False]),
'channels': np.zeros(6, dtype=int)
}
extractor = ephys_fpga.FpgaTrials('subject/2023-01-01/000') # placeholder session path unused
_audio, audio_intervals = extractor.get_audio_event_times(audio, {'audio': 0})
_test_audio(audio, audio_intervals) # this tests the usual pulses
audio['channels'][0] = 1 # this tests when it starts in upstate
audio['ready_tone'][0] = False # the first ready tone should now be skipped
_audio, audio_intervals = extractor.get_audio_event_times(audio, {'audio': 0})
_test_audio(audio, audio_intervals)
np.testing.assert_array_equal(_audio['times'], audio['times'][1:])
[docs]
def test_frame2ttl_flickers(self):
"""
Frame2ttl can flicker abnormally. One way to detect this is to remove consecutive polarity
switches under a given threshold
"""
DISPLAY = False # for debug purposes
F2TTL_THRESH = 0.01
diff = F2TTL_THRESH * np.array([0.5, 10])
# flicker ends with a polarity switch - downgoing pulse is removed
t = np.r_[0, np.cumsum(diff[np.array([1, 1, 0, 0, 1])])] + 1
frame2ttl = {'times': t, 'polarities': np.mod(np.arange(t.size) + 1, 2) * 2 - 1}
expected = {'times': np.array([1., 1.1, 1.2, 1.31]),
'polarities': np.array([1, -1, 1, -1])}
frame2ttl_ = ephys_fpga._clean_frame2ttl(frame2ttl, display=DISPLAY, threshold=F2TTL_THRESH)
assert all(np.all(frame2ttl_[k] == expected[k]) for k in frame2ttl_)
# stand-alone flicker
t = np.r_[0, np.cumsum(diff[np.array([1, 1, 0, 0, 0, 1])])] + 1
frame2ttl = {'times': t, 'polarities': np.mod(np.arange(t.size) + 1, 2) * 2 - 1}
expected = {'times': np.array([1., 1.1, 1.2, 1.215, 1.315]),
'polarities': np.array([1, -1, 1, -1, 1])}
frame2ttl_ = ephys_fpga._clean_frame2ttl(frame2ttl, display=DISPLAY)
assert all(np.all(frame2ttl_[k] == expected[k]) for k in frame2ttl_)
[docs]
def test_bpod_trace_extraction(self):
"""Test FpgaTrials.get_bpod_event_times method."""
expected = {
'trial_start': np.array([0, 4, 8, 12, 14, 16, 20, 24, 26, 28, 32, 34, 38, 42, 46, 48, 52, 56]),
'valve_open': np.array([2, 6, 10, 18, 22, 30, 36, 40, 44, 50, 54])
}
bpod_fronts_ = np.array([1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1.])
bpod_times_ = np.array([6.75033333, 109.7648, 117.12136667, 117.27136667,
118.51416667, 118.51426667, 122.3873, 122.5373,
123.7964, 123.7965, 127.82903333, 127.97903333,
129.24503333, 129.24513333, 132.97976667, 132.97986667,
136.8624, 136.8625, 140.56083333, 140.71083333,
141.95523333, 141.95533333, 143.55326667, 143.70326667,
144.93636667, 144.93646667, 149.5042, 149.5043,
153.08273333, 153.08283333, 155.29713333, 155.44713333,
156.70316667, 156.70326667, 164.0096, 164.0097,
164.9186, 165.0686, 166.30633333, 166.30643333,
167.91133333, 168.06133333, 169.28373333, 169.28386667,
171.39736667, 171.54736667, 172.77786667, 172.77796667,
176.7828, 176.7829, 178.0305, 178.1805,
179.41063333, 179.41073333, 181.70343333, 181.85343333,
183.12896667, 183.12906667])
extractor = ephys_fpga.FpgaTrials('subject/2023-01-01/000') # placeholder session path unused
sync = {'times': bpod_times_, 'polarities': bpod_fronts_, 'channels': np.zeros_like(bpod_times_, dtype=int)}
_, bpod_intervals = extractor.get_bpod_event_times(sync, {'bpod': 0})
for k in expected:
np.testing.assert_array_equal(bpod_intervals[k][:, 0], bpod_times_[expected[k]])
[docs]
def test_ttl_bpod_gaelle_writes_protocols_but_guido_doesnt_read_them(self):
bpod_t = np.array([5.423290950005423, 6.397993470006398, 6.468919710006469,
7.497916800007498, 7.997933460007998, 8.599239990008599,
8.5993399800086, 15.141985650015142, 15.642002310015641,
16.215411630016217, 16.215511620016215, 17.104122750017105,
17.175015660017174, 18.204012750018205, 18.704029410018705,
19.286337840019286, 19.28643783001929, 21.76005711002176,
21.83095002002183, 22.85998044002286])
pol = (np.mod(np.arange(bpod_t.size), 2) - 0.5) * 2
sync = {'times': bpod_t, 'polarities': pol, 'channels': np.zeros_like(bpod_t, dtype=int)}
extractor = ephys_fpga.FpgaTrials('subject/2023-01-01/000') # placeholder session path unused
_, bpod_intervals = extractor.get_bpod_event_times(sync, {'bpod': 0})
expected = bpod_t[[1, 5, 9, 15]]
np.testing.assert_array_equal(bpod_intervals['trial_start'][:, 0], expected)
# when the bpod has started before the ephys, the first pulses may have been missed
# and the first TTL may be negative/. This needs to yield the same result as if the
# bpod was started properly
sync['channels'][0] = 1
_, bpod_intervals_ = extractor.get_bpod_event_times(sync, {'bpod': 0})
for event, intervals in bpod_intervals.items():
np.testing.assert_array_equal(intervals, bpod_intervals_[event], err_msg=event)
[docs]
def test_align_to_trial(self):
"""Test ephys_fpga._assign_events_to_trial function."""
# simple test with one missing at the end
t_trial_start = np.arange(0, 5) * 10
t_event = np.arange(0, 5) * 10 + 2
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event)
self.assertTrue(np.allclose(t_event_nans, t_event, equal_nan=True, atol=0, rtol=0))
# test with missing values
t_trial_start = np.array([109, 118, 123, 129, 132, 136, 141, 144, 149, 153])
t_event = np.array([122, 133, 140, 143, 146, 150, 154])
t_event_out_ = np.array([np.nan, 122, np.nan, np.nan, 133, 140, 143, 146, 150, 154])
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event)
self.assertTrue(np.allclose(t_event_out_, t_event_nans, equal_nan=True, atol=0, rtol=0))
# test with events before initial start trial
t_trial_start = np.arange(0, 5) * 10
t_event = np.arange(0, 5) * 10 - 2
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event)
desired_out = np.array([8., 18., 28., 38., np.nan])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
# test with several events per trial, missing events and events before
t_trial_start = np.array([0, 10, 20, 30, 40])
t_event = np.array([-1, 2, 4, 12, 35, 42])
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event)
desired_out = np.array([4, 12., np.nan, 35, 42])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
# same test above but this time take the first index instead of last
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event, take='first')
desired_out = np.array([2, 12., np.nan, 35, 42])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
# take second to last
t_trial_start = np.array([0, 10, 20, 30, 40])
t_event = np.array([2, 4, 12, 13, 14, 21, 32, 33, 34, 35, 42])
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event, take=-2)
desired_out = np.array([2, 13, np.nan, 34, np.nan])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event, take=1)
desired_out = np.array([4, 13, np.nan, 33, np.nan])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
# test errors
self.assertRaises(ValueError, ephys_fpga._assign_events_to_trial, np.array([0., 2., 1.]), t_event)
self.assertRaises(ValueError, ephys_fpga._assign_events_to_trial, t_trial_start, np.array([0., 2., 1.]))
[docs]
class TestWheelExtraction(unittest.TestCase):
[docs]
def setUp(self) -> None:
self.ta = np.array([2, 4, 6, 8, 12, 14, 16, 18])
self.pa = np.array([1, -1, 1, -1, 1, -1, 1, -1])
self.tb = np.array([3, 5, 7, 9, 11, 13, 15, 17])
self.pb = np.array([1, -1, 1, -1, 1, -1, 1, -1])
[docs]
def test_x1_decoding(self):
p_ = np.array([1, 2, 1, 0])
t_ = np.array([2, 6, 11, 15])
t, p = ephys_fpga._rotary_encoder_positions_from_fronts(self.ta, self.pa, self.tb, self.pb, ticks=np.pi * 2, coding='x1')
self.assertTrue(np.all(t == t_))
self.assertTrue(np.all(p == p_))
[docs]
def test_x4_decoding(self):
p_ = np.array([1, 2, 3, 4, 5, 6, 7, 8, 7, 6, 5, 4, 3, 2, 1, 0]) / 4
t_ = np.array([2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18])
t, p = ephys_fpga._rotary_encoder_positions_from_fronts(self.ta, self.pa, self.tb, self.pb, ticks=np.pi * 2, coding='x4')
self.assertTrue(np.all(t == t_))
self.assertTrue(np.all(np.isclose(p, p_)))
[docs]
def test_x2_decoding(self):
p_ = np.array([1, 2, 3, 4, 3, 2, 1, 0]) / 2
t_ = np.array([2, 4, 6, 8, 12, 14, 16, 18])
t, p = ephys_fpga._rotary_encoder_positions_from_fronts(self.ta, self.pa, self.tb, self.pb, ticks=np.pi * 2, coding='x2')
self.assertTrue(np.all(t == t_))
self.assertTrue(np.all(p == p_))
[docs]
def test_wheel_trace_from_sync(self):
"""Test ephys_fpga._rotary_encoder_positions_from_fronts function."""
pos_ = - np.array([-1, 0, -1, -2, -1, -2]) * (np.pi / ephys_fpga.WHEEL_TICKS)
ta = np.array([1, 2, 3, 4, 5, 6])
tb = np.array([0.5, 3.2, 3.3, 3.4, 5.25, 5.5])
pa = (np.mod(np.arange(6), 2) - 0.5) * 2
pb = (np.mod(np.arange(6) + 1, 2) - .5) * 2
t, pos = ephys_fpga._rotary_encoder_positions_from_fronts(ta, pa, tb, pb, coding='x2')
self.assertTrue(np.all(np.isclose(pos_, pos)))
pos_ = - np.array([-1, 0, -1, 0, -1, -2]) * (np.pi / ephys_fpga.WHEEL_TICKS)
tb = np.array([0.5, 3.2, 3.4, 5.25])
pb = (np.mod(np.arange(4) + 1, 2) - .5) * 2
t, pos = ephys_fpga._rotary_encoder_positions_from_fronts(ta, pa, tb, pb, coding='x2')
self.assertTrue(np.all(np.isclose(pos_, pos)))
[docs]
class TestExtractedWheelUnits(unittest.TestCase):
"""Tests the infer_wheel_units function."""
wheel_radius_cm = 3.1
[docs]
def setUp(self) -> None:
"""
Create the wheel position data for testing: the positions attribute holds a dictionary of
units, each holding a dictionary of encoding types to test, e.g.
positions = {
'rad': {
'X1': ...,
'X2': ...,
'X4': ...
},
'cm': {
'X1': ...,
'X2': ...,
'X4': ...
}
}
:return:
"""
def x(unit, enc=int(1), wheel_radius=self.wheel_radius_cm):
radius = 1 if unit == 'rad' else wheel_radius
return 1 / ephys_fpga.WHEEL_TICKS * np.pi * 2 * radius / enc
# A pseudo-random sequence of integrated fronts
seq = np.array([-1, 0, 1, 2, 1, 2, 3, 4, 3, 2, 1, 0, -1, -2, 1, -2])
encs = (1, 2, 4) # Encoding types to test
units = ('rad', 'cm') # Units to test
self.positions = {unit: {f'X{e}': x(unit, e) * seq for e in encs} for unit in units}
[docs]
def test_extract_wheel_moves(self):
for unit in self.positions.keys():
for encoding, pos in self.positions[unit].items():
result = infer_wheel_units(pos)
self.assertEqual(unit, result[0], f'failed to determine units for {encoding}')
expected = int(ephys_fpga.WHEEL_TICKS * int(encoding[1]))
self.assertEqual(expected, result[1], f'failed to determine number of ticks for {encoding} in {unit}')
self.assertEqual(encoding, result[2], f'failed to determine encoding in {unit}')
[docs]
class TestWheelMovesExtraction(unittest.TestCase):
[docs]
def setUp(self) -> None:
"""
Test data is in the form ((inputs), (outputs)) where inputs is a tuple containing a
numpy array of timestamps and one of positions; outputs is a tuple of outputs from
the functions. For details, see help on TestWheel.setUp method in module
brainbox.tests.test_behavior
"""
pickle_file = Path(__file__).parents[3].joinpath('brainbox', 'tests', 'fixtures', 'wheel_test.p')
if not pickle_file.exists():
self.test_data = None
else:
with open(pickle_file, 'rb') as f:
self.test_data = pickle.load(f)
# Some trial times for trial_data[1]
self.trials = {'goCue_times': np.array([162.5, 105.6, 55]), 'feedback_times': np.array([164.3, 108.3, 56])}
[docs]
def test_extract_wheel_moves(self):
test_data = self.test_data[1]
# Wrangle data into expected form
re_ts = test_data[0][0]
re_pos = test_data[0][1]
logname = 'ibllib.io.extractors.training_wheel'
with self.assertLogs(logname, level='INFO') as cm:
wheel_moves = extract_wheel_moves(re_ts, re_pos)
self.assertEqual([f'INFO:{logname}:Wheel in cm units using X2 encoding'], cm.output)
n = 56 # expected number of movements
self.assertTupleEqual(wheel_moves['intervals'].shape, (n, 2), 'failed to return the correct number of intervals')
self.assertEqual(wheel_moves['peakAmplitude'].size, n)
self.assertEqual(wheel_moves['peakVelocity_times'].size, n)
# Check the first 3 intervals
ints = np.array([[24.78462599, 25.22562599], [29.58762599, 31.15062599], [31.64262599, 31.81662599]])
actual = wheel_moves['intervals'][:3, ]
self.assertIsNone(np.testing.assert_allclose(actual, ints), 'unexpected intervals')
# Check amplitudes
actual = wheel_moves['peakAmplitude'][-3:]
expected = [0.50255486, -1.70103154, 1.00740789]
self.assertIsNone(np.testing.assert_allclose(actual, expected), 'unexpected amplitudes')
# Check peak velocities
actual = wheel_moves['peakVelocity_times'][-3:]
expected = [175.13662599, 176.65762599, 178.57262599]
self.assertIsNone(np.testing.assert_allclose(actual, expected), 'peak times')
# Test extraction in rad
re_pos = wh.cm_to_rad(re_pos)
with self.assertLogs(logname, level='INFO') as cm:
wheel_moves = ephys_fpga.extract_wheel_moves(re_ts, re_pos)
self.assertEqual([f'INFO:{logname}:Wheel in rad units using X2 encoding'], cm.output)
# Check the first 3 intervals. As position thresholds are adjusted by units and
# encoding, we should expect the intervals to be identical to above
actual = wheel_moves['intervals'][:3, ]
self.assertIsNone(np.testing.assert_allclose(actual, ints), 'unexpected intervals')
[docs]
def test_movement_log(self):
"""
Integration test for inferring the units and decoding type for wheel data input for
extract_wheel_moves. Only expected to work for the default wheel diameter.
"""
ta = np.array([2, 4, 6, 8, 12, 14, 16, 18])
pa = np.array([1, -1, 1, -1, 1, -1, 1, -1])
tb = np.array([3, 5, 7, 9, 11, 13, 15, 17])
pb = np.array([1, -1, 1, -1, 1, -1, 1, -1])
logname = 'ibllib.io.extractors.training_wheel'
for unit in ['cm', 'rad']:
for i in (1, 2, 4):
encoding = 'X' + str(i)
r = 3.1 if unit == 'cm' else 1
# print(encoding, unit)
t, p = ephys_fpga._rotary_encoder_positions_from_fronts(
ta, pa, tb, pb, ticks=1024, coding=encoding.lower(), radius=r)
expected = f'INFO:{logname}:Wheel in {unit} units using {encoding} encoding'
with self.assertLogs(logname, level='INFO') as cm:
ephys_fpga.extract_wheel_moves(t, p)
self.assertEqual([expected], cm.output)
[docs]
def test_extract_first_movement_times(self):
test_data = self.test_data[1]
wheel_moves = ephys_fpga.extract_wheel_moves(test_data[0][0], test_data[0][1])
first, is_final, ind = extract_first_movement_times(wheel_moves, self.trials)
np.testing.assert_allclose(first, [162.48462599, 105.62562599, np.nan])
np.testing.assert_array_equal(is_final, [False, True, False])
np.testing.assert_array_equal(ind, [46, 18])
[docs]
class TestFpgaTrials(unittest.TestCase):
"""Test FpgaTrials class."""
[docs]
def test_time_fields(self):
"""Test for FpgaTrials._time_fields static method."""
expected = ('intervals', 'fooBar_times_bpod', 'spike_times', 'baz_timestamps')
fields = ephys_fpga.FpgaTrials._time_fields(expected + ('position', 'timebase', 'fooBaz'))
self.assertCountEqual(expected, fields)
[docs]
def test_is_trials_object_attribute(self):
"""Test for FpgaTrials._is_trials_object_attribute method."""
extractor = ephys_fpga.FpgaTrials('subject/2020-01-01/001')
# Should assume this is a trials attribute if no save name defined
self.assertTrue(extractor._is_trials_object_attribute('stimOnTrigger_times'))
# Save name not trials attribute
self.assertFalse(extractor._is_trials_object_attribute('wheel_position'))
# Save name is trials attribute
self.assertTrue(extractor._is_trials_object_attribute('table'))
# Check with toy variables
extractor.var_names += ('foo_bar',)
extractor.save_names += (None,)
self.assertTrue(extractor._is_trials_object_attribute('foo_bar'))
self.assertFalse(extractor._is_trials_object_attribute('foo_bar', variable_length_vars='foo_bar'))
extractor.save_names = extractor.save_names[:-1] + ('_ibl_foo.bar_times.csv',)
self.assertFalse(extractor._is_trials_object_attribute('foo_bar'))
if __name__ == '__main__':
unittest.main(exit=False, verbosity=2)