Source code for ibllib.tests.extractors.test_extractors

"""Test trials, wheel and camera extractors."""
import functools
import shutil
import tempfile
import unittest
from unittest.mock import patch, Mock, MagicMock
from pathlib import Path

import numpy as np
import pandas as pd

import one.alf.io as alfio
from ibllib.io.extractors.bpod_trials import get_bpod_extractor
from ibllib.io.extractors import training_trials, biased_trials, camera
from ibllib.io import raw_data_loaders as raw
from ibllib.io.extractors.base import BaseExtractor
from ibllib.pipes.dynamic_pipeline import get_trials_tasks


[docs] def wheelMoves_fixture(func): """Decorator to save some dummy wheelMoves ALF files for extraction tests""" @functools.wraps(func) def wrapper(obj=None): # Save some wheelMoves ALF files attr_list = ['training_lt5', 'training_ge5', 'biased_lt5', 'biased_ge5'] alf_paths = [getattr(obj, p)['path'] / 'alf' for p in attr_list] n_trials = [getattr(obj, p)['ntrials'] for p in attr_list] for p, n in zip(alf_paths, n_trials): p.mkdir() np.save(str(p / '_ibl_wheelMoves.intervals.npy'), np.zeros((n, 2))) np.save(str(p / '_ibl_wheelMoves.peakAmplitude.npy'), np.zeros(n)) # Run method func(obj) # Teardown; delete the files for p in alf_paths: shutil.rmtree(p) return wrapper
[docs] class TestExtractTrialData(unittest.TestCase):
[docs] def setUp(self): self.main_path = Path(__file__).parent self.training_lt5 = {'path': self.main_path / 'data' / 'session_training_lt5'} self.biased_lt5 = {'path': self.main_path / 'data' / 'session_biased_lt5'} self.training_ge5 = {'path': self.main_path / 'data' / 'session_training_ge5'} self.biased_ge5 = {'path': self.main_path / 'data' / 'session_biased_ge5'} self.ephys = {'path': self.main_path / 'data' / 'session_ephys'} self.training_lt5['ntrials'] = len(raw.load_data(self.training_lt5['path'])) self.biased_lt5['ntrials'] = len(raw.load_data(self.biased_lt5['path'])) self.training_ge5['ntrials'] = len(raw.load_data(self.training_ge5['path'])) self.biased_ge5['ntrials'] = len(raw.load_data(self.biased_ge5['path'])) self.ephys['ntrials'] = len(raw.load_data(self.ephys['path'])) # turn off logging for unit testing as we will purposely go into warning/error cases self.wheel_ge5_path = self.main_path / 'data' / 'wheel_ge5' self.wheel_lt5_path = self.main_path / 'data' / 'wheel_lt5'
# Save some dummy wheel moves data for trial firstMovement_times extraction
[docs] def test_get_feedbackType(self): # TRAINING SESSIONS ft = training_trials.FeedbackType( self.training_lt5['path']).extract()[0] self.assertEqual(ft.size, self.training_lt5['ntrials']) # check if no 0's in feedbackTypes self.assertFalse(ft[ft == 0].size > 0) # -- version >= 5.0.0 ft = training_trials.FeedbackType( self.training_ge5['path']).extract()[0] self.assertEqual(ft.size, self.training_ge5['ntrials']) # check if no 0's in feedbackTypes self.assertFalse(ft[ft == 0].size > 0) # BIASED SESSIONS ft = biased_trials.FeedbackType( self.biased_lt5['path']).extract()[0] self.assertEqual(ft.size, self.biased_lt5['ntrials']) # check if no 0's in feedbackTypes self.assertFalse(ft[ft == 0].size > 0) # -- version >= 5.0.0 ft = biased_trials.FeedbackType( self.biased_ge5['path']).extract()[0] self.assertEqual(ft.size, self.biased_ge5['ntrials']) # check if no 0's in feedbackTypes self.assertFalse(ft[ft == 0].size > 0)
[docs] def test_get_contrastLR(self): # TRAINING SESSIONS cl, cr = training_trials.ContrastLR( self.training_lt5['path']).extract()[0] self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)])) self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)])) self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl)) self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl)) # -- version >= 5.0.0 cl, cr = training_trials.ContrastLR( self.training_ge5['path']).extract()[0] self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)])) self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)])) self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl)) self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl)) # BIASED SESSIONS cl, cr = biased_trials.ContrastLR( self.biased_lt5['path']).extract()[0] self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)])) self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)])) self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl)) self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl)) # -- version >= 5.0.0 cl, cr = biased_trials.ContrastLR( self.biased_ge5['path']).extract()[0] self.assertTrue(all([np.sign(x) >= 0 for x in cl if ~np.isnan(x)])) self.assertTrue(all([np.sign(x) >= 0 for x in cr if ~np.isnan(x)])) self.assertTrue(sum(np.isnan(cl)) + sum(np.isnan(cr)) == len(cl)) self.assertTrue(sum(~np.isnan(cl)) + sum(~np.isnan(cr)) == len(cl))
[docs] def test_get_probabilityLeft(self): # TRAINING SESSIONS pl = training_trials.ProbabilityLeft( self.training_lt5['path']).extract()[0] self.assertIsInstance(pl, np.ndarray) # -- version >= 5.0.0 pl = training_trials.ProbabilityLeft( self.training_ge5['path']).extract()[0] self.assertIsInstance(pl, np.ndarray) # BIASED SESSIONS pl = biased_trials.ProbabilityLeft( self.biased_lt5['path']).extract()[0] self.assertIsInstance(pl, np.ndarray) # Test if only probs that are in prob set md = raw.load_settings(self.biased_lt5['path']) if md: probs = md['BLOCK_PROBABILITY_SET'] probs.append(0.5) self.assertTrue(sum([x in probs for x in pl]) == len(pl)) # -- version >= 5.0.0 pl = biased_trials.ProbabilityLeft( self.biased_ge5['path']).extract()[0] self.assertIsInstance(pl, np.ndarray) # Test if only probs that are in prob set md = raw.load_settings(self.biased_ge5['path']) probs = md['BLOCK_PROBABILITY_SET'] probs.append(0.5) self.assertTrue(sum([x in probs for x in pl]) == len(pl)) # EPHYS SESSION data = raw.load_data(self.ephys['path']) md = raw.load_settings(self.ephys['path']) *_, pLeft0, _ = biased_trials.ProbaContrasts( self.ephys['path']).extract(bpod_trials=data, settings=md)[0] self.assertEqual(len(pLeft0), self.ephys['ntrials'], 'ephys prob left') # Test if only generative prob values in data self.assertTrue(all(x in [0.2, 0.5, 0.8] for x in np.unique(pLeft0))) # Test if settings file has empty LEN_DATA result is same md.update({'LEN_BLOCKS': None}) *_, pLeft1, _ = biased_trials.ProbaContrasts( self.ephys['path']).extract(bpod_trials=data, settings=md)[0] self.assertTrue(all(pLeft0 == pLeft1)) # Test if only generative prob values in data self.assertTrue(all(x in [0.2, 0.5, 0.8] for x in np.unique(pLeft1)))
[docs] def test_get_choice(self): # TRAINING SESSIONS choice = training_trials.Choice( session_path=self.training_lt5['path']).extract(save=False)[0] self.assertIsInstance(choice, np.ndarray) data = raw.load_data(self.training_lt5['path']) trial_nogo = np.array( [~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0]) for t in data]) if any(trial_nogo): self.assertTrue(all(choice[trial_nogo]) == 0) # -- version >= 5.0.0 choice = training_trials.Choice( session_path=self.training_ge5['path']).extract(save=False)[0] self.assertIsInstance(choice, np.ndarray) data = raw.load_data(self.training_ge5['path']) trial_nogo = np.array( [~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0]) for t in data]) if any(trial_nogo): self.assertTrue(all(choice[trial_nogo]) == 0) # BIASED SESSIONS choice = biased_trials.Choice( session_path=self.biased_lt5['path']).extract(save=False)[0] self.assertIsInstance(choice, np.ndarray) data = raw.load_data(self.biased_lt5['path']) trial_nogo = np.array( [~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0]) for t in data]) if any(trial_nogo): self.assertTrue(all(choice[trial_nogo]) == 0) # -- version >= 5.0.0 choice = biased_trials.Choice( session_path=self.biased_ge5['path']).extract(save=False)[0] self.assertIsInstance(choice, np.ndarray) data = raw.load_data(self.biased_ge5['path']) trial_nogo = np.array( [~np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0]) for t in data]) if any(trial_nogo): self.assertTrue(all(choice[trial_nogo]) == 0)
[docs] def test_get_repNum(self): # TRAINING SESSIONS rn = training_trials.RepNum( self.training_lt5['path']).extract()[0] self.assertIsInstance(rn, np.ndarray) expected = [0, 1, 2, 0] np.testing.assert_array_equal(rn, expected) # -- version >= 5.0.0 rn = training_trials.RepNum( self.training_ge5['path']).extract()[0] self.assertIsInstance(rn, np.ndarray) expected = [0, 0, 1, 2, 3, 0, 0, 0, 1, 2, 0, 1] np.testing.assert_array_equal(rn, expected)
# BIASED SESSIONS have no repeated trials
[docs] def test_get_rewardVolume(self): # TRAINING SESSIONS rv = training_trials.RewardVolume( self.training_lt5['path']).extract()[0] self.assertIsInstance(rv, np.ndarray) # -- version >= 5.0.0 rv = training_trials.RewardVolume( self.training_ge5['path']).extract()[0] self.assertIsInstance(rv, np.ndarray) # BIASED SESSIONS rv = biased_trials.RewardVolume( self.biased_lt5['path']).extract()[0] self.assertIsInstance(rv, np.ndarray) # Test if all non-zero rewards are of the same value self.assertTrue(all(x == max(rv) for x in rv if x != 0)) # -- version >= 5.0.0 rv = biased_trials.RewardVolume( self.biased_ge5['path']).extract()[0] self.assertIsInstance(rv, np.ndarray) # Test if all non-zero rewards are of the same value self.assertTrue(all(x == max(rv) for x in rv if x != 0))
[docs] def test_get_feedback_times_ge5(self): # TRAINING SESSIONS ft = training_trials.FeedbackTimes( self.training_ge5['path']).extract()[0] self.assertIsInstance(ft, np.ndarray) # BIASED SESSIONS ft = biased_trials.FeedbackTimes( self.biased_ge5['path']).extract()[0] self.assertIsInstance(ft, np.ndarray)
[docs] def test_get_feedback_times_lt5(self): # TRAINING SESSIONS ft = training_trials.FeedbackTimes( self.training_lt5['path']).extract()[0] self.assertIsInstance(ft, np.ndarray) # BIASED SESSIONS ft = biased_trials.FeedbackTimes( self.biased_lt5['path']).extract()[0] self.assertIsInstance(ft, np.ndarray)
[docs] def test_get_stimOnTrigger_times(self): # TRAINING SESSIONS sott = training_trials.StimOnTriggerTimes( self.training_lt5['path']).extract()[0] self.assertIsInstance(sott, np.ndarray) # -- version >= 5.0.0 sott = training_trials.StimOnTriggerTimes( self.training_ge5['path']).extract()[0] self.assertIsInstance(sott, np.ndarray) # BIASED SESSIONS sott = biased_trials.StimOnTriggerTimes( self.biased_lt5['path']).extract()[0] self.assertIsInstance(sott, np.ndarray) # -- version >= 5.0.0 sott = biased_trials.StimOnTriggerTimes( self.biased_ge5['path']).extract()[0] self.assertIsInstance(sott, np.ndarray)
[docs] def test_stimOnOffFreeze_times(self): # TRAINING SESSIONS st = training_trials.StimOnOffFreezeTimes( self.training_lt5['path']).extract()[0] self.assertIsInstance(st[0], np.ndarray) # BIASED SESSIONS st = biased_trials.StimOnOffFreezeTimes( self.biased_lt5['path']).extract()[0] self.assertIsInstance(st[0], np.ndarray) # TRAINING SESSIONS st = training_trials.StimOnOffFreezeTimes( self.training_ge5['path']).extract()[0] self.assertIsInstance(st[0], np.ndarray) # BIASED SESSIONS st = biased_trials.StimOnOffFreezeTimes( self.biased_ge5['path']).extract()[0] self.assertIsInstance(st[0], np.ndarray)
[docs] def test_get_intervals(self): # TRAINING SESSIONS di = training_trials.Intervals( self.training_lt5['path']).extract()[0] self.assertIsInstance(di, np.ndarray) self.assertFalse(np.isnan(di).all()) # -- version >= 5.0.0 di = training_trials.Intervals( self.training_ge5['path']).extract()[0] self.assertIsInstance(di, np.ndarray) self.assertFalse(np.isnan(di).all()) # BIASED SESSIONS di = biased_trials.Intervals( self.training_lt5['path']).extract()[0] self.assertIsInstance(di, np.ndarray) self.assertFalse(np.isnan(di).all()) # -- version >= 5.0.0 di = biased_trials.Intervals( self.training_ge5['path']).extract()[0] self.assertIsInstance(di, np.ndarray) self.assertFalse(np.isnan(di).all())
[docs] def test_get_response_times(self): # TRAINING SESSIONS rt = training_trials.ResponseTimes( self.training_lt5['path']).extract()[0] self.assertIsInstance(rt, np.ndarray) # -- version >= 5.0.0 rt = training_trials.ResponseTimes( self.training_ge5['path']).extract()[0] self.assertIsInstance(rt, np.ndarray) # BIASED SESSIONS rt = biased_trials.ResponseTimes( self.biased_lt5['path']).extract()[0] self.assertIsInstance(rt, np.ndarray) # -- version >= 5.0.0 rt = biased_trials.ResponseTimes( self.biased_ge5['path']).extract()[0] self.assertIsInstance(rt, np.ndarray)
[docs] def test_get_goCueTrigger_times(self): # TRAINING SESSIONS data = raw.load_data(self.training_lt5['path']) gct = np.array([tr['behavior_data']['States timestamps'] ['closed_loop'][0][0] for tr in data]) self.assertIsInstance(gct, np.ndarray) # -- version >= 5.0.0 gct = training_trials.GoCueTriggerTimes( self.training_ge5['path']).extract()[0] self.assertIsInstance(gct, np.ndarray) # BIASED SESSIONS data = raw.load_data(self.biased_lt5['path']) gct = np.array([tr['behavior_data']['States timestamps'] ['closed_loop'][0][0] for tr in data]) self.assertIsInstance(gct, np.ndarray) # -- version >= 5.0.0 gct = biased_trials.GoCueTriggerTimes( self.biased_ge5['path']).extract()[0] self.assertIsInstance(gct, np.ndarray)
[docs] def test_get_goCueOnset_times(self): # TRAINING SESSIONS gcot = training_trials.GoCueTimes( self.training_lt5['path']).extract()[0] self.assertIsInstance(gcot, np.ndarray) self.assertTrue(np.all(np.isnan(gcot))) self.assertTrue(gcot.size != 0 or gcot.size == 4) # -- version >= 5.0.0 gcot = training_trials.GoCueTimes( self.training_ge5['path']).extract()[0] self.assertIsInstance(gcot, np.ndarray) self.assertFalse(np.any(np.isnan(gcot))) self.assertTrue(gcot.size != 0 or gcot.size == 12) # BIASED SESSIONS gcot = biased_trials.GoCueTimes( self.biased_lt5['path']).extract()[0] self.assertIsInstance(gcot, np.ndarray) self.assertFalse(np.any(np.isnan(gcot))) self.assertTrue(gcot.size != 0 or gcot.size == 4) # -- version >= 5.0.0 gcot = biased_trials.GoCueTimes( self.biased_ge5['path']).extract()[0] self.assertIsInstance(gcot, np.ndarray) self.assertFalse(np.any(np.isnan(gcot))) self.assertTrue(gcot.size != 0 or gcot.size == 8)
[docs] def test_get_included_trials_lt5(self): # TRAINING SESSIONS it = training_trials.IncludedTrials( self.training_lt5['path']).extract()[0] self.assertIsInstance(it, np.ndarray) # BIASED SESSIONS it = biased_trials.IncludedTrials( self.biased_lt5['path']).extract()[0] self.assertIsInstance(it, np.ndarray)
[docs] def test_get_included_trials_ge5(self): # TRAINING SESSIONS it = training_trials.IncludedTrials( self.training_ge5['path']).extract()[0] self.assertIsInstance(it, np.ndarray) # BIASED SESSIONS it = biased_trials.IncludedTrials( self.biased_ge5['path']).extract()[0] self.assertIsInstance(it, np.ndarray)
[docs] def test_get_included_trials(self): # TRAINING SESSIONS it = training_trials.IncludedTrials( self.training_lt5['path']).extract(settings={'IBLRIG_VERSION': '4.9.9'})[0] self.assertIsInstance(it, np.ndarray) # -- version >= 5.0.0 it = training_trials.IncludedTrials( self.training_ge5['path']).extract()[0] self.assertIsInstance(it, np.ndarray) # BIASED SESSIONS it = biased_trials.IncludedTrials( self.biased_lt5['path']).extract(settings={'IBLRIG_VERSION': '4.9.9'})[0] self.assertIsInstance(it, np.ndarray) # -- version >= 5.0.0 it = biased_trials.IncludedTrials( self.biased_ge5['path']).extract()[0] self.assertIsInstance(it, np.ndarray)
[docs] def test_get_pause_duration(self): """Test for extraction of pause periods.""" extractor = training_trials.PauseDuration(self.biased_ge5['path']) pp = extractor.extract()[0] self.assertIsInstance(pp, np.ndarray) # For sessions pre-pause implementation we expect zeros np.testing.assert_array_equal(pp, np.zeros(self.biased_ge5['ntrials'], dtype=float)) # For v8.9.0 and later it should return duration or NaN for i, trial in enumerate(extractor.bpod_trials): trial.update({'pause_duration': i}) pp = extractor.extract(bpod_trials=extractor.bpod_trials, settings={'IBLRIG_VERSION': '8.9.0'})[0] np.testing.assert_array_equal(pp, np.arange(self.biased_ge5['ntrials'])) extractor.bpod_trials[5].pop('pause_duration') pp = extractor.extract(bpod_trials=extractor.bpod_trials, settings={'IBLRIG_VERSION': '8.9.0'})[0] self.assertTrue(np.isnan(pp[5]))
[docs] def test_encoder_positions_clock_reset(self): # TRAINING SESSIONS # only for training? path = self.training_lt5['path'] / "raw_behavior_data" path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None) dy = raw._load_encoder_positions_file_lt5(path) dat = np.array([849736, 1532230, 1822449, 1833514, 1841566, 1848206, 1853979, 1859144]) self.assertTrue(np.all(np.diff(dy['re_ts']) > 0)) self.assertTrue(all(dy['re_ts'][6:] - 2 ** 32 - dat == 0))
[docs] def test_encoder_positions_clock_errors(self): # here we test for 2 kinds of file corruption that happen # 1/2 the first sample time is corrupt and absurdly high and should be discarded # 2/2 2 samples are swapped and need to be swapped backk path = self.biased_lt5['path'] / "raw_behavior_data" path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None) dy = raw._load_encoder_positions_file_lt5(path) self.assertTrue(np.all(np.diff(np.array(dy.re_ts)) > 0)) # -- version >= 5.0.0 path = self.biased_ge5['path'] / "raw_behavior_data" path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None) dy = raw._load_encoder_positions_file_ge5(path) self.assertTrue(np.all(np.diff(np.array(dy.re_ts)) > 0))
[docs] def test_wheel_folders(self): # the wheel folder contains other errors in bpod output that had to be addressed for wf in self.wheel_lt5_path.glob('_iblrig_encoderPositions*.raw*.ssv'): df = raw._load_encoder_positions_file_lt5(wf) self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0)) for wf in self.wheel_lt5_path.glob('_iblrig_encoderEvents*.raw*.ssv'): df = raw._load_encoder_events_file_lt5(wf) self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0)) for wf in self.wheel_ge5_path.glob('_iblrig_encoderPositions*.raw*.ssv'): df = raw._load_encoder_positions_file_ge5(wf) self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0)) for wf in self.wheel_ge5_path.glob('_iblrig_encoderEvents*.raw*.ssv'): df = raw._load_encoder_events_file_ge5(wf) self.assertTrue(np.all(np.diff(np.array(df.re_ts)) > 0))
[docs] def test_load_encoder_positions(self): raw.load_encoder_positions(self.training_lt5['path'], settings={'IBLRIG_VERSION': '4.9.9'}) raw.load_encoder_positions(self.training_ge5['path']) raw.load_encoder_positions(self.biased_lt5['path'], settings={'IBLRIG_VERSION': '4.9.9'}) raw.load_encoder_positions(self.biased_ge5['path'])
[docs] def test_load_encoder_events(self): raw.load_encoder_events(self.training_lt5['path'], settings={'IBLRIG_VERSION': '4.9.9'}) raw.load_encoder_events(self.training_ge5['path']) raw.load_encoder_events(self.biased_lt5['path'], settings={'IBLRIG_VERSION': '4.9.9'}) raw.load_encoder_events(self.biased_ge5['path'])
[docs] def test_size_outputs(self): # check the output dimensions # VERSION >= 5.0.0 task, = get_trials_tasks(self.training_ge5['path']) trials, _ = task.extract_behaviour(save=True) # Load from file (returned `trials` above includes wheel data) trials = alfio.load_object(self.training_ge5['path'] / 'alf', object='trials') self.assertTrue(alfio.check_dimensions(trials) == 0) task, = get_trials_tasks(self.biased_ge5['path']) trials, _ = task.extract_behaviour(save=True) trials = alfio.load_object(self.biased_ge5['path'] / 'alf', object='trials') self.assertTrue(alfio.check_dimensions(trials) == 0) # VERSION < 5.0.0 # for these test data there are no wheel moves so let's mock the output mock_data = { 'intervals': np.array([[0, 1], ]), 'peakAmplitude': np.array([1, 1]), 'peakVelocity_times': np.array([1, 1])} function_name = 'ibllib.io.extractors.training_wheel.extract_wheel_moves' # Training with patch(function_name, return_value=mock_data): task, = get_trials_tasks(self.training_lt5['path']) trials, _ = task.extract_behaviour(save=True) trials = alfio.load_object(self.training_lt5['path'] / 'alf', object='trials') self.assertTrue(alfio.check_dimensions(trials) == 0) # Biased with patch(function_name, return_value=mock_data): task, = get_trials_tasks(self.biased_lt5['path']) trials, _ = task.extract_behaviour(save=True) trials = alfio.load_object(self.biased_lt5['path'] / 'alf', object='trials') self.assertTrue(alfio.check_dimensions(trials) == 0)
[docs] def tearDown(self): for f in self.main_path.rglob('_ibl_log.*.log'): f.unlink() [x.unlink() for x in self.training_lt5['path'].rglob('alf/*') if x.is_file()] [x.unlink() for x in self.biased_lt5['path'].rglob('alf/*') if x.is_file()] [x.unlink() for x in self.training_ge5['path'].rglob('alf/*') if x.is_file()] [x.unlink() for x in self.biased_ge5['path'].rglob('alf/*') if x.is_file()] [x.rmdir() for x in self.training_lt5['path'].rglob('alf/') if x.is_dir()] [x.rmdir() for x in self.biased_lt5['path'].rglob('alf/') if x.is_dir()] [x.rmdir() for x in self.training_ge5['path'].rglob('alf/') if x.is_dir()] [x.rmdir() for x in self.biased_ge5['path'].rglob('alf/') if x.is_dir()]
[docs] class TestSyncWheelBpod(unittest.TestCase):
[docs] def test_sync_bpod_bonsai_poor_quality_timestamps(self): sync_trials_robust = raw.sync_trials_robust drift_pol = np.array([11 * 1e-6, -20]) # bpod starts 20 secs before with 10 ppm drift np.random.seed(seed=784) t0_full = np.cumsum(np.random.rand(50)) + .001 t1_full = np.polyval(drift_pol, t0_full) + t0_full t0 = t0_full.copy() t1 = t1_full.copy() t0_, t1_ = sync_trials_robust(t0, t1) assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_) t0_, t1_ = sync_trials_robust(t0, t1[:-1]) assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_) t0_, t1_ = sync_trials_robust(t0, t1[1:]) assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_) t0_, t1_ = sync_trials_robust(t0[1:], t1) assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_) t0_, t1_ = sync_trials_robust(t0[:-1], t1) assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_) t0_, t1_ = sync_trials_robust(t0, np.delete(t1, 24)) assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_) t0_, t1_ = sync_trials_robust(np.delete(t0, 12), np.delete(t1, 24)) assert np.allclose(t1_, np.polyval(drift_pol, t0_) + t0_)
[docs] class TestWheelLoaders(unittest.TestCase):
[docs] def setUp(self) -> None: self.main_path = Path(__file__).parent
[docs] def test_encoder_events_corrupt(self): path = self.main_path.joinpath('data', 'wheel', 'lt5') for file_events in path.rglob('_iblrig_encoderEvents.raw.*'): dy = raw._load_encoder_events_file_lt5(file_events) self.assertTrue(dy.size > 6) path = self.main_path.joinpath('data', 'wheel', 'ge5') for file_events in path.rglob('_iblrig_encoderEvents.raw.*'): dy = raw._load_encoder_events_file_ge5(file_events) self.assertTrue(dy.size > 6)
[docs] def test_encoder_positions_corrupts(self): path = self.main_path.joinpath('data', 'wheel', 'ge5') for file_position in path.rglob('_iblrig_encoderPositions.raw.*'): dy = raw._load_encoder_positions_file_ge5(file_position) self.assertTrue(dy.size > 18) path = self.main_path.joinpath('data', 'wheel', 'lt5') for file_position in path.rglob('_iblrig_encoderPositions.raw.*'): dy = raw._load_encoder_positions_file_lt5(file_position) self.assertTrue(dy.size > 18)
[docs] class MockExtracor(BaseExtractor): save_names = ( "some_file.csv", "some_file.tsv", "some_file.ssv", "some_file.npy", ) var_names = ( "csv", "ssv", "tsv", "npy", ) def _extract(self, **kwargs) -> tuple: csv = pd.DataFrame([1, 2, 3]) ssv = pd.DataFrame([1, 2, 3]) tsv = pd.DataFrame([1, 2, 3]) npy = np.array([1, 2, 3]) return (csv, ssv, tsv, npy)
[docs] class TestBaseExtractorSavingMethods(unittest.TestCase):
[docs] def setUp(self) -> None: self.tempdir = tempfile.TemporaryDirectory() self.session_path = self.tempdir.name # self.addClassCleanup(tempdir.cleanup) # py3.8 self.mock_extractor = MockExtracor(self.session_path)
[docs] def test_saving_method(self): data, paths = self.mock_extractor.extract(save=True) self.assertTrue(all([x.exists() for x in paths]))
[docs] def tearDown(self): self.tempdir.cleanup()
[docs] class TestCameraExtractors(unittest.TestCase):
[docs] def test_groom_pin_state(self): # UNIT DATA fps = 60 t_offset = 39.4 ts = np.arange(0, 10, 1 / fps) + t_offset # Add drift ts += np.full_like(ts, 1e-4).cumsum() n_pulses = 2 pulse_width = 0.3 duty = 0.5 gpio = {'indices': np.empty(n_pulses * 2, dtype=np.int32), 'polarities': np.ones(n_pulses * 2, dtype=np.int32)} gpio['polarities'][1::2] = -1 aud_offset = 40. audio = {'times': np.empty(n_pulses * 2), 'polarities': gpio['polarities']} for p in range(n_pulses): i = p * 2 rise = (pulse_width * p) + duty * p + 1 audio['times'][i] = aud_offset + rise audio['times'][i + 1] = audio['times'][i] + pulse_width rise += t_offset gpio['indices'][i] = np.where(ts > rise)[0][0] gpio['indices'][i + 1] = np.where(ts > rise + pulse_width)[0][0] gpio_, audio_, ts_ = camera.groom_pin_state(gpio, audio, ts) self.assertEqual(audio, audio_, 'Audio dict shouldn\'t be affected') np.testing.assert_array_almost_equal(ts_[:4], [40., 40.016667, 40.033333, 40.05]) # Broken TTLs + extra TTL delay = 0.08 pulse_width = 1e-5 t = audio['times'][0] + delay audio['times'] = np.sort(np.append(audio['times'], [t, t + pulse_width, 80])) audio['polarities'] = np.ones(audio['times'].shape, dtype=np.int32) audio['polarities'][1::2] = -1 gpio_, audio_, _ = camera.groom_pin_state(gpio, audio, ts, min_diff=5e-3) self.assertTrue(audio_['times'].size == gpio_['times'].size == 4) # One front shifted by a large amount audio['times'][4] -= 0.3 gpio_, audio_, _ = camera.groom_pin_state(gpio, audio, ts, tolerance=.1, min_diff=5e-3) self.assertTrue(np.all(gpio_['times'] == audio_['times'])) self.assertTrue(np.all(gpio_['times'] == np.array([41., 41.3])))
[docs] def test_attribute_times(self, display=False): # Create two timestamp arrays at two different frequencies tsa = np.linspace(0, 60, 60 * 4)[:60] # 240bpm tsb = np.linspace(0, 60, 60 * 3)[:45] # 180bpm tsa = np.sort(np.append(tsa, .4)) # Add ambiguous front tsb = np.sort(np.append(tsb, .41)) if display: from ibllib.plots import vertical_lines import matplotlib.pyplot as plt vertical_lines(tsb, linestyle=':', color='r', label='tsb') vertical_lines(tsa, linestyle=':', color='b', label='tsa') plt.legend() # Check with default args matches = camera.attribute_times(tsa, tsb) expected = np.array( [0, 1, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14, 16, 17, 18, 20, 21, 22, 24, 25, 26, 28, 29, 30, 32, 33, 34, 36, 37, 38, 40, 41, 42, 44, 45, 46, 48, 49, -1, 52, 53, -1, 56, 57, -1, 60] ) np.testing.assert_array_equal(matches, expected) self.assertEqual(matches.size, tsb.size) # Taking closest instead of first should change index of ambiguous front matches = camera.attribute_times(tsa, tsb, take='nearest') expected[np.r_[1:3]] = expected[1:3] + 1 np.testing.assert_array_equal(matches, expected) # Taking first after should exclude many pulses matches = camera.attribute_times(tsa, tsb, take='after') missing = [0, 4, 5, 7, 8, 10, 11, 13, 14, 16, 17, 19, 20, 22, 23, 25, 26, 28, 29, 31, 32, 34, 35, 37, 40, 43] expected[missing] = -1 np.testing.assert_array_equal(matches, expected) # Lower tolerance matches = camera.attribute_times(tsa, tsb, tol=0.05) expected = np.array([0, 2, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45, 49, 53, 57]) np.testing.assert_array_equal(matches[matches > -1], expected) # Remove injective assert matches = camera.attribute_times(tsa, tsb, injective=False, take='nearest') expected = np.array( [0, 2, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14, 16, 17, 18, 20, 21, 22, 24, 25, 26, 28, 29, 30, 32, 33, 34, 36, 37, 38, 40, 41, 42, 44, 45, 46, 48, 49, -1, 52, 53, -1, 56, 57, -1, 60] ) np.testing.assert_array_equal(matches, expected) # Check input validation with self.assertRaises(ValueError): camera.attribute_times(tsa, tsb, injective=False, take='closest')
[docs] class TestGetBpodExtractor(unittest.TestCase):
[docs] def test_get_bpod_extractor(self): # un-existing extractor should raise a value error with self.assertRaises(ValueError): get_bpod_extractor('', protocol='sdf', task_collection='raw_behavior_data') # in this case this returns an ibllib.io.extractors.training_trials.TrainingTrials instance extractor = get_bpod_extractor( '', protocol='_trainingChoiceWorld', task_collection='raw_behavior_data' ) self.assertTrue(isinstance(extractor, BaseExtractor))
[docs] def test_get_bpod_custom_extractor(self): # here we'll mock a custom module with a custom extractor DummyModule = MagicMock() DummyExtractor = Mock(spec_set=BaseExtractor) DummyModule.toto.return_value = DummyExtractor base_module = 'ibllib.io.extractors.bpod_trials' with patch(f'{base_module}.get_bpod_extractor_class', return_value='toto'), \ patch(f'{base_module}.importlib.import_module', return_value=DummyModule) as import_mock: self.assertIs(get_bpod_extractor(''), DummyExtractor) import_mock.assert_called_with('projects') # Check raises when imported class not an extractor DummyModule.toto.return_value = MagicMock(spec=dict) self.assertRaisesRegex(ValueError, 'should be an Extractor class', get_bpod_extractor, '')
if __name__ == '__main__': unittest.main(exit=False, verbosity=2)