Source code for brainbox.tests.test_processing

from brainbox import processing, core
import unittest
import numpy as np


class TestProcessing(unittest.TestCase):
    """Tests for ``processing.sync`` and ``processing.compute_cluster_average``."""

    def setUp(self):
        # Seed NumPy's global RNG before every test so the randomly generated
        # fixtures are reproducible under any test runner, not only when this
        # file is executed directly as a script.
        np.random.seed(0)

    def _assert_cubic_resampling(self, resamp, period, err_margin):
        """Check that *resamp* is a uniformly sampled TimeSeries following ``t**3``.

        :param resamp: object returned by ``processing.sync``
        :param period: expected spacing between consecutive ``resamp.times``
        :param err_margin: maximum relative error allowed against ``times**3``
        """
        # sync must hand back a TimeSeries object
        self.assertIsInstance(resamp, core.TimeSeries)
        # All returned sample times must be uniformly spaced. np.isclose is used
        # instead of == because of floating point arithmetic: the actual diff
        # comes out as e.g. 0.09999999999999964 rather than 0.1.
        self.assertTrue(np.all(np.isclose(np.diff(resamp.times), period)))
        # Interpolated values must stay within the margin of the known function
        expected = resamp.times ** 3
        rel_err = np.abs(expected - resamp.values.T) / expected
        self.assertTrue(np.all(rel_err < err_margin))

    def _assert_spans(self, resamp, *time_arrays):
        """Check that every array in *time_arrays* lies within ``resamp.times``."""
        for sample_times in time_arrays:
            self.assertGreaterEqual(sample_times.min(), resamp.times.min())
            self.assertLessEqual(sample_times.max(), resamp.times.max())

    def test_sync(self):
        """Cast non-uniformly sampled data to evenly sampled TimeSeries with sync."""
        period = 0.1
        err_margin = 1e-3  # Maximum relative error allowed (i.e. 0.1 %)

        # Begin by defining sampling intervals of random half-normally distributed length
        times = np.cumsum(np.abs(np.random.normal(loc=4., scale=6., size=100)))
        # Take sample values as though the value was increasing as a cube of sample time
        samples = times**3
        # Use cubic interpolation to resample to a uniform interval
        cubes = core.TimeSeries(times=times, values=samples, columns=('cubic',))
        resamp = processing.sync(period, timeseries=cubes, interp='cubic',
                                 fillval='extrapolate')
        self._assert_cubic_resampling(resamp, period, err_margin)

        # Make a second timeseries of square-law increasing samples
        times2 = np.cumsum(np.abs(np.random.normal(loc=2., scale=1., size=200)))
        samples2 = times2**2
        squares = core.TimeSeries(times=times2, values=samples2, columns=('square',))
        # Use cubic interpolation again, this time on both timeseries
        resamp2 = processing.sync(period, timeseries=[squares, cubes], interp='cubic',
                                  fillval='extrapolate')
        # The new TS must expose both columns as attributes
        self.assertTrue(hasattr(resamp2, 'cubic'))
        self.assertTrue(hasattr(resamp2, 'square'))
        # Both timeseries must be fully contained in the resampled TS
        self._assert_spans(resamp2, cubes.times, squares.times)
        # All interpolated values must be within the margin of error against the
        # known functions; asserted separately so a failure names the column.
        sq_errperc = np.abs(resamp2.times**2 - resamp2.square) / resamp2.times**2
        cu_errperc = np.abs(resamp2.times**3 - resamp2.cubic) / resamp2.times**3
        self.assertTrue(np.all(sq_errperc < err_margin))
        self.assertTrue(np.all(cu_errperc < err_margin))

        # Now check the numpy array behavior of sync:
        # run it on the cubic times and values only.
        resamp = processing.sync(period, times=times, values=samples, interp='cubic',
                                 fillval='extrapolate')
        # Same checks as for the instance created from a TimeSeries object
        self._assert_cubic_resampling(resamp, period, err_margin)

        # Try the multiple-arrays case in which we pass two times and two values
        resamp2 = processing.sync(period, times=(times, times2),
                                  values=(samples, samples2),
                                  interp='cubic', fillval='extrapolate')
        self._assert_spans(resamp2, times, times2)

    def test_compute_cluster_averag(self):
        """Check per-cluster averages and counts from compute_cluster_average."""
        # Create fake data for 3 clusters
        clust1 = np.ones(40)
        clust1_vals = np.ones(40) * 200
        clust2 = 2 * np.ones(40)
        clust2_vals = np.r_[np.ones(20) * 300, np.ones(20) * 500]
        clust100 = 100 * np.ones(50)
        clust100_vals = np.r_[np.ones(25) * 0.5, np.ones(25) * 1.0]

        # Concatenate data for the 3 clusters together
        spike_clust = np.r_[clust1, clust2, clust100]
        spike_val = np.r_[clust1_vals, clust2_vals, clust100_vals]

        # Shuffle the data to make the order random
        ind = np.arange(len(spike_clust))
        np.random.shuffle(ind)
        spike_clust = spike_clust[ind]
        spike_val = spike_val[ind]

        # Make sure the data we have created has the correct dimension
        self.assertEqual(len(spike_clust), len(spike_val))

        # Compute the average value across clusters
        clust, avg_val, count = processing.compute_cluster_average(spike_clust, spike_val)

        # Check output is as expected
        self.assertTrue(np.all(clust == (1, 2, 100)))
        self.assertEqual(avg_val[0], 200)
        self.assertEqual(avg_val[1], 400)
        self.assertEqual(avg_val[2], 0.75)
        self.assertTrue(np.all(count == (40, 40, 50)))
if __name__ == '__main__':
    # Fix the seed of NumPy's global RNG so that a direct script run
    # draws the same random test data every time.
    np.random.seed(0)
    # exit=False: report the results without calling sys.exit(), which keeps
    # an interactive session alive after the tests have run.
    unittest.main(exit=False)