Source code for brainbox.modeling.neural_model


"""
GLM fitting utilities based on NeuroGLM by Il Memming Park, Jonathan Pillow:

https://github.com/pillowlab/neuroGLM

Berk Gercek
International Brain Lab, 2020
"""
import numpy as np
import pandas as pd
from sklearn.metrics import r2_score
from scipy.special import xlogy
from brainbox.processing import bincount2D
from .utils import neglog


[docs]class NeuralModel: """ Parent class for multiple types of neural models. Contains core methods for extracting trial-relevant spike times and binning spikes, as well as making sure that the design matrix being used makes sense. """ def __init__(self, design_matrix, spk_times, spk_clu, binwidth=0.02, mintrials=100, stepwise=False): """ Construct GLM object using information about all trials, and the relevant spike times. Only ingests data, and further object methods must be called to describe kernels, gain terms, etc. as components of the model. Parameters ---------- design_matrix: brainbox.modeling.design_matrix.DesignMatrix Design matrix object which has already been compiled for use with neural data. spk_times: numpy.array of floats 1-D array of times at which spiking events were detected, in seconds. spk_clu: numpy.array of integers 1-D array of same shape as spk_times, with integer cluster IDs identifying which cluster a spike time belonged to. binwidth : float Size of bins to put spikes in to, in seconds. mintrials: int Minimum number of trials in which neurons fired a spike in order to be fit. Defaults to 100 trials. stepwise: bool Whether or not to perform stepwise regression, in which the model is built iteratively from only the mean rate, up. This allows comparison of D^2 scores for sub-models which incorporate only some parameters, to see which regressors actually improve explainability. Defaults to False. """ # Data checks # if not len(spk_times) == len(spk_clu): raise IndexError("Spike times and cluster IDs are not same length") if not design_matrix.compiled: raise AttributeError('Design matrix object must be compiled before passing to fit') # Filter out cells which don't meet the criteria for minimum spiking, while doing trial # assignment base_df = design_matrix.base_df clu_ids = np.unique(spk_clu).flatten() trbounds = base_df[['trial_start', 'trial_end']] # Get the start/end of trials # Initialize a Cells x Trials bool array to easily see how many trials a clu spiked trialspiking = np.zeros((base_df.index.max() + 1, clu_ids.max() + 1), dtype=bool) # Empty trial duration value to use later # Iterate through each trial, and store the relevant spikes for that trial into a dict # Along with the cluster labels. This makes binning spikes and accessing spikes easier. spks = {} clu = {} st_endlast = 0 for i, (start, end) in trbounds.iterrows(): st_startind = np.searchsorted(spk_times[st_endlast:], start) + st_endlast st_endind = np.searchsorted(spk_times[st_endlast:], end, side='right') + st_endlast st_endlast = st_endind trial_clu = np.unique(spk_clu[st_startind:st_endind]) trialspiking[i, trial_clu] = True spks[i] = spk_times[st_startind:st_endind] - start clu[i] = spk_clu[st_startind:st_endind] # Set model parameters to begin with self.design = design_matrix self.spikes = spks self.clu = clu self.clu_ids = np.argwhere(np.sum(trialspiking, axis=0) > mintrials).flatten() self.stepwise = stepwise self.binwidth = binwidth if len(self.clu_ids) == 0: raise UserWarning('No neuron fired a spike in a minimum number.') # Bin spikes spkarrs, arrdiffs = [], [] for i in self.design.trialsdf.index: duration = self.design.trialsdf.loc[i, 'duration'] durmod = duration % self.binwidth if durmod > (self.binwidth / 2): duration = duration - (self.binwidth / 2) if len(self.spikes[i]) == 0: arr = np.zeros((self.binf(duration), len(self.clu_ids))) spkarrs.append(arr) continue spks = self.spikes[i] clu = self.clu[i] arr = bincount2D(spks, clu, xbin=self.binwidth, ybin=self.clu_ids, xlim=[0, duration])[0] arrdiffs.append(arr.shape[1] - self.binf(duration)) spkarrs.append(arr.T) y = np.vstack(spkarrs) if hasattr(self.design, 'dm'): assert y.shape[0] == self.design.dm.shape[0], "Oh shit. Indexing error." self.binnedspikes = y
[docs] def combine_weights(self): """ Combined fit coefficients and intercepts to produce kernels where appropriate, which describe activity. Returns ------- pandas.DataFrame DataFrame in which each row is the fit weights for a given spiking unit. Columns are individual covariates added during the construction process. Indices are the cluster IDs for each of the cells that were fit (NOT a simple range(start, stop) index.) """ outputs = {} for var in self.design.covar.keys(): if self.design.covar[var]['bases'] is None: wind = self.design.covar[var]['dmcol_idx'] outputs[var] = self.coefs.apply(lambda w: w[wind]) continue winds = self.design.covar[var]['dmcol_idx'] bases = self.design.covar[var]['bases'] weights = self.coefs.apply(lambda w: np.sum(w[winds] * bases, axis=1)) offset = self.design.covar[var]['offset'] tlen = bases.shape[0] * self.binwidth tstamps = np.linspace(0 + offset, tlen + offset, bases.shape[0]) outputs[var] = pd.DataFrame(weights.values.tolist(), index=weights.index, columns=tstamps) self.combined_weights = outputs return outputs
def _scorer(self, wt, bias, dm, y): """ Score a single target y """ pred = self.link(dm @ wt + bias).flatten() if self.metric == 'dsq': null_pred = np.ones_like(pred) * np.mean(y) null_deviance = 2 * np.sum(xlogy(y, y / null_pred.flat) - y + null_pred.flat) with np.errstate(invalid='ignore', divide='ignore'): full_deviance = 2 * np.sum(xlogy(y, y / pred.flat) - y + pred.flat) return 1 - (full_deviance / null_deviance) elif self.metric == 'msespike': residuals = (y - pred) ** 2 return residuals.sum() / y.sum() elif self.metric == 'rsq': return r2_score(y, pred) elif self.metric == 'nllspike': biasdm = np.pad(dm, ((0, 0), (1, 0)), constant_values=1) return -neglog(np.vstack((bias, wt)).flatten(), biasdm, y) / np.sum(y) else: raise AttributeError('No valid metric exists in the instance for use by _scorer()')
[docs] def fit(self, train_idx=None, printcond=True): """ Fit the current set of binned spikes as a function of the current design matrix. Requires NeuralGLM.bin_spike_trains and NeuralGLM.compile_design_matrix to be run first. Will store the fit weights to an internal variable. To access these fit weights in a pandas DataFrame use the NeuralGLM.combine_weights method. Parameters ---------- train_idx : array-like of trial indices, optional List of which trials to use to train the model. Defaults to None, which indicates all indices in the trialsdf will be used (100% train) printcond : bool Whether or not to print the condition number of the design matrix. Defaults to True Returns ------- coefs : list List of coefficients fit. Not recommended to use these for interpretation. Use the .combine_weights() method instead. intercepts : list List of intercepts (bias terms) fit. """ # Input checks if train_idx is None: train_idx = self.design.trialsdf.index if not np.all(np.isin(train_idx, self.design.trialsdf.index)): raise IndexError('Not all train indices in the trials of design matrix') # Store training and test indices for self so that .score() method will know what to # operate on. If all data indices are in train indices, train and test are the same set. self.traininds = train_idx if not np.all(np.isin(self.design.trialsdf.index, train_idx)): self.testinds = self.design.trialsdf.index[~self.design.trialsdf.index.isin(train_idx)] else: self.testinds = train_idx # Mask for training data trainmask = np.isin(self.design.trlabels, train_idx).flatten() trainbinned = self.binnedspikes[trainmask] if printcond: print(f'Condition of design matrix is {np.linalg.cond(self.design[trainmask])}') traindm = self.design[trainmask] coefs, intercepts, = self._fit(traindm, trainbinned) self.coefs, self.intercepts = coefs, intercepts return
[docs] def score(self, testinds=None): """ Score model using chosen metric Returns ------- pandas.Series Score using chosen metric (defined at instantiation) for each unit fit by the model. """ if not hasattr(self, 'coefs'): raise AttributeError('Model has not been fit yet.') if testinds is None: testinds = self.testinds testmask = np.isin(self.design.trlabels, testinds).flatten() dm, binned = self.design[testmask, :], self.binnedspikes[testmask] scores = pd.Series(index=self.coefs.index, name='scores', dtype=object) for cell in self.coefs.index: cell_idx = np.argwhere(self.clu_ids == cell)[0, 0] wt = self.coefs.loc[cell].reshape(-1, 1) bias = self.intercepts.loc[cell] y = binned[:, cell_idx] scores.at[cell] = self._scorer(wt, bias, dm, y) return scores
[docs] def binf(self, t): """ Bin function for a given timestep. Returns the number of bins after trial start a given t would occur at. Parameters ---------- t : float Seconds after trial start Returns ------- int Number of bins corresponding to t using the binwidth of the model. """ return np.ceil(t / self.binwidth).astype(int)