Source code for ibllib.qc.task_metrics

"""Behaviour QC.

This module runs a list of quality control metrics on the behaviour data.

.. warning::
    The QC should be loaded using :meth:`ibllib.pipes.base_tasks.BehaviourTask.run_qc` and not
    instantiated directly.

Examples
--------
Running on a behaviour rig computer and updating QC fields in Alyx:

>>> from ibllib.qc.task_qc_viewer.task_qc import show_session_task_qc
>>> qc = show_session_task_qc(session_path, bpod_only=True, local=True)  # must close Viewer window
>>> qc = qc.run(update=True)

Downloading the required data and inspecting the QC on a different computer:

>>> from ibllib.pipes.dynamic_pipeline import get_trials_tasks
>>> from one.api import ONE
>>> task = get_trials_tasks(session_path, one=ONE())[0]  # get first task run
>>> task.location = 'remote'
>>> task.setUp()  # download required data
>>> qc = task.run_qc(update=False)
>>> outcome, results = qc.run()

Inspecting individual test outcomes

>>> outcome, results, outcomes = qc.compute_session_status()

Running bpod QC on ephys session (when not on behaviour rig PC)

>>> from ibllib.qc.task_qc_viewer.task_qc import get_bpod_trials_task, get_trials_tasks
>>> from one.api import ONE
>>> tasks = get_trials_tasks(session_path, one=ONE())
>>> task = get_bpod_trials_task(tasks[0])  # Ensure Bpod only on behaviour rig
>>> task.location = 'remote'
>>> task.setUp()  # download required data
>>> qc = task.run_qc(update=False)
>>> outcome, results = qc.run()

Running ephys QC, from local server PC (after ephys + bpod data have been copied to a same folder)

>>> from ibllib.pipes.dynamic_pipeline import get_trials_tasks
>>> task = get_trials_tasks(session_path, one=ONE())[0]  # get first task run
>>> qc = task.run_qc(update=False)
>>> outcome, results = qc.run()
"""
import logging
import sys
from packaging import version
from pathlib import Path, PurePosixPath
from datetime import datetime, timedelta
from inspect import getmembers, isfunction
from functools import reduce
from collections.abc import Sized

import numpy as np
from scipy.stats import chisquare

from brainbox.behavior.wheel import cm_to_rad, traces_by_trial
from ibllib.io.extractors import ephys_fpga
from one.alf import spec
from . import base

_log = logging.getLogger(__name__)

# todo the 2 followint parameters should be read from the task parameters for each session
ITI_DELAY_SECS = .5
FEEDBACK_NOGO_DELAY_SECS = 2

BWM_CRITERIA = {
    'default': {'PASS': 0.99, 'WARNING': 0.90, 'FAIL': 0},  # Note: WARNING was 0.95 prior to Aug 2022
    '_task_stimOff_itiIn_delays': {'PASS': 0.99, 'WARNING': 0},
    '_task_positive_feedback_stimOff_delays': {'PASS': 0.99, 'WARNING': 0},
    '_task_negative_feedback_stimOff_delays': {'PASS': 0.99, 'WARNING': 0},
    '_task_wheel_move_during_closed_loop': {'PASS': 0.99, 'WARNING': 0},
    '_task_response_stimFreeze_delays': {'PASS': 0.99, 'WARNING': 0},
    '_task_detected_wheel_moves': {'PASS': 0.99, 'WARNING': 0},
    '_task_trial_length': {'PASS': 0.99, 'WARNING': 0},
    '_task_goCue_delays': {'PASS': 0.99, 'WARNING': 0},
    '_task_errorCue_delays': {'PASS': 0.99, 'WARNING': 0},
    '_task_stimOn_delays': {'PASS': 0.99, 'WARNING': 0},
    '_task_stimOff_delays': {'PASS': 0.99, 'WARNING': 0},
    '_task_stimFreeze_delays': {'PASS': 0.99, 'WARNING': 0},
    '_task_iti_delays': {'NOT_SET': 0},
    '_task_passed_trial_checks': {'NOT_SET': 0}
}


[docs] def compute_session_status_from_dict(results, criteria=None): """ Compute overall task QC value from QC check results. Given a dictionary of results, computes the overall session QC for each key and aggregates in a single value. Parameters ---------- results : dict A dictionary of QC keys containing (usually scalar) values. criteria : dict A dictionary of qc keys containing map of PASS, WARNING, FAIL thresholds. Returns ------- one.alf.spec.QC Overall session QC outcome. dict A map of QC tests and their outcomes. """ if not criteria: criteria = {'default': BWM_CRITERIA['default']} outcomes = {k: TaskQC.thresholding(v, thresholds=criteria.get(k, criteria['default'])) for k, v in results.items()} # Criteria map is in order of severity so the max index is our overall QC outcome session_outcome = base.QC.overall_outcome(outcomes.values()) return session_outcome, outcomes
[docs] def update_dataset_qc(qc, registered_datasets, one, override=False): """ Update QC values for individual datasets. Parameters ---------- qc : ibllib.qc.task_metrics.TaskQC A TaskQC object that has been run. registered_datasets : list of dict A list of Alyx dataset records. one : one.api.OneAlyx An online instance of ONE. override : bool If True the QC field is updated even if new value is better than previous. Returns ------- list of dict The list of registered datasets but with the 'qc' fields updated. """ # Create map of dataset name, sans extension, to dataset id stem2id = {PurePosixPath(dset['name']).stem: dset.get('id') for dset in registered_datasets} # Ensure dataset stems are unique assert len(stem2id) == len(registered_datasets), 'ambiguous dataset names' # dict of QC check to outcome (as enum value) *_, outcomes = qc.compute_session_status() # work over map of dataset name (sans extension) to outcome (enum or dict of columns: enum) for name, outcome in qc.compute_dataset_qc_status(outcomes).items(): # if outcome is a dict, calculate aggregate outcome for each column if isinstance(outcome, dict): extended_qc = outcome outcome = qc.overall_outcome(outcome.values()) else: extended_qc = {} # check if dataset was registered to Alyx if not (did := stem2id.get(name)): _log.debug('dataset %s not registered, skipping', name) continue # update the dataset QC value on Alyx if outcome > spec.QC.NOT_SET or override: dset_qc = base.QC(did, one=one, log=_log, endpoint='datasets') dset = next(x for x in registered_datasets if did == x.get('id')) dset['qc'] = dset_qc.update(outcome, namespace='', override=override).name if extended_qc: dset_qc.update_extended_qc(extended_qc) return registered_datasets
[docs] class TaskQC(base.QC): """Task QC for training, biased, and ephys choice world.""" criteria = BWM_CRITERIA extractor = None """ibllib.qc.task_extractors.TaskQCExtractor: A task extractor object containing raw and extracted data."""
[docs] @staticmethod def thresholding(qc_value, thresholds=None) -> spec.QC: """ Compute the outcome of a single key by applying thresholding. Parameters ---------- qc_value : float Proportion of passing qcs, between 0 and 1. thresholds : dict Dictionary with keys 'PASS', 'WARNING', 'FAIL', (or enum integers, c.f. one.alf.spec.QC). Returns ------- one.alf.spec.QC The outcome. """ thresholds = {spec.QC.validate(k): v for k, v in thresholds.items() or {}} MAX_BOUND, MIN_BOUND = (1, 0) if qc_value is None or np.isnan(qc_value): return spec.QC.NOT_SET elif (qc_value > MAX_BOUND) or (qc_value < MIN_BOUND): raise ValueError('Values out of bound') for crit in filter(None, sorted(spec.QC)): if crit in thresholds.keys() and qc_value >= thresholds[crit]: return crit # if None of this applies, return 'NOT_SET' return spec.QC.NOT_SET
def __init__(self, session_path_or_eid, **kwargs): """ Task QC for training, biased, and ephys choice world. :param session_path_or_eid: A session eid or path :param log: A logging.Logger instance, if None the 'ibllib' logger is used :param one: An ONE instance for fetching and setting the QC on Alyx """ # When an eid is provided, we will download the required data by default (if necessary) self.download_data = not spec.is_session_path(Path(session_path_or_eid)) super().__init__(session_path_or_eid, **kwargs) # Data self.extractor = None # Metrics and passed trials self.metrics = None self.passed = None # Criteria (initialize as outcomes vary by class, task, and hardware) self.criteria = BWM_CRITERIA.copy()
[docs] def compute(self, **kwargs): """Compute and store the QC metrics. Runs the QC on the session and stores a map of the metrics for each datapoint for each test, and a map of which datapoints passed for each test. Parameters ---------- bpod_only : bool If True no data is extracted from the FPGA for ephys sessions. """ assert self.extractor is not None ver = self.extractor.settings.get('IBLRIG_VERSION', '') or '0.0.0' if version.parse(ver) >= version.parse('8.0.0'): self.criteria['_task_iti_delays'] = {'PASS': 0.99, 'WARNING': 0} self.criteria['_task_passed_trial_checks'] = {'PASS': 0.7, 'WARNING': 0} else: self.criteria['_task_iti_delays'] = {'NOT_SET': 0} self.criteria['_task_passed_trial_checks'] = {'NOT_SET': 0} self.log.info(f'Session {self.session_path}: Running QC on behavior data...') self.get_bpodqc_metrics_frame( self.extractor.data, wheel_gain=self.extractor.settings['STIM_GAIN'], # The wheel gain photodiode=self.extractor.frame_ttls, audio=self.extractor.audio_ttls, re_encoding=self.extractor.wheel_encoding or 'X1', min_qt=self.extractor.settings.get('QUIESCENT_PERIOD') or 0.2, audio_output=self.extractor.settings.get('device_sound', {}).get('OUTPUT', 'unknown') )
def _get_checks(self): """ Find all methods that begin with 'check_'. Returns ------- Dict[str, function] A map of QC check function names and the corresponding functions that return `metric` (any), `passed` (bool). """ def is_metric(x): return isfunction(x) and x.__name__.startswith('check_') return dict(getmembers(sys.modules[__name__], is_metric))
[docs] def get_bpodqc_metrics_frame(self, data, **kwargs): """ Evaluate task QC metrics. Evaluates all the QC metric functions in this module (those starting with 'check') and returns the results. The optional kwargs listed below are passed to each QC metric function. Parameters ---------- data : dict The extracted task data. re_encoding : str {'X1', 'X2', 'X4'} The encoding configuration of the rotary encoder. enc_res : int The rotary encoder resolution as number of fronts per revolution. wheel_gain : float The STIM_GAIN task parameter. photodiode : dict The fronts from Bpod's BNC1 input or FPGA frame2ttl channel. audio : dict The fronts from Bpod's BNC2 input FPGA audio sync channel. min_qt : float The QUIESCENT_PERIOD task parameter. Returns ------- dict Map of checks and their QC metric values (1 per trial). dict Map of checks and a float array of which samples passed. """ # Find all methods that begin with 'check_' checks = self._get_checks() prefix = '_task_' # Extended QC fields will start with this # Method 'check_foobar' stored with key '_task_foobar' in metrics map qc_metrics_map = {prefix + k[6:]: fn(data, **kwargs) for k, fn in checks.items()} # Split metrics and passed frames self.metrics = {} self.passed = {} for k in qc_metrics_map: self.metrics[k], self.passed[k] = qc_metrics_map[k] # Add a check for trial level pass: did a given trial pass all checks? n_trials = data['intervals'].shape[0] # Trial-level checks return an array the length that equals the number of trials trial_level_passed = [m for m in self.passed.values() if isinstance(m, Sized) and len(m) == n_trials] name = prefix + 'passed_trial_checks' self.metrics[name] = reduce(np.logical_and, trial_level_passed or (None, None)) self.passed[name] = self.metrics[name].astype(float) if trial_level_passed else None
[docs] def run(self, update=False, namespace='task', **kwargs): """ Compute the QC outcomes and return overall task QC outcome. Parameters ---------- update : bool If True, updates the session QC fields on Alyx. namespace : str The namespace of the QC fields in the Alyx JSON field. bpod_only : bool If True no data is extracted from the FPGA for ephys sessions. Returns ------- str Overall task QC outcome. dict A map of QC tests and the proportion of data points that passed them. """ if self.metrics is None: self.compute(**kwargs) outcome, results, _ = self.compute_session_status() if update: self.update_extended_qc(results) self.update(outcome, namespace) return outcome, results
[docs] def compute_session_status(self): """ Compute the overall session QC for each key and aggregates in a single value. Returns ------- str Overall session QC outcome. dict A map of QC tests and the proportion of data points that passed them. dict A map of QC tests and their outcomes. """ if self.passed is None: raise AttributeError('passed is None; compute QC first') # Get mean passed of each check, or None if passed is None or all NaN results = {k: None if v is None or np.isnan(v).all() else np.nanmean(v) for k, v in self.passed.items()} session_outcome, outcomes = compute_session_status_from_dict(results, self.criteria) return session_outcome, results, outcomes
[docs] @staticmethod def compute_dataset_qc_status(outcomes): """Return map of dataset specific QC values. Parameters ---------- outcomes : dict Map of checks and their individual outcomes. Returns ------- dict Map of dataset names and their outcome. """ trials_table_outcomes = { 'intervals': outcomes.get('_task_iti_delays', spec.QC.NOT_SET), 'goCue_times': outcomes.get('_task_goCue_delays', spec.QC.NOT_SET), 'response_times': spec.QC.NOT_SET, 'choice': spec.QC.NOT_SET, 'stimOn_times': outcomes.get('_task_stimOn_delays', spec.QC.NOT_SET), 'contrastLeft': spec.QC.NOT_SET, 'contrastRight': spec.QC.NOT_SET, 'feedbackType': spec.QC.NOT_SET, 'probabilityLeft': spec.QC.NOT_SET, 'feedback_times': outcomes.get('_task_errorCue_delays', spec.QC.NOT_SET), 'firstMovement_times': spec.QC.NOT_SET } reward_checks = ('_task_reward_volumes', '_task_reward_volume_set') trials_table_outcomes['rewardVolume']: TaskQC.overall_outcome( (outcomes.get(x, spec.QC.NOT_SET) for x in reward_checks) ) dataset_outcomes = { '_ibl_trials.stimOff_times': outcomes.get('_task_stimOff_delays', spec.QC.NOT_SET), '_ibl_trials.table': trials_table_outcomes, } return dataset_outcomes
[docs] class HabituationQC(TaskQC): """Task QC for habituation choice world."""
[docs] def compute(self, **kwargs): """Compute and store the QC metrics. Runs the QC on the session and stores a map of the metrics for each datapoint for each test, and a map of which datapoints passed for each test. """ assert self.extractor is not None self.log.info(f'Session {self.session_path}: Running QC on habituation data...') # Initialize checks prefix = '_task_' data = self.extractor.data audio_output = self.extractor.settings.get('device_sound', {}).get('OUTPUT', 'unknown') metrics = {} passed = {} # Modify criteria based on version ver = self.extractor.settings.get('IBLRIG_VERSION', '') or '0.0.0' is_v8 = version.parse(ver) >= version.parse('8.0.0') self.criteria['_task_iti_delays'] = {'PASS': 0.99, 'WARNING': 0} if is_v8 else {'NOT_SET': 0} # Check all reward volumes == 3.0ul check = prefix + 'reward_volumes' metrics[check] = data['rewardVolume'] passed[check] = metrics[check] == 3.0 # Check session durations are increasing in steps >= 12 minutes check = prefix + 'habituation_time' if not self.one or not self.session_path: self.log.warning('unable to determine session trials without ONE') metrics[check] = passed[check] = None else: subject, session_date = self.session_path.parts[-3:-1] # compute from the date specified date_minus_week = ( datetime.strptime(session_date, '%Y-%m-%d') - timedelta(days=7) ).strftime('%Y-%m-%d') sessions = self.one.alyx.rest('sessions', 'list', subject=subject, date_range=[date_minus_week, session_date], task_protocol='habituation') # Remove the current session if already registered if sessions and sessions[0]['start_time'].startswith(session_date): sessions = sessions[1:] metric = ([0, data['intervals'][-1, 1] - data['intervals'][0, 0]] + [(datetime.fromisoformat(x['end_time']) - datetime.fromisoformat(x['start_time'])).total_seconds() / 60 for x in [self.one.alyx.get(s['url']) for s in sessions]]) # The duration from raw trial data # duration = map(float, self.extractor.raw_data[-1]['elapsed_time'].split(':')) # duration = timedelta(**dict(zip(('hours', 'minutes', 'seconds'), # duration))).total_seconds() / 60 metrics[check] = np.array(metric) passed[check] = np.diff(metric) >= 12 # Check event orders: trial_start < stim on < stim center < feedback < stim off check = prefix + 'trial_event_sequence' nans = ( np.isnan(data['intervals'][:, 0]) | # noqa np.isnan(data['stimOn_times']) | # noqa np.isnan(data['stimCenter_times']) | np.isnan(data['valveOpen_times']) | # noqa np.isnan(data['stimOff_times']) ) a = np.less(data['intervals'][:, 0], data['stimOn_times'], where=~nans) b = np.less(data['stimOn_times'], data['stimCenter_times'], where=~nans) c = np.less(data['stimCenter_times'], data['valveOpen_times'], where=~nans) d = np.less(data['valveOpen_times'], data['stimOff_times'], where=~nans) metrics[check] = a & b & c & d & ~nans passed[check] = metrics[check].astype(float) # Check that the time difference between the visual stimulus center-command being # triggered and the stimulus effectively appearing in the center is smaller than 150 ms. check = prefix + 'stimCenter_delays' metric = np.nan_to_num(data['stimCenter_times'] - data['stimCenterTrigger_times'], nan=np.inf) passed[check] = (metric <= 0.15) & (metric > 0) metrics[check] = metric # Phase check check = prefix + 'phase' metric = data['phase'] passed[check] = (metric <= 2 * np.pi) & (metric >= 0) metrics[check] = metric # This is not very useful as a check because there are so few trials check = prefix + 'phase_distribution' metric, _ = np.histogram(data['phase']) _, p = chisquare(metric) passed[check] = p < 0.05 if len(data['phase']) >= 400 else None # skip if too few trials metrics[check] = metric # Check that the period of gray screen between stim off and the start of the next trial is # 1s +/- 10%. check = prefix + 'iti_delays' iti = (np.roll(data['stimOn_times'], -1) - data['stimOff_times'])[:-1] metric = np.r_[np.nan_to_num(iti, nan=np.inf), np.nan] - 1. passed[check] = np.abs(metric) <= 0.1 passed[check][-1] = np.nan metrics[check] = metric # Checks common to training QC checks = [check_goCue_delays, check_stimOn_goCue_delays, check_stimOn_delays, check_stimOff_delays] for fcn in checks: check = prefix + fcn.__name__[6:] metrics[check], passed[check] = fcn(data, audio_output=audio_output) self.metrics, self.passed = (metrics, passed)
# SINGLE METRICS # ---------------------------------------------------------------------------- # # === Delays between events checks ===
[docs] def check_stimOn_goCue_delays(data, audio_output='harp', **_): """ Check the go cue tone occurs less than 10ms before stimulus on. Checks that the time difference between the onset of the visual stimulus and the onset of the go cue tone is positive and less than 10ms. Metric: M = stimOn_times - goCue_times Criteria: 0 < M < 0.010 s Units: seconds [s] :param data: dict of trial data with keys ('goCue_times', 'stimOn_times', 'intervals') :param audio_output: audio output device name. Notes ----- - For non-harp sound card the permissible delay is 0.053s. This was chosen by taking the 99.5th percentile of delays over 500 training sessions using the Xonar soundcard. """ # Calculate the difference between stimOn and goCue times. # If either are NaN, the result will be Inf to ensure that it crosses the failure threshold. threshold = 0.01 if audio_output.lower() == 'harp' else 0.053 metric = np.nan_to_num(data['goCue_times'] - data['stimOn_times'], nan=np.inf) passed = (metric < threshold) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_response_feedback_delays(data, audio_output='harp', **_): """ Check the feedback delivered within 10ms of the response threshold. Checks that the time difference between the response and the feedback onset (error sound or valve) is positive and less than 10ms. Metric: M = feedback_time - response_time Criterion: 0 < M < 0.010 s Units: seconds [s] :param data: dict of trial data with keys ('feedback_times', 'response_times', 'intervals') :param audio_output: audio output device name. Notes ----- - For non-harp sound card the permissible delay is 0.053s. This was chosen by taking the 99.5th percentile of delays over 500 training sessions using the Xonar soundcard. """ threshold = 0.01 if audio_output.lower() == 'harp' else 0.053 metric = np.nan_to_num(data['feedback_times'] - data['response_times'], nan=np.inf) passed = (metric < threshold) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_response_stimFreeze_delays(data, **_): """ Check the stimulus freezes within 100ms of the expected time. Checks that the time difference between the visual stimulus freezing and the response is positive and less than 100ms. Metric: M = (stimFreeze_times - response_times) Criterion: 0 < M < 0.100 s Units: seconds [s] :param data: dict of trial data with keys ('stimFreeze_times', 'response_times', 'intervals', 'choice') """ # Calculate the difference between stimOn and goCue times. # If either are NaN, the result will be Inf to ensure that it crosses the failure threshold. metric = np.nan_to_num(data['stimFreeze_times'] - data['response_times'], nan=np.inf) # Test for valid values passed = ((metric < 0.1) & (metric > 0)).astype(float) # Finally remove no_go trials (stimFreeze triggered differently in no_go trials) # These values are ignored in calculation of proportion passed passed[data['choice'] == 0] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_stimOff_itiIn_delays(data, **_): """Check that the start of the trial interval is within 10ms of the visual stimulus turning off. Metric: M = itiIn_times - stimOff_times Criterion: 0 < M < 0.010 s Units: seconds [s] :param data: dict of trial data with keys ('stimOff_times', 'itiIn_times', 'intervals', 'choice') """ # If either are NaN, the result will be Inf to ensure that it crosses the failure threshold. metric = np.nan_to_num(data['itiIn_times'] - data['stimOff_times'], nan=np.inf) passed = ((metric < 0.01) & (metric >= 0)).astype(float) # Remove no_go trials (stimOff triggered differently in no_go trials) # NaN values are ignored in calculation of proportion passed metric[data['choice'] == 0] = passed[data['choice'] == 0] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_iti_delays(data, subtract_pauses=False, iti_delay_secs=ITI_DELAY_SECS, feedback_nogo_delay_secs=FEEDBACK_NOGO_DELAY_SECS, **_): """ Check the open-loop grey screen period is approximately 1 second. Check that the period of grey screen between stim off and the start of the next trial is 1s +/- 10%. If the trial was paused during this time, the check will account for that Metric: M = stimOff (n) - trialStart (n+1) - 1. Criterion: |M| < 0.1 Units: seconds [s] Parameters ---------- data : dict Trial data with keys ('stimOff_times', 'intervals', 'pause_duration'). subtract_pauses: bool If True, account for experimenter-initiated pauses between trials; if False, trials where the experimenter paused the task may fail this check. Returns ------- numpy.array An array of metric values to threshold. numpy.array An array of boolean values, 1 per trial, where True means trial passes QC threshold. """ metric = np.full(data['intervals'].shape[0], np.nan) passed = metric.copy() pauses = (data['pause_duration'] if subtract_pauses else np.zeros_like(metric))[:-1] # Get the difference between stim off and the start of the next trial # Missing data are set to Inf, except for the last trial which is a NaN metric[:-1] = np.nan_to_num( data['intervals'][1:, 0] - data['stimOff_times'][:-1] - iti_delay_secs - pauses, nan=np.inf ) metric[data['choice'] == 0] = metric[data['choice'] == 0] - feedback_nogo_delay_secs passed[:-1] = np.abs(metric[:-1]) < (iti_delay_secs / 10) # Last trial is not counted assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_positive_feedback_stimOff_delays(data, **_): """ Check stimulus offset occurs approximately 1 second after reward delivered. Check that the time difference between the valve onset and the visual stimulus turning off is 1 ± 0.150 seconds. Metric: M = stimOff_times - feedback_times - 1s Criterion: |M| < 0.150 s Units: seconds [s] :param data: dict of trial data with keys ('stimOff_times', 'feedback_times', 'intervals', 'correct') """ # If either are NaN, the result will be Inf to ensure that it crosses the failure threshold. metric = np.nan_to_num(data['stimOff_times'] - data['feedback_times'] - 1, nan=np.inf) passed = (np.abs(metric) < 0.15).astype(float) # NaN values are ignored in calculation of proportion passed; ignore incorrect trials here metric[~data['correct']] = passed[~data['correct']] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_negative_feedback_stimOff_delays(data, feedback_nogo_delay_secs=FEEDBACK_NOGO_DELAY_SECS, **_): """ Check the stimulus offset occurs approximately 2 seconds after negative feedback delivery. Check that the time difference between the error sound and the visual stimulus turning off is 2 ± 0.150 seconds. Metric: M = stimOff_times - errorCue_times - 2s Criterion: |M| < 0.150 s Units: seconds [s] :param data: dict of trial data with keys ('stimOff_times', 'errorCue_times', 'intervals') """ metric = np.nan_to_num(data['stimOff_times'] - data['errorCue_times'] - 2, nan=np.inf) # for the nogo trials, the feedback is the same as the stimOff metric[data['choice'] == 0] = metric[data['choice'] == 0] + feedback_nogo_delay_secs # Apply criteria passed = (np.abs(metric) < 0.15).astype(float) # Remove none negative feedback trials metric[data['correct']] = passed[data['correct']] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
# === Wheel movement during trial checks ===
[docs] def check_wheel_move_before_feedback(data, **_): """Check that the wheel does move within 100ms of the feedback onset (error sound or valve). Metric: M = (w_t - 0.05) - (w_t + 0.05), where t = feedback_times Criterion: M != 0 Units: radians :param data: dict of trial data with keys ('wheel_timestamps', 'wheel_position', 'choice', 'intervals', 'feedback_times') """ # Get tuple of wheel times and positions within 100ms of feedback traces = traces_by_trial( data['wheel_timestamps'], data['wheel_position'], start=data['feedback_times'] - 0.05, end=data['feedback_times'] + 0.05, ) metric = np.zeros_like(data['feedback_times']) # For each trial find the displacement for i, trial in enumerate(traces): pos = trial[1] if pos.size > 1: metric[i] = pos[-1] - pos[0] # except no-go trials metric[data['choice'] == 0] = np.nan # NaN = trial ignored for this check nans = np.isnan(metric) passed = np.zeros_like(metric) * np.nan passed[~nans] = (metric[~nans] != 0).astype(float) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
def _wheel_move_during_closed_loop(re_ts, re_pos, data, wheel_gain=None, tol=1, **_): """ Check the wheel moves the correct amount to reach threshold. Check that the wheel moves by approximately 35 degrees during the closed-loop period on trials where a feedback (error sound or valve) is delivered. Metric: M = abs(w_resp - w_t0) - threshold_displacement, where w_resp = position at response time, w_t0 = position at go cue time, threshold_displacement = displacement required to move 35 visual degrees Criterion: displacement < tol visual degree Units: degrees angle of wheel turn :param re_ts: extracted wheel timestamps in seconds :param re_pos: extracted wheel positions in radians :param data: a dict with the keys (goCueTrigger_times, response_times, feedback_times, position, choice, intervals) :param wheel_gain: the 'STIM_GAIN' task setting :param tol: the criterion in visual degrees """ if wheel_gain is None: _log.warning('No wheel_gain input in function call, returning None') return None, None # Get tuple of wheel times and positions over each trial's closed-loop period traces = traces_by_trial(re_ts, re_pos, start=data['goCueTrigger_times'], end=data['response_times']) metric = np.zeros_like(data['feedback_times']) # For each trial find the absolute displacement for i, trial in enumerate(traces): t, pos = trial if pos.size != 0: # Find the position of the preceding sample and subtract it idx = np.abs(re_ts - t[0]).argmin() - 1 origin = re_pos[idx] metric[i] = np.abs(pos - origin).max() # Load wheel_gain and thresholds for each trial wheel_gain = np.array([wheel_gain] * len(data['position'])) thresh = data['position'] # abs displacement, s, in mm required to move 35 visual degrees s_mm = np.abs(thresh / wheel_gain) # don't care about direction criterion = cm_to_rad(s_mm * 1e-1) # convert abs displacement to radians (wheel pos is in rad) metric = metric - criterion # difference should be close to 0 rad_per_deg = cm_to_rad(1 / wheel_gain * 1e-1) passed = (np.abs(metric) < rad_per_deg * tol).astype(float) # less than 1 visual degree off metric[data['choice'] == 0] = passed[data['choice'] == 0] = np.nan # except no-go trials assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_wheel_move_during_closed_loop(data, wheel_gain=None, **_): """ Check the wheel moves the correct amount to reach threshold. Check that the wheel moves by approximately 35 degrees during the closed-loop period on trials where a feedback (error sound or valve) is delivered. Metric: M = abs(w_resp - w_t0) - threshold_displacement, where w_resp = position at response time, w_t0 = position at go cue time, threshold_displacement = displacement required to move 35 visual degrees Criterion: displacement < 3 visual degrees Units: degrees angle of wheel turn :param data: dict of trial data with keys ('wheel_timestamps', 'wheel_position', 'choice', 'intervals', 'goCueTrigger_times', 'response_times', 'feedback_times', 'position') :param wheel_gain: the 'STIM_GAIN' task setting """ # Get the Bpod extracted wheel data timestamps = data['wheel_timestamps'] position = data['wheel_position'] return _wheel_move_during_closed_loop(timestamps, position, data, wheel_gain, tol=3)
[docs] def check_wheel_move_during_closed_loop_bpod(data, wheel_gain=None, **_): """ Check the wheel moves the correct amount to reach threshold. Check that the wheel moves by approximately 35 degrees during the closed-loop period on trials where a feedback (error sound or valve) is delivered. This check uses the Bpod wheel data (measured at a lower resolution) with a stricter tolerance (1 visual degree). Metric: M = abs(w_resp - w_t0) - threshold_displacement, where w_resp = position at response time, w_t0 = position at go cue time, threshold_displacement = displacement required to move 35 visual degrees. Criterion: displacement < 1 visual degree Units: degrees angle of wheel turn :param data: dict of trial data with keys ('wheel_timestamps(_bpod)', 'wheel_position(_bpod)', 'choice', 'intervals', 'goCueTrigger_times', 'response_times', 'feedback_times', 'position') :param wheel_gain: the 'STIM_GAIN' task setting """ # Get the Bpod extracted wheel data timestamps = data.get('wheel_timestamps_bpod', data['wheel_timestamps']) position = data.get('wheel_position_bpod', data['wheel_position']) return _wheel_move_during_closed_loop(timestamps, position, data, wheel_gain, tol=1)
[docs] def check_wheel_freeze_during_quiescence(data, **_): """ Check the wheel is indeed still during the quiescent period. Check that the wheel does not move more than 2 degrees in each direction during the quiescence interval before the stimulus appears. Metric: M = |max(W) - min(W)| where W is wheel pos over quiescence interval; interval = [stimOnTrigger_times - quiescent_duration, stimOnTrigger_times] Criterion: M < 2 degrees Units: degrees angle of wheel turn :param data: dict of trial data with keys ('wheel_timestamps', 'wheel_position', 'quiescence', 'intervals', 'stimOnTrigger_times') """ assert np.all(np.diff(data['wheel_timestamps']) >= 0) assert data['quiescence'].size == data['stimOnTrigger_times'].size # Get tuple of wheel times and positions over each trial's quiescence period qevt_start_times = data['stimOnTrigger_times'] - data['quiescence'] traces = traces_by_trial( data['wheel_timestamps'], data['wheel_position'], start=qevt_start_times, end=data['stimOnTrigger_times'] ) metric = np.zeros((len(data['quiescence']), 2)) # (n_trials, n_directions) for i, trial in enumerate(traces): t, pos = trial # Get the last position before the period began if pos.size > 0: # Find the position of the preceding sample and subtract it idx = np.abs(data['wheel_timestamps'] - t[0]).argmin() - 1 origin = data['wheel_position'][idx if idx != -1 else 0] # Find the absolute min and max relative to the last sample metric[i, :] = np.abs([np.min(pos - origin), np.max(pos - origin)]) # Reduce to the largest displacement found in any direction metric = np.max(metric, axis=1) metric = 180 * metric / np.pi # convert to degrees from radians criterion = 2 # Position shouldn't change more than 2 in either direction passed = metric < criterion assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_detected_wheel_moves(data, min_qt=0, **_): """Check that the detected first movement times are reasonable. Metric: M = firstMovement times Criterion: (goCue trigger time - min quiescent period) < M < response time Units: Seconds [s] :param data: dict of trial data with keys ('firstMovement_times', 'goCueTrigger_times', 'response_times', 'choice', 'intervals') :param min_qt: the minimum possible quiescent period (the QUIESCENT_PERIOD task parameter) """ # Depending on task version this may be a single value or an array of quiescent periods min_qt = np.array(min_qt) if min_qt.size > data['intervals'].shape[0]: min_qt = min_qt[:data['intervals'].shape[0]] metric = data['firstMovement_times'] qevt_start = data['goCueTrigger_times'] - np.array(min_qt) response = data['response_times'] # First movement time for each trial should be after the quiescent period and before feedback passed = np.array([a < m < b for m, a, b in zip(metric, qevt_start, response)], dtype=float) nogo = data['choice'] == 0 passed[nogo] = np.nan # No go trial may have no movement times and that's fine return metric, passed
# === Sequence of events checks ===
[docs] def check_error_trial_event_sequence(data, **_): """ Check trial events occur in correct order for negative feedback trials. Check that on incorrect / miss trials, there are exactly: 2 audio events (go cue sound and error sound) and 2 Bpod events (trial start, ITI), occurring in the correct order Metric: M = Bpod (trial start) > audio (go cue) > audio (error) > Bpod (ITI) > Bpod (trial end) Criterion: M == True Units: -none- :param data: dict of trial data with keys ('errorCue_times', 'goCue_times', 'intervals', 'itiIn_times', 'correct') """ # An array the length of N trials where True means at least one event time was NaN (bad) nans = ( np.isnan(data['intervals'][:, 0]) | np.isnan(data['goCue_times']) | # noqa np.isnan(data['errorCue_times']) | # noqa np.isnan(data['itiIn_times']) | # noqa np.isnan(data['intervals'][:, 1]) ) # For each trial check that the events happened in the correct order (ignore NaN values) a = np.less(data['intervals'][:, 0], data['goCue_times'], where=~nans) # Start time < go cue b = np.less(data['goCue_times'], data['errorCue_times'], where=~nans) # Go cue < error cue c = np.less(data['errorCue_times'], data['itiIn_times'], where=~nans) # Error cue < ITI start d = np.less(data['itiIn_times'], data['intervals'][:, 1], where=~nans) # ITI start < end time # For each trial check all events were in order AND all event times were not NaN metric = a & b & c & d & ~nans passed = metric.astype(float) passed[data['correct']] = np.nan # Look only at incorrect trials assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_correct_trial_event_sequence(data, **_): """ Check trial events occur in correct order for positive feedback trials. Check that on correct trials, there are exactly: 1 audio events and 3 Bpod events (valve open, trial start, ITI), occurring in the correct order Metric: M = Bpod (trial start) > audio (go cue) > Bpod (valve) > Bpod (ITI) > Bpod (trial end) Criterion: M == True Units: -none- :param data: dict of trial data with keys ('valveOpen_times', 'goCue_times', 'intervals', 'itiIn_times', 'correct') """ # An array the length of N trials where True means at least one event time was NaN (bad) nans = ( np.isnan(data['intervals'][:, 0]) | np.isnan(data['goCue_times']) | # noqa np.isnan(data['valveOpen_times']) | np.isnan(data['itiIn_times']) | # noqa np.isnan(data['intervals'][:, 1]) ) # For each trial check that the events happened in the correct order (ignore NaN values) a = np.less(data['intervals'][:, 0], data['goCue_times'], where=~nans) # Start time < go cue b = np.less(data['goCue_times'], data['valveOpen_times'], where=~nans) # Go cue < feedback c = np.less(data['valveOpen_times'], data['itiIn_times'], where=~nans) # Feedback < ITI start d = np.less(data['itiIn_times'], data['intervals'][:, 1], where=~nans) # ITI start < end time # For each trial True means all events were in order AND all event times were not NaN metric = a & b & c & d & ~nans passed = metric.astype(float) passed[~data['correct']] = np.nan # Look only at correct trials assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_n_trial_events(data, **_): """Check that the number events per trial is correct. Within every trial interval there should be one of each trial event, except for goCueTrigger_times which should only be defined for incorrect trials Metric: M = all(start < event < end) for all event times except errorCueTrigger_times where start < error_trigger < end if not correct trial, else error_trigger == NaN Criterion: M == True Units: -none-, boolean :param data: dict of trial data with keys ('intervals', 'stimOnTrigger_times', 'stimOffTrigger_times', 'stimOn_times', 'stimOff_times', 'stimFreezeTrigger_times', 'errorCueTrigger_times', 'itiIn_times', 'goCueTrigger_times', 'goCue_times', 'response_times', 'feedback_times') """ intervals = data['intervals'] correct = data['correct'] err_trig = data['errorCueTrigger_times'] # Exclude these fields; valve and errorCue times are the same as feedback_times and we must # test errorCueTrigger_times separately # stimFreeze_times fails often due to TTL flicker exclude = ['camera_timestamps', 'errorCueTrigger_times', 'errorCue_times', 'wheelMoves_peakVelocity_times', 'valveOpen_times', 'wheelMoves_peakAmplitude', 'wheelMoves_intervals', 'wheel_timestamps', 'stimFreeze_times'] events = [k for k in data.keys() if k.endswith('_times') and k not in exclude] metric = np.zeros(data['intervals'].shape[0], dtype=bool) # For each trial interval check that one of each trial event occurred. For incorrect trials, # check the error cue trigger occurred within the interval, otherwise check it is nan. for i, (start, end) in enumerate(intervals): metric[i] = (all([start < data[k][i] < end for k in events]) and (np.isnan(err_trig[i]) if correct[i] else start < err_trig[i] < end)) passed = metric.astype(bool) assert intervals.shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_trial_length(data, **_): """ Check open-loop duration positive and <= 1 minute. Check that the time difference between the onset of the go cue sound and the feedback (error sound or valve) is positive and smaller than 60.1 s. Metric: M = feedback_times - goCue_times Criteria: 0 < M < 60.1 s Units: seconds [s] :param data: dict of trial data with keys ('feedback_times', 'goCue_times', 'intervals') """ # NaN values are usually ignored so replace them with Inf so they fail the threshold metric = np.nan_to_num(data['feedback_times'] - data['goCue_times'], nan=np.inf) passed = (metric < 60.1) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
# === Trigger-response delay checks ===
[docs] def check_goCue_delays(data, audio_output='harp', **_): """ Check the go cue tone occurs within 1ms of the intended time. Check that the time difference between the go cue sound being triggered and effectively played is smaller than 1ms. Metric: M = goCue_times - goCueTrigger_times Criterion: 0 < M <= 0.0015 s Units: seconds [s] :param data: dict of trial data with keys ('goCue_times', 'goCueTrigger_times', 'intervals'). :param audio_output: audio output device name. Notes ----- - For non-harp sound card the permissible delay is 0.053s. This was chosen by taking the 99.5th percentile of delays over 500 training sessions using the Xonar soundcard. """ threshold = 0.0015 if audio_output.lower() == 'harp' else 0.053 metric = np.nan_to_num(data['goCue_times'] - data['goCueTrigger_times'], nan=np.inf) passed = (metric <= threshold) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_errorCue_delays(data, audio_output='harp', **_): """ Check the error tone occurs within 1.5ms of the intended time. Check that the time difference between the error sound being triggered and effectively played is smaller than 1ms. Metric: M = errorCue_times - errorCueTrigger_times Criterion: 0 < M <= 0.0015 s Units: seconds [s] :param data: dict of trial data with keys ('errorCue_times', 'errorCueTrigger_times', 'intervals', 'correct') :param audio_output: audio output device name. Notes ----- - For non-harp sound card the permissible delay is 0.062s. This was chosen by taking the 99.5th percentile of delays over 500 training sessions using the Xonar soundcard. """ threshold = 0.0015 if audio_output.lower() == 'harp' else 0.062 metric = np.nan_to_num(data['errorCue_times'] - data['errorCueTrigger_times'], nan=np.inf) passed = ((metric <= threshold) & (metric > 0)).astype(float) passed[data['correct']] = metric[data['correct']] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_stimOn_delays(data, **_): """ Check the visual stimulus onset occurs within 150ms of the intended time. Check that the time difference between the visual stimulus onset-command being triggered and the stimulus effectively appearing on the screen is smaller than 150 ms. Metric: M = stimOn_times - stimOnTrigger_times Criterion: 0 < M < 0.15 s Units: seconds [s] :param data: dict of trial data with keys ('stimOn_times', 'stimOnTrigger_times', 'intervals') """ metric = np.nan_to_num(data['stimOn_times'] - data['stimOnTrigger_times'], nan=np.inf) passed = (metric <= 0.15) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_stimOff_delays(data, **_): """ Check stimulus offset occurs within 150ms of the intended time. Check that the time difference between the visual stimulus offset-command being triggered and the visual stimulus effectively turning off on the screen is smaller than 150 ms. Metric: M = stimOff_times - stimOffTrigger_times Criterion: 0 < M < 0.15 s Units: seconds [s] :param data: dict of trial data with keys ('stimOff_times', 'stimOffTrigger_times', 'intervals') """ metric = np.nan_to_num(data['stimOff_times'] - data['stimOffTrigger_times'], nan=np.inf) passed = (metric <= 0.15) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_stimFreeze_delays(data, **_): """Check the stimulus freezes within 150ms of the intended time. Check that the time difference between the visual stimulus freeze-command being triggered and the visual stimulus effectively freezing on the screen is smaller than 150 ms. Metric: M = stimFreeze_times - stimFreezeTrigger_times Criterion: 0 < M < 0.15 s Units: seconds [s] :param data: dict of trial data with keys ('stimFreeze_times', 'stimFreezeTrigger_times', 'intervals') """ metric = np.nan_to_num(data['stimFreeze_times'] - data['stimFreezeTrigger_times'], nan=np.inf) passed = (metric <= 0.15) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
# === Data integrity checks ===
[docs] def check_reward_volumes(data, **_): """Check that the reward volume is between 1.5 and 3 uL for correct trials, 0 for incorrect. Metric: M = reward volume Criterion: 1.5 <= M <= 3 if correct else M == 0 Units: uL :param data: dict of trial data with keys ('rewardVolume', 'correct', 'intervals') """ metric = data['rewardVolume'] correct = data['correct'] passed = np.zeros_like(metric, dtype=bool) # Check correct trials within correct range passed[correct] = (1.5 <= metric[correct]) & (metric[correct] <= 3.) # Check incorrect trials are 0 passed[~correct] = metric[~correct] == 0 assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_reward_volume_set(data, **_): """Check that there is only two reward volumes within a session, one of which is 0. Metric: M = set(rewardVolume) Criterion: (0 < len(M) <= 2) and 0 in M :param data: dict of trial data with keys ('rewardVolume') """ metric = data['rewardVolume'] passed = 0 < len(set(metric)) <= 2 and 0. in metric return metric, passed
[docs] def check_wheel_integrity(data, re_encoding='X1', enc_res=None, **_): """ Check wheel position sampled at the expected resolution. Check that the difference between wheel position samples is close to the encoder resolution and that the wheel timestamps strictly increase. Metric: M = (absolute difference of the positions < 1.5 * encoder resolution) + 1 if (difference of timestamps <= 0) else 0 Criterion: M ~= 0 Units: arbitrary (radians, sometimes + 1) :param data: dict of wheel data with keys ('wheel_timestamps', 'wheel_position') :param re_encoding: the encoding of the wheel data, X1, X2 or X4 :param enc_res: the rotary encoder resolution (default 1024 ticks per revolution) Notes ----- - At high velocities some samples are missed due to the scanning frequency of the DAQ. This checks for more than 1 missing sample in a row (i.e. the difference between samples >= 2) """ if isinstance(re_encoding, str): re_encoding = int(re_encoding[-1]) # The expected difference between samples in the extracted units resolution = 1 / (enc_res or ephys_fpga.WHEEL_TICKS ) * np.pi * 2 * ephys_fpga.WHEEL_RADIUS_CM / re_encoding # We expect the difference of neighbouring positions to be close to the resolution pos_check = np.abs(np.diff(data['wheel_position'])) # Timestamps should be strictly increasing ts_check = np.diff(data['wheel_timestamps']) <= 0. metric = pos_check + ts_check.astype(float) # all values should be close to zero passed = metric < 1.5 * resolution return metric, passed
# === Pre-stimulus checks ===
[docs] def check_stimulus_move_before_goCue(data, photodiode=None, **_): """ Check there are no stimulus events before the go cue tone. Check that there are no visual stimulus change(s) between the start of the trial and the go cue sound onset, except for stim on. Metric: M = number of visual stimulus change events between trial start and goCue_times Criterion: M == 1 Units: -none-, integer Parameters ---------- data : dict Trial data with keys ('goCue_times', 'intervals', 'choice'). photodiode : dict The fronts from Bpod's BNC1 input or FPGA frame2ttl channel. Returns ------- numpy.array An array of metric values to threshold. numpy.array An array of boolean values, 1 per trial, where True means trial passes QC threshold. Notes ----- - There should be exactly 1 stimulus change before goCue; stimulus onset. Even if the stimulus contrast is 0, the sync square will still flip at stimulus onset, etc. - If there are no goCue times (all are NaN), the status should be NOT_SET. """ if photodiode is None: _log.warning('No photodiode TTL input in function call, returning None') return None photodiode_clean = ephys_fpga._clean_frame2ttl(photodiode) s = photodiode_clean['times'] s = s[~np.isnan(s)] # Remove NaNs metric = np.array([]) for i, c in zip(data['intervals'][:, 0], data['goCue_times']): metric = np.append(metric, np.count_nonzero(s[s > i] < c)) passed = (metric == 1).astype(float) passed[np.isnan(data['goCue_times'])] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_audio_pre_trial(data, audio=None, **_): """ Check no audio stimuli before the go cue. Check there are no audio outputs between the start of the trial and the go cue sound onset - 20 ms. Metric: M = sum(start_times < audio TTL < (goCue_times - 20ms)) Criterion: M == 0 Units: -none-, integer Parameters ---------- data : dict Trial data with keys ('goCue_times', 'intervals'). audio : dict The fronts from Bpod's BNC2 input FPGA audio sync channel. Returns ------- numpy.array An array of metric values to threshold. numpy.array An array of boolean values, 1 per trial, where True means trial passes QC threshold. """ if audio is None: _log.warning('No BNC2 input in function call, retuning None') return None, None s = audio['times'][~np.isnan(audio['times'])] # Audio TTLs with NaNs removed metric = np.array([], dtype=np.int8) for i, c in zip(data['intervals'][:, 0], data['goCue_times']): metric = np.append(metric, sum(s[s > i] < (c - 0.02))) passed = metric == 0 assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed