"""Tests for ibllib.qc.task_metrics."""

import unittest
from unittest import mock
from functools import partial
from pathlib import Path
from uuid import uuid4

import numpy as np

from iblutil.util import Bunch
from one.api import ONE
from one.alf import spec
from ibllib.tests import TEST_DB
from ibllib.qc import task_metrics as qcmetrics

from brainbox.behavior.wheel import cm_to_rad


def _create_test_qc_outcomes():
    """Create task QC outcomes dict.

     Used by TestAggregateOutcome.test_compute_dateset_qc_status and TestDatasetQC.
     """
    outcomes = {'_task_' + k[6:]: spec.QC.NOT_SET for k in qcmetrics.TaskQC._get_checks(...)}
    outcomes['_task_reward_volumes'] = outcomes['_task_stimOn_delays'] = spec.QC.WARNING
    outcomes['_task_reward_volume_set'] = outcomes['_task_goCue_delays'] = spec.QC.FAIL
    outcomes['_task_errorCue_delays'] = outcomes['_task_stimOff_delays'] = spec.QC.PASS
    outcomes['_task_iti_delays'] = spec.QC.CRITICAL
    return outcomes
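
# A sketch of how the helper above is consumed (it mirrors TestAggregateOutcome and
# TestDatasetQC below): the per-check outcomes are aggregated onto the datasets they
# concern, e.g.
#   qcmetrics.TaskQC.compute_dataset_qc_status(_create_test_qc_outcomes())
# returns a dict keyed by dataset name, such as '_ibl_trials.table'.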


class TestAggregateOutcome(unittest.TestCase):
    def test_outcome_from_dict_default(self):
        # For a task that has no custom thresholds, the defaults are 0.99 PASS,
        # 0.9 WARNING and 0 FAIL; np.nan and None return NOT_SET
        qc_dict = {'gnap': .99, 'gnop': np.nan, 'gnip': None, 'gnep': 0.9, 'gnup': 0.89}
        expect = {
            'gnap': spec.QC.PASS,
            'gnop': spec.QC.NOT_SET,
            'gnip': spec.QC.NOT_SET,
            'gnep': spec.QC.WARNING,
            'gnup': spec.QC.FAIL}
        outcome, outcome_dict = qcmetrics.compute_session_status_from_dict(qc_dict, qcmetrics.BWM_CRITERIA)
        self.assertEqual(outcome, spec.QC.FAIL)
        self.assertEqual(expect, outcome_dict)

    def test_outcome_from_dict_stimFreeze_delays(self):
        # For '_task_stimFreeze_delays' the thresholds are 0.99 PASS and 0 WARNING
        qc_dict = {'gnap': .99, 'gnop': np.nan, '_task_stimFreeze_delays': .1}
        expect = {'gnap': spec.QC.PASS, 'gnop': spec.QC.NOT_SET, '_task_stimFreeze_delays': spec.QC.WARNING}
        outcome, outcome_dict = qcmetrics.compute_session_status_from_dict(qc_dict, qcmetrics.BWM_CRITERIA)
        self.assertEqual(outcome, spec.QC.WARNING)
        self.assertEqual(expect, outcome_dict)

    def test_outcome_from_dict_iti_delays(self):
        # For '_task_iti_delays' the threshold is 0 NOT_SET
        qc_dict = {'gnap': .99, 'gnop': np.nan, '_task_iti_delays': .1}
        expect = {'gnap': spec.QC.PASS, 'gnop': spec.QC.NOT_SET, '_task_iti_delays': spec.QC.NOT_SET}
        outcome, outcome_dict = qcmetrics.compute_session_status_from_dict(qc_dict, qcmetrics.BWM_CRITERIA)
        self.assertEqual(outcome, spec.QC.PASS)
        self.assertEqual(expect, outcome_dict)

    def test_out_of_bounds(self):
        # QC values below 0 or above 1 should raise a ValueError
        qc_dict = {'gnap': 1.01, 'gnop': 0, 'gnip': 0.99}
        with self.assertRaises(ValueError) as e:
            qcmetrics.compute_session_status_from_dict(qc_dict, qcmetrics.BWM_CRITERIA)
        self.assertEqual(e.exception.args[0], 'Values out of bound')

    def test_compute_dataset_qc_status(self):
        """Test TaskQC.compute_dataset_qc_status method."""
        outcomes = _create_test_qc_outcomes()
        dataset_outcomes = qcmetrics.TaskQC.compute_dataset_qc_status(outcomes)
        expected = {
            '_ibl_trials.stimOff_times': spec.QC.PASS,
            '_ibl_trials.table': {
                'intervals': spec.QC.CRITICAL,
                'goCue_times': spec.QC.FAIL,
                'response_times': spec.QC.NOT_SET,
                'choice': spec.QC.NOT_SET,
                'stimOn_times': spec.QC.WARNING,
                'contrastLeft': spec.QC.NOT_SET,
                'contrastRight': spec.QC.NOT_SET,
                'feedbackType': spec.QC.NOT_SET,
                'probabilityLeft': spec.QC.NOT_SET,
                'feedback_times': spec.QC.PASS,
                'firstMovement_times': spec.QC.NOT_SET}}
        self.assertDictEqual(expected, dataset_outcomes)


class TestDatasetQC(unittest.TestCase):
    def test_update_dataset_qc(self):
        """Test task_metrics.update_dataset_qc function."""
        registered_datasets = [
            {'name': '_ibl_trials.table.pqt', 'qc': 'NOT_SET', 'id': str(uuid4())},
            {'name': '_ibl_other.intervals.npy', 'qc': 'PASS', 'id': str(uuid4())},
            {'name': '_ibl_trials.stimOff_times.npy', 'qc': 'NOT_SET', 'id': str(uuid4())}
        ]
        one = mock.MagicMock()
        one.alyx.get.side_effect = lambda *args, **kwargs: {
            'qc': spec.QC.NOT_SET.name, 'json': {'extended_qc': None}}
        one.alyx.rest.side_effect = lambda *args, **kwargs: kwargs.get('data')
        one.offline = False
        qc = qcmetrics.TaskQC('subject/2020-01-01/001', one=one)
        task_qc_results = (spec.QC.CRITICAL, {}, _create_test_qc_outcomes())
        # Inject some toy trials QC results
        with mock.patch.object(qc, 'compute_session_status', return_value=task_qc_results):
            out = qcmetrics.update_dataset_qc(qc, registered_datasets.copy(), one, override=False)
        self.assertEqual(3, len(out))
        self.assertEqual(['CRITICAL', 'PASS', 'PASS'], [x['qc'] for x in out])
        # Check extended QC
        extended_qc = one.alyx.json_field_update.call_args.kwargs.get('data', {}).get('extended_qc', {})
        # Check a few of the fields
        self.assertEqual(spec.QC.WARNING, extended_qc.get('stimOn_times'))
        self.assertEqual(spec.QC.CRITICAL, extended_qc.get('intervals'))
        self.assertEqual(spec.QC.FAIL, extended_qc.get('goCue_times'))
        self.assertEqual(spec.QC.NOT_SET, extended_qc.get('response_times'))

        # Test behaviour when dataset QC not in registered datasets list
        one.reset_mock()
        with mock.patch.object(qc, 'compute_session_status', return_value=task_qc_results), \
                mock.patch.object(qc, 'compute_dataset_qc_status', return_value={'_ibl_foo.bar': spec.QC.PASS}), \
                self.assertLogs(qcmetrics.__name__, level=10) as cm:
            out = qcmetrics.update_dataset_qc(qc, registered_datasets.copy(), one, override=False)
        self.assertEqual(registered_datasets, out)
        self.assertIn('dataset _ibl_foo.bar not registered', cm.output[-1])
        one.alyx.get.assert_not_called()
        one.alyx.rest.assert_not_called()

        # Test assertion on duplicate dataset stems
        registered_datasets.append({
            'name': '_ibl_other.intervals.csv',  # same stem as _ibl_other.intervals.npy
            'qc': 'FAIL', 'id': str(uuid4())
        })
        self.assertRaises(AssertionError, qcmetrics.update_dataset_qc, qc, registered_datasets.copy(), one)


class TestTaskMetrics(unittest.TestCase):
    def setUp(self):
        self.data = self.load_fake_bpod_data()
        self.wheel_gain = 4
        wheel_data = self.load_fake_wheel_data(self.data, wheel_gain=self.wheel_gain)
        self.data.update(wheel_data)

    @staticmethod
    def load_fake_bpod_data(n=5):
        """Create fake extractor output of bpodqc.load_data.

        :param n: the number of trials
        :return: a dict of simulated trial data
        """
        trigg_delay = 1e-4  # an ideal delay between triggers and measured times
        resp_feedback_delay = 1e-3  # delay between response and feedback
        stimOff_itiIn_delay = 5e-3  # delay between stimOff and itiIn
        N = partial(np.random.normal, (n,))  # convenience function for norm dist sampling

        choice = np.ones((n,), dtype=int)
        choice[[1, 3]] = -1  # a couple of incorrect trials
        choice[0] = 0  # a nogo trial
        # Make one trial of each type incorrect
        correct = choice != 0
        correct[np.argmax(choice == 1)] = 0
        correct[np.argmax(choice == -1)] = 0

        pauses = np.zeros(n, dtype=float)
        pauses[2] = 5.  # add a 5s pause on 3rd trial

        quiescence_length = 0.2 + np.random.standard_exponential(size=(n,))
        iti_length = .5  # inter-trial interval
        # trial lengths include quiescence period, a couple of small trigger delays and the iti
        trial_lengths = quiescence_length + resp_feedback_delay + (trigg_delay * 4) + iti_length
        # add on 60s for nogos + feedback time (1 or 2s) + ~0.5s for other responses
        trial_lengths += (choice == 0) * 60 + (~correct + 1) + (choice != 0) * N(0.5)
        start_times = (np.r_[0, np.cumsum(trial_lengths)] + np.r_[0, np.cumsum(pauses)])[:-1]
        end_times = np.cumsum(trial_lengths) - 1e-2 + np.r_[0, np.cumsum(pauses)][:-1]

        data = {
            'phase': np.random.uniform(low=0, high=2 * np.pi, size=(n,)),
            'quiescence': quiescence_length,
            'choice': choice,
            'correct': correct,
            'intervals': np.c_[start_times, end_times],
            'itiIn_times': end_times - iti_length + stimOff_itiIn_delay,
            'position': np.ones_like(choice) * 35,
            'pause_duration': pauses
        }

        data['stimOnTrigger_times'] = start_times + data['quiescence'] + 1e-4
        data['stimOn_times'] = data['stimOnTrigger_times'] + 1e-1
        data['goCueTrigger_times'] = data['stimOn_times'] + 1e-3
        data['goCue_times'] = data['goCueTrigger_times'] + trigg_delay
        data['response_times'] = end_times - (resp_feedback_delay + iti_length + (~correct + 1))
        data['feedback_times'] = data['response_times'] + resp_feedback_delay
        data['stimFreeze_times'] = data['response_times'] + 1e-2
        data['stimFreezeTrigger_times'] = data['stimFreeze_times'] - trigg_delay
        data['feedbackType'] = np.vectorize(lambda x: -1 if x == 0 else x)(data['correct'])
        outcome = data['feedbackType'].copy()
        outcome[data['choice'] == 0] = 0
        data['outcome'] = outcome
        # Delay of 1 second if correct, 2 seconds if incorrect, and stim off at feedback for nogo
        data['stimOffTrigger_times'] = data['feedback_times'] + (~correct + 1) - (choice == 0) * 2
        data['stimOff_times'] = data['stimOffTrigger_times'] + trigg_delay
        # Error tone times are NaN on correct trials
        outcome_times = np.vectorize(lambda x, y: x + 1e-2 if y else np.nan)
        data['errorCueTrigger_times'] = outcome_times(data['feedback_times'], ~data['correct'])
        data['errorCue_times'] = data['errorCueTrigger_times'] + trigg_delay
        data['valveOpen_times'] = outcome_times(data['feedback_times'], data['correct'])
        data['rewardVolume'] = ~np.isnan(data['valveOpen_times']) * 3.0
        return data
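
    # Approximate event timeline for one simulated choice trial, as constructed above
    # (the deltas are the ideal delays used in the code, not measured values):
    #   trial start -> quiescence -> stimOnTrigger -> +0.1 s stimOn -> +1 ms goCueTrigger
    #   -> +0.1 ms goCue ... response -> +1 ms feedback -> +1 s (correct) or +2 s (incorrect)
    #   stimOffTrigger -> +0.1 ms stimOff -> itiIn -> +0.5 s trial end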

    @staticmethod
    def load_fake_wheel_data(trial_data, wheel_gain=4):
        # Load a wheel fragment: a numpy array of the form [timestamps, positions], for a wheel
        # movement during one trial.  Wheel is X1 bpod RE in radians.
        wh_path = Path(__file__).parent.joinpath('..', 'fixtures', 'qc').resolve()
        wheel_frag = np.load(wh_path.joinpath('wheel.npy'))
        resolution = np.mean(np.abs(np.diff(wheel_frag[:, 1])))  # pos diff between samples
        # abs displacement, s, in mm required to move 35 visual degrees
        POS_THRESH = 35
        s_mm = np.abs(POS_THRESH / wheel_gain)  # don't care about direction
        # convert abs displacement to radians (wheel pos is in rad)
        pos_thresh = cm_to_rad(s_mm * 1e-1)
        # index of threshold cross
        pos_thresh_idx = np.argmax(np.abs(wheel_frag[:, 1]) > pos_thresh)

        def qt_wheel_fill(start, end, t_step=0.001, p_step=None):
            if p_step is None:
                p_step = 2 * np.pi / 1024
            t = np.arange(start, end, t_step)
            p = np.random.randint(-1, 2, len(t))
            t = t[p != 0]
            p = p[p != 0].cumsum() * p_step
            return t, p

        wheel_data = []  # list of generated wheel data fragments
        movement_times = []  # list of generated first movement times

        def add_frag(t, p):
            """Add a wheel data fragment to the list, adjusting positions to be within one
            sample of one another."""
            last_samp = getattr(add_frag, 'last_samp', (0, 0))
            p += last_samp[1]
            if np.abs(p[0] - last_samp[1]) == 0:
                p += resolution
            wheel_data.append((t, p))
            add_frag.last_samp = (t[-1], p[-1])

        # Iterate over trials, generating wheel samples for the necessary periods
        for i in np.arange(len(trial_data['choice'])):
            # trial start to stim on; should be below quiescence threshold
            stimOn_trig = trial_data['stimOnTrigger_times'][i]
            trial_start = trial_data['intervals'][i, 0]
            t, p = qt_wheel_fill(trial_start, stimOn_trig, .5, resolution)
            if len(t) > 0:  # Possible for no movement during quiescence
                add_frag(t, p)

            # stim on to trial end
            trial_end = trial_data['intervals'][i, 1]
            if trial_data['choice'][i] == 0:
                # Add random wheel movements for duration of trial
                goCue = trial_data['goCue_times'][i]
                t, p = qt_wheel_fill(goCue, trial_end, .1, resolution)
                add_frag(t, p)
                movement_times.append(t[0])
            else:
                # Align wheel fragment with response time
                response_time = trial_data['response_times'][i]
                t = wheel_frag[:, 0] + response_time - wheel_frag[pos_thresh_idx, 0]
                p = np.abs(wheel_frag[:, 1]) * trial_data['choice'][i]
                assert t[0] > add_frag.last_samp[0]
                movement_times.append(t[1])
                add_frag(t, p)
                # Fill in random movements between end of response and trial end
                t, p = qt_wheel_fill(t[-1] + 0.01, trial_end, p_step=resolution)
                add_frag(t, p)

        # Stitch wheel fragments and assert no skips
        wheel_data = np.concatenate(list(map(np.column_stack, wheel_data)))
        assert np.all(np.diff(wheel_data[:, 0]) > 0), "timestamps don't strictly increase"
        np.testing.assert_allclose(np.abs(np.diff(wheel_data[:, 1])), resolution)
        assert len(movement_times) == trial_data['intervals'].shape[0]
        return {
            'wheel_timestamps': wheel_data[:, 0],
            'wheel_position': wheel_data[:, 1],
            'firstMovement_times': np.array(movement_times)
        }

    def test_check_stimOn_goCue_delays(self):
        metric, passed = qcmetrics.check_stimOn_goCue_delays(self.data)
        self.assertTrue(np.allclose(metric, 0.0011), 'failed to return correct metric')
        # Set incorrect timestamp (goCue occurs before stimOn)
        self.data['goCue_times'][-1] = self.data['stimOn_times'][-1] - 1e-4
        metric, passed = qcmetrics.check_stimOn_goCue_delays(self.data)
        n = len(self.data['stimOn_times'])
        expected = (n - 1) / n
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_response_feedback_delays(self):
        metric, passed = qcmetrics.check_response_feedback_delays(self.data)
        self.assertTrue(np.allclose(metric, 0.001), 'failed to return correct metric')
        # Set incorrect timestamp (feedback occurs before response)
        self.data['feedback_times'][-1] = self.data['response_times'][-1] - 1e-4
        metric, passed = qcmetrics.check_response_feedback_delays(self.data)
        n = len(self.data['feedback_times'])
        expected = (n - 1) / n
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_response_stimFreeze_delays(self):
        metric, passed = qcmetrics.check_response_stimFreeze_delays(self.data)
        self.assertTrue(np.allclose(metric, 1e-2), 'failed to return correct metric')
        # Set incorrect timestamp (stimFreeze occurs before response)
        self.data['stimFreeze_times'][-1] = self.data['response_times'][-1] - 1e-4
        metric, passed = qcmetrics.check_response_stimFreeze_delays(self.data)
        n = len(self.data['feedback_times']) - np.sum(self.data['choice'] == 0)
        expected = (n - 1) / n
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_positive_feedback_stimOff_delays(self):
        metric, passed = qcmetrics.check_positive_feedback_stimOff_delays(self.data)
        self.assertTrue(
            np.allclose(metric[self.data['correct']], 1e-4), 'failed to return correct metric'
        )
        # Set incorrect timestamp (stimOff occurs just after response)
        id = np.argmax(self.data['correct'])
        self.data['stimOff_times'][id] = self.data['response_times'][id] + 1e-2
        metric, passed = qcmetrics.check_positive_feedback_stimOff_delays(self.data)
        expected = (self.data['correct'].sum() - 1) / self.data['correct'].sum()
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_negative_feedback_stimOff_delays(self):
        err_trial = ~self.data['correct']
        metric, passed = qcmetrics.check_negative_feedback_stimOff_delays(self.data)
        values = np.abs(metric[err_trial])
        self.assertTrue(np.allclose(values, 1e-2), 'failed to return correct metric')
        # Set incorrect timestamp (stimOff occurs 1s after response)
        id = np.argmax(err_trial)
        self.data['stimOff_times'][id] = self.data['response_times'][id] + 1
        metric, passed = qcmetrics.check_negative_feedback_stimOff_delays(self.data)
        expected = (err_trial.sum() - 1) / err_trial.sum()
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_error_trial_event_sequence(self):
        metric, passed = qcmetrics.check_error_trial_event_sequence(self.data)
        self.assertTrue(np.all(metric == ~self.data['correct']), 'failed to return correct metric')
        self.assertTrue(np.all(passed))
        # Set incorrect timestamp (itiIn occurs before errorCue)
        err_trial = ~self.data['correct']
        (id,) = np.where(err_trial)
        self.data['intervals'][id[0], 0] = np.inf
        self.data['errorCue_times'][id[1]] = 0
        metric, passed = qcmetrics.check_error_trial_event_sequence(self.data)
        expected = (err_trial.sum() - 2) / err_trial.sum()
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_correct_trial_event_sequence(self):
        metric, passed = qcmetrics.check_correct_trial_event_sequence(self.data)
        self.assertTrue(np.all(metric == self.data['correct']), 'failed to return correct metric')
        self.assertTrue(np.all(passed))
        # Set incorrect timestamp
        correct = self.data['correct']
        id = np.argmax(correct)
        self.data['intervals'][id, 0] = np.inf
        metric, passed = qcmetrics.check_correct_trial_event_sequence(self.data)
        expected = (correct.sum() - 1) / correct.sum()
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_trial_length(self):
        metric, passed = qcmetrics.check_trial_length(self.data)
        self.assertTrue(np.all(metric), 'failed to return correct metric')
        # Set incorrect timestamp
        self.data['goCue_times'][-1] = 0
        metric, passed = qcmetrics.check_trial_length(self.data)
        n = len(self.data['goCue_times'])
        expected = (n - 1) / n
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_goCue_delays(self):
        metric, passed = qcmetrics.check_goCue_delays(self.data)
        self.assertTrue(np.allclose(metric, 1e-4), 'failed to return correct metric')
        # Set incorrect timestamp
        self.data['goCue_times'][1] = self.data['goCueTrigger_times'][1] + 0.1
        metric, passed = qcmetrics.check_goCue_delays(self.data)
        n = len(self.data['goCue_times'])
        expected = (n - 1) / n
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_errorCue_delays(self):
        metric, passed = qcmetrics.check_errorCue_delays(self.data)
        err_trial = ~self.data['correct']
        self.assertTrue(np.allclose(metric[err_trial], 1e-4), 'failed to return correct metric')
        # Set incorrect timestamp
        id = np.argmax(err_trial)
        self.data['errorCue_times'][id] = self.data['errorCueTrigger_times'][id] + 0.1
        metric, passed = qcmetrics.check_errorCue_delays(self.data)
        n = err_trial.sum()
        expected = (n - 1) / n
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_stimOn_delays(self):
        metric, passed = qcmetrics.check_stimOn_delays(self.data)
        self.assertTrue(np.allclose(metric, 1e-1), 'failed to return correct metric')
        # Set incorrect timestamp
        self.data['stimOn_times'][-1] = self.data['stimOnTrigger_times'][-1] + 0.2
        metric, passed = qcmetrics.check_stimOn_delays(self.data)
        n = len(self.data['stimOn_times'])
        expected = (n - 1) / n
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_stimOff_delays(self):
        metric, passed = qcmetrics.check_stimOff_delays(self.data)
        self.assertTrue(np.allclose(metric, 1e-4), 'failed to return correct metric')
        # Set incorrect timestamp
        self.data['stimOff_times'][-1] = self.data['stimOffTrigger_times'][-1] + 0.2
        metric, passed = qcmetrics.check_stimOff_delays(self.data)
        n = len(self.data['stimOff_times'])
        expected = (n - 1) / n
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_stimFreeze_delays(self):
        metric, passed = qcmetrics.check_stimFreeze_delays(self.data)
        self.assertTrue(np.allclose(metric, 1e-4), 'failed to return correct metric')
        # Set incorrect timestamp
        self.data['stimFreeze_times'][-1] = self.data['stimFreezeTrigger_times'][-1] + 0.2
        metric, passed = qcmetrics.check_stimFreeze_delays(self.data)
        n = len(self.data['stimFreeze_times'])
        expected = (n - 1) / n
        self.assertEqual(np.nanmean(passed), expected, 'failed to detect dodgy timestamp')

    def test_check_reward_volumes(self):
        metric, passed = qcmetrics.check_reward_volumes(self.data)
        self.assertTrue(all(x in {0.0, 3.0} for x in metric), 'failed to return correct metric')
        self.assertTrue(np.all(passed))
        # Set incorrect volume
        id = np.array([np.argmax(self.data['correct']), np.argmax(~self.data['correct'])])
        self.data['rewardVolume'][id] = self.data['rewardVolume'][id] + 1
        metric, passed = qcmetrics.check_reward_volumes(self.data)
        self.assertTrue(np.mean(passed) == 0.6, 'failed to detect incorrect reward volumes')

    def test_check_reward_volume_set(self):
        metric, passed = qcmetrics.check_reward_volume_set(self.data)
        self.assertTrue(all(x in {0.0, 3.0} for x in metric), 'failed to return correct metric')
        self.assertTrue(passed)
        # Add a new volume to the set
        id = np.argmax(self.data['correct'])
        self.data['rewardVolume'][id] = 2.3
        metric, passed = qcmetrics.check_reward_volume_set(self.data)
        self.assertFalse(passed, 'failed to detect incorrect reward volume set')
        # Set 0 volumes to new value; set length still 2 but should fail anyway
        self.data['rewardVolume'][~self.data['correct']] = 2.3
        metric, passed = qcmetrics.check_reward_volume_set(self.data)
        self.assertFalse(passed, 'failed to detect incorrect reward volume set')

    def test_check_audio_pre_trial(self):
        # Create sound sync fake data that is OK
        BNC2_OK = {
            'times': self.data['goCue_times'] + 1e-1,
            'polarities': np.array([1, -1, 1, -1, 1]),
        }
        # Create sound sync fake data that is NOT OK
        BNC2_NOK = {
            'times': self.data['goCue_times'] - 1e-1,
            'polarities': np.array([1, -1, 1, -1, 1]),
        }
        metric, passed = qcmetrics.check_audio_pre_trial(self.data, audio=BNC2_OK)
        self.assertTrue(~np.all(metric))
        self.assertTrue(np.all(passed))
        metric, passed = qcmetrics.check_audio_pre_trial(self.data, audio=BNC2_NOK)
        self.assertTrue(np.all(metric))
        self.assertTrue(~np.all(passed))

    def test_check_wheel_freeze_during_quiescence(self):
        metric, passed = qcmetrics.check_wheel_freeze_during_quiescence(self.data)
        self.assertTrue(np.all(passed))

        # Make one trial move more
        n = 1  # Index of trial to manipulate
        t1 = self.data['intervals'][n, 0]
        t2 = self.data['stimOnTrigger_times'][n]
        ts, pos = (self.data['wheel_timestamps'], self.data['wheel_position'])
        wh_idx = np.argmax(ts > t1)
        if ts[wh_idx] > self.data['stimOnTrigger_times'][n]:
            # No sample during quiescence; insert one
            self.data['wheel_timestamps'] = np.insert(ts, wh_idx, t2 - .001)
            self.data['wheel_position'] = np.insert(pos, wh_idx, np.inf)
        else:  # Otherwise make one sample infinite
            self.data['wheel_position'][wh_idx] = np.inf
        metric, passed = qcmetrics.check_wheel_freeze_during_quiescence(self.data)
        self.assertFalse(passed[n])
        self.assertTrue(metric[n] > 2)

    def test_check_wheel_move_before_feedback(self):
        metric, passed = qcmetrics.check_wheel_move_before_feedback(self.data)
        nogo = self.data['choice'] == 0
        self.assertTrue(np.all(passed[~nogo]))
        self.assertTrue(np.isnan(metric[nogo]).all())
        self.assertTrue(np.isnan(passed[nogo]).all())

        # Remove wheel data around feedback for choice trial
        assert self.data['choice'].any(), 'no choice trials in test data'
        n = np.argmax(self.data['choice'] != 0)  # Index of choice trial
        mask = np.logical_xor(self.data['wheel_timestamps'] > self.data['feedback_times'][n] - 1,
                              self.data['wheel_timestamps'] < self.data['feedback_times'][n] + 1)
        self.data['wheel_timestamps'] = self.data['wheel_timestamps'][mask]
        self.data['wheel_position'] = self.data['wheel_position'][mask]

        metric, passed = qcmetrics.check_wheel_move_before_feedback(self.data)
        self.assertFalse(passed[n] or metric[n] != 0)

    def test_check_wheel_move_during_closed_loop(self):
        gain = self.wheel_gain or 4
        metric, passed = qcmetrics.check_wheel_move_during_closed_loop(self.data, gain)
        nogo = self.data['choice'] == 0
        self.assertTrue(np.all(passed[~nogo]))
        self.assertTrue(np.isnan(metric[nogo]).all())
        self.assertTrue(np.isnan(passed[nogo]).all())

        # Remove wheel data for choice trial
        assert self.data['choice'].any(), 'no choice trials in test data'
        n = np.argmax(self.data['choice'] != 0)  # Index of choice trial
        mask = np.logical_xor(self.data['wheel_timestamps'] < self.data['goCue_times'][n],
                              self.data['wheel_timestamps'] > self.data['response_times'][n])
        self.data['wheel_timestamps'] = self.data['wheel_timestamps'][mask]
        self.data['wheel_position'] = self.data['wheel_position'][mask]

        metric, passed = qcmetrics.check_wheel_move_during_closed_loop(self.data, gain)
        self.assertFalse(passed[n])

    def test_check_wheel_integrity(self):
        metric, passed = qcmetrics.check_wheel_integrity(self.data, re_encoding='X1')
        self.assertTrue(np.all(passed))
        # Insert some violations and verify that they're caught
        idx = np.random.randint(self.data['wheel_timestamps'].size, size=2)
        self.data['wheel_timestamps'][idx[0] + 1] -= 1
        self.data['wheel_position'][idx[1]] -= 1
        metric, passed = qcmetrics.check_wheel_integrity(self.data, re_encoding='X1')
        self.assertFalse(passed[idx].any())

    def test_check_n_trial_events(self):
        metric, passed = qcmetrics.check_n_trial_events(self.data)
        self.assertTrue(np.all(passed == 1.) and np.all(metric))

        # Change errorCueTriggers
        id = np.argmax(self.data['correct'])
        self.data['errorCueTrigger_times'][id] = self.data['intervals'][id, 0] + np.random.rand()
        _, passed = qcmetrics.check_n_trial_events(self.data)
        self.assertFalse(passed[id])

        # Change another event
        id = id - 1 if id > 0 else id + 1
        self.data['goCue_times'][id] = self.data['intervals'][id, 1] + np.random.rand()
        _, passed = qcmetrics.check_n_trial_events(self.data)
        self.assertFalse(passed[id])

    def test_check_detected_wheel_moves(self):
        metric, passed = qcmetrics.check_detected_wheel_moves(self.data)
        self.assertTrue(np.all(self.data['firstMovement_times'] == metric))
        self.assertTrue(np.all(passed))

        # Change a movement time
        id = np.argmax(self.data['choice'] != 0)
        self.data['firstMovement_times'][id] = self.data['goCue_times'][id] - 0.3
        _, passed = qcmetrics.check_detected_wheel_moves(self.data)
        self.assertEqual(0.75, np.nanmean(passed))

        # Change the min_qt
        _, passed = qcmetrics.check_detected_wheel_moves(self.data, min_qt=0.3)
        self.assertTrue(np.all(passed))

    @unittest.skip('not implemented')
    def test_check_stimulus_move_before_goCue(self):
        pass  # TODO Nicco?

    def test_check_stimOff_itiIn_delays(self):
        metric, passed = qcmetrics.check_stimOff_itiIn_delays(self.data)
        self.assertTrue(np.nanmean(passed))
        # No go should be NaN
        id = np.argmax(self.data['choice'] == 0)
        self.assertTrue(np.isnan(passed[id]), 'No go trials should be excluded')
        # Change a trial
        id = np.argmax(self.data['choice'] != 0)
        self.data['stimOff_times'][id] = self.data['itiIn_times'][id] + 1e-4
        _, passed = qcmetrics.check_stimOff_itiIn_delays(self.data)  # recompute
        self.assertEqual(0.75, np.nanmean(passed))

    def test_check_iti_delays(self):
        metric, passed = qcmetrics.check_iti_delays(self.data, subtract_pauses=True)
        # Given the test data, the metric should return positive values close to 10 ms
        self.assertTrue(np.allclose(metric[:-1], 1e-2, atol=0.001), 'failed to return correct metric')
        self.assertTrue(np.isnan(metric[-1]), 'last trial should be NaN')
        self.assertTrue(np.all(passed))
        # Paused trials should fail when subtract_pauses is False
        pauses = self.data['pause_duration'][:-1]
        metric, passed = qcmetrics.check_iti_delays(self.data, subtract_pauses=False)
        self.assertTrue(np.allclose(metric[:-1], pauses + 1e-2, atol=0.001))
        self.assertFalse(np.any(passed[:-1][pauses > 0]))
        # Mess up a trial
        id = 3
        self.data['intervals'][id + 1, 0] += 0.5  # Next trial starts 0.5 sec later
        metric, passed = qcmetrics.check_iti_delays(self.data, subtract_pauses=True)
        n_trials = len(self.data['stimOff_times']) - 1  # Last trial is NaN here
        expected = (n_trials - 1) / n_trials
        self.assertEqual(expected, np.nanmean(passed))

    @unittest.skip('not implemented')
    def test_check_frame_frequency(self):
        pass  # TODO Miles

    @unittest.skip('not implemented')
    def test_check_frame_updates(self):
        pass  # TODO Nicco?


class TestHabituationQC(unittest.TestCase):
    """Test HabituationQC class.

    NB: For complete coverage this should be run alongside the integration tests.
    """

    def setUp(self):
        eid = '8dd0fcb0-1151-4c97-ae35-2e2421695ad7'
        one = ONE(**TEST_DB)
        self.qc = qcmetrics.HabituationQC(eid, one=one)
        # Dummy extractor obj
        self.qc.extractor = Bunch({'data': self.load_fake_bpod_data(), 'settings': {}})

    @staticmethod
    def load_fake_bpod_data(n=5):
        """Create fake extractor output of bpodqc.load_data.

        :param n: the number of trials
        :return: a dict of simulated trial data
        """
        trigg_delay = 1e-4  # an ideal delay between triggers and measured times
        iti_length = 0.5  # the so-called 'inter-trial interval'
        blank_length = 1.  # the time between trial start and stim on
        stimCenter_length = 1.  # the length of time the stimulus is in the center
        # the lengths of time between stim on and stim center
        stimOn_length = np.random.normal(size=(n,)) + 10
        # trial lengths include a couple of small trigger delays and the iti
        trial_lengths = blank_length + stimOn_length + 1e-1 + stimCenter_length
        start_times = np.concatenate(([0], np.cumsum(trial_lengths)[:-1]))
        end_times = np.cumsum(trial_lengths) - 1e-2

        data = {
            'phase': np.random.uniform(low=0, high=2 * np.pi, size=(n,)),
            'stimOnTrigger_times': start_times + blank_length,
            'intervals': np.c_[start_times, end_times],
            'itiIn_times': end_times - iti_length,
            'position': np.random.choice([-1, 1], n, replace=True) * 35,
            'feedbackType': np.ones(n),
            'feedback_times': end_times - 0.5,
            'rewardVolume': np.ones(n) * 3.,
            'stimOff_times': end_times + trigg_delay,
            'stimOffTrigger_times': end_times
        }
        data['stimOn_times'] = data['stimOnTrigger_times'] + trigg_delay
        data['goCueTrigger_times'] = data['stimOnTrigger_times']
        data['goCue_times'] = data['goCueTrigger_times'] + trigg_delay
        data['stimCenter_times'] = data['feedback_times'] - 0.5
        data['stimCenterTrigger_times'] = data['stimCenter_times'] - trigg_delay
        data['valveOpen_times'] = data['feedback_times']
        return data

    def test_compute(self):
        # All should pass except one NOT_SET
        self.qc.compute()
        self.assertIsNotNone(self.qc.metrics)
        _, _, outcomes = self.qc.compute_session_status()
        if self.qc.passed['_task_habituation_time'] is None:
            self.assertEqual(outcomes['_task_habituation_time'], spec.QC.NOT_SET)
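

# NB: the block below assumes the standard unittest runner; this module can also be
# run on its own with, e.g.:
#   python -m unittest ibllib.tests.qc.test_task_metrics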
if __name__ == '__main__':
    unittest.main(exit=False, verbosity=2)