"""Tests for brainbox.processing: sync() resampling and compute_cluster_average()."""
import unittest

import numpy as np

from brainbox import core, processing


class TestProcessing(unittest.TestCase):
    def test_sync(self):
        # Test casting non-uniformly-sampled data to an evenly-sampled TimeSeries.
        # Begin by defining sampling intervals of random, strictly positive
        # (folded-normal) length
        times = np.cumsum(np.abs(np.random.normal(loc=4., scale=6., size=100)))
        # Take sample values as though the signal grows as the cube of sample time
        samples = times**3
        # Use cubic interpolation to resample to a uniform sampling interval
        cubes = core.TimeSeries(times=times, values=samples, columns=('cubic',))
        resamp = processing.sync(0.1, timeseries=cubes, interp='cubic', fillval='extrapolate')
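        # (The first positional argument, 0.1, is the target sampling interval in
        # the same units as `times`; the uniform spacing is asserted below.)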
        # Check that the sync function returns a new TimeSeries object
        self.assertIsInstance(resamp, core.TimeSeries)
        # Test that all returned sample times are uniformly spaced.
        # np.isclose is needed rather than an exact comparison with 0.1 because of
        # floating-point rounding: the actual diff comes out as 0.09999999999999964.
        self.assertTrue(np.all(np.isclose(np.diff(resamp.times), 0.1)))
        # Check that we're within a margin of error on the interpolation
        err_margin = 1e-3  # Maximum fractional error allowed
        err_percs = np.abs(resamp.times**3 - resamp.values.T) / (resamp.times**3)
        self.assertTrue(np.all(err_percs < err_margin))
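        # (A tight margin is reasonable here: cubic interpolation should track a
        # signal that is itself a cubic polynomial very closely away from the
        # edges of the sampled range.)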
        # Make a second timeseries of square-law increasing samples
        times2 = np.cumsum(np.abs(np.random.normal(loc=2., scale=1., size=200)))
        samples2 = times2**2
        squares = core.TimeSeries(times=times2, values=samples2, columns=('square',))
        # Use cubic interpolation again, this time on both timeseries
        resamp2 = processing.sync(0.1, timeseries=[squares, cubes], interp='cubic',
                                  fillval='extrapolate')
        # Check that the new TimeSeries has both 'cubic' and 'square' as keys and attributes
        self.assertTrue(hasattr(resamp2, 'cubic'))
        self.assertTrue(hasattr(resamp2, 'square'))
        # Check that both timeseries' time ranges are fully contained in the resampled TS
        self.assertTrue(cubes.times.min() >= resamp2.times.min())
        self.assertTrue(cubes.times.max() <= resamp2.times.max())
        self.assertTrue(squares.times.min() >= resamp2.times.min())
        self.assertTrue(squares.times.max() <= resamp2.times.max())
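        # (These checks imply the resampled range covers the union of both input
        # ranges, so each series is extrapolated beyond its own support where
        # needed; this is why fillval='extrapolate' is passed.)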
        # Check that all interpolated values are within the margin of error against
        # the known generating functions
        sq_errperc = np.abs(resamp2.times**2 - resamp2.square) / resamp2.times**2
        cu_errperc = np.abs(resamp2.times**3 - resamp2.cubic) / resamp2.times**3
        self.assertTrue(np.all(sq_errperc < err_margin) and np.all(cu_errperc < err_margin))
        # Now check the numpy array behavior of sync.
        # Try running sync on the cubic times and values only.
        resamp = processing.sync(0.1, times=times, values=samples, interp='cubic',
                                 fillval='extrapolate')
        # Run the same checks as for the instance created from TimeSeries objects
        self.assertIsInstance(resamp, core.TimeSeries)
        self.assertTrue(np.all(np.isclose(np.diff(resamp.times), 0.1)))
        err_percs = np.abs(resamp.times**3 - resamp.values.T) / (resamp.times**3)
        self.assertTrue(np.all(err_percs < err_margin))
        # Try the multiple-arrays case, in which we pass two times and two values
        resamp2 = processing.sync(0.1, times=(times, times2), values=(samples, samples2),
                                  interp='cubic', fillval='extrapolate')
        self.assertTrue(times.min() >= resamp2.times.min())
        self.assertTrue(times.max() <= resamp2.times.max())
        self.assertTrue(times2.min() >= resamp2.times.min())
        self.assertTrue(times2.max() <= resamp2.times.max())
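        # (As in the TimeSeries case, the output spans at least the union of the
        # two input time ranges.)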

    def test_compute_cluster_average(self):
        # Create fake data for 3 clusters
        clust1 = np.ones(40)
        clust1_vals = np.ones(40) * 200
        clust2 = 2 * np.ones(40)
        clust2_vals = np.r_[np.ones(20) * 300, np.ones(20) * 500]
        clust100 = 100 * np.ones(50)
        clust100_vals = np.r_[np.ones(25) * 0.5, np.ones(25) * 1.0]
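        # Expected means: cluster 1 -> 200, cluster 2 -> (20 * 300 + 20 * 500) / 40 = 400,
        # cluster 100 -> (25 * 0.5 + 25 * 1.0) / 50 = 0.75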
        # Concatenate the data for the 3 clusters together
        spike_clust = np.r_[clust1, clust2, clust100]
        spike_val = np.r_[clust1_vals, clust2_vals, clust100_vals]
        # Shuffle the data so the result cannot depend on input ordering
        ind = np.arange(len(spike_clust))
        np.random.shuffle(ind)
        spike_clust = spike_clust[ind]
        spike_val = spike_val[ind]
        # Make sure the fabricated data have matching dimensions
        self.assertEqual(len(spike_clust), len(spike_val))
        # Compute the average value for each cluster
        clust, avg_val, count = processing.compute_cluster_average(spike_clust, spike_val)
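        # compute_cluster_average returns the unique cluster ids in ascending order,
        # the mean value per cluster, and the spike count per cluster, as the
        # assertions below verify.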
        # Check output is as expected
        self.assertTrue(np.all(clust == (1, 2, 100)))
        self.assertEqual(avg_val[0], 200)
        self.assertEqual(avg_val[1], 400)
        self.assertEqual(avg_val[2], 0.75)
        self.assertTrue(np.all(count == (40, 40, 50)))


if __name__ == '__main__':
    np.random.seed(0)
    unittest.main(exit=False)