# Mock dataset
import unittest
from tempfile import TemporaryDirectory
import numpy as np
import scipy.signal
from one.api import ONE
import neuropixel
from ibldsp import voltage
from ibllib.ephys import ephysqc, spikes
from ibllib.tests import TEST_DB
from ibllib.tests.fixtures import utils
[docs]
def a_little_spike(nsw=121, nc=1):
# creates a kind of waveform that resembles a spike
wav = np.zeros(nsw)
wav[0] = 1
wav[5] = -0.1
wav[10] = -0.3
wav[15] = -0.1
sos = scipy.signal.butter(N=3, Wn=.15, output='sos')
spike = scipy.signal.sosfilt(sos, wav)
spike = - spike / np.max(spike)
if nc > 1:
spike = spike[:, np.newaxis] * scipy.signal.hamming(nc)[np.newaxis, :]
return spike
[docs]
def make_synthetic_data(ns=10000, nc=384, nss=121, ncs=21, nspikes=1200, tr=None, sample=None):
if tr is None:
tr = np.random.randint(np.ceil(ncs / 2), nc - np.ceil(ncs / 2), nspikes)
if sample is None:
sample = np.random.randint(np.ceil(nss / 2), ns - np.ceil(nss / 2), nspikes)
h = neuropixel.trace_header(1)
icsmid = int(np.floor(ncs / 2))
issmid = int(np.floor(nss / 2))
template = a_little_spike(121)
data = np.zeros((ns, nc))
for m in np.arange(tr.size):
itr = np.arange(tr[m] - icsmid, tr[m] + icsmid + 1)
iss = np.arange(sample[m] - issmid, sample[m] + issmid + 1)
offset = np.abs(h['x'][itr[icsmid]] + 1j * h['y'][itr[icsmid]] - h['x'][itr] - 1j * h['y'][itr])
ampfac = 1 / (offset + 10) ** 1.3
ampfac = ampfac / np.max(ampfac)
tmp = template[:, np.newaxis] * ampfac[np.newaxis, :]
data[slice(iss[0], iss[-1] + 1), slice(itr[0], itr[-1] + 1)] += tmp
return data
[docs]
def synthetic_with_bad_channels():
np.random.seed(12345)
ns, nc, fs = (30000, 384, 30000)
data = make_synthetic_data(ns=ns, nc=nc) * 1e-6 * 50
st = np.round(np.cumsum(- np.log(np.random.rand(int(ns / fs * 50 * 1.5))) / 50) * fs)
st = st[st < ns].astype(np.int32)
stripes = np.zeros(ns)
stripes[st] = 1
stripes = scipy.signal.convolve(stripes, scipy.signal.ricker(1200, 40), 'same') * 1e-6 * 2500
data = data + stripes[:, np.newaxis]
noise = np.random.randn(*data.shape) * 1e-6 * 10
channels = {'idead': [29, 36, 39, 40, 191], 'inoisy': [133, 235], 'ioutside': np.arange(275, 384)}
data[:, channels['idead']] = data[:, channels['idead']] / 20
noise[:, channels['inoisy']] = noise[:, channels['inoisy']] * 200
data[:, channels['idead']] = data[:, channels['idead']] / 20
data[:, channels['ioutside']] = 0
data += noise
return data, channels
[docs]
class TestNeuropixel(unittest.TestCase):
"""Comprehensive tests about geometry are run as part of the spikeglx reader testing suite"""
[docs]
def test_layouts(self):
dense = neuropixel.dense_layout()
assert set(dense.keys()) == set(['x', 'y', 'row', 'col', 'ind', 'shank'])
xu = np.unique(dense['x'])
yu = np.unique(dense['y'])
assert np.all(np.diff(xu) == 16)
assert np.all(np.diff(yu) == 20)
assert xu.size == 4 and yu.size == 384 / 2
[docs]
class TestFpgaTask(unittest.TestCase):
[docs]
def test_impeccable_dataset(self):
fpga2bpod = np.array([11 * 1e-6, -20]) # bpod starts 20 secs before with 10 ppm drift
fpga_trials = {
'intervals': np.array([[0, 9.5], [10, 19.5]]),
'stimOn_times': np.array([2, 12]),
'goCue_times': np.array([2.0001, 12.0001]),
'stimFreeze_times': np.array([4., 14.]),
'feedback_times': np.array([4.0001, 14.0001]),
'errorCue_times': np.array([4.0001, np.nan]),
'valveOpen_times': np.array([np.nan, 14.0001]),
'stimOff_times': np.array([6.0001, 15.0001]),
'itiIn_times': np.array([6.0011, 15.000]),
}
alf_trials = {
'goCueTrigger_times_bpod': np.polyval(fpga2bpod, fpga_trials['goCue_times'] - 0.00067),
'response_times_bpod': np.polyval(fpga2bpod, np.array([4., 14.])),
'intervals_bpod': np.polyval(fpga2bpod, fpga_trials['intervals']),
# Times from session start
'goCueTrigger_times': fpga_trials['goCue_times'] - 0.00067,
'response_times': np.array([4., 14.]),
'intervals': fpga_trials['intervals'],
'stimOn_times': fpga_trials['stimOn_times'],
'goCue_times': fpga_trials['goCue_times'],
'feedback_times': fpga_trials['feedback_times'],
}
qcs, qct = ephysqc.qc_fpga_task(fpga_trials, alf_trials)
self.assertTrue(np.all([qcs[k] for k in qcs]))
self.assertTrue(np.all([np.all(qct[k]) for k in qct]))
[docs]
class TestEphysQC(unittest.TestCase):
tempdir = None
[docs]
@classmethod
def setUpClass(cls) -> None:
cls.tempdir = TemporaryDirectory()
cls.one = ONE(**TEST_DB, cache_dir=cls.tempdir.name)
[docs]
@classmethod
def tearDownClass(cls) -> None:
# Clear overwritten methods by destroying cached instance
ONE.cache_clear()
if cls.tempdir:
cls.tempdir.cleanup()
[docs]
def setUp(self) -> None:
self.eid = 'b1c968ad-4874-468d-b2e4-5ffa9b9964e9'
# make a temp probe insertion
self.pname = 'probe02'
# Find any existing insertions with this name and delete
probe_insertions = self.one.alyx.rest('insertions', 'list', session=self.eid, name=self.pname, no_cache=True)
for pi in probe_insertions:
self.one.alyx.rest('insertions', 'delete', pi['id'])
# Create new insertion with this name and add teardown hook to delete it
probe_insertion = self.one.alyx.rest('insertions', 'create', data={'session': self.eid, 'name': self.pname})
self.addCleanup(self.one.alyx.rest, 'insertions', 'delete', id=probe_insertion['id'])
self.pid = probe_insertion['id']
self.qc = ephysqc.EphysQC(self.pid, one=self.one)
[docs]
def tearDown(self) -> None:
pass
[docs]
def test_ensure_data(self):
# Make sure raises an error when no data present
utils.create_fake_raw_ephys_data_folder(self.one.eid2path(self.eid), populate=False)
with self.assertRaises(AssertionError):
self.qc._ensure_required_data()
# Make sure it runs through fine when meta files are present
utils.create_fake_raw_ephys_data_folder(self.one.eid2path(self.eid), populate=True)
self.qc._ensure_required_data()
[docs]
class TestDetectSpikes(unittest.TestCase):
[docs]
def test_spike_detection(self):
"""
Test that creates a synthetic dataset with spikes and an amplitude decay function
with the probe gemetry, and then pastes spikes all around the dataset and detects and
de-duplicates
The test is feeding the detections in a new round of simulation, and then computing
the zero-lag cross-correlation between input and simulated output, and asserting on
the similarity
"""
fs = 30000
nspikes = 1200
h = neuropixel.trace_header(version=1)
ns, nc = (10000, len(h['x']))
nss, ncs = (121, 21)
np.random.seed(973)
display = False
data = make_synthetic_data(ns, nc, nss, ncs, nspikes)
detects = spikes.detection(data, fs=fs, h=h, detect_threshold=-0.8, time_tol=.0006)
sample_out = (detects.time * fs + nss / 2 - 4).astype(np.int32)
tr_out = detects.trace.astype(np.int32)
data_out = make_synthetic_data(ns, nc, nss, ncs, tr=tr_out, sample=sample_out)
if display:
from easyqc.gui import viewseis
eqc = viewseis(data, si=1 / 30000 * 1e3, taxis=0, title='data')
eqc.ctrl.add_scatter(detects.time * 1e3, detects.trace)
eqco = viewseis(data_out, si=1 / 30000 * 1e3, taxis=0, title='data_out') # noqa
xcor = np.zeros(nc)
for tr in np.arange(nc):
if np.all(data[:, tr] == 0):
xcor[tr] = 1
continue
xcor[tr] = np.corrcoef(data[:, tr], data_out[:, tr])[1, 0]
assert np.mean(xcor > .8) > .95
assert np.nanmedian(xcor) > .99
[docs]
class TestDetectBadChannels(unittest.TestCase):
[docs]
def test_channel_detections(self):
"""
This test creates a synthetic dataset with voltage stripes and 3 types of bad channels
1) dead channels or low amplitude
2) noisy
3) out of the brain
"""
data, channels = synthetic_with_bad_channels()
labels, xfeats = voltage.detect_bad_channels(data.T, fs=30000)
assert np.all(np.where(labels == 1)[0] == np.array(channels['idead']))
assert np.all(np.where(labels == 2)[0] == np.array(channels['inoisy']))
assert np.all(np.where(labels == 3)[0] == np.array(channels['ioutside']))
# from easyqc.gui import viewseis
# eqc = viewseis(data, si=1 / 30000 * 1e3, h=h, title='synth', taxis=0)
# from ibllib.plots.figures import ephys_bad_channels
# ephys_bad_channels(data.T, 30000, labels, xfeats)
if __name__ == '__main__':
unittest.main(exit=False, verbosity=2)