import logging
import numpy as np
from itertools import accumulate
from packaging import version
from one.alf.io import AlfBunch
import ibllib.io.raw_data_loaders as raw
from ibllib.io.extractors.base import BaseBpodTrialsExtractor, run_extractor_classes
from ibllib.io.extractors.training_wheel import Wheel
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)
# Names exported by a star-import of this module.
# NOTE(review): 'extract_all' is not defined in the portion of the file visible here - confirm it exists.
__all__ = ['TrainingTrials', 'extract_all']
class FeedbackType(BaseBpodTrialsExtractor):
    """
    Get the feedback that was delivered to subject.
    **Optional:** saves _ibl_trials.feedbackType.npy

    Checks the Bpod state timestamps of each trial for the outcome states
    ('correct', 'error', 'no_go', 'omit_correct', 'omit_error', 'omit_no_go').
    Exactly one of these mutually exclusive states must have been entered,
    otherwise an AssertionError is raised.

    Sets feedbackType to -1 if the error or no_go state was triggered
    Sets feedbackType to +1 if the correct state was triggered
    Leaves feedbackType at 0 if one of the omit_* states was triggered
    """
    save_names = '_ibl_trials.feedbackType.npy'
    var_names = 'feedbackType'

    def _extract(self):
        """
        Return the feedback type of every trial.

        :return: int64 array with one value per trial (+1, -1 or 0)
        :raises AssertionError: if a trial did not enter exactly one outcome state
        """
        # Loop invariant, hence defined once outside the loop
        state_names = ('correct', 'error', 'no_go', 'omit_correct', 'omit_error', 'omit_no_go')
        feedbackType = np.zeros(len(self.bpod_trials), np.int64)
        for i, t in enumerate(self.bpod_trials):
            states = t['behavior_data']['States timestamps']
            # A state was entered when its first onset time is not NaN; states that are absent
            # from the trial are treated as not entered. np.nan is used as np.NaN no longer
            # exists in NumPy >= 2.0.
            outcome = {sn: not np.isnan(states.get(sn, [[np.nan]])[0][0]) for sn in state_names}
            assert sum(outcome.values()) == 1
            outcome = next(k for k in outcome if outcome[k])
            if outcome == 'correct':
                feedbackType[i] = 1
            elif outcome in ('error', 'no_go'):
                feedbackType[i] = -1
        return feedbackType
class ContrastLR(BaseBpodTrialsExtractor):
    """
    Get left and right contrasts from raw datafile. Optionally, saves
    _ibl_trials.contrastLeft.npy and _ibl_trials.contrastRight.npy to alf folder.

    The contrast of a trial is assigned to the left vector when the sign of the trial
    position is negative and to the right vector when it is positive; the other side
    (and both sides when the position is 0) is set to NaN.
    """
    save_names = ('_ibl_trials.contrastLeft.npy', '_ibl_trials.contrastRight.npy')
    var_names = ('contrastLeft', 'contrastRight')

    def _extract(self):
        """
        Return the left and right contrast vectors.

        :return: tuple (contrastLeft, contrastRight) of arrays with one value per trial
        """
        # iblrigv8 has only flat values in the trial table so we can switch to parquet table when times come
        # and all the clutter here would fit in ~30 lines
        def contrast_value(trial):
            # The contrast is either a flat value or a dict holding the value under 'value'.
            # Deciding per trial (instead of from the first trial only) also supports empty
            # sessions and flat values that are not strictly of type float.
            contrast = trial['contrast']
            return contrast['value'] if isinstance(contrast, dict) else contrast

        contrastLeft = np.array(
            [contrast_value(t) if np.sign(t['position']) < 0 else np.nan for t in self.bpod_trials])
        contrastRight = np.array(
            [contrast_value(t) if np.sign(t['position']) > 0 else np.nan for t in self.bpod_trials])
        return contrastLeft, contrastRight
class ProbabilityLeft(BaseBpodTrialsExtractor):
    """
    Collect the 'stim_probability_left' value of every trial.
    **Optional:** saves _ibl_trials.probabilityLeft.npy
    """
    save_names = '_ibl_trials.probabilityLeft.npy'
    var_names = 'probabilityLeft'

    def _extract(self, **kwargs):
        """Return an array holding the 'stim_probability_left' field of each trial, in trial order."""
        probabilities = []
        for trial in self.bpod_trials:
            probabilities.append(trial['stim_probability_left'])
        return np.array(probabilities)
class Choice(BaseBpodTrialsExtractor):
    """
    Get the subject's choice in every trial.
    **Optional:** saves _ibl_trials.choice.npy to alf folder.

    Uses the sign of the stimulus position, trial_correct and the no_go state.
    -1 is a CCW turn (towards the left)
    +1 is a CW turn (towards the right)
    0 is a no_go trial
    If a trial is correct the choice of the animal was the inverse of the sign
    of the position.

    >>> choice[t] = -np.sign(position[t]) if trial_correct[t]
    """
    save_names = '_ibl_trials.choice.npy'
    var_names = 'choice'

    def _extract(self):
        """
        Return the choice of every trial.

        :return: integer array with one value per trial (-1, +1 or 0)
        """
        stim_side = np.array([np.sign(t['position']) for t in self.bpod_trials])
        # Both masks are explicitly boolean: without a dtype, 0/1 integer flags would be used as
        # fancy indices instead of a mask, and an empty session would yield a float array that
        # cannot be used for indexing.
        trial_correct = np.array([t['trial_correct'] for t in self.bpod_trials], dtype=bool)
        # A trial is a no-go when the onset of its no_go state is not NaN
        trial_nogo = np.array(
            [not np.isnan(t['behavior_data']['States timestamps']['no_go'][0][0]) for t in self.bpod_trials],
            dtype=bool)
        choice = stim_side.copy()
        choice[trial_correct] = -choice[trial_correct]
        choice[trial_nogo] = 0
        return choice.astype(int)
class RepNum(BaseBpodTrialsExtractor):
    """
    Count the consecutive repeated trials.
    **Optional:** saves _ibl_trials.repNum.npy to alf folder.

    A trial counts as repeated when its 'debias_trial' field is set or, if that field is
    absent, when trial['contrast']['type'] == 'RepeatContrast'.

    >>> trial_repeated = [0, 1, 1, 0, 1, 0, 1, 1, 1, 0]
    >>> repNum = [0, 1, 2, 0, 1, 0, 1, 2, 3, 0]
    """
    save_names = '_ibl_trials.repNum.npy'
    var_names = 'repNum'

    def _extract(self):
        """Return an integer array with the running count of consecutive repeated trials."""

        def is_repeat(trial):
            if 'debias_trial' in trial:
                return trial['debias_trial']
            if isinstance(trial.get('contrast'), dict):
                return trial['contrast']['type'] == 'RepeatContrast'
            # For advanced choice world before version 8.19.0 there was no 'debias_trial' field
            # and no debiasing protocol applied, so simply return False
            assert self.settings['PYBPOD_PROTOCOL'].startswith('_iblrig_tasks_advancedChoiceWorld')
            return False

        def running_count(count, repeated):
            # Extend the current streak on a repeated trial, reset it otherwise
            return count + repeated if repeated else 0

        repeat_flags = np.fromiter((is_repeat(trial) for trial in self.bpod_trials), int)
        return np.fromiter(accumulate(repeat_flags, running_count), int)
class RewardVolume(BaseBpodTrialsExtractor):
    """
    Load reward volume delivered for each trial.
    **Optional:** saves _ibl_trials.rewardVolume.npy

    Uses the 'reward_amount' field of correct trials; all other trials are set to 0.
    """
    save_names = '_ibl_trials.rewardVolume.npy'
    var_names = 'rewardVolume'

    def _extract(self):
        """Return a float64 array with the reward volume of every trial."""
        volumes = []
        for trial in self.bpod_trials:
            volumes.append(trial['reward_amount'] if trial['trial_correct'] else 0)
        reward_volume = np.array(volumes).astype(np.float64)
        assert len(reward_volume) == len(self.bpod_trials)
        return reward_volume
class FeedbackTimes(BaseBpodTrialsExtractor):
    """
    Get the times the water or error tone was delivered to the animal.
    **Optional:** saves _ibl_trials.feedback_times.npy

    Before rig version 5.0.0 the onsets of the reward, error and no_go states are merged.
    From version 5.0.0 the reward state onset is used for rewarded trials and the last
    BNC2High event of the trial for the others.
    """
    save_names = '_ibl_trials.feedback_times.npy'
    var_names = 'feedback_times'

    @staticmethod
    def get_feedback_times_lt5(session_path, task_collection='raw_behavior_data', data=False):
        """
        Merge the onset times of the reward, error and no_go states.

        :param session_path: session path, used to load the raw data when `data` is falsy
        :param task_collection: collection passed on to the raw data loader
        :param data: the Bpod trials; loaded from disk when falsy
        :return: array of feedback times
        :raises AssertionError: if a trial has no reward, error or no_go state onset
        """
        if not data:
            data = raw.load_data(session_path, task_collection=task_collection)

        def state_onsets(state):
            return [tr['behavior_data']['States timestamps'][state][0][0] for tr in data]

        rw_times = state_onsets('reward')
        err_times = state_onsets('error')
        nogo_times = state_onsets('no_go')
        no_feedback = np.isnan(rw_times) & np.isnan(err_times) & np.isnan(nogo_times)
        assert sum(no_feedback) == 0
        # Keep the non-NaN value(s) of each trial
        per_trial = [np.array(times)[~np.isnan(times)] for times in zip(rw_times, err_times, nogo_times)]
        merge = np.array(per_trial).squeeze()
        return np.array(merge)

    @staticmethod
    def get_feedback_times_ge5(session_path, task_collection='raw_behavior_data', data=False):
        """
        Merge the reward state onsets with the error sound times taken from the BNC2High events.

        :param session_path: session path, used to load the raw data when `data` is falsy
        :param task_collection: collection passed on to the raw data loader
        :param data: the Bpod trials; loaded from disk when falsy
        :return: array of feedback times, NaN where neither time is available
        """
        # ger err and no go trig times -- look for BNC2High of trial -- verify
        # only 2 onset times go tone and noise, select 2nd/-1 OR select the one
        # that is grater than the nogo or err trial onset time
        if not data:
            data = raw.load_data(session_path, task_collection=task_collection)
        n_trials = len(data)
        rw_times = np.zeros(n_trials)
        err_sound_times = np.zeros(n_trials)
        missed_bnc2 = 0
        for ind, tr in enumerate(data):
            behavior = tr['behavior_data']
            st = behavior['Events timestamps'].get('BNC2High', None)
            if not st:
                st = np.array([np.nan, np.nan])
                missed_bnc2 += 1
            # xonar soundcard duplicates events, remove consecutive events too close together
            st = np.delete(st, np.where(np.diff(st) < 0.020)[0] + 1)
            rw_times[ind] = behavior['States timestamps']['reward'][0][0]
            # get the error sound only if the reward is nan
            if st.size >= 2 and np.isnan(rw_times[ind]):
                err_sound_times[ind] = st[-1]
            else:
                err_sound_times[ind] = np.nan
        if missed_bnc2 == n_trials:
            _logger.warning('No BNC2 for feedback times, filling error trials NaNs')
        merge = np.full(n_trials, np.nan)
        rewarded = ~np.isnan(rw_times)
        merge[rewarded] = rw_times[rewarded]
        with_error_sound = ~np.isnan(err_sound_times)
        merge[with_error_sound] = err_sound_times[with_error_sound]
        return merge

    def _extract(self):
        """Return the feedback times, dispatching on the rig version (empty version treated as '100.0.0')."""
        # Version check
        rig_version = version.parse(self.settings['IBLRIG_VERSION'] or '100.0.0')
        if rig_version >= version.parse('5.0.0'):
            getter = self.get_feedback_times_ge5
        else:
            getter = self.get_feedback_times_lt5
        merge = getter(self.session_path, task_collection=self.task_collection, data=self.bpod_trials)
        return np.array(merge)
class Intervals(BaseBpodTrialsExtractor):
    """
    Trial start to trial end. Trial end includes 1 or 2 seconds after feedback,
    (depending on the feedback) and 0.5 seconds of iti.
    **Optional:** saves _ibl_trials.intervals.npy

    Uses the 'Trial start timestamp' and 'Trial end timestamp' values of the Bpod behavior data.
    """
    save_names = '_ibl_trials.intervals.npy'
    var_names = 'intervals'

    def _extract(self):
        """Return an array with one (start, end) row per trial."""
        starts, ends = [], []
        for trial in self.bpod_trials:
            starts.append(trial['behavior_data']['Trial start timestamp'])
        for trial in self.bpod_trials:
            ends.append(trial['behavior_data']['Trial end timestamp'])
        return np.transpose(np.array([starts, ends]))
class ResponseTimes(BaseBpodTrialsExtractor):
    """
    Time (in absolute seconds from session start) when a response was recorded.
    **Optional:** saves _ibl_trials.response_times.npy

    Uses the timestamp of the end of the closed_loop state.
    """
    save_names = '_ibl_trials.response_times.npy'
    var_names = 'response_times'

    def _extract(self):
        """Return an array with the end time of the first closed_loop state of every trial."""
        closed_loop_ends = []
        for trial in self.bpod_trials:
            closed_loop = trial['behavior_data']['States timestamps']['closed_loop']
            closed_loop_ends.append(closed_loop[0][1])
        return np.array(closed_loop_ends)
class ItiDuration(BaseBpodTrialsExtractor):
    """
    Calculate duration of iti from state timestamps.
    **Optional:** saves _ibl_trials.itiDuration.npy

    The iti duration is the 'Trial end timestamp' minus the response time returned by
    the ResponseTimes extractor.
    """
    save_names = '_ibl_trials.itiDuration.npy'
    var_names = 'iti_dur'

    def _extract(self):
        """Return an array with the iti duration of every trial."""
        response_extractor = ResponseTimes(self.session_path)
        rt, _ = response_extractor.extract(
            save=False,
            task_collection=self.task_collection,
            bpod_trials=self.bpod_trials,
            settings=self.settings,
        )
        trial_ends = np.array([trial['behavior_data']['Trial end timestamp'] for trial in self.bpod_trials])
        return trial_ends - rt
class GoCueTriggerTimes(BaseBpodTrialsExtractor):
    """
    Get trigger times of goCue from state machine.

    Current software solution for triggering sounds uses PyBpod soft codes.
    Delays can be in the order of 10's of ms. This is the time when the command
    to play the sound was executed. To measure accurate time, either getting the
    sound onset from xonar soundcard sync pulse (latencies may vary).

    From rig version 5.0.0 the onset of the play_tone state is used, before that the
    onset of the closed_loop state.
    """
    save_names = '_ibl_trials.goCueTrigger_times.npy'
    var_names = 'goCueTrigger_times'

    def _extract(self):
        """Return an array with the go cue trigger time of every trial."""
        # An empty version string is treated as the most recent version
        rig_version = version.parse(self.settings['IBLRIG_VERSION'] or '100.0.0')
        trigger_state = 'play_tone' if rig_version >= version.parse('5.0.0') else 'closed_loop'
        onsets = [trial['behavior_data']['States timestamps'][trigger_state][0][0]
                  for trial in self.bpod_trials]
        return np.array(onsets)
class TrialType(BaseBpodTrialsExtractor):
    """
    Get the type of every trial from the Bpod state timestamps.
    **Optional:** saves _ibl_trials.type.npy

    +1 if the reward state was entered
    -1 if the error state was entered
    0 if the no_go state was entered
    NaN (with a warning) if none of these states was entered
    """
    save_names = '_ibl_trials.type.npy'
    # Every other extractor in this module declares its outputs as `var_names`; the original
    # attribute was spelt `var_name`. The misspelt name is kept as an alias so that existing
    # references to it keep working.
    var_names = 'trial_type'
    var_name = var_names

    def _extract(self):
        """Return an array with one trial type per trial (float dtype when a NaN is present)."""
        # (state, code) pairs in order of precedence
        state_codes = (('reward', 1), ('error', -1), ('no_go', 0))
        trial_type = []
        for tr in self.bpod_trials:
            states = tr["behavior_data"]["States timestamps"]
            for state, code in state_codes:
                # A state was entered when its first onset time is not NaN
                if not np.isnan(states[state][0][0]):
                    trial_type.append(code)
                    break
            else:
                _logger.warning("Trial is not in set {-1, 0, 1}, appending NaN to trialType")
                trial_type.append(np.nan)
        return np.array(trial_type)
class GoCueTimes(BaseBpodTrialsExtractor):
    """
    Get the go cue times from the BNC2 events of each trial.

    The first BNC2High event of the trial is used; when there is none, the first BNC2Low
    event minus 0.1 is used instead. Trials without BNC2 events are set to NaN and a
    warning reports how many trials are affected.
    """
    save_names = '_ibl_trials.goCue_times.npy'
    var_names = 'goCue_times'

    def _extract(self):
        """Return an array with the go cue time of every trial, NaN where it could not be found."""
        go_cue_times = np.full(len(self.bpod_trials), np.nan)
        for ind, tr in enumerate(self.bpod_trials):
            if not raw.get_port_events(tr, 'BNC2'):
                continue
            events = tr['behavior_data']['Events timestamps']
            bnchigh = events.get('BNC2High', None)
            if bnchigh:
                go_cue_times[ind] = bnchigh[0]
                continue
            bnclow = events.get('BNC2Low', None)
            if bnclow:
                go_cue_times[ind] = bnclow[0] - 0.1
        missing = np.isnan(go_cue_times)
        nmissing = np.sum(missing)
        if np.all(missing):
            # None of the trials has a detected TTL
            _logger.warning(
                f'{self.session_path}: Missing ALL !! BNC2 TTLs ({nmissing} trials)')
        elif np.any(missing):
            # Only some of the trials lack a detected TTL
            _logger.warning(f'{self.session_path}: Missing BNC2 TTLs on {nmissing} trials')
        return go_cue_times
class IncludedTrials(BaseBpodTrialsExtractor):
    """
    Flag the trials to include.
    **Optional:** saves _ibl_trials.included.npy

    Before rig version 5.0.0 all trials are included. From version 5.0.0, trials from
    SUBJECT_DISENGAGED_TRIALNUM onwards are excluded when the settings report that the
    subject disengaged.
    """
    save_names = '_ibl_trials.included.npy'
    var_names = 'included'

    def _extract(self):
        """Return a boolean array with one flag per trial (empty version treated as '100.0.0')."""
        if version.parse(self.settings['IBLRIG_VERSION'] or '100.0.0') >= version.parse('5.0.0'):
            trials_included = self.get_included_trials_ge5(
                data=self.bpod_trials, settings=self.settings)
        else:
            trials_included = self.get_included_trials_lt5(data=self.bpod_trials)
        return trials_included

    @staticmethod
    def get_included_trials_lt5(data=False):
        """
        Include every trial.

        :param data: the Bpod trials
        :return: boolean array of True, one per trial
        """
        return np.ones(len(data), dtype=bool)

    @staticmethod
    def get_included_trials_ge5(data=False, settings=False):
        """
        Include every trial up to the one where the subject disengaged.

        :param data: the Bpod trials
        :param settings: the task settings mapping
        :return: boolean array, one flag per trial
        """
        # Explicit boolean dtype: building the array from a list of True would give a float
        # array for an empty session, unlike the pre-5.0.0 code path
        trials_included = np.ones(len(data), dtype=bool)
        # A missing key is treated like an explicit False
        if settings.get('SUBJECT_DISENGAGED_TRIGGERED', False) is not False:
            idx = settings['SUBJECT_DISENGAGED_TRIALNUM'] - 1
            trials_included[idx:] = False
        return trials_included
class ItiInTimes(BaseBpodTrialsExtractor):
    """
    Get the onset time of the exit_state of every trial.

    For rig versions before 5.0.0 an array of NaNs is returned.
    """
    var_names = 'itiIn_times'

    def _extract(self):
        """Return an array with one iti-in time per trial."""
        rig_version = version.parse(self.settings["IBLRIG_VERSION"] or '100.0.0')
        if rig_version < version.parse("5.0.0"):
            return np.full(len(self.bpod_trials), np.nan)
        exit_onsets = []
        for trial in self.bpod_trials:
            exit_onsets.append(trial["behavior_data"]["States timestamps"]["exit_state"][0][0])
        return np.array(exit_onsets)
class ErrorCueTriggerTimes(BaseBpodTrialsExtractor):
    """
    Get the onset time of the no_go state or, failing that, of the error state of every trial.

    Trials that entered neither state are set to NaN.
    """
    var_names = 'errorCueTrigger_times'

    def _extract(self):
        """Return an array with one error cue trigger time per trial."""
        errorCueTrigger_times = np.full(len(self.bpod_trials), np.nan)
        for i, trial in enumerate(self.bpod_trials):
            states = trial["behavior_data"]["States timestamps"]
            nogo = states["no_go"][0][0]
            error = states["error"][0][0]
            # The no_go onset takes precedence over the error onset
            if not np.any(np.isnan(nogo)):
                errorCueTrigger_times[i] = nogo
            elif not np.any(np.isnan(error)):
                errorCueTrigger_times[i] = error
        return errorCueTrigger_times
class StimFreezeTriggerTimes(BaseBpodTrialsExtractor):
    """
    Get the onset time of the freeze_reward or freeze_error state of every trial.

    No-go trials are set to NaN. For rig versions before 6.2.5 an array of NaNs is returned.
    """
    var_names = 'stimFreezeTrigger_times'

    def _extract(self):
        """
        Return the stimulus freeze trigger time of every trial.

        :return: float array with one value per trial
        :raises AssertionError: if the number of entered freeze_reward, freeze_error and no_go
         states does not add up to the number of trials
        """
        if version.parse(self.settings["IBLRIG_VERSION"] or '100.0.0') < version.parse("6.2.5"):
            return np.full(len(self.bpod_trials), np.nan)

        def entered(state):
            # A state was entered when neither its start nor its end time is NaN
            return np.array(
                [bool(np.all(~np.isnan(tr["behavior_data"]["States timestamps"][state][0])))
                 for tr in self.bpod_trials],
                dtype=bool)

        freeze_reward = entered("freeze_reward")
        freeze_error = entered("freeze_error")
        no_go = entered("no_go")
        assert (np.sum(freeze_error) + np.sum(freeze_reward) +
                np.sum(no_go) == len(self.bpod_trials))
        # Collected in a list and converted once: np.append copies the whole array on every call
        stimFreezeTrigger = []
        for r, n, tr in zip(freeze_reward, no_go, self.bpod_trials):
            if n:
                stimFreezeTrigger.append(np.nan)
                continue
            state = "freeze_reward" if r else "freeze_error"
            stimFreezeTrigger.append(tr["behavior_data"]["States timestamps"][state][0][0])
        return np.array(stimFreezeTrigger, dtype=float)
class StimOffTriggerTimes(BaseBpodTrialsExtractor):
    """
    Get the stimulus off trigger time of every trial.

    The trigger is the onset of the hide_stim state from rig version 6.2.5, of the exit_state
    from version 5.0.0 and of the trial_start state before that. From version 5.0.0 the onset
    of the no_go state replaces the trigger on trials that entered that state.
    """
    var_names = 'stimOffTrigger_times'

    def _extract(self):
        """Return an array with one stimulus off trigger time per trial."""
        # An empty version string is treated as the most recent version
        rig_version = version.parse(self.settings["IBLRIG_VERSION"] or '100.0.0')
        if rig_version >= version.parse("6.2.5"):
            stim_off_trigger_state = "hide_stim"
        elif rig_version >= version.parse("5.0.0"):
            stim_off_trigger_state = "exit_state"
        else:
            stim_off_trigger_state = "trial_start"

        def state_onsets(state):
            return np.array(
                [trial["behavior_data"]["States timestamps"][state][0][0] for trial in self.bpod_trials])

        stimOffTrigger_times = state_onsets(stim_off_trigger_state)
        # If pre version 5.0.0 no specific nogo Off trigger was given, just return trial_starts
        if stim_off_trigger_state == "trial_start":
            return stimOffTrigger_times
        no_goTrigger_times = state_onsets("no_go")
        # Stim off trigs are either in their own state or in the no_go state if the
        # mouse did not move, if the stim_off_trigger_state always exist
        # (exit_state or trial_start)
        # no NaNs will happen, NaNs might happen in at last trial if
        # session was stopped after response
        # Patch with the no_go states trig times
        nogo_entered = ~np.isnan(no_goTrigger_times)
        stimOffTrigger_times[nogo_entered] = no_goTrigger_times[nogo_entered]
        return stimOffTrigger_times
class StimOnTriggerTimes(BaseBpodTrialsExtractor):
    """
    Get the onset time of the stim_on state of every trial.
    **Optional:** saves _ibl_trials.stimOnTrigger_times.npy
    """
    save_names = '_ibl_trials.stimOnTrigger_times.npy'
    var_names = 'stimOnTrigger_times'

    def _extract(self):
        """Return an array with the start time of the first stim_on state of every trial."""
        # One [start, stop] row per trial for the state that triggers the onset of the stim
        stim_on_rows = []
        for trial in self.bpod_trials:
            stim_on_rows.append(trial['behavior_data']['States timestamps']['stim_on'][0])
        stim_on_state = np.array(stim_on_rows)
        onsets = stim_on_state[:, 0]
        return onsets.T
class StimOnTimes_deprecated(BaseBpodTrialsExtractor):
    """
    Deprecated extraction of the stimulus onset times from the BNC1 (frame2TTL) events.
    **Optional:** saves _ibl_trials.stimOn_times.npy

    A deprecation warning is logged on every extraction.
    """
    save_names = '_ibl_trials.stimOn_times.npy'
    var_names = 'stimOn_times'

    def _extract(self):
        """
        Find the time of the state machine command to turn on the stim
        (state stim_on start or rotary_encoder_event2)
        Find the next frame change from the photodiode after that TS.
        Screen is not displaying anything until then.
        (Frame changes are in BNC1 High and BNC1 Low)
        """
        # Version check
        _logger.warning("Deprecation Warning: this is an old version of stimOn extraction."
                        "From version 5., use StimOnOffFreezeTimes")
        if version.parse(self.settings['IBLRIG_VERSION'] or '100.0.0') >= version.parse('5.0.0'):
            stimOn_times = self.get_stimOn_times_ge5(self.session_path, data=self.bpod_trials,
                                                     task_collection=self.task_collection)
        else:
            stimOn_times = self.get_stimOn_times_lt5(self.session_path, data=self.bpod_trials,
                                                     task_collection=self.task_collection)
        return np.array(stimOn_times)

    @staticmethod
    def get_stimOn_times_ge5(session_path, data=False, task_collection='raw_behavior_data'):
        """
        Find the stim_sync pulses that fall within the stim_on state of each trial.
        (Stim updates are in BNC1High and BNC1Low - frame2TTL device)
        Trials without such a pulse are set to NaN.

        :param session_path: session path, used to load the raw data when `data` is falsy
        :param data: the Bpod trials; loaded from disk when falsy
        :param task_collection: collection passed on to the raw data loader
        :return: float array of stimulus onset times
        """
        if not data:
            data = raw.load_data(session_path, task_collection=task_collection)
        # Get all stim_sync events detected
        stim_sync_all = [np.array(raw.get_port_events(tr, 'BNC1')) for tr in data]
        # Get the stim_on_state that triggers the onset of the stim
        stim_on_state = np.array([tr['behavior_data']['States timestamps']
                                  ['stim_on'][0] for tr in data])
        # Collected in a list and converted once: np.append copies the whole array on every call
        onsets = []
        for sync, on, off in zip(stim_sync_all, stim_on_state[:, 0], stim_on_state[:, 1]):
            pulse = sync[(sync > on) & (sync <= off)]
            if pulse.size == 0:
                onsets.append(np.nan)
            else:
                # NOTE(review): every pulse within the state is appended (as in the original code),
                # so the output is longer than the number of trials when a trial has several - confirm
                onsets.extend(pulse)
        stimOn_times = np.array(onsets, dtype=float)
        missing = np.isnan(stimOn_times)
        nmissing = np.sum(missing)
        # Check if all stim_syncs have failed to be detected
        if np.all(missing):
            _logger.error(f'{session_path}: Missing ALL BNC1 TTLs ({nmissing} trials)')
        # Check if any stim_sync has failed be detected for every trial
        if np.any(missing):
            _logger.warning(f'{session_path}: Missing BNC1 TTLs on {nmissing} trials')
        return stimOn_times

    @staticmethod
    def get_stimOn_times_lt5(session_path, data=False, task_collection='raw_behavior_data'):
        """
        Find the time of the statemachine command to turn on the stim
        (state stim_on start or rotary_encoder_event2)
        Find the next frame change from the photodiode after that TS.
        Screen is not displaying anything until then.
        (Frame changes are in BNC1High and BNC1Low)

        :param session_path: session path, used to load the raw data when `data` is falsy
        :param data: the Bpod trials; loaded from disk when falsy
        :param task_collection: collection passed on to the raw data loader
        :return: float array with one stimulus onset time per trial, NaN where none was found
        """
        if not data:
            data = raw.load_data(session_path, task_collection=task_collection)
        count_missing = 0
        stimOn_times = np.full(len(data), np.nan)
        for i, tr in enumerate(data):
            stim_on = tr['behavior_data']['States timestamps']['stim_on'][0][0]
            events = tr['behavior_data']['Events timestamps']
            # A missing event type is replaced by -inf so that it never follows the stim_on time.
            # -np.inf is used as np.NINF no longer exists in NumPy >= 2.0.
            bnc_h = np.atleast_1d(events.get('BNC1High', -np.inf))
            bnc_l = np.atleast_1d(events.get('BNC1Low', -np.inf))
            hl = np.sort(np.concatenate([bnc_h, bnc_l]))
            stot = hl[hl > stim_on]
            if np.size(stot) == 0:
                # No frame change after the trigger: leave the NaN in place
                count_missing += 1
                continue
            stimOn_times[i] = stot[0]
        if np.all(np.isnan(stimOn_times)):
            _logger.error(f'{session_path}: Missing ALL BNC1 TTLs ({count_missing} trials)')
        if count_missing > 0:
            _logger.warning(f'{session_path}: Missing BNC1 TTLs on {count_missing} trials')
        return np.array(stimOn_times)
class StimOnOffFreezeTimes(BaseBpodTrialsExtractor):
    """
    Extracts stim on / off and freeze times from Bpod BNC1 detected fronts.

    Each stimulus event is the first detected front of the BNC1 signal after the trigger state, but before the next
    trigger state.
    """
    save_names = ('_ibl_trials.stimOn_times.npy', None, None)
    var_names = ('stimOn_times', 'stimOff_times', 'stimFreeze_times')

    def _extract(self):
        """
        Return the stimulus on, off and freeze times of every trial.

        :return: tuple (stimOn_times, stimOff_times, stimFreeze_times) of float arrays with one
         value per trial, NaN where no matching front was found
        :raises AssertionError: if the trigger vectors differ in size or are not ordered as
         on < freeze < off
        """
        extract_kwargs = dict(
            bpod_trials=self.bpod_trials, task_collection=self.task_collection, settings=self.settings, save=False)
        choice = Choice(self.session_path).extract(**extract_kwargs)[0]
        stimOnTrigger = StimOnTriggerTimes(self.session_path).extract(**extract_kwargs)[0]
        stimFreezeTrigger = StimFreezeTriggerTimes(self.session_path).extract(**extract_kwargs)[0]
        stimOffTrigger = StimOffTriggerTimes(self.session_path).extract(**extract_kwargs)[0]
        f2TTL = [raw.get_port_events(tr, name='BNC1') for tr in self.bpod_trials]
        assert stimOnTrigger.size == stimFreezeTrigger.size == stimOffTrigger.size == choice.size == len(f2TTL)
        # NaN freeze triggers are replaced by +/- inf so that they never break the ordering check
        assert all(stimOnTrigger < np.nan_to_num(stimFreezeTrigger, nan=np.inf)) and \
            all(np.nan_to_num(stimFreezeTrigger, nan=-np.inf) < stimOffTrigger)
        # A missing key keeps the previous default of '0'; an empty or None version is treated as
        # the most recent version, as the trigger extractors used above do, instead of failing to parse
        rig_version = self.settings.get('IBLRIG_VERSION', '0') or '100.0.0'
        has_freeze = version.parse(rig_version) >= version.parse('6.2.5')
        # Collected in lists and converted once: np.append copies the whole array on every call
        stimOn_times, stimOff_times, stimFreeze_times = [], [], []
        for tr, on, freeze, off in zip(f2TTL, stimOnTrigger, stimFreezeTrigger, stimOffTrigger):
            tr = np.array(tr)
            # stim on - first front between the on trigger and the next trigger
            lim = freeze if has_freeze else off
            idx, = np.where(np.logical_and(on < tr, tr < lim))
            stimOn_times.append(tr[idx[0]] if idx.size > 0 else np.nan)
            # stim off - first front after the off trigger
            idx, = np.where(off < tr)
            stimOff_times.append(tr[idx[0]] if idx.size > 0 else np.nan)
            # stim freeze - take last event before off trigger
            if has_freeze:
                idx, = np.where(np.logical_and(freeze < tr, tr < off))
            else:
                idx, = np.where(tr <= off)
            stimFreeze_times.append(tr[idx[-1]] if idx.size > 0 else np.nan)
        stimOn_times = np.array(stimOn_times, dtype=float)
        stimOff_times = np.array(stimOff_times, dtype=float)
        stimFreeze_times = np.array(stimFreeze_times, dtype=float)
        # In no_go trials no stimFreeze happens just stim Off
        stimFreeze_times[choice == 0] = np.nan
        return stimOn_times, stimOff_times, stimFreeze_times
class PhasePosQuiescence(BaseBpodTrialsExtractor):
    """Extract stimulus phase, position and quiescence from Bpod data.

    For extraction of pre-generated events, use the ProbaContrasts extractor instead.
    """
    save_names = (None, None, '_ibl_trials.quiescencePeriod.npy')
    var_names = ('phase', 'position', 'quiescence')

    def _extract(self, **kwargs):
        """Return the 'stim_phase', 'position' and 'quiescent_period' fields of the trials as three arrays."""

        def field(name):
            return np.array([trial[name] for trial in self.bpod_trials])

        phase = field('stim_phase')
        position = field('position')
        quiescence = field('quiescent_period')
        return phase, position, quiescence
class PauseDuration(BaseBpodTrialsExtractor):
    """Extract pause duration from raw trial data."""
    save_names = None
    var_names = 'pause_duration'

    def _extract(self, **kwargs):
        """Return a float array with the 'pause_duration' of every trial.

        Trials without that field get 0 for rig versions before 8.9.0 and NaN otherwise.
        """
        # pausing logic added in version 8.9.0
        ver = version.parse(self.settings.get('IBLRIG_VERSION') or '0')
        if ver < version.parse('8.9.0'):
            default = 0.
        else:
            default = np.nan
        durations = [trial.get('pause_duration', default) for trial in self.bpod_trials]
        return np.fromiter(durations, dtype=float)
class TrialsTable(BaseBpodTrialsExtractor):
    """
    Extracts the following into a table from Bpod raw data:
        intervals, goCue_times, response_times, choice, stimOn_times, contrastLeft, contrastRight,
        feedback_times, feedbackType, rewardVolume, probabilityLeft, firstMovement_times
    Additionally extracts the following wheel data:
        wheel_timestamps, wheel_position, wheel_moves_intervals, wheel_moves_peak_amplitude
    """
    save_names = ('_ibl_trials.table.pqt', None, None, '_ibl_wheel.timestamps.npy', '_ibl_wheel.position.npy',
                  '_ibl_wheelMoves.intervals.npy', '_ibl_wheelMoves.peakAmplitude.npy', None, None)
    var_names = ('table', 'stimOff_times', 'stimFreeze_times', 'wheel_timestamps', 'wheel_position', 'wheelMoves_intervals',
                 'wheelMoves_peakAmplitude', 'peakVelocity_times', 'is_final_movement')

    def _extract(self, extractor_classes=None, **kwargs):
        """
        Run the base extractors and split their outputs into the trials table and the rest.

        :return: tuple with the table as a data frame, followed by the outputs named in
         `var_names` (other than 'table') in that order
        :raises AssertionError: if the table does not hold exactly 12 fields
        """
        base = [Intervals, GoCueTimes, ResponseTimes, Choice, StimOnOffFreezeTimes, ContrastLR, FeedbackTimes, FeedbackType,
                RewardVolume, ProbabilityLeft, Wheel]
        out, _ = run_extractor_classes(
            base,
            session_path=self.session_path,
            bpod_trials=self.bpod_trials,
            settings=self.settings,
            save=False,
            task_collection=self.task_collection,
        )
        # Everything that is not a separate output of this extractor goes in the table
        table_fields = {}
        for key, value in out.items():
            if key not in self.var_names:
                table_fields[key] = value
        table = AlfBunch(table_fields)
        assert len(table.keys()) == 12
        frame = table.to_df()
        extras = [out.pop(name) for name in self.var_names if name != 'table']
        return (frame, *extras)
# TrainingTrials is one of the names listed in ``__all__``.
class TrainingTrials(BaseBpodTrialsExtractor):
    """
    Run the trigger time, trials table, phase/position/quiescence and pause duration extractors
    and return their outputs keyed by the names in `var_names`.
    """
    save_names = ('_ibl_trials.repNum.npy', '_ibl_trials.goCueTrigger_times.npy', '_ibl_trials.stimOnTrigger_times.npy', None,
                  None, None, None, '_ibl_trials.table.pqt', None, None, '_ibl_wheel.timestamps.npy', '_ibl_wheel.position.npy',
                  '_ibl_wheelMoves.intervals.npy', '_ibl_wheelMoves.peakAmplitude.npy', None, None, None, None, None, None)
    var_names = ('repNum', 'goCueTrigger_times', 'stimOnTrigger_times', 'itiIn_times', 'stimOffTrigger_times',
                 'stimFreezeTrigger_times', 'errorCueTrigger_times', 'table', 'stimOff_times', 'stimFreeze_times',
                 'wheel_timestamps', 'wheel_position', 'wheelMoves_intervals', 'wheelMoves_peakAmplitude',
                 'peakVelocity_times', 'is_final_movement', 'phase', 'position', 'quiescence', 'pause_duration')

    def _extract(self) -> dict:
        """Return a dict mapping every name in `var_names` to its extracted value, in that order."""
        base = [RepNum, GoCueTriggerTimes, StimOnTriggerTimes, ItiInTimes, StimOffTriggerTimes, StimFreezeTriggerTimes,
                ErrorCueTriggerTimes, TrialsTable, PhasePosQuiescence, PauseDuration]
        out, _ = run_extractor_classes(
            base,
            session_path=self.session_path,
            bpod_trials=self.bpod_trials,
            settings=self.settings,
            save=False,
            task_collection=self.task_collection,
        )
        selected = {}
        for name in self.var_names:
            selected[name] = out[name]
        return selected