import unittest
from unittest import mock
from functools import partial
from pathlib import Path
from uuid import uuid4
import numpy as np
from iblutil.util import Bunch
from one.api import ONE
from one.alf import spec
from ibllib.tests import TEST_DB
from ibllib.qc import task_metrics as qcmetrics
from brainbox.behavior.wheel import cm_to_rad
def _create_test_qc_outcomes():
"""Create task QC outcomes dict.
Used by TestAggregateOutcome.test_compute_dateset_qc_status and TestDatasetQC.
"""
outcomes = {'_task_' + k[6:]: spec.QC.NOT_SET for k in qcmetrics.TaskQC._get_checks(...)}
outcomes['_task_reward_volumes'] = outcomes['_task_stimOn_delays'] = spec.QC.WARNING
outcomes['_task_reward_volume_set'] = outcomes['_task_goCue_delays'] = spec.QC.FAIL
outcomes['_task_errorCue_delays'] = outcomes['_task_stimOff_delays'] = spec.QC.PASS
outcomes['_task_iti_delays'] = spec.QC.CRITICAL
return outcomes
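# A minimal sketch of the criteria dict shape, inferred from the tests below
# (see qcmetrics.BWM_CRITERIA for the authoritative values): check names map
# to outcome thresholds, e.g.
#   {'default': {'PASS': 0.99, 'WARNING': 0.9, 'FAIL': 0},
#    '_task_stimFreeze_delays': {'PASS': 0.99, 'WARNING': 0},
#    '_task_iti_delays': {'NOT_SET': 0}}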
class TestAggregateOutcome(unittest.TestCase):
def test_outcome_from_dict_default(self):
# For a check that has no custom thresholds the defaults apply: PASS at 0.99,
# WARNING at 0.9 and FAIL at 0; np.nan and None values return NOT_SET
qc_dict = {'gnap': .99, 'gnop': np.nan, 'gnip': None, 'gnep': 0.9, 'gnup': 0.89}
expect = {
'gnap': spec.QC.PASS, 'gnop': spec.QC.NOT_SET, 'gnip': spec.QC.NOT_SET,
'gnep': spec.QC.WARNING, 'gnup': spec.QC.FAIL}
outcome, outcome_dict = qcmetrics.compute_session_status_from_dict(qc_dict, qcmetrics.BWM_CRITERIA)
self.assertEqual(outcome, spec.QC.FAIL)
self.assertEqual(expect, outcome_dict)
def test_outcome_from_dict_stimFreeze_delays(self):
# For '_task_stimFreeze_delays' the thresholds are 0.99 PASS and 0 WARNING
qc_dict = {'gnap': .99, 'gnop': np.nan, '_task_stimFreeze_delays': .1}
expect = {'gnap': spec.QC.PASS, 'gnop': spec.QC.NOT_SET, '_task_stimFreeze_delays': spec.QC.WARNING}
outcome, outcome_dict = qcmetrics.compute_session_status_from_dict(qc_dict, qcmetrics.BWM_CRITERIA)
self.assertEqual(outcome, spec.QC.WARNING)
self.assertEqual(expect, outcome_dict)
def test_outcome_from_dict_iti_delays(self):
# For '_task_iti_delays' the only threshold is 0 NOT_SET, i.e. this check always maps to NOT_SET
qc_dict = {'gnap': .99, 'gnop': np.nan, '_task_iti_delays': .1}
expect = {'gnap': spec.QC.PASS, 'gnop': spec.QC.NOT_SET, '_task_iti_delays': spec.QC.NOT_SET}
outcome, outcome_dict = qcmetrics.compute_session_status_from_dict(qc_dict, qcmetrics.BWM_CRITERIA)
self.assertEqual(outcome, spec.QC.PASS)
self.assertEqual(expect, outcome_dict)
def test_out_of_bounds(self):
# QC values below 0 or above 1 should raise a ValueError
qc_dict = {'gnap': 1.01, 'gnop': 0, 'gnip': 0.99}
with self.assertRaises(ValueError) as e:
qcmetrics.compute_session_status_from_dict(qc_dict, qcmetrics.BWM_CRITERIA)
self.assertEqual('Values out of bound', e.exception.args[0])
def test_compute_dataset_qc_status(self):
"""Test TaskQC.compute_dataset_qc_status method."""
outcomes = _create_test_qc_outcomes()
dataset_outcomes = qcmetrics.TaskQC.compute_dataset_qc_status(outcomes)
expected = {'_ibl_trials.stimOff_times': spec.QC.PASS,
'_ibl_trials.table': {
'intervals': spec.QC.CRITICAL,
'goCue_times': spec.QC.FAIL,
'response_times': spec.QC.NOT_SET,
'choice': spec.QC.NOT_SET,
'stimOn_times': spec.QC.WARNING,
'contrastLeft': spec.QC.NOT_SET,
'contrastRight': spec.QC.NOT_SET,
'feedbackType': spec.QC.NOT_SET,
'probabilityLeft': spec.QC.NOT_SET,
'feedback_times': spec.QC.PASS,
'firstMovement_times': spec.QC.NOT_SET}}
self.assertDictEqual(expected, dataset_outcomes)
class TestDatasetQC(unittest.TestCase):
def test_update_dataset_qc(self):
"""Test task_metrics.update_dataset_qc function."""
registered_datasets = [
{'name': '_ibl_trials.table.pqt', 'qc': 'NOT_SET', 'id': str(uuid4())},
{'name': '_ibl_other.intervals.npy', 'qc': 'PASS', 'id': str(uuid4())},
{'name': '_ibl_trials.stimOff_times.npy', 'qc': 'NOT_SET', 'id': str(uuid4())}
]
one = mock.MagicMock()
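# A mocked Alyx client: GET returns a record with no QC set, while rest()
# simply echoes back its data payload so the values written can be inspected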
one.alyx.get.side_effect = lambda *args, **kwargs: {'qc': spec.QC.NOT_SET.name, 'json': {'extended_qc': None}}
one.alyx.rest.side_effect = lambda *args, **kwargs: kwargs.get('data')
one.offline = False
qc = qcmetrics.TaskQC('subject/2020-01-01/001', one=one)
task_qc_results = (spec.QC.CRITICAL, {}, _create_test_qc_outcomes()) # Inject some toy trials QC results
with mock.patch.object(qc, 'compute_session_status', return_value=task_qc_results):
out = qcmetrics.update_dataset_qc(qc, registered_datasets.copy(), one, override=False)
self.assertEqual(3, len(out))
self.assertEqual(['CRITICAL', 'PASS', 'PASS'], [x['qc'] for x in out])
# Check extended qc
extended_qc = one.alyx.json_field_update.call_args.kwargs.get('data', {}).get('extended_qc', {})
# Check a few of the fields
self.assertEqual(spec.QC.WARNING, extended_qc.get('stimOn_times'))
self.assertEqual(spec.QC.CRITICAL, extended_qc.get('intervals'))
self.assertEqual(spec.QC.FAIL, extended_qc.get('goCue_times'))
self.assertEqual(spec.QC.NOT_SET, extended_qc.get('response_times'))
# Test behaviour when dataset QC not in registered datasets list
one.reset_mock()
with mock.patch.object(qc, 'compute_session_status', return_value=task_qc_results), \
mock.patch.object(qc, 'compute_dataset_qc_status', return_value={'_ibl_foo.bar': spec.QC.PASS}), \
self.assertLogs(qcmetrics.__name__, level=10) as cm:
out = qcmetrics.update_dataset_qc(qc, registered_datasets.copy(), one, override=False)
self.assertEqual(registered_datasets, out)
self.assertIn('dataset _ibl_foo.bar not registered', cm.output[-1])
one.alyx.get.assert_not_called()
one.alyx.rest.assert_not_called()
# Test assertion on duplicate dataset stems (same stem '_ibl_other.intervals' with two different extensions)
registered_datasets.append({
'name': '_ibl_other.intervals.csv', 'qc': 'FAIL', 'id': str(uuid4())
})
self.assertRaises(AssertionError, qcmetrics.update_dataset_qc, qc, registered_datasets.copy(), one)
class TestTaskMetrics(unittest.TestCase):
def setUp(self):
self.data = self.load_fake_bpod_data()
self.wheel_gain = 4
wheel_data = self.load_fake_wheel_data(self.data, wheel_gain=self.wheel_gain)
self.data.update(wheel_data)
@staticmethod
def load_fake_bpod_data(n=5):
"""Create fake extractor output of bpodqc.load_data
:param n: the number of trials
:return: a dict of simulated trial data
"""
trigg_delay = 1e-4 # an ideal delay between triggers and measured times
resp_feedback_delay = 1e-3 # delay between response and feedback
stimOff_itiIn_delay = 5e-3 # delay between stimOff and itiIn
N = partial(np.random.normal, (n,)) # Convenience function for norm dist sampling
choice = np.ones((n,), dtype=int)
choice[[1, 3]] = -1 # a couple of incorrect trials
choice[0] = 0 # a nogo trial
# One trial of each type incorrect
correct = choice != 0
correct[np.argmax(choice == 1)] = 0
correct[np.argmax(choice == -1)] = 0
pauses = np.zeros(n, dtype=float)
# add a 5s pause on 3rd trial
pauses[2] = 5.
quiescence_length = 0.2 + np.random.standard_exponential(size=(n,))
iti_length = .5 # inter-trial interval
# trial lengths include quiescence period, a couple small trigger delays and iti
trial_lengths = quiescence_length + resp_feedback_delay + (trigg_delay * 4) + iti_length
# add on 60 + 2s for nogos, feedback time (1 or 2s), and a normally distributed response time for choice trials
trial_lengths += (choice == 0) * 60 + (~correct + 1) + (choice != 0) * N(0.5)
start_times = (np.r_[0, np.cumsum(trial_lengths)] + np.r_[0, np.cumsum(pauses)])[:-1]
end_times = np.cumsum(trial_lengths) - 1e-2 + np.r_[0, np.cumsum(pauses)][:-1]
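# NB: the pause recorded for trial i opens a gap between the end of trial i
# and the start of trial i + 1; each trial ends 10 ms before the next
# cumulative boundary so the intervals never overlap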
data = {
'phase': np.random.uniform(low=0, high=2 * np.pi, size=(n,)),
'quiescence': quiescence_length,
'choice': choice,
'correct': correct,
'intervals': np.c_[start_times, end_times],
'itiIn_times': end_times - iti_length + stimOff_itiIn_delay,
'position': np.ones_like(choice) * 35,
'pause_duration': pauses
}
data['stimOnTrigger_times'] = start_times + data['quiescence'] + 1e-4
data['stimOn_times'] = data['stimOnTrigger_times'] + 1e-1
data['goCueTrigger_times'] = data['stimOn_times'] + 1e-3
data['goCue_times'] = data['goCueTrigger_times'] + trigg_delay
data['response_times'] = end_times - (
resp_feedback_delay + iti_length + (~correct + 1)
)
data['feedback_times'] = data['response_times'] + resp_feedback_delay
data['stimFreeze_times'] = data['response_times'] + 1e-2
data['stimFreezeTrigger_times'] = data['stimFreeze_times'] - trigg_delay
data['feedbackType'] = np.vectorize(lambda x: -1 if x == 0 else x)(data['correct'])
outcome = data['feedbackType'].copy()
outcome[data['choice'] == 0] = 0
data['outcome'] = outcome
# Delay of 1 second if correct, 2 seconds if incorrect, and stim off at feedback for nogo
data['stimOffTrigger_times'] = data['feedback_times'] + (~correct + 1) - (choice == 0) * 2
data['stimOff_times'] = data['stimOffTrigger_times'] + trigg_delay
# Error tone times are NaN on correct trials; valve open times are NaN on incorrect trials
outcome_times = np.vectorize(lambda x, y: x + 1e-2 if y else np.nan)
data['errorCueTrigger_times'] = outcome_times(data['feedback_times'], ~data['correct'])
data['errorCue_times'] = data['errorCueTrigger_times'] + trigg_delay
data['valveOpen_times'] = outcome_times(data['feedback_times'], data['correct'])
data['rewardVolume'] = ~np.isnan(data['valveOpen_times']) * 3.0
return data
@staticmethod
def load_fake_wheel_data(trial_data, wheel_gain=4):
# Load a wheel fragment: a numpy array of the form [timestamps, positions], for a wheel
# movement during one trial. Wheel is X1 bpod RE in radians.
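# (X1 encoding yields one count per encoder pulse; the default p_step of
# 2 * pi / 1024 rad in qt_wheel_fill below corresponds to a 1024-count
# encoder)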
wh_path = Path(__file__).parent.joinpath('..', 'fixtures', 'qc').resolve()
wheel_frag = np.load(wh_path.joinpath('wheel.npy'))
resolution = np.mean(np.abs(np.diff(wheel_frag[:, 1]))) # pos diff between samples
# abs displacement, s, in mm required to move 35 visual degrees
POS_THRESH = 35
s_mm = np.abs(POS_THRESH / wheel_gain) # don't care about direction
# convert abs displacement to radians (wheel pos is in rad)
pos_thresh = cm_to_rad(s_mm * 1e-1)
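# worked example, assuming the default gain of 4 deg/mm and the 3.1 cm wheel
# radius assumed by brainbox: 35 / 4 = 8.75 mm = 0.875 cm, giving a threshold
# of roughly 0.875 / 3.1 = 0.28 rad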
# index of threshold cross
pos_thresh_idx = np.argmax(np.abs(wheel_frag[:, 1]) > pos_thresh)
def qt_wheel_fill(start, end, t_step=0.001, p_step=None):
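"""Generate a random walk of wheel samples between start and end.

Timestamps run from start to end in steps of t_step seconds (samples where
the position does not change are dropped); positions step up or down by
p_step, which defaults to one count of a 1024-count encoder (2 * pi / 1024 rad).
"""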
if p_step is None:
p_step = 2 * np.pi / 1024
t = np.arange(start, end, t_step)
p = np.random.randint(-1, 2, len(t))
t = t[p != 0]
p = p[p != 0].cumsum() * p_step
return t, p
wheel_data = [] # List of generated wheel data fragments
movement_times = [] # List of generated first movement times
def add_frag(t, p):
"""Add wheel data fragments to list, adjusting positions to be within one sample of
one another"""
last_samp = getattr(add_frag, 'last_samp', (0, 0))
p += last_samp[1]
if np.abs(p[0] - last_samp[1]) == 0:
p += resolution
wheel_data.append((t, p))
add_frag.last_samp = (t[-1], p[-1])
for i in np.arange(len(trial_data['choice'])):
# Iterate over trials generating wheel samples for the necessary periods
# trial start to stim on; should be below quiescence threshold
stimOn_trig = trial_data['stimOnTrigger_times'][i]
trial_start = trial_data['intervals'][i, 0]
t, p = qt_wheel_fill(trial_start, stimOn_trig, .5, resolution)
if len(t) > 0: # Possible for no movement during quiescence
add_frag(t, p)
# stim on to trial end
trial_end = trial_data['intervals'][i, 1]
if trial_data['choice'][i] == 0:
# Add random wheel movements for duration of trial
goCue = trial_data['goCue_times'][i]
t, p = qt_wheel_fill(goCue, trial_end, .1, resolution)
add_frag(t, p)
movement_times.append(t[0])
else:
# Align wheel fragment with response time
response_time = trial_data['response_times'][i]
t = wheel_frag[:, 0] + response_time - wheel_frag[pos_thresh_idx, 0]
p = np.abs(wheel_frag[:, 1]) * trial_data['choice'][i]
assert t[0] > add_frag.last_samp[0]
movement_times.append(t[1])
add_frag(t, p)
# Fill in random movements between the end of the response and the trial end
t, p = qt_wheel_fill(t[-1] + 0.01, trial_end, p_step=resolution)
add_frag(t, p)
# Stitch wheel fragments and assert no skips
wheel_data = np.concatenate(list(map(np.column_stack, wheel_data)))
assert np.all(np.diff(wheel_data[:, 0]) > 0), 'timestamps don\'t strictly increase'
np.testing.assert_allclose(np.abs(np.diff(wheel_data[:, 1])), resolution)
assert len(movement_times) == trial_data['intervals'].shape[0]
return {
'wheel_timestamps': wheel_data[:, 0],
'wheel_position': wheel_data[:, 1],
'firstMovement_times': np.array(movement_times)
}
def test_check_stimOn_goCue_delays(self):
metric, passed = qcmetrics.check_stimOn_goCue_delays(self.data)
self.assertTrue(np.allclose(metric, 0.0011), 'failed to return correct metric')
# Set incorrect timestamp (goCue occurs before stimOn)
self.data['goCue_times'][-1] = self.data['stimOn_times'][-1] - 1e-4
metric, passed = qcmetrics.check_stimOn_goCue_delays(self.data)
n = len(self.data['stimOn_times'])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_response_feedback_delays(self):
metric, passed = qcmetrics.check_response_feedback_delays(self.data)
self.assertTrue(np.allclose(metric, 0.001), 'failed to return correct metric')
# Set incorrect timestamp (feedback occurs before response)
self.data['feedback_times'][-1] = self.data['response_times'][-1] - 1e-4
metric, passed = qcmetrics.check_response_feedback_delays(self.data)
n = len(self.data['feedback_times'])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_response_stimFreeze_delays(self):
metric, passed = qcmetrics.check_response_stimFreeze_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-2), 'failed to return correct metric')
# Set incorrect timestamp (stimFreeze occurs before response)
self.data['stimFreeze_times'][-1] = self.data['response_times'][-1] - 1e-4
metric, passed = qcmetrics.check_response_stimFreeze_delays(self.data)
n = len(self.data['feedback_times']) - np.sum(self.data['choice'] == 0)
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_positive_feedback_stimOff_delays(self):
metric, passed = qcmetrics.check_positive_feedback_stimOff_delays(self.data)
self.assertTrue(
np.allclose(metric[self.data['correct']], 1e-4), 'failed to return correct metric'
)
# Set incorrect timestamp (stimOff occurs just after response)
id = np.argmax(self.data['correct'])
self.data['stimOff_times'][id] = self.data['response_times'][id] + 1e-2
metric, passed = qcmetrics.check_positive_feedback_stimOff_delays(self.data)
expected = (self.data['correct'].sum() - 1) / self.data['correct'].sum()
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_negative_feedback_stimOff_delays(self):
err_trial = ~self.data['correct']
metric, passed = qcmetrics.check_negative_feedback_stimOff_delays(self.data)
values = np.abs(metric[err_trial])
self.assertTrue(np.allclose(values, 1e-2), 'failed to return correct metric')
# Set incorrect timestamp (stimOff occurs 1s after response)
id = np.argmax(err_trial)
self.data['stimOff_times'][id] = self.data['response_times'][id] + 1
metric, passed = qcmetrics.check_negative_feedback_stimOff_delays(self.data)
expected = (err_trial.sum() - 1) / err_trial.sum()
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_error_trial_event_sequence(self):
metric, passed = qcmetrics.check_error_trial_event_sequence(self.data)
self.assertTrue(np.all(metric == ~self.data['correct']), 'failed to return correct metric')
self.assertTrue(np.all(passed))
# Set incorrect timestamp (itiIn occurs before errorCue)
err_trial = ~self.data['correct']
(id,) = np.where(err_trial)
self.data['intervals'][id[0], 0] = np.inf
self.data['errorCue_times'][id[1]] = 0
metric, passed = qcmetrics.check_error_trial_event_sequence(self.data)
expected = (err_trial.sum() - 2) / err_trial.sum()
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_correct_trial_event_sequence(self):
metric, passed = qcmetrics.check_correct_trial_event_sequence(self.data)
self.assertTrue(np.all(metric == self.data['correct']), 'failed to return correct metric')
self.assertTrue(np.all(passed))
# Set incorrect timestamp
correct = self.data['correct']
id = np.argmax(correct)
self.data['intervals'][id, 0] = np.inf
metric, passed = qcmetrics.check_correct_trial_event_sequence(self.data)
expected = (correct.sum() - 1) / correct.sum()
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_trial_length(self):
metric, passed = qcmetrics.check_trial_length(self.data)
self.assertTrue(np.all(metric), 'failed to return correct metric')
# Set incorrect timestamp
self.data['goCue_times'][-1] = 0
metric, passed = qcmetrics.check_trial_length(self.data)
n = len(self.data['goCue_times'])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_goCue_delays(self):
metric, passed = qcmetrics.check_goCue_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-4), 'failed to return correct metric')
# Set incorrect timestamp
self.data['goCue_times'][1] = self.data['goCueTrigger_times'][1] + 0.1
metric, passed = qcmetrics.check_goCue_delays(self.data)
n = len(self.data['goCue_times'])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_errorCue_delays(self):
metric, passed = qcmetrics.check_errorCue_delays(self.data)
err_trial = ~self.data['correct']
self.assertTrue(np.allclose(metric[err_trial], 1e-4), 'failed to return correct metric')
# Set incorrect timestamp
id = np.argmax(err_trial)
self.data['errorCue_times'][id] = self.data['errorCueTrigger_times'][id] + 0.1
metric, passed = qcmetrics.check_errorCue_delays(self.data)
n = err_trial.sum()
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_stimOn_delays(self):
metric, passed = qcmetrics.check_stimOn_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-1), 'failed to return correct metric')
# Set incorrect timestamp
self.data['stimOn_times'][-1] = self.data['stimOnTrigger_times'][-1] + 0.2
metric, passed = qcmetrics.check_stimOn_delays(self.data)
n = len(self.data['stimOn_times'])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_stimOff_delays(self):
metric, passed = qcmetrics.check_stimOff_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-4), 'failed to return correct metric')
# Set incorrect timestamp
self.data['stimOff_times'][-1] = self.data['stimOffTrigger_times'][-1] + 0.2
metric, passed = qcmetrics.check_stimOff_delays(self.data)
n = len(self.data['stimOff_times'])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_stimFreeze_delays(self):
metric, passed = qcmetrics.check_stimFreeze_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-4), 'failed to return correct metric')
# Set incorrect timestamp
self.data['stimFreeze_times'][-1] = self.data['stimFreezeTrigger_times'][-1] + 0.2
metric, passed = qcmetrics.check_stimFreeze_delays(self.data)
n = len(self.data['stimFreeze_times'])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')
def test_check_reward_volumes(self):
metric, passed = qcmetrics.check_reward_volumes(self.data)
self.assertTrue(all(x in {0.0, 3.0} for x in metric), 'failed to return correct metric')
self.assertTrue(np.all(passed))
# Set incorrect volume
id = np.array([np.argmax(self.data['correct']), np.argmax(~self.data['correct'])])
self.data['rewardVolume'][id] = self.data['rewardVolume'][id] + 1
metric, passed = qcmetrics.check_reward_volumes(self.data)
self.assertEqual(0.6, np.mean(passed), 'failed to detect incorrect reward volumes')
def test_check_reward_volume_set(self):
metric, passed = qcmetrics.check_reward_volume_set(self.data)
self.assertTrue(all(x in {0.0, 3.0} for x in metric), 'failed to return correct metric')
self.assertTrue(passed)
# Add a new volume to the set
id = np.argmax(self.data['correct'])
self.data['rewardVolume'][id] = 2.3
metric, passed = qcmetrics.check_reward_volume_set(self.data)
self.assertFalse(passed, 'failed to detect incorrect reward volume set')
# Set zero volumes to a new value; the set length is still 2 but should fail anyway
self.data['rewardVolume'][~self.data['correct']] = 2.3
metric, passed = qcmetrics.check_reward_volume_set(self.data)
self.assertFalse(passed, 'failed to detect incorrect reward volume set')
def test_check_audio_pre_trial(self):
# Create Sound sync fake data that is OK
BNC2_OK = {
'times': self.data['goCue_times'] + 1e-1,
'polarities': np.array([1, -1, 1, -1, 1]),
}
# Create Sound sync fake data that is NOT OK
BNC2_NOK = {
'times': self.data['goCue_times'] - 1e-1,
'polarities': np.array([1, -1, 1, -1, 1]),
}
metric, passed = qcmetrics.check_audio_pre_trial(self.data, audio=BNC2_OK)
self.assertFalse(np.all(metric))
self.assertTrue(np.all(passed))
metric, passed = qcmetrics.check_audio_pre_trial(self.data, audio=BNC2_NOK)
self.assertTrue(np.all(metric))
self.assertFalse(np.all(passed))
def test_check_wheel_freeze_during_quiescence(self):
metric, passed = qcmetrics.check_wheel_freeze_during_quiescence(self.data)
self.assertTrue(np.all(passed))
# Make one trial move more
n = 1 # Index of trial to manipulate
t1 = self.data['intervals'][n, 0]
t2 = self.data['stimOnTrigger_times'][n]
ts, pos = (self.data['wheel_timestamps'], self.data['wheel_position'])
wh_idx = np.argmax(ts > t1)
if ts[wh_idx] > self.data['stimOnTrigger_times'][n]:
# No sample during quiescence; insert one
self.data['wheel_timestamps'] = np.insert(ts, wh_idx, t2 - .001)
self.data['wheel_position'] = np.insert(pos, wh_idx, np.inf)
else: # Otherwise make one sample infinite
self.data['wheel_position'][wh_idx] = np.inf
metric, passed = qcmetrics.check_wheel_freeze_during_quiescence(self.data)
self.assertFalse(passed[n])
self.assertTrue(metric[n] > 2)
def test_check_wheel_move_before_feedback(self):
metric, passed = qcmetrics.check_wheel_move_before_feedback(self.data)
nogo = self.data['choice'] == 0
self.assertTrue(np.all(passed[~nogo]))
self.assertTrue(np.isnan(metric[nogo]).all())
self.assertTrue(np.isnan(passed[nogo]).all())
# Remove wheel data around feedback for choice trial
assert self.data['choice'].any(), 'no choice trials in test data'
n = np.argmax(self.data['choice'] != 0) # Index of choice trial
mask = np.logical_xor(self.data['wheel_timestamps'] > self.data['feedback_times'][n] - 1,
self.data['wheel_timestamps'] < self.data['feedback_times'][n] + 1)
self.data['wheel_timestamps'] = self.data['wheel_timestamps'][mask]
self.data['wheel_position'] = self.data['wheel_position'][mask]
metric, passed = qcmetrics.check_wheel_move_before_feedback(self.data)
self.assertFalse(passed[n] or metric[n] != 0)
def test_check_wheel_move_during_closed_loop(self):
gain = self.wheel_gain or 4
metric, passed = qcmetrics.check_wheel_move_during_closed_loop(self.data, gain)
nogo = self.data['choice'] == 0
self.assertTrue(np.all(passed[~nogo]))
self.assertTrue(np.isnan(metric[nogo]).all())
self.assertTrue(np.isnan(passed[nogo]).all())
# Remove wheel data for choice trial
assert self.data['choice'].any(), 'no choice trials in test data'
n = np.argmax(self.data['choice'] != 0) # Index of choice trial
mask = np.logical_xor(self.data['wheel_timestamps'] < self.data['goCue_times'][n],
self.data['wheel_timestamps'] > self.data['response_times'][n])
self.data['wheel_timestamps'] = self.data['wheel_timestamps'][mask]
self.data['wheel_position'] = self.data['wheel_position'][mask]
metric, passed = qcmetrics.check_wheel_move_during_closed_loop(self.data, gain)
self.assertFalse(passed[n])
def test_check_wheel_integrity(self):
metric, passed = qcmetrics.check_wheel_integrity(self.data, re_encoding='X1')
self.assertTrue(np.all(passed))
# Insert some violations and verify that they're caught
idx = np.random.randint(self.data['wheel_timestamps'].size, size=2)
self.data['wheel_timestamps'][idx[0] + 1] -= 1
self.data['wheel_position'][idx[1]] -= 1
metric, passed = qcmetrics.check_wheel_integrity(self.data, re_encoding='X1')
self.assertFalse(passed[idx].any())
def test_check_n_trial_events(self):
metric, passed = qcmetrics.check_n_trial_events(self.data)
self.assertTrue(np.all(passed == 1.) and np.all(metric))
# Change errorCueTriggers
id = np.argmax(self.data['correct'])
self.data['errorCueTrigger_times'][id] = self.data['intervals'][id, 0] + np.random.rand()
_, passed = qcmetrics.check_n_trial_events(self.data)
self.assertFalse(passed[id])
# Change another event
id = id - 1 if id > 0 else id + 1
self.data['goCue_times'][id] = self.data['intervals'][id, 1] + np.random.rand()
_, passed = qcmetrics.check_n_trial_events(self.data)
self.assertFalse(passed[id])
def test_check_detected_wheel_moves(self):
metric, passed = qcmetrics.check_detected_wheel_moves(self.data)
self.assertTrue(np.all(self.data['firstMovement_times'] == metric))
self.assertTrue(np.all(passed))
# Change a movement time
id = np.argmax(self.data['choice'] != 0)
self.data['firstMovement_times'][id] = self.data['goCue_times'][id] - 0.3
_, passed = qcmetrics.check_detected_wheel_moves(self.data)
self.assertEqual(0.75, np.nanmean(passed))
# Change the min_qt
_, passed = qcmetrics.check_detected_wheel_moves(self.data, min_qt=0.3)
self.assertTrue(np.all(passed))
@unittest.skip('not implemented')
def test_check_stimulus_move_before_goCue(self):
pass # TODO Nicco?
def test_check_stimOff_itiIn_delays(self):
metric, passed = qcmetrics.check_stimOff_itiIn_delays(self.data)
self.assertTrue(np.nanmean(passed))
# No go should be NaN
id = np.argmax(self.data['choice'] == 0)
self.assertTrue(np.isnan(passed[id]), 'No go trials should be excluded')
# Change a trial
id = np.argmax(self.data['choice'] != 0)
self.data['stimOff_times'][id] = self.data['itiIn_times'][id] + 1e-4
_, passed = qcmetrics.check_stimOff_itiIn_delays(self.data) # recompute
self.assertEqual(0.75, np.nanmean(passed))
def test_check_iti_delays(self):
metric, passed = qcmetrics.check_iti_delays(self.data, subtract_pauses=True)
# We want the metric to return positive values that are close to 0.01, given the test data
self.assertTrue(np.allclose(metric[:-1], 1e-2, atol=0.001),
'failed to return correct metric')
self.assertTrue(np.isnan(metric[-1]), 'last trial should be NaN')
self.assertTrue(np.all(passed))
# Paused trials should fail when subtract_pauses is False
pauses = self.data['pause_duration'][:-1]
metric, passed = qcmetrics.check_iti_delays(self.data, subtract_pauses=False)
self.assertTrue(np.allclose(metric[:-1], pauses + 1e-2, atol=0.001))
self.assertFalse(np.any(passed[:-1][pauses > 0]))
# Mess up a trial
id = 3
self.data['intervals'][id + 1, 0] += 0.5 # Next trial starts 0.5 sec later
metric, passed = qcmetrics.check_iti_delays(self.data, subtract_pauses=True)
n_trials = len(self.data['stimOff_times']) - 1 # Last trial NaN here
expected = (n_trials - 1) / n_trials
self.assertEqual(expected, np.nanmean(passed))
@unittest.skip('not implemented')
def test_check_frame_frequency(self):
pass # TODO Miles
@unittest.skip('not implemented')
def test_check_frame_updates(self):
pass # TODO Nicco?
class TestHabituationQC(unittest.TestCase):
"""Test HabituationQC class
NB: For complete coverage this should be run along slide the integration tests
"""
def setUp(self):
eid = '8dd0fcb0-1151-4c97-ae35-2e2421695ad7'
one = ONE(**TEST_DB)
self.qc = qcmetrics.HabituationQC(eid, one=one)
# Dummy extractor obj
self.qc.extractor = Bunch({'data': self.load_fake_bpod_data(), 'settings': {}})
@staticmethod
def load_fake_bpod_data(n=5):
"""Create fake extractor output of bpodqc.load_data
:param n: the number of trials
:return: a dict of simulated trial data
"""
trigg_delay = 1e-4 # an ideal delay between triggers and measured times
iti_length = 0.5 # inter-trial interval
blank_length = 1. # the time between trial start and stim on
stimCenter_length = 1. # the length of time the stimulus is in the center
# the lengths of time between stim on and stim center
stimOn_length = np.random.normal(size=(n,)) + 10
# trial lengths include couple small trigger delays and iti
trial_lengths = blank_length + stimOn_length + 1e-1 + stimCenter_length
start_times = np.concatenate(([0], np.cumsum(trial_lengths)[:-1]))
end_times = np.cumsum(trial_lengths) - 1e-2
data = {
'phase': np.random.uniform(low=0, high=2 * np.pi, size=(n,)),
'stimOnTrigger_times': start_times + blank_length,
'intervals': np.c_[start_times, end_times],
'itiIn_times': end_times - iti_length,
'position': np.random.choice([-1, 1], n, replace=True) * 35,
'feedbackType': np.ones(n),
'feedback_times': end_times - 0.5,
'rewardVolume': np.ones(n) * 3.,
'stimOff_times': end_times + trigg_delay,
'stimOffTrigger_times': end_times
}
data['stimOn_times'] = data['stimOnTrigger_times'] + trigg_delay
data['goCueTrigger_times'] = data['stimOnTrigger_times']
data['goCue_times'] = data['goCueTrigger_times'] + trigg_delay
data['stimCenter_times'] = data['feedback_times'] - 0.5
data['stimCenterTrigger_times'] = data['stimCenter_times'] - trigg_delay
data['valveOpen_times'] = data['feedback_times']
return data
def test_compute(self):
# All should pass except one NOT_SET
self.qc.compute()
self.assertIsNotNone(self.qc.metrics)
_, _, outcomes = self.qc.compute_session_status()
if self.qc.passed['_task_habituation_time'] is None:
self.assertEqual(outcomes['_task_habituation_time'], spec.QC.NOT_SET)
if __name__ == '__main__':
unittest.main(exit=False, verbosity=2)