"""Extract stimulus timing info from ttl pulses and metadata."""
import os
import glob
import json
import numpy as np
[docs]def get_session_path(path):
"""
Return local session path given a local path of any file from the session (assumes `Subjects`
directory is present)
Args:
path (str): absolute path of any file in session
e.g. /mnt/data/cortexlab/Subjects/CSHL050/2019-10-29/001/alf/spikes.times.npy
Example usage:
one = ONE()
eid = one.search(subject=subject, date=date, number=number)
files_paths = one.load(eid[0], download_only=True)
session_path = get_session_path(files_paths)
"""
from pathlib import Path
subject_dir = np.where([part == 'Subjects' for part in Path(path).parts])[0]
if len(subject_dir) == 0:
raise FileNotFoundError('Could not find `Subject` directory in %s' % path)
else:
session_path = os.path.join(*Path(path).parts[:subject_dir[0] + 4])
return session_path
[docs]def load_ttl_pulses(session_path):
"""
Extract ttl pulses from sync signals
:param session_path: absolute path of a session, i.e. /mnt/data/Subjects/ZM_1887/2019-07-10/001
:type session_path: str
:return: ttl pulse times
:rtype: np.ndarray
"""
from ibllib.io.extractors.ephys_fpga import get_main_probe_sync
sync, sync_chmap = get_main_probe_sync(session_path, bin_exists=False)
fr2ttl_ch = sync_chmap['frame2ttl']
# find times of when ttl polarity changes on fr2ttl channel
sync_pol_ = sync['polarities'][sync['channels'] == fr2ttl_ch]
sync_times_ = sync['times'][sync['channels'] == fr2ttl_ch]
return sync_pol_, sync_times_
def _get_master_probe_dir(session_path):
"""Find master probe; code from ibllib.io.extractors.ephys_fpga._get_main_probe_sync"""
from ibllib.io.extractors.ephys_fpga import _get_all_probes_sync
from ibllib.io.extractors.ephys_fpga import get_neuropixel_version_from_files
ephys_files = _get_all_probes_sync(session_path)
if not ephys_files:
raise FileNotFoundError(f"No ephys files found in {session_path}")
version = get_neuropixel_version_from_files(ephys_files)
if version == '3A':
# the sync master is the probe with the most sync pulses
sync_box_ind = np.argmax([ef.sync.times.size for ef in ephys_files])
elif version == '3B':
# the sync master is the nidq breakout box
sync_box_ind = np.argmax([1 if ef.get('nidq') else 0 for ef in ephys_files])
probe_dir = ephys_files[sync_box_ind].path
return probe_dir
[docs]def load_rf_mapping_stimulus(session_path, stim_metadata):
"""
extract frames of rf mapping stimulus
:param session_path: absolute path of a session, i.e. /mnt/data/Subjects/ZM_1887/2019-07-10/001
:type session_path: str
:param stim_metadata: dictionary of stimulus/task metadata
:type stim_metadata: dict
:return: stimulus frames
:rtype: np.ndarray of shape (y_pix, x_pix, n_frames)
"""
idx_rfm = get_stim_num_from_name(stim_metadata['VISUAL_STIMULI'], 'receptive_field_mapping')
if idx_rfm is not None:
stim_filename = stim_metadata['VISUAL_STIM_%i' % idx_rfm].get(
'stim_data_file_name', '*RFMapStim.raw*')
stim_file = glob.glob(os.path.join(session_path, 'raw_behavior_data', stim_filename))[0]
frame_array = np.fromfile(stim_file, dtype='uint8')
y_pix, x_pix, _ = stim_metadata['VISUAL_STIM_%i' % idx_rfm]['stim_file_shape']
frames = np.transpose(np.reshape(frame_array, [y_pix, x_pix, -1], order='F'), [2, 1, 0])
else:
frames = np.array([])
return frames
[docs]def get_stim_num_from_name(stim_ids, stim_name):
"""
"VISUAL_STIMULI": {
"0": "SPACER",
"1": "receptive_field_mapping",
"2": "orientation-direction_selectivity",
"3": "contrast_reversal",
"4": "task_stimuli",
"5": "spontaneous_activity"}
:param stim_ids: map from number (as string) to stimulus name
:type stim_ids: dict
:param stim_name: name of stimulus type
:type stim_name: str
:return: the number associated with the stimulus type
:rtype: int
"""
idx = None
for key in stim_ids.keys():
if stim_ids[key].lower() == stim_name.lower():
idx = key
break
return int(idx)
[docs]def get_contrast_reversal_stimulus(stim_metadata):
"""
Extract contrast reversal stimulus info
:param stim_metadata: dictionary of stimulus metadata loaded from task json file
:type stim_metadata: dict
:return: np array of shape (n_stims,)
:rtype: np.ndarray
"""
stim_idx = get_stim_num_from_name(stim_metadata['VISUAL_STIMULI'], 'contrast_reversal')
stim_metadata = stim_metadata['VISUAL_STIM_%i' % stim_idx]
# return actual frames
# patch_contrasts = stim_metadata['stim_patch_contrasts']
# frames = np.stack([patch_contrasts[str(n)] for n in stim_metadata['stim_sequence']])
# return index into actual frames
stim = np.array(stim_metadata['stim_sequence'])
return stim
[docs]def get_task_stimulus(session_path):
"""
Extract contrast selectivity stimulus info
:param session_path: absolute path of a session, i.e. /mnt/data/Subjects/ZM_1887/2019-07-10/001
:type session_path: str
:return: np array of shape (n_stims, 2); col 0 contains stim azimuth, col 1 contains contrast
:rtype: np.ndarray
"""
from zipfile import ZipFile
stim_file_zip = glob.glob(os.path.join(
session_path, 'raw_behavior_data', '_iblrig_codeFiles*.zip'))[0]
zf = ZipFile(stim_file_zip)
# find task stimulus csv file
stim_file = None
for file in zf.namelist():
if file.endswith('stims.csv'):
stim_file = file
break
if stim_file is None:
raise FileNotFoundError('Could not find a stims.csv file in the iblrig code files archive')
with zf.open(stim_file) as f:
stim_array = np.loadtxt(f, delimiter=' ')
return stim_array
[docs]def get_rf_ttl_pulses(ttl_signal):
"""
Find where ttl_signal increases or decreases
:param ttl_signal:
:type ttl_signal: array-like
:return: where signal increases/decreases
:rtype: tuple (np.ndarray, np.ndarray) of ttl (rise, fall) indices
"""
# Convert values to 0, 1, -1 for simplicity
assert len(np.unique(ttl_signal)) == 3
ttl_sig = np.zeros(shape=ttl_signal.shape)
ttl_sig[ttl_signal == np.max(ttl_signal)] = 1
ttl_sig[ttl_signal == np.min(ttl_signal)] = -1
# Find number of passage from 0->-1 and 0->+1
d_ttl_sig = np.concatenate([np.diff(ttl_sig), [0]])
idxs_up = np.where((ttl_sig == 0) & (d_ttl_sig == 1))[0]
idxs_dn = np.where((ttl_sig == 0) & (d_ttl_sig == -1))[0]
return idxs_up, idxs_dn
[docs]def get_expected_ttl_pulses(stim_order, stim_meta, ttl_signal_rf_map):
"""
Get expected number of ttl pulses for each stimulus
:param stim_order: list of stimulus ids throughout protocol
:type stim_order: array-like
:param stim_meta: dictionary containing stim metadata; from _iblrig_taskSettings json
:type stim_meta: dict
:param ttl_signal_rf_map: ttl signal during receptive field mapping with locally sparse noise
:type ttl_signal_rf_map: array-like
:return: list of ttl pulses for each stimulus class
:rtype: list
"""
n_expected_ttl_pulses = np.zeros(shape=(len(stim_order)))
for i, stim_id in enumerate(stim_order):
if stim_meta['VISUAL_STIMULI'][str(stim_id)] == 'receptive_field_mapping':
n_instances = np.sum((np.array(stim_order) == stim_id) * 1)
if n_instances > 1:
raise ValueError('Extractor expects a single rf mapping presentation')
# number of TTL pulses expected in frame2ttl trace for rf mapping
idxs_up, idxs_dn = get_rf_ttl_pulses(ttl_signal_rf_map)
n_expected_ttl_pulses[i] = len(idxs_up) + len(idxs_dn)
else:
key = str('VISUAL_STIM_%i' % stim_id)
if key in stim_meta:
n_expected_ttl_pulses[i] = stim_meta[key]['ttl_num']
else:
# spontaneous activity, no stimulus info in metadata
n_expected_ttl_pulses[i] = 0
return n_expected_ttl_pulses
# def update_ttl_pulses(stim_name, is_pol_beg_ok, is_pol_end_ok, stim_ts, n_expected_ttl_pulses):
# """
#
# :param stim_name: stim protocol name (e.g. 'receptive_field_mapping', 'contrast_reversal')
# :type stim_name: str
# :param is_pol_beg_ok:
# :type is_pol_beg_ok: bool
# :param is_pol_end_ok:
# :type is_pol_end_ok: bool
# :param stim_ts: stimulus presentation times
# :type stim_ts: array-like
# :param n_expected_ttl_pulses:
# :type n_expected_ttl_pulses: int
# :return: (new_stim_ts, new_expected_ttl_pulses)
# :rtype: tuple
# """
#
# import copy
# new_stim_ts = copy.copy(stim_ts)
# new_expected_ttl_pulses = n_expected_ttl_pulses
#
# if not is_pol_beg_ok:
# if stim_name == 'receptive_field_mapping':
# new_stim_ts = stim_ts[1:]
# print('--> 1 TTL pulse removed from beginning')
# elif stim_name == 'orientation-direction_selectivity':
# new_stim_ts = stim_ts[1:]
# print('--> 1 TTL pulse removed from beginning')
# elif stim_name == 'contrast_reversal':
# raise NotImplementedError
# elif stim_name == 'task_stimuli':
# new_stim_ts = stim_ts[1:]
# print('--> 1 TTL pulse removed from beginning')
# else:
# raise ValueError('"%s" is not a valid stimulus protocol' % stim_name)
#
# if not is_pol_end_ok:
# if stim_name == 'receptive_field_mapping':
# new_stim_ts = stim_ts[:-1]
# print('--> 1 TTL pulse removed from end')
# elif stim_name == 'orientation-direction_selectivity':
# new_expected_ttl_pulses -= 1
# print('--> updated correct number of TTL pulses')
# elif stim_name == 'contrast_reversal':
# raise NotImplementedError
# elif stim_name == 'task_stimuli':
# new_stim_ts = stim_ts[:-1]
# print('--> 1 TTL pulse removed from end')
# else:
# raise ValueError('"%s" is not a valid stimulus protocol' % stim_name)
#
# return new_stim_ts, new_expected_ttl_pulses
[docs]def get_spacer_times(spacer_template, jitter, ttl_signal, t_quiet):
"""
:param spacer_template: list of indices where ttl signal changes
:type spacer_template: array-like
:param jitter: jitter (in seconds) for matching ttl_signal with spacer_template
:type jitter: float
:param ttl_signal:
:type ttl_signal: array-like
:param t_quiet: seconds between spacer and next stim
:type t_quiet: float
:return: times of spacer onset/offset
:rtype: n_spacer x 2 np.ndarray; first col onset times, second col offset
"""
diff_spacer_template = np.diff(spacer_template)
# add jitter;
# remove extreme values (shouldn't be a problem with iblrig versions >= 5.2.10)
spacer_model = jitter + diff_spacer_template[2:-2]
# diff ttl signal to compare to spacer_model
dttl = np.diff(ttl_signal)
# remove diffs larger than max diff in model to clean up signal
dttl[dttl > np.max(spacer_model)] = 0
# convolve cleaned diff ttl signal w/ spacer model
conv_dttl = np.correlate(dttl, spacer_model, mode='full')
# find spacer location
thresh = 3.0
idxs_spacer_middle = np.where(
(conv_dttl[1:-2] < thresh) &
(conv_dttl[2:-1] > thresh) &
(conv_dttl[3:] < thresh))[0]
# adjust indices for
# - `np.where` call above
# - length of spacer_model
idxs_spacer_middle += 2 - int((np.floor(len(spacer_model) / 2)))
# pull out spacer times (middle)
ts_spacer_middle = ttl_signal[idxs_spacer_middle]
# put beginning/end of spacer times into an array
spacer_length = np.max(spacer_template)
spacer_times = np.zeros(shape=(ts_spacer_middle.shape[0], 2))
for i, t in enumerate(ts_spacer_middle):
spacer_times[i, 0] = t - (spacer_length / 2) - t_quiet
spacer_times[i, 1] = t + (spacer_length / 2) + t_quiet
return spacer_times, conv_dttl
[docs]def interpolate_rf_mapping_stimulus(ttl_signal, times, frames, t_bin):
"""
Interpolate stimulus presentation times to screen refresh rate to match `frames`
:param ttl_signal:
:type ttl_signal: array-like
:param times: array of stimulus switch times
:type times: array-like
:param frames: (time, y_pix, x_pix) array of stim frames
:type frames: array-like
:param t_bin: screen refresh rate
:type t_bin: float
:return: tuple of (stim_times, stim_frames)
"""
beg_extrap_val = -10001
end_extrap_val = -10000
idxs_up, idxs_dn = get_rf_ttl_pulses(ttl_signal)
X = np.sort(np.concatenate([idxs_up, idxs_dn]))
Xq = np.arange(frames.shape[0])
# make left and right extrapolations distinctive to easily find later
Tq = np.interp(Xq, X, times, left=beg_extrap_val, right=end_extrap_val)
# uniform spacing outside boundaries of ttl signal
# first values
n_beg = len(np.where(Tq == beg_extrap_val)[0])
if 0 < n_beg < Tq.shape[0]:
Tq[:n_beg] = times[0] - np.arange(n_beg, 0, -1) * t_bin
# end values
n_end = len(np.where(Tq == end_extrap_val)[0])
if 0 < n_end < Tq.shape[0]:
Tq[-n_end:] = times[-1] + np.arange(1, n_end + 1) * t_bin
return Tq, frames
[docs]def export_to_alf(session_path, stim_ts, stim_datas, stim_names):
"""
Export extracted stimuli and their presentation times to the session alf directory.
:param session_path: absolute path of a session, i.e. /mnt/data/Subjects/ZM_1887/2019-07-10/001
:param stim_ts:
:param stim_datas:
:param stim_names:
:return: None; instead, saves the following files into `session_path/alf`; the 'xx' part of the
following filenames will be '00', '01', etc., one for each time the specific protocol is
run
orientation/direction selectivity
`_iblcertif_.odsgratings.times.xx.npy`: shape (n_stims, 2); columns are
(stim on time, stim off time)
`_iblcertif_.odsgratings.stims.xx.npy`: shape (n_stims,); value is grating angle
contrast reversal
`_iblcertif_.reversal.times.xx.npy`: shape (n_stims,); stim presentation times
`_iblcertif_.reversal.stims.xx.npy`: shape (n_stims,); stim identity - these values map
into the matrices defined in `_iblrig_taskSettings.raw.json` files, 'VISUAL_STIM_3' ->
'stim_patch_contrasts'
receptive field mapping (sparse noise)
`_iblcertif_.rfmap.times.xx.npy`: shape (n_stims,); stim presentation times
`_iblcertif_.rfmap.stims.xx.npy`: shape (n_stims, y_pix, x_pix); sparse noise stim frames
spontaneous activity:
`_iblcertif_.spontaneous.times.xx.npy`: shape (2,); start and end times of spont activity
task stimuli (gratings of varying locations/contrast)
`_iblcertif_.task.times.xx.npy`: shape (n_stims, 2); columns are
(stim on time, stim off time)
`_iblcertif_.task.stims.xx.npy`: shape (n_stims, 2); columns are
(azimuth in degrees [i.e. left or right], contrast)
"""
counters = {stim_name: 0 for stim_name in np.unique(stim_names)}
for stim_t, stim_data, stim_name in zip(stim_ts, stim_datas, stim_names):
if stim_name == 'spacer':
continue
if stim_name == 'receptive_field_mapping':
stim_short = 'rfmap'
elif stim_name == 'orientation-direction_selectivity':
stim_short = 'odsgratings'
elif stim_name == 'contrast_reversal':
stim_short = 'reversal'
elif stim_name == 'task_stimuli':
stim_short = 'task'
elif stim_name == 'spontaneous_activity':
stim_short = 'spontaneous'
else:
raise ValueError('"%s" is an unknown stimulus protocol' % stim_name)
filename_times = str('_iblcertif_.%s.times.%02i.npy' % (stim_short, counters[stim_name]))
filename_stims = str('_iblcertif_.%s.stims.%02i.npy' % (stim_short, counters[stim_name]))
counters[stim_name] += 1
if not os.path.exists(os.path.join(session_path, 'alf')):
os.mkdir(os.path.join(session_path, 'alf'))
shape_t = stim_t.shape[0]
shape_d = stim_data.shape[0]
if shape_t != 0 and shape_d != 0:
if stim_name == 'contrast_reversal':
# no known bug fix for contrast reversal bonsai bugs; don't save out if problem
if shape_t != shape_d:
continue
else:
assert shape_t == shape_d
if shape_t != 0:
np.save(os.path.join(session_path, 'alf', filename_times), stim_t)
if shape_d != 0:
np.save(os.path.join(session_path, 'alf', filename_stims), stim_data)
# alert cronjobs that there are new files to register to the database by saving empty file
open(os.path.join(session_path, 'register_me.flag'), 'a').close()
if __name__ == '__main__':
# example usage
from oneibl.one import ONE
one = ONE()
eid = one.search(subject='ZM_2104', date='2019-09-19', number=1)
dtypes = [
'ephysData.raw.meta',
'_spikeglx_sync.channels',
'_spikeglx_sync.polarities',
'_spikeglx_sync.times',
'_iblrig_RFMapStim.raw',
'_iblrig_codeFiles.raw',
'_iblrig_taskSettings.raw'
]
files_paths = one.load(eid[0], dataset_types=dtypes, clobber=False, download_only=True)
session_path = get_session_path(files_paths[0])
extract_stimulus_info_to_alf(session_path, save=True)