import logging
import unittest
import tempfile
from pathlib import Path
import shutil
import json
import numpy as np
import one.alf.io as alfio
from one.alf.exceptions import ALFObjectNotFound
from one.alf.spec import FILE_SPEC, regex
class TestAlfBunch(unittest.TestCase):
    """Tests for the ``alfio.AlfBunch`` container: ``to_df``, ``append`` and ``check_dimensions``."""

    def test_to_dataframe_scalars(self):
        """Equal-length 1D arrays are each expected to map onto one DataFrame column."""
        simple = alfio.AlfBunch({'titi': np.random.rand(500), 'toto': np.random.rand(500)})
        df = simple.to_df()
        np.testing.assert_array_equal(df['titi'].values, simple.titi)
        np.testing.assert_array_equal(df['toto'].values, simple.toto)
        self.assertEqual(2, len(df.columns))
        # Arrays of different lengths are expected to be rejected
        simple['titi'] = np.random.rand(50)
        with self.assertRaises(ValueError):
            simple.to_df()
        # A 3D array is expected to be left out of the frame, with a warning logged
        simple['toto'] = np.random.rand(50, 10, 5)
        with self.assertLogs(logging.getLogger('one.alf.io'), logging.WARNING):
            self.assertNotIn('toto', simple.to_df().columns)

    def test_to_dataframe_vectors(self):
        """2D arrays are expected to be split column-wise into ``<key>_<n>`` columns."""
        vectors = alfio.AlfBunch({'titi': np.random.rand(500, 1),
                                  'toto': np.random.rand(500),
                                  'tata': np.random.rand(500, 2)})
        df = vectors.to_df()
        # A single-column 2D array keeps its key as the column name
        np.testing.assert_array_equal(df['titi'].values, vectors.titi[:, 0])
        np.testing.assert_array_equal(df['toto'].values, vectors.toto)
        np.testing.assert_array_equal(df['tata_0'].values, vectors.tata[:, 0])
        np.testing.assert_array_equal(df['tata_1'].values, vectors.tata[:, 1])
        self.assertEqual(4, len(df.columns))

    def test_append_numpy(self):
        """Appending bunches of numpy arrays concatenates the arrays key by key."""
        a = alfio.AlfBunch({'titi': np.random.rand(500), 'toto': np.random.rand(500)})
        b = alfio.AlfBunch({})
        # test with empty elements
        np.testing.assert_array_equal(a.append({})['titi'], a['titi'])
        np.testing.assert_array_equal(b.append(a)['titi'], a['titi'])
        self.assertEqual(b.append({}), {})
        # test with numpy arrays
        b = alfio.AlfBunch({'titi': np.random.rand(250),
                            'toto': np.random.rand(250)})
        c = a.append(b)
        # The first 500 values come from `a`, the remainder from `b`
        np.testing.assert_array_equal(c['titi'][0:500], a['titi'])
        np.testing.assert_array_equal(c['toto'][0:500], a['toto'])
        np.testing.assert_array_equal(c['titi'][500:], b['titi'])
        np.testing.assert_array_equal(c['toto'][500:], b['toto'])
        # An in-place append must give the same result as the returned copy
        a.append(b, inplace=True)
        np.testing.assert_array_equal(c['toto'], a['toto'])
        np.testing.assert_array_equal(c['titi'], a['titi'])

    def test_append_list(self):
        """Appending bunches of lists extends the lists; `a` is only changed when inplace."""
        # test with lists
        a = alfio.AlfBunch({'titi': [0, 1, 3], 'toto': ['a', 'b', 'c']})
        b = alfio.AlfBunch({'titi': [1, 2, 4], 'toto': ['d', 'e', 'f']})
        c = a.append(b)
        self.assertEqual(6, len(c['toto']))
        self.assertEqual(3, len(a['toto']))
        c = c.append(b)
        self.assertEqual(9, len(c['toto']))
        self.assertEqual(3, len(a['toto']))
        c.append(b, inplace=True)
        self.assertEqual(12, len(c['toto']))
        self.assertEqual(3, len(a['toto']))
        # Appending a bunch with a different set of keys is expected to be unsupported
        with self.assertRaises(NotImplementedError):
            a.append(alfio.AlfBunch({'foobar': [8, 9, 10]}))
        # A str value is expected to trigger a warning that mentions the offending type
        a['foobar'] = '123'
        with self.assertLogs(logging.getLogger('one.alf.io'), logging.WARNING) as log:
            a.append({'titi': [5], 'toto': [8], 'foobar': 'd'})
        self.assertIn('str', log.output[0])

    def test_check_dimensions(self):
        """``check_dimensions`` is falsy for equal-length arrays and truthy once they differ."""
        a = alfio.AlfBunch({'titi': np.array([0, 1, 3]), 'toto': np.array(['a', 'b', 'c'])})
        self.assertFalse(a.check_dimensions)
        # Make 'titi' one element longer than 'toto'
        a['titi'] = np.append(a['titi'], 4)
        self.assertTrue(a.check_dimensions)
class TestsAlfPartsFilters(unittest.TestCase):
    """Tests for saving/loading ALF objects with extra parts and for ``alfio.filter_by``."""

    def setUp(self) -> None:
        """Create an empty directory that is unique to this test."""
        # A fixed, shared path (e.g. <tmp>/iotest) may still hold files from an interrupted
        # run or from another test class, which would break the exact file listings asserted
        # below; mkdtemp guarantees a fresh directory.
        self.tmpdir = Path(tempfile.mkdtemp(prefix='iotest'))

    def test_npy_parts_and_file_filters(self):
        """Round-trip an object saved with extra parts and a timescale, then filter on load."""
        a = {'riri': np.random.rand(100), 'fifi': np.random.rand(100)}
        alfio.save_object_npy(self.tmpdir, a, 'neuveux', parts='tutu')
        alfio.save_object_npy(self.tmpdir, a, 'neuveux', parts='tutu', timescale='toto')
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveux'))

        b = alfio.load_object(self.tmpdir, 'neuveux', short_keys=True)
        # Should include timescale in keys
        self.assertCountEqual(list(b.keys()), ['fifi', 'fifi_toto', 'riri', 'riri_toto'])
        for k in a:
            np.testing.assert_array_equal(a[k], b[k])

        # Test load with extra filter
        b = alfio.load_object(self.tmpdir, 'neuveux', timescale='toto', short_keys=True)
        self.assertCountEqual(list(b.keys()), ['fifi_toto', 'riri_toto'])
        with self.assertRaises(ALFObjectNotFound):
            alfio.load_object(self.tmpdir, 'neuveux', timescale='toto', namespace='baz')

        # also test file filters through wildcard
        self.assertTrue(alfio.exists(self.tmpdir, 'neu*'))
        c = alfio.load_object(self.tmpdir, 'neuveux', timescale='to*', short_keys=True)
        # Every key returned must belong to the 'toto' timescale
        self.assertTrue(all(k.endswith('toto') for k in c.keys()))

        # test with the long keys
        b = alfio.load_object(self.tmpdir, 'neuveux', short_keys=False)
        expected = ['fifi.tutu', 'fifi_toto.tutu', 'riri.tutu', 'riri_toto.tutu']
        self.assertCountEqual(list(b.keys()), expected)

        # Test duplicate attributes: short keys now clash between the two sets of files
        alfio.save_object_npy(self.tmpdir, a, 'neuveux', parts=['tutu', 'titi'])
        with self.assertRaises(AssertionError):
            alfio.load_object(self.tmpdir, 'neuveux', short_keys=True)
        # Restricting by extra parts and using long keys should succeed
        alfio.load_object(self.tmpdir, 'neuveux', extra=['tutu', 'titi'])
        alfio.load_object(self.tmpdir, 'neuveux', short_keys=False)

    def test_filter_by(self):
        """Exercise ``filter_by`` with None, wildcards, lists, extras and regular expressions."""
        spec_idx_map = regex(FILE_SPEC).groupindex
        file_names = [
            'noalf.file',
            '_ibl_trials.intervals.npy',
            '_ibl_trials.intervals_bpod.csv',
            'wheel.position.npy',
            'wheel.timestamps.npy',
            'wheelMoves.intervals.npy',
            '_namespace_obj.attr_timescale.raw.v12.ext']
        for name in file_names:
            (self.tmpdir / name).touch()

        # Test filter with None; should return files with no non-standard timescale
        alf_files, _ = alfio.filter_by(self.tmpdir, timescale=None)
        expected = [
            'wheel.position.npy',
            'wheel.timestamps.npy',
            'wheelMoves.intervals.npy',
            '_ibl_trials.intervals.npy']
        self.assertCountEqual(alf_files, expected, 'failed to filter with None attribute')

        # Test filtering by object; should return only 'wheel' ALF objects
        alf_files, parts = alfio.filter_by(self.tmpdir, object='wheel')
        expected = ['wheel.position.npy', 'wheel.timestamps.npy']
        self.assertCountEqual(alf_files, expected, 'failed to filter by object')
        self.assertEqual(len(alf_files), len(parts))

        # Test wildcards; should return 'wheel' and 'wheelMoves' ALF objects
        alf_files, _ = alfio.filter_by(self.tmpdir, object='wh*')
        expected = ['wheel.position.npy', 'wheel.timestamps.npy', 'wheelMoves.intervals.npy']
        self.assertCountEqual(alf_files, expected, 'failed to filter with wildcard')

        # Test filtering by specific timescale; test parts returned
        alf_files, parts = alfio.filter_by(self.tmpdir, timescale='bpod')
        expected = ['_ibl_trials.intervals_bpod.csv']
        self.assertEqual(alf_files, expected, 'failed to filter by timescale')
        expected = ('ibl', 'trials', 'intervals', 'bpod', None, 'csv')
        self.assertTupleEqual(parts[0], expected)
        self.assertEqual(len(parts[0]), len(spec_idx_map))
        # Regex group indices are 1-based whereas the parts tuple is 0-based
        self.assertEqual(parts[0][spec_idx_map['timescale'] - 1], 'bpod')

        # Test filtering multiple attributes; should return only trials intervals
        alf_files, _ = alfio.filter_by(self.tmpdir, attribute='intervals', object='trials')
        expected = ['_ibl_trials.intervals.npy', '_ibl_trials.intervals_bpod.csv']
        self.assertCountEqual(alf_files, expected, 'failed to filter by multiple attribute')

        # Test returning only ALF files
        alf_files, _ = alfio.filter_by(self.tmpdir)
        self.assertCountEqual(alf_files, file_names[1:], 'failed to return ALF files')

        # Test return empty
        out = alfio.filter_by(self.tmpdir, object=None)
        self.assertEqual(out, ([], []))

        # Test extras
        alf_files, _ = alfio.filter_by(self.tmpdir, extra='v12')
        expected = ['_namespace_obj.attr_timescale.raw.v12.ext']
        self.assertEqual(alf_files, expected, 'failed to filter extra attributes')

        alf_files, _ = alfio.filter_by(self.tmpdir, extra=['v12', 'raw'])
        expected = ['_namespace_obj.attr_timescale.raw.v12.ext']
        self.assertEqual(alf_files, expected, 'failed to filter extra attributes as list')

        alf_files, _ = alfio.filter_by(self.tmpdir, extra=['foo', 'v12'])
        self.assertEqual(alf_files, [], 'failed to filter extra attributes')

        # Assert kwarg validation; should raise TypeError
        with self.assertRaises(TypeError):
            alfio.filter_by(self.tmpdir, unknown=None)

        # Check regular expression search
        alf_files, _ = alfio.filter_by(self.tmpdir, object='^wheel.*', wildcards=False)
        expected = ['wheel.position.npy', 'wheel.timestamps.npy', 'wheelMoves.intervals.npy']
        self.assertCountEqual(alf_files, expected, 'failed to filter by regex')
        # Should work with lists
        alf_files, _ = alfio.filter_by(
            self.tmpdir, object=['^wheel$', '.*Moves'], wildcards=False)
        self.assertCountEqual(alf_files, expected, 'failed to filter by regex')

    def tearDown(self) -> None:
        """Delete the per-test directory and everything in it."""
        shutil.rmtree(self.tmpdir)
class TestsAlf(unittest.TestCase):
    """Tests for the core ALF I/O functions: exists, read_ts, load/save object, ts2vec."""

    def setUp(self) -> None:
        """Create a unique directory holding the 'neuveu' and 'object' ALF files."""
        # riri, fifi and loulou are the French names of Huey, Dewey and Louie
        # (Donald Duck's nephews)
        # A unique directory per test avoids picking up stale files from an interrupted run
        # or from another test class that used the same fixed path.
        self.tmpdir = Path(tempfile.mkdtemp(prefix='iotest'))
        self.vfile = self.tmpdir / 'toto.titi.npy'
        self.tfile = self.tmpdir / 'toto.timestamps.npy'
        self.object_files = [self.tmpdir / 'neuveu.riri.npy',
                             self.tmpdir / 'neuveu.fifi.npy',
                             self.tmpdir / 'neuveu.loulou.npy',
                             self.tmpdir / 'object.attribute.part1.part2.npy',
                             self.tmpdir / 'object.attribute.part1.npy',
                             self.tmpdir / 'neuveu.foobar_matlab.npy']
        for f in self.object_files:
            # The 'matlab' file is saved as a single-column 2D array
            shape = (5, 1) if 'matlab' in str(f) else (5,)
            np.save(file=f, arr=np.random.rand(*shape))
        # The timestamps file is added after the loop as it has its own 2x2 content
        self.object_files.append(self.tmpdir / 'neuveu.timestamps.npy')
        np.save(file=self.object_files[-1], arr=np.ones((2, 2)))

    def test_exists(self):
        """``exists`` accepts attribute strings/lists, extra parts and wildcards."""
        self.assertFalse(alfio.exists(self.tmpdir, 'asodiujfas'))
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveu'))
        # test with attribute string only
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveu', attributes='riri'))
        # test with list of attributes
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveu', attributes=['riri', 'fifi']))
        self.assertFalse(alfio.exists(self.tmpdir, 'neuveu', attributes=['riri', 'fifiasdf']))
        # test with extras
        self.assertTrue(alfio.exists(self.tmpdir, 'object', extra='part2'))
        self.assertTrue(alfio.exists(self.tmpdir, 'object', extra=['part1', 'part2']))
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveu', extra=None))
        # test with wildcards
        self.assertTrue(alfio.exists(self.tmpdir, 'neu*', attributes='riri'))
        # globbing with list: an empty part should return true as well
        self.assertTrue(alfio.exists(self.tmpdir, 'object', extra=['']))

    def test_read_ts(self):
        """``read_ts`` returns the timestamps and values belonging to a values file."""
        # simplest test possible with one column in each file
        t = np.arange(0, 10)
        d = np.random.rand(10)
        np.save(self.vfile, d)
        np.save(self.tfile, t)
        t_, d_ = alfio.read_ts(self.vfile)
        np.testing.assert_array_equal(t_, t)
        np.testing.assert_array_equal(d_, d)

        # Test expands timeseries and deals with single column 2D vectors
        t = np.array([[0, 10], [0.3, 0.4]]).T
        d = np.random.rand(10, 1)
        np.save(self.vfile, d)
        np.save(self.tfile, t)
        t_, d_ = alfio.read_ts(str(self.vfile))  # a str path must be accepted as well
        self.assertEqual(d_.ndim, 1)
        expected = np.around(np.arange(t[0, 1], t[1, 1], .01)[:-1], 2)
        np.testing.assert_array_equal(t_, expected)

        # Without the timestamps file the values cannot be read
        self.tfile.unlink()
        with self.assertRaises(FileNotFoundError):
            alfio.read_ts(self.vfile)

    def test_load_object(self):
        """``load_object`` accepts a single file, a directory plus object name, or a file list."""
        # first usage of load object is to provide one of the files belonging to the object
        expected_keys = {'riri', 'fifi', 'loulou', 'foobar_matlab', 'timestamps'}
        obj = alfio.load_object(self.object_files[0])
        self.assertEqual(expected_keys, set(obj.keys()))
        # Check flattens single column 2D vectors
        self.assertTrue(all(obj[o].shape == (5,) for o in obj))
        # the second usage is to provide a directory and the object name
        obj = alfio.load_object(self.tmpdir, 'neuveu')
        self.assertEqual(expected_keys, set(obj.keys()))
        self.assertTrue(all(obj[o].shape == (5,) for o in obj))
        # providing a directory without an object name must raise a ValueError
        with self.assertRaises(ValueError) as context:
            alfio.load_object(self.tmpdir)
        # Checked after the block: statements following the raising call would never run
        self.assertIn('object name should be provided too', str(context.exception))

        # Check key conflicts
        np.save(file=str(self.tmpdir / 'neuveu.loulou.extra.npy'), arr=np.random.rand(5,))
        obj = alfio.load_object(self.tmpdir, 'neuveu', short_keys=False)
        self.assertIn('loulou.extra', obj)
        with self.assertRaises(AssertionError):
            alfio.load_object(self.tmpdir, 'neuveu', short_keys=True)

        # the third usage is to provide file list
        obj = alfio.load_object(self.object_files[:3], short_keys=False)
        self.assertEqual(3, len(obj))

        # Check dimension mismatch
        data = np.random.rand(list(obj.values())[0].size + 1)
        np.save(file=str(self.object_files[0]), arr=data)  # Save a new size
        with self.assertLogs(logging.getLogger('one.alf.io'), logging.WARNING) as log:
            alfio.load_object(self.tmpdir, 'neuveu', short_keys=False)
        self.assertIn(str(data.shape), log.output[0])

    def test_save_npy(self):
        """``save_object_npy`` round-trips valid shapes and rejects inconsistent ones."""
        # test with straight vectors
        a = {'riri': np.random.rand(100),
             'fifi': np.random.rand(100)}
        alfio.save_object_npy(self.tmpdir, a, 'neuveux')
        # read after write
        b = alfio.load_object(self.tmpdir, 'neuveux')
        for k in a:
            np.testing.assert_array_equal(a[k], b[k])

        # test with more exotic shapes, still valid
        a = {'riri': np.random.rand(100),
             'fifi': np.random.rand(100, 2),
             'loulou': np.random.rand(1, 2)}
        alfio.save_object_npy(self.tmpdir, a, 'neuveux')
        # read after write
        b = alfio.load_object(self.tmpdir, 'neuveux')
        for k in a:
            # Plain broadcasting comparison: NOTE(review) it is not visible from here
            # whether the (1, 2) array keeps its exact shape after loading
            self.assertTrue(np.all(a[k] == b[k]))

        # test with non allowed shape
        a = {'riri': np.random.rand(100),
             'fifi': np.random.rand(100, 2),
             'loulou': np.random.rand(5, 2)}
        with self.assertRaises(Exception) as context:
            alfio.save_object_npy(self.tmpdir, a, 'neuveux')
        # Checked after the block: statements following the raising call would never run
        self.assertIn('Dimensions are not consistent', str(context.exception))

    def test_check_dimensions(self):
        """``check_dimensions`` is expected to return 0 for accepted shapes and 1 otherwise."""
        a = {'a': np.ones([10, 10]), 'b': np.ones([10, 2]), 'c': np.ones([10])}
        self.assertEqual(0, alfio.check_dimensions(a))
        a = {'a': np.ones([10, 10]), 'b': np.ones([10, 1]), 'c': np.ones([10])}
        self.assertEqual(0, alfio.check_dimensions(a))
        a = {'a': np.ones([10, 15]), 'b': np.ones([1, 15]), 'c': np.ones([10])}
        self.assertEqual(0, alfio.check_dimensions(a))
        a = {'a': np.ones([9, 10]), 'b': np.ones([10, 1]), 'c': np.ones([10])}
        self.assertEqual(1, alfio.check_dimensions(a))
        # test for timestamps which is an exception to the rule
        a = {'a': np.ones([10, 15]), 'b': np.ones([1, 15]), 'c': np.ones([10])}
        a['timestamps'] = np.ones([2, 2])
        a['timestamps_titi'] = np.ones([10, 1])
        self.assertEqual(0, alfio.check_dimensions(a))
        a['timestamps'] = np.ones([2, 4])
        self.assertEqual(1, alfio.check_dimensions(a))

    def test_ts2vec(self):
        """``ts2vec`` interpolates 2-column timestamps, flattens vectors and rejects 3D input."""
        n = 10
        # Test interpolate
        ts = np.array([[0, 10], [0, 100]]).T
        ts_ = alfio.ts2vec(ts, n)
        np.testing.assert_array_equal(ts_.astype(int), np.arange(0, 100, 10, dtype=int))
        # Test flatten
        ts = np.ones((n, 1))
        ts_ = alfio.ts2vec(ts, n)
        np.testing.assert_array_equal(ts_, np.ones(n))
        # Test identity
        np.testing.assert_array_equal(ts_, alfio.ts2vec(ts_, n))
        # Test ValueError
        with self.assertRaises(ValueError):
            alfio.ts2vec(np.empty((n, 2, 3)), n)

    def tearDown(self) -> None:
        """Delete the per-test directory and everything in it."""
        shutil.rmtree(self.tmpdir)
class TestsLoadFile(unittest.TestCase):
    """Tests for ``alfio.load_file_content`` across the file formats created in ``setUp``."""

    def setUp(self) -> None:
        """Write one small fixture file per format into a temporary directory."""
        self.tmpdir = tempfile.TemporaryDirectory()
        root = Path(self.tmpdir.name)

        # Zero-byte file carrying an .npy extension
        self.empty = root / 'foo.bar.npy'
        self.empty.touch()

        self.npy = root / 'foo.baz.npy'
        np.save(file=self.npy, arr=np.random.rand(5))

        # The same one-row table in comma-, space- and tab-separated form
        self.csv = root / 'foo.baz.csv'
        self.csv.write_text('a,b,c\n1,2,3')
        self.ssv = root / 'foo.baz.ssv'
        self.ssv.write_text('a b c\n1 2 3')
        self.tsv = root / 'foo.baz.tsv'
        self.tsv.write_text('a\tb\tc\n1\t2\t3')

        self.json1 = root / 'foo.baz.json'
        self.json1.write_text(json.dumps({'a': [1, 2, 3], 'b': [4, 5, 6]}))
        # Deliberately malformed JSON: the comma between 5 and 6 is missing
        self.json2 = root / '_broken_foo.baz.json'
        self.json2.write_text('{"a": [1, 2, 3],"b": [4, 5 6]}')

    def test_load_file_content(self):
        """Each fixture loads to the expected type; empty and broken files give None."""
        self.assertIsNone(alfio.load_file_content(self.empty))
        self.assertIsInstance(alfio.load_file_content(self.npy), np.ndarray)

        text_files = (self.csv, self.ssv, self.tsv)
        for file in text_files:
            with self.subTest('Loading text files', delim=file.suffix):
                loaded = alfio.load_file_content(file)
                self.assertEqual(3, loaded.size)
                self.assertCountEqual(loaded.columns, ['a', 'b', 'c'])

        loaded = alfio.load_file_content(self.json1)
        self.assertCountEqual(loaded.keys(), ['a', 'b'])
        self.assertIsNone(alfio.load_file_content(self.json2))

    def tearDown(self) -> None:
        """Remove the temporary directory and its fixture files."""
        self.tmpdir.cleanup()
class TestUUID_Files(unittest.TestCase):
    """Tests for stripping the UUID part from ALF file names."""

    def test_remove_uuid(self):
        """``remove_uuid_file`` returns the path without its UUID part, for Path and str input."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            root = Path(tmp_dir)
            f1 = root.joinpath('tutu.part1.part1.30c09473-4d3d-4f51-9910-c89a6840096e.json')
            f2 = root.joinpath('tata.part1.part1.json')
            f3 = root.joinpath('toto.json')
            # Create all three files (f3 was previously never created because f2 was
            # touched twice)
            f1.touch()
            f2.touch()
            f3.touch()
            self.assertEqual(root.joinpath('tutu.part1.part1.json'),
                             alfio.remove_uuid_file(f1))
            # Names without a UUID are expected to come back unchanged
            self.assertEqual(root.joinpath('tata.part1.part1.json'),
                             alfio.remove_uuid_file(f2))
            self.assertEqual(root.joinpath('toto.json'),
                             alfio.remove_uuid_file(f3))
            self.assertEqual(root.joinpath('toto.json'),
                             alfio.remove_uuid_file(str(f3)))

    def test_remove_uuid_recusive(self):
        """``remove_uuid_recursive`` leaves no file with the UUID in its name, at any depth."""
        uuid = '30c09473-4d3d-4f51-9910-c89a6840096e'
        with tempfile.TemporaryDirectory() as tmp_dir:
            root = Path(tmp_dir)
            f1 = root.joinpath(f'tutu.part1.part1.{uuid}.json')
            f2 = root.joinpath('tata.part1.part1.json')
            f3 = root.joinpath('toto.json')
            # Same UUID-tagged name one level down, to exercise the recursion
            f4 = root.joinpath('collection', f'tutu.part1.part1.{uuid}.json')
            f4.parent.mkdir()
            for f in (f1, f2, f3, f4):
                f.touch()
            alfio.remove_uuid_recursive(root)
            self.assertEqual([], list(root.rglob(f'*{uuid}*')))
class TestALFFolders(unittest.TestCase):
    """Tests for the session folder helpers ``next_num_folder`` and ``remove_empty_folders``."""

    # Temporary directory shared by every test in the class; created in setUpClass
    tempdir = None

    @classmethod
    def setUpClass(cls) -> None:
        """Create <tmp>/fakelab/Subjects/fakemouse/1900-01-01/001 once for the whole class."""
        cls.tempdir = tempfile.TemporaryDirectory()
        cls.session_path = (Path(cls.tempdir.name)
                            .joinpath('fakelab', 'Subjects', 'fakemouse', '1900-01-01', '001'))
        cls.session_path.mkdir(parents=True)

    @classmethod
    def tearDownClass(cls) -> None:
        """Remove the shared temporary directory."""
        cls.tempdir.cleanup()

    def test_next_num_folder(self):
        """``next_num_folder`` returns the next zero-padded number and asserts beyond 999."""
        date_path = self.session_path.parent
        self.session_path.rmdir()  # Remove '001' folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('001', next_num)

        date_path.rmdir()  # Remove date folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('001', next_num)

        date_path.joinpath(next_num).mkdir(parents=True)  # Add '001' folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('002', next_num)

        date_path.joinpath('053').mkdir()  # Add '053' folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('054', next_num)

        date_path.joinpath('099').mkdir()  # Add '099' folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('100', next_num)

        date_path.joinpath('999').mkdir()  # Add '999' folder
        with self.assertRaises(AssertionError):
            alfio.next_num_folder(date_path)

    def test_remove_empty_folders(self):
        """``remove_empty_folders`` deletes the empty sub-folder and keeps the populated one."""
        root = Path(self.tempdir.name) / 'glob_dir'
        root.mkdir()
        root.joinpath('empty0').mkdir(exist_ok=True)
        root.joinpath('full0').mkdir(exist_ok=True)
        root.joinpath('full0', 'file.txt').touch()
        self.assertEqual(2, len(list(root.glob('*'))))
        alfio.remove_empty_folders(root)
        # Check which folder survived, not merely how many entries are left
        self.assertEqual(['full0'], [p.name for p in root.glob('*')])
        self.assertTrue(root.joinpath('full0', 'file.txt').exists())
if __name__ == '__main__':
    # exit=False: report the results without calling sys.exit() afterwards
    unittest.main(verbosity=2, exit=False)