Source code for ibllib.tests.extractors.test_extractors
"""Test trials, wheel and camera extractors."""
import functools
import shutil
import tempfile
import unittest
import unittest.mock
from pathlib import Path
import numpy as np
import pandas as pd
import one.alf.io as alfio
from ibllib.io.extractors import training_trials, biased_trials, camera
from ibllib.io import raw_data_loaders as raw
from ibllib.io.extractors.base import BaseExtractor
[docs]
def wheelMoves_fixture(func):
"""Decorator to save some dummy wheelMoves ALF files for extraction tests"""
@functools.wraps(func)
def wrapper(obj=None):
# Save some wheelMoves ALF files
attr_list = ['training_lt5',
'training_ge5',
'biased_lt5',
'biased_ge5']
alf_paths = [getattr(obj, p)['path'] / 'alf' for p in attr_list]
n_trials = [getattr(obj, p)['ntrials'] for p in attr_list]
for p, n in zip(alf_paths, n_trials):
p.mkdir()
np.save(str(p / '_ibl_wheelMoves.intervals.npy'), np.zeros((n, 2)))
np.save(str(p / '_ibl_wheelMoves.peakAmplitude.npy'), np.zeros(n))
# Run method
func(obj)
# Teardown; delete the files
for p in alf_paths:
shutil.rmtree(p)
return wrapper
[docs]
class TestExtractTrialData(unittest.TestCase):
[docs]
def setUp(self):
self.main_path = Path(__file__).parent
self.training_lt5 = {'path': self.main_path / 'data' / 'session_training_lt5'}
self.biased_lt5 = {'path': self.main_path / 'data' / 'session_biased_lt5'}
self.training_ge5 = {'path': self.main_path / 'data' / 'session_training_ge5'}
self.biased_ge5 = {'path': self.main_path / 'data' / 'session_biased_ge5'}
self.ephys = {'path': self.main_path / 'data' / 'session_ephys'}
self.training_lt5['ntrials'] = len(raw.load_data(self.training_lt5['path']))
self.biased_lt5['ntrials'] = len(raw.load_data(self.biased_lt5['path']))
self.training_ge5['ntrials'] = len(raw.load_data(self.training_ge5['path']))
self.biased_ge5['ntrials'] = len(raw.load_data(self.biased_ge5['path']))
self.ephys['ntrials'] = len(raw.load_data(self.ephys['path']))
# turn off logging for unit testing as we will purposely go into warning/error cases
self.wheel_ge5_path = self.main_path / 'data' / 'wheel_ge5'
self.wheel_lt5_path = self.main_path / 'data' / 'wheel_lt5'
# Save some dummy wheel moves data for trial firstMovement_times extraction
[docs]
def test_get_feedbackType(self):
# TRAINING SESSIONS
ft = training_trials.FeedbackType(
self.training_lt5['path']).extract()[0]
self.assertEqual(ft.size, self.training_lt5['ntrials'])
# check if no 0's in feedbackTypes
self.assertFalse(ft[ft == 0].size > 0)
# -- version >= 5.0.0
ft = training_trials.FeedbackType(
self.training_ge5['path']).extract()[0]
self.assertEqual(ft.size, self.training_ge5['ntrials'])
# check if no 0's in feedbackTypes
self.assertFalse(ft[ft == 0].size > 0)
# BIASED SESSIONS
ft = biased_trials.FeedbackType(
self.biased_lt5['path']).extract()[0]
self.assertEqual(ft.size, self.biased_lt5['ntrials'])
# check if no 0's in feedbackTypes
self.assertFalse(ft[ft == 0].size > 0)
# -- version >= 5.0.0
ft = biased_trials.FeedbackType(
self.biased_ge5['path']).extract()[0]
self.assertEqual(ft.size, self.biased_ge5['ntrials'])
# check if no 0's in feedbackTypes
self.assertFalse(ft[ft == 0].size > 0)
[docs]
def test_get_contrastLR(self):
# TRAINING SESSIONS
cl, cr = training_trials.ContrastLR(
self.training_lt5['path']).extract()[0]
self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)]))
self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)]))
self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl))
self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl))
# -- version >= 5.0.0
cl, cr = training_trials.ContrastLR(
self.training_ge5['path']).extract()[0]
self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)]))
self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)]))
self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl))
self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl))
# BIASED SESSIONS
cl, cr = biased_trials.ContrastLR(
self.biased_lt5['path']).extract()[0]
self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)]))
self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)]))
self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl))
self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl))
# -- version >= 5.0.0
cl, cr = biased_trials.ContrastLR(
self.biased_ge5['path']).extract()[0]
self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)]))
self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)]))
self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl))
self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl))
[docs]
def test_get_probabilityLeft(self):
# TRAINING SESSIONS
pl = training_trials.ProbabilityLeft(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(pl, np.ndarray)
# -- version >= 5.0.0
pl = training_trials.ProbabilityLeft(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(pl, np.ndarray)
# BIASED SESSIONS
pl = biased_trials.ProbabilityLeft(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(pl, np.ndarray)
# Test if only probs that are in prob set
md = raw.load_settings(self.biased_lt5['path'])
if md:
probs = md['BLOCK_PROBABILITY_SET']
probs.append(0.5)
self.assertTrue(sum([x in probs for x in pl]) == len(pl))
# -- version >= 5.0.0
pl = biased_trials.ProbabilityLeft(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(pl, np.ndarray)
# Test if only probs that are in prob set
md = raw.load_settings(self.biased_ge5['path'])
probs = md['BLOCK_PROBABILITY_SET']
probs.append(0.5)
self.assertTrue(sum([x in probs for x in pl]) == len(pl))
# EPHYS SESSION
data = raw.load_data(self.ephys['path'])
md = raw.load_settings(self.ephys['path'])
*_, pLeft0, _ = biased_trials.ProbaContrasts(
self.ephys['path']).extract(bpod_trials=data, settings=md)[0]
self.assertEqual(len(pLeft0), self.ephys['ntrials'], 'ephys prob left')
# Test if only generative prob values in data
self.assertTrue(all(x in [0.2, 0.5, 0.8] for x in np.unique(pLeft0)))
# Test if settings file has empty LEN_DATA result is same
md.update({'LEN_BLOCKS': None})
*_, pLeft1, _ = biased_trials.ProbaContrasts(
self.ephys['path']).extract(bpod_trials=data, settings=md)[0]
self.assertTrue(all(pLeft0 == pLeft1))
# Test if only generative prob values in data
self.assertTrue(all(x in [0.2, 0.5, 0.8] for x in np.unique(pLeft1)))
[docs]
def test_get_choice(self):
# TRAINING SESSIONS
choice = training_trials.Choice(
session_path=self.training_lt5['path']).extract(save=False)[0]
self.assertIsInstance(choice, np.ndarray)
data = raw.load_data(self.training_lt5['path'])
trial_nogo = np.array(
[~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0])
for t in data])
if any(trial_nogo):
self.assertTrue(all(choice[trial_nogo]) == 0)
# -- version >= 5.0.0
choice = training_trials.Choice(
session_path=self.training_ge5['path']).extract(save=False)[0]
self.assertIsInstance(choice, np.ndarray)
data = raw.load_data(self.training_ge5['path'])
trial_nogo = np.array(
[~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0])
for t in data])
if any(trial_nogo):
self.assertTrue(all(choice[trial_nogo]) == 0)
# BIASED SESSIONS
choice = biased_trials.Choice(
session_path=self.biased_lt5['path']).extract(save=False)[0]
self.assertIsInstance(choice, np.ndarray)
data = raw.load_data(self.biased_lt5['path'])
trial_nogo = np.array(
[~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0])
for t in data])
if any(trial_nogo):
self.assertTrue(all(choice[trial_nogo]) == 0)
# -- version >= 5.0.0
choice = biased_trials.Choice(
session_path=self.biased_ge5['path']).extract(save=False)[0]
self.assertIsInstance(choice, np.ndarray)
data = raw.load_data(self.biased_ge5['path'])
trial_nogo = np.array(
[~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0])
for t in data])
if any(trial_nogo):
self.assertTrue(all(choice[trial_nogo]) == 0)
[docs]
def test_get_repNum(self):
# TRAINING SESSIONS
rn = training_trials.RepNum(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(rn, np.ndarray)
expected = [0, 1, 2, 0]
np.testing.assert_array_equal(rn, expected)
# -- version >= 5.0.0
rn = training_trials.RepNum(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(rn, np.ndarray)
expected = [0, 0, 1, 2, 3, 0, 0, 0, 1, 2, 0, 1]
np.testing.assert_array_equal(rn, expected)
# BIASED SESSIONS have no repeated trials
[docs]
def test_get_rewardVolume(self):
# TRAINING SESSIONS
rv = training_trials.RewardVolume(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(rv, np.ndarray)
# -- version >= 5.0.0
rv = training_trials.RewardVolume(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(rv, np.ndarray)
# BIASED SESSIONS
rv = biased_trials.RewardVolume(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(rv, np.ndarray)
# Test if all non-zero rewards are of the same value
self.assertTrue(all(x == max(rv) for x in rv if x != 0))
# -- version >= 5.0.0
rv = biased_trials.RewardVolume(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(rv, np.ndarray)
# Test if all non-zero rewards are of the same value
self.assertTrue(all(x == max(rv) for x in rv if x != 0))
[docs]
def test_get_feedback_times_ge5(self):
# TRAINING SESSIONS
ft = training_trials.FeedbackTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(ft, np.ndarray)
# BIASED SESSIONS
ft = biased_trials.FeedbackTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(ft, np.ndarray)
[docs]
def test_get_feedback_times_lt5(self):
# TRAINING SESSIONS
ft = training_trials.FeedbackTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(ft, np.ndarray)
# BIASED SESSIONS
ft = biased_trials.FeedbackTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(ft, np.ndarray)
[docs]
def test_get_stimOnTrigger_times(self):
# TRAINING SESSIONS
sott = training_trials.StimOnTriggerTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(sott, np.ndarray)
# -- version >= 5.0.0
sott = training_trials.StimOnTriggerTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(sott, np.ndarray)
# BIASED SESSIONS
sott = biased_trials.StimOnTriggerTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(sott, np.ndarray)
# -- version >= 5.0.0
sott = biased_trials.StimOnTriggerTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(sott, np.ndarray)
[docs]
def test_get_stimOn_times_lt5(self):
# TRAINING SESSIONS
st = training_trials.StimOnTimes_deprecated(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(st, np.ndarray)
# BIASED SESSIONS
st = biased_trials.StimOnTimes_deprecated(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(st, np.ndarray)
[docs]
def test_get_stimOn_times_ge5(self):
# TRAINING SESSIONS
st = training_trials.StimOnTimes_deprecated(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(st, np.ndarray)
# BIASED SESSIONS
st = biased_trials.StimOnTimes_deprecated(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(st, np.ndarray)
[docs]
def test_stimOnOffFreeze_times(self):
# TRAINING SESSIONS
st = training_trials.StimOnOffFreezeTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(st[0], np.ndarray)
# BIASED SESSIONS
st = biased_trials.StimOnOffFreezeTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(st[0], np.ndarray)
# TRAINING SESSIONS
st = training_trials.StimOnOffFreezeTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(st[0], np.ndarray)
# BIASED SESSIONS
st = biased_trials.StimOnOffFreezeTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(st[0], np.ndarray)
[docs]
def test_get_intervals(self):
# TRAINING SESSIONS
di = training_trials.Intervals(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(di, np.ndarray)
self.assertFalse(np.isnan(di).all())
# -- version >= 5.0.0
di = training_trials.Intervals(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(di, np.ndarray)
self.assertFalse(np.isnan(di).all())
# BIASED SESSIONS
di = biased_trials.Intervals(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(di, np.ndarray)
self.assertFalse(np.isnan(di).all())
# -- version >= 5.0.0
di = biased_trials.Intervals(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(di, np.ndarray)
self.assertFalse(np.isnan(di).all())
[docs]
def test_get_response_times(self):
# TRAINING SESSIONS
rt = training_trials.ResponseTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(rt, np.ndarray)
# -- version >= 5.0.0
rt = training_trials.ResponseTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(rt, np.ndarray)
# BIASED SESSIONS
rt = biased_trials.ResponseTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(rt, np.ndarray)
# -- version >= 5.0.0
rt = biased_trials.ResponseTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(rt, np.ndarray)
[docs]
def test_get_goCueTrigger_times(self):
# TRAINING SESSIONS
data = raw.load_data(self.training_lt5['path'])
gct = np.array([tr['behavior_data']['States timestamps']
['closed_loop'][0][0] for tr in data])
self.assertIsInstance(gct, np.ndarray)
# -- version >= 5.0.0
gct = training_trials.GoCueTriggerTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(gct, np.ndarray)
# BIASED SESSIONS
data = raw.load_data(self.biased_lt5['path'])
gct = np.array([tr['behavior_data']['States timestamps']
['closed_loop'][0][0] for tr in data])
self.assertIsInstance(gct, np.ndarray)
# -- version >= 5.0.0
gct = biased_trials.GoCueTriggerTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(gct, np.ndarray)
[docs]
def test_get_goCueOnset_times(self):
# TRAINING SESSIONS
gcot = training_trials.GoCueTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(gcot, np.ndarray)
self.assertTrue(np.all(np.isnan(gcot)))
self.assertTrue(gcot.size != 0 or gcot.size == 4)
# -- version >= 5.0.0
gcot = training_trials.GoCueTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(gcot, np.ndarray)
self.assertFalse(np.any(np.isnan(gcot)))
self.assertTrue(gcot.size != 0 or gcot.size == 12)
# BIASED SESSIONS
gcot = biased_trials.GoCueTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(gcot, np.ndarray)
self.assertFalse(np.any(np.isnan(gcot)))
self.assertTrue(gcot.size != 0 or gcot.size == 4)
# -- version >= 5.0.0
gcot = biased_trials.GoCueTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(gcot, np.ndarray)
self.assertFalse(np.any(np.isnan(gcot)))
self.assertTrue(gcot.size != 0 or gcot.size == 8)
[docs]
def test_get_included_trials_lt5(self):
# TRAINING SESSIONS
it = training_trials.IncludedTrials(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
# BIASED SESSIONS
it = biased_trials.IncludedTrials(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
[docs]
def test_get_included_trials_ge5(self):
# TRAINING SESSIONS
it = training_trials.IncludedTrials(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
# BIASED SESSIONS
it = biased_trials.IncludedTrials(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
[docs]
def test_get_included_trials(self):
# TRAINING SESSIONS
it = training_trials.IncludedTrials(
self.training_lt5['path']).extract(settings={'IBLRIG_VERSION': '4.9.9'})[0]
self.assertIsInstance(it, np.ndarray)
# -- version >= 5.0.0
it = training_trials.IncludedTrials(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
# BIASED SESSIONS
it = biased_trials.IncludedTrials(
self.biased_lt5['path']).extract(settings={'IBLRIG_VERSION': '4.9.9'})[0]
self.assertIsInstance(it, np.ndarray)
# -- version >= 5.0.0
it = biased_trials.IncludedTrials(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
[docs]
def test_get_pause_duration(self):
"""Test for extraction of pause periods."""
extractor = training_trials.PauseDuration(self.biased_ge5['path'])
pp = extractor.extract()[0]
self.assertIsInstance(pp, np.ndarray)
# For sessions pre-pause implementation we expect zeros
np.testing.assert_array_equal(pp, np.zeros(self.biased_ge5['ntrials'], dtype=float))
# For v8.9.0 and later it should return duration or NaN
for i, trial in enumerate(extractor.bpod_trials):
trial.update({'pause_duration': i})
pp = extractor.extract(bpod_trials=extractor.bpod_trials, settings={'IBLRIG_VERSION': '8.9.0'})[0]
np.testing.assert_array_equal(pp, np.arange(self.biased_ge5['ntrials']))
extractor.bpod_trials[5].pop('pause_duration')
pp = extractor.extract(bpod_trials=extractor.bpod_trials, settings={'IBLRIG_VERSION': '8.9.0'})[0]
self.assertTrue(np.isnan(pp[5]))
[docs]
@wheelMoves_fixture
def test_extract_all(self):
# TRAINING SESSIONS
# Expect an error raised because no wheel moves were present in test data
with self.assertRaises(ValueError) as ex:
training_trials.extract_all(
self.training_lt5['path'], settings={'IBLRIG_VERSION': '4.9.9'}, save=True)
self.assertIn('_ibl_wheelMoves.intervals.npy appears to be empty', str(ex.exception))
# -- version >= 5.0.0
out, files = training_trials.extract_all(self.training_ge5['path'], save=True)
self.assertEqual(20, len(out))
self.assertTrue(all(map(Path.exists, files)))
# BIASED SESSIONS
# The new trials extractor additionally extracts the wheel data and this fails for the < 5.0
# test data so we will stub the wheel extractor
with unittest.mock.patch('ibllib.io.extractors.biased_trials.Wheel') as Wheel:
Wheel.var_names = tuple()
Wheel().extract.return_value = ({}, [])
out, files = biased_trials.extract_all(
self.biased_lt5['path'], settings={'IBLRIG_VERSION': '4.9.9'}, save=True)
self.assertEqual(15, len(out))
self.assertTrue(all(map(Path.exists, files)))
# -- version >= 5.0.0
out, files = biased_trials.extract_all(self.biased_ge5['path'], save=True)
self.assertEqual(19, len(out))
self.assertTrue(all(map(Path.exists, files)))
[docs]
def test_encoder_positions_clock_reset(self):
# TRAINING SESSIONS
# only for training?
path = self.training_lt5['path'] / "raw_behavior_data"
path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None)
dy = raw._load_encoder_positions_file_lt5(path)
dat = np.array([849736, 1532230, 1822449, 1833514, 1841566, 1848206, 1853979, 1859144])
self.assertTrue(np.all(np.diff(dy['re_ts']) > 0))
self.assertTrue(all(dy['re_ts'][6:] - 2 ** 32 - dat == 0))
[docs]
def test_encoder_positions_clock_errors(self):
# here we test for 2 kinds of file corruption that happen
# 1/2 the first sample time is corrupt and absurdly high and should be discarded
# 2/2 2 samples are swapped and need to be swapped backk
path = self.biased_lt5['path'] / "raw_behavior_data"
path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None)
dy = raw._load_encoder_positions_file_lt5(path)
self.assertTrue(np.all(np.diff(np.array(dy.re_ts)) > 0))
# -- version >= 5.0.0
path = self.biased_ge5['path'] / "raw_behavior_data"
path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None)
dy = raw._load_encoder_positions_file_ge5(path)
self.assertTrue(np.all(np.diff(np.array(dy.re_ts)) > 0))
[docs]
def test_wheel_folders(self):
# the wheel folder contains other errors in bpod output that had to be addressed
for wf in self.wheel_lt5_path.glob('_iblrig_encoderPositions*.raw*.ssv'):
df = raw._load_encoder_positions_file_lt5(wf)
self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0))
for wf in self.wheel_lt5_path.glob('_iblrig_encoderEvents*.raw*.ssv'):
df = raw._load_encoder_events_file_lt5(wf)
self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0))
for wf in self.wheel_ge5_path.glob('_iblrig_encoderPositions*.raw*.ssv'):
df = raw._load_encoder_positions_file_ge5(wf)
self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0))
for wf in self.wheel_ge5_path.glob('_iblrig_encoderEvents*.raw*.ssv'):
df = raw._load_encoder_events_file_ge5(wf)
self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0))
[docs]
def test_load_encoder_positions(self):
raw.load_encoder_positions(self.training_lt5['path'],
settings={'IBLRIG_VERSION': '4.9.9'})
raw.load_encoder_positions(self.training_ge5['path'])
raw.load_encoder_positions(self.biased_lt5['path'],
settings={'IBLRIG_VERSION': '4.9.9'})
raw.load_encoder_positions(self.biased_ge5['path'])
[docs]
def test_load_encoder_events(self):
raw.load_encoder_events(self.training_lt5['path'],
settings={'IBLRIG_VERSION': '4.9.9'})
raw.load_encoder_events(self.training_ge5['path'])
raw.load_encoder_events(self.biased_lt5['path'],
settings={'IBLRIG_VERSION': '4.9.9'})
raw.load_encoder_events(self.biased_ge5['path'])
[docs]
def test_size_outputs(self):
# check the output dimensions
# VERSION >= 5.0.0
from ibllib.io.extractors.bpod_trials import extract_all
extract_all(self.training_ge5['path'])
trials = alfio.load_object(self.training_ge5['path'] / 'alf', object='trials')
self.assertTrue(alfio.check_dimensions(trials) == 0)
extract_all(self.biased_ge5['path'])
trials = alfio.load_object(self.biased_ge5['path'] / 'alf', object='trials')
self.assertTrue(alfio.check_dimensions(trials) == 0)
# VERSION < 5.0.0
# for these test data there are no wheel moves so let's mock the output
mock_data = {
'intervals': np.array([[0, 1], ]),
'peakAmplitude': np.array([1, 1]),
'peakVelocity_times': np.array([1, 1])}
function_name = 'ibllib.io.extractors.training_wheel.extract_wheel_moves'
# Training
with unittest.mock.patch(function_name, return_value=mock_data):
extract_all(self.training_lt5['path'])
trials = alfio.load_object(self.training_lt5['path'] / 'alf', object='trials')
self.assertTrue(alfio.check_dimensions(trials) == 0)
# Biased
with unittest.mock.patch(function_name, return_value=mock_data):
extract_all(self.biased_lt5['path'])
trials = alfio.load_object(self.biased_lt5['path'] / 'alf', object='trials')
self.assertTrue(alfio.check_dimensions(trials) == 0)
[docs]
def tearDown(self):
for f in self.main_path.rglob('_ibl_log.*.log'):
f.unlink()
[x.unlink() for x in self.training_lt5['path'].rglob('alf/*') if x.is_file()]
[x.unlink() for x in self.biased_lt5['path'].rglob('alf/*') if x.is_file()]
[x.unlink() for x in self.training_ge5['path'].rglob('alf/*') if x.is_file()]
[x.unlink() for x in self.biased_ge5['path'].rglob('alf/*') if x.is_file()]
[x.rmdir() for x in self.training_lt5['path'].rglob('alf/') if x.is_dir()]
[x.rmdir() for x in self.biased_lt5['path'].rglob('alf/') if x.is_dir()]
[x.rmdir() for x in self.training_ge5['path'].rglob('alf/') if x.is_dir()]
[x.rmdir() for x in self.biased_ge5['path'].rglob('alf/') if x.is_dir()]
[docs]
class TestSyncWheelBpod(unittest.TestCase):
[docs]
def test_sync_bpod_bonsai_poor_quality_timestamps(self):
sync_trials_robust = raw.sync_trials_robust
drift_pol = np.array([11 * 1e-6, -20]) # bpod starts 20 secs before with 10 ppm drift
np.random.seed(seed=784)
t0_full = np.cumsum(np.random.rand(50)) + .001
t1_full = np.polyval(drift_pol, t0_full) + t0_full
t0 = t0_full.copy()
t1 = t1_full.copy()
t0_, t1_ = sync_trials_robust(t0, t1)
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0, t1[:-1])
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0, t1[1:])
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0[1:], t1)
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0[:-1], t1)
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0, np.delete(t1, 24))
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(np.delete(t0, 12), np.delete(t1, 24))
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
[docs]
class TestWheelLoaders(unittest.TestCase):
[docs]
def test_encoder_events_corrupt(self):
path = self.main_path.joinpath('data', 'wheel', 'lt5')
for file_events in path.rglob('_iblrig_encoderEvents.raw.*'):
dy = raw._load_encoder_events_file_lt5(file_events)
self.assertTrue(dy.size > 6)
path = self.main_path.joinpath('data', 'wheel', 'ge5')
for file_events in path.rglob('_iblrig_encoderEvents.raw.*'):
dy = raw._load_encoder_events_file_ge5(file_events)
self.assertTrue(dy.size > 6)
[docs]
def test_encoder_positions_corrupts(self):
path = self.main_path.joinpath('data', 'wheel', 'ge5')
for file_position in path.rglob('_iblrig_encoderPositions.raw.*'):
dy = raw._load_encoder_positions_file_ge5(file_position)
self.assertTrue(dy.size > 18)
path = self.main_path.joinpath('data', 'wheel', 'lt5')
for file_position in path.rglob('_iblrig_encoderPositions.raw.*'):
dy = raw._load_encoder_positions_file_lt5(file_position)
self.assertTrue(dy.size > 18)
[docs]
class MockExtracor(BaseExtractor):
save_names = (
"some_file.csv",
"some_file.tsv",
"some_file.ssv",
"some_file.npy",
)
var_names = (
"csv",
"ssv",
"tsv",
"npy",
)
def _extract(self, **kwargs) -> tuple:
csv = pd.DataFrame([1, 2, 3])
ssv = pd.DataFrame([1, 2, 3])
tsv = pd.DataFrame([1, 2, 3])
npy = np.array([1, 2, 3])
return (csv, ssv, tsv, npy)
[docs]
class TestBaseExtractorSavingMethods(unittest.TestCase):
[docs]
def setUp(self) -> None:
self.tempdir = tempfile.TemporaryDirectory()
self.session_path = self.tempdir.name
# self.addClassCleanup(tempdir.cleanup) # py3.8
self.mock_extractor = MockExtracor(self.session_path)
[docs]
def test_saving_method(self):
data, paths = self.mock_extractor.extract(save=True)
self.assertTrue(all([x.exists() for x in paths]))
[docs]
class TestCameraExtractors(unittest.TestCase):
[docs]
def test_groom_pin_state(self):
# UNIT DATA
fps = 60
t_offset = 39.4
ts = np.arange(0, 10, 1 / fps) + t_offset
# Add drift
ts += np.full_like(ts, 1e-4).cumsum()
n_pulses = 2
pulse_width = 0.3
duty = 0.5
gpio = {'indices': np.empty(n_pulses * 2, dtype=np.int32),
'polarities': np.ones(n_pulses * 2, dtype=np.int32)}
gpio['polarities'][1::2] = -1
aud_offset = 40.
audio = {'times': np.empty(n_pulses * 2),
'polarities': gpio['polarities']}
for p in range(n_pulses):
i = p * 2
rise = (pulse_width * p) + duty * p + 1
audio['times'][i] = aud_offset + rise
audio['times'][i + 1] = audio['times'][i] + pulse_width
rise += t_offset
gpio['indices'][i] = np.where(ts > rise)[0][0]
gpio['indices'][i + 1] = np.where(ts > rise + pulse_width)[0][0]
gpio_, audio_, ts_ = camera.groom_pin_state(gpio, audio, ts)
self.assertEqual(audio, audio_, 'Audio dict shouldn\'t be effected')
np.testing.assert_array_almost_equal(ts_[:4], [40., 40.016667, 40.033333, 40.05])
# Broken TTLs + extra TTL
delay = 0.08
pulse_width = 1e-5
t = audio['times'][0] + delay
audio['times'] = np.sort(np.append(audio['times'], [t, t + pulse_width, 80]))
audio['polarities'] = np.ones(audio['times'].shape, dtype=np.int32)
audio['polarities'][1::2] = -1
gpio_, audio_, _ = camera.groom_pin_state(gpio, audio, ts, min_diff=5e-3)
self.assertTrue(audio_['times'].size == gpio_['times'].size == 4)
# One front shifted by a large amount
audio['times'][4] -= 0.3
gpio_, audio_, _ = camera.groom_pin_state(gpio, audio, ts, tolerance=.1, min_diff=5e-3)
self.assertTrue(np.all(gpio_['times'] == audio_['times']))
self.assertTrue(np.all(gpio_['times'] == np.array([41., 41.3])))
[docs]
def test_attribute_times(self, display=False):
# Create two timestamp arrays at two different frequencies
tsa = np.linspace(0, 60, 60 * 4)[:60] # 240bpm
tsb = np.linspace(0, 60, 60 * 3)[:45] # 180bpm
tsa = np.sort(np.append(tsa, .4)) # Add ambiguous front
tsb = np.sort(np.append(tsb, .41))
if display:
from ibllib.plots import vertical_lines
import matplotlib.pyplot as plt
vertical_lines(tsb, linestyle=':', color='r', label='tsb')
vertical_lines(tsa, linestyle=':', color='b', label='tsa')
plt.legend()
# Check with default args
matches = camera.attribute_times(tsa, tsb)
expected = np.array(
[0, 1, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14, 16, 17, 18, 20, 21,
22, 24, 25, 26, 28, 29, 30, 32, 33, 34, 36, 37, 38, 40, 41, 42, 44,
45, 46, 48, 49, -1, 52, 53, -1, 56, 57, -1, 60]
)
np.testing.assert_array_equal(matches, expected)
self.assertEqual(matches.size, tsb.size)
# Taking closest instead of first should change index of ambiguous front
matches = camera.attribute_times(tsa, tsb, take='nearest')
expected[np.r_[1:3]] = expected[1:3] + 1
np.testing.assert_array_equal(matches, expected)
# Taking first after should exclude many pulses
matches = camera.attribute_times(tsa, tsb, take='after')
missing = [0, 4, 5, 7, 8, 10, 11, 13, 14, 16, 17, 19, 20,
22, 23, 25, 26, 28, 29, 31, 32, 34, 35, 37, 40, 43]
expected[missing] = -1
np.testing.assert_array_equal(matches, expected)
# Lower tolerance
matches = camera.attribute_times(tsa, tsb, tol=0.05)
expected = np.array([0, 2, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45, 49, 53, 57])
np.testing.assert_array_equal(matches[matches > -1], expected)
# Remove injective assert
matches = camera.attribute_times(tsa, tsb, injective=False, take='nearest')
expected = np.array(
[0, 2, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14, 16, 17, 18, 20, 21, 22,
24, 25, 26, 28, 29, 30, 32, 33, 34, 36, 37, 38, 40, 41, 42, 44, 45,
46, 48, 49, -1, 52, 53, -1, 56, 57, -1, 60]
)
np.testing.assert_array_equal(matches, expected)
# Check input validation
with self.assertRaises(ValueError):
camera.attribute_times(tsa, tsb, injective=False, take='closest')
if __name__ == '__main__':
unittest.main(exit=False, verbosity=2)