Source code for ibllib.tests.test_ephys

# Mock dataset
import unittest
from tempfile import TemporaryDirectory

import numpy as np
import scipy.signal

from one.api import ONE
import neuropixel
from neurodsp import voltage

from ibllib.ephys import ephysqc, spikes
from ibllib.tests import TEST_DB
from ibllib.tests.fixtures import utils


def a_little_spike(nsw=121, nc=1):
    # creates a kind of waveform that resembles a spike
    wav = np.zeros(nsw)
    wav[0] = 1
    wav[5] = -0.1
    wav[10] = -0.3
    wav[15] = -0.1
    sos = scipy.signal.butter(N=3, Wn=.15, output='sos')
    spike = scipy.signal.sosfilt(sos, wav)
    spike = - spike / np.max(spike)
    if nc > 1:
        spike = spike[:, np.newaxis] * scipy.signal.hamming(nc)[np.newaxis, :]
    return spike
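
# Illustrative usage note: the template returned above is negative-going and normalised so that
# its trough equals -1, which is why the spike detection test below uses detect_threshold=-0.8.
# For nc=1 the output is a 1-D array of length nsw; for nc > 1 it is an (nsw, nc) array tapered
# across channels with a Hamming window:
#   a_little_spike().shape        # -> (121,)
#   a_little_spike(nc=21).shape   # -> (121, 21)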


def make_synthetic_data(ns=10000, nc=384, nss=121, ncs=21, nspikes=1200, tr=None, sample=None):
    if tr is None:
        tr = np.random.randint(np.ceil(ncs / 2), nc - np.ceil(ncs / 2), nspikes)
    if sample is None:
        sample = np.random.randint(np.ceil(nss / 2), ns - np.ceil(nss / 2), nspikes)
    h = neuropixel.trace_header(1)
    icsmid = int(np.floor(ncs / 2))
    issmid = int(np.floor(nss / 2))
    template = a_little_spike(121)
    data = np.zeros((ns, nc))
    for m in np.arange(tr.size):
        itr = np.arange(tr[m] - icsmid, tr[m] + icsmid + 1)
        iss = np.arange(sample[m] - issmid, sample[m] + issmid + 1)
        offset = np.abs(h['x'][itr[icsmid]] + 1j * h['y'][itr[icsmid]] - h['x'][itr] - 1j * h['y'][itr])
        ampfac = 1 / (offset + 10) ** 1.3
        ampfac = ampfac / np.max(ampfac)
        tmp = template[:, np.newaxis] * ampfac[np.newaxis, :]
        data[slice(iss[0], iss[-1] + 1), slice(itr[0], itr[-1] + 1)] += tmp
    return data


def synthetic_with_bad_channels():
    np.random.seed(12345)
    ns, nc, fs = (30000, 384, 30000)
    data = make_synthetic_data(ns=ns, nc=nc) * 1e-6 * 50
    st = np.round(np.cumsum(- np.log(np.random.rand(int(ns / fs * 50 * 1.5))) / 50) * fs)
    st = st[st < ns].astype(np.int32)
    stripes = np.zeros(ns)
    stripes[st] = 1
    stripes = scipy.signal.convolve(stripes, scipy.signal.ricker(1200, 40), 'same') * 1e-6 * 2500
    data = data + stripes[:, np.newaxis]
    noise = np.random.randn(*data.shape) * 1e-6 * 10
    channels = {'idead': [29, 36, 39, 40, 191], 'inoisy': [133, 235], 'ioutside': np.arange(275, 384)}
    data[:, channels['idead']] = data[:, channels['idead']] / 20
    noise[:, channels['inoisy']] = noise[:, channels['inoisy']] * 200
    data[:, channels['idead']] = data[:, channels['idead']] / 20
    data[:, channels['ioutside']] = 0
    data += noise
    return data, channels
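
# Illustrative usage of the two synthetic generators above (kept as comments so that nothing
# runs at import time):
#   data = make_synthetic_data(ns=10000, nc=384)        # clean (ns, nc) array of pasted spikes
#   data, channels = synthetic_with_bad_channels()      # adds stripes, noise and bad channels
# The 'idead', 'inoisy' and 'ioutside' entries of `channels` index the corrupted columns and
# serve as ground truth for the bad-channel detection test below.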


class TestNeuropixel(unittest.TestCase):
    """Comprehensive tests about geometry are run as part of the spikeglx reader testing suite."""

    def test_layouts(self):
        dense = neuropixel.dense_layout()
        assert set(dense.keys()) == set(['x', 'y', 'row', 'col', 'ind', 'shank'])
        xu = np.unique(dense['x'])
        yu = np.unique(dense['y'])
        assert np.all(np.diff(xu) == 16)
        assert np.all(np.diff(yu) == 20)
        assert xu.size == 4 and yu.size == 384 / 2

    def tests_headers(self):
        th = neuropixel.trace_header()
        assert set(th.keys()) == set(['x', 'y', 'row', 'col', 'ind', 'adc', 'sample_shift', 'shank'])


class TestFpgaTask(unittest.TestCase):

    def test_impeccable_dataset(self):
        fpga2bpod = np.array([11 * 1e-6, -20])  # bpod starts 20 secs before with 10 ppm drift
        fpga_trials = {
            'intervals': np.array([[0, 9.5], [10, 19.5]]),
            'stimOn_times': np.array([2, 12]),
            'goCue_times': np.array([2.0001, 12.0001]),
            'stimFreeze_times': np.array([4., 14.]),
            'feedback_times': np.array([4.0001, 14.0001]),
            'errorCue_times': np.array([4.0001, np.nan]),
            'valveOpen_times': np.array([np.nan, 14.0001]),
            'stimOff_times': np.array([6.0001, 15.0001]),
            'itiIn_times': np.array([6.0011, 15.000]),
        }
        alf_trials = {
            'goCueTrigger_times_bpod': np.polyval(fpga2bpod, fpga_trials['goCue_times'] - 0.00067),
            'response_times_bpod': np.polyval(fpga2bpod, np.array([4., 14.])),
            'intervals_bpod': np.polyval(fpga2bpod, fpga_trials['intervals']),
            # Times from session start
            'goCueTrigger_times': fpga_trials['goCue_times'] - 0.00067,
            'response_times': np.array([4., 14.]),
            'intervals': fpga_trials['intervals'],
            'stimOn_times': fpga_trials['stimOn_times'],
            'goCue_times': fpga_trials['goCue_times'],
            'feedback_times': fpga_trials['feedback_times'],
        }
        qcs, qct = ephysqc.qc_fpga_task(fpga_trials, alf_trials)
        self.assertTrue(np.all([qcs[k] for k in qcs]))
        self.assertTrue(np.all([np.all(qct[k]) for k in qct]))
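
    # Note on the clock conversion above: np.polyval evaluates the first-order polynomial
    # [a, b] at t as a * t + b, i.e. a drift slope plus a constant offset. Illustrative
    # arithmetic only:
    #   np.polyval(np.array([11 * 1e-6, -20]), 4.)  # == 11e-6 * 4. - 20 == -19.999956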


class TestEphysQC(unittest.TestCase):
    tempdir = None

    @classmethod
    def setUpClass(cls) -> None:
        cls.tempdir = TemporaryDirectory()
        cls.one = ONE(**TEST_DB, cache_dir=cls.tempdir.name)

    @classmethod
    def tearDownClass(cls) -> None:
        # Clear overwritten methods by destroying cached instance
        ONE.cache_clear()
        if cls.tempdir:
            cls.tempdir.cleanup()

    def setUp(self) -> None:
        self.eid = 'b1c968ad-4874-468d-b2e4-5ffa9b9964e9'
        # make a temp probe insertion
        self.pname = 'probe02'
        # Find any existing insertions with this name and delete them
        probe_insertions = self.one.alyx.rest('insertions', 'list', session=self.eid, name=self.pname, no_cache=True)
        for pi in probe_insertions:
            self.one.alyx.rest('insertions', 'delete', pi['id'])
        # Create a new insertion with this name and add a teardown hook to delete it
        probe_insertion = self.one.alyx.rest('insertions', 'create', data={'session': self.eid, 'name': self.pname})
        self.addCleanup(self.one.alyx.rest, 'insertions', 'delete', id=probe_insertion['id'])
        self.pid = probe_insertion['id']
        self.qc = ephysqc.EphysQC(self.pid, one=self.one)

    def tearDown(self) -> None:
        pass

    def test_ensure_data(self):
        # Make sure an error is raised when no data are present
        utils.create_fake_raw_ephys_data_folder(self.one.eid2path(self.eid), populate=False)
        with self.assertRaises(AssertionError):
            self.qc._ensure_required_data()
        # Make sure it runs through fine when the meta files are present
        utils.create_fake_raw_ephys_data_folder(self.one.eid2path(self.eid), populate=True)
        self.qc._ensure_required_data()


class TestDetectSpikes(unittest.TestCase):

    def test_spike_detection(self):
        """
        Create a synthetic dataset with spikes and an amplitude decay function matching the
        probe geometry, paste spikes all over the dataset, then detect and de-duplicate them.
        The detections are fed into a new round of simulation and the zero-lag cross-correlation
        between input and simulated output is computed; the test asserts on the similarity.
        """
        fs = 30000
        nspikes = 1200
        h = neuropixel.trace_header(version=1)
        ns, nc = (10000, len(h['x']))
        nss, ncs = (121, 21)
        np.random.seed(973)
        display = False

        data = make_synthetic_data(ns, nc, nss, ncs, nspikes)
        detects = spikes.detection(data, fs=fs, h=h, detect_threshold=-0.8, time_tol=.0006)
        sample_out = (detects.time * fs + nss / 2 - 4).astype(np.int32)
        tr_out = detects.trace.astype(np.int32)
        data_out = make_synthetic_data(ns, nc, nss, ncs, tr=tr_out, sample=sample_out)

        if display:
            from easyqc.gui import viewseis
            eqc = viewseis(data, si=1 / 30000 * 1e3, taxis=0, title='data')
            eqc.ctrl.add_scatter(detects.time * 1e3, detects.trace)
            eqco = viewseis(data_out, si=1 / 30000 * 1e3, taxis=0, title='data_out')  # noqa

        xcor = np.zeros(nc)
        for tr in np.arange(nc):
            if np.all(data[:, tr] == 0):
                xcor[tr] = 1
                continue
            xcor[tr] = np.corrcoef(data[:, tr], data_out[:, tr])[1, 0]

        assert np.mean(xcor > .8) > .95
        assert np.nanmedian(xcor) > .99
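
        # Note on the metric above: np.corrcoef(x, y)[1, 0] is the Pearson correlation at zero
        # lag, so a perfectly re-simulated trace scores 1.0 and an unrelated trace scores ~0;
        # the thresholds therefore require near-perfect reconstruction on almost every channel.
        #   np.corrcoef(data[:, 0], data[:, 0])[1, 0]  # == 1.0 for identical traces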


class TestDetectBadChannels(unittest.TestCase):

    def test_channel_detections(self):
        """
        Create a synthetic dataset with voltage stripes and 3 types of bad channels:
        1) dead or low-amplitude channels, 2) noisy channels, 3) channels outside of the brain.
        """
        data, channels = synthetic_with_bad_channels()
        labels, xfeats = voltage.detect_bad_channels(data.T, fs=30000)
        assert np.all(np.where(labels == 1)[0] == np.array(channels['idead']))
        assert np.all(np.where(labels == 2)[0] == np.array(channels['inoisy']))
        assert np.all(np.where(labels == 3)[0] == np.array(channels['ioutside']))
        # from easyqc.gui import viewseis
        # eqc = viewseis(data, si=1 / 30000 * 1e3, h=h, title='synth', taxis=0)
        # from ibllib.plots.figures import ephys_bad_channels
        # ephys_bad_channels(data.T, 30000, labels, xfeats)


if __name__ == '__main__':
    unittest.main(exit=False, verbosity=2)