Source code for ibllib.qc.task_metrics

"""Behaviour QC.

This module runs a list of quality control metrics on the behaviour data.

Examples
--------
Running on a rig computer and updating QC fields in Alyx:

>>> from ibllib.qc.task_metrics import TaskQC
>>> TaskQC('path/to/session').run(update=True)

Downloading the required data and inspecting the QC on a different computer:

>>> from ibllib.qc.task_metrics import TaskQC
>>> qc = TaskQC(eid)
>>> outcome, results = qc.run()

Inspecting individual test outcomes

>>> from ibllib.qc.task_metrics import TaskQC
>>> qc = TaskQC(eid)
>>> outcome, results, outcomes = qc.compute().compute_session_status()

Running bpod QC on ephys session

>>> from ibllib.qc.task_metrics import TaskQC
>>> qc = TaskQC(eid)
>>> qc.load_data(bpod_only=True)  # Extract without FPGA
>>> bpod_qc = qc.run()

Running bpod QC only, from training rig PC

>>> from ibllib.qc.task_metrics import TaskQC
>>> from ibllib.qc.qcplots import plot_results
>>> session_path = r'/home/nico/Downloads/FlatIron/mrsicflogellab/Subjects/SWC_023/2020-02-14/001'
>>> qc = TaskQC(session_path)
>>> qc.load_data(bpod_only=True, download_data=False)  # Extract without FPGA
>>> qc.run()
>>> plot_results(qc, save_path=session_path)

Running ephys QC, from local server PC (after ephys + bpod data have been copied to a same folder)

>>> from ibllib.qc.task_metrics import TaskQC
>>> from ibllib.qc.qcplots import plot_results
>>> session_path = r'/home/nico/Downloads/FlatIron/mrsicflogellab/Subjects/SWC_023/2020-02-14/001'
>>> qc = TaskQC(session_path)
>>> qc.run()
>>> plot_results(qc, save_path=session_path)
"""
import logging
import sys
from datetime import datetime, timedelta
from inspect import getmembers, isfunction
from functools import reduce
from collections.abc import Sized

import numpy as np
from packaging import version
from scipy.stats import chisquare

from brainbox.behavior.wheel import cm_to_rad, traces_by_trial
from ibllib.qc.task_extractors import TaskQCExtractor
from ibllib.io.extractors import ephys_fpga
from one.alf.spec import is_session_path
from . import base

_log = logging.getLogger(__name__)


[docs] class TaskQC(base.QC): """A class for computing task QC metrics""" criteria = dict() criteria['default'] = {'PASS': 0.99, 'WARNING': 0.90, 'FAIL': 0} # Note: WARNING was 0.95 prior to Aug 2022 criteria['_task_stimOff_itiIn_delays'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_positive_feedback_stimOff_delays'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_negative_feedback_stimOff_delays'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_wheel_move_during_closed_loop'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_response_stimFreeze_delays'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_detected_wheel_moves'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_trial_length'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_goCue_delays'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_errorCue_delays'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_stimOn_delays'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_stimOff_delays'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_stimFreeze_delays'] = {'PASS': 0.99, 'WARNING': 0} criteria['_task_iti_delays'] = {'NOT_SET': 0} criteria['_task_passed_trial_checks'] = {'NOT_SET': 0} extractor = None """ibllib.qc.task_extractors.TaskQCExtractor: A task extractor object containing raw and extracted data.""" @staticmethod def _thresholding(qc_value, thresholds=None): """ Computes the outcome of a single key by applying thresholding. :param qc_value: proportion of passing qcs, between 0 and 1 :param thresholds: dictionary with keys 'PASS', 'WARNING', 'FAIL' (cf. TaskQC.criteria attribute) :return: int where -1: NOT_SET, 0: PASS, 1: WARNING, 2: FAIL """ MAX_BOUND, MIN_BOUND = (1, 0) if not thresholds: thresholds = TaskQC.criteria['default'].copy() if qc_value is None or np.isnan(qc_value): return int(-1) elif (qc_value > MAX_BOUND) or (qc_value < MIN_BOUND): raise ValueError('Values out of bound') if 'PASS' in thresholds.keys() and qc_value >= thresholds['PASS']: return 0 if 'WARNING' in thresholds.keys() and qc_value >= thresholds['WARNING']: return 1 if 'FAIL' in thresholds and qc_value >= thresholds['FAIL']: return 2 if 'NOT_SET' in thresholds and qc_value >= thresholds['NOT_SET']: return -1 # if None of this applies, return 'NOT_SET' return -1 def __init__(self, session_path_or_eid, **kwargs): """ :param session_path_or_eid: A session eid or path :param log: A logging.Logger instance, if None the 'ibllib' logger is used :param one: An ONE instance for fetching and setting the QC on Alyx """ # When an eid is provided, we will download the required data by default (if necessary) self.download_data = not is_session_path(session_path_or_eid) super().__init__(session_path_or_eid, **kwargs) # Data self.extractor = None # Metrics and passed trials self.metrics = None self.passed = None
[docs] def load_data(self, bpod_only=False, download_data=True): """Extract the data from raw data files. Extracts all the required task data from the raw data files. Parameters ---------- bpod_only : bool If True no data is extracted from the FPGA for ephys sessions. download_data : bool If True, any missing raw data is downloaded via ONE. By default data are not downloaded if a session path was provided to the constructor. """ self.extractor = TaskQCExtractor( self.session_path, one=self.one, download_data=download_data, bpod_only=bpod_only)
[docs] def compute(self, **kwargs): """Compute and store the QC metrics. Runs the QC on the session and stores a map of the metrics for each datapoint for each test, and a map of which datapoints passed for each test. Parameters ---------- bpod_only : bool If True no data is extracted from the FPGA for ephys sessions. download_data : bool If True, any missing raw data is downloaded via ONE. By default data are not downloaded if a session path was provided to the constructor. """ if self.extractor is None: kwargs['download_data'] = kwargs.pop('download_data', self.download_data) self.load_data(**kwargs) ver = self.extractor.settings.get('IBLRIG_VERSION', '') or '0.0.0' if version.parse(ver) >= version.parse('8.0.0'): self.criteria['_task_iti_delays'] = {'PASS': 0.99, 'WARNING': 0} self.criteria['_task_passed_trial_checks'] = {'PASS': 0.7, 'WARNING': 0} else: self.criteria['_task_iti_delays'] = {'NOT_SET': 0} self.criteria['_task_passed_trial_checks'] = {'NOT_SET': 0} self.log.info(f'Session {self.session_path}: Running QC on behavior data...') self.get_bpodqc_metrics_frame( self.extractor.data, wheel_gain=self.extractor.settings['STIM_GAIN'], # The wheel gain photodiode=self.extractor.frame_ttls, audio=self.extractor.audio_ttls, re_encoding=self.extractor.wheel_encoding or 'X1', min_qt=self.extractor.settings.get('QUIESCENT_PERIOD') or 0.2, audio_output=self.extractor.settings.get('device_sound', {}).get('OUTPUT', 'unknown') )
def _get_checks(self): """ Find all methods that begin with 'check_'. Returns ------- Dict[str, function] A map of QC check function names and the corresponding functions that return `metric` (any), `passed` (bool). """ def is_metric(x): return isfunction(x) and x.__name__.startswith('check_') return dict(getmembers(sys.modules[__name__], is_metric))
[docs] def get_bpodqc_metrics_frame(self, data, **kwargs): """ Evaluates all the QC metric functions in this module (those starting with 'check') and returns the results. The optional kwargs listed below are passed to each QC metric function. :param data: dict of extracted task data :param re_encoding: the encoding of the wheel data, X1, X2 or X4 :param enc_res: the rotary encoder resolution :param wheel_gain: the STIM_GAIN task parameter :param photodiode: the fronts from Bpod's BNC1 input or FPGA frame2ttl channel :param audio: the fronts from Bpod's BNC2 input FPGA audio sync channel :param min_qt: the QUIESCENT_PERIOD task parameter :return metrics: dict of checks and their QC metrics :return passed: dict of checks and a float array of which samples passed """ # Find all methods that begin with 'check_' checks = self._get_checks() prefix = '_task_' # Extended QC fields will start with this # Method 'check_foobar' stored with key '_task_foobar' in metrics map qc_metrics_map = {prefix + k[6:]: fn(data, **kwargs) for k, fn in checks.items()} # Split metrics and passed frames self.metrics = {} self.passed = {} for k in qc_metrics_map: self.metrics[k], self.passed[k] = qc_metrics_map[k] # Add a check for trial level pass: did a given trial pass all checks? n_trials = data['intervals'].shape[0] # Trial-level checks return an array the length that equals the number of trials trial_level_passed = [m for m in self.passed.values() if isinstance(m, Sized) and len(m) == n_trials] name = prefix + 'passed_trial_checks' self.metrics[name] = reduce(np.logical_and, trial_level_passed or (None, None)) self.passed[name] = self.metrics[name].astype(float) if trial_level_passed else None
[docs] def run(self, update=False, namespace='task', **kwargs): """ Compute the QC outcomes and return overall task QC outcome. Parameters ---------- update : bool If True, updates the session QC fields on Alyx. namespace : str The namespace of the QC fields in the Alyx JSON field. bpod_only : bool If True no data is extracted from the FPGA for ephys sessions. download_data : bool If True, any missing raw data is downloaded via ONE. By default data are not downloaded if a session path was provided to the constructor. Returns ------- str Overall task QC outcome. dict A map of QC tests and the proportion of data points that passed them. """ if self.metrics is None: self.compute(**kwargs) outcome, results, _ = self.compute_session_status() if update: self.update_extended_qc(results) self.update(outcome, namespace) return outcome, results
[docs] @staticmethod def compute_session_status_from_dict(results, criteria=None): """ Given a dictionary of results, computes the overall session QC for each key and aggregates in a single value Parameters ---------- results : dict A dictionary of QC keys containing (usually scalar) values. criteria : dict A dictionary of qc keys containing map of PASS, WARNING, FAIL thresholds. Returns ------- str Overall session QC outcome as a string. dict A map of QC tests and their outcomes. """ indices = np.zeros(len(results), dtype=int) criteria = criteria or TaskQC.criteria for i, k in enumerate(results): if k in criteria.keys(): indices[i] = TaskQC._thresholding(results[k], thresholds=criteria[k]) else: indices[i] = TaskQC._thresholding(results[k], thresholds=criteria['default']) def key_map(x): return 'NOT_SET' if x < 0 else list(TaskQC.criteria['default'].keys())[x] # Criteria map is in order of severity so the max index is our overall QC outcome session_outcome = key_map(max(indices)) outcomes = dict(zip(results.keys(), map(key_map, indices))) return session_outcome, outcomes
[docs] def compute_session_status(self): """ Computes the overall session QC for each key and aggregates in a single value. Returns ------- str Overall session QC outcome. dict A map of QC tests and the proportion of data points that passed them. dict A map of QC tests and their outcomes. """ if self.passed is None: raise AttributeError('passed is None; compute QC first') # Get mean passed of each check, or None if passed is None or all NaN results = {k: None if v is None or np.isnan(v).all() else np.nanmean(v) for k, v in self.passed.items()} session_outcome, outcomes = self.compute_session_status_from_dict(results, self.criteria) return session_outcome, results, outcomes
[docs] class HabituationQC(TaskQC):
[docs] def compute(self, download_data=None, **kwargs): """Compute and store the QC metrics. Runs the QC on the session and stores a map of the metrics for each datapoint for each test, and a map of which datapoints passed for each test. :return: """ if self.extractor is None: # If download_data is None, decide based on whether eid or session path was provided ensure_data = self.download_data if download_data is None else download_data self.load_data(download_data=ensure_data, **kwargs) self.log.info(f'Session {self.session_path}: Running QC on habituation data...') # Initialize checks prefix = '_task_' data = self.extractor.data audio_output = self.extractor.settings.get('device_sound', {}).get('OUTPUT', 'unknown') metrics = {} passed = {} # Check all reward volumes == 3.0ul check = prefix + 'reward_volumes' metrics[check] = data['rewardVolume'] passed[check] = metrics[check] == 3.0 # Check session durations are increasing in steps >= 12 minutes check = prefix + 'habituation_time' if not self.one or not self.session_path: self.log.warning('unable to determine session trials without ONE') metrics[check] = passed[check] = None else: subject, session_date = self.session_path.parts[-3:-1] # compute from the date specified date_minus_week = ( datetime.strptime(session_date, '%Y-%m-%d') - timedelta(days=7) ).strftime('%Y-%m-%d') sessions = self.one.alyx.rest('sessions', 'list', subject=subject, date_range=[date_minus_week, session_date], task_protocol='habituation') # Remove the current session if already registered if sessions and sessions[0]['start_time'].startswith(session_date): sessions = sessions[1:] metric = ([0, data['intervals'][-1, 1] - data['intervals'][0, 0]] + [(datetime.fromisoformat(x['end_time']) - datetime.fromisoformat(x['start_time'])).total_seconds() / 60 for x in [self.one.alyx.get(s['url']) for s in sessions]]) # The duration from raw trial data # duration = map(float, self.extractor.raw_data[-1]['elapsed_time'].split(':')) # duration = timedelta(**dict(zip(('hours', 'minutes', 'seconds'), # duration))).total_seconds() / 60 metrics[check] = np.array(metric) passed[check] = np.diff(metric) >= 12 # Check event orders: trial_start < stim on < stim center < feedback < stim off check = prefix + 'trial_event_sequence' nans = ( np.isnan(data['intervals'][:, 0]) | # noqa np.isnan(data['stimOn_times']) | # noqa np.isnan(data['stimCenter_times']) | np.isnan(data['valveOpen_times']) | # noqa np.isnan(data['stimOff_times']) ) a = np.less(data['intervals'][:, 0], data['stimOn_times'], where=~nans) b = np.less(data['stimOn_times'], data['stimCenter_times'], where=~nans) c = np.less(data['stimCenter_times'], data['valveOpen_times'], where=~nans) d = np.less(data['valveOpen_times'], data['stimOff_times'], where=~nans) metrics[check] = a & b & c & d & ~nans passed[check] = metrics[check].astype(float) # Check that the time difference between the visual stimulus center-command being # triggered and the stimulus effectively appearing in the center is smaller than 150 ms. check = prefix + 'stimCenter_delays' metric = np.nan_to_num(data['stimCenter_times'] - data['stimCenterTrigger_times'], nan=np.inf) passed[check] = (metric <= 0.15) & (metric > 0) metrics[check] = metric # Phase check check = prefix + 'phase' metric = data['phase'] passed[check] = (metric <= 2 * np.pi) & (metric >= 0) metrics[check] = metric # This is not very useful as a check because there are so few trials check = prefix + 'phase_distribution' metric, _ = np.histogram(data['phase']) _, p = chisquare(metric) passed[check] = p < 0.05 if len(data['phase']) >= 400 else None # skip if too few trials metrics[check] = metric # Checks common to training QC checks = [check_goCue_delays, check_stimOn_goCue_delays, check_stimOn_delays, check_stimOff_delays] for fcn in checks: check = prefix + fcn.__name__[6:] metrics[check], passed[check] = fcn(data, audio_output=audio_output) self.metrics, self.passed = (metrics, passed)
# SINGLE METRICS # ---------------------------------------------------------------------------- # # === Delays between events checks ===
[docs] def check_stimOn_goCue_delays(data, audio_output='harp', **_): """ Checks that the time difference between the onset of the visual stimulus and the onset of the go cue tone is positive and less than 10ms. Metric: M = stimOn_times - goCue_times Criteria: 0 < M < 0.010 s Units: seconds [s] :param data: dict of trial data with keys ('goCue_times', 'stimOn_times', 'intervals') :param audio_output: audio output device name. Notes ----- For non-harp sound card the permissible delay is 0.053s. This was chosen by taking the 99.5th percentile of delays over 500 training sessions using the Xonar soundcard. """ # Calculate the difference between stimOn and goCue times. # If either are NaN, the result will be Inf to ensure that it crosses the failure threshold. threshold = 0.01 if audio_output.lower() == 'harp' else 0.053 metric = np.nan_to_num(data['goCue_times'] - data['stimOn_times'], nan=np.inf) passed = (metric < threshold) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_response_feedback_delays(data, audio_output='harp', **_): """ Checks that the time difference between the response and the feedback onset (error sound or valve) is positive and less than 10ms. Metric: M = feedback_time - response_time Criterion: 0 < M < 0.010 s Units: seconds [s] :param data: dict of trial data with keys ('feedback_times', 'response_times', 'intervals') :param audio_output: audio output device name. Notes ----- For non-harp sound card the permissible delay is 0.053s. This was chosen by taking the 99.5th percentile of delays over 500 training sessions using the Xonar soundcard. """ threshold = 0.01 if audio_output.lower() == 'harp' else 0.053 metric = np.nan_to_num(data['feedback_times'] - data['response_times'], nan=np.inf) passed = (metric < threshold) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_response_stimFreeze_delays(data, **_): """ Checks that the time difference between the visual stimulus freezing and the response is positive and less than 100ms. Metric: M = (stimFreeze_times - response_times) Criterion: 0 < M < 0.100 s Units: seconds [s] :param data: dict of trial data with keys ('stimFreeze_times', 'response_times', 'intervals', 'choice') """ # Calculate the difference between stimOn and goCue times. # If either are NaN, the result will be Inf to ensure that it crosses the failure threshold. metric = np.nan_to_num(data['stimFreeze_times'] - data['response_times'], nan=np.inf) # Test for valid values passed = ((metric < 0.1) & (metric > 0)).astype(float) # Finally remove no_go trials (stimFreeze triggered differently in no_go trials) # These values are ignored in calculation of proportion passed passed[data['choice'] == 0] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_stimOff_itiIn_delays(data, **_): """ Check that the start of the trial interval is within 10ms of the visual stimulus turning off. Metric: M = itiIn_times - stimOff_times Criterion: 0 < M < 0.010 s Units: seconds [s] :param data: dict of trial data with keys ('stimOff_times', 'itiIn_times', 'intervals', 'choice') """ # If either are NaN, the result will be Inf to ensure that it crosses the failure threshold. metric = np.nan_to_num(data['itiIn_times'] - data['stimOff_times'], nan=np.inf) passed = ((metric < 0.01) & (metric >= 0)).astype(float) # Remove no_go trials (stimOff triggered differently in no_go trials) # NaN values are ignored in calculation of proportion passed metric[data['choice'] == 0] = passed[data['choice'] == 0] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_iti_delays(data, **_): """ Check that the period of gray screen between stim off and the start of the next trial is 0.5s +/- 200%. Metric: M = stimOff (n) - trialStart (n+1) - 0.5 Criterion: |M| < 1 Units: seconds [s] :param data: dict of trial data with keys ('stimOff_times', 'intervals') """ # Initialize array the length of completed trials metric = np.full(data['intervals'].shape[0], np.nan) passed = metric.copy() # Get the difference between stim off and the start of the next trial # Missing data are set to Inf, except for the last trial which is a NaN metric[:-1] = \ np.nan_to_num(data['intervals'][1:, 0] - data['stimOff_times'][:-1] - 0.5, nan=np.inf) passed[:-1] = np.abs(metric[:-1]) < .5 # Last trial is not counted assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_positive_feedback_stimOff_delays(data, **_): """ Check that the time difference between the valve onset and the visual stimulus turning off is 1 ± 0.150 seconds. Metric: M = stimOff_times - feedback_times - 1s Criterion: |M| < 0.150 s Units: seconds [s] :param data: dict of trial data with keys ('stimOff_times', 'feedback_times', 'intervals', 'correct') """ # If either are NaN, the result will be Inf to ensure that it crosses the failure threshold. metric = np.nan_to_num(data['stimOff_times'] - data['feedback_times'] - 1, nan=np.inf) passed = (np.abs(metric) < 0.15).astype(float) # NaN values are ignored in calculation of proportion passed; ignore incorrect trials here metric[~data['correct']] = passed[~data['correct']] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_negative_feedback_stimOff_delays(data, **_): """ Check that the time difference between the error sound and the visual stimulus turning off is 2 ± 0.150 seconds. Metric: M = stimOff_times - errorCue_times - 2s Criterion: |M| < 0.150 s Units: seconds [s] :param data: dict of trial data with keys ('stimOff_times', 'errorCue_times', 'intervals') """ metric = np.nan_to_num(data['stimOff_times'] - data['errorCue_times'] - 2, nan=np.inf) # Apply criteria passed = (np.abs(metric) < 0.15).astype(float) # Remove none negative feedback trials metric[data['correct']] = passed[data['correct']] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
# === Wheel movement during trial checks ===
[docs] def check_wheel_move_before_feedback(data, **_): """ Check that the wheel does move within 100ms of the feedback onset (error sound or valve). Metric: M = (w_t - 0.05) - (w_t + 0.05), where t = feedback_times Criterion: M != 0 Units: radians :param data: dict of trial data with keys ('wheel_timestamps', 'wheel_position', 'choice', 'intervals', 'feedback_times') """ # Get tuple of wheel times and positions within 100ms of feedback traces = traces_by_trial( data['wheel_timestamps'], data['wheel_position'], start=data['feedback_times'] - 0.05, end=data['feedback_times'] + 0.05, ) metric = np.zeros_like(data['feedback_times']) # For each trial find the displacement for i, trial in enumerate(traces): pos = trial[1] if pos.size > 1: metric[i] = pos[-1] - pos[0] # except no-go trials metric[data['choice'] == 0] = np.nan # NaN = trial ignored for this check nans = np.isnan(metric) passed = np.zeros_like(metric) * np.nan passed[~nans] = (metric[~nans] != 0).astype(float) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
def _wheel_move_during_closed_loop(re_ts, re_pos, data, wheel_gain=None, tol=1, **_): """ Check that the wheel moves by approximately 35 degrees during the closed-loop period on trials where a feedback (error sound or valve) is delivered. Metric: M = abs(w_resp - w_t0) - threshold_displacement, where w_resp = position at response time, w_t0 = position at go cue time, threshold_displacement = displacement required to move 35 visual degrees Criterion: displacement < tol visual degree Units: degrees angle of wheel turn :param re_ts: extracted wheel timestamps in seconds :param re_pos: extracted wheel positions in radians :param data: a dict with the keys (goCueTrigger_times, response_times, feedback_times, position, choice, intervals) :param wheel_gain: the 'STIM_GAIN' task setting :param tol: the criterion in visual degrees """ if wheel_gain is None: _log.warning('No wheel_gain input in function call, returning None') return None, None # Get tuple of wheel times and positions over each trial's closed-loop period traces = traces_by_trial(re_ts, re_pos, start=data['goCueTrigger_times'], end=data['response_times']) metric = np.zeros_like(data['feedback_times']) # For each trial find the absolute displacement for i, trial in enumerate(traces): t, pos = trial if pos.size != 0: # Find the position of the preceding sample and subtract it idx = np.abs(re_ts - t[0]).argmin() - 1 origin = re_pos[idx] metric[i] = np.abs(pos - origin).max() # Load wheel_gain and thresholds for each trial wheel_gain = np.array([wheel_gain] * len(data['position'])) thresh = data['position'] # abs displacement, s, in mm required to move 35 visual degrees s_mm = np.abs(thresh / wheel_gain) # don't care about direction criterion = cm_to_rad(s_mm * 1e-1) # convert abs displacement to radians (wheel pos is in rad) metric = metric - criterion # difference should be close to 0 rad_per_deg = cm_to_rad(1 / wheel_gain * 1e-1) passed = (np.abs(metric) < rad_per_deg * tol).astype(float) # less than 1 visual degree off metric[data['choice'] == 0] = passed[data['choice'] == 0] = np.nan # except no-go trials assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_wheel_move_during_closed_loop(data, wheel_gain=None, **_): """ Check that the wheel moves by approximately 35 degrees during the closed-loop period on trials where a feedback (error sound or valve) is delivered. Metric: M = abs(w_resp - w_t0) - threshold_displacement, where w_resp = position at response time, w_t0 = position at go cue time, threshold_displacement = displacement required to move 35 visual degrees Criterion: displacement < 3 visual degrees Units: degrees angle of wheel turn :param data: dict of trial data with keys ('wheel_timestamps', 'wheel_position', 'choice', 'intervals', 'goCueTrigger_times', 'response_times', 'feedback_times', 'position') :param wheel_gain: the 'STIM_GAIN' task setting """ # Get the Bpod extracted wheel data timestamps = data['wheel_timestamps'] position = data['wheel_position'] return _wheel_move_during_closed_loop(timestamps, position, data, wheel_gain, tol=3)
[docs] def check_wheel_move_during_closed_loop_bpod(data, wheel_gain=None, **_): """ Check that the wheel moves by approximately 35 degrees during the closed-loop period on trials where a feedback (error sound or valve) is delivered. This check uses the Bpod wheel data (measured at a lower resolution) with a stricter tolerance (1 visual degree). Metric: M = abs(w_resp - w_t0) - threshold_displacement, where w_resp = position at response time, w_t0 = position at go cue time, threshold_displacement = displacement required to move 35 visual degrees Criterion: displacement < 1 visual degree Units: degrees angle of wheel turn :param data: dict of trial data with keys ('wheel_timestamps(_bpod)', 'wheel_position(_bpod)', 'choice', 'intervals', 'goCueTrigger_times', 'response_times', 'feedback_times', 'position') :param wheel_gain: the 'STIM_GAIN' task setting """ # Get the Bpod extracted wheel data timestamps = data.get('wheel_timestamps_bpod', data['wheel_timestamps']) position = data.get('wheel_position_bpod', data['wheel_position']) return _wheel_move_during_closed_loop(timestamps, position, data, wheel_gain, tol=1)
[docs] def check_wheel_freeze_during_quiescence(data, **_): """ Check that the wheel does not move more than 2 degrees in each direction during the quiescence interval before the stimulus appears. Metric: M = |max(W) - min(W)| where W is wheel pos over quiescence interval interval = [stimOnTrigger_times - quiescent_duration, stimOnTrigger_times] Criterion: M < 2 degrees Units: degrees angle of wheel turn :param data: dict of trial data with keys ('wheel_timestamps', 'wheel_position', 'quiescence', 'intervals', 'stimOnTrigger_times') """ assert np.all(np.diff(data['wheel_timestamps']) >= 0) assert data['quiescence'].size == data['stimOnTrigger_times'].size # Get tuple of wheel times and positions over each trial's quiescence period qevt_start_times = data['stimOnTrigger_times'] - data['quiescence'] traces = traces_by_trial( data['wheel_timestamps'], data['wheel_position'], start=qevt_start_times, end=data['stimOnTrigger_times'] ) metric = np.zeros((len(data['quiescence']), 2)) # (n_trials, n_directions) for i, trial in enumerate(traces): t, pos = trial # Get the last position before the period began if pos.size > 0: # Find the position of the preceding sample and subtract it idx = np.abs(data['wheel_timestamps'] - t[0]).argmin() - 1 origin = data['wheel_position'][idx if idx != -1 else 0] # Find the absolute min and max relative to the last sample metric[i, :] = np.abs([np.min(pos - origin), np.max(pos - origin)]) # Reduce to the largest displacement found in any direction metric = np.max(metric, axis=1) metric = 180 * metric / np.pi # convert to degrees from radians criterion = 2 # Position shouldn't change more than 2 in either direction passed = metric < criterion assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_detected_wheel_moves(data, min_qt=0, **_): """ Check that the detected first movement times are reasonable. Metric: M = firstMovement times Criterion: (goCue trigger time - min quiescent period) < M < response time Units: Seconds [s] :param data: dict of trial data with keys ('firstMovement_times', 'goCueTrigger_times', 'response_times', 'choice', 'intervals') :param min_qt: the minimum possible quiescent period (the QUIESCENT_PERIOD task parameter) """ # Depending on task version this may be a single value or an array of quiescent periods min_qt = np.array(min_qt) if min_qt.size > data['intervals'].shape[0]: min_qt = min_qt[:data['intervals'].shape[0]] metric = data['firstMovement_times'] qevt_start = data['goCueTrigger_times'] - np.array(min_qt) response = data['response_times'] # First movement time for each trial should be after the quiescent period and before feedback passed = np.array([a < m < b for m, a, b in zip(metric, qevt_start, response)], dtype=float) nogo = data['choice'] == 0 passed[nogo] = np.nan # No go trial may have no movement times and that's fine return metric, passed
# === Sequence of events checks ===
[docs] def check_error_trial_event_sequence(data, **_): """ Check that on incorrect / miss trials, there are exactly: 2 audio events (go cue sound and error sound) and 2 Bpod events (trial start, ITI), occurring in the correct order Metric: M = Bpod (trial start) > audio (go cue) > audio (error) > Bpod (ITI) > Bpod (trial end) Criterion: M == True Units: -none- :param data: dict of trial data with keys ('errorCue_times', 'goCue_times', 'intervals', 'itiIn_times', 'correct') """ # An array the length of N trials where True means at least one event time was NaN (bad) nans = ( np.isnan(data['intervals'][:, 0]) | np.isnan(data['goCue_times']) | # noqa np.isnan(data['errorCue_times']) | # noqa np.isnan(data['itiIn_times']) | # noqa np.isnan(data['intervals'][:, 1]) ) # For each trial check that the events happened in the correct order (ignore NaN values) a = np.less(data['intervals'][:, 0], data['goCue_times'], where=~nans) # Start time < go cue b = np.less(data['goCue_times'], data['errorCue_times'], where=~nans) # Go cue < error cue c = np.less(data['errorCue_times'], data['itiIn_times'], where=~nans) # Error cue < ITI start d = np.less(data['itiIn_times'], data['intervals'][:, 1], where=~nans) # ITI start < end time # For each trial check all events were in order AND all event times were not NaN metric = a & b & c & d & ~nans passed = metric.astype(float) passed[data['correct']] = np.nan # Look only at incorrect trials assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_correct_trial_event_sequence(data, **_): """ Check that on correct trials, there are exactly: 1 audio events and 3 Bpod events (valve open, trial start, ITI), occurring in the correct order Metric: M = Bpod (trial start) > audio (go cue) > Bpod (valve) > Bpod (ITI) > Bpod (trial end) Criterion: M == True Units: -none- :param data: dict of trial data with keys ('valveOpen_times', 'goCue_times', 'intervals', 'itiIn_times', 'correct') """ # An array the length of N trials where True means at least one event time was NaN (bad) nans = ( np.isnan(data['intervals'][:, 0]) | np.isnan(data['goCue_times']) | # noqa np.isnan(data['valveOpen_times']) | np.isnan(data['itiIn_times']) | # noqa np.isnan(data['intervals'][:, 1]) ) # For each trial check that the events happened in the correct order (ignore NaN values) a = np.less(data['intervals'][:, 0], data['goCue_times'], where=~nans) # Start time < go cue b = np.less(data['goCue_times'], data['valveOpen_times'], where=~nans) # Go cue < feedback c = np.less(data['valveOpen_times'], data['itiIn_times'], where=~nans) # Feedback < ITI start d = np.less(data['itiIn_times'], data['intervals'][:, 1], where=~nans) # ITI start < end time # For each trial True means all events were in order AND all event times were not NaN metric = a & b & c & d & ~nans passed = metric.astype(float) passed[~data['correct']] = np.nan # Look only at correct trials assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_n_trial_events(data, **_): """ Check that the number events per trial is correct Within every trial interval there should be one of each trial event, except for goCueTrigger_times which should only be defined for incorrect trials Metric: M = all(start < event < end) for all event times except errorCueTrigger_times where start < error_trigger < end if not correct trial, else error_trigger == NaN Criterion: M == True Units: -none-, boolean :param data: dict of trial data with keys ('intervals', 'stimOnTrigger_times', 'stimOffTrigger_times', 'stimOn_times', 'stimOff_times', 'stimFreezeTrigger_times', 'errorCueTrigger_times', 'itiIn_times', 'goCueTrigger_times', 'goCue_times', 'response_times', 'feedback_times') """ intervals = data['intervals'] correct = data['correct'] err_trig = data['errorCueTrigger_times'] # Exclude these fields; valve and errorCue times are the same as feedback_times and we must # test errorCueTrigger_times separately # stimFreeze_times fails often due to TTL flicker exclude = ['camera_timestamps', 'errorCueTrigger_times', 'errorCue_times', 'firstMovement_times', 'peakVelocity_times', 'valveOpen_times', 'wheel_moves_peak_amplitude', 'wheel_moves_intervals', 'wheel_timestamps', 'wheel_intervals', 'stimFreeze_times'] events = [k for k in data.keys() if k.endswith('_times') and k not in exclude] metric = np.zeros(data['intervals'].shape[0], dtype=bool) # For each trial interval check that one of each trial event occurred. For incorrect trials, # check the error cue trigger occurred within the interval, otherwise check it is nan. for i, (start, end) in enumerate(intervals): metric[i] = (all([start < data[k][i] < end for k in events]) and (np.isnan(err_trig[i]) if correct[i] else start < err_trig[i] < end)) passed = metric.astype(bool) assert intervals.shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_trial_length(data, **_): """ Check that the time difference between the onset of the go cue sound and the feedback (error sound or valve) is positive and smaller than 60.1 s. Metric: M = feedback_times - goCue_times Criteria: 0 < M < 60.1 s Units: seconds [s] :param data: dict of trial data with keys ('feedback_times', 'goCue_times', 'intervals') """ # NaN values are usually ignored so replace them with Inf so they fail the threshold metric = np.nan_to_num(data['feedback_times'] - data['goCue_times'], nan=np.inf) passed = (metric < 60.1) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
# === Trigger-response delay checks ===
[docs] def check_goCue_delays(data, audio_output='harp', **_): """ Check that the time difference between the go cue sound being triggered and effectively played is smaller than 1ms. Metric: M = goCue_times - goCueTrigger_times Criterion: 0 < M <= 0.0015 s Units: seconds [s] :param data: dict of trial data with keys ('goCue_times', 'goCueTrigger_times', 'intervals'). :param audio_output: audio output device name. Notes ----- For non-harp sound card the permissible delay is 0.053s. This was chosen by taking the 99.5th percentile of delays over 500 training sessions using the Xonar soundcard. """ threshold = 0.0015 if audio_output.lower() == 'harp' else 0.053 metric = np.nan_to_num(data['goCue_times'] - data['goCueTrigger_times'], nan=np.inf) passed = (metric <= threshold) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_errorCue_delays(data, audio_output='harp', **_): """ Check that the time difference between the error sound being triggered and effectively played is smaller than 1ms. Metric: M = errorCue_times - errorCueTrigger_times Criterion: 0 < M <= 0.0015 s Units: seconds [s] :param data: dict of trial data with keys ('errorCue_times', 'errorCueTrigger_times', 'intervals', 'correct') :param audio_output: audio output device name. Notes ----- For non-harp sound card the permissible delay is 0.062s. This was chosen by taking the 99.5th percentile of delays over 500 training sessions using the Xonar soundcard. """ threshold = 0.0015 if audio_output.lower() == 'harp' else 0.062 metric = np.nan_to_num(data['errorCue_times'] - data['errorCueTrigger_times'], nan=np.inf) passed = ((metric <= threshold) & (metric > 0)).astype(float) passed[data['correct']] = metric[data['correct']] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_stimOn_delays(data, **_): """ Check that the time difference between the visual stimulus onset-command being triggered and the stimulus effectively appearing on the screen is smaller than 150 ms. Metric: M = stimOn_times - stimOnTrigger_times Criterion: 0 < M < 0.15 s Units: seconds [s] :param data: dict of trial data with keys ('stimOn_times', 'stimOnTrigger_times', 'intervals') """ metric = np.nan_to_num(data['stimOn_times'] - data['stimOnTrigger_times'], nan=np.inf) passed = (metric <= 0.15) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_stimOff_delays(data, **_): """ Check that the time difference between the visual stimulus offset-command being triggered and the visual stimulus effectively turning off on the screen is smaller than 150 ms. Metric: M = stimOff_times - stimOffTrigger_times Criterion: 0 < M < 0.15 s Units: seconds [s] :param data: dict of trial data with keys ('stimOff_times', 'stimOffTrigger_times', 'intervals') """ metric = np.nan_to_num(data['stimOff_times'] - data['stimOffTrigger_times'], nan=np.inf) passed = (metric <= 0.15) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_stimFreeze_delays(data, **_): """ Check that the time difference between the visual stimulus freeze-command being triggered and the visual stimulus effectively freezing on the screen is smaller than 150 ms. Metric: M = stimFreeze_times - stimFreezeTrigger_times Criterion: 0 < M < 0.15 s Units: seconds [s] :param data: dict of trial data with keys ('stimFreeze_times', 'stimFreezeTrigger_times', 'intervals') """ metric = np.nan_to_num(data['stimFreeze_times'] - data['stimFreezeTrigger_times'], nan=np.inf) passed = (metric <= 0.15) & (metric > 0) assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
# === Data integrity checks ===
[docs] def check_reward_volumes(data, **_): """ Check that the reward volume is between 1.5 and 3 uL for correct trials, 0 for incorrect. Metric: M = reward volume Criterion: 1.5 <= M <= 3 if correct else M == 0 Units: uL :param data: dict of trial data with keys ('rewardVolume', 'correct', 'intervals') """ metric = data['rewardVolume'] correct = data['correct'] passed = np.zeros_like(metric, dtype=bool) # Check correct trials within correct range passed[correct] = (1.5 <= metric[correct]) & (metric[correct] <= 3.) # Check incorrect trials are 0 passed[~correct] = metric[~correct] == 0 assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_reward_volume_set(data, **_): """ Check that there is only two reward volumes within a session, one of which is 0. Metric: M = set(rewardVolume) Criterion: (0 < len(M) <= 2) and 0 in M :param data: dict of trial data with keys ('rewardVolume') """ metric = data['rewardVolume'] passed = 0 < len(set(metric)) <= 2 and 0. in metric return metric, passed
[docs] def check_wheel_integrity(data, re_encoding='X1', enc_res=None, **_): """ Check that the difference between wheel position samples is close to the encoder resolution and that the wheel timestamps strictly increase. Note: At high velocities some samples are missed due to the scanning frequency of the DAQ. This checks for more than 1 missing sample in a row (i.e. the difference between samples >= 2) Metric: M = (absolute difference of the positions < 1.5 * encoder resolution) + 1 if (difference of timestamps <= 0) else 0 Criterion: M ~= 0 Units: arbitrary (radians, sometimes + 1) :param data: dict of wheel data with keys ('wheel_timestamps', 'wheel_position') :param re_encoding: the encoding of the wheel data, X1, X2 or X4 :param enc_res: the rotary encoder resolution (default 1024 ticks per revolution) """ if isinstance(re_encoding, str): re_encoding = int(re_encoding[-1]) # The expected difference between samples in the extracted units resolution = 1 / (enc_res or ephys_fpga.WHEEL_TICKS ) * np.pi * 2 * ephys_fpga.WHEEL_RADIUS_CM / re_encoding # We expect the difference of neighbouring positions to be close to the resolution pos_check = np.abs(np.diff(data['wheel_position'])) # Timestamps should be strictly increasing ts_check = np.diff(data['wheel_timestamps']) <= 0. metric = pos_check + ts_check.astype(float) # all values should be close to zero passed = metric < 1.5 * resolution return metric, passed
# === Pre-stimulus checks ===
[docs] def check_stimulus_move_before_goCue(data, photodiode=None, **_): """ Check that there are no visual stimulus change(s) between the start of the trial and the go cue sound onset, except for stim on. Metric: M = number of visual stimulus change events between trial start and goCue_times Criterion: M == 1 Units: -none-, integer :param data: dict of trial data with keys ('goCue_times', 'intervals', 'choice') :param photodiode: the fronts from Bpod's BNC1 input or FPGA frame2ttl channel Notes ----- - There should be exactly 1 stimulus change before goCue; stimulus onset. Even if the stimulus contrast is 0, the sync square will still flip at stimulus onset, etc. - If there are no goCue times (all are NaN), the status should be NOT_SET. """ if photodiode is None: _log.warning('No photodiode TTL input in function call, returning None') return None photodiode_clean = ephys_fpga._clean_frame2ttl(photodiode) s = photodiode_clean['times'] s = s[~np.isnan(s)] # Remove NaNs metric = np.array([]) for i, c in zip(data['intervals'][:, 0], data['goCue_times']): metric = np.append(metric, np.count_nonzero(s[s > i] < c)) passed = (metric == 1).astype(float) passed[np.isnan(data['goCue_times'])] = np.nan assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed
[docs] def check_audio_pre_trial(data, audio=None, **_): """ Check that there are no audio outputs between the start of the trial and the go cue sound onset - 20 ms. Metric: M = sum(start_times < audio TTL < (goCue_times - 20ms)) Criterion: M == 0 Units: -none-, integer :param data: dict of trial data with keys ('goCue_times', 'intervals') :param audio: the fronts from Bpod's BNC2 input FPGA audio sync channel """ if audio is None: _log.warning('No BNC2 input in function call, retuning None') return None s = audio['times'][~np.isnan(audio['times'])] # Audio TTLs with NaNs removed metric = np.array([], dtype=np.int8) for i, c in zip(data['intervals'][:, 0], data['goCue_times']): metric = np.append(metric, sum(s[s > i] < (c - 0.02))) passed = metric == 0 assert data['intervals'].shape[0] == len(metric) == len(passed) return metric, passed