Source code for ibllib.dsp.utils

"""
Window generator, front detections, rms
"""
import numpy as np
from scipy import interpolate
from ibllib.misc import print_progress


[docs]def sync_timestamps(tsa, tsb, tbin=0.1, return_indices=False): """ Sync two arrays of time stamps :param tsa: vector of timestamps :param tsb: vector of timestamps :param tbin: time bin length :param return_indices (bool), if True returns 2 sets of indices for tsa and tsb with identified matches :return: function: interpolation function such as fnc(tsa) = tsb float: drift in ppm numpy array: of indices ia numpy array: of indices ib """ def _interp_fcn(tsa, tsb, ib): # now compute the bpod/fpga drift and precise time shift drift_ppm = np.polyfit(tsa[ib >= 0], tsb[ib[ib >= 0]] - tsa[ib >= 0], 1)[0] * 1e6 fcn_a2b = interpolate.interp1d(tsa[ib >= 0], tsb[ib[ib >= 0]], fill_value="extrapolate") return fcn_a2b, drift_ppm # assert sorted inputs tmin = np.min([np.min(tsa), np.min(tsb)]) tmax = np.max([np.max(tsa), np.max(tsb)]) # brute force correlation to get an estimate of the delta_t between series x = np.zeros(int(np.ceil(tmax - tmin) / tbin)) y = np.zeros_like(x) x[np.int32(np.floor((tsa - tmin) / tbin))] = 1 y[np.int32(np.floor((tsb - tmin) / tbin))] = 1 delta_t = (parabolic_max(np.correlate(x, y, mode='full'))[0] - x.shape[0] + 1) * tbin # do a first assignment at a DT threshold ib = np.zeros(tsa.shape, dtype=np.int32) - 1 threshold = tbin for m in np.arange(tsa.shape[0]): dt = np.abs(tsa[m] - delta_t - tsb) inds = np.where(dt < threshold)[0] if inds.size == 1: ib[m] = inds[0] elif inds.size > 1: candidates = inds[~np.isin(inds, ib[:m])] if candidates.size == 1: ib[m] = candidates[0] elif candidates.size > 1: ib[m] = inds[np.argmin(dt[inds])] fcn_a2b, _ = _interp_fcn(tsa, tsb, ib) # do a second assignment - this time a full matrix of candidate matches is computed # the most obvious matches are assigned first and then one by one iamiss = np.where(ib < 0)[0] ibmiss = np.setxor1d(np.arange(tsb.size), ib[ib >= 0]) dt = np.abs(fcn_a2b(tsa[iamiss]) - tsb[ibmiss][:, np.newaxis]) dt[dt > tbin] = np.nan while ~np.all(np.isnan(dt)): _b, _a = np.unravel_index(np.nanargmin(dt), dt.shape) ib[iamiss[_a]] = ibmiss[_b] dt[:, _a] = np.nan dt[_b, :] = np.nan fcn_a2b, drift_ppm = _interp_fcn(tsa, tsb, ib) if return_indices: return fcn_a2b, drift_ppm, np.where(ib >= 0)[0], ib[ib >= 0] else: return fcn_a2b, drift_ppm
[docs]def parabolic_max(x): """ Maximum picking with parabolic interpolation around the maxima :param x: 1d or 2d array :return: interpolated max index, interpolated max """ # for 2D arrays, operate along the last dimension ns = x.shape[-1] axis = -1 imax = np.argmax(x, axis=axis) if x.ndim == 1: v010 = x[np.maximum(np.minimum(imax + np.array([-1, 0, 1]), ns - 1), 0)] v010 = v010[:, np.newaxis] else: v010 = np.vstack((x[..., np.arange(x.shape[0]), np.maximum(imax - 1, 0)], x[..., np.arange(x.shape[0]), imax], x[..., np.arange(x.shape[0]), np.minimum(imax + 1, ns - 1)])) poly = np.matmul(.5 * np.array([[1, -2, 1], [-1, 0, 1], [0, 2, 0]]), v010) ipeak = - poly[1] / (poly[0] + np.double(poly[0] == 0)) / 2 maxi = poly[2] + ipeak * poly[1] + ipeak ** 2. * poly[0] ipeak += imax # handle edges iedges = np.logical_or(imax == 0, imax == ns - 1) if x.ndim == 1: maxi = v010[1, 0] if iedges else maxi[0] ipeak = imax if iedges else ipeak[0] else: maxi[iedges] = v010[1, iedges] ipeak[iedges] = imax[iedges] return ipeak, maxi
def _fcn_extrap(x, f, bounds): """ Extrapolates a flat value before and after bounds x: array to be filtered f: function to be applied between bounds (cf. fcn_cosine below) bounds: 2 elements list or np.array """ y = f(x) y[x < bounds[0]] = f(bounds[0]) y[x > bounds[1]] = f(bounds[1]) return y
[docs]def fcn_cosine(bounds): """ Returns a soft thresholding function with a cosine taper: values <= bounds[0]: values values < bounds[0] < bounds[1] : cosine taper values < bounds[1]: bounds[1] :param bounds: :return: lambda function """ def _cos(x): return (1 - np.cos((x - bounds[0]) / (bounds[1] - bounds[0]) * np.pi)) / 2 func = lambda x: _fcn_extrap(x, _cos, bounds) # noqa return func
[docs]def fronts(x, axis=-1, step=1): """ Detects Rising and Falling edges of a voltage signal, returns indices and :param x: array on which to compute RMS :param axis: (optional, -1) negative value :param step: (optional, 1) value of the step to detect :return: numpy array of indices, numpy array of rises (1) and falls (-1) """ d = np.diff(x, axis=axis) ind = np.array(np.where(np.abs(d) >= step)) sign = d[tuple(ind)] ind[axis] += 1 if len(ind) == 1: return ind[0], sign else: return ind, sign
[docs]def falls(x, axis=-1, step=-1): """ Detects Falling edges of a voltage signal, returns indices :param x: array on which to compute RMS :param axis: (optional, -1) negative value :param step: (optional, -1) value of the step to detect :return: numpy array """ return rises(-x, axis=axis, step=-step)
[docs]def rises(x, axis=-1, step=1): """ Detect Rising edges of a voltage signal, returns indices :param x: array on which to compute RMS :param axis: (optional, -1) :param step: (optional, 1) amplitude of the step to detect :return: numpy array """ ind = np.array(np.where(np.diff(x, axis=axis) >= step)) ind[axis] += 1 if len(ind) == 1: return ind[0] else: return ind
[docs]def rms(x, axis=-1): """ Root mean square of array along axis :param x: array on which to compute RMS :param axis: (optional, -1) :return: numpy array """ return np.sqrt(np.mean(x ** 2, axis=axis))
[docs]class WindowGenerator(object): """ `wg = WindowGenerator(ns, nswin, overlap)` Provide sliding windows indices generator for signal processing applications. For straightforward spectrogram / periodogram implementation, prefer scipy methods ! Example of implementations in test_dsp.py. """ def __init__(self, ns, nswin, overlap): """ :param ns: number of sample of the signal along the direction to be windowed :param nswin: number of samples of the window :return: dsp.WindowGenerator object: """ self.ns = int(ns) self.nswin = int(nswin) self.overlap = int(overlap) self.nwin = int(np.ceil(float(ns - nswin) / float(nswin - overlap))) + 1 self.iw = None @property def firstlast(self): """ Generator that yields first and last index of windows :return: tuple of [first_index, last_index] of the window """ self.iw = 0 first = 0 while True: last = first + self.nswin last = min(last, self.ns) yield (first, last) if last == self.ns: break first += self.nswin - self.overlap self.iw += 1 @property def slice(self): """ Generator that yields slices of windows :return: a slice of the window """ for first, last in self.firstlast: yield slice(first, last)
[docs] def slice_array(self, sig, axis=-1): """ Provided an array or sliceable object, generator that yields slices corresponding to windows. Especially useful when working on memmpaps :param sig: array :param axis: (optional, -1) dimension along which to provide the slice :return: array slice Generator """ for first, last in self.firstlast: yield np.take(sig, np.arange(first, last), axis=axis)
[docs] def tscale(self, fs): """ Returns the time scale associated with Window slicing (middle of window) :param fs: sampling frequency (Hz) :return: time axis scale """ return np.array([(first + (last - first - 1) / 2) / fs for first, last in self.firstlast])
[docs] def print_progress(self): """ Prints progress using a terminal progress bar """ print_progress(self.iw, self.nwin)