Source code for ibllib.tests.extractors.test_ephys_trials
import unittest
from pathlib import Path
import numpy as np
from ibllib.io.extractors import ephys_fpga, biased_trials
import ibllib.io.raw_data_loaders as raw
[docs]class TestEphysSyncExtraction(unittest.TestCase):
[docs] def test_bpod_trace_extraction(self):
t_valve_open_ = np.array([117.12136667, 122.3873, 127.82903333, 140.56083333,
143.55326667, 155.29713333, 164.9186, 167.91133333,
171.39736667, 178.0305, 181.70343333])
t_trial_start_ = np.array([109.7647, 118.51416667, 123.7964, 129.24503333,
132.97976667, 136.8624, 141.95523333, 144.93636667,
149.5042, 153.08273333, 156.70316667, 164.0096,
166.30633333, 169.28373333, 172.77786667, 176.7828,
179.41063333])
t_trial_start_[0] = 6.75033333 # rising front for first trial instead of falling
bpod_fronts_ = np.array([1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1.,
-1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1., 1., -1.,
1., -1., 1., -1., 1., -1.])
bpod_times_ = np.array([6.75033333, 109.7648, 117.12136667, 117.27136667,
118.51416667, 118.51426667, 122.3873, 122.5373,
123.7964, 123.7965, 127.82903333, 127.97903333,
129.24503333, 129.24513333, 132.97976667, 132.97986667,
136.8624, 136.8625, 140.56083333, 140.71083333,
141.95523333, 141.95533333, 143.55326667, 143.70326667,
144.93636667, 144.93646667, 149.5042, 149.5043,
153.08273333, 153.08283333, 155.29713333, 155.44713333,
156.70316667, 156.70326667, 164.0096, 164.0097,
164.9186, 165.0686, 166.30633333, 166.30643333,
167.91133333, 168.06133333, 169.28373333, 169.28386667,
171.39736667, 171.54736667, 172.77786667, 172.77796667,
176.7828, 176.7829, 178.0305, 178.1805,
179.41063333, 179.41073333, 181.70343333, 181.85343333,
183.12896667, 183.12906667])
t_trial_start, t_valve_open, _ = ephys_fpga._assign_events_bpod(bpod_times_,
bpod_fronts_)
self.assertTrue(np.all(np.isclose(t_trial_start, t_trial_start_)))
self.assertTrue(np.all(np.isclose(t_valve_open, t_valve_open_)))
[docs] def test_align_to_trial(self):
# simple test with one missing at the end
t_trial_start = np.arange(0, 5) * 10
t_event = np.arange(0, 5) * 10 + 2
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event)
self.assertTrue(np.allclose(t_event_nans, t_event, equal_nan=True, atol=0, rtol=0))
# test with missing values
t_trial_start = np.array([109, 118, 123, 129, 132, 136, 141, 144, 149, 153])
t_event = np.array([122, 133, 140, 143, 146, 150, 154])
t_event_out_ = np.array([np.nan, 122, np.nan, np.nan, 133, 140, 143, 146, 150, 154])
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event)
self.assertTrue(np.allclose(t_event_out_, t_event_nans, equal_nan=True, atol=0, rtol=0))
# test with events before initial start trial
t_trial_start = np.arange(0, 5) * 10
t_event = np.arange(0, 5) * 10 - 2
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event)
desired_out = np.array([8., 18., 28., 38., np.nan])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
# test with several events per trial, missing events and events before
t_trial_start = np.array([0, 10, 20, 30, 40])
t_event = np.array([-1, 2, 4, 12, 35, 42])
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event)
desired_out = np.array([4, 12., np.nan, 35, 42])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
# same test above but this time take the first index instead of last
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event, take='first')
desired_out = np.array([2, 12., np.nan, 35, 42])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
# take second to last
t_trial_start = np.array([0, 10, 20, 30, 40])
t_event = np.array([2, 4, 12, 13, 14, 21, 32, 33, 34, 35, 42])
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event, take=-2)
desired_out = np.array([2, 13, np.nan, 34, np.nan])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
t_event_nans = ephys_fpga._assign_events_to_trial(t_trial_start, t_event, take=1)
desired_out = np.array([4, 13, np.nan, 33, np.nan])
self.assertTrue(np.allclose(desired_out, t_event_nans, equal_nan=True, atol=0, rtol=0))
[docs] def test_wheel_trace_from_sync(self):
pos_ = - np.array([-1, 0, -1, -2, -1, -2]) * (np.pi / ephys_fpga.WHEEL_TICKS)
ta = np.array([1, 2, 3, 4, 5, 6])
tb = np.array([0.5, 3.2, 3.3, 3.4, 5.25, 5.5])
pa = (np.mod(np.arange(6), 2) - 0.5) * 2
pb = (np.mod(np.arange(6) + 1, 2) - .5) * 2
t, pos = ephys_fpga._rotary_encoder_positions_from_fronts(ta, pa, tb, pb, coding='x2')
self.assertTrue(np.all(np.isclose(pos_, pos)))
pos_ = - np.array([-1, 0, -1, 0, -1, -2]) * (np.pi / ephys_fpga.WHEEL_TICKS)
tb = np.array([0.5, 3.2, 3.4, 5.25])
pb = (np.mod(np.arange(4) + 1, 2) - .5) * 2
t, pos = ephys_fpga._rotary_encoder_positions_from_fronts(ta, pa, tb, pb, coding='x2')
self.assertTrue(np.all(np.isclose(pos_, pos)))
[docs]class TestEphysBehaviorExtraction(unittest.TestCase):
[docs] def test_get_probabilityLeft(self):
data = raw.load_data(self.session_path)
settings = raw.load_settings(self.session_path)
*_, pLeft0, _ = biased_trials.ProbaContrasts(
self.session_path).extract(bpod_trials=data, settings=settings)[0]
self.assertTrue(len(pLeft0) == len(data))
# Test if only generative prob values in data
self.assertTrue(all([x in [0.2, 0.5, 0.8] for x in np.unique(pLeft0)]))
# Test if settings file has empty LEN_DATA result is same
settings.update({"LEN_BLOCKS": None})
*_, pLeft1, _ = biased_trials.ProbaContrasts(
self.session_path).extract(bpod_trials=data, settings=settings)[0]
self.assertTrue(all(pLeft0 == pLeft1))
# Test if only generative prob values in data
self.assertTrue(all([x in [0.2, 0.5, 0.8] for x in np.unique(pLeft1)]))
if __name__ == "__main__":
unittest.main(exit=False, verbosity=2)