"""Behaviour QC.
This module runs a list of quality control metrics on the behaviour data.
.. warning::
The QC should be loaded using :meth:`ibllib.pipes.base_tasks.BehaviourTask.run_qc` and not
instantiated directly.
Examples
--------
Running on a behaviour rig computer and updating QC fields in Alyx:
>>> from ibllib.qc.task_qc_viewer.task_qc import show_session_task_qc
>>> qc = show_session_task_qc(session_path, bpod_only=True, local=True) # must close Viewer window
>>> qc = qc.run(update=True)
Downloading the required data and inspecting the QC on a different computer:
>>> from ibllib.pipes.dynamic_pipeline import get_trials_tasks
>>> from one.api import ONE
>>> task = get_trials_tasks(session_path, one=ONE())[0] # get first task run
>>> task.location = 'remote'
>>> task.setUp() # download required data
>>> qc = task.run_qc(update=False)
>>> outcome, results = qc.run()
Inspecting individual test outcomes
>>> outcome, results, outcomes = qc.compute_session_status()
Running bpod QC on ephys session (when not on behaviour rig PC)
>>> from ibllib.qc.task_qc_viewer.task_qc import get_bpod_trials_task, get_trials_tasks
>>> from one.api import ONE
>>> tasks = get_trials_tasks(session_path, one=ONE())
>>> task = get_bpod_trials_task(tasks[0]) # Ensure Bpod only on behaviour rig
>>> task.location = 'remote'
>>> task.setUp() # download required data
>>> qc = task.run_qc(update=False)
>>> outcome, results = qc.run()
Running ephys QC, from local server PC (after ephys + bpod data have been copied to the same folder)
>>> from ibllib.pipes.dynamic_pipeline import get_trials_tasks
>>> task = get_trials_tasks(session_path, one=ONE())[0] # get first task run
>>> qc = task.run_qc(update=False)
>>> outcome, results = qc.run()
"""
import logging
import sys
from packaging import version
from pathlib import Path, PurePosixPath
from datetime import datetime, timedelta
from inspect import getmembers, isfunction
from functools import reduce
from collections.abc import Sized
import numpy as np
from scipy.stats import chisquare
from brainbox.behavior.wheel import cm_to_rad, traces_by_trial
from ibllib.io.extractors import ephys_fpga
from one.alf import spec
from . import base
_log = logging.getLogger(__name__)
# TODO: the two following parameters should be read from the task parameters for each session
# Default inter-trial delay, in seconds, subtracted from the measured gap in `check_iti_delays`
ITI_DELAY_SECS = .5
# Default delay, in seconds, applied to no-go trials (choice == 0) by the ITI and negative
# feedback checks
FEEDBACK_NOGO_DELAY_SECS = 2
# Map of QC check name -> {outcome name: minimum proportion of passing data points}.
# The 'default' entry is used for any check without its own entry. A check is given the first
# outcome, in ascending QC order, whose threshold the proportion reaches (see TaskQC.thresholding).
BWM_CRITERIA = {
'default': {'PASS': 0.99, 'WARNING': 0.90, 'FAIL': 0}, # Note: WARNING was 0.95 prior to Aug 2022
'_task_stimOff_itiIn_delays': {'PASS': 0.99, 'WARNING': 0},
'_task_positive_feedback_stimOff_delays': {'PASS': 0.99, 'WARNING': 0},
'_task_negative_feedback_stimOff_delays': {'PASS': 0.99, 'WARNING': 0},
'_task_wheel_move_during_closed_loop': {'PASS': 0.99, 'WARNING': 0},
'_task_response_stimFreeze_delays': {'PASS': 0.99, 'WARNING': 0},
'_task_detected_wheel_moves': {'PASS': 0.99, 'WARNING': 0},
'_task_trial_length': {'PASS': 0.99, 'WARNING': 0},
'_task_goCue_delays': {'PASS': 0.99, 'WARNING': 0},
'_task_errorCue_delays': {'PASS': 0.99, 'WARNING': 0},
'_task_stimOn_delays': {'PASS': 0.99, 'WARNING': 0},
'_task_stimOff_delays': {'PASS': 0.99, 'WARNING': 0},
'_task_stimFreeze_delays': {'PASS': 0.99, 'WARNING': 0},
# The two entries below are overridden per session in TaskQC.compute based on IBLRIG_VERSION
'_task_iti_delays': {'NOT_SET': 0},
'_task_passed_trial_checks': {'NOT_SET': 0}
}
[docs]
def compute_session_status_from_dict(results, criteria=None):
    """
    Compute overall task QC value from QC check results.

    Given a dictionary of results, computes the QC outcome for each key and aggregates them
    into a single session-level value.

    Parameters
    ----------
    results : dict
        A dictionary of QC keys containing (usually scalar) values.
    criteria : dict
        A dictionary of QC keys containing map of PASS, WARNING, FAIL thresholds. Keys absent
        from this map use its 'default' entry. If the map is None, empty, or has no 'default'
        entry, `BWM_CRITERIA['default']` is used as the fallback.

    Returns
    -------
    one.alf.spec.QC
        Overall session QC outcome.
    dict
        A map of QC tests and their outcomes.
    """
    criteria = criteria or {}
    # Resolve the fallback thresholds once. Looking up `criteria['default']` as the argument of
    # `dict.get` evaluates it eagerly and raises KeyError for maps without a 'default' entry,
    # even when every result key has its own thresholds.
    default_thresholds = criteria.get('default', BWM_CRITERIA['default'])
    outcomes = {k: TaskQC.thresholding(v, thresholds=criteria.get(k, default_thresholds))
                for k, v in results.items()}

    # Aggregate the individual check outcomes into a single session outcome
    session_outcome = base.QC.overall_outcome(outcomes.values())
    return session_outcome, outcomes
[docs]
def update_dataset_qc(qc, registered_datasets, one, override=False):
    """
    Update QC values for individual datasets.

    Parameters
    ----------
    qc : ibllib.qc.task_metrics.TaskQC
        A TaskQC object that has been run.
    registered_datasets : list of dict
        A list of Alyx dataset records.
    one : one.api.OneAlyx
        An online instance of ONE.
    override : bool
        If True the QC field is updated even if new value is better than previous.

    Returns
    -------
    list of dict
        The list of registered datasets but with the 'qc' fields updated.
    """
    # Index the dataset ids by dataset name without its extension
    id_by_stem = {}
    for record in registered_datasets:
        id_by_stem[PurePosixPath(record['name']).stem] = record.get('id')
    # Two records sharing a stem would collapse into one key
    assert len(id_by_stem) == len(registered_datasets), 'ambiguous dataset names'

    # Per-check outcomes are the last element returned by compute_session_status
    *_, check_outcomes = qc.compute_session_status()
    dataset_outcomes = qc.compute_dataset_qc_status(check_outcomes)

    for stem, outcome in dataset_outcomes.items():
        # A dict outcome holds one outcome per column; reduce it to a single dataset outcome
        # and keep the per-column values for the extended QC field
        extended_qc = {}
        if isinstance(outcome, dict):
            extended_qc = outcome
            outcome = qc.overall_outcome(extended_qc.values())

        dataset_id = id_by_stem.get(stem)
        if not dataset_id:
            _log.debug('dataset %s not registered, skipping', stem)
            continue

        # Nothing to push unless the outcome is set or an override was requested
        if not (outcome > spec.QC.NOT_SET or override):
            continue

        dset_qc = base.QC(dataset_id, one=one, log=_log, endpoint='datasets')
        record = next(x for x in registered_datasets if dataset_id == x.get('id'))
        record['qc'] = dset_qc.update(outcome, namespace='', override=override).name
        if extended_qc:
            dset_qc.update_extended_qc(extended_qc)
    return registered_datasets
[docs]
class TaskQC(base.QC):
    """Task QC for training, biased, and ephys choice world."""

    criteria = BWM_CRITERIA

    extractor = None
    """ibllib.qc.task_extractors.TaskQCExtractor: A task extractor object containing raw and extracted data."""

    @staticmethod
    def thresholding(qc_value, thresholds=None) -> spec.QC:
        """
        Compute the outcome of a single key by applying thresholding.

        Parameters
        ----------
        qc_value : float
            Proportion of passing qcs, between 0 and 1.
        thresholds : dict
            Dictionary with keys 'PASS', 'WARNING', 'FAIL', (or enum
            integers, c.f. one.alf.spec.QC). If None or empty, no threshold can be met and
            the outcome is NOT_SET.

        Returns
        -------
        one.alf.spec.QC
            The outcome.

        Raises
        ------
        ValueError
            If `qc_value` is outside of the range [0, 1].
        """
        # Guard against None before calling .items(): None is the default for `thresholds`
        thresholds = {spec.QC.validate(k): v for k, v in (thresholds or {}).items()}
        MAX_BOUND, MIN_BOUND = (1, 0)
        if qc_value is None or np.isnan(qc_value):
            return spec.QC.NOT_SET
        elif (qc_value > MAX_BOUND) or (qc_value < MIN_BOUND):
            raise ValueError('Values out of bound')
        # Walk the QC members in ascending order (falsy members are filtered out) and return
        # the first one whose threshold is reached by the value
        for crit in filter(None, sorted(spec.QC)):
            if crit in thresholds.keys() and qc_value >= thresholds[crit]:
                return crit
        # if None of this applies, return 'NOT_SET'
        return spec.QC.NOT_SET

    def __init__(self, session_path_or_eid, **kwargs):
        """
        Task QC for training, biased, and ephys choice world.

        :param session_path_or_eid: A session eid or path
        :param log: A logging.Logger instance, if None the 'ibllib' logger is used
        :param one: An ONE instance for fetching and setting the QC on Alyx
        """
        # When an eid is provided, we will download the required data by default (if necessary)
        self.download_data = not spec.is_session_path(Path(session_path_or_eid))
        super().__init__(session_path_or_eid, **kwargs)

        # Data
        self.extractor = None

        # Metrics and passed trials
        self.metrics = None
        self.passed = None

        # Criteria (initialize as outcomes vary by class, task, and hardware)
        self.criteria = BWM_CRITERIA.copy()

    def compute(self, **kwargs):
        """Compute and store the QC metrics.

        Runs the QC on the session and stores a map of the metrics for each datapoint for each
        test, and a map of which datapoints passed for each test.

        Parameters
        ----------
        **kwargs
            Accepted for interface compatibility (e.g. `bpod_only`); not used by this method.
        """
        assert self.extractor is not None
        # The ITI delay and trial-level checks are only thresholded for iblrig version >= 8.0.0
        ver = self.extractor.settings.get('IBLRIG_VERSION', '') or '0.0.0'
        if version.parse(ver) >= version.parse('8.0.0'):
            self.criteria['_task_iti_delays'] = {'PASS': 0.99, 'WARNING': 0}
            self.criteria['_task_passed_trial_checks'] = {'PASS': 0.7, 'WARNING': 0}
        else:
            self.criteria['_task_iti_delays'] = {'NOT_SET': 0}
            self.criteria['_task_passed_trial_checks'] = {'NOT_SET': 0}

        self.log.info(f'Session {self.session_path}: Running QC on behavior data...')
        self.get_bpodqc_metrics_frame(
            self.extractor.data,
            wheel_gain=self.extractor.settings['STIM_GAIN'],  # The wheel gain
            photodiode=self.extractor.frame_ttls,
            audio=self.extractor.audio_ttls,
            re_encoding=self.extractor.wheel_encoding or 'X1',
            min_qt=self.extractor.settings.get('QUIESCENT_PERIOD') or 0.2,
            audio_output=self.extractor.settings.get('device_sound', {}).get('OUTPUT', 'unknown')
        )

    def _get_checks(self):
        """
        Find all methods that begin with 'check_'.

        Returns
        -------
        Dict[str, function]
            A map of QC check function names and the corresponding functions that return `metric`
            (any), `passed` (bool).
        """
        def is_metric(x):
            return isfunction(x) and x.__name__.startswith('check_')

        return dict(getmembers(sys.modules[__name__], is_metric))

    def get_bpodqc_metrics_frame(self, data, **kwargs):
        """
        Evaluate task QC metrics.

        Evaluates all the QC metric functions in this module (those starting with 'check') and
        stores the results in the `metrics` and `passed` attributes. The optional kwargs listed
        below are passed to each QC metric function. This method returns nothing.

        Parameters
        ----------
        data : dict
            The extracted task data.
        re_encoding : str {'X1', 'X2', 'X4'}
            The encoding configuration of the rotary encoder.
        enc_res : int
            The rotary encoder resolution as number of fronts per revolution.
        wheel_gain : float
            The STIM_GAIN task parameter.
        photodiode : dict
            The fronts from Bpod's BNC1 input or FPGA frame2ttl channel.
        audio : dict
            The fronts from Bpod's BNC2 input FPGA audio sync channel.
        min_qt : float
            The QUIESCENT_PERIOD task parameter.

        Notes
        -----
        - `self.metrics` is set to a map of checks and their QC metric values.
        - `self.passed` is set to a map of checks and the values indicating which samples passed.
        - An extra '_task_passed_trial_checks' entry is added, combining all trial-level checks.
        """
        # Find all methods that begin with 'check_'
        checks = self._get_checks()
        prefix = '_task_'  # Extended QC fields will start with this
        # Method 'check_foobar' stored with key '_task_foobar' in metrics map
        qc_metrics_map = {prefix + k[6:]: fn(data, **kwargs) for k, fn in checks.items()}

        # Split metrics and passed frames
        self.metrics = {}
        self.passed = {}
        for k in qc_metrics_map:
            self.metrics[k], self.passed[k] = qc_metrics_map[k]

        # Add a check for trial level pass: did a given trial pass all checks?
        n_trials = data['intervals'].shape[0]
        # Trial-level checks return an array the length that equals the number of trials
        trial_level_passed = [m for m in self.passed.values()
                              if isinstance(m, Sized) and len(m) == n_trials]
        name = prefix + 'passed_trial_checks'
        # The (None, None) fallback keeps reduce from failing when there are no trial-level checks
        self.metrics[name] = reduce(np.logical_and, trial_level_passed or (None, None))
        self.passed[name] = self.metrics[name].astype(float) if trial_level_passed else None

    def run(self, update=False, namespace='task', **kwargs):
        """
        Compute the QC outcomes and return overall task QC outcome.

        Parameters
        ----------
        update : bool
            If True, updates the session QC fields on Alyx.
        namespace : str
            The namespace of the QC fields in the Alyx JSON field.
        **kwargs
            Passed to `compute` when the metrics have not yet been computed
            (e.g. `bpod_only`).

        Returns
        -------
        str
            Overall task QC outcome.
        dict
            A map of QC tests and the proportion of data points that passed them.
        """
        if self.metrics is None:
            self.compute(**kwargs)
        outcome, results, _ = self.compute_session_status()
        if update:
            self.update_extended_qc(results)
            self.update(outcome, namespace)
        return outcome, results

    def compute_session_status(self):
        """
        Compute the overall session QC for each key and aggregates in a single value.

        Returns
        -------
        str
            Overall session QC outcome.
        dict
            A map of QC tests and the proportion of data points that passed them.
        dict
            A map of QC tests and their outcomes.

        Raises
        ------
        AttributeError
            If the QC has not yet been computed (`passed` is None).
        """
        if self.passed is None:
            raise AttributeError('passed is None; compute QC first')
        # Get mean passed of each check, or None if passed is None or all NaN
        results = {k: None if v is None or np.isnan(v).all() else np.nanmean(v)
                   for k, v in self.passed.items()}
        session_outcome, outcomes = compute_session_status_from_dict(results, self.criteria)
        return session_outcome, results, outcomes

    @staticmethod
    def compute_dataset_qc_status(outcomes):
        """Return map of dataset specific QC values.

        Parameters
        ----------
        outcomes : dict
            Map of checks and their individual outcomes.

        Returns
        -------
        dict
            Map of dataset names and their outcome. The trials table value is itself a map of
            column name to outcome.
        """
        trials_table_outcomes = {
            'intervals': outcomes.get('_task_iti_delays', spec.QC.NOT_SET),
            'goCue_times': outcomes.get('_task_goCue_delays', spec.QC.NOT_SET),
            'response_times': spec.QC.NOT_SET, 'choice': spec.QC.NOT_SET,
            'stimOn_times': outcomes.get('_task_stimOn_delays', spec.QC.NOT_SET),
            'contrastLeft': spec.QC.NOT_SET, 'contrastRight': spec.QC.NOT_SET,
            'feedbackType': spec.QC.NOT_SET, 'probabilityLeft': spec.QC.NOT_SET,
            'feedback_times': outcomes.get('_task_errorCue_delays', spec.QC.NOT_SET),
            'firstMovement_times': spec.QC.NOT_SET
        }
        reward_checks = ('_task_reward_volumes', '_task_reward_volume_set')
        # This must be an assignment: with ':' the statement is a bare annotation and the
        # 'rewardVolume' column would never be added to the map
        trials_table_outcomes['rewardVolume'] = TaskQC.overall_outcome(
            (outcomes.get(x, spec.QC.NOT_SET) for x in reward_checks)
        )
        dataset_outcomes = {
            '_ibl_trials.stimOff_times': outcomes.get('_task_stimOff_delays', spec.QC.NOT_SET),
            '_ibl_trials.table': trials_table_outcomes,
        }
        return dataset_outcomes
[docs]
class HabituationQC(TaskQC):
    """Task QC for habituation choice world."""

    def compute(self, **kwargs):
        """Compute and store the QC metrics.

        Runs the QC on the session and stores a map of the metrics for each datapoint for each
        test, and a map of which datapoints passed for each test.

        Parameters
        ----------
        **kwargs
            Accepted for interface compatibility; not used by this method.
        """
        assert self.extractor is not None
        self.log.info(f'Session {self.session_path}: Running QC on habituation data...')

        # Initialize checks
        prefix = '_task_'
        data = self.extractor.data
        audio_output = self.extractor.settings.get('device_sound', {}).get('OUTPUT', 'unknown')
        metrics = {}
        passed = {}

        # Modify criteria based on version
        ver = self.extractor.settings.get('IBLRIG_VERSION', '') or '0.0.0'
        is_v8 = version.parse(ver) >= version.parse('8.0.0')
        self.criteria['_task_iti_delays'] = {'PASS': 0.99, 'WARNING': 0} if is_v8 else {'NOT_SET': 0}

        # Check all reward volumes == 3.0ul
        check = prefix + 'reward_volumes'
        metrics[check] = data['rewardVolume']
        passed[check] = metrics[check] == 3.0

        # Check session durations are increasing in steps >= 12 minutes
        check = prefix + 'habituation_time'
        if not self.one or not self.session_path:
            self.log.warning('unable to determine session trials without ONE')
            metrics[check] = passed[check] = None
        else:
            subject, session_date = self.session_path.parts[-3:-1]
            # compute from the date specified
            date_minus_week = (
                datetime.strptime(session_date, '%Y-%m-%d') - timedelta(days=7)
            ).strftime('%Y-%m-%d')
            sessions = self.one.alyx.rest('sessions', 'list', subject=subject,
                                          date_range=[date_minus_week, session_date],
                                          task_protocol='habituation')
            # Remove the current session if already registered
            if sessions and sessions[0]['start_time'].startswith(session_date):
                sessions = sessions[1:]
            # NOTE(review): the current session duration is taken straight from the trial
            # intervals while the previous session durations are divided by 60; the units look
            # inconsistent (presumably seconds vs minutes) - confirm before relying on this check
            metric = ([0, data['intervals'][-1, 1] - data['intervals'][0, 0]] +
                      [(datetime.fromisoformat(x['end_time']) -
                        datetime.fromisoformat(x['start_time'])).total_seconds() / 60
                       for x in [self.one.alyx.get(s['url']) for s in sessions]])

            # The duration from raw trial data
            # duration = map(float, self.extractor.raw_data[-1]['elapsed_time'].split(':'))
            # duration = timedelta(**dict(zip(('hours', 'minutes', 'seconds'),
            #                                 duration))).total_seconds() / 60
            metrics[check] = np.array(metric)
            passed[check] = np.diff(metric) >= 12

        # Check event orders: trial_start < stim on < stim center < feedback < stim off
        check = prefix + 'trial_event_sequence'
        nans = (
            np.isnan(data['intervals'][:, 0]) |  # noqa
            np.isnan(data['stimOn_times']) |  # noqa
            np.isnan(data['stimCenter_times']) |
            np.isnan(data['valveOpen_times']) |  # noqa
            np.isnan(data['stimOff_times'])
        )
        # Entries excluded by `where` are left unset; they are masked out by `& ~nans` below
        a = np.less(data['intervals'][:, 0], data['stimOn_times'], where=~nans)
        b = np.less(data['stimOn_times'], data['stimCenter_times'], where=~nans)
        c = np.less(data['stimCenter_times'], data['valveOpen_times'], where=~nans)
        d = np.less(data['valveOpen_times'], data['stimOff_times'], where=~nans)

        metrics[check] = a & b & c & d & ~nans
        passed[check] = metrics[check].astype(float)

        # Check that the time difference between the visual stimulus center-command being
        # triggered and the stimulus effectively appearing in the center is smaller than 150 ms.
        check = prefix + 'stimCenter_delays'
        metric = np.nan_to_num(data['stimCenter_times'] - data['stimCenterTrigger_times'],
                               nan=np.inf)
        passed[check] = (metric <= 0.15) & (metric > 0)
        metrics[check] = metric

        # Phase check
        check = prefix + 'phase'
        metric = data['phase']
        passed[check] = (metric <= 2 * np.pi) & (metric >= 0)
        metrics[check] = metric

        # This is not very useful as a check because there are so few trials
        check = prefix + 'phase_distribution'
        metric, _ = np.histogram(data['phase'])
        _, p = chisquare(metric)
        passed[check] = p < 0.05 if len(data['phase']) >= 400 else None  # skip if too few trials
        metrics[check] = metric

        # Check that the period of gray screen between stim off and the start of the next trial is
        # 1s +/- 10%.
        check = prefix + 'iti_delays'
        iti = (np.roll(data['stimOn_times'], -1) - data['stimOff_times'])[:-1]
        metric = np.r_[np.nan_to_num(iti, nan=np.inf), np.nan] - 1.
        # Cast to float before inserting the NaN: assigning NaN into a boolean array stores True,
        # which would count the last trial as a pass instead of excluding it
        passed[check] = (np.abs(metric) <= 0.1).astype(float)
        passed[check][-1] = np.nan
        metrics[check] = metric

        # Checks common to training QC
        checks = [check_goCue_delays, check_stimOn_goCue_delays,
                  check_stimOn_delays, check_stimOff_delays]
        for fcn in checks:
            check = prefix + fcn.__name__[6:]
            metrics[check], passed[check] = fcn(data, audio_output=audio_output)

        self.metrics, self.passed = (metrics, passed)
# SINGLE METRICS
# ---------------------------------------------------------------------------- #
# === Delays between events checks ===
[docs]
def check_stimOn_goCue_delays(data, audio_output='harp', **_):
    """
    Check the go cue tone follows the stimulus onset by less than 10ms.

    Checks that the time difference between the onset of the go cue tone and the onset of the
    visual stimulus is positive and less than 10ms.

    Metric:
        M = goCue_times - stimOn_times

    Criteria:
        0 < M < 0.010 s

    Units:
        seconds [s]

    :param data: dict of trial data with keys ('goCue_times', 'stimOn_times', 'intervals')
    :param audio_output: audio output device name.

    Notes
    -----
    - For non-harp sound card the permissible delay is 0.053s. This was chosen by taking the 99.5th
      percentile of delays over 500 training sessions using the Xonar soundcard.
    """
    # The permitted delay depends on the sound card
    is_harp = audio_output.lower() == 'harp'
    max_delay = 0.01 if is_harp else 0.053
    # A NaN in either event time becomes Inf so that the trial fails the threshold
    delay = data['goCue_times'] - data['stimOn_times']
    metric = np.nan_to_num(delay, nan=np.inf)
    passed = np.logical_and(metric > 0, metric < max_delay)
    n_trials = data['intervals'].shape[0]
    assert n_trials == len(metric) == len(passed)
    return metric, passed
[docs]
def check_response_feedback_delays(data, audio_output='harp', **_):
    """
    Check the feedback is delivered within 10ms of the response threshold.

    Checks that the time difference between the response and the feedback onset
    (error sound or valve) is positive and less than 10ms.

    Metric:
        M = feedback_time - response_time

    Criterion:
        0 < M < 0.010 s

    Units:
        seconds [s]

    :param data: dict of trial data with keys ('feedback_times', 'response_times', 'intervals')
    :param audio_output: audio output device name.

    Notes
    -----
    - For non-harp sound card the permissible delay is 0.053s. This was chosen by taking the 99.5th
      percentile of delays over 500 training sessions using the Xonar soundcard.
    """
    # The permitted delay depends on the sound card
    is_harp = audio_output.lower() == 'harp'
    max_delay = 0.01 if is_harp else 0.053
    # A NaN in either event time becomes Inf so that the trial fails the threshold
    delay = data['feedback_times'] - data['response_times']
    metric = np.nan_to_num(delay, nan=np.inf)
    passed = np.logical_and(metric > 0, metric < max_delay)
    n_trials = data['intervals'].shape[0]
    assert n_trials == len(metric) == len(passed)
    return metric, passed
[docs]
def check_response_stimFreeze_delays(data, **_):
    """
    Check the stimulus freezes within 100ms of the expected time.

    Checks that the time difference between the visual stimulus freezing and the
    response is positive and less than 100ms.

    Metric:
        M = (stimFreeze_times - response_times)

    Criterion:
        0 < M < 0.100 s

    Units:
        seconds [s]

    :param data: dict of trial data with keys ('stimFreeze_times', 'response_times', 'intervals',
        'choice')
    """
    # A NaN in either event time becomes Inf so that the trial fails the threshold
    delay = data['stimFreeze_times'] - data['response_times']
    metric = np.nan_to_num(delay, nan=np.inf)
    within_window = np.logical_and(metric > 0, metric < 0.1)
    passed = within_window.astype(float)
    # No-go trials are excluded (stimFreeze triggered differently in no_go trials); the NaNs
    # are ignored in the calculation of the proportion passed
    no_go = data['choice'] == 0
    passed[no_go] = np.nan
    n_trials = data['intervals'].shape[0]
    assert n_trials == len(metric) == len(passed)
    return metric, passed
[docs]
def check_stimOff_itiIn_delays(data, **_):
    """Check that the start of the trial interval is within 10ms of the visual stimulus turning off.

    Metric:
        M = itiIn_times - stimOff_times

    Criterion:
        0 <= M < 0.010 s

    Units:
        seconds [s]

    :param data: dict of trial data with keys ('stimOff_times', 'itiIn_times', 'intervals',
        'choice')
    """
    # A NaN in either event time becomes Inf so that the trial fails the threshold
    delay = data['itiIn_times'] - data['stimOff_times']
    metric = np.nan_to_num(delay, nan=np.inf)
    in_range = np.logical_and(metric >= 0, metric < 0.01)
    passed = in_range.astype(float)
    # No-go trials are excluded (stimOff triggered differently in no_go trials); the NaNs
    # are ignored in the calculation of the proportion passed
    no_go = data['choice'] == 0
    metric[no_go] = np.nan
    passed[no_go] = np.nan
    n_trials = data['intervals'].shape[0]
    assert n_trials == len(metric) == len(passed)
    return metric, passed
[docs]
def check_iti_delays(data, subtract_pauses=False, iti_delay_secs=ITI_DELAY_SECS,
                     feedback_nogo_delay_secs=FEEDBACK_NOGO_DELAY_SECS, **_):
    """
    Check the open-loop grey screen period matches the expected inter-trial delay.

    Check that the period of grey screen between stim off and the start of the next trial is
    `iti_delay_secs` +/- 10%. If the trial was paused during this time, the check can account
    for that (see `subtract_pauses`).

    Metric:
        M = trialStart (n+1) - stimOff (n) - iti_delay_secs [- pause_duration (n)]
        (for no-go trials `feedback_nogo_delay_secs` is additionally subtracted)

    Criterion:
        |M| < iti_delay_secs / 10

    Units:
        seconds [s]

    Parameters
    ----------
    data : dict
        Trial data with keys ('stimOff_times', 'intervals', 'choice', 'pause_duration').
    subtract_pauses: bool
        If True, account for experimenter-initiated pauses between trials; if False, trials where
        the experimenter paused the task may fail this check.
    iti_delay_secs : float
        The expected delay between stim off and the start of the next trial.
    feedback_nogo_delay_secs : float
        The delay subtracted from the metric of no-go trials (choice == 0).

    Returns
    -------
    numpy.array
        An array of metric values to threshold.
    numpy.array
        A float array, 1 per trial, where 1 means the trial passes the QC threshold, 0 means it
        fails and NaN (the last trial) means it is not counted.
    """
    n_trials = data['intervals'].shape[0]
    metric = np.full(n_trials, np.nan)
    passed = np.full(n_trials, np.nan)
    if subtract_pauses:
        pauses = data['pause_duration'][:-1]
    else:
        pauses = np.zeros_like(metric)[:-1]

    # Gap between stim off and the start of the following trial. Missing data are set to Inf;
    # the last trial has no following trial and so is left as NaN
    next_trial_start = data['intervals'][1:, 0]
    gap = next_trial_start - data['stimOff_times'][:-1] - iti_delay_secs - pauses
    metric[:-1] = np.nan_to_num(gap, nan=np.inf)

    no_go = data['choice'] == 0
    metric[no_go] = metric[no_go] - feedback_nogo_delay_secs

    tolerance = iti_delay_secs / 10
    passed[:-1] = np.abs(metric[:-1]) < tolerance  # Last trial is not counted
    assert n_trials == len(metric) == len(passed)
    return metric, passed
[docs]
def check_positive_feedback_stimOff_delays(data, **_):
    """
    Check stimulus offset occurs approximately 1 second after reward delivered.

    Check that the time difference between the valve onset and the visual stimulus turning off
    is 1 ± 0.150 seconds.

    Metric:
        M = stimOff_times - feedback_times - 1s

    Criterion:
        |M| < 0.150 s

    Units:
        seconds [s]

    :param data: dict of trial data with keys ('stimOff_times', 'feedback_times', 'intervals',
        'correct')
    """
    # A NaN in either event time becomes Inf so that the trial fails the threshold
    offset = data['stimOff_times'] - data['feedback_times'] - 1
    metric = np.nan_to_num(offset, nan=np.inf)
    passed = (np.abs(metric) < 0.15).astype(float)
    # Incorrect trials are excluded from this check; the NaNs are ignored in the calculation
    # of the proportion passed
    incorrect = ~data['correct']
    metric[incorrect] = np.nan
    passed[incorrect] = np.nan
    n_trials = data['intervals'].shape[0]
    assert n_trials == len(metric) == len(passed)
    return metric, passed
[docs]
def check_negative_feedback_stimOff_delays(data, feedback_nogo_delay_secs=FEEDBACK_NOGO_DELAY_SECS, **_):
    """
    Check the stimulus offset occurs approximately 2 seconds after negative feedback delivery.

    Check that the time difference between the error sound and the visual stimulus
    turning off is 2 ± 0.150 seconds.

    Metric:
        M = stimOff_times - errorCue_times - 2s
        (for no-go trials `feedback_nogo_delay_secs` is added back)

    Criterion:
        |M| < 0.150 s

    Units:
        seconds [s]

    :param data: dict of trial data with keys ('stimOff_times', 'errorCue_times', 'intervals',
        'choice', 'correct')
    :param feedback_nogo_delay_secs: the delay added to the metric of no-go trials (choice == 0)
    """
    # A NaN in either event time becomes Inf so that the trial fails the threshold
    offset = data['stimOff_times'] - data['errorCue_times'] - 2
    metric = np.nan_to_num(offset, nan=np.inf)
    # for the nogo trials, the feedback is the same as the stimOff
    no_go = data['choice'] == 0
    metric[no_go] = metric[no_go] + feedback_nogo_delay_secs
    # Apply criteria
    passed = (np.abs(metric) < 0.15).astype(float)
    # Correct trials have no negative feedback and are excluded from this check
    rewarded = data['correct']
    metric[rewarded] = np.nan
    passed[rewarded] = np.nan
    n_trials = data['intervals'].shape[0]
    assert n_trials == len(metric) == len(passed)
    return metric, passed
# === Wheel movement during trial checks ===
[docs]
def check_wheel_move_before_feedback(data, **_):
    """Check that the wheel does move within 100ms of the feedback onset (error sound or valve).

    Metric:
        M = w(t + 0.05) - w(t - 0.05), where w is the wheel position and t = feedback_times
        (computed as the last minus the first wheel sample within that window)

    Criterion:
        M != 0

    Units:
        radians

    :param data: dict of trial data with keys ('wheel_timestamps', 'wheel_position', 'choice',
        'intervals', 'feedback_times')
    """
    feedback = data['feedback_times']
    # Wheel times and positions within a 100ms window centred on each feedback time
    traces = traces_by_trial(
        data['wheel_timestamps'],
        data['wheel_position'],
        start=feedback - 0.05,
        end=feedback + 0.05,
    )
    # Displacement per trial; trials with fewer than two samples in the window stay at 0
    metric = np.zeros_like(feedback)
    for i, trial in enumerate(traces):
        window_pos = trial[1]
        if window_pos.size > 1:
            metric[i] = window_pos[-1] - window_pos[0]

    # No-go trials are ignored for this check (NaN = trial ignored)
    metric[data['choice'] == 0] = np.nan
    valid = ~np.isnan(metric)

    passed = np.full_like(metric, np.nan)
    passed[valid] = (metric[valid] != 0).astype(float)
    n_trials = data['intervals'].shape[0]
    assert n_trials == len(metric) == len(passed)
    return metric, passed
def _wheel_move_during_closed_loop(re_ts, re_pos, data, wheel_gain=None, tol=1, **_):
    """
    Check the wheel moves the correct amount to reach threshold.

    Check that the wheel moves by approximately 35 degrees during the closed-loop period
    on trials where a feedback (error sound or valve) is delivered.

    Metric:
        M = abs(w_resp - w_t0) - threshold_displacement, where w_resp = position at response
        time, w_t0 = position at go cue time, threshold_displacement = displacement required to
        move 35 visual degrees

    Criterion:
        displacement < tol visual degree

    Units:
        degrees angle of wheel turn

    :param re_ts: extracted wheel timestamps in seconds
    :param re_pos: extracted wheel positions in radians
    :param data: a dict with the keys (goCueTrigger_times, response_times, feedback_times,
        position, choice, intervals)
    :param wheel_gain: the 'STIM_GAIN' task setting
    :param tol: the criterion in visual degrees
    :return: the metric and passed arrays (1 value per trial), or (None, None) if no wheel_gain
        was provided
    """
    if wheel_gain is None:
        _log.warning('No wheel_gain input in function call, returning None')
        return None, None

    # Get tuple of wheel times and positions over each trial's closed-loop period
    traces = traces_by_trial(re_ts, re_pos,
                             start=data['goCueTrigger_times'],
                             end=data['response_times'])

    metric = np.zeros_like(data['feedback_times'])
    # For each trial find the absolute displacement
    for i, trial in enumerate(traces):
        t, pos = trial
        if pos.size != 0:
            # Find the position of the preceding sample and subtract it
            idx = np.abs(re_ts - t[0]).argmin() - 1
            # If the period starts on the very first wheel sample there is no preceding sample:
            # clamp to 0, since an index of -1 would wrap around to the last sample
            origin = re_pos[max(idx, 0)]
            metric[i] = np.abs(pos - origin).max()

    # Load wheel_gain and thresholds for each trial
    wheel_gain = np.array([wheel_gain] * len(data['position']))
    thresh = data['position']
    # abs displacement, s, in mm required to move 35 visual degrees
    s_mm = np.abs(thresh / wheel_gain)  # don't care about direction
    criterion = cm_to_rad(s_mm * 1e-1)  # convert abs displacement to radians (wheel pos is in rad)
    metric = metric - criterion  # difference should be close to 0
    rad_per_deg = cm_to_rad(1 / wheel_gain * 1e-1)
    passed = (np.abs(metric) < rad_per_deg * tol).astype(float)  # less than tol visual degrees off
    metric[data['choice'] == 0] = passed[data['choice'] == 0] = np.nan  # except no-go trials
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_wheel_move_during_closed_loop(data, wheel_gain=None, **_):
    """
    Check the wheel moves the correct amount to reach threshold.

    Check that the wheel moves by approximately 35 degrees during the closed-loop period
    on trials where a feedback (error sound or valve) is delivered.

    Metric:
        M = abs(w_resp - w_t0) - threshold_displacement, where w_resp = position at response
        time, w_t0 = position at go cue time, threshold_displacement = displacement required to
        move 35 visual degrees

    Criterion:
        displacement < 3 visual degrees

    Units:
        degrees angle of wheel turn

    :param data: dict of trial data with keys ('wheel_timestamps', 'wheel_position', 'choice',
        'intervals', 'goCueTrigger_times', 'response_times', 'feedback_times', 'position')
    :param wheel_gain: the 'STIM_GAIN' task setting
    """
    # Run the shared closed-loop check on the wheel data with a tolerance of 3 visual degrees
    return _wheel_move_during_closed_loop(
        data['wheel_timestamps'], data['wheel_position'], data, wheel_gain, tol=3
    )
[docs]
def check_wheel_move_during_closed_loop_bpod(data, wheel_gain=None, **_):
    """
    Check the wheel moves the correct amount to reach threshold.

    Check that the wheel moves by approximately 35 degrees during the closed-loop period
    on trials where a feedback (error sound or valve) is delivered. This check uses the Bpod
    wheel data (measured at a lower resolution) with a stricter tolerance (1 visual degree).

    Metric:
        M = abs(w_resp - w_t0) - threshold_displacement, where w_resp = position at response
        time, w_t0 = position at go cue time, threshold_displacement = displacement required to
        move 35 visual degrees.

    Criterion:
        displacement < 1 visual degree

    Units:
        degrees angle of wheel turn

    :param data: dict of trial data with keys ('wheel_timestamps(_bpod)', 'wheel_position(_bpod)',
        'choice', 'intervals', 'goCueTrigger_times', 'response_times', 'feedback_times', 'position')
    :param wheel_gain: the 'STIM_GAIN' task setting
    """
    # Prefer the Bpod extracted wheel data and fall back to the default wheel keys. The fallback
    # is only looked up when needed, so data containing just the '_bpod' keys does not raise a
    # KeyError.
    if 'wheel_timestamps_bpod' in data:
        timestamps = data['wheel_timestamps_bpod']
    else:
        timestamps = data['wheel_timestamps']
    if 'wheel_position_bpod' in data:
        position = data['wheel_position_bpod']
    else:
        position = data['wheel_position']

    return _wheel_move_during_closed_loop(timestamps, position, data, wheel_gain, tol=1)
[docs]
def check_wheel_freeze_during_quiescence(data, **_):
    """
    Check the wheel is indeed still during the quiescent period.

    For every trial the wheel trace between (stimOnTrigger_times - quiescence) and
    stimOnTrigger_times is compared with the wheel sample immediately preceding the trace.
    The largest absolute excursion, in either direction, must stay below 2 degrees.

    Metric:
        M = max(|min(W - w0)|, |max(W - w0)|) where W is the wheel position over the quiescence
        interval and w0 the last sample before it;
        interval = [stimOnTrigger_times - quiescent_duration, stimOnTrigger_times]
    Criterion:
        M < 2 degrees
    Units:
        degrees angle of wheel turn

    :param data: dict of trial data with keys ('wheel_timestamps', 'wheel_position', 'quiescence',
        'intervals', 'stimOnTrigger_times')
    :return: (metric, passed) arrays with one element per trial
    """
    wheel_t = data['wheel_timestamps']
    assert np.all(np.diff(wheel_t) >= 0)
    stim_trigger = data['stimOnTrigger_times']
    quiescence = data['quiescence']
    assert quiescence.size == stim_trigger.size
    wheel_pos = data['wheel_position']

    # One (times, positions) pair per trial, restricted to the quiescence window
    per_trial = traces_by_trial(
        wheel_t, wheel_pos, start=stim_trigger - quiescence, end=stim_trigger)

    # Columns hold the absolute excursion in each direction; trials without samples remain 0
    excursion = np.zeros((len(quiescence), 2))
    for row, (t, pos) in enumerate(per_trial):
        if pos.size == 0:
            continue
        # Index of the sample just before the first one of the window (clamped to 0)
        previous = np.abs(wheel_t - t[0]).argmin() - 1
        relative = pos - wheel_pos[0 if previous == -1 else previous]
        excursion[row, :] = np.abs([np.min(relative), np.max(relative)])

    # Keep the largest excursion per trial and express it in degrees (from radians)
    metric = 180 * np.max(excursion, axis=1) / np.pi
    passed = metric < 2  # the wheel should not move by 2 degrees or more in either direction
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_detected_wheel_moves(data, min_qt=0, **_):
    """Check that the detected first movement times are reasonable.

    Metric:
        M = firstMovement times
    Criterion:
        (goCue trigger time - min quiescent period) < M < response time
    Units:
        Seconds [s]

    :param data: dict of trial data with keys ('firstMovement_times', 'goCueTrigger_times',
        'response_times', 'choice', 'intervals')
    :param min_qt: the minimum possible quiescent period (the QUIESCENT_PERIOD task parameter);
        either a single value or one value per trial
    :return: (metric, passed) where passed is a float array containing 1. (pass), 0. (fail) or
        NaN for no-go trials (choice == 0)
    """
    n_trials = data['intervals'].shape[0]
    # Depending on task version this may be a single value or an array of quiescent periods
    min_qt = np.asarray(min_qt)
    # Only a per-trial array can be truncated: slicing a 0-d (scalar) array raises an IndexError,
    # which previously happened for a scalar `min_qt` when there were no trials.
    if min_qt.ndim > 0 and min_qt.size > n_trials:
        min_qt = min_qt[:n_trials]

    metric = data['firstMovement_times']
    qevt_start = data['goCueTrigger_times'] - min_qt
    response = data['response_times']
    # First movement time for each trial should be after the quiescent period and before feedback.
    # Comparisons involving NaN evaluate to False, i.e. a missing movement time fails.
    with np.errstate(invalid='ignore'):
        passed = ((qevt_start < metric) & (metric < response)).astype(float)
    nogo = data['choice'] == 0
    passed[nogo] = np.nan  # No go trial may have no movement times and that's fine
    return metric, passed
# === Sequence of events checks ===
[docs]
def check_error_trial_event_sequence(data, **_):
    """
    Check trial events occur in correct order for negative feedback trials.

    Check that on incorrect / miss trials, there are exactly:
    2 audio events (go cue sound and error sound) and 2 Bpod events (trial start, ITI), occurring
    in the correct order

    Metric:
        M = Bpod (trial start) < audio (go cue) < audio (error) < Bpod (ITI) < Bpod (trial end)
    Criterion:
        M == True
    Units:
        -none-

    :param data: dict of trial data with keys ('errorCue_times', 'goCue_times', 'intervals',
        'itiIn_times', 'correct')
    :return: (metric, passed) where metric is a boolean array and passed is a float array that
        is NaN for correct trials
    """
    # Event times in the order in which they are expected to occur
    sequence = (
        data['intervals'][:, 0],  # trial start
        data['goCue_times'],
        data['errorCue_times'],
        data['itiIn_times'],
        data['intervals'][:, 1],  # trial end
    )
    # An array the length of N trials where True means at least one event time was NaN (bad)
    nans = np.zeros(data['intervals'].shape[0], dtype=bool)
    for times in sequence:
        nans |= np.isnan(times)
    # For each trial check all events were in order AND all event times were not NaN.
    # Plain comparisons are used instead of `np.less(..., where=...)`, which without an `out`
    # array leaves the masked elements uninitialised.
    metric = ~nans
    with np.errstate(invalid='ignore'):
        for earlier, later in zip(sequence[:-1], sequence[1:]):
            metric &= earlier < later
    passed = metric.astype(float)
    passed[data['correct']] = np.nan  # Look only at incorrect trials
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_correct_trial_event_sequence(data, **_):
    """
    Check trial events occur in correct order for positive feedback trials.

    Check that on correct trials, there are exactly:
    1 audio events and 3 Bpod events (valve open, trial start, ITI), occurring in the correct order

    Metric:
        M = Bpod (trial start) < audio (go cue) < Bpod (valve) < Bpod (ITI) < Bpod (trial end)
    Criterion:
        M == True
    Units:
        -none-

    :param data: dict of trial data with keys ('valveOpen_times', 'goCue_times', 'intervals',
        'itiIn_times', 'correct')
    :return: (metric, passed) where metric is a boolean array and passed is a float array that
        is NaN for incorrect trials
    """
    # Event times in the order in which they are expected to occur
    sequence = (
        data['intervals'][:, 0],  # trial start
        data['goCue_times'],
        data['valveOpen_times'],
        data['itiIn_times'],
        data['intervals'][:, 1],  # trial end
    )
    # An array the length of N trials where True means at least one event time was NaN (bad)
    nans = np.zeros(data['intervals'].shape[0], dtype=bool)
    for times in sequence:
        nans |= np.isnan(times)
    # For each trial True means all events were in order AND all event times were not NaN.
    # Plain comparisons are used instead of `np.less(..., where=...)`, which without an `out`
    # array leaves the masked elements uninitialised.
    metric = ~nans
    with np.errstate(invalid='ignore'):
        for earlier, later in zip(sequence[:-1], sequence[1:]):
            metric &= earlier < later
    passed = metric.astype(float)
    passed[~data['correct']] = np.nan  # Look only at correct trials
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_n_trial_events(data, **_):
    """Check that the number events per trial is correct.

    Within every trial interval there should be one of each trial event, except for
    errorCueTrigger_times which should only be defined for incorrect trials

    Metric:
        M = all(start < event < end) for all event times except errorCueTrigger_times where
            start < error_trigger < end if not correct trial, else error_trigger == NaN
    Criterion:
        M == True
    Units:
        -none-, boolean

    :param data: dict of trial data with keys ('intervals', 'stimOnTrigger_times',
                 'stimOffTrigger_times', 'stimOn_times', 'stimOff_times',
                 'stimFreezeTrigger_times', 'errorCueTrigger_times', 'itiIn_times',
                 'goCueTrigger_times', 'goCue_times', 'response_times', 'feedback_times',
                 'correct')
    :return: (metric, passed), two boolean arrays with one element per trial
    """
    intervals = data['intervals']
    n_trials = intervals.shape[0]
    start, end = intervals[:, 0], intervals[:, 1]
    correct = np.asarray(data['correct']).astype(bool)

    # Exclude these fields; valve and errorCue times are the same as feedback_times and we must
    # test errorCueTrigger_times separately
    # stimFreeze_times fails often due to TTL flicker
    exclude = {'camera_timestamps', 'errorCueTrigger_times', 'errorCue_times',
               'wheelMoves_peakVelocity_times', 'valveOpen_times', 'wheelMoves_peakAmplitude',
               'wheelMoves_intervals', 'wheel_timestamps', 'stimFreeze_times'}
    events = [k for k in data if k.endswith('_times') and k not in exclude]

    def within_trial(times):
        """Return True for each trial whose event lies strictly inside the trial interval."""
        # Only the first n_trials values are considered, as with per-trial indexing
        times = np.asarray(times)[:n_trials]
        return (start < times) & (times < end)

    metric = np.ones(n_trials, dtype=bool)
    with np.errstate(invalid='ignore'):  # NaN event times simply compare as False
        # For each trial interval check that one of each trial event occurred
        for k in events:
            metric &= within_trial(data[k])
        # For incorrect trials, check the error cue trigger occurred within the interval,
        # otherwise check it is nan
        err_trig = np.asarray(data['errorCueTrigger_times'])[:n_trials]
        metric &= (correct & np.isnan(err_trig)) | (~correct & within_trial(err_trig))
    passed = metric
    assert intervals.shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_trial_length(data, **_):
    """
    Check the go cue to feedback duration is positive and shorter than 60.1 seconds.

    The duration is the time between the onset of the go cue sound and the feedback
    (error sound or valve).

    Metric:
        M = feedback_times - goCue_times
    Criteria:
        0 < M < 60.1 s
    Units:
        seconds [s]

    :param data: dict of trial data with keys ('feedback_times', 'goCue_times', 'intervals')
    :return: (metric, passed) arrays with one element per trial
    """
    duration = data['feedback_times'] - data['goCue_times']
    # A NaN duration becomes Inf so that the trial fails the upper bound below
    metric = np.nan_to_num(duration, nan=np.inf)
    passed = (metric > 0) & (metric < 60.1)
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
# === Trigger-response delay checks ===
[docs]
def check_goCue_delays(data, audio_output='harp', **_):
    """
    Check the go cue tone occurs within 1.5 ms of the intended time.

    Check that the time difference between the go cue sound being triggered and
    effectively played is positive and no greater than the permitted delay.

    Metric:
        M = goCue_times - goCueTrigger_times
    Criterion:
        0 < M <= 0.0015 s
    Units:
        seconds [s]

    :param data: dict of trial data with keys ('goCue_times', 'goCueTrigger_times', 'intervals').
    :param audio_output: audio output device name.
    :return: (metric, passed) arrays with one element per trial

    Notes
    -----
    - For non-harp sound card the permissible delay is 0.053s. This was chosen by taking the 99.5th
      percentile of delays over 500 training sessions using the Xonar soundcard.
    """
    if audio_output.lower() == 'harp':
        threshold = 0.0015
    else:
        threshold = 0.053
    delay = data['goCue_times'] - data['goCueTrigger_times']
    # A NaN delay becomes Inf so that the trial fails the threshold
    metric = np.nan_to_num(delay, nan=np.inf)
    passed = (metric > 0) & (metric <= threshold)
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_errorCue_delays(data, audio_output='harp', **_):
    """
    Check the error tone occurs within 1.5ms of the intended time.

    Check that the time difference between the error sound being triggered and
    effectively played is positive and no greater than the permitted delay. Correct trials
    are ignored: both their metric and pass values are set to NaN.

    Metric:
        M = errorCue_times - errorCueTrigger_times
    Criterion:
        0 < M <= 0.0015 s
    Units:
        seconds [s]

    :param data: dict of trial data with keys ('errorCue_times', 'errorCueTrigger_times',
        'intervals', 'correct')
    :param audio_output: audio output device name.
    :return: (metric, passed), two float arrays with one element per trial

    Notes
    -----
    - For non-harp sound card the permissible delay is 0.062s. This was chosen by taking the 99.5th
      percentile of delays over 500 training sessions using the Xonar soundcard.
    """
    is_harp = audio_output.lower() == 'harp'
    threshold = 0.0015 if is_harp else 0.062
    delay = data['errorCue_times'] - data['errorCueTrigger_times']
    # A NaN delay becomes Inf so that the trial fails the threshold
    metric = np.nan_to_num(delay, nan=np.inf)
    passed = ((metric > 0) & (metric <= threshold)).astype(float)
    # No error tone is expected on correct trials
    correct = data['correct']
    passed[correct] = np.nan
    metric[correct] = np.nan
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_stimOn_delays(data, **_):
    """
    Check the visual stimulus onset occurs within 150ms of the intended time.

    Check that the time difference between the visual stimulus onset-command being triggered
    and the stimulus effectively appearing on the screen is positive and at most 150 ms.

    Metric:
        M = stimOn_times - stimOnTrigger_times
    Criterion:
        0 < M <= 0.15 s
    Units:
        seconds [s]

    :param data: dict of trial data with keys ('stimOn_times', 'stimOnTrigger_times',
        'intervals')
    :return: (metric, passed) arrays with one element per trial
    """
    delay = data['stimOn_times'] - data['stimOnTrigger_times']
    # A NaN delay becomes Inf so that the trial fails the threshold
    metric = np.nan_to_num(delay, nan=np.inf)
    passed = (metric > 0) & (metric <= 0.15)
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_stimOff_delays(data, **_):
    """
    Check stimulus offset occurs within 150ms of the intended time.

    Check that the time difference between the visual stimulus offset-command
    being triggered and the visual stimulus effectively turning off on the screen
    is positive and at most 150 ms.

    Metric:
        M = stimOff_times - stimOffTrigger_times
    Criterion:
        0 < M <= 0.15 s
    Units:
        seconds [s]

    :param data: dict of trial data with keys ('stimOff_times', 'stimOffTrigger_times',
        'intervals')
    :return: (metric, passed) arrays with one element per trial
    """
    delay = data['stimOff_times'] - data['stimOffTrigger_times']
    # A NaN delay becomes Inf so that the trial fails the threshold
    metric = np.nan_to_num(delay, nan=np.inf)
    passed = (metric > 0) & (metric <= 0.15)
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
return metric, passed
[docs]
def check_stimFreeze_delays(data, **_):
    """Check the stimulus freezes within 150ms of the intended time.

    Check that the time difference between the visual stimulus freeze-command
    being triggered and the visual stimulus effectively freezing on the screen
    is positive and at most 150 ms.

    Metric:
        M = stimFreeze_times - stimFreezeTrigger_times
    Criterion:
        0 < M <= 0.15 s
    Units:
        seconds [s]

    :param data: dict of trial data with keys ('stimFreeze_times', 'stimFreezeTrigger_times',
        'intervals')
    :return: (metric, passed) arrays with one element per trial
    """
    delay = data['stimFreeze_times'] - data['stimFreezeTrigger_times']
    # A NaN delay becomes Inf so that the trial fails the threshold
    metric = np.nan_to_num(delay, nan=np.inf)
    passed = (metric > 0) & (metric <= 0.15)
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
# === Data integrity checks ===
[docs]
def check_reward_volumes(data, **_):
    """Check that the reward volume is between 1.5 and 3 uL for correct trials, 0 for incorrect.

    Metric:
        M = reward volume
    Criterion:
        1.5 <= M <= 3 if correct else M == 0
    Units:
        uL

    :param data: dict of trial data with keys ('rewardVolume', 'correct', 'intervals')
    :return: (metric, passed) where passed is a boolean array with one element per trial
    """
    metric = data['rewardVolume']
    rewarded = data['correct']
    unrewarded = ~rewarded
    passed = np.zeros_like(metric, dtype=bool)
    # Incorrect trials must not deliver any reward
    passed[unrewarded] = metric[unrewarded] == 0
    # Correct trials must deliver a volume within the permitted range
    volumes = metric[rewarded]
    passed[rewarded] = (volumes >= 1.5) & (volumes <= 3.)
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_reward_volume_set(data, **_):
    """Check that there are at most two reward volumes within a session, one of which is 0.

    Metric:
        M = set(rewardVolume)
    Criterion:
        (0 < len(M) <= 2) and 0 in M

    :param data: dict of trial data with keys ('rewardVolume')
    :return: (metric, passed) where metric is the reward volume array and passed a single
        boolean for the whole session
    """
    metric = data['rewardVolume']
    n_distinct = len(set(metric))
    passed = 0 < n_distinct <= 2 and 0. in metric
    return metric, passed
[docs]
def check_wheel_integrity(data, re_encoding='X1', enc_res=None, **_):
    """
    Check wheel position sampled at the expected resolution.

    Check that the difference between neighbouring wheel position samples is below 1.5 times
    the expected encoder resolution and that the wheel timestamps strictly increase.

    Metric:
        M = absolute difference of the positions, + 1 where the difference of timestamps <= 0
    Criterion:
        M < 1.5 * encoder resolution
    Units:
        arbitrary (those of the wheel position, sometimes + 1)

    :param data: dict of wheel data with keys ('wheel_timestamps', 'wheel_position')
    :param re_encoding: the encoding of the wheel data, X1, X2 or X4
    :param enc_res: the rotary encoder resolution (defaults to ephys_fpga.WHEEL_TICKS)
    :return: (metric, passed) arrays with one element per pair of neighbouring wheel samples

    Notes
    -----
    - At high velocities some samples are missed due to the scanning frequency of the DAQ.
      This checks for more than 1 missing sample in a row (i.e. the difference between samples >= 2)
    """
    # An encoding given as a string, e.g. 'X4', is reduced to its final digit
    if isinstance(re_encoding, str):
        re_encoding = int(re_encoding[-1])
    ticks = enc_res or ephys_fpga.WHEEL_TICKS
    # The expected difference between samples in the extracted units
    resolution = 1 / ticks * np.pi * 2 * ephys_fpga.WHEEL_RADIUS_CM / re_encoding
    # Timestamps should be strictly increasing; offending samples add 1 to the metric
    not_increasing = np.diff(data['wheel_timestamps']) <= 0.
    # We expect the difference of neighbouring positions to be close to the resolution
    step_size = np.abs(np.diff(data['wheel_position']))
    metric = step_size + not_increasing.astype(float)  # all values should be close to zero
    passed = metric < 1.5 * resolution
    return metric, passed
# === Pre-stimulus checks ===
[docs]
def check_stimulus_move_before_goCue(data, photodiode=None, **_):
    """
    Check there are no stimulus events before the go cue tone.

    Check that there are no visual stimulus change(s) between the start of the trial and the
    go cue sound onset, except for stim on.

    Metric:
        M = number of visual stimulus change events between trial start and goCue_times
    Criterion:
        M == 1
    Units:
        -none-, integer

    Parameters
    ----------
    data : dict
        Trial data with keys ('goCue_times', 'intervals', 'choice').
    photodiode : dict
        The fronts from Bpod's BNC1 input or FPGA frame2ttl channel.

    Returns
    -------
    numpy.array
        An array of metric values to threshold, or None if no photodiode input was provided.
    numpy.array
        An array of float values, 1 per trial, where 1. means trial passes QC threshold, 0. that
        it fails and NaN that the trial has no go cue time. None if no photodiode input was
        provided.

    Notes
    -----
    - There should be exactly 1 stimulus change before goCue; stimulus onset. Even if the stimulus
      contrast is 0, the sync square will still flip at stimulus onset, etc.
    - If there are no goCue times (all are NaN), the status should be NOT_SET.
    """
    if photodiode is None:
        _log.warning('No photodiode TTL input in function call, returning None')
        # Always return a (metric, passed) pair, as documented and as check_audio_pre_trial
        # does, so that callers unpacking the result do not fail on a bare None.
        return None, None
    # NOTE(review): presumably removes spurious frame2ttl fronts - confirm in ephys_fpga
    photodiode_clean = ephys_fpga._clean_frame2ttl(photodiode)
    s = photodiode_clean['times']
    s = s[~np.isnan(s)]  # Remove NaNs
    # Count the photodiode events after trial start and before the go cue.  A NaN go cue time
    # yields a count of 0; these trials are set to NaN below.
    metric = np.array(
        [np.count_nonzero(s[s > start] < go_cue)
         for start, go_cue in zip(data['intervals'][:, 0], data['goCue_times'])],
        dtype=float
    )
    passed = (metric == 1).astype(float)
    passed[np.isnan(data['goCue_times'])] = np.nan
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed
[docs]
def check_audio_pre_trial(data, audio=None, **_):
    """
    Check no audio stimuli before the go cue.

    Check there are no audio outputs between the start of the trial and the go cue sound
    onset - 20 ms.

    Metric:
        M = sum(start_times < audio TTL < (goCue_times - 20ms))
    Criterion:
        M == 0
    Units:
        -none-, integer

    Parameters
    ----------
    data : dict
        Trial data with keys ('goCue_times', 'intervals').
    audio : dict
        The fronts from Bpod's BNC2 input FPGA audio sync channel.

    Returns
    -------
    numpy.array
        An array of metric values to threshold, or None if no audio input was provided.
    numpy.array
        An array of boolean values, 1 per trial, where True means trial passes QC threshold, or
        None if no audio input was provided.
    """
    if audio is None:
        _log.warning('No BNC2 input in function call, retuning None')
        return None, None
    s = audio['times'][~np.isnan(audio['times'])]  # Audio TTLs with NaNs removed
    # Count the audio TTLs after trial start and more than 20 ms before the go cue.  The counts
    # are built in a single pass rather than by repeatedly appending to (and copying) an array.
    metric = np.array(
        [np.count_nonzero(s[s > start] < (go_cue - 0.02))
         for start, go_cue in zip(data['intervals'][:, 0], data['goCue_times'])],
        dtype=int
    )
    passed = metric == 0
    assert data['intervals'].shape[0] == len(metric) == len(passed)
    return metric, passed