"""Test trials, wheel and camera extractors."""
import functools
import shutil
import tempfile
import unittest
from unittest.mock import patch, Mock, MagicMock
from pathlib import Path
import numpy as np
import pandas as pd
import one.alf.io as alfio
from ibllib.io.extractors.bpod_trials import get_bpod_extractor
from ibllib.io.extractors import training_trials, biased_trials, camera
from ibllib.io import raw_data_loaders as raw
from ibllib.io.extractors.base import BaseExtractor
from ibllib.pipes.dynamic_pipeline import get_trials_tasks
def wheelMoves_fixture(func):
"""Decorator to save some dummy wheelMoves ALF files for extraction tests"""
@functools.wraps(func)
def wrapper(obj=None):
# Save some wheelMoves ALF files
attr_list = ['training_lt5',
'training_ge5',
'biased_lt5',
'biased_ge5']
alf_paths = [getattr(obj, p)['path'] / 'alf' for p in attr_list]
n_trials = [getattr(obj, p)['ntrials'] for p in attr_list]
for p, n in zip(alf_paths, n_trials):
p.mkdir()
np.save(str(p / '_ibl_wheelMoves.intervals.npy'), np.zeros((n, 2)))
np.save(str(p / '_ibl_wheelMoves.peakAmplitude.npy'), np.zeros(n))
# Run method
func(obj)
# Teardown; delete the files
for p in alf_paths:
shutil.rmtree(p)
return wrapper
class TestExtractTrialData(unittest.TestCase):
def setUp(self):
self.main_path = Path(__file__).parent
self.training_lt5 = {'path': self.main_path / 'data' / 'session_training_lt5'}
self.biased_lt5 = {'path': self.main_path / 'data' / 'session_biased_lt5'}
self.training_ge5 = {'path': self.main_path / 'data' / 'session_training_ge5'}
self.biased_ge5 = {'path': self.main_path / 'data' / 'session_biased_ge5'}
self.ephys = {'path': self.main_path / 'data' / 'session_ephys'}
self.training_lt5['ntrials'] = len(raw.load_data(self.training_lt5['path']))
self.biased_lt5['ntrials'] = len(raw.load_data(self.biased_lt5['path']))
self.training_ge5['ntrials'] = len(raw.load_data(self.training_ge5['path']))
self.biased_ge5['ntrials'] = len(raw.load_data(self.biased_ge5['path']))
self.ephys['ntrials'] = len(raw.load_data(self.ephys['path']))
        self.wheel_ge5_path = self.main_path / 'data' / 'wheel_ge5'
        self.wheel_lt5_path = self.main_path / 'data' / 'wheel_lt5'
        # NB: dummy wheel moves data for trial firstMovement_times extraction are
        # created by the wheelMoves_fixture decorator where required
def test_get_feedbackType(self):
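        """Test extraction of feedbackType for training and biased sessions, < v5 and >= v5."""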
# TRAINING SESSIONS
ft = training_trials.FeedbackType(
self.training_lt5['path']).extract()[0]
self.assertEqual(ft.size, self.training_lt5['ntrials'])
# check if no 0's in feedbackTypes
self.assertFalse(ft[ft == 0].size > 0)
# -- version >= 5.0.0
ft = training_trials.FeedbackType(
self.training_ge5['path']).extract()[0]
self.assertEqual(ft.size, self.training_ge5['ntrials'])
# check if no 0's in feedbackTypes
self.assertFalse(ft[ft == 0].size > 0)
# BIASED SESSIONS
ft = biased_trials.FeedbackType(
self.biased_lt5['path']).extract()[0]
self.assertEqual(ft.size, self.biased_lt5['ntrials'])
# check if no 0's in feedbackTypes
self.assertFalse(ft[ft == 0].size > 0)
# -- version >= 5.0.0
ft = biased_trials.FeedbackType(
self.biased_ge5['path']).extract()[0]
self.assertEqual(ft.size, self.biased_ge5['ntrials'])
# check if no 0's in feedbackTypes
self.assertFalse(ft[ft == 0].size > 0)
def test_get_contrastLR(self):
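        """Test extraction of contrastLeft and contrastRight.

        Exactly one of the two contrasts should be NaN on each trial, hence the
        complementary NaN-count assertions below.
        """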
# TRAINING SESSIONS
cl, cr = training_trials.ContrastLR(
self.training_lt5['path']).extract()[0]
self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)]))
self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)]))
self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl))
self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl))
# -- version >= 5.0.0
cl, cr = training_trials.ContrastLR(
self.training_ge5['path']).extract()[0]
self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)]))
self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)]))
self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl))
self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl))
# BIASED SESSIONS
cl, cr = biased_trials.ContrastLR(
self.biased_lt5['path']).extract()[0]
self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)]))
self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)]))
self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl))
self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl))
# -- version >= 5.0.0
cl, cr = biased_trials.ContrastLR(
self.biased_ge5['path']).extract()[0]
self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)]))
self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)]))
self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl))
self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl))
def test_get_probabilityLeft(self):
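        """Test extraction of probabilityLeft; biased session values must belong to the block probability set."""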
# TRAINING SESSIONS
pl = training_trials.ProbabilityLeft(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(pl, np.ndarray)
# -- version >= 5.0.0
pl = training_trials.ProbabilityLeft(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(pl, np.ndarray)
# BIASED SESSIONS
pl = biased_trials.ProbabilityLeft(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(pl, np.ndarray)
        # Check that all values lie within the block probability set
md = raw.load_settings(self.biased_lt5['path'])
if md:
probs = md['BLOCK_PROBABILITY_SET']
probs.append(0.5)
self.assertTrue(sum([x in probs for x in pl]) == len(pl))
# -- version >= 5.0.0
pl = biased_trials.ProbabilityLeft(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(pl, np.ndarray)
        # Check that all values lie within the block probability set
md = raw.load_settings(self.biased_ge5['path'])
probs = md['BLOCK_PROBABILITY_SET']
probs.append(0.5)
self.assertTrue(sum([x in probs for x in pl]) == len(pl))
# EPHYS SESSION
data = raw.load_data(self.ephys['path'])
md = raw.load_settings(self.ephys['path'])
*_, pLeft0, _ = biased_trials.ProbaContrasts(
self.ephys['path']).extract(bpod_trials=data, settings=md)[0]
self.assertEqual(len(pLeft0), self.ephys['ntrials'], 'ephys prob left')
        # Check that only generative probability values appear in the data
self.assertTrue(all(x in [0.2, 0.5, 0.8] for x in np.unique(pLeft0)))
        # Check that the result is the same when LEN_BLOCKS is missing from the settings
md.update({'LEN_BLOCKS': None})
*_, pLeft1, _ = biased_trials.ProbaContrasts(
self.ephys['path']).extract(bpod_trials=data, settings=md)[0]
self.assertTrue(all(pLeft0 == pLeft1))
        # Check that only generative probability values appear in the data
self.assertTrue(all(x in [0.2, 0.5, 0.8] for x in np.unique(pLeft1)))
def test_get_choice(self):
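        """Test extraction of choice; trials with a no-go state should have a choice of 0."""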
# TRAINING SESSIONS
choice = training_trials.Choice(
session_path=self.training_lt5['path']).extract(save=False)[0]
self.assertIsInstance(choice, np.ndarray)
data = raw.load_data(self.training_lt5['path'])
trial_nogo = np.array(
[~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0])
for t in data])
if any(trial_nogo):
            self.assertTrue(all(choice[trial_nogo] == 0))
# -- version >= 5.0.0
choice = training_trials.Choice(
session_path=self.training_ge5['path']).extract(save=False)[0]
self.assertIsInstance(choice, np.ndarray)
data = raw.load_data(self.training_ge5['path'])
trial_nogo = np.array(
[~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0])
for t in data])
if any(trial_nogo):
            self.assertTrue(all(choice[trial_nogo] == 0))
# BIASED SESSIONS
choice = biased_trials.Choice(
session_path=self.biased_lt5['path']).extract(save=False)[0]
self.assertIsInstance(choice, np.ndarray)
data = raw.load_data(self.biased_lt5['path'])
trial_nogo = np.array(
[~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0])
for t in data])
if any(trial_nogo):
            self.assertTrue(all(choice[trial_nogo] == 0))
# -- version >= 5.0.0
choice = biased_trials.Choice(
session_path=self.biased_ge5['path']).extract(save=False)[0]
self.assertIsInstance(choice, np.ndarray)
data = raw.load_data(self.biased_ge5['path'])
trial_nogo = np.array(
[~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0])
for t in data])
if any(trial_nogo):
            self.assertTrue(all(choice[trial_nogo] == 0))
def test_get_repNum(self):
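        """Test extraction of repNum against known values for the test sessions."""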
# TRAINING SESSIONS
rn = training_trials.RepNum(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(rn, np.ndarray)
expected = [0, 1, 2, 0]
np.testing.assert_array_equal(rn, expected)
# -- version >= 5.0.0
rn = training_trials.RepNum(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(rn, np.ndarray)
expected = [0, 0, 1, 2, 3, 0, 0, 0, 1, 2, 0, 1]
np.testing.assert_array_equal(rn, expected)
# BIASED SESSIONS have no repeated trials
def test_get_rewardVolume(self):
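        """Test extraction of rewardVolume; all non-zero volumes within a session should be equal."""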
# TRAINING SESSIONS
rv = training_trials.RewardVolume(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(rv, np.ndarray)
# -- version >= 5.0.0
rv = training_trials.RewardVolume(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(rv, np.ndarray)
# BIASED SESSIONS
rv = biased_trials.RewardVolume(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(rv, np.ndarray)
# Test if all non-zero rewards are of the same value
self.assertTrue(all(x == max(rv) for x in rv if x != 0))
# -- version >= 5.0.0
rv = biased_trials.RewardVolume(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(rv, np.ndarray)
# Test if all non-zero rewards are of the same value
self.assertTrue(all(x == max(rv) for x in rv if x != 0))
def test_get_feedback_times_ge5(self):
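        """Test extraction of feedback times for >= v5.0.0 sessions."""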
# TRAINING SESSIONS
ft = training_trials.FeedbackTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(ft, np.ndarray)
# BIASED SESSIONS
ft = biased_trials.FeedbackTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(ft, np.ndarray)
def test_get_feedback_times_lt5(self):
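        """Test extraction of feedback times for < v5.0.0 sessions."""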
# TRAINING SESSIONS
ft = training_trials.FeedbackTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(ft, np.ndarray)
# BIASED SESSIONS
ft = biased_trials.FeedbackTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(ft, np.ndarray)
def test_get_stimOnTrigger_times(self):
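        """Test extraction of stimOnTrigger times for training and biased sessions."""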
# TRAINING SESSIONS
sott = training_trials.StimOnTriggerTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(sott, np.ndarray)
# -- version >= 5.0.0
sott = training_trials.StimOnTriggerTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(sott, np.ndarray)
# BIASED SESSIONS
sott = biased_trials.StimOnTriggerTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(sott, np.ndarray)
# -- version >= 5.0.0
sott = biased_trials.StimOnTriggerTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(sott, np.ndarray)
def test_stimOnOffFreeze_times(self):
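        """Test extraction of stimOn, stimOff and stimFreeze times."""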
# TRAINING SESSIONS
st = training_trials.StimOnOffFreezeTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(st[0], np.ndarray)
# BIASED SESSIONS
st = biased_trials.StimOnOffFreezeTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(st[0], np.ndarray)
        # TRAINING SESSIONS -- version >= 5.0.0
st = training_trials.StimOnOffFreezeTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(st[0], np.ndarray)
        # BIASED SESSIONS -- version >= 5.0.0
st = biased_trials.StimOnOffFreezeTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(st[0], np.ndarray)
def test_get_intervals(self):
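        """Test extraction of trial intervals."""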
# TRAINING SESSIONS
di = training_trials.Intervals(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(di, np.ndarray)
self.assertFalse(np.isnan(di).all())
# -- version >= 5.0.0
di = training_trials.Intervals(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(di, np.ndarray)
self.assertFalse(np.isnan(di).all())
# BIASED SESSIONS
        di = biased_trials.Intervals(
            self.biased_lt5['path']).extract()[0]
self.assertIsInstance(di, np.ndarray)
self.assertFalse(np.isnan(di).all())
# -- version >= 5.0.0
        di = biased_trials.Intervals(
            self.biased_ge5['path']).extract()[0]
self.assertIsInstance(di, np.ndarray)
self.assertFalse(np.isnan(di).all())
def test_get_response_times(self):
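        """Test extraction of response times for training and biased sessions."""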
# TRAINING SESSIONS
rt = training_trials.ResponseTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(rt, np.ndarray)
# -- version >= 5.0.0
rt = training_trials.ResponseTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(rt, np.ndarray)
# BIASED SESSIONS
rt = biased_trials.ResponseTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(rt, np.ndarray)
# -- version >= 5.0.0
rt = biased_trials.ResponseTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(rt, np.ndarray)
def test_get_goCueTrigger_times(self):
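        """Test extraction of goCueTrigger times.

        For < v5 sessions the trigger times correspond to the start of the
        closed_loop state in the raw Bpod data.
        """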
# TRAINING SESSIONS
data = raw.load_data(self.training_lt5['path'])
gct = np.array([tr['behavior_data']['States timestamps']
['closed_loop'][0][0] for tr in data])
self.assertIsInstance(gct, np.ndarray)
# -- version >= 5.0.0
gct = training_trials.GoCueTriggerTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(gct, np.ndarray)
# BIASED SESSIONS
data = raw.load_data(self.biased_lt5['path'])
gct = np.array([tr['behavior_data']['States timestamps']
['closed_loop'][0][0] for tr in data])
self.assertIsInstance(gct, np.ndarray)
# -- version >= 5.0.0
gct = biased_trials.GoCueTriggerTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(gct, np.ndarray)
def test_get_goCueOnset_times(self):
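        """Test extraction of goCue onset times; the < v5 training fixture has no audio TTLs so all values are NaN."""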
# TRAINING SESSIONS
gcot = training_trials.GoCueTimes(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(gcot, np.ndarray)
self.assertTrue(np.all(np.isnan(gcot)))
        self.assertEqual(gcot.size, 4)
# -- version >= 5.0.0
gcot = training_trials.GoCueTimes(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(gcot, np.ndarray)
self.assertFalse(np.any(np.isnan(gcot)))
        self.assertEqual(gcot.size, 12)
# BIASED SESSIONS
gcot = biased_trials.GoCueTimes(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(gcot, np.ndarray)
self.assertFalse(np.any(np.isnan(gcot)))
        self.assertEqual(gcot.size, 4)
# -- version >= 5.0.0
gcot = biased_trials.GoCueTimes(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(gcot, np.ndarray)
self.assertFalse(np.any(np.isnan(gcot)))
        self.assertEqual(gcot.size, 8)
def test_get_included_trials_lt5(self):
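        """Test extraction of included trials for < v5.0.0 sessions."""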
# TRAINING SESSIONS
it = training_trials.IncludedTrials(
self.training_lt5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
# BIASED SESSIONS
it = biased_trials.IncludedTrials(
self.biased_lt5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
def test_get_included_trials_ge5(self):
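        """Test extraction of included trials for >= v5.0.0 sessions."""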
# TRAINING SESSIONS
it = training_trials.IncludedTrials(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
# BIASED SESSIONS
it = biased_trials.IncludedTrials(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
def test_get_included_trials(self):
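        """Test extraction of included trials with the rig version overridden via settings."""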
# TRAINING SESSIONS
it = training_trials.IncludedTrials(
self.training_lt5['path']).extract(settings={'IBLRIG_VERSION': '4.9.9'})[0]
self.assertIsInstance(it, np.ndarray)
# -- version >= 5.0.0
it = training_trials.IncludedTrials(
self.training_ge5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
# BIASED SESSIONS
it = biased_trials.IncludedTrials(
self.biased_lt5['path']).extract(settings={'IBLRIG_VERSION': '4.9.9'})[0]
self.assertIsInstance(it, np.ndarray)
# -- version >= 5.0.0
it = biased_trials.IncludedTrials(
self.biased_ge5['path']).extract()[0]
self.assertIsInstance(it, np.ndarray)
def test_get_pause_duration(self):
"""Test for extraction of pause periods."""
extractor = training_trials.PauseDuration(self.biased_ge5['path'])
pp = extractor.extract()[0]
self.assertIsInstance(pp, np.ndarray)
# For sessions pre-pause implementation we expect zeros
np.testing.assert_array_equal(pp, np.zeros(self.biased_ge5['ntrials'], dtype=float))
# For v8.9.0 and later it should return duration or NaN
for i, trial in enumerate(extractor.bpod_trials):
trial.update({'pause_duration': i})
pp = extractor.extract(bpod_trials=extractor.bpod_trials, settings={'IBLRIG_VERSION': '8.9.0'})[0]
np.testing.assert_array_equal(pp, np.arange(self.biased_ge5['ntrials']))
extractor.bpod_trials[5].pop('pause_duration')
pp = extractor.extract(bpod_trials=extractor.bpod_trials, settings={'IBLRIG_VERSION': '8.9.0'})[0]
self.assertTrue(np.isnan(pp[5]))
def test_encoder_positions_clock_reset(self):
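        """Test that timestamps affected by the rotary encoder clock reset are corrected.

        After loading, samples following the reset should be offset by 2 ** 32 and
        the resulting times strictly increasing.
        """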
        # TRAINING SESSIONS (NB: this clock reset may only occur in training sessions)
path = self.training_lt5['path'] / "raw_behavior_data"
path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None)
dy = raw._load_encoder_positions_file_lt5(path)
dat = np.array([849736, 1532230, 1822449, 1833514, 1841566, 1848206, 1853979, 1859144])
self.assertTrue(np.all(np.diff(dy['re_ts']) > 0))
self.assertTrue(all(dy['re_ts'][6:] - 2 ** 32 - dat == 0))
def test_encoder_positions_clock_errors(self):
        """Test loading of encoder positions files with two known kinds of corruption.

        1. The first sample time is corrupt and absurdly high, and should be discarded.
        2. Two samples are swapped and need to be swapped back.
        """
path = self.biased_lt5['path'] / "raw_behavior_data"
path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None)
dy = raw._load_encoder_positions_file_lt5(path)
self.assertTrue(np.all(np.diff(np.array(dy.re_ts)) > 0))
# -- version >= 5.0.0
path = self.biased_ge5['path'] / "raw_behavior_data"
path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None)
dy = raw._load_encoder_positions_file_ge5(path)
self.assertTrue(np.all(np.diff(np.array(dy.re_ts)) > 0))
def test_wheel_folders(self):
# the wheel folder contains other errors in bpod output that had to be addressed
for wf in self.wheel_lt5_path.glob('_iblrig_encoderPositions*.raw*.ssv'):
df = raw._load_encoder_positions_file_lt5(wf)
self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0))
for wf in self.wheel_lt5_path.glob('_iblrig_encoderEvents*.raw*.ssv'):
df = raw._load_encoder_events_file_lt5(wf)
self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0))
for wf in self.wheel_ge5_path.glob('_iblrig_encoderPositions*.raw*.ssv'):
df = raw._load_encoder_positions_file_ge5(wf)
self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0))
for wf in self.wheel_ge5_path.glob('_iblrig_encoderEvents*.raw*.ssv'):
df = raw._load_encoder_events_file_ge5(wf)
self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0))
def test_load_encoder_positions(self):
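        """Smoke test loading of encoder positions for all test sessions."""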
raw.load_encoder_positions(self.training_lt5['path'],
settings={'IBLRIG_VERSION': '4.9.9'})
raw.load_encoder_positions(self.training_ge5['path'])
raw.load_encoder_positions(self.biased_lt5['path'],
settings={'IBLRIG_VERSION': '4.9.9'})
raw.load_encoder_positions(self.biased_ge5['path'])
def test_load_encoder_events(self):
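        """Smoke test loading of encoder events for all test sessions."""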
raw.load_encoder_events(self.training_lt5['path'],
settings={'IBLRIG_VERSION': '4.9.9'})
raw.load_encoder_events(self.training_ge5['path'])
raw.load_encoder_events(self.biased_lt5['path'],
settings={'IBLRIG_VERSION': '4.9.9'})
raw.load_encoder_events(self.biased_ge5['path'])
    @wheelMoves_fixture
    def test_size_outputs(self):
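        """Check that the extracted trials ALF objects have consistent dimensions."""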
# check the output dimensions
# VERSION >= 5.0.0
task, = get_trials_tasks(self.training_ge5['path'])
trials, _ = task.extract_behaviour(save=True)
# Load from file (returned `trials` above includes wheel data)
trials = alfio.load_object(self.training_ge5['path'] / 'alf', object='trials')
self.assertTrue(alfio.check_dimensions(trials) == 0)
task, = get_trials_tasks(self.biased_ge5['path'])
trials, _ = task.extract_behaviour(save=True)
trials = alfio.load_object(self.biased_ge5['path'] / 'alf', object='trials')
self.assertTrue(alfio.check_dimensions(trials) == 0)
# VERSION < 5.0.0
# for these test data there are no wheel moves so let's mock the output
mock_data = {
'intervals': np.array([[0, 1], ]),
'peakAmplitude': np.array([1, 1]),
'peakVelocity_times': np.array([1, 1])}
function_name = 'ibllib.io.extractors.training_wheel.extract_wheel_moves'
# Training
with patch(function_name, return_value=mock_data):
task, = get_trials_tasks(self.training_lt5['path'])
trials, _ = task.extract_behaviour(save=True)
trials = alfio.load_object(self.training_lt5['path'] / 'alf', object='trials')
self.assertTrue(alfio.check_dimensions(trials) == 0)
# Biased
with patch(function_name, return_value=mock_data):
task, = get_trials_tasks(self.biased_lt5['path'])
trials, _ = task.extract_behaviour(save=True)
trials = alfio.load_object(self.biased_lt5['path'] / 'alf', object='trials')
self.assertTrue(alfio.check_dimensions(trials) == 0)
def tearDown(self):
for f in self.main_path.rglob('_ibl_log.*.log'):
f.unlink()
        for session in (self.training_lt5, self.biased_lt5, self.training_ge5, self.biased_ge5):
            for x in session['path'].rglob('alf/*'):
                if x.is_file():
                    x.unlink()
            for x in session['path'].rglob('alf/'):
                if x.is_dir():
                    x.rmdir()
class TestSyncWheelBpod(unittest.TestCase):
def test_sync_bpod_bonsai_poor_quality_timestamps(self):
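        """Test sync_trials_robust against clock drift, offset and missing events on either side."""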
sync_trials_robust = raw.sync_trials_robust
        drift_pol = np.array([11 * 1e-6, -20])  # bpod starts 20 secs before with 11 ppm drift
np.random.seed(seed=784)
t0_full = np.cumsum(np.random.rand(50)) + .001
t1_full = np.polyval(drift_pol, t0_full) + t0_full
t0 = t0_full.copy()
t1 = t1_full.copy()
t0_, t1_ = sync_trials_robust(t0, t1)
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0, t1[:-1])
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0, t1[1:])
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0[1:], t1)
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0[:-1], t1)
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(t0, np.delete(t1, 24))
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
t0_, t1_ = sync_trials_robust(np.delete(t0, 12), np.delete(t1, 24))
assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
class TestWheelLoaders(unittest.TestCase):

    def setUp(self) -> None:
        # main_path points to the test fixtures directory, as in TestExtractTrialData above
        self.main_path = Path(__file__).parent
def test_encoder_events_corrupt(self):
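        """Test that corrupt encoder events files still load with sufficient samples."""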
path = self.main_path.joinpath('data', 'wheel', 'lt5')
for file_events in path.rglob('_iblrig_encoderEvents.raw.*'):
dy = raw._load_encoder_events_file_lt5(file_events)
self.assertTrue(dy.size > 6)
path = self.main_path.joinpath('data', 'wheel', 'ge5')
for file_events in path.rglob('_iblrig_encoderEvents.raw.*'):
dy = raw._load_encoder_events_file_ge5(file_events)
self.assertTrue(dy.size > 6)
def test_encoder_positions_corrupts(self):
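        """Test that corrupt encoder positions files still load with sufficient samples."""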
path = self.main_path.joinpath('data', 'wheel', 'ge5')
for file_position in path.rglob('_iblrig_encoderPositions.raw.*'):
dy = raw._load_encoder_positions_file_ge5(file_position)
self.assertTrue(dy.size > 18)
path = self.main_path.joinpath('data', 'wheel', 'lt5')
for file_position in path.rglob('_iblrig_encoderPositions.raw.*'):
dy = raw._load_encoder_positions_file_lt5(file_position)
self.assertTrue(dy.size > 18)
class MockExtractor(BaseExtractor):
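    """Dummy extractor that returns one small dataset per supported output format."""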
    save_names = (
        "some_file.csv",
        "some_file.ssv",
        "some_file.tsv",
        "some_file.npy",
    )
var_names = (
"csv",
"ssv",
"tsv",
"npy",
)
def _extract(self, **kwargs) -> tuple:
csv = pd.DataFrame([1, 2, 3])
ssv = pd.DataFrame([1, 2, 3])
tsv = pd.DataFrame([1, 2, 3])
npy = np.array([1, 2, 3])
return (csv, ssv, tsv, npy)
class TestBaseExtractorSavingMethods(unittest.TestCase):
def setUp(self) -> None:
self.tempdir = tempfile.TemporaryDirectory()
        self.session_path = self.tempdir.name
        self.addCleanup(self.tempdir.cleanup)  # remove the temporary directory on teardown
        self.mock_extractor = MockExtractor(self.session_path)
def test_saving_method(self):
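        """Test that extract(save=True) writes each variable to file and returns the paths."""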
data, paths = self.mock_extractor.extract(save=True)
self.assertTrue(all([x.exists() for x in paths]))
class TestCameraExtractors(unittest.TestCase):
def test_groom_pin_state(self):
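        """Test camera.groom_pin_state on simulated camera timestamps, GPIO and audio TTLs.

        Covers frame-clock drift, spurious short TTLs and a front shifted by a
        large amount.
        """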
# UNIT DATA
fps = 60
t_offset = 39.4
ts = np.arange(0, 10, 1 / fps) + t_offset
# Add drift
ts += np.full_like(ts, 1e-4).cumsum()
n_pulses = 2
pulse_width = 0.3
duty = 0.5
gpio = {'indices': np.empty(n_pulses * 2, dtype=np.int32),
'polarities': np.ones(n_pulses * 2, dtype=np.int32)}
gpio['polarities'][1::2] = -1
aud_offset = 40.
audio = {'times': np.empty(n_pulses * 2),
'polarities': gpio['polarities']}
for p in range(n_pulses):
i = p * 2
rise = (pulse_width * p) + duty * p + 1
audio['times'][i] = aud_offset + rise
audio['times'][i + 1] = audio['times'][i] + pulse_width
rise += t_offset
gpio['indices'][i] = np.where(ts > rise)[0][0]
gpio['indices'][i + 1] = np.where(ts > rise + pulse_width)[0][0]
gpio_, audio_, ts_ = camera.groom_pin_state(gpio, audio, ts)
self.assertEqual(audio, audio_, 'Audio dict shouldn\'t be affected')
np.testing.assert_array_almost_equal(ts_[:4], [40., 40.016667, 40.033333, 40.05])
# Broken TTLs + extra TTL
delay = 0.08
pulse_width = 1e-5
t = audio['times'][0] + delay
audio['times'] = np.sort(np.append(audio['times'], [t, t + pulse_width, 80]))
audio['polarities'] = np.ones(audio['times'].shape, dtype=np.int32)
audio['polarities'][1::2] = -1
gpio_, audio_, _ = camera.groom_pin_state(gpio, audio, ts, min_diff=5e-3)
self.assertTrue(audio_['times'].size == gpio_['times'].size == 4)
# One front shifted by a large amount
audio['times'][4] -= 0.3
gpio_, audio_, _ = camera.groom_pin_state(gpio, audio, ts, tolerance=.1, min_diff=5e-3)
self.assertTrue(np.all(gpio_['times'] == audio_['times']))
self.assertTrue(np.all(gpio_['times'] == np.array([41., 41.3])))
def test_attribute_times(self, display=False):
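        """Test camera.attribute_times matching of two pulse trains at different rates.

        Checks the default ('first'), 'nearest' and 'after' strategies, the
        tolerance parameter and the injective constraint.
        """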
# Create two timestamp arrays at two different frequencies
tsa = np.linspace(0, 60, 60 * 4)[:60] # 240bpm
tsb = np.linspace(0, 60, 60 * 3)[:45] # 180bpm
tsa = np.sort(np.append(tsa, .4)) # Add ambiguous front
tsb = np.sort(np.append(tsb, .41))
if display:
from ibllib.plots import vertical_lines
import matplotlib.pyplot as plt
vertical_lines(tsb, linestyle=':', color='r', label='tsb')
vertical_lines(tsa, linestyle=':', color='b', label='tsa')
plt.legend()
# Check with default args
matches = camera.attribute_times(tsa, tsb)
expected = np.array(
[0, 1, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14, 16, 17, 18, 20, 21,
22, 24, 25, 26, 28, 29, 30, 32, 33, 34, 36, 37, 38, 40, 41, 42, 44,
45, 46, 48, 49, -1, 52, 53, -1, 56, 57, -1, 60]
)
np.testing.assert_array_equal(matches, expected)
self.assertEqual(matches.size, tsb.size)
        # Taking the nearest instead of the first should change the index of the ambiguous front
matches = camera.attribute_times(tsa, tsb, take='nearest')
expected[np.r_[1:3]] = expected[1:3] + 1
np.testing.assert_array_equal(matches, expected)
# Taking first after should exclude many pulses
matches = camera.attribute_times(tsa, tsb, take='after')
missing = [0, 4, 5, 7, 8, 10, 11, 13, 14, 16, 17, 19, 20,
22, 23, 25, 26, 28, 29, 31, 32, 34, 35, 37, 40, 43]
expected[missing] = -1
np.testing.assert_array_equal(matches, expected)
# Lower tolerance
matches = camera.attribute_times(tsa, tsb, tol=0.05)
expected = np.array([0, 2, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45, 49, 53, 57])
np.testing.assert_array_equal(matches[matches > -1], expected)
# Remove injective assert
matches = camera.attribute_times(tsa, tsb, injective=False, take='nearest')
expected = np.array(
[0, 2, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14, 16, 17, 18, 20, 21, 22,
24, 25, 26, 28, 29, 30, 32, 33, 34, 36, 37, 38, 40, 41, 42, 44, 45,
46, 48, 49, -1, 52, 53, -1, 56, 57, -1, 60]
)
np.testing.assert_array_equal(matches, expected)
# Check input validation
with self.assertRaises(ValueError):
camera.attribute_times(tsa, tsb, injective=False, take='closest')
class TestGetBpodExtractor(unittest.TestCase):
def test_get_bpod_extractor(self):
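        """Test get_bpod_extractor for unknown and known protocol names."""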
        # a non-existent extractor should raise a ValueError
with self.assertRaises(ValueError):
get_bpod_extractor('', protocol='sdf', task_collection='raw_behavior_data')
        # '_trainingChoiceWorld' should return an ibllib.io.extractors.training_trials.TrainingTrials instance
extractor = get_bpod_extractor(
'', protocol='_trainingChoiceWorld',
task_collection='raw_behavior_data'
)
self.assertTrue(isinstance(extractor, BaseExtractor))
def test_get_bpod_custom_extractor(self):
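        """Test loading a custom extractor class from an external module (mocked here as 'projects')."""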
# here we'll mock a custom module with a custom extractor
DummyModule = MagicMock()
DummyExtractor = Mock(spec_set=BaseExtractor)
DummyModule.toto.return_value = DummyExtractor
base_module = 'ibllib.io.extractors.bpod_trials'
with patch(f'{base_module}.get_bpod_extractor_class', return_value='toto'), \
patch(f'{base_module}.importlib.import_module', return_value=DummyModule) as import_mock:
self.assertIs(get_bpod_extractor(''), DummyExtractor)
import_mock.assert_called_with('projects')
# Check raises when imported class not an extractor
DummyModule.toto.return_value = MagicMock(spec=dict)
self.assertRaisesRegex(ValueError, 'should be an Extractor class', get_bpod_extractor, '')
if __name__ == '__main__':
unittest.main(exit=False, verbosity=2)