"""
Functions dealing with passive task
"""
import numpy as np
from iblutil.numerical import bincount2D
from scipy.linalg import svd
[docs]
def get_on_off_times_and_positions(rf_map):
"""
Prepares passive receptive field mapping into format for analysis
Parameters
----------
rf_map: output from brainbox.io.one.load_passive_rfmap
Returns
-------
rf_map_times: time of each receptive field map frame np.array(len(stim_frames)
rf_map_pos: unique position of each pixel on screen np.array(len(x_pos), len(y_pos))
rf_stim_frames: for each pixel on screen stores array of stimulus frames where stim onset
occurred. For both white squares 'on' and black squares 'off'
"""
rf_map_times = rf_map['times']
rf_map_frames = rf_map['frames'].astype('float')
gray = np.median(rf_map_frames)
x_bin = rf_map_frames.shape[1]
y_bin = rf_map_frames.shape[2]
stim_on_frames = np.zeros((x_bin * y_bin, 1), dtype=np.ndarray)
stim_off_frames = np.zeros((x_bin * y_bin, 1), dtype=np.ndarray)
rf_map_pos = np.zeros((x_bin * y_bin, 2), dtype=int)
i = 0
for x_pos in np.arange(x_bin):
for y_pos in np.arange(y_bin):
pixel_val = rf_map_frames[:, x_pos, y_pos] - gray
pixel_non_grey = np.where(pixel_val != 0)[0]
# Find cases where the frame before was gray (i.e when the stim came on)
frame_change = np.where(rf_map_frames[pixel_non_grey - 1, x_pos, y_pos] == gray)[0]
stim_pos = pixel_non_grey[frame_change]
# On stimulus, white squares
on_pix = np.where(pixel_val[stim_pos] > 0)[0]
stim_on = stim_pos[on_pix]
stim_on_frames[i, 0] = stim_on
off_pix = np.where(pixel_val[stim_pos] < 0)[0]
stim_off = stim_pos[off_pix]
stim_off_frames[i, 0] = stim_off
rf_map_pos[i, :] = [x_pos, y_pos]
i += 1
rf_stim_frames = {}
rf_stim_frames['on'] = stim_on_frames
rf_stim_frames['off'] = stim_off_frames
return rf_map_times, rf_map_pos, rf_stim_frames
[docs]
def get_rf_map_over_depth(rf_map_times, rf_map_pos, rf_stim_frames, spike_times, spike_depths,
t_bin=0.01, d_bin=80, pre_stim=0.05, post_stim=1.5, y_lim=[0, 3840],
x_lim=None):
"""
Compute receptive field map for each stimulus onset binned across depth
Parameters
----------
rf_map_times
rf_map_pos
rf_stim_frames
spike_times: array of spike times
spike_depths: array of spike depths along probe
t_bin: bin size along time dimension
d_bin: bin size along depth dimension
pre_stim: time period before rf map stim onset to epoch around
post_stim: time period after rf map onset to epoch around
y_lim: values to limit to in depth direction
x_lim: values to limit in time direction
Returns
-------
rfmap: receptive field map for 'on' 'off' stimuli.
Each rfmap has shape (depths, x_pos, y_pos, epoch_window)
depths: depths between which receptive field map has been computed
"""
binned_array, times, depths = bincount2D(spike_times, spike_depths, t_bin, d_bin,
ylim=y_lim, xlim=x_lim)
x_bin = len(np.unique(rf_map_pos[:, 0]))
y_bin = len(np.unique(rf_map_pos[:, 1]))
n_bins = int((pre_stim + post_stim) / t_bin)
rf_map = {}
for stim_type, stims in rf_stim_frames.items():
_rf_map = np.zeros(shape=(depths.shape[0], x_bin, y_bin, n_bins))
for pos, stim_frame in zip(rf_map_pos, stims):
x_pos = pos[0]
y_pos = pos[1]
# Case where there is no stimulus at this position
if len(stim_frame[0]) == 0:
_rf_map[:, x_pos, y_pos, :] = np.zeros((depths.shape[0], n_bins))
continue
stim_on_times = rf_map_times[stim_frame[0]]
stim_intervals = np.c_[stim_on_times - pre_stim, stim_on_times + post_stim]
out_intervals = stim_intervals[:, 1] > times[-1]
idx_intervals = np.searchsorted(times, stim_intervals)[np.invert(out_intervals)]
# Case when no spikes during the passive period
if idx_intervals.shape[0] == 0:
avg_stim_trials = np.zeros((depths.shape[0], n_bins))
else:
stim_trials = np.zeros((depths.shape[0], n_bins, idx_intervals.shape[0]))
for i, on in enumerate(idx_intervals):
stim_trials[:, :, i] = binned_array[:, on[0]:on[1]]
avg_stim_trials = np.mean(stim_trials, axis=2)
_rf_map[:, x_pos, y_pos, :] = avg_stim_trials
rf_map[stim_type] = _rf_map
return rf_map, depths
[docs]
def get_svd_map(rf_map):
"""
Perform SVD on the spatiotemporal rf_map and return the first spatial components
Parameters
----------
rf_map
Returns
-------
rf_svd: First spatial component of rf map for 'on' 'off' stimuli.
Each dict has shape (depths, x_pos, y_pos)
"""
rf_svd = {}
for stim_type, stims in rf_map.items():
svd_stim = []
for dep in stims:
x_pix, y_pix, n_bins = dep.shape
sub_reshaped = np.reshape(dep, (y_pix * x_pix, n_bins))
bsl = np.mean(sub_reshaped[:, 0])
u, s, v = svd(sub_reshaped - bsl)
sign = -1 if np.median(v[0, :]) < 0 else 1
rfs = sign * np.reshape(u[:, 0], (y_pix, x_pix))
rfs *= s[0]
svd_stim.append(rfs)
rf_svd[stim_type] = svd_stim
return rf_svd
[docs]
def get_stim_aligned_activity(stim_events, spike_times, spike_depths, z_score_flag=True, d_bin=20,
t_bin=0.01, pre_stim=0.4, post_stim=1, base_stim=1,
y_lim=[0, 3840], x_lim=None):
"""
Parameters
----------
stim_events: dict of different stim events. Each key contains time of stimulus onset
spike_times: array of spike times
spike_depths: array of spike depths along probe
z_score_flag: whether to return values as z_score of firing rate
T_BIN: bin size along time dimension
D_BIN: bin size along depth dimension
pre_stim: time period before rf map stim onset to epoch around
post_stim: time period after rf map onset to epoch around
base_stim: time period before rf map stim to use as baseline for z_score correction
y_lim: values to limit to in depth direction
x_lim: values to limit in time direction
Returns
-------
stim_activity: stimulus aligned activity for each stimulus type, returned as z_score of firing
rate
"""
binned_array, times, depths = bincount2D(spike_times, spike_depths, t_bin, d_bin,
ylim=y_lim, xlim=x_lim)
n_bins = int((pre_stim + post_stim) / t_bin)
n_bins_base = int(np.ceil((base_stim - pre_stim) / t_bin))
stim_activity = {}
for stim_type, stim_times in stim_events.items():
# Get rid of any nan values
stim_times = stim_times[~np.isnan(stim_times)]
stim_intervals = stim_times - pre_stim
base_intervals = stim_times - base_stim
out_intervals = stim_intervals > times[-1]
idx_stim = np.searchsorted(times, stim_intervals, side='right')[np.invert(out_intervals)]
idx_base = np.searchsorted(times, base_intervals, side='right')[np.invert(out_intervals)]
idx_stim = np.c_[idx_stim, idx_stim + n_bins]
idx_base = np.c_[idx_base, idx_base + n_bins_base]
stim_trials = np.zeros((depths.shape[0], n_bins, idx_stim.shape[0]))
noise_trials = np.zeros((depths.shape[0], n_bins_base, idx_stim.shape[0]))
for i, (st, ba) in enumerate(zip(idx_stim, idx_base)):
stim_trials[:, :, i] = binned_array[:, st[0]:st[1]]
noise_trials[:, :, i] = binned_array[:, ba[0]:ba[1]]
# Average across trials
avg_stim_trials = np.mean(stim_trials, axis=2)
if z_score_flag:
# Average across trials and time
avg_base_trials = np.mean(np.mean(noise_trials, axis=2), axis=1)[:, np.newaxis]
std_base_trials = np.std(np.mean(noise_trials, axis=2), axis=1)[:, np.newaxis]
z_score = (avg_stim_trials - avg_base_trials) / std_base_trials
z_score[np.isnan(z_score)] = 0
avg_stim_trials = z_score
stim_activity[stim_type] = avg_stim_trials
return stim_activity