import hashlib
from typing import TypeVar, Sequence, Union, Optional, Type
import uuid
import numpy as np
D = TypeVar("D", bound=np.generic)
Array = Union[np.ndarray, Sequence]
def between_sorted(sorted_v, bounds=None):
"""
Given a vector of sorted values, returns a boolean vector indices True when the value
is between bounds. If multiple bounds are given, returns the equivalent OR of individual
bounds tuple
Especially useful for spike times
indices = between_sorted(spike_times, [tstart, tstop])
:param sorted_v: vector containing sorted values (won't check)
:param bounds: minimum included value and maximum included value
can be a list[tstart, tstop] or an array of dimension (n, 2)
:return:
"""
    bounds = np.array(bounds)
    starts, stops = (np.take(bounds, 0, axis=-1), np.take(bounds, 1, axis=-1))
    # discard bounds that fall entirely outside of the range of sorted values
    sbounds = np.logical_and(starts <= sorted_v[-1], stops >= sorted_v[0])
    starts = starts[sbounds]
    stops = stops[sbounds]
    # +1 at each interval start, -1 just after each interval stop: the cumulative sum
    # is then non-zero only inside the intervals
    sel = sorted_v * 0
    sel[np.searchsorted(sorted_v, starts)] = 1
    istops = np.searchsorted(sorted_v, stops, side="right")
    sel[istops[istops < sorted_v.size]] += -1
    return np.cumsum(sel).astype(bool)
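# Usage sketch (illustrative values, not part of the original module): select the
# spike times that fall within any of a set of trial windows.
# >>> spike_times = np.array([0.1, 0.5, 1.2, 3.4, 5.0])
# >>> between_sorted(spike_times, [[0.0, 1.0], [3.0, 4.0]])
# array([ True,  True, False,  True, False])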
def hash_uuids(uuids, algo="sha256"):
"""
    Returns a hash of a collection of UUIDs.
    :param uuids: collection of UUIDs (uuid.UUID objects or hex strings): list | np.ndarray | pd.Series | pd.Index
    :param algo: hashing algorithm to use, default is sha256
    :return: hexadecimal string digest of the hash
"""
    hasher = hashlib.new(algo)
    for cid in uuids:
        _uuid = cid if isinstance(cid, uuid.UUID) else uuid.UUID(hex=cid)
        hasher.update(_uuid.bytes)
    return hasher.hexdigest()
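# Usage sketch (illustrative values, not part of the original module): a reproducible
# fingerprint for a set of identifiers, e.g. to detect that a collection has changed.
# >>> digest = hash_uuids(['00000000-0000-0000-0000-000000000000',
# ...                      '00000000-0000-0000-0000-000000000001'])
# >>> len(digest)  # sha256 produces a 64-character hexadecimal digest
# 64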
def ismember(a, b):
"""
    Equivalent of np.isin but returns indices as in the matlab ismember function.
    Returns an array containing logical 1 (true) where the data in a is found in b,
    and the location of members in b such that a[lia] == b[locb].
    :param a: 1d array
    :param b: 1d array
    :return: lia (boolean array), locb (integer array of indices into b)
"""
lia = np.isin(a, b)
    # map the matched values of a to the index of their first occurrence in b,
    # going through the sorted unique values
    aun, _, iuainv = np.unique(a[lia], return_index=True, return_inverse=True)
    _, ibu, iau = np.intersect1d(b, aun, return_indices=True)
    locb = ibu[iuainv]
return lia, locb
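# Usage sketch (illustrative values, not part of the original module):
# >>> a = np.array([3, 1, 4, 1, 5])
# >>> b = np.array([1, 5, 9])
# >>> lia, locb = ismember(a, b)   # a[lia] == b[locb]
# >>> lia
# array([False,  True, False,  True,  True])
# >>> locb
# array([0, 0, 1])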
def ismember2d(a, b):
"""
    Equivalent of np.isin but returns indices as in the matlab ismember function.
    Returns an array containing logical 1 (true) where the data in A is found in B,
    and the location of members in b such that a[lia, :] == b[locb, :].
:param a: 2d array
:param b: 2d array
:return: isin, locb
"""
from numba import jit
@jit(nopython=True)
def find_first_2d(mat, val):
"""
        Returns the first row index of mat whose values all equal val.
        The purpose of this function is performance: it uses low-level numba and avoids
        looping through the full array.
        :param mat: np.array
        :param val: values to search for
        :return: row index, or None if no row matches
"""
for i in np.arange(mat.shape[0]):
if np.all(mat[i] == val):
return i
amask = np.ones(a.shape[0], dtype=bool)
ia = np.zeros(a.shape, dtype=bool)
ib = np.zeros(a.shape, dtype=np.int32) - 1
ina = np.zeros(a.shape[0], dtype=bool)
bind = np.zeros(a.shape[0], dtype=np.int32) - 1
# get a 1d ismember first for each column
for n in np.arange(a.shape[1]):
iaa, ibb = ismember(a[amask, n], b[:, n])
ia[amask, n] = iaa
ib[np.where(amask)[0][iaa], n] = ibb
# those that have at least one mismatch are not in
amask[~np.all(ia, axis=1)] = False
# those that get the same index for all column do not need further testing
ifound = np.where(amask)[0][np.sum(np.abs(np.diff(ib[amask], axis=1)), axis=1) == 0]
ina[ifound] = True
amask[ifound] = False
bind[ifound] = ib[ifound, 0]
    # the remaining ones have to be checked manually (almost never happens for uuids)
for iaa in np.where(amask)[0]:
ibb = find_first_2d(b, a[iaa, :])
        if ibb is not None:
ina[iaa] = True
bind[iaa] = ibb
return ina, bind[ina]
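# Usage sketch (illustrative values, not part of the original module); note that
# ismember2d imports numba at call time, so numba must be installed.
# >>> a = np.array([[1, 2], [3, 4], [5, 6]])
# >>> b = np.array([[3, 4], [1, 2]])
# >>> ina, inb = ismember2d(a, b)   # a[ina, :] == b[inb, :] row-wise
# >>> ina
# array([ True,  True, False])
# >>> inb
# array([1, 0], dtype=int32)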
def intersect2d(a0, a1, assume_unique=False):
"""
    Performs an intersection over the rows of two multiple-column arrays a0 and a1.
    :param a0: 2d array
    :param a1: 2d array
    :param assume_unique: If True, the input arrays are both assumed to be unique,
     which can speed up the calculation.
    :return: intersection
    :return: index of a0 such that intersection == a0[i0, :]
    :return: index of a1 such that intersection == a1[i1, :]
"""
_, i0, i1 = np.intersect1d(
a0[:, 0], a1[:, 0], return_indices=True, assume_unique=assume_unique
)
for n in np.arange(1, a0.shape[1]):
_, ii0, ii1 = np.intersect1d(
a0[i0, n], a1[i1, n], return_indices=True, assume_unique=assume_unique
)
i0 = i0[ii0]
i1 = i1[ii1]
return a0[i0, :], i0, i1
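# Usage sketch (illustrative values, not part of the original module): row-wise
# intersection of two 2-column arrays.
# >>> a0 = np.array([[1, 2], [3, 4], [5, 6]])
# >>> a1 = np.array([[3, 4], [7, 8]])
# >>> common, i0, i1 = intersect2d(a0, a1)
# >>> common
# array([[3, 4]])
# >>> i0, i1
# (array([1]), array([0]))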
def bincount2D(x, y, xbin=0, ybin=0, xlim=None, ylim=None, weights=None):
"""
    Computes a 2D histogram by aggregating two vectors of values into a 2D array.
:param x: values to bin along the 2nd dimension (c-contiguous)
:param y: values to bin along the 1st dimension
:param xbin:
scalar: bin size along 2nd dimension
0: aggregate according to unique values
array: aggregate according to exact values (count reduce operation)
:param ybin:
scalar: bin size along 1st dimension
0: aggregate according to unique values
array: aggregate according to exact values (count reduce operation)
:param xlim: (optional) 2 values (array or list) that restrict range along 2nd dimension
:param ylim: (optional) 2 values (array or list) that restrict range along 1st dimension
:param weights: (optional) defaults to None, weights to apply to each value for aggregation
:return: 3 numpy arrays MAP [ny,nx] image, xscale [nx], yscale [ny]
"""
# if no bounds provided, use min/max of vectors
if xlim is None:
xlim = [np.min(x), np.max(x)]
if ylim is None:
ylim = [np.min(y), np.max(y)]
def _get_scale_and_indices(v, bin, lim):
# if bin is a nonzero scalar, this is a bin size: create scale and indices
if np.isscalar(bin) and bin != 0:
scale = np.arange(lim[0], lim[1] + bin / 2, bin)
ind = (np.floor((v - lim[0]) / bin)).astype(np.int64)
# if bin == 0, aggregate over unique values
else:
scale, ind = np.unique(v, return_inverse=True)
return scale, ind
xscale, xind = _get_scale_and_indices(x, xbin, xlim)
yscale, yind = _get_scale_and_indices(y, ybin, ylim)
# aggregate by using bincount on absolute indices for a 2d array
nx, ny = [xscale.size, yscale.size]
ind2d = np.ravel_multi_index(np.c_[yind, xind].transpose(), dims=(ny, nx))
r = np.bincount(ind2d, minlength=nx * ny, weights=weights).reshape(ny, nx)
# if a set of specific values is requested output an array matching the scale dimensions
if not np.isscalar(xbin) and xbin.size > 1:
_, iout, ir = np.intersect1d(xbin, xscale, return_indices=True)
_r = r.copy()
r = np.zeros((ny, xbin.size))
r[:, iout] = _r[:, ir]
xscale = xbin
if not np.isscalar(ybin) and ybin.size > 1:
_, iout, ir = np.intersect1d(ybin, yscale, return_indices=True)
_r = r.copy()
r = np.zeros((ybin.size, r.shape[1]))
r[iout, :] = _r[ir, :]
yscale = ybin
return r, xscale, yscale
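# Usage sketch (illustrative values, not part of the original module): a raster of
# spike counts per time bin (columns) and per cluster (rows).
# >>> spike_times = np.array([0.5, 1.5, 1.5, 2.5])
# >>> spike_clusters = np.array([0, 1, 1, 0])
# >>> r, tscale, cscale = bincount2D(spike_times, spike_clusters, xbin=1, xlim=[0, 3])
# >>> r
# array([[1, 0, 1, 0],
#        [0, 2, 0, 0]])
# >>> tscale
# array([0., 1., 2., 3.])
# >>> cscale
# array([0, 1])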
def rcoeff(x, y):
"""
Computes pairwise Pearson correlation coefficients for matrices.
    That is, for 2 matrices of the same size, computes the row-to-row coefficients and outputs
    a vector whose length is the number of rows of the first matrix.
If the second array is a vector then computes the correlation coefficient for all rows.
:param x: np array [nc, ns]
:param y: np array [nc, ns] or [ns]
:return: r [nc]
"""
def normalize(z):
mean = np.mean(z, axis=-1)
return z - mean if mean.size == 1 else z - mean[:, np.newaxis]
xnorm = normalize(x)
ynorm = normalize(y)
rcor = np.sum(xnorm * ynorm, axis=-1) / np.sqrt(
np.sum(np.square(xnorm), axis=-1) * np.sum(np.square(ynorm), axis=-1)
)
return rcor
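# Usage sketch (illustrative values, not part of the original module): correlation
# of each row of x against a single vector y.
# >>> x = np.array([[1., 2., 3.], [1., 0., 1.]])
# >>> y = np.array([2., 4., 6.])
# >>> rcoeff(x, y)
# array([1., 0.])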
def within_ranges(
x: np.ndarray,
ranges: Array,
labels: Optional[Array] = None,
mode: str = "vector",
    dtype: Union[str, Type[D]] = "int8",
) -> np.ndarray:
"""
Detects which points of the input vector lie within one of the ranges specified in the ranges.
Returns an array the size of x with a 1 if the corresponding point is within a range.
The function uses a stable sort algorithm (timsort) to find the edges within the input array.
Edge behaviour is inclusive.
Ranges are [(start0, stop0), (start1, stop1), etc.] or n-by-2 numpy array.
The ranges may be optionally assigned a row in 'matrix' mode or a numerical label in 'vector'
mode. Labels must have a length of n. Overlapping ranges have a value that is the sum of the
relevant range labels (ones in 'matrix' mode).
If mode is 'vector' (default) it will give a vector, specifying the range of each point.
If mode is 'matrix' it will give a matrix output where each range is assigned a particular row
    index with 1 if the point belongs to that range label. Multiple ranges can be assigned to a
    particular row, e.g. [0, 0, 1] would give a 2-by-N matrix with the first two ranges in the
    first row. Points within more than one range are given a value > 1.
Parameters
----------
x : array_like
        An array whose points are tested against the ranges. Multi-dimensional arrays are
        flattened to 1D.
ranges : array_like
A list of tuples or N-by-2 array of ranges to test, where N is the number of ranges,
i.e. [[start0, stop0],
[start1, stop1]]
labels : vector, list
If mode is 'vector'; a list of integer labels to demarcate which points lie within each
range. In 'matrix' mode; a list of column indices (ranges can share indices).
The number of labels should match the number of ranges. If None, ones are used for all
ranges.
    mode : {'matrix', 'vector'}
        The type of output to return. If 'matrix', an M-by-N matrix is returned where M
        corresponds to the max index in labels + 1 and N is the size of x, e.g. with
        labels=[0, 1, 2] the output matrix would have 3 rows. If 'vector' (default), a vector
        the size of x is returned.
dtype : str, numeric or boolean type
The data type of the returned array. If type is bool, the labels in vector mode will be
ignored. Default is int8.
Returns
-------
    A vector the size of x where zeros indicate that the points do not lie within any range
    ('vector' mode), or a matrix where out.shape[0] == max(labels) + 1 and
    out.shape[1] == x.size ('matrix' mode).
    Examples
    --------
# Assert that points in ranges are mutually exclusive
np.all(within_ranges(x, ranges) <= 1)
Tests
-----
>>> import numpy as np
>>> within_ranges(np.arange(11), [(1, 2), (5, 8)])
array([0, 1, 1, 0, 0, 1, 1, 1, 1, 0, 0], dtype=int8)
>>> ranges = np.array([[1, 2], [5, 8]])
>>> within_ranges(np.arange(10) + 1, ranges, labels=np.array([0,1]), mode='matrix')
array([[1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 1, 1, 1, 0, 0]], dtype=int8)
>>> within_ranges(np.arange(11), [(1,2), (5,8), (4,6)], labels=[0,1,1], mode='matrix')
array([[0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 2, 2, 1, 1, 0, 0]], dtype=int8)
>>> within_ranges(np.arange(10) + 1, ranges, np.array([3,1]), mode='vector')
array([3, 3, 0, 0, 1, 1, 1, 1, 0, 0], dtype=int8)
>>> within_ranges(np.arange(11), [(1,2), (5,8), (4,6)], dtype=bool)
array([False, True, True, False, True, True, True, True, True,
False, False])
"""
# Flatten
x = x.ravel()
# Ensure ranges are numpy
ranges = np.array(ranges)
# Get size info
n_points = x.size
n_ranges = ranges.shape[0]
    if labels is None:
        # In 'matrix' mode default row index is 0
        labels = np.zeros((n_ranges,), dtype="uint32")
        if mode == "vector":  # Otherwise default numerical label is 1
            labels += 1
    # ensure labels is an array so that the vectorized negation below works for list inputs
    labels = np.asarray(labels)
    assert len(labels) >= n_ranges, "range labels do not match number of ranges"
n_labels = np.unique(labels).size
# If no ranges given, short circuit function and return zeros
if n_ranges == 0:
return np.zeros_like(x, dtype=dtype)
# Check end comes after start in each case
assert np.all(
np.diff(ranges, axis=1) > 0
), "ranges ends must all be greater than starts"
# Make array containing points, starts and finishes
# This order means it will be inclusive
to_sort = np.concatenate((ranges[:, 0], x, ranges[:, 1]))
# worst case O(n*log(n)) but will be better than this as most of the array is ordered;
# memory overhead ~n/2
idx = np.argsort(to_sort, kind="stable")
# Make delta array containing 1 for every start and -1 for every stop
# with one row for each range label
if mode == "matrix":
delta_shape = (n_labels, n_points + 2 * n_ranges)
delta = np.zeros(delta_shape, dtype="int8")
delta[labels, np.arange(n_ranges)] = 1
delta[labels, n_points + n_ranges + np.arange(n_ranges)] = -1
# Arrange in order
delta_sorted = delta[:, idx]
# Take cumulative sums
summed = np.cumsum(delta_sorted, axis=1)
# Reorder back to original order
reordered = np.zeros(delta_shape, dtype=dtype)
reordered[:, idx] = summed.reshape(delta_shape[0], -1)
return reordered[:, np.arange(n_ranges, n_points + n_ranges)]
elif mode == "vector":
delta_shape = (n_points + 2 * n_ranges,)
r_delta = np.zeros(delta_shape, dtype="int32")
r_delta[np.arange(n_ranges)] = labels
r_delta[n_points + n_ranges + np.arange(n_ranges)] = -labels
# Arrange in order
r_delta_sorted = r_delta[idx]
# Take cumulative sum
r_summed = np.cumsum(r_delta_sorted)
# Reorder back to original
r_reordered = np.zeros_like(r_summed, dtype=dtype)
r_reordered[idx] = r_summed
return r_reordered[np.arange(n_ranges, n_points + n_ranges)]
else:
raise ValueError('unknown mode type, options are "matrix" and "vector"')
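# Usage sketch (illustrative values, not part of the original module): label each
# sample time with the epoch it falls into (1 = baseline, 2 = stimulus).
# >>> t = np.arange(6)
# >>> within_ranges(t, [(0, 1), (3, 4)], labels=np.array([1, 2]))
# array([1, 1, 0, 2, 2, 0], dtype=int8)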