"""
GLM fitting utilities based on NeuroGLM by Il Memming Park, Jonathan Pillow:
https://github.com/pillowlab/neuroGLM
Berk Gercek
International Brain Lab, 2020
"""
from warnings import warn, catch_warnings
import numpy as np
import pandas as pd
from brainbox.processing import bincount2D
from sklearn.linear_model import PoissonRegressor
import scipy.sparse as sp
import numba as nb
from numpy.matlib import repmat
from scipy.optimize import minimize
from scipy.special import xlogy
from tqdm import tqdm
[docs]class NeuralGLM:
"""
Generalized Linear Model which seeks to describe spiking activity as the output of a poisson
process. Uses sklearn's GLM methods under the hood while providing useful routines for dealing
with neural data
"""
def __init__(self, trialsdf, spk_times, spk_clu, vartypes,
train=0.8, binwidth=0.02, mintrials=100):
"""
Construct GLM object using information about all trials, and the relevant spike times.
Only ingests data, and further object methods must be called to describe kernels, gain
terms, etc. as components of the model.
Parameters
----------
trialsdf: pandas.DataFrame
DataFrame of trials in which each row contains all desired covariates of the model.
e.g. contrast, stimulus type, etc. Not all columns will necessarily be fit.
If a continuous covariate (e.g. wheel position, pupil diameter) is included, each entry
of the column must be a nSamples x 2 array with samples in the first column and
timestamps (relative to trial start) in the second position.
*Must have \'trial_start\' and \'trial_end\' parameters which are times, in seconds.*
spk_times: numpy.array of floats
1-D array of times at which spiking events were detected, in seconds.
spk_clu: numpy.array of integers
1-D array of same shape as spk_times, with integer cluster IDs identifying which
cluster a spike time belonged to.
vartypes: dict
Dict with column names in trialsdf as keys, values are the type of covariate the column
contains. e.g. {'stimOn_times': 'timing', 'wheel', 'continuous', 'correct': 'value'}
Valid values are:
'timing' : A timestamp relative to trial start (e.g. stimulus onset)
'continuous' : A continuous covariate sampled throughout the trial (e.g. eye pos)
'value' : A single value for the given trial (e.g. contrast or difficulty)
train: float
Float in (0, 1] indicating proportion of data to use for training GLM vs testing
(using the NeuralGLM.score method). Trials to keep will be randomly sampled.
binwidth: float
Width, in seconds, of the bins which will be used to count spikes. Defaults to 20ms.
mintrials: int
Minimum number of trials in which neurons fired a spike in order to be fit. Defaults
to 100 trials.
Returns
-------
glm: object
GLM object with methods for adding regressors and fitting
"""
# Data checks #
if not all([name in vartypes for name in trialsdf.columns]):
raise KeyError("Some columns were not described in vartypes")
if not all([value in ('timing', 'continuous', 'value') for value in vartypes.values()]):
raise ValueError("Invalid values were passed in vartypes")
if not len(spk_times) == len(spk_clu):
raise IndexError("Spike times and cluster IDs are not same length")
if not isinstance(train, float) and not train == 1:
raise TypeError('train must be a float between 0 and 1')
if not ((train > 0) & (train <= 1)):
raise ValueError('train must be between 0 and 1')
# Filter out cells which don't meet the criteria for minimum spiking, while doing trial
# assignment
self.vartypes = vartypes
self.vartypes['duration'] = 'value'
trialsdf = trialsdf.copy() # Make sure we don't modify the original dataframe
clu_ids = np.unique(spk_clu)
trbounds = trialsdf[['trial_start', 'trial_end']] # Get the start/end of trials
# Initialize a Cells x Trials bool array to easily see how many trials a clu spiked
trialspiking = np.zeros((trialsdf.index.max() + 1, clu_ids.max() + 1), dtype=bool)
# Empty trial duration value to use later
trialsdf['duration'] = np.nan
# Iterate through each trial, and store the relevant spikes for that trial into a dict
# Along with the cluster labels. This makes binning spikes and accessing spikes easier.
spks = {}
clu = {}
st_endlast = 0
timingvars = [col for col in trialsdf.columns if vartypes[col] == 'timing']
for i, (start, end) in trbounds.iterrows():
if any(np.isnan((start, end))):
warn(f"NaN values found in trial start or end at trial number {i}. "
"Discarding trial.")
trialsdf.drop(i, inplace=True)
continue
st_startind = np.searchsorted(spk_times[st_endlast:], start) + st_endlast
st_endind = np.searchsorted(spk_times[st_endlast:], end, side='right') + st_endlast
st_endlast = st_endind
trial_clu = np.unique(spk_clu[st_startind:st_endind])
trialspiking[i, trial_clu] = True
spks[i] = spk_times[st_startind:st_endind] - start
clu[i] = spk_clu[st_startind:st_endind]
for col in timingvars:
trialsdf.at[i, col] = np.round(trialsdf.at[i, col] - start, decimals=5)
trialsdf.at[i, 'duration'] = end - start
# Break the data into test and train sections for cross-validation
if train == 1:
print('Training fraction set to 1. Training on all data.')
traininds = trialsdf.index
testinds = trialsdf.index
else:
trainlen = int(np.floor(len(trialsdf) * train))
traininds = sorted(np.random.choice(trialsdf.index, trainlen, replace=False))
testinds = trialsdf.index[~trialsdf.index.isin(traininds)]
self.spikes = spks
self.clu = clu
self.clu_ids = np.argwhere(np.sum(trialspiking, axis=0) > mintrials)
self.binwidth = binwidth
self.covar = {}
self.trialsdf = trialsdf
self.traininds = traininds
self.testinds = testinds
self.compiled = False
return
[docs] def add_covariate_timing(self, covlabel, eventname, bases,
offset=0, deltaval=None, cond=None, desc=''):
"""
Convenience wrapper for adding timing event regressors to the GLM. Automatically generates
a one-hot vector for each trial as the regressor and adds the appropriate data structure
to the model.
Parameters
----------
covlabel : str
Label which the covariate will use. Can be accessed via dot syntax of the instance
usually.
eventname : str
Label of the column in trialsdf which has the event timing for each trial.
bases : numpy.array
nTB x nB array, i.e. number of time bins for the bases functions by number of bases.
Each column in the array is used together to describe the response of a unit to that
timing event.
offset : float, seconds
Offset of bases functions relative to timing event. Negative values will ensure that
deltaval : None, str, or pandas series, optional
Values of the kronecker delta function peak used to encode the event. If a string, the
column in trialsdf with that label will be used. If a pandas series with indexes
matching trialsdf, corresponding elements of the series will be the delta funtion val.
If None (default) height is 1.
cond : None, list, or fun, optional
Condition which to apply this covariate. Can either be a list of trial indices, or a
function which takes in rows of the trialsdf and returns booleans.
desc : str, optional
Additional information about the covariate, if desired. by default ''
"""
if covlabel in self.covar:
raise AttributeError(f'Covariate {covlabel} already exists in model.')
if self.compiled:
warn('Design matrix was already compiled once. Be sure to compile again if adding'
' additional covariates.')
if deltaval is None:
gainmod = False
elif isinstance(deltaval, pd.Series):
gainmod = True
elif isinstance(deltaval, str) and deltaval in self.trialsdf.columns:
gainmod = True
deltaval = self.trialsdf[deltaval]
else:
raise TypeError(f'deltaval must be None or pandas series. {type(deltaval)} '
'was passed instead.')
if eventname not in self.vartypes:
raise ValueError('Event name specified not found in trialsdf')
elif self.vartypes[eventname] != 'timing':
raise TypeError(f'Column {eventname} in trialsdf is not registered as a timing')
vecsizes = self.trialsdf['duration'].apply(self.binf)
stiminds = self.trialsdf[eventname].apply(self.binf)
stimvecs = []
for i in self.trialsdf.index:
vec = np.zeros(vecsizes[i])
if gainmod:
vec[stiminds[i]] = deltaval[i]
else:
vec[stiminds[i]] = 1
stimvecs.append(vec.reshape(-1, 1))
regressor = pd.Series(stimvecs, index=self.trialsdf.index)
self.add_covariate(covlabel, regressor, bases, offset, cond, desc)
return
[docs] def add_covariate_boxcar(self, covlabel, boxstart, boxend,
cond=None, height=None, desc=''):
if covlabel in self.covar:
raise AttributeError(f'Covariate {covlabel} already exists in model.')
if self.compiled:
warn('Design matrix was already compiled once. Be sure to compile again if adding'
' additional covariates.')
if boxstart not in self.trialsdf.columns or boxend not in self.trialsdf.columns:
raise KeyError('boxstart or boxend not found in trialsdf columns.')
if self.vartypes[boxstart] != 'timing':
raise TypeError(f'Column {boxstart} in trialsdf is not registered as a timing. '
'boxstart and boxend need to refer to timining events in trialsdf.')
if self.vartypes[boxend] != 'timing':
raise TypeError(f'Column {boxend} in trialsdf is not registered as a timing. '
'boxstart and boxend need to refer to timining events in trialsdf.')
if isinstance(height, str):
if height in self.trialsdf.columns:
height = self.trialsdf[height]
else:
raise KeyError(f'{height} is str not in columns of trialsdf')
elif isinstance(height, pd.Series):
if not all(height.index == self.trialsdf.index):
raise IndexError('Indices of height series does not match trialsdf.')
elif height is None:
height = pd.Series(np.ones(len(self.trialsdf.index)), index=self.trialsdf.index)
vecsizes = self.trialsdf['duration'].apply(self.binf)
stind = self.trialsdf[boxstart].apply(self.binf)
endind = self.trialsdf[boxend].apply(self.binf)
stimvecs = []
for i in self.trialsdf.index:
bxcar = np.zeros(vecsizes[i])
bxcar[stind[i]:endind[i] + 1] = height[i]
stimvecs.append(bxcar)
regressor = pd.Series(stimvecs, index=self.trialsdf.index)
self.add_covariate(covlabel, regressor, None, cond, desc)
return
[docs] def add_covariate_raw(self, covlabel, raw,
cond=None, desc=''):
stimlens = self.trialsdf.duration.apply(self.binf)
if isinstance(raw, str):
if raw not in self.trialsdf.columns:
raise KeyError(f'String {raw} not found in columns of trialsdf. Strings must'
'refer to valid column names.')
covseries = self.trialsdf[raw]
if np.any(covseries.apply(len) != stimlens):
raise IndexError(f'Some array shapes in {raw} do not match binned duration.')
self.add_covariate(covlabel, covseries, None, cond=cond)
if callable(raw):
try:
covseries = self.trialsdf.apply(raw, axis=1)
except Exception:
raise TypeError('Function for raw covariate generation did not run properly.'
'Make sure that the function passed takes in rows of trialsdf.')
if np.any(covseries.apply(len) != stimlens):
raise IndexError(f'Some array shapes in {raw} do not match binned duration.')
self.add_covariate(covlabel, covseries, None, cond=cond)
if isinstance(raw, pd.Series):
if np.any(raw.index != self.trialsdf.index):
raise IndexError('Indices of raw do not match indices of trialsdf.')
if np.any(raw.apply(len) != stimlens):
raise IndexError(f'Some array shapes in {raw} do not match binned duration.')
self.add_covariate(covlabel, raw, None, cond=cond)
[docs] def add_covariate(self, covlabel, regressor, bases,
offset=0, cond=None, desc=''):
"""
Parent function to add covariates to model object. Takes a regressor in the form of a
pandas Series object, a T x M array of M bases, and stores them for use in the design
matrix generation.
Parameters
----------
covlabel : str
Label for the covariate being added. Will be exposed, if possible, through
(instance).(covlabel) attribute.
regressor : pandas.Series
Series in which each element is the value(s) of a regressor for a trial at that index.
These will be convolved with the bases functions (if provided) to produce the
components of the design matrix. *Regressor must be (T / dt) x 1 array for each trial*
bases : numpy.array or None
T x M array of M basis functions over T timesteps. Columns will be convolved with the
elements of `regressor` to produce elements of the design matrix. If None, it is
assumed a raw regressor is being used.
offset : int, optional
Offset of the regressor from the bases during convolution. Negative values indicate
that the firing of the unit will be , by default 0
cond : list or func, optional
Condition for which to apply covariate. Either a list of trials which the covariate
applies to, or a function of the form f(dataframerow) which returns a boolean,
by default None
desc : str, optional
Description of the covariate for reference purposes, by default '' (empty)
"""
if covlabel in self.covar:
raise AttributeError(f'Covariate {covlabel} already exists in model.')
if self.compiled:
warn('Design matrix was already compiled once. Be sure to compile again if adding'
' additional covariates.')
# Test for mismatch in length of regressor vs trials
mismatch = np.zeros(len(self.trialsdf.index), dtype=bool)
for i in self.trialsdf.index:
currtr = self.trialsdf.loc[i]
nT = self.binf(currtr.duration)
if regressor.loc[i].shape[0] != nT:
mismatch[i] = True
if np.any(mismatch):
raise ValueError('Length mismatch between regressor and trial on trials'
f'{*np.argwhere(mismatch),}.')
# Initialize containers for the covariate dicts
if not hasattr(self, 'currcol'):
self.currcol = 0
if callable(cond):
cond = self.trialsdf.index[self.trialsdf.apply(cond, axis=1)].to_numpy()
if not all(regressor.index == self.trialsdf.index):
raise IndexError('Indices of regressor and trials dataframes do not match.')
cov = {'description': desc,
'bases': bases,
'valid_trials': cond if cond is not None else self.trialsdf.index,
'offset': offset,
'regressor': regressor,
'dmcol_idx': np.arange(self.currcol, self.currcol + bases.shape[1])
if bases is not None else self.currcol}
if bases is None:
self.currcol += 1
else:
self.currcol += bases.shape[1]
self.covar[covlabel] = cov
return
[docs] def bin_spike_trains(self):
"""
Bins spike times passed to class at instantiation. Will not bin spike trains which did
not meet the criteria for minimum number of spiking trials. Must be run before the
NeuralGLM.fit() method is called.
"""
spkarrs = []
arrdiffs = []
for i in self.trialsdf.index:
duration = self.trialsdf.loc[i, 'duration']
durmod = duration % self.binwidth
if durmod > (self.binwidth / 2):
duration = duration - (self.binwidth / 2)
if len(self.spikes[i]) == 0:
arr = np.zeros((self.binf(duration), len(self.clu_ids)))
spkarrs.append(arr)
continue
spks = self.spikes[i]
clu = self.clu[i]
arr = bincount2D(spks, clu,
xbin=self.binwidth, ybin=self.clu_ids, xlim=[0, duration])[0]
arrdiffs.append(arr.shape[1] - self.binf(duration))
spkarrs.append(arr.T)
y = np.vstack(spkarrs)
if hasattr(self, 'dm'):
assert y.shape[0] == self.dm.shape[0], "Oh shit. Indexing error."
self.binnedspikes = y
return
[docs] def compile_design_matrix(self, dense=True):
"""
Compiles design matrix for the current experiment based on the covariates which were added
with the various NeuralGLM.add_covariate methods available. Can optionally compile a sparse
design matrix using the scipy.sparse package, however that method may take longer depending
on the degree of sparseness.
Parameters
----------
dense : bool, optional
Whether or not to compute a dense design matrix or a sparse one, by default True
"""
covars = self.covar
# Go trial by trial and compose smaller design matrices
miniDMs = []
rowtrials = []
for i, trial in self.trialsdf.iterrows():
nT = self.binf(trial.duration)
miniX = np.zeros((nT, self.currcol))
rowlabs = np.ones((nT, 1), dtype=int) * i
for cov in covars.values():
sidx = cov['dmcol_idx']
# Optionally use cond to filter out which trials to apply certain regressors,
if i not in cov['valid_trials']:
continue
stim = cov['regressor'][i]
# Convolve Kernel or basis function with stimulus or regressor
if cov['bases'] is None:
miniX[:, sidx] = stim
else:
if len(stim.shape) == 1:
stim = stim.reshape(-1, 1)
miniX[:, sidx] = convbasis(stim, cov['bases'], self.binf(cov['offset']))
# Sparsify convolved result and store in miniDMs
if dense:
miniDMs.append(miniX)
else:
miniDMs.append(sp.lil_matrix(miniX))
rowtrials.append(rowlabs)
if dense:
dm = np.vstack(miniDMs)
else:
dm = sp.vstack(miniDMs).to_csc()
trlabels = np.vstack(rowtrials)
if hasattr(self, 'binnedspikes'):
assert self.binnedspikes.shape[0] == dm.shape[0], "Oh shit. Indexing error."
self.dm = dm
self.trlabels = trlabels
# self.dm = np.roll(dm, -1, axis=0) # Fix weird +1 offset bug in design matrix
self.compiled = True
return
[docs] def fit(self, method='sklearn', alpha=0):
"""
Fit the current set of binned spikes as a function of the current design matrix. Requires
NeuralGLM.bin_spike_trains and NeuralGLM.compile_design_matrix to be run first. Will store
the fit weights to an internal variable. To access these fit weights in a pandas DataFrame
use the NeuralGLM.combine_weights method.
Parameters
----------
method : str, optional
'sklearn' or 'minimize', describes the fitting method used to obtain weights.
Scikit-learn uses weight normalization and regularization and will return significantly
different results from 'minimize', which simply minimizes the negative log likelihood
of the data given the covariates, by default 'sklearn'
alpha : float, optional
Regularization strength for scikit-learn implementation of GLM fitting, where 0 is
effectively unregularized weights. Does not function in the minimize
option, by default 1
Returns
-------
coefs : list
List of coefficients fit. Not recommended to use these for interpretation. Use
the .combine_weights() method instead.
intercepts : list
List of intercepts (bias terms) fit. Not recommended to use these for interpretation.
"""
if not self.compiled:
raise AttributeError('Design matrix has not been compiled yet. Please run '
'neuroglm.compile_design_matrix() before fitting.')
# TODO: Make this optionally parallel across multiple cores of CPU
# Initialize pd Series to store output coefficients and intercepts for fits
coefs = pd.Series(index=self.clu_ids.flat, name='coefficients', dtype=object)
intercepts = pd.Series(index=self.clu_ids.flat, name='intercepts', dtype=object)
variances = pd.Series(index=self.clu_ids.flat, name='variances', dtype=object)
biasdm = np.pad(self.dm.copy(), ((0, 0), (1, 0)), 'constant', constant_values=1)
trainmask = np.isin(self.trlabels, self.traininds).flatten() # Mask for training data
trainbinned = self.binnedspikes[trainmask]
print(f'Condition of design matrix is {np.linalg.cond(self.dm[trainmask])}')
if method == 'sklearn':
traindm = self.dm[trainmask]
nonconverged = []
for i, cell in tqdm(enumerate(self.clu_ids), 'Fitting units:', leave=False):
binned = trainbinned[:, i]
with catch_warnings(record=True) as w:
fitobj = PoissonRegressor(alpha=alpha, max_iter=300).fit(traindm,
binned)
if len(w) != 0:
nonconverged.append(cell[0])
wts = np.concatenate([[fitobj.intercept_], fitobj.coef_], axis=0)
wvar = np.diag(np.linalg.inv(dd_neglog(wts, biasdm[trainmask], binned)))
coefs.at[cell[0]] = fitobj.coef_
variances.at[cell[0]] = wvar[1:]
intercepts.at[cell[0]] = fitobj.intercept_
if len(nonconverged) != 0:
warn(f'Fitting did not converge for some units: {nonconverged}')
elif method == 'minimize':
traindm = biasdm[trainmask]
for i, cell in tqdm(enumerate(self.clu_ids), 'Fitting units:', leave=False):
binned = trainbinned[:, i]
wi = np.linalg.lstsq(traindm, binned, rcond=None)[0]
res = minimize(neglog, wi, (traindm, binned),
method='trust-ncg', jac=d_neglog, hess=dd_neglog)
hess = dd_neglog(res.x, traindm, binned)
try:
wvar = np.diag(np.linalg.inv(hess))
except np.linalg.LinAlgError:
wvar = np.ones_like(res.x) * 1e6
coefs.at[cell[0]] = res.x[1:]
intercepts.at[cell[0]] = res.x[0]
variances.at[cell[0]] = wvar[1:]
self.coefs = coefs
self.intercepts = intercepts
self.variances = variances
self.fitmethod = method
return coefs, intercepts
[docs] def combine_weights(self):
"""
Combined fit coefficients and intercepts to produce kernels where appropriate, which
describe activity.
Returns
-------
pandas.DataFrame
DataFrame in which each row is the fit weights for a given spiking unit. Columns are
individual covariates added during the construction process. Indices are the cluster
IDs for each of the cells that were fit (NOT a simple range(start, stop) index.)
"""
outputs = {}
varoutputs = {}
for var in self.covar.keys():
if self.covar[var]['bases'] is None:
wind = self.covar[var]['dmcol_idx']
outputs[var] = self.coefs.apply(lambda w: w[wind])
continue
winds = self.covar[var]['dmcol_idx']
bases = self.covar[var]['bases']
weights = self.coefs.apply(lambda w: np.sum(w[winds] * bases, axis=1))
variances = self.variances.apply(lambda v: np.sum(v[winds] * bases, axis=1))
offset = self.covar[var]['offset']
tlen = bases.shape[0] * self.binwidth
tstamps = np.linspace(0 + offset, tlen + offset, bases.shape[0])
outputs[var] = pd.DataFrame(weights.values.tolist(),
index=weights.index,
columns=tstamps)
varoutputs[var] = pd.DataFrame(variances.values.tolist(),
index=weights.index,
columns=tstamps)
self.combined_weights = outputs
self.combined_variances = varoutputs
return outputs
[docs] def score(self):
"""
Compute the squared deviance of the model, i.e. how much variance beyond the null model
(a poisson process with the same mean, defined by the intercept, at every time step) the
model which was fit explains.
For a detailed explanation see https://bookdown.org/egarpor/PM-UC3M/glm-deviance.html`
Returns
-------
pandas.Series
A series in which the index are cluster IDs and each entry is the D^2 for the model fit
to that cluster
"""
if not hasattr(self, 'coefs'):
raise AttributeError('Fit was not run. Please run fit first.')
testmask = np.isin(self.trlabels, self.testinds).flatten()
testdm = self.dm[testmask, :]
scores = pd.Series(index=self.coefs.index)
for cell in self.coefs.index:
cell_idx = np.argwhere(self.clu_ids == cell)[0, 0]
wt = self.coefs.loc[cell].reshape(-1, 1)
bias = self.intercepts.loc[cell]
y = self.binnedspikes[testmask, cell_idx]
pred = np.exp(testdm @ wt + bias)
null_pred = np.ones_like(pred) * np.mean(y)
null_deviance = 2 * np.sum(xlogy(y, y / null_pred.flat) - y + null_pred.flat)
with np.errstate(invalid='ignore', divide='ignore'):
full_deviance = 2 * np.sum(xlogy(y, y / pred.flat) - y + pred.flat)
d_sq = 1 - (full_deviance / null_deviance)
scores.at[cell] = d_sq
return scores
[docs] def binf(self, t):
"""
Bin function for a given timestep. Returns the number of bins after trial start a given t
would occur at.
Parameters
----------
t : float
Seconds after trial start
Returns
-------
int
Number of bins corresponding to t using the binwidth of the model.
"""
return np.ceil(t / self.binwidth).astype(int)
[docs]def convbasis(stim, bases, offset=0):
if offset < 0:
stim = np.pad(stim, ((0, -offset), (0, 0)), 'constant')
elif offset > 0:
stim = np.pad(stim, ((offset, 0), (0, 0)), 'constant')
X = denseconv(stim, bases)
if offset < 0:
X = X[-offset:, :]
elif offset > 0:
X = X[: -(1 + offset), :]
return X
# Precompilation for speed
[docs]@nb.njit
def denseconv(X, bases):
T, dx = X.shape
TB, M = bases.shape
indices = np.ones((dx, M))
sI = np.sum(indices, axis=1)
BX = np.zeros((T, int(np.sum(sI))))
sI = np.cumsum(sI)
k = 0
for kCov in range(dx):
A = np.zeros((T + TB - 1, int(np.sum(indices[kCov, :]))))
for i, j in enumerate(np.argwhere(indices[kCov, :]).flat):
A[:, i] = np.convolve(X[:, kCov], bases[:, j])
BX[:, k: sI[kCov]] = A[: T, :]
k = sI[kCov]
return BX
[docs]def raised_cosine(duration, nbases, binfun):
nbins = binfun(duration)
ttb = repmat(np.arange(1, nbins + 1).reshape(-1, 1), 1, nbases)
dbcenter = nbins / nbases
cwidth = 4 * dbcenter
bcenters = 0.5 * dbcenter + dbcenter * np.arange(0, nbases)
x = ttb - repmat(bcenters.reshape(1, -1), nbins, 1)
bases = (np.abs(x / cwidth) < 0.5) * (np.cos(x * np.pi * 2 / cwidth) * 0.5 + 0.5)
return bases
[docs]def full_rcos(duration, nbases, binfun, n_before=1):
if not isinstance(n_before, int):
n_before = int(n_before)
nbins = binfun(duration)
ttb = repmat(np.arange(1, nbins + 1).reshape(-1, 1), 1, nbases)
dbcenter = nbins / (nbases - 2)
cwidth = 4 * dbcenter
bcenters = 0.5 * dbcenter + dbcenter * np.arange(-n_before, nbases - n_before)
x = ttb - repmat(bcenters.reshape(1, -1), nbins, 1)
bases = (np.abs(x / cwidth) < 0.5) * (np.cos(x * np.pi * 2 / cwidth) * 0.5 + 0.5)
return bases
[docs]def neglog(weights, x, y):
xproj = x @ weights
f = np.exp(xproj)
nzidx = f != 0
if np.any(y[~nzidx] != 0):
return np.inf
return -y[nzidx].reshape(1, -1) @ xproj[nzidx] + np.sum(f)
[docs]def d_neglog(weights, x, y):
xproj = x @ weights
f = np.exp(xproj)
df = f
nzidx = (f != 0).reshape(-1)
if np.any(y[~nzidx] != 0):
return np.inf
return x[nzidx, :].T @ ((1 - y[nzidx] / f[nzidx]) * df[nzidx])
[docs]def dd_neglog(weights, x, y):
xproj = x @ weights
f = np.exp(xproj)
df = f
ddf = df
nzidx = (f != 0).reshape(-1)
if np.any(y[~nzidx] != 0):
return np.inf
yf = y[nzidx] / f[nzidx]
p1 = ddf[nzidx] * (1 - yf) + (y[nzidx] * (df[nzidx] / f[nzidx])**2)
p2 = x[nzidx, :]
return (p1.reshape(-1, 1) * p2).T @ x[nzidx, :]