'''
Computes task-related output.
'''
import numpy as np
from scipy.stats import ranksums, wilcoxon, ttest_ind, ttest_rel
from ._statsmodels import multipletests
from sklearn.metrics import roc_auc_score
import pandas as pd
from brainbox.population.decode import get_spike_counts_in_bins
def responsive_units(spike_times, spike_clusters, event_times, pre_time=[0.5, 0],
post_time=[0, 0.5], alpha=0.05, fdr_corr=False, use_fr=False):
"""
Determine responsive neurons by doing a Wilcoxon Signed-Rank test between a baseline period
before a certain task event (e.g. stimulus onset) and a period after the task event.
Parameters
----------
spike_times : 1D array
spike times (in seconds)
spike_clusters : 1D array
        cluster ids corresponding to each spike in `spike_times`
    event_times : 1D array
        times (in seconds) of the events
pre_time : two-element array
time (in seconds) preceding the event to get the baseline (e.g. [0.5, 0.2] would be a
window starting 0.5 seconds before the event and ending at 0.2 seconds before the event)
    post_time : two-element array
        time (in seconds) following the event (e.g. [0, 0.5] would be a window starting at
        the event and ending 0.5 seconds after the event)
alpha : float
alpha to use for statistical significance
fdr_corr : boolean
        whether to use an FDR correction (Benjamini-Hochberg) to correct for multiple testing
use_fr : bool
whether to use the firing rate instead of total spike count
Returns
-------
significant_units : ndarray
        an array with the indices of clusters that are significantly modulated
stats : 1D array
the statistic of the test that was performed
p_values : ndarray
the p-values of all the clusters
cluster_ids : ndarray
cluster ids of the p-values
"""
# Get spike counts for baseline and event timewindow
baseline_times = np.column_stack(((event_times - pre_time[0]), (event_times - pre_time[1])))
baseline_counts, cluster_ids = get_spike_counts_in_bins(spike_times, spike_clusters,
baseline_times)
times = np.column_stack(((event_times + post_time[0]), (event_times + post_time[1])))
spike_counts, cluster_ids = get_spike_counts_in_bins(spike_times, spike_clusters, times)
if use_fr:
baseline_counts = baseline_counts / (pre_time[0] - pre_time[1])
spike_counts = spike_counts / (post_time[1] - post_time[0])
# Do statistics
    sig_units, stats, p_values = compute_comparison_statistics(
        baseline_counts, spike_counts, test='signrank', alpha=alpha, fdr_corr=fdr_corr)
significant_units = cluster_ids[sig_units]
return significant_units, stats, p_values, cluster_ids
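
# A minimal usage sketch (illustrative only): `spikes` and `trials` are assumed
# to be ALF-style objects loaded elsewhere (e.g. via ONE); they are not defined
# in this module.
#
#     sig_units, stats, p_values, cluster_ids = responsive_units(
#         spikes.times, spikes.clusters, trials.stimOn_times,
#         pre_time=[0.5, 0], post_time=[0, 0.5], alpha=0.05)
#     print('%d of %d units responsive to stimulus onset'
#           % (sig_units.size, cluster_ids.size))
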
def differentiate_units(spike_times, spike_clusters, event_times, event_groups,
pre_time=0, post_time=0.5, test='ranksums', alpha=0.05, fdr_corr=False):
"""
Determine units which significantly differentiate between two task events
(e.g. stimulus left/right) by performing a statistical test between the spike rates
elicited by the two events. Default is a Wilcoxon Rank Sum test.
Parameters
----------
spike_times : 1D array
spike times (in seconds)
spike_clusters : 1D array
        cluster ids corresponding to each spike in `spike_times`
event_times : 1D array
times (in seconds) of the events from the two groups
event_groups : 1D array
group identities of the events as either 0 or 1
pre_time : float
time (in seconds) to precede the event times to get the baseline
post_time : float
time (in seconds) to follow the event times
test : string
which statistical test to use, options are:
'ranksums' Wilcoxon Rank Sums test
'signrank' Wilcoxon Signed Rank test (for paired observations)
'ttest' independent samples t-test
'paired_ttest' paired t-test
alpha : float
alpha to use for statistical significance
fdr_corr : boolean
        whether to use an FDR correction (Benjamini-Hochberg) to correct for multiple testing
Returns
-------
significant_units : 1D array
        an array with the indices of clusters that are significantly modulated
stats : 1D array
the statistic of the test that was performed
p_values : 1D array
the p-values of all the clusters
cluster_ids : ndarray
cluster ids of the p-values
"""
# Check input
assert test in ['ranksums', 'signrank', 'ttest', 'paired_ttest']
if (test == 'signrank') or (test == 'paired_ttest'):
assert np.sum(event_groups == 0) == np.sum(event_groups == 1), \
'For paired tests the number of events in both groups needs to be the same'
# Get spike counts for the two events
times_1 = np.column_stack(((event_times[event_groups == 0] - pre_time),
(event_times[event_groups == 0] + post_time)))
counts_1, cluster_ids = get_spike_counts_in_bins(spike_times, spike_clusters, times_1)
times_2 = np.column_stack(((event_times[event_groups == 1] - pre_time),
(event_times[event_groups == 1] + post_time)))
counts_2, cluster_ids = get_spike_counts_in_bins(spike_times, spike_clusters, times_2)
# Do statistics
    sig_units, stats, p_values = compute_comparison_statistics(
        counts_1, counts_2, test=test, alpha=alpha, fdr_corr=fdr_corr)
significant_units = cluster_ids[sig_units]
return significant_units, stats, p_values, cluster_ids
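
# A minimal usage sketch (illustrative only): assuming `spikes` and `trials`
# objects loaded elsewhere, test which units fire differently for left versus
# right stimuli (group 1 where contrastRight is set, group 0 otherwise).
#
#     groups = (~np.isnan(trials.contrastRight)).astype(int)
#     sig_units, stats, p_values, cluster_ids = differentiate_units(
#         spikes.times, spikes.clusters, trials.stimOn_times, groups,
#         pre_time=0, post_time=0.5, test='ranksums')
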
def compute_comparison_statistics(value1, value2, test='ranksums', alpha=0.05, fdr_corr=False):
"""
Compute statistical test between two arrays
Parameters
----------
value1 : 1D array
first array of values to compare
value2 : 1D array
second array of values to compare
test : string
which statistical test to use, options are:
'ranksums' Wilcoxon Rank Sums test
'signrank' Wilcoxon Signed Rank test (for paired observations)
'ttest' independent samples t-test
'paired_ttest' paired t-test
alpha : float
alpha to use for statistical significance
fdr_corr : boolean
        whether to use an FDR correction (Benjamini-Hochberg) to correct for multiple testing
Returns
-------
significant_units : 1D array
        an array with the indices of values that are significantly modulated
stats : 1D array
the statistic of the test that was performed
p_values : 1D array
the p-values of all the values
"""
p_values = np.empty(len(value1))
stats = np.empty(len(value1))
for i in range(len(value1)):
        if test == 'signrank':
            if np.all(value1[i, :] == value2[i, :]):
                # the signed-rank test is undefined when all paired differences are zero
                p_values[i] = 1
                stats[i] = 0
            else:
                stats[i], p_values[i] = wilcoxon(value1[i, :], value2[i, :])
else:
if (np.sum(value1[i, :]) == 0) and (np.sum(value2[i, :]) == 0):
p_values[i] = 1
stats[i] = 0
else:
if test == 'ranksums':
stats[i], p_values[i] = ranksums(value1[i, :], value2[i, :])
elif test == 'ttest':
stats[i], p_values[i] = ttest_ind(value1[i, :], value2[i, :])
elif test == 'paired_ttest':
stats[i], p_values[i] = ttest_rel(value1[i, :], value2[i, :])
    # Perform Benjamini-Hochberg FDR correction for multiple testing
if fdr_corr:
sig_units, p_values, _, _ = multipletests(p_values, alpha, method='fdr_bh')
else:
sig_units = p_values < alpha
return sig_units, stats, p_values
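
# A self-contained sketch of compute_comparison_statistics on synthetic data:
# 20 "units" with 50 paired observations each; an effect is injected into the
# first five rows, so roughly those should come out significant.
#
#     counts_a = np.random.poisson(5, size=(20, 50))
#     counts_b = np.random.poisson(5, size=(20, 50))
#     counts_b[:5] += 3  # inject a clear rate increase in the first five rows
#     sig, test_stats, pvals = compute_comparison_statistics(
#         counts_a, counts_b, test='ranksums', alpha=0.05, fdr_corr=True)
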
def roc_single_event(spike_times, spike_clusters, event_times,
pre_time=[0.5, 0], post_time=[0, 0.5]):
"""
Determine how well neurons respond to a certain task event by calculating the area under the
ROC curve between a baseline period before the event and a period after the event.
    Values > 0.5 indicate the neuron responds positively to the event and values < 0.5
    indicate a negative response.
Parameters
----------
spike_times : 1D array
spike times (in seconds)
spike_clusters : 1D array
        cluster ids corresponding to each spike in `spike_times`
event_times : 1D array
        times (in seconds) of the events
pre_time : two-element array
time (in seconds) preceding the event to get the baseline (e.g. [0.5, 0.2] would be a
window starting 0.5 seconds before the event and ending at 0.2 seconds before the event)
    post_time : two-element array
        time (in seconds) following the event (e.g. [0, 0.5] would be a window starting at
        the event and ending 0.5 seconds after the event)
Returns
-------
auc_roc : 1D array
the area under the ROC curve
cluster_ids : 1D array
        cluster ids of the AUC values
"""
# Get spike counts for baseline and event timewindow
baseline_times = np.column_stack(((event_times - pre_time[0]), (event_times - pre_time[1])))
baseline_counts, cluster_ids = get_spike_counts_in_bins(spike_times, spike_clusters,
baseline_times)
times = np.column_stack(((event_times + post_time[0]), (event_times + post_time[1])))
spike_counts, cluster_ids = get_spike_counts_in_bins(spike_times, spike_clusters, times)
# Calculate area under the ROC curve per neuron
auc_roc = np.empty(spike_counts.shape[0])
for i in range(spike_counts.shape[0]):
auc_roc[i] = roc_auc_score(np.concatenate((np.zeros(baseline_counts.shape[1]),
np.ones(spike_counts.shape[1]))),
np.concatenate((baseline_counts[i, :], spike_counts[i, :])))
return auc_roc, cluster_ids
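
# A minimal usage sketch (illustrative only; `spikes` and `trials` assumed to be
# loaded elsewhere): AUC values far from 0.5 indicate strong modulation.
#
#     auc, cluster_ids = roc_single_event(spikes.times, spikes.clusters,
#                                         trials.stimOn_times,
#                                         pre_time=[0.5, 0], post_time=[0, 0.5])
#     modulated = cluster_ids[np.abs(auc - 0.5) > 0.25]  # arbitrary cut-off
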
def roc_between_two_events(spike_times, spike_clusters, event_times, event_groups,
pre_time=0, post_time=0.25):
"""
    Calculate the area under the ROC curve that indicates how well the activity of the neuron
    distinguishes between two events (e.g. movement to the right vs. left). A value of 0.5
    indicates the neuron cannot distinguish between the two events; a value of 0 or 1 indicates
    maximum distinction. Significance can be determined by bootstrapping the ROC curves: if 0.5
    is not included in the 95% confidence interval of the bootstrapped distribution, the neuron
    is deemed significant. Note that this function only returns the AUC values; any
    bootstrapping is left to the caller.
Parameters
----------
spike_times : 1D array
spike times (in seconds)
spike_clusters : 1D array
        cluster ids corresponding to each spike in `spike_times`
event_times : 1D array
times (in seconds) of the events from the two groups
event_groups : 1D array
group identities of the events as either 0 or 1
pre_time : float
time (in seconds) to precede the event times
post_time : float
time (in seconds) to follow the event times
Returns
-------
auc_roc : 1D array
an array of the area under the ROC curve for every neuron
cluster_ids : 1D array
cluster ids of the AUC values
"""
# Get spike counts
times = np.column_stack(((event_times - pre_time), (event_times + post_time)))
spike_counts, cluster_ids = get_spike_counts_in_bins(spike_times, spike_clusters, times)
# Calculate area under the ROC curve per neuron
auc_roc = np.empty(spike_counts.shape[0])
for i in range(spike_counts.shape[0]):
auc_roc[i] = roc_auc_score(event_groups, spike_counts[i, :])
return auc_roc, cluster_ids
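
# A minimal usage sketch (illustrative only): compare activity after left versus
# right stimuli, with the group vector derived from which contrast is set.
#
#     groups = (~np.isnan(trials.contrastRight)).astype(int)
#     auc, cluster_ids = roc_between_two_events(spikes.times, spikes.clusters,
#                                               trials.stimOn_times, groups,
#                                               pre_time=0, post_time=0.25)
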
def _get_biased_probs(n: int, idx: int = -1, prob: float = 0.5) -> list:
    # Return a probability vector of length n that is uniform except for the element
    # at `idx`, which is down-weighted by `prob`; the vector is normalized to sum to 1
    n_1 = n - 1
    z = n_1 + prob
    p = [1 / z] * (n_1 + 1)
    p[idx] *= prob
    return p
def _draw_contrast(
    contrast_set: list, prob_type: str = "biased", idx: int = -1, idx_prob: float = 0.5
) -> float:
    # Draw a single contrast from `contrast_set`, either uniformly or with the contrast
    # at `idx` (typically the zero contrast) down-weighted by `idx_prob`
    if prob_type in ["non-uniform", "biased"]:
        p = _get_biased_probs(len(contrast_set), idx=idx, prob=idx_prob)
        return np.random.choice(contrast_set, p=p)
    elif prob_type == "uniform":
        return np.random.choice(contrast_set)
    raise ValueError(f"Unknown prob_type '{prob_type}'")
def _draw_position(position_set, stim_probability_left):
    # Draw the stimulus position: the first element of position_set (left) is drawn
    # with probability stim_probability_left
    return int(
        np.random.choice(
            position_set, p=[stim_probability_left, 1 - stim_probability_left]
        )
    )
def generate_pseudo_blocks(n_trials, factor=60, min_=20, max_=100, first5050=90):
"""
Generate a pseudo block structure
Parameters
----------
n_trials : int
how many trials to generate
factor : int
factor of the exponential
min_ : int
minimum number of trials per block
max_ : int
maximum number of trials per block
first5050 : int
        number of trials with 50/50 left/right probability at the beginning of the session
Returns
    -------
probabilityLeft : 1D array
array with probability left per trial
"""
block_ids = []
while len(block_ids) < n_trials:
x = np.random.exponential(factor)
while (x <= min_) | (x >= max_):
x = np.random.exponential(factor)
if (len(block_ids) == 0) & (np.random.randint(2) == 0):
block_ids += [0.2] * int(x)
elif (len(block_ids) == 0):
block_ids += [0.8] * int(x)
elif block_ids[-1] == 0.2:
block_ids += [0.8] * int(x)
elif block_ids[-1] == 0.8:
block_ids += [0.2] * int(x)
return np.array([0.5] * first5050 + block_ids[:n_trials - first5050])
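
# A self-contained sketch: generate a 400-trial session; the first 90 trials are
# unbiased (0.5), after which blocks alternate between 0.2 and 0.8.
#
#     p_left = generate_pseudo_blocks(400)
#     assert p_left.size == 400 and np.all(p_left[:90] == 0.5)
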
def generate_pseudo_stimuli(n_trials, contrast_set=[0, 0.06, 0.12, 0.25, 1], first5050=90):
"""
Generate a block structure with stimuli
Parameters
----------
n_trials : int
number of trials to generate
contrast_set : 1D array
        the contrasts that are presented. The default is [0, 0.06, 0.12, 0.25, 1].
first5050 : int
Number of 50/50 trials at the beginning of the session. The default is 90.
Returns
-------
p_left : 1D array
probability of left stimulus
contrast_left : 1D array
contrast on the left
contrast_right : 1D array
contrast on the right
"""
# Initialize vectors
contrast_left = np.empty(n_trials)
contrast_left[:] = np.nan
contrast_right = np.empty(n_trials)
contrast_right[:] = np.nan
# Generate block structure
p_left = generate_pseudo_blocks(n_trials, first5050=first5050)
for i in range(n_trials):
# Draw position and contrast for this trial
position = _draw_position([-1, 1], p_left[i])
contrast = _draw_contrast(contrast_set, 'uniform')
# Add to trials
if position == -1:
contrast_left[i] = contrast
elif position == 1:
contrast_right[i] = contrast
return p_left, contrast_left, contrast_right
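
# A self-contained sketch: per trial exactly one of contrast_left/contrast_right is
# set and the other remains NaN, so their NaN masks are complementary.
#
#     p_left, c_left, c_right = generate_pseudo_stimuli(400)
#     assert np.all(np.isnan(c_left) != np.isnan(c_right))
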
def generate_pseudo_session(trials, generate_choices=True, contrast_distribution='non-uniform'):
"""
Generate a complete pseudo session with biased blocks, all stimulus contrasts, choices and
rewards and omissions. Biased blocks and stimulus contrasts are generated using the same
statistics as used in the actual task. The choices of the animal are generated using the
actual psychometrics of the animal in the session. For each synthetic trial the choice is
determined by drawing from a Bernoulli distribution that is biased according to the proportion
of times the animal chose left for the stimulus contrast, side, and block probability.
    No-go trials are ignored when generating the synthetic choices.
Parameters
----------
trials : DataFrame
Pandas dataframe with columns as trial vectors loaded using ONE
generate_choices : bool
whether to generate the choices (runs faster without)
contrast_distribution: str ['uniform', 'non-uniform']
the absolute contrast distribution.
If uniform, the zero contrast is as likely as other contrasts: BiasedChoiceWorld task
If 'non-uniform', the zero contrast is half as likely to occur: EphysChoiceWorld task
('biased' is kept for compatibility, but is deprecated as it is confusing)
Returns
-------
pseudo_trials : DataFrame
a trials dataframe with synthetically generated trials
"""
# Get contrast set presented to the animal
contrast_set = np.unique(trials['contrastLeft'][~np.isnan(trials['contrastLeft'])])
signed_contrast = trials['contrastRight'].copy()
signed_contrast[np.isnan(signed_contrast)] = -trials['contrastLeft'][
~np.isnan(trials['contrastLeft'])]
# Generate synthetic session
pseudo_trials = pd.DataFrame()
pseudo_trials['probabilityLeft'] = generate_pseudo_blocks(trials.shape[0])
# For each trial draw stimulus contrast and side and generate a synthetic choice
for i in range(pseudo_trials.shape[0]):
# Draw position and contrast for this trial
position = _draw_position([-1, 1], pseudo_trials['probabilityLeft'][i])
        contrast = _draw_contrast(contrast_set, prob_type=contrast_distribution,
                                  idx=np.where(contrast_set == 0)[0][0])
signed_stim = contrast * np.sign(position)
if generate_choices:
# Generate synthetic choice by drawing from Bernoulli distribution
trial_select = ((signed_contrast == signed_stim) & (trials['choice'] != 0)
& (trials['probabilityLeft'] == pseudo_trials['probabilityLeft'][i]))
p_right = (np.sum(trials['choice'][trial_select] == 1)
/ trials['choice'][trial_select].shape[0])
this_choice = [-1, 1][np.random.binomial(1, p_right)]
# Add to trials
if position == -1:
pseudo_trials.loc[i, 'contrastLeft'] = contrast
if this_choice == -1:
pseudo_trials.loc[i, 'feedbackType'] = -1
elif this_choice == 1:
pseudo_trials.loc[i, 'feedbackType'] = 1
elif position == 1:
pseudo_trials.loc[i, 'contrastRight'] = contrast
if this_choice == -1:
pseudo_trials.loc[i, 'feedbackType'] = 1
elif this_choice == 1:
pseudo_trials.loc[i, 'feedbackType'] = -1
pseudo_trials.loc[i, 'choice'] = this_choice
else:
if position == -1:
pseudo_trials.loc[i, 'contrastLeft'] = contrast
elif position == 1:
pseudo_trials.loc[i, 'contrastRight'] = contrast
pseudo_trials.loc[i, 'stim_side'] = position
pseudo_trials['signed_contrast'] = pseudo_trials['contrastRight']
pseudo_trials.loc[pseudo_trials['signed_contrast'].isnull(),
'signed_contrast'] = -pseudo_trials['contrastLeft']
return pseudo_trials
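
# A minimal usage sketch (illustrative only): `trials` is assumed to be a trials
# DataFrame loaded via ONE with at least the 'contrastLeft', 'contrastRight',
# 'probabilityLeft' and 'choice' columns.
#
#     pseudo_trials = generate_pseudo_session(trials, generate_choices=False)
#     # same number of trials as the input, with resampled blocks and stimuli
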
def get_impostor_target(targets, labels, current_label=None,
seed_idx=None, verbose=False):
"""
Generate impostor targets by selecting from a list of current targets of variable length.
Targets are selected and stitched together to the length of the current labeled target,
aka 'Frankenstein' targets, often used for evaluating a null distribution while decoding.
Parameters
----------
targets : list of all targets
targets may be arrays of any dimension (a,b,...,z)
but must have the same shape except for the last dimension, z. All targets must
have z > 0.
labels : numpy array of strings
labels corresponding to each target e.g. session eid.
only targets with unique labels are used to create impostor target. Typically,
use eid as the label because each eid has a unique target.
current_label : string
targets with the current label are not used to create impostor
target. Size of corresponding target is used to determine size of impostor
        target. If None, a random selection from the set of unique labels is used.
    seed_idx : int or None
        seed for numpy's random number generator, to make the impostor target reproducible
    verbose : bool
        whether to print progress information
    Returns
    -------
    impostor_final : numpy array
        same shape as all the targets except for the last dimension, which matches the
        length of the target with `current_label`
    """
np.random.seed(seed_idx)
unique_labels, unique_label_idxs = np.unique(labels, return_index=True)
unique_targets = [targets[unique_label_idxs[i]] for i in range(len(unique_label_idxs))]
if current_label is None:
current_label = np.random.choice(unique_labels)
avoid_same_label = ~(unique_labels == current_label)
# current label must correspond to exactly one unique label
assert len(np.nonzero(~avoid_same_label)[0]) == 1
avoided_index = np.nonzero(~avoid_same_label)[0][0]
nonavoided_indices = np.nonzero(avoid_same_label)[0]
ntargets = len(nonavoided_indices)
all_impostor_targets = [unique_targets[nonavoided_indices[i]] for i in range(ntargets)]
all_impostor_sizes = np.array([all_impostor_targets[i].shape[-1] for i in range(ntargets)])
current_target_size = unique_targets[avoided_index].shape[-1]
if verbose:
print('impostor target has length %s' % (current_target_size))
    assert np.min(all_impostor_sizes) > 0  # all targets must be nonzero in size
    # upper bound on the number of tiles needed, assuming the current target is not
    # longer than the longest impostor target
    max_needed_to_tile = int(np.max(all_impostor_sizes) / np.min(all_impostor_sizes)) + 1
tile_indices = np.random.choice(np.arange(len(all_impostor_targets), dtype=int),
size=max_needed_to_tile,
replace=False)
impostor_tiles = [all_impostor_targets[tile_indices[i]] for i in range(len(tile_indices))]
impostor_tile_sizes = all_impostor_sizes[tile_indices]
if verbose:
print('Randomly chose %s targets to tile the impostor target' % (max_needed_to_tile))
print('with the following sizes:', impostor_tile_sizes)
number_of_tiles_needed = np.sum(np.cumsum(impostor_tile_sizes) < current_target_size) + 1
impostor_tiles = impostor_tiles[:number_of_tiles_needed]
if verbose:
print('%s of %s needed to tile the entire impostor target' % (number_of_tiles_needed,
max_needed_to_tile))
impostor_stitch = np.concatenate(impostor_tiles, axis=-1)
start_ind = np.random.randint((impostor_stitch.shape[-1] - current_target_size) + 1)
impostor_final = impostor_stitch[..., start_ind:start_ind + current_target_size]
if verbose:
print('%s targets stitched together with shift of %s\n' % (number_of_tiles_needed,
start_ind))
np.random.seed(None) # reset numpy seed to None
return impostor_final
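
# A self-contained sketch of get_impostor_target with toy 1D targets: the impostor
# is stitched from the other labels' targets and matches the length of the target
# whose label is avoided.
#
#     targets = [np.arange(n, dtype=float) for n in (80, 100, 120)]
#     labels = np.array(['eid_a', 'eid_b', 'eid_c'])
#     impostor = get_impostor_target(targets, labels, current_label='eid_b')
#     assert impostor.shape[-1] == 100
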