Source code for brainbox.task.passive

"""
Functions dealing with passive task
"""
import numpy as np
from iblutil.numerical import bincount2D
from scipy.linalg import svd


[docs] def get_on_off_times_and_positions(rf_map): """ Prepares passive receptive field mapping into format for analysis Parameters ---------- rf_map: output from brainbox.io.one.load_passive_rfmap Returns ------- rf_map_times: time of each receptive field map frame np.array(len(stim_frames) rf_map_pos: unique position of each pixel on screen np.array(len(x_pos), len(y_pos)) rf_stim_frames: for each pixel on screen stores array of stimulus frames where stim onset occurred. For both white squares 'on' and black squares 'off' """ rf_map_times = rf_map['times'] rf_map_frames = rf_map['frames'].astype('float') gray = np.median(rf_map_frames) x_bin = rf_map_frames.shape[1] y_bin = rf_map_frames.shape[2] stim_on_frames = np.zeros((x_bin * y_bin, 1), dtype=np.ndarray) stim_off_frames = np.zeros((x_bin * y_bin, 1), dtype=np.ndarray) rf_map_pos = np.zeros((x_bin * y_bin, 2), dtype=int) i = 0 for x_pos in np.arange(x_bin): for y_pos in np.arange(y_bin): pixel_val = rf_map_frames[:, x_pos, y_pos] - gray pixel_non_grey = np.where(pixel_val != 0)[0] # Find cases where the frame before was gray (i.e when the stim came on) frame_change = np.where(rf_map_frames[pixel_non_grey - 1, x_pos, y_pos] == gray)[0] stim_pos = pixel_non_grey[frame_change] # On stimulus, white squares on_pix = np.where(pixel_val[stim_pos] > 0)[0] stim_on = stim_pos[on_pix] stim_on_frames[i, 0] = stim_on off_pix = np.where(pixel_val[stim_pos] < 0)[0] stim_off = stim_pos[off_pix] stim_off_frames[i, 0] = stim_off rf_map_pos[i, :] = [x_pos, y_pos] i += 1 rf_stim_frames = {} rf_stim_frames['on'] = stim_on_frames rf_stim_frames['off'] = stim_off_frames return rf_map_times, rf_map_pos, rf_stim_frames
[docs] def get_rf_map_over_depth(rf_map_times, rf_map_pos, rf_stim_frames, spike_times, spike_depths, t_bin=0.01, d_bin=80, pre_stim=0.05, post_stim=1.5, y_lim=[0, 3840], x_lim=None): """ Compute receptive field map for each stimulus onset binned across depth Parameters ---------- rf_map_times rf_map_pos rf_stim_frames spike_times: array of spike times spike_depths: array of spike depths along probe t_bin: bin size along time dimension d_bin: bin size along depth dimension pre_stim: time period before rf map stim onset to epoch around post_stim: time period after rf map onset to epoch around y_lim: values to limit to in depth direction x_lim: values to limit in time direction Returns ------- rfmap: receptive field map for 'on' 'off' stimuli. Each rfmap has shape (depths, x_pos, y_pos, epoch_window) depths: depths between which receptive field map has been computed """ binned_array, times, depths = bincount2D(spike_times, spike_depths, t_bin, d_bin, ylim=y_lim, xlim=x_lim) x_bin = len(np.unique(rf_map_pos[:, 0])) y_bin = len(np.unique(rf_map_pos[:, 1])) n_bins = int((pre_stim + post_stim) / t_bin) rf_map = {} for stim_type, stims in rf_stim_frames.items(): _rf_map = np.zeros(shape=(depths.shape[0], x_bin, y_bin, n_bins)) for pos, stim_frame in zip(rf_map_pos, stims): x_pos = pos[0] y_pos = pos[1] # Case where there is no stimulus at this position if len(stim_frame[0]) == 0: _rf_map[:, x_pos, y_pos, :] = np.zeros((depths.shape[0], n_bins)) continue stim_on_times = rf_map_times[stim_frame[0]] stim_intervals = np.c_[stim_on_times - pre_stim, stim_on_times + post_stim] out_intervals = stim_intervals[:, 1] > times[-1] idx_intervals = np.searchsorted(times, stim_intervals)[np.invert(out_intervals)] # Case when no spikes during the passive period if idx_intervals.shape[0] == 0: avg_stim_trials = np.zeros((depths.shape[0], n_bins)) else: stim_trials = np.zeros((depths.shape[0], n_bins, idx_intervals.shape[0])) for i, on in enumerate(idx_intervals): stim_trials[:, :, i] = binned_array[:, on[0]:on[1]] avg_stim_trials = np.mean(stim_trials, axis=2) _rf_map[:, x_pos, y_pos, :] = avg_stim_trials rf_map[stim_type] = _rf_map return rf_map, depths
[docs] def get_svd_map(rf_map): """ Perform SVD on the spatiotemporal rf_map and return the first spatial components Parameters ---------- rf_map Returns ------- rf_svd: First spatial component of rf map for 'on' 'off' stimuli. Each dict has shape (depths, x_pos, y_pos) """ rf_svd = {} for stim_type, stims in rf_map.items(): svd_stim = [] for dep in stims: x_pix, y_pix, n_bins = dep.shape sub_reshaped = np.reshape(dep, (y_pix * x_pix, n_bins)) bsl = np.mean(sub_reshaped[:, 0]) u, s, v = svd(sub_reshaped - bsl) sign = -1 if np.median(v[0, :]) < 0 else 1 rfs = sign * np.reshape(u[:, 0], (y_pix, x_pix)) rfs *= s[0] svd_stim.append(rfs) rf_svd[stim_type] = svd_stim return rf_svd
[docs] def get_stim_aligned_activity(stim_events, spike_times, spike_depths, z_score_flag=True, d_bin=20, t_bin=0.01, pre_stim=0.4, post_stim=1, base_stim=1, y_lim=[0, 3840], x_lim=None): """ Parameters ---------- stim_events: dict of different stim events. Each key contains time of stimulus onset spike_times: array of spike times spike_depths: array of spike depths along probe z_score_flag: whether to return values as z_score of firing rate T_BIN: bin size along time dimension D_BIN: bin size along depth dimension pre_stim: time period before rf map stim onset to epoch around post_stim: time period after rf map onset to epoch around base_stim: time period before rf map stim to use as baseline for z_score correction y_lim: values to limit to in depth direction x_lim: values to limit in time direction Returns ------- stim_activity: stimulus aligned activity for each stimulus type, returned as z_score of firing rate """ binned_array, times, depths = bincount2D(spike_times, spike_depths, t_bin, d_bin, ylim=y_lim, xlim=x_lim) n_bins = int((pre_stim + post_stim) / t_bin) n_bins_base = int(np.ceil((base_stim - pre_stim) / t_bin)) stim_activity = {} for stim_type, stim_times in stim_events.items(): # Get rid of any nan values stim_times = stim_times[~np.isnan(stim_times)] stim_intervals = stim_times - pre_stim base_intervals = stim_times - base_stim out_intervals = stim_intervals > times[-1] idx_stim = np.searchsorted(times, stim_intervals, side='right')[np.invert(out_intervals)] idx_base = np.searchsorted(times, base_intervals, side='right')[np.invert(out_intervals)] idx_stim = np.c_[idx_stim, idx_stim + n_bins] idx_base = np.c_[idx_base, idx_base + n_bins_base] stim_trials = np.zeros((depths.shape[0], n_bins, idx_stim.shape[0])) noise_trials = np.zeros((depths.shape[0], n_bins_base, idx_stim.shape[0])) for i, (st, ba) in enumerate(zip(idx_stim, idx_base)): stim_trials[:, :, i] = binned_array[:, st[0]:st[1]] noise_trials[:, :, i] = binned_array[:, ba[0]:ba[1]] # Average across trials avg_stim_trials = np.mean(stim_trials, axis=2) if z_score_flag: # Average across trials and time avg_base_trials = np.mean(np.mean(noise_trials, axis=2), axis=1)[:, np.newaxis] std_base_trials = np.std(np.mean(noise_trials, axis=2), axis=1)[:, np.newaxis] z_score = (avg_stim_trials - avg_base_trials) / std_base_trials z_score[np.isnan(z_score)] = 0 avg_stim_trials = z_score stim_activity[stim_type] = avg_stim_trials return stim_activity