'''
Population functions.
Code from https://github.com/cortex-lab/phylib/blob/master/phylib/stats/ccg.py by C. Rossant.
Code for decoding by G. Meijer
'''
import numpy as np
import scipy as sp
import types
from itertools import groupby
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import KFold, LeaveOneOut, LeaveOneGroupOut
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, roc_auc_score
from sklearn.utils import shuffle as sklearn_shuffle
def _get_spike_counts_in_bins(spike_times, spike_clusters, intervals):
"""
Return the number of spikes in a sequence of time intervals, for each neuron.
Parameters
----------
spike_times : 1D array
spike times (in seconds)
spike_clusters : 1D array
cluster ids corresponding to each event in `spikes`
intervals : 2D array of shape (n_events, 2)
the start and end times of the events
Returns
---------
counts : 2D array of shape (n_neurons, n_events)
the spike counts of all neurons ffrom scipy.stats import sem, tor all events
value (i, j) is the number of spikes of neuron `neurons[i]` in interval #j
cluster_ids : 1D array
list of cluster ids
"""
# Check input
assert intervals.ndim == 2
assert intervals.shape[1] == 2
assert np.all(np.diff(spike_times) >= 0), "Spike times need to be sorted"
intervals_idx = np.searchsorted(spike_times, intervals)
# For each neuron and each interval, the number of spikes in the interval.
cluster_ids = np.unique(spike_clusters)
n_neurons = len(cluster_ids)
n_intervals = intervals.shape[0]
counts = np.zeros((n_neurons, n_intervals), dtype=np.uint32)
for j in range(n_intervals):
t0, t1 = intervals[j, :]
# Count the number of spikes in the window, for each neuron.
x = np.bincount(
spike_clusters[intervals_idx[j, 0]:intervals_idx[j, 1]],
minlength=cluster_ids.max() + 1)
counts[:, j] = x[cluster_ids]
return counts, cluster_ids
def _index_of(arr, lookup):
"""Replace scalars in an array by their indices in a lookup table.
Implicitely assume that:
* All elements of arr and lookup are non-negative integers.
* All elements or arr belong to lookup.
This is not checked for performance reasons.
"""
# Equivalent of np.digitize(arr, lookup) - 1, but much faster.
# TODO: assertions to disable in production for performance reasons.
# TODO: np.searchsorted(lookup, arr) is faster on small arrays with large
# values
lookup = np.asarray(lookup, dtype=np.int32)
m = (lookup.max() if len(lookup) else 0) + 1
tmp = np.zeros(m + 1, dtype=np.int)
# Ensure that -1 values are kept.
tmp[-1] = -1
if len(lookup):
tmp[lookup] = np.arange(len(lookup))
return tmp[arr]
def _increment(arr, indices):
"""Increment some indices in a 1D vector of non-negative integers.
Repeated indices are taken into account."""
bbins = np.bincount(indices)
arr[:len(bbins)] += bbins
return arr
def _diff_shifted(arr, steps=1):
return arr[steps:] - arr[:len(arr) - steps]
def _create_correlograms_array(n_clusters, winsize_bins):
return np.zeros((n_clusters, n_clusters, winsize_bins // 2 + 1), dtype=np.int32)
def _symmetrize_correlograms(correlograms):
"""Return the symmetrized version of the CCG arrays."""
n_clusters, _, n_bins = correlograms.shape
assert n_clusters == _
# We symmetrize c[i, j, 0].
# This is necessary because the algorithm in correlograms()
# is sensitive to the order of identical spikes.
correlograms[..., 0] = np.maximum(
correlograms[..., 0], correlograms[..., 0].T)
sym = correlograms[..., 1:][..., ::-1]
sym = np.transpose(sym, (1, 0, 2))
return np.dstack((sym, correlograms))
def _generate_pseudo_blocks(n_trials, factor=60, min_=20, max_=100):
block_ids = []
while len(block_ids) < n_trials:
x = np.random.exponential(factor)
while (x <= min_) | (x >= max_):
x = np.random.exponential(factor)
if (len(block_ids) == 0) & (np.random.randint(2) == 0):
block_ids += [0] * int(x)
elif (len(block_ids) == 0):
block_ids += [1] * int(x)
elif block_ids[-1] == 0:
block_ids += [1] * int(x)
elif block_ids[-1] == 1:
block_ids += [0] * int(x)
return np.array(block_ids[:n_trials])
def xcorr(spike_times, spike_clusters, bin_size=None, window_size=None):
    """Compute all pairwise cross-correlograms among the clusters appearing in `spike_clusters`.

    Parameters
    ----------
    :param spike_times: Spike times in seconds.
    :type spike_times: array-like
    :param spike_clusters: Spike-cluster mapping.
    :type spike_clusters: array-like
    :param bin_size: Size of the bin, in seconds.
    :type bin_size: float
    :param window_size: Size of the window, in seconds.
    :type window_size: float

    Returns an `(n_clusters, n_clusters, winsize_samples)` array with all pairwise
    cross-correlograms.
    """
    assert np.all(np.diff(spike_times) >= 0), ("The spike times must be increasing.")
    assert spike_times.ndim == 1
    assert spike_times.shape == spike_clusters.shape
    # Find `binsize`.
    bin_size = np.clip(bin_size, 1e-5, 1e5)  # in seconds
    # Find `winsize_bins`.
    window_size = np.clip(window_size, 1e-5, 1e5)  # in seconds
    winsize_bins = 2 * int(.5 * window_size / bin_size) + 1
    # Take the cluster order into account.
    clusters = np.unique(spike_clusters)
    n_clusters = len(clusters)
    # Like spike_clusters, but with 0..n_clusters-1 indices.
    spike_clusters_i = _index_of(spike_clusters, clusters)
    # Shift between the two copies of the spike trains.
    shift = 1
    # At a given shift, the mask precises which spikes have matching spikes
    # within the correlogram time window.
    # NOTE: np.bool was removed from NumPy (1.24); use the builtin bool.
    mask = np.ones_like(spike_times, dtype=bool)
    correlograms = _create_correlograms_array(n_clusters, winsize_bins)
    # The loop continues as long as there is at least one spike with
    # a matching spike.
    while mask[:-shift].any():
        # Interval between spike i and spike i+shift.
        spike_diff = _diff_shifted(spike_times, shift)
        # Binarize the delays between spike i and spike i+shift.
        spike_diff_b = np.round(spike_diff / bin_size).astype(np.int64)
        # Spikes with no matching spikes are masked.
        mask[:-shift][spike_diff_b > (winsize_bins / 2)] = False
        # Cache the masked spike delays.
        m = mask[:-shift].copy()
        d = spike_diff_b[m]
        # Find the indices in the raveled correlograms array that need
        # to be incremented, taking into account the spike clusters.
        indices = np.ravel_multi_index(
            (spike_clusters_i[:-shift][m], spike_clusters_i[+shift:][m], d), correlograms.shape)
        # Increment the matching spikes in the correlograms array.
        _increment(correlograms.ravel(), indices)
        shift += 1
    return _symmetrize_correlograms(correlograms)
def decode(spike_times, spike_clusters, event_times, event_groups, pre_time=0, post_time=0.5,
           classifier='bayes-multinomial', cross_validation='kfold', num_splits=5, prob_left=None,
           custom_validation=None, n_neurons='all', iterations=1, shuffle=False, phase_rand=False,
           pseudo_blocks=False):
    """
    Use decoding to classify groups of trials (e.g. stim left/right). Classification is done using
    the population vector of summed spike counts from the specified time window. Cross-validation
    is achieved using n-fold cross validation or leave-one-out cross validation. Decoders can
    decode any number of groups. When providing the classifier with an imbalanced dataset (not
    the same number of trials in each group) the chance level will not be 1/groups. In that case,
    to compare the classification performance against chance one has to either determine chance
    level by decoding a shuffled dataset or use the 'auroc' metric as readout (this metric is
    robust against imbalanced datasets)

    Parameters
    ----------
    spike_times : 1D array
        spike times (in seconds)
    spike_clusters : 1D array
        cluster ids corresponding to each event in `spikes`
    event_times : 1D array
        times (in seconds) of the events from the two groups
    event_groups : 1D array
        group identities of the events, can be any number of groups, accepts integers and strings
    pre_time : float
        time (in seconds) preceding the event times
    post_time : float
        time (in seconds) following the event times
    classifier : string or sklearn object
        which decoder to use, either input a scikit learn clf object directly or a string.
        When it's a string options are (all classifiers are used with default options):
            'bayes-multinomial'    Naive Bayes with multinomial likelihood
            'bayes-gaussian'       Naive Bayes with gaussian likelihood
            'forest'               Random forest
            'regression'           Logistic regression
            'lda'                  Linear Discriminant Analysis
    cross_validation : string or generator object
        which cross-validation method to use
        you can input the .split method of any sklearn cross-validation method
        or a string. If it's a string, options are:
            'none'                 No cross-validation
            'kfold'                K-fold cross-validation
            'kfold-interleaved'    K-fold cross validation with interleaved trial selection
            'leave-one-out'        Leave out the trial that is being decoded
            'block'                Leave out the block the to-be-decoded trial is in
            'custom'               Use the splits provided in `custom_validation`
    num_splits : integer
        ** only for 'kfold' cross-validation **
        Number of splits to use for k-fold cross validation, a value of 5 means that the decoder
        will be trained on 4/5th of the data and used to predict the remaining 1/5th. This process
        is repeated five times so that all data has been used as both training and test set.
    prob_left : 1D array
        ** only for 'block' cross-validation **
        the probability of the stimulus appearing on the left for each trial in event_times
    custom_validation : generator
        ** only for 'custom' cross-validation **
        a generator object with the splits to be used for cross validation using this format:
            (
                (split1_train_idxs, split1_test_idxs),
                (split2_train_idxs, split2_test_idxs),
                (split3_train_idxs, split3_test_idxs),
             ...)
    n_neurons : string or integer
        number of neurons to randomly subselect from the population (default is 'all')
    iterations : int
        number of times to repeat the decoding (especially usefull when subselecting neurons)
    shuffle : boolean
        whether to shuffle the trial labels each decoding iteration
    phase_rand : boolean
        whether to use phase randomization of the activity over trials to use as a "chance"
        predictor
    pseudo_blocks : boolean
        whether to generate pseudo blocks with the same statistics as the actual blocks
        to estimate chance level

    Returns
    -------
    results : dict
        dictionary with decoding results
        accuracy : float
            accuracy of the classifier in percentage correct
        f1 : float
            F1 score of the classifier
        auroc : float
            the area under the ROC curve of the classification performance
        confusion_matrix : 2D array
            normalized confusion matrix
        predictions : 2D array with dimensions iterations x trials
            predicted group label for all trials in every iteration
        probabilities : 2D array with dimensions iterations x trials
            classification probability for all trials in every iteration
    """
    # Check input
    if type(classifier) == str:
        assert classifier in ['bayes-multinomial', 'bayes-gaussian', 'forest', 'regression', 'lda']
    assert (type(cross_validation) == str) or (isinstance(cross_validation, types.GeneratorType))
    if type(cross_validation) == str:
        assert cross_validation in ['none', 'kfold', 'kfold-interleaved', 'leave-one-out',
                                    'block', 'custom']
    assert event_times.shape[0] == event_groups.shape[0]
    if cross_validation == 'block':
        assert event_times.shape[0] == prob_left.shape[0]
    if cross_validation == 'custom':
        assert isinstance(custom_validation, types.GeneratorType)

    # Get matrix of all neuronal responses
    times = np.column_stack(((event_times - pre_time), (event_times + post_time)))
    pop_vector, cluster_ids = _get_spike_counts_in_bins(spike_times, spike_clusters, times)
    pop_vector = pop_vector.T

    # Exclude last trial if the number of trials is even and phase shuffling
    if (phase_rand is True) & (event_groups.shape[0] % 2 == 0):
        event_groups = event_groups[:-1]
        pop_vector = pop_vector[:-1]

    # Initialize classifier
    if type(classifier) == str:
        if classifier == 'forest':
            clf = RandomForestClassifier()
        elif classifier == 'bayes-multinomial':
            clf = MultinomialNB()
        elif classifier == 'bayes-gaussian':
            clf = GaussianNB()
        elif classifier == 'regression':
            clf = LogisticRegression(solver='liblinear')
        elif classifier == 'lda':
            clf = LinearDiscriminantAnalysis()
    else:
        clf = classifier

    # Pre-allocate variables
    acc = np.zeros(iterations)
    f1 = np.zeros(iterations)
    auroc = np.zeros(iterations)
    conf_matrix_norm = np.zeros((np.shape(np.unique(event_groups))[0],
                                 np.shape(np.unique(event_groups))[0],
                                 iterations))
    pred = np.zeros([iterations, pop_vector.shape[0]])
    prob = np.zeros([iterations, pop_vector.shape[0]])

    for i in range(iterations):
        # Pre-allocate variables for this iteration
        y_pred = np.zeros(event_groups.shape)
        y_probs = np.zeros(event_groups.shape)

        # Get neurons to use for this iteration
        if n_neurons == 'all':
            sub_pop_vector = pop_vector
        else:
            use_neurons = np.random.choice(pop_vector.shape[1], n_neurons, replace=False)
            sub_pop_vector = pop_vector[:, use_neurons]

        # Shuffle trial labels if necessary
        if shuffle is True:
            event_groups = sklearn_shuffle(event_groups)

        # Perform phase randomization of activity over trials if necessary
        if phase_rand is True:
            if i == 0:
                original_pop_vector = sub_pop_vector
            rand_pop_vector = np.empty(original_pop_vector.shape)
            frequencies = int((original_pop_vector.shape[0] - 1) / 2)
            fsignal = sp.fft.fft(original_pop_vector, axis=0)
            power = np.abs(fsignal[1:1 + frequencies])
            phases = 2 * np.pi * np.random.rand(frequencies)
            for k in range(original_pop_vector.shape[1]):
                newfsignal = fsignal[0, k]
                newfsignal = np.append(newfsignal, np.exp(1j * phases) * power[:, k])
                newfsignal = np.append(newfsignal, np.flip(np.exp(-1j * phases) * power[:, k]))
                newsignal = sp.fft.ifft(newfsignal)
                rand_pop_vector[:, k] = np.abs(newsignal.real)
            sub_pop_vector = rand_pop_vector

        # Generate pseudo blocks if necessary
        if pseudo_blocks:
            event_groups = _generate_pseudo_blocks(event_groups.shape[0])

        if cross_validation == 'none':
            # Fit the model on all the data and predict
            clf.fit(sub_pop_vector, event_groups)
            y_pred = clf.predict(sub_pop_vector)

            # Get the probability of the prediction for ROC analysis
            probs = clf.predict_proba(sub_pop_vector)
            y_probs = probs[:, 1]  # keep positive only
        else:
            # Perform cross-validation
            if isinstance(cross_validation, types.GeneratorType):
                # The caller passed a .split generator directly
                cv = cross_validation
            elif cross_validation == 'leave-one-out':
                cv = LeaveOneOut().split(sub_pop_vector)
            elif cross_validation == 'kfold':
                cv = KFold(n_splits=num_splits).split(sub_pop_vector)
            elif cross_validation == 'kfold-interleaved':
                cv = KFold(n_splits=num_splits, shuffle=True).split(sub_pop_vector)
            elif cross_validation == 'block':
                block_lengths = [sum(1 for _ in g) for _, g in groupby(prob_left)]
                blocks = np.repeat(np.arange(len(block_lengths)), block_lengths)
                cv = LeaveOneGroupOut().split(sub_pop_vector, groups=blocks)
            elif cross_validation == 'custom':
                # BUG FIX: the original assigned the string 'custom' here
                # instead of the user-provided splits generator.
                # NOTE(review): a generator is exhausted after the first
                # iteration, so custom splits only work with iterations=1.
                cv = custom_validation

            # Loop over the splits into train and test
            for train_index, test_index in cv:
                # Fit the model to the training data
                clf.fit(sub_pop_vector[train_index], event_groups[train_index])

                # Predict the test data
                y_pred[test_index] = clf.predict(sub_pop_vector[test_index])

                # Get the probability of the prediction for ROC analysis
                # NOTE(review): probs[:, 1] assumes binary classification
                probs = clf.predict_proba(sub_pop_vector[test_index])
                y_probs[test_index] = probs[:, 1]  # keep positive only

        # Calculate performance metrics and confusion matrix
        acc[i] = accuracy_score(event_groups, y_pred)
        f1[i] = f1_score(event_groups, y_pred)
        auroc[i] = roc_auc_score(event_groups[~np.isnan(y_probs)], y_probs[~np.isnan(y_probs)])
        conf_matrix = confusion_matrix(event_groups, y_pred)
        conf_matrix_norm[:, :, i] = conf_matrix / conf_matrix.sum(axis=1)[:, np.newaxis]

        # Add prediction and probability to matrix
        pred[i, :] = y_pred
        prob[i, :] = y_probs

    # Make integers from arrays when there's only one iteration
    if iterations == 1:
        acc = acc[0]
        f1 = f1[0]
        auroc = auroc[0]

    # Add to results dictionary
    if cross_validation == 'kfold':
        results = dict({'accuracy': acc, 'f1': f1, 'auroc': auroc,
                        'predictions': pred, 'probabilities': prob,
                        'confusion_matrix': conf_matrix_norm,
                        'n_groups': np.shape(np.unique(event_groups))[0],
                        'classifier': classifier, 'cross_validation': '%d-fold' % num_splits,
                        'iterations': iterations, 'shuffle': shuffle, 'phase_rand': phase_rand,
                        'pseudo_blocks': pseudo_blocks})
    else:
        results = dict({'accuracy': acc, 'f1': f1, 'auroc': auroc,
                        'predictions': pred, 'probabilities': prob,
                        'confusion_matrix': conf_matrix_norm,
                        'n_groups': np.shape(np.unique(event_groups))[0],
                        'classifier': classifier, 'cross_validation': cross_validation,
                        'iterations': iterations, 'shuffle': shuffle, 'phase_rand': phase_rand,
                        'pseudo_blocks': pseudo_blocks})
    return results
def lda_project(spike_times, spike_clusters, event_times, event_groups, pre_time=0, post_time=0.5,
                cross_validation='kfold', num_splits=5, prob_left=None, custom_validation=None):
    """
    Use linear discriminant analysis to project population vectors to the line that best separates
    the two groups. When cross-validation is used, the LDA projection is fitted on the training
    data after which the test data is projected to this projection.

    Parameters
    ----------
    spike_times : 1D array
        spike times (in seconds)
    spike_clusters : 1D array
        cluster ids corresponding to each event in `spikes`
    event_times : 1D array
        times (in seconds) of the events from the two groups
    event_groups : 1D array
        group identities of the events, can be any number of groups, accepts integers and strings
    pre_time : float
        time (in seconds) preceding the event times
    post_time : float
        time (in seconds) following the event times
    cross_validation : string
        which cross-validation method to use, options are:
            'none'            No cross-validation
            'kfold'           K-fold cross-validation
            'leave-one-out'   Leave out the trial that is being decoded
            'block'           Leave out the block the to-be-decoded trial is in
            'custom'          Any custom cross-validation provided by the user
    num_splits : integer
        ** only for 'kfold' cross-validation **
        Number of splits to use for k-fold cross validation, a value of 5 means that the decoder
        will be trained on 4/5th of the data and used to predict the remaining 1/5th. This process
        is repeated five times so that all data has been used as both training and test set.
    prob_left : 1D array
        ** only for 'block' cross-validation **
        the probability of the stimulus appearing on the left for each trial in event_times
    custom_validation : generator
        ** only for 'custom' cross-validation **
        a generator object with the splits to be used for cross validation using this format:
            (
                (split1_train_idxs, split1_test_idxs),
                (split2_train_idxs, split2_test_idxs),
                (split3_train_idxs, split3_test_idxs),
             ...)

    Returns
    -------
    lda_projection : 1D array
        the position along the LDA projection axis for the population vector of each trial
    """
    # Check input
    assert cross_validation in ['none', 'kfold', 'leave-one-out', 'block', 'custom']
    assert event_times.shape[0] == event_groups.shape[0]
    if cross_validation == 'block':
        assert event_times.shape[0] == prob_left.shape[0]
    if cross_validation == 'custom':
        assert isinstance(custom_validation, types.GeneratorType)

    # Get matrix of all neuronal responses
    times = np.column_stack(((event_times - pre_time), (event_times + post_time)))
    pop_vector, cluster_ids = _get_spike_counts_in_bins(spike_times, spike_clusters, times)
    pop_vector = pop_vector.T

    # Initialize
    lda = LinearDiscriminantAnalysis()
    lda_projection = np.zeros(event_groups.shape)

    if cross_validation == 'none':
        # Find the best LDA projection on all data and transform those data
        lda_projection = lda.fit_transform(pop_vector, event_groups)
    else:
        # Perform cross-validation
        if cross_validation == 'leave-one-out':
            cv = LeaveOneOut().split(pop_vector)
        elif cross_validation == 'kfold':
            cv = KFold(n_splits=num_splits).split(pop_vector)
        elif cross_validation == 'block':
            block_lengths = [sum(1 for _ in g) for _, g in groupby(prob_left)]
            blocks = np.repeat(np.arange(len(block_lengths)), block_lengths)
            cv = LeaveOneGroupOut().split(pop_vector, groups=blocks)
        elif cross_validation == 'custom':
            cv = custom_validation

        # Loop over the splits into train and test
        for train_index, test_index in cv:
            # Find LDA projection on the training data
            lda.fit(pop_vector[train_index], [event_groups[j] for j in train_index])

            # Project the held-out test data to projection
            lda_projection[test_index] = lda.transform(pop_vector[test_index]).T[0]

    return lda_projection