"""Source code for ibllib.tests.test_spikeglx: unit tests for the ibllib.io.spikeglx module."""
from pathlib import Path
import shutil
import tempfile
import unittest
import numpy as np
from iblutil.io import hashfile
from ibllib.io import spikeglx
from ibllib.ephys import neuropixel
class TestSpikeGLX_hardwareInfo(unittest.TestCase):
    """Tests reading wiring (hardware configuration) files and the sync channel maps."""

    def setUp(self) -> None:
        """Locate the fixture folder and define the sync maps expected for 3A and 3B."""
        self.workdir = Path(__file__).parent / 'fixtures' / 'io' / 'spikeglx'
        self.map3A = {'left_camera': 2,
                      'right_camera': 3,
                      'body_camera': 4,
                      'bpod': 7,
                      'frame2ttl': 12,
                      'rotary_encoder_0': 13,
                      'rotary_encoder_1': 14,
                      'audio': 15}
        self.map3B = {'left_camera': 0,
                      'right_camera': 1,
                      'body_camera': 2,
                      'imec_sync': 3,
                      'frame2ttl': 4,
                      'rotary_encoder_0': 5,
                      'rotary_encoder_1': 6,
                      'audio': 7,
                      'bpod': 16,
                      'laser': 17,
                      'laser_ttl': 18}
        self.file3a = self.workdir / 'sample3A_g0_t0.imec.wiring.json'
        self.file3b = self.workdir / 'sample3B_g0_t0.nidq.wiring.json'

    def test_get_wiring(self):
        """The hardware config can be read from a folder or from a full file path."""
        # get params providing the fixtures folder
        par = spikeglx.get_hardware_config(self.workdir)
        self.assertTrue(par)
        with tempfile.TemporaryDirectory() as tdir:
            # test from empty directory
            self.assertIsNone(spikeglx.get_hardware_config(tdir))
            # test from directory
            shutil.copy(self.file3a, Path(tdir) / self.file3a.name)
            par3a = spikeglx.get_hardware_config(tdir)
            # test from full file path
            par3a_ = spikeglx.get_hardware_config(Path(tdir) / self.file3a.name)
            self.assertEqual(par3a, par3a_)

    def test_get_channel_map(self):
        """The sync maps read from the wiring files match the expected 3A / 3B maps."""
        sync_map = spikeglx.get_sync_map(self.file3a)
        self.assertEqual(sync_map, self.map3A)
        sync_map = spikeglx.get_sync_map(self.file3b)
        self.assertEqual(sync_map, self.map3B)
        with tempfile.TemporaryDirectory() as tdir:
            # a wiring file that does not exist yields None
            self.assertIsNone(spikeglx.get_sync_map(Path(tdir) / 'idontexist.json'))

    def test_default_values(self):
        """The default channel maps of the FPGA extractor match the expected maps."""
        from ibllib.io.extractors import ephys_fpga
        self.assertEqual(ephys_fpga.CHMAPS['3A']['ap'], self.map3A)
        self.assertEqual(ephys_fpga.CHMAPS['3B']['nidq'], self.map3B)
class TestSpikeGLX_glob_ephys(unittest.TestCase):
    """
    Creates mock acquisition folders architecture (omitting metadata files):
    ├── 3A
    │   ├── imec0
    │   │   ├── sync_testing_g0_t0.imec0.ap.bin
    │   │   └── sync_testing_g0_t0.imec0.lf.bin
    │   └── imec1
    │       ├── sync_testing_g0_t0.imec1.ap.bin
    │       └── sync_testing_g0_t0.imec1.lf.bin
    └── 3B
        ├── sync_testing_g0_t0.nidq.bin
        ├── imec0
        │   ├── sync_testing_g0_t0.imec0.ap.bin
        │   └── sync_testing_g0_t0.imec0.lf.bin
        └── imec1
            ├── sync_testing_g0_t0.imec1.ap.bin
            └── sync_testing_g0_t0.imec1.lf.bin

    Each of the 3A / 3B trees is created under a `raw_ephys_data` sub-folder. In the 3B tree
    the imec0 ap file is created as `.cbin`, and a `.ch` file is also created for every
    3B entry. A `.meta` file is touched next to every created file.
    """

    def setUp(self):
        """Build the empty mock file trees inside a fresh temporary directory."""
        def touchfile(p):
            # only Path values describe files to create; any OS error is left to propagate
            # so that a broken fixture fails loudly instead of being silently ignored
            if isinstance(p, Path):
                p.parent.mkdir(exist_ok=True, parents=True)
                p.touch(exist_ok=True)

        def create_tree(root_dir, dico):
            root_dir.mkdir(exist_ok=True, parents=True)
            for ldir in dico:
                for k in ldir:
                    # 'path' and 'label' are descriptive entries, not files
                    if k in ('path', 'label'):
                        continue
                    touchfile(ldir[k])
                    Path(ldir[k]).with_suffix('.meta').touch()

        # a private directory removed after each test: no stale files between runs
        self._tempdir = tempfile.TemporaryDirectory(prefix='test_glob_ephys')
        self.addCleanup(self._tempdir.cleanup)
        self.tmpdir = Path(self._tempdir.name)
        self.dir3a = self.tmpdir.joinpath('3A').joinpath('raw_ephys_data')
        self.dir3b = self.tmpdir.joinpath('3B').joinpath('raw_ephys_data')
        self.dict3a = [{'label': 'imec0',
                        'ap': self.dir3a / 'imec0' / 'sync_testing_g0_t0.imec0.ap.bin',
                        'lf': self.dir3a / 'imec0' / 'sync_testing_g0_t0.imec0.lf.bin',
                        'path': self.dir3a / 'imec0'},
                       {'label': 'imec1',
                        'ap': self.dir3a / 'imec1' / 'sync_testing_g0_t0.imec1.ap.bin',
                        'lf': self.dir3a / 'imec1' / 'sync_testing_g0_t0.imec1.lf.bin',
                        'path': self.dir3a / 'imec1'}]
        # surprise ! one of them happens to be compressed
        self.dict3b = [{'label': 'imec0',
                        'ap': self.dir3b / 'imec0' / 'sync_testing_g0_t0.imec0.ap.cbin',
                        'lf': self.dir3b / 'imec0' / 'sync_testing_g0_t0.imec0.lf.bin',
                        'path': self.dir3b / 'imec0'},
                       {'label': 'imec1',
                        'ap': self.dir3b / 'imec1' / 'sync_testing_g0_t0.imec1.ap.bin',
                        'lf': self.dir3b / 'imec1' / 'sync_testing_g0_t0.imec1.lf.bin',
                        'path': self.dir3b / 'imec1'},
                       {'label': '',
                        'nidq': self.dir3b / 'sync_testing_g0_t0.nidq.bin',
                        'path': self.dir3b}]
        self.dict3b_ch = [{'label': 'imec0',
                           'ap': self.dir3b / 'imec0' / 'sync_testing_g0_t0.imec0.ap.ch',
                           'lf': self.dir3b / 'imec0' / 'sync_testing_g0_t0.imec0.lf.ch',
                           'path': self.dir3b / 'imec0'},
                          {'label': 'imec1',
                           'ap': self.dir3b / 'imec1' / 'sync_testing_g0_t0.imec1.ap.ch',
                           'lf': self.dir3b / 'imec1' / 'sync_testing_g0_t0.imec1.lf.ch',
                           'path': self.dir3b / 'imec1'},
                          {'label': '',
                           'nidq': self.dir3b / 'sync_testing_g0_t0.nidq.ch',
                           'path': self.dir3b}]
        create_tree(self.dir3a, self.dict3a)
        create_tree(self.dir3b, self.dict3b)
        create_tree(self.dir3b, self.dict3b_ch)

    def test_glob_ephys(self):
        """Globbing finds the expected files and the probe version is inferred from them."""
        def dict_equals(d1, d2):
            # order-independent comparison of two lists of dictionaries
            return all(x in d1 for x in d2) and all(x in d2 for x in d1)

        ef3b = spikeglx.glob_ephys_files(self.dir3b)
        ef3a = spikeglx.glob_ephys_files(self.dir3a)
        ef3b_ch = spikeglx.glob_ephys_files(self.dir3b, ext='ch')
        # test glob
        self.assertTrue(dict_equals(self.dict3a, ef3a))
        self.assertTrue(dict_equals(self.dict3b, ef3b))
        self.assertTrue(dict_equals(self.dict3b_ch, ef3b_ch))
        # test the version from glob
        self.assertEqual(spikeglx.get_neuropixel_version_from_files(ef3a), '3A')
        self.assertEqual(spikeglx.get_neuropixel_version_from_files(ef3b), '3B')
        # test the version from paths
        self.assertEqual(spikeglx.get_neuropixel_version_from_folder(self.dir3a), '3A')
        self.assertEqual(spikeglx.get_neuropixel_version_from_folder(self.dir3b), '3B')
        # once the ap file is removed, globbing the probe folder returns nothing
        self.dir3b.joinpath('imec1', 'sync_testing_g0_t0.imec1.ap.bin').unlink()
        self.assertEqual(spikeglx.glob_ephys_files(self.dir3b.joinpath('imec1')), [])
class TestsSpikeGLX_compress(unittest.TestCase):
    """Tests slicing of the reader and compression / decompression of a mock binary file."""

    def setUp(self):
        """Create a random mock 3A ap binary in a temporary folder and open a reader on it."""
        self._tempdir = tempfile.TemporaryDirectory()
        # cleanups run in reverse registration order: the reader registered below is closed
        # before the directory holding its file is removed
        self.addCleanup(self._tempdir.cleanup)
        self.workdir = Path(self._tempdir.name)
        file_meta = Path(__file__).parent.joinpath('fixtures', 'io', 'spikeglx',
                                                   'sample3A_short_g0_t0.imec.ap.meta')
        self.file_bin = spikeglx._mock_spikeglx_file(
            self.workdir.joinpath('sample3A_short_g0_t0.imec.ap.bin'), file_meta, ns=76104,
            nc=385, sync_depth=16, random=True)['bin_file']
        self.sr = spikeglx.Reader(self.file_bin)
        # NOTE(review): test_compress already closes this reader; this assumes close() can be
        # called a second time without raising - confirm against spikeglx.Reader.close
        self.addCleanup(self.sr.close)
        self.assertIsNotNone(self.sr._raw)
        self.assertTrue(self.sr.is_open)

    def test_read_slices(self):
        """Indexing the reader returns the raw samples scaled by the conversion factor."""
        sr = self.sr
        s2mv = sr.channel_conversion_sample2v['ap'][0]
        # test the slicing of reader object
        self.assertTrue(np.all(np.isclose(sr._raw[5:500, :-1] * s2mv, sr[5:500, :-1])))
        self.assertTrue(np.all(np.isclose(sr._raw[5:500, 5] * s2mv, sr[5:500, 5])))
        self.assertTrue(np.all(np.isclose(sr._raw[5, :-1] * s2mv, sr[5, :-1])))
        self.assertTrue(sr._raw[55, 5] * s2mv == sr[55, 5])
        self.assertTrue(np.all(np.isclose(sr._raw[55] * s2mv, sr[55])))
        # the last column is left out of the comparison
        self.assertTrue(np.all(np.isclose(sr._raw[5:500] * s2mv, sr[5:500])[:, :-1]))

    def test_compress(self):
        """Compressed and decompressed files read back identical to a reference copy."""
        def compare_data(sr0, sr1):
            # test direct reading through memmap / mtscompreader
            self.assertTrue(np.all(sr0._raw[1200:1210, 12] == sr1._raw[1200:1210, 12]))
            # test reading through methods
            d0, s0 = sr0.read_samples(1200, 54245)
            d1, s1 = sr1.read_samples(1200, 54245)
            self.assertTrue(np.all(d0 == d1))
            self.assertTrue(np.all(s0 == s1))

        self.assertFalse(self.sr.is_mtscomp)
        self.file_cbin = self.sr.compress_file()
        self.sr.close()
        # create a reference file that will serve to compare for inplace operations
        ref_file = self.file_bin.parent.joinpath('REF_' + self.file_bin.name)
        ref_meta = self.file_bin.parent.joinpath('REF_' + self.file_bin.with_suffix('.meta').name)
        shutil.copy(self.file_bin, ref_file)
        shutil.copy(self.file_bin.with_suffix('.meta'), ref_meta)
        # test file compression copy
        with spikeglx.Reader(ref_file, open=False) as sr_ref:
            with spikeglx.Reader(self.file_cbin, open=False) as sc:
                self.assertTrue(sc.is_mtscomp)
                compare_data(sr_ref, sc)
                # test decompression in-place
                sc.decompress_file(keep_original=False, overwrite=True)
                # check the reader that was just decompressed, not the closed original one
                # NOTE(review): assumes is_mtscomp follows the reader's current file - confirm
                self.assertFalse(sc.is_mtscomp)
                self.assertFalse(self.file_cbin.exists())
                compare_data(sr_ref, sc)
                # test compression in-place
                sc.compress_file(keep_original=False, overwrite=True)
                self.assertTrue(sc.is_mtscomp)
                self.assertTrue(self.file_cbin.exists())
                self.assertFalse(self.file_bin.exists())
                compare_data(sr_ref, sc)
class TestsSpikeGLX_Meta(unittest.TestCase):
    """Tests meta-data parsing and the reading of mock SpikeGLX binary files."""

    def setUp(self):
        """List the fixture meta files and create a scratch folder removed after the test."""
        self.workdir = Path(__file__).parent / 'fixtures' / 'io' / 'spikeglx'
        self.meta_files = list(self.workdir.glob('*.meta'))
        self._tempdir = tempfile.TemporaryDirectory(prefix='test_meta')
        self.addCleanup(self._tempdir.cleanup)
        self.tmpdir = Path(self._tempdir.name)

    def test_fix_meta_file(self):
        # test the case where the meta file shows a larger amount of samples
        with tempfile.TemporaryDirectory(prefix='glx_test') as tdir:
            bin_3a = spikeglx._mock_spikeglx_file(
                Path(tdir).joinpath('sample3A_g0_t0.imec.ap.bin'),
                self.workdir / 'sample3A_g0_t0.imec.ap.meta', ns=32, nc=385, sync_depth=16)
            # overwrite the binary with 22 samples only, fewer than the 32 of the mock
            with open(bin_3a['bin_file'], 'wb') as fp:
                np.random.randint(-20000, 20000, 385 * 22, dtype=np.int16).tofile(fp)
            with spikeglx.Reader(bin_3a['bin_file'], open=False) as sr:
                verifiable = sr.meta['fileTimeSecs'] * 30000
                self.assertEqual(verifiable, 22)

    def test_read_corrupt(self):
        # nidq has 1 analog and 1 digital sync channels
        with tempfile.TemporaryDirectory(prefix='glx_test') as tdir:
            int2volts = 5 / 32768
            nidq = spikeglx._mock_spikeglx_file(
                Path(tdir).joinpath('sample3B_g0_t0.nidq.bin'),
                self.workdir / 'sample3B_g0_t0.nidq.meta',
                ns=32, nc=2, sync_depth=8, int2volts=int2volts, corrupt=True)
            self.assert_read_glx(nidq)

    def test_read_nidq(self):
        # nidq has 1 analog and 1 digital sync channels
        with tempfile.TemporaryDirectory(prefix='glx_test') as tdir:
            int2volts = 5 / 32768
            nidq = spikeglx._mock_spikeglx_file(
                Path(tdir).joinpath('sample3B_g0_t0.nidq.bin'),
                self.workdir / 'sample3B_g0_t0.nidq.meta',
                ns=32, nc=2, sync_depth=8, int2volts=int2volts)
            self.assert_read_glx(nidq)

    def test_read_3A(self):
        with tempfile.TemporaryDirectory(prefix='glx_test') as tdir:
            bin_3a = spikeglx._mock_spikeglx_file(
                Path(tdir).joinpath('sample3A_g0_t0.imec.ap.bin'),
                self.workdir / 'sample3A_g0_t0.imec.ap.meta',
                ns=32, nc=385, sync_depth=16)
            self.assert_read_glx(bin_3a)

    def test_read_3B(self):
        with tempfile.TemporaryDirectory(prefix='glx_test') as tdir:
            bin_3b = spikeglx._mock_spikeglx_file(
                Path(tdir).joinpath('sample3B_g0_t0.imec1.ap.bin'),
                self.workdir / 'sample3B_g0_t0.imec1.ap.meta',
                ns=32, nc=385, sync_depth=16)
            self.assert_read_glx(bin_3b)

    def test_read_NP21(self):
        with tempfile.TemporaryDirectory(prefix='glx_test') as tdir:
            bin_3b = spikeglx._mock_spikeglx_file(
                Path(tdir).joinpath('sampleNP2.1_g0_t0.imec.ap.bin'),
                self.workdir / 'sampleNP2.1_g0_t0.imec.ap.meta',
                ns=32, nc=385, sync_depth=16)
            self.assert_read_glx(bin_3b)

    def test_read_NP24(self):
        with tempfile.TemporaryDirectory(prefix='glx_test') as tdir:
            bin_3b = spikeglx._mock_spikeglx_file(
                Path(tdir).joinpath('sampleNP2.4_g0_t0.imec.ap.bin'),
                self.workdir / 'sampleNP2.4_g0_t0.imec.ap.meta',
                ns=32, nc=385, sync_depth=16)
            self.assert_read_glx(bin_3b)

    def test_check_ephys_file(self):
        """The mock file has the expected md5 / sha1 hashes and the reader verifies its hash."""
        self.tdir = tempfile.TemporaryDirectory(prefix='glx_test')
        self.addCleanup(self.tdir.cleanup)
        bin_3b = spikeglx._mock_spikeglx_file(
            Path(self.tdir.name).joinpath('sample3B_g0_t0.imec1.ap.bin'),
            self.workdir / 'sample3B_g0_t0.imec1.ap.meta',
            ns=32, nc=385, sync_depth=16)
        self.assertEqual(hashfile.md5(bin_3b['bin_file']), "207ba1666b866a091e5bb8b26d19733f")
        self.assertEqual(hashfile.sha1(bin_3b['bin_file']),
                         '1bf3219c35dea15409576f6764dd9152c3f8a89c')
        sr = spikeglx.Reader(bin_3b['bin_file'], open=False)
        self.assertTrue(sr.verify_hash())

    def assert_read_glx(self, tglx):
        """
        Shared checks on a mock file: dimensions, scaled data, sync fronts and slicing.

        :param tglx: dictionary returned by `spikeglx._mock_spikeglx_file`; the keys
         'bin_file', 'D', 'ns', 'nc' and 'sync_depth' are read here
        """
        with spikeglx.Reader(tglx['bin_file']) as sr:
            dexpected = sr.channel_conversion_sample2v[sr.type] * tglx['D']
            d, sync = sr.read_samples(0, tglx['ns'])
            self.assertEqual(sr.nsync, 1)
            # could be rounding errors with non-integer sampling rates
            self.assertEqual(sr.rl, sr.ns / sr.fs)
            self.assertEqual(sr.nc, tglx['nc'])
            self.assertEqual(sr.ns, tglx['ns'])
            # test the data reading with gain
            self.assertTrue(np.all(np.isclose(dexpected, d)))
            # test the sync reading, one front per channel
            self.assertEqual(np.sum(sync), tglx['sync_depth'])
            for m in np.arange(tglx['sync_depth']):
                self.assertEqual(sync[m + 1, m], 1)
            if sr.type in ['ap', 'lf']:  # exclude nidq from the slicing circus
                # test reading only one channel
                d, _ = sr.read(slice(None), 10)
                self.assertTrue(np.all(np.isclose(d, dexpected[:, 10])))
                # test reading only one time
                d, _ = sr.read(5, slice(None))
                self.assertTrue(np.all(np.isclose(d, dexpected[5, :])))
                # test reading a few times
                d, _ = sr.read(slice(5, 7), slice(None))
                self.assertTrue(np.all(np.isclose(d, dexpected[5:7, :])))
                d, _ = sr.read([5, 6], slice(None))
                self.assertTrue(np.all(np.isclose(d, dexpected[5:7, :])))
                # test reading a few channels
                d, _ = sr.read(slice(None), slice(300, 310))
                self.assertTrue(np.all(np.isclose(d, dexpected[:, 300:310])))
                # test reading a few channels with a numpy array of indices
                ind = np.array([300, 302])
                d, _ = sr.read(slice(None), ind)
                self.assertTrue(np.all(np.isclose(d, dexpected[:, ind])))
                # test double slicing
                d, _ = sr.read(slice(5, 10), slice(300, 310))
                self.assertTrue(np.all(np.isclose(d, dexpected[5:10, 300:310])))
                # test empty slices
                d, _ = sr.read(slice(5, 10), [])
                self.assertEqual(d.size, 0)
                d, _ = sr.read([], [])
                self.assertEqual(d.size, 0)
                d, _ = sr.read([], slice(300, 310))
                self.assertEqual(d.size, 0)
                a = sr.read_sync_analog()
                self.assertIsNone(a)
                # test the read_samples method (should be deprecated ?)
                d, _ = sr.read_samples(0, 500, ind)
                self.assertTrue(np.all(np.isclose(d, dexpected[0:500, ind])))
                d, _ = sr.read_samples(0, 500)
                self.assertTrue(np.all(np.isclose(d, dexpected[0:500, :])))
            else:
                s = sr.read_sync()
                self.assertEqual(s.shape[1], 17)
            # test the channel geometries but skip when meta data doesn't correspond to NP
            if sr.major_version is not None:
                h = neuropixel.trace_header(sr.major_version)
                th = sr.geometry
                for k in h.keys():
                    # the offending key is reported as the failure message
                    self.assertTrue(np.all(th[k] == h[k]), k)

    def testGetSerialNumber(self):
        """The serial numbers read from the sorted fixture meta files match the expected ones."""
        self.meta_files.sort()
        expected = [641251510, 641251510, 641251510, 17216703352, 18005116811, 18005116811, None,
                    19011116954, 19011110513]
        for meta_data_file, res in zip(self.meta_files, expected):
            md = spikeglx.read_meta_data(meta_data_file)
            self.assertEqual(md.serial, res)

    def testGetRevisionAndType(self):
        """Probe revision, major version and acquisition type are read from the meta data."""
        for meta_data_file in self.meta_files:
            md = spikeglx.read_meta_data(meta_data_file)
            self.assertTrue(len(md.keys()) >= 37)
            if meta_data_file.name.split('.')[-2] in ['lf', 'ap']:
                # for ap and lf look for version number
                # test getting revision: characters 6 and 7 of the fixture file name
                revision = meta_data_file.name[6:8]
                minor = spikeglx._get_neuropixel_version_from_meta(md)[0:2]
                major = spikeglx._get_neuropixel_major_version_from_meta(md)
                self.assertEqual(minor, revision)
                # test the major version
                if revision.startswith('3'):
                    self.assertEqual(major, 1)
                else:
                    self.assertEqual(np.floor(major), 2)
            # test getting acquisition type for all ap, lf and nidq
            acq_type = meta_data_file.name.split('.')[-2]
            self.assertEqual(spikeglx._get_type_from_meta(md), acq_type)

    def testReadChannelGainAPLF(self):
        """The ap / lf conversion factors match the values computed from the meta data."""
        for meta_data_file in self.meta_files:
            if meta_data_file.name.split('.')[-2] not in ['lf', 'ap']:
                continue
            md = spikeglx.read_meta_data(meta_data_file)
            cg = spikeglx._conversion_sample2v_from_meta(md)
            if 'NP2' in spikeglx._get_neuropixel_version_from_meta(md):
                i2v = md.get('imAiRangeMax') / int(md.get('imMaxInt'))
                self.assertTrue(np.all(cg['lf'][0:-1] == i2v / 80))
                self.assertTrue(np.all(cg['ap'][0:-1] == i2v / 80))
            else:
                i2v = md.get('imAiRangeMax') / 512
                self.assertTrue(np.all(cg['lf'][0:-1] == i2v / 250))
                self.assertTrue(np.all(cg['ap'][0:-1] == i2v / 500))
            # also test consistent dimension with nchannels
            nc = spikeglx._get_nchannels_from_meta(md)
            self.assertTrue(len(cg['ap']) == len(cg['lf']) == nc)

    def testGetAnalogSyncIndex(self):
        """ap / lf meta data give no analog sync trace index, the others give [0]."""
        for meta_data_file in self.meta_files:
            md = spikeglx.read_meta_data(meta_data_file)
            if spikeglx._get_type_from_meta(md) in ['ap', 'lf']:
                self.assertTrue(spikeglx._get_analog_sync_trace_indices_from_meta(md) == [])
            else:
                self.assertEqual(spikeglx._get_analog_sync_trace_indices_from_meta(md), [0])

    def testReadChannelGainNIDQ(self):
        """The nidq conversion factors match the values computed from the meta data."""
        for meta_data_file in self.meta_files:
            if meta_data_file.name.split('.')[-2] not in ['nidq']:
                continue
            md = spikeglx.read_meta_data(meta_data_file)
            nc = spikeglx._get_nchannels_from_meta(md)
            cg = spikeglx._conversion_sample2v_from_meta(md)
            i2v = md.get('niAiRangeMax') / 32768
            self.assertTrue(np.all(cg['nidq'][slice(0, int(np.sum(md.acqMnMaXaDw[:3])))] == i2v))
            self.assertTrue(np.all(cg['nidq'][slice(int(np.sum(md.acqMnMaXaDw[-1])), None)] == 1.))
            self.assertTrue(len(cg['nidq']) == nc)

    def testReadChannelMap(self):
        """When a shank map is present, the channel map has the four expected keys."""
        for meta_data_file in self.meta_files:
            md = spikeglx.read_meta_data(meta_data_file)
            cm = spikeglx._map_channels_from_meta(md)
            if 'snsShankMap' in md.keys():
                self.assertEqual(set(cm.keys()), {'shank', 'col', 'row', 'flag'})

    def testSplitSyncTrace(self):
        """Rows 1 to 15 of the split sync output each hold a single 1, in column m - 1."""
        sc = np.uint16(2 ** np.linspace(-1, 15, 17))
        out = spikeglx.split_sync(sc)
        for m in range(1, 16):
            self.assertEqual(np.sum(out[m]), 1)
            self.assertEqual(out[m, m - 1], 1)

    def testWriteMetaData(self):
        """Meta data written to disk and read back equals the original."""
        for meta_data_file in self.meta_files:
            md = spikeglx.read_meta_data(meta_data_file)
            tmp_meta = self.tmpdir.joinpath(meta_data_file.name)
            spikeglx.write_meta_data(md, tmp_meta)
            md_new = spikeglx.read_meta_data(tmp_meta)
            self.assertEqual(md, md_new)

    def testSaveSubset(self):
        """Channel index arrays are converted to the expected range strings."""
        chns = np.r_[np.arange(0, 11), np.arange(50, 91), 384]
        subset = spikeglx._get_savedChans_subset(chns)
        self.assertEqual(subset, '0:10,50:90,384')
        chns = np.r_[np.arange(30, 101), np.arange(250, 301), 384]
        subset = spikeglx._get_savedChans_subset(chns)
        self.assertEqual(subset, '30:100,250:300,384')
class TestsBasicReader(unittest.TestCase):
    """
    Tests the basic usage where there is a flat binary and no metadata associated
    """

    def test_read_flat_binary_float32(self):
        # here we expect no scaling to V applied and no sync trace as the format is float32
        kwargs = dict(ns=60000, nc=384, fs=30000, dtype=np.float32)
        data = np.random.randn(kwargs['ns'], kwargs['nc']).astype(np.float32)
        # a temporary directory is used rather than re-opening a NamedTemporaryFile by name,
        # which is not possible on Windows while the file is still open
        with tempfile.TemporaryDirectory() as tdir:
            file_bin = Path(tdir) / 'flat_binary'
            # raw samples are bytes: the file has to be opened in binary mode
            with open(file_bin, mode='wb') as fp:
                data.tofile(fp)
            # the reader is closed before the directory is removed
            with spikeglx.Reader(file_bin, **kwargs) as sr:
                self.assertTrue(np.all(sr[:, :] == data))
                self.assertEqual(sr.nsync, 0)
                self.assertTrue(np.all(sr.sample2volts == 1))

    def test_read_flat_binary_int16(self):
        # here we expect scaling on all channels but the sync channel
        np.random.seed(42)
        kwargs = dict(ns=60000, nc=385, fs=30000, dtype=np.int16)
        s2v = np.ones(385) * spikeglx.S2V_AP
        s2v[-1] = 1
        data = np.random.randn(kwargs['ns'], kwargs['nc']) / s2v
        data[:, -1] = 1
        data = data.astype(np.int16)
        # a temporary directory is used rather than re-opening a NamedTemporaryFile by name,
        # which is not possible on Windows while the file is still open
        with tempfile.TemporaryDirectory() as tdir:
            file_bin = Path(tdir) / 'flat_binary'
            # raw samples are bytes: the file has to be opened in binary mode
            with open(file_bin, mode='wb') as fp:
                data.tofile(fp)
            # the reader is closed before the directory is removed
            with spikeglx.Reader(file_bin, **kwargs) as sr:
                expected = data[:, :-1].astype(np.float32) * spikeglx.S2V_AP
                self.assertTrue(np.all(np.isclose(sr[:, :-1], expected)))
                self.assertEqual(sr.nsync, 1)
                self.assertTrue(np.all(sr.sample2volts == s2v))