Source code for one.tests.alf.test_alf_io

"""Unit tests for the one.alf.io module."""
import logging
import unittest
import unittest.mock
import tempfile
from pathlib import Path
import shutil
import json
import uuid
import yaml

import numpy as np
import numpy.testing
import pandas as pd

from iblutil.io import jsonable

import one.alf.io as alfio
from one.alf.exceptions import ALFObjectNotFound
from one.alf.spec import FILE_SPEC, regex
from one.alf.path import ALFPath

try:
    import sparse
    SKIP_SPARSE = False
except ModuleNotFoundError:
    SKIP_SPARSE = True


class TestAlfBunch(unittest.TestCase):
    """Tests for the one.alf.io.AlfBunch container."""

    def test_to_dataframe_scalars(self):
        """Test AlfBunch.to_df with 1D arrays, unequal lengths and a 3D array."""
        bunch = alfio.AlfBunch({'titi': np.random.rand(500), 'toto': np.random.rand(500)})
        frame = bunch.to_df()
        self.assertTrue(np.all(frame['titi'].values == bunch.titi))
        self.assertTrue(np.all(frame['toto'].values == bunch.toto))
        self.assertEqual(2, len(frame.columns))

        # Arrays of unequal length are expected to raise
        bunch['titi'] = np.random.rand(50)
        with self.assertRaises(ValueError):
            bunch.to_df()

        # A 3D array is expected to be left out of the frame, with a warning logged
        bunch['toto'] = np.random.rand(50, 10, 5)
        with self.assertLogs(logging.getLogger('one.alf.io'), logging.WARNING):
            self.assertNotIn('toto', bunch.to_df().columns)

    def test_to_dataframe_vectors(self):
        """Test AlfBunch.to_df with 2D arrays, which are split into numbered columns."""
        bunch = alfio.AlfBunch({
            'titi': np.random.rand(500, 1),
            'toto': np.random.rand(500),
            'tata': np.random.rand(500, 12),
        })
        frame = bunch.to_df()
        self.assertTrue(np.all(frame['titi'].values == bunch.titi[:, 0]))
        self.assertTrue(np.all(frame['toto'].values == bunch.toto))
        self.assertTrue(np.all(frame['tata_0'].values == bunch.tata[:, 0]))
        self.assertTrue(np.all(frame['tata_1'].values == bunch.tata[:, 1]))
        self.assertEqual(12, len(frame.columns))
        n_tata = len(frame.filter(regex=r'tata_\d+', axis=1).columns)
        self.assertEqual(10, n_tata, 'failed to truncate columns')

    def test_from_dataframe(self):
        """Test for the AlfBunch.from_df method."""
        columns = ['foo_0', 'foo_1', 'bar_0', 'bar_1', 'baz']
        frame = pd.DataFrame(np.random.rand(10, 5), columns=columns)
        bunch = alfio.AlfBunch.from_df(frame)
        self.assertIsInstance(bunch, alfio.AlfBunch)
        self.assertCountEqual(['foo', 'bar', 'baz'], bunch.keys())
        numpy.testing.assert_array_equal(frame['foo_0'], bunch['foo'][:, 0])

    def test_append_numpy(self):
        """Test AlfBunch.append with empty bunches, numpy arrays and uneven lengths."""
        first = alfio.AlfBunch({'titi': np.random.rand(500), 'toto': np.random.rand(500)})
        empty = alfio.AlfBunch({})
        # test with empty elements
        self.assertTrue(np.all(np.equal(first.append({})['titi'], first['titi'])))
        self.assertTrue(np.all(np.equal(empty.append(first)['titi'], first['titi'])))
        self.assertEqual(empty.append({}), {})

        # test with numpy arrays
        second = alfio.AlfBunch({'titi': np.random.rand(250), 'toto': np.random.rand(250)})
        joined = first.append(second)
        checks = [
            np.all(np.equal(joined['titi'][0:500], first['titi'])),
            np.all(np.equal(joined['toto'][0:500], first['toto'])),
            np.all(np.equal(joined['titi'][500:], second['titi'])),
            np.all(np.equal(joined['toto'][500:], second['toto'])),
        ]
        self.assertTrue(all(checks))

        # the in-place variant should give the same result as the copy
        first.append(second, inplace=True)
        self.assertTrue(np.all(np.equal(joined['toto'], first['toto'])))
        self.assertTrue(np.all(np.equal(joined['titi'], first['titi'])))

        # test warning thrown when uneven append occurs
        with self.assertLogs('one.alf.io', logging.WARNING):
            first.append({
                'titi': np.random.rand(10),
                'toto': np.random.rand(4)
            })

    def test_append_list(self):
        """Test AlfBunch.append with list values, mismatched keys and unsupported types."""
        # test with lists
        first = alfio.AlfBunch({'titi': [0, 1, 3], 'toto': ['a', 'b', 'c']})
        second = alfio.AlfBunch({'titi': [1, 2, 4], 'toto': ['d', 'e', 'f']})

        joined = first.append(second)
        self.assertEqual(6, len(joined['toto']))
        self.assertEqual(3, len(first['toto']))

        joined = joined.append(second)
        self.assertEqual(9, len(joined['toto']))
        self.assertEqual(3, len(first['toto']))

        joined.append(second, inplace=True)
        self.assertEqual(12, len(joined['toto']))
        self.assertEqual(3, len(first['toto']))

        # Appending a bunch with different keys is expected to raise
        with self.assertRaises(NotImplementedError):
            first.append(alfio.AlfBunch({'foobar': [8, 9, 10]}))

        # A str value should trigger a warning that mentions the type
        first['foobar'] = '123'
        with self.assertLogs(logging.getLogger('one.alf.io'), logging.WARNING) as log:
            first.append({'titi': [5], 'toto': [8], 'foobar': 'd'})
        self.assertIn('str', log.output[0])

    def test_check_dimensions(self):
        """Test the AlfBunch.check_dimensions attribute before and after a length mismatch."""
        bunch = alfio.AlfBunch({
            'titi': np.array([0, 1, 3]),
            'toto': np.array(['a', 'b', 'c']),
        })
        self.assertFalse(bunch.check_dimensions)
        bunch['titi'] = np.append(bunch['titi'], 4)
        self.assertTrue(bunch.check_dimensions)
class TestsAlfPartsFilters(unittest.TestCase):
    """Tests for saving, loading and filtering ALF files by their file name parts."""

    def setUp(self) -> None:
        """Create a private temporary directory that is removed after each test.

        A unique directory (rather than a fixed, shared one) ensures that stale files from
        an aborted run or from another test class cannot leak into these tests.  The
        clean-up is registered with addCleanup so it runs even if the test errors.
        """
        tmp = tempfile.TemporaryDirectory()
        self.addCleanup(tmp.cleanup)
        self.tmpdir = Path(tmp.name)

    def test_npy_parts_and_file_filters(self):
        """Test save_object_npy / load_object with extra parts, timescales and wildcards."""
        a = {'riri': np.random.rand(100), 'fifi': np.random.rand(100)}
        alfio.save_object_npy(self.tmpdir, a, 'neuveux', parts='tutu')
        alfio.save_object_npy(self.tmpdir, a, 'neuveux', parts='tutu', timescale='toto')
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveux'))

        b = alfio.load_object(self.tmpdir, 'neuveux', short_keys=True)
        # Should include timescale in keys
        self.assertCountEqual(list(b.keys()), ['fifi', 'fifi_toto', 'riri', 'riri_toto'])
        for k in a:
            self.assertTrue(np.all(a[k] == b[k]))

        # Test load with extra filter
        b = alfio.load_object(self.tmpdir, 'neuveux', timescale='toto', short_keys=True)
        self.assertCountEqual(list(b.keys()), ['fifi_toto', 'riri_toto'])
        with self.assertRaises(ALFObjectNotFound):
            alfio.load_object(self.tmpdir, 'neuveux', timescale='toto', namespace='baz')

        # also test file filters through wildcard
        self.assertTrue(alfio.exists(self.tmpdir, 'neu*'))
        c = alfio.load_object(self.tmpdir, 'neuveux', timescale='to*', short_keys=True)
        self.assertEqual(set(c.keys()), {k for k in c.keys() if k.endswith('toto')})

        # test with the long keys
        b = alfio.load_object(self.tmpdir, 'neuveux', short_keys=False)
        expected = ['fifi.tutu', 'fifi_toto.tutu', 'riri.tutu', 'riri_toto.tutu']
        self.assertCountEqual(list(b.keys()), expected)

        # Test duplicate attributes
        alfio.save_object_npy(self.tmpdir, a, 'neuveux', parts=['tutu', 'titi'])
        with self.assertRaises(AssertionError):
            alfio.load_object(self.tmpdir, 'neuveux', short_keys=True)
        # Restricting by extra parts and using long keys should succeed
        alfio.load_object(self.tmpdir, 'neuveux', extra=['tutu', 'titi'])
        alfio.load_object(self.tmpdir, 'neuveux', short_keys=False)

    def test_filter_by(self):
        """Test for one.alf.io.filter_by."""
        spec_idx_map = regex(FILE_SPEC).groupindex
        file_names = [
            'noalf.file',
            '_ibl_trials.intervals.npy',
            '_ibl_trials.intervals_bpod.csv',
            'wheel.position.npy',
            'wheel.timestamps.npy',
            'wheelMoves.intervals.npy',
            '_namespace_obj.attr_timescale.raw.v12.ext']

        for f in file_names:
            (self.tmpdir / f).touch()

        # Test filter with None; should return files with no non-standard timescale
        alf_files, _ = alfio.filter_by(self.tmpdir, timescale=None)
        expected = [
            'wheel.position.npy',
            'wheel.timestamps.npy',
            'wheelMoves.intervals.npy',
            '_ibl_trials.intervals.npy']
        self.assertTrue(all(isinstance(x, ALFPath) for x in alf_files))
        self.assertCountEqual(
            alf_files, map(ALFPath, expected), 'failed to filter with None attribute')

        # Test filtering by object; should return only 'wheel' ALF objects
        alf_files, parts = alfio.filter_by(self.tmpdir, object='wheel')
        expected = ['wheel.position.npy', 'wheel.timestamps.npy']
        self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter by object')
        self.assertEqual(len(alf_files), len(parts))

        # Test wildcards; should return 'wheel' and 'wheelMoves' ALF objects
        alf_files, _ = alfio.filter_by(self.tmpdir, object='wh*')
        expected = ['wheel.position.npy', 'wheel.timestamps.npy', 'wheelMoves.intervals.npy']
        self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter with wildcard')

        # Test wildcard arrays
        alf_files, _ = alfio.filter_by(self.tmpdir, object='wh*', attribute=['time*', 'pos*'])
        expected = ['wheel.position.npy', 'wheel.timestamps.npy']
        self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter with wildcard')

        # Test filtering by specific timescale; test parts returned
        alf_files, parts = alfio.filter_by(self.tmpdir, timescale='bpod')
        expected = [ALFPath('_ibl_trials.intervals_bpod.csv')]
        self.assertEqual(alf_files, expected, 'failed to filter by timescale')
        expected = ('ibl', 'trials', 'intervals', 'bpod', None, 'csv')
        self.assertTupleEqual(parts[0], expected)
        self.assertEqual(len(parts[0]), len(spec_idx_map))
        # Regex group indices are 1-based, hence the offset into the parts tuple
        self.assertEqual(parts[0][spec_idx_map['timescale'] - 1], 'bpod')

        # Test filtering multiple attributes; should return only trials intervals
        alf_files, _ = alfio.filter_by(self.tmpdir, attribute='intervals', object='trials')
        expected = ['_ibl_trials.intervals.npy', '_ibl_trials.intervals_bpod.csv']
        self.assertCountEqual(
            alf_files, map(ALFPath, expected), 'failed to filter by multiple attribute')

        # Test returning only ALF files
        alf_files, _ = alfio.filter_by(self.tmpdir)
        self.assertCountEqual(
            alf_files, map(ALFPath, file_names[1:]), 'failed to return ALF files')

        # Test return empty
        out = alfio.filter_by(self.tmpdir, object=None)
        self.assertEqual(out, ([], []))

        # Test extras
        alf_files, _ = alfio.filter_by(self.tmpdir, extra='v12')
        expected = [ALFPath('_namespace_obj.attr_timescale.raw.v12.ext')]
        self.assertEqual(alf_files, expected, 'failed to filter extra attributes')

        alf_files, _ = alfio.filter_by(self.tmpdir, extra=['v12', 'raw'])
        expected = [ALFPath('_namespace_obj.attr_timescale.raw.v12.ext')]
        self.assertEqual(alf_files, expected, 'failed to filter extra attributes as list')

        alf_files, _ = alfio.filter_by(self.tmpdir, extra=['foo', 'v12'])
        self.assertEqual(alf_files, [], 'failed to filter extra attributes')

        # Assert kwarg validation; should raise TypeError
        with self.assertRaises(TypeError):
            alfio.filter_by(self.tmpdir, unknown=None)

        # Check regular expression search
        alf_files, _ = alfio.filter_by(self.tmpdir, object='^wheel.*', wildcards=False)
        expected = ['wheel.position.npy', 'wheel.timestamps.npy', 'wheelMoves.intervals.npy']
        self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter by regex')
        # Should work with lists
        alf_files, _ = alfio.filter_by(
            self.tmpdir, object=['^wheel$', '.*Moves'], wildcards=False)
        self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter by regex')
class TestsAlf(unittest.TestCase):
    """Tests for the ALF object functions of one.alf.io (exists, load, save, etc.)."""

    def setUp(self) -> None:
        """Create a private temporary directory populated with ALF fixture files.

        A unique directory (rather than a fixed, shared one) guarantees that the file counts
        asserted in the tests are not affected by leftovers from an aborted run or from
        another test class.  The clean-up is registered with addCleanup so the directory is
        removed even when fixture creation or the test itself fails.
        """
        tmp = tempfile.TemporaryDirectory()
        self.addCleanup(tmp.cleanup)
        self.tmpdir = Path(tmp.name)
        self.vfile = self.tmpdir / 'toto.titi.npy'
        self.tfile = self.tmpdir / 'toto.timestamps.npy'
        # riri, fifi and loulou are the French names of Huey, Dewey and Louie
        # (Donald Duck's nephews)
        self.object_files = [self.tmpdir / 'neuveu.riri.npy',
                             self.tmpdir / 'neuveu.fifi.npy',
                             self.tmpdir / 'neuveu.loulou.npy',
                             self.tmpdir / 'object.attribute.part1.part2.npy',
                             self.tmpdir / 'object.attribute.part1.npy',
                             self.tmpdir / 'neuveu.foobar_matlab.npy']
        for f in self.object_files:
            # The 'matlab' file is saved as a single column 2D array; check the file name
            # only so that the directory path can never influence the shape
            shape = (5, 1) if 'matlab' in f.name else (5,)
            np.save(file=f, arr=np.random.rand(*shape))
        self.object_files.append(self.tmpdir / 'neuveu.timestamps.npy')
        np.save(file=self.object_files[-1], arr=np.ones((2, 2)))
        # Save an obj.table pqt file
        self.object_files.append(self.tmpdir / 'obj.table.pqt')
        cols = ['foo_0', 'foo_1', 'bar_0', 'bar_1', 'baz']
        pd.DataFrame(np.random.rand(10, 5), columns=cols).to_parquet(self.object_files[-1])

    def test_exists(self):
        """Test for one.alf.io.exists."""
        self.assertFalse(alfio.exists(self.tmpdir, 'asodiujfas'))
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveu'))
        # test with attribute string only
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveu', attributes='riri'))
        # test with list of attributes
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveu', attributes=['riri', 'fifi']))
        self.assertFalse(alfio.exists(self.tmpdir, 'neuveu', attributes=['riri', 'fifiasdf']))
        # test with extras
        self.assertTrue(alfio.exists(self.tmpdir, 'object', extra='part2'))
        self.assertTrue(alfio.exists(self.tmpdir, 'object', extra=['part1', 'part2']))
        self.assertTrue(alfio.exists(self.tmpdir, 'neuveu', extra=None))
        # test with wildcards
        self.assertTrue(alfio.exists(self.tmpdir, 'neu*', attributes='riri'))
        # globbing with list: an empty part should return true as well
        self.assertTrue(alfio.exists(self.tmpdir, 'object', extra=['']))

    def test_metadata_columns(self):
        """Test that a metadata file's 'columns' field labels the columns of a loaded array."""
        # simple test with meta data to label columns
        file_alf = self.tmpdir / '_ns_object.attribute.npy'
        data = np.random.rand(500, 4)
        cols = ['titi', 'tutu', 'toto', 'tata']
        np.save(file_alf, data)
        np.save(self.tmpdir / '_ns_object.gnagna.npy', data[:, -1])
        path = alfio.save_metadata(file_alf, {'columns': cols})
        self.assertIsInstance(path, ALFPath)
        self.assertEqual('_ns_object.attribute.metadata.json', path.name)
        dread = alfio.load_object(self.tmpdir, 'object', namespace='ns', short_keys=False)
        self.assertTrue(np.all(dread['titi'] == data[:, 0]))
        self.assertTrue(np.all(dread['gnagna'] == data[:, -1]))
        # add another field to the metadata
        alfio.save_metadata(file_alf, {'columns': cols, 'unit': 'potato'})
        dread = alfio.load_object(self.tmpdir, 'object', namespace='ns', short_keys=False)
        self.assertTrue(np.all(dread['titi'] == data[:, 0]))
        self.assertEqual('potato', dread['attributemetadata']['unit'])
        self.assertTrue(np.all(dread['gnagna'] == data[:, -1]))

    def test_metadata_columns_UUID(self):
        """Test that column metadata is applied when the file name has a UUID extra part."""
        data = np.random.rand(500, 4)
        # test with UUID extra field
        file_alf = self.tmpdir / '_ns_obj.attr1.2622b17c-9408-4910-99cb-abf16d9225b9.npy'
        np.save(file_alf, data)
        cols = ['titi', 'tutu', 'toto', 'tata']
        file_meta = file_alf.parent / (file_alf.stem + '.metadata.json')
        with open(file_meta, 'w+') as fid:
            fid.write(json.dumps({'columns': cols}, indent=1))
        dread = alfio.load_object(self.tmpdir, 'obj', namespace='ns', short_keys=False)
        self.assertTrue(np.all(dread['titi'] == data[:, 0]))

    def test_read_ts(self):
        """Test for one.alf.io.read_ts."""
        # simplest test possible with one column in each file
        t = np.arange(0, 10)
        d = np.random.rand(10)
        np.save(self.vfile, d)
        np.save(self.tfile, t)
        t_, d_ = alfio.read_ts(self.vfile)
        self.assertTrue(np.all(t_ == t))
        self.assertTrue(np.all(d_ == d))

        # Test expands timeseries and deals with single column 2D vectors
        t = np.array([[0, 10], [0.3, 0.4]]).T
        d = np.random.rand(10, 1)
        np.save(self.vfile, d)
        np.save(self.tfile, t)
        t_, d_ = alfio.read_ts(str(self.vfile))
        self.assertEqual(d_.ndim, 1)
        expected = np.around(np.arange(t[0, 1], t[1, 1], .01)[:-1], 2)
        np.testing.assert_array_equal(t_, expected)

        # Without the timestamps file the read should fail
        self.tfile.unlink()
        with self.assertRaises(FileNotFoundError):
            alfio.read_ts(self.vfile)

    def test_load_object(self):
        """Test for one.alf.io.load_object."""
        # first usage of load object is to provide one of the files belonging to the object
        expected_keys = {'riri', 'fifi', 'loulou', 'foobar_matlab', 'timestamps'}
        obj = alfio.load_object(self.object_files[0])
        self.assertTrue(obj.keys() == expected_keys)
        # Check flattens single column 2D vectors
        self.assertTrue(all([obj[o].shape == (5,) for o in obj]))
        # the second usage is to provide a directory and the object name
        obj = alfio.load_object(self.tmpdir, 'neuveu')
        self.assertTrue(obj.keys() == expected_keys)
        self.assertTrue(all([obj[o].shape == (5,) for o in obj]))
        # providing a directory without an object name should raise
        with self.assertRaises(ValueError) as context:
            alfio.load_object(self.tmpdir)
        self.assertIn('object name should be provided too', str(context.exception))

        # Check key conflicts
        np.save(file=str(self.tmpdir / 'neuveu.loulou.extra.npy'), arr=np.random.rand(5,))
        obj = alfio.load_object(self.tmpdir, 'neuveu', short_keys=False)
        self.assertTrue('loulou.extra' in obj)
        with self.assertRaises(AssertionError):
            alfio.load_object(self.tmpdir, 'neuveu', short_keys=True)
        # the third usage is to provide file list
        obj = alfio.load_object(self.object_files[:3], short_keys=False)
        self.assertEqual(3, len(obj))

        # Check dimension mismatch
        data = np.random.rand(list(obj.values())[0].size + 1)
        np.save(file=str(self.object_files[0]), arr=data)  # Save a new size
        with self.assertLogs(logging.getLogger('one.alf.io'), logging.WARNING) as log:
            alfio.load_object(self.tmpdir, 'neuveu', short_keys=False)
        self.assertIn(str(data.shape), log.output[0])

        # Check loading of 'table' attribute
        obj = alfio.load_object(self.tmpdir, 'obj')
        self.assertIsInstance(obj, alfio.AlfBunch)
        self.assertCountEqual(obj.keys(), ['foo', 'bar', 'baz'])
        self.assertEqual(obj['foo'].shape, (10, 2))
        self.assertEqual(obj['bar'].shape, (10, 2))
        self.assertEqual(obj['baz'].shape, (10,))

        # Check behaviour on conflicting keys
        np.save(self.tmpdir.joinpath('obj.baz.npy'), np.arange(len(obj['foo'])))
        new_obj = alfio.load_object(self.tmpdir, 'obj')
        self.assertNotIn('table', new_obj)
        np.testing.assert_array_equal(
            new_obj['baz'], obj['baz'], 'Table attribute should take precedent')

        # Check behaviour loading table with long keys / extra ALF parts
        table_file = next(self.tmpdir.glob('*table*'))
        new_name = table_file.stem + '_clock.extra' + table_file.suffix
        table_file.rename(table_file.parent.joinpath(new_name))
        new_obj = alfio.load_object(self.tmpdir, 'obj')
        expected = ['baz', 'baz_clock.extra', 'bar_clock.extra', 'foo_clock.extra']
        self.assertCountEqual(expected, new_obj.keys())

    def test_ls(self):
        """Test for one.alf.io._ls."""
        # Test listing all ALF files in a directory
        alf_files, _ = alfio._ls(self.tmpdir)
        self.assertIsInstance(alf_files[0], ALFPath)
        self.assertEqual(8, len(alf_files))

        # Test with filepath
        alf_files, parts = alfio._ls(sorted(alf_files)[0])
        self.assertEqual(5, len(alf_files))
        # NB: the generator must be inside all(); passing the generator itself to
        # assertTrue would always pass because a generator object is truthy
        self.assertTrue(all(x[1] == 'neuveu' for x in parts))

        # Test non-existent
        with self.assertRaises(ALFObjectNotFound):
            alfio._ls(self.tmpdir.joinpath('foobar'))

    def test_save_npy(self):
        """Test for one.alf.io.save_object_npy."""
        # test with straight vectors
        a = {'riri': np.random.rand(100),
             'fifi': np.random.rand(100)}
        alfio.save_object_npy(self.tmpdir, a, 'neuveux')
        # read after write
        b = alfio.load_object(self.tmpdir, 'neuveux')
        for k in a:
            self.assertTrue(np.all(a[k] == b[k]))

        # test with more exotic shapes, still valid
        a = {'riri': np.random.rand(100),
             'fifi': np.random.rand(100, 2),
             'loulou': np.random.rand(1, 2)}
        alfio.save_object_npy(self.tmpdir, a, 'neuveux')
        # read after write
        b = alfio.load_object(self.tmpdir, 'neuveux')
        for k in a:
            self.assertTrue(np.all(a[k] == b[k]))

        # test with non allowed shape
        a = {'riri': np.random.rand(100),
             'fifi': np.random.rand(100, 2),
             'loulou': np.random.rand(5, 2)}
        with self.assertRaises(Exception) as context:
            alfio.save_object_npy(self.tmpdir, a, 'neuveux')
        self.assertIn('Dimensions are not consistent', str(context.exception))

    def test_check_dimensions(self):
        """Test for one.alf.io.check_dimensions."""
        a = {'a': np.ones([10, 10]), 'b': np.ones([10, 2]), 'c': np.ones([10])}
        status = alfio.check_dimensions(a)
        self.assertEqual(0, status)

        a = {'a': np.ones([10, 10]), 'b': np.ones([10, 1]), 'c': np.ones([10])}
        status = alfio.check_dimensions(a)
        self.assertEqual(0, status)

        a = {'a': np.ones([10, 15]), 'b': np.ones([1, 15]), 'c': np.ones([10])}
        status = alfio.check_dimensions(a)
        self.assertEqual(0, status)

        a = {'a': np.ones([9, 10]), 'b': np.ones([10, 1]), 'c': np.ones([10])}
        status = alfio.check_dimensions(a)
        self.assertEqual(1, status)

        # test for timestamps which is an exception to the rule
        a = {'a': np.ones([10, 15]), 'b': np.ones([1, 15]), 'c': np.ones([10])}
        a['timestamps'] = np.ones([2, 2])
        a['timestamps_titi'] = np.ones([10, 1])
        status = alfio.check_dimensions(a)
        self.assertEqual(0, status)

        a['timestamps'] = np.ones([2, 4])
        status = alfio.check_dimensions(a)
        self.assertEqual(1, status)

    def test_ts2vec(self):
        """Test for one.alf.io.ts2vec."""
        n = 10
        # Test interpolate
        ts = np.array([[0, 10], [0, 100]]).T
        ts_ = alfio.ts2vec(ts, n)
        np.testing.assert_array_equal(ts_.astype(int), np.arange(0, 100, 10, dtype=int))
        # Test flatten
        ts = np.ones((n, 1))
        ts_ = alfio.ts2vec(ts, n)
        np.testing.assert_array_equal(ts_, np.ones(n))
        # Test identity
        np.testing.assert_array_equal(ts_, alfio.ts2vec(ts_, n))
        # Test ValueError
        with self.assertRaises(ValueError):
            alfio.ts2vec(np.empty((n, 2, 3)), n)
class TestsLoadFile(unittest.TestCase):
    """Tests for one.alf.io.load_file_content function."""

    def setUp(self) -> None:
        """Write one small fixture file per supported (and one unsupported) file type."""
        self.tmpdir = tempfile.TemporaryDirectory()
        root = Path(self.tmpdir.name)
        content = {'a': [1, 2, 3], 'b': [4, 5, 6]}

        # Zero-byte npy file
        self.empty = root / 'foo.bar.npy'
        self.empty.touch()
        self.npy = root / 'foo.baz.npy'
        np.save(file=self.npy, arr=np.random.rand(5))

        # Delimited text files
        self.csv = root / 'foo.baz.csv'
        self.csvuids = root / 'uuids.csv'
        self.csv.write_text('a,b,c\n1,2,3')
        uuid_rows = ['uuids'] + [str(uuid.uuid4()) for _ in range(6)]
        self.csvuids.write_text('\n'.join(uuid_rows))
        self.ssv = root / 'foo.baz.ssv'
        self.ssv.write_text('a b c\n1 2 3')
        self.tsv = root / 'foo.baz.tsv'
        self.tsv.write_text('a\tb\tc\n1\t2\t3')

        # JSON files: one valid, one malformed (missing comma) and one jsonable
        self.json1 = root / 'foo.baz.json'
        with open(self.json1, 'w') as f:
            json.dump(content, f)
        self.json2 = root / '_broken_foo.baz.json'
        self.json2.write_text('{"a": [1, 2, 3],"b": [4, 5 6]}')
        self.json3 = root / 'foo.baz.jsonable'
        jsonable.write(self.json3, {'a': [1, 2, 3], 'b': [4, 5, 6]})

        self.yaml = root / 'foo.baz.yaml'
        with open(self.yaml, 'w') as f:
            yaml.dump({'a': [1, 2, 3], 'b': [4, 5, 6]}, f)

        # File with an unrecognized extension
        self.xyz = root / 'foo.baz.xyz'
        self.xyz.write_bytes(b'\x00\x00')

        # Compressed numpy archives containing one and two arrays, respectively
        self.npz1 = root / 'foo.baz.npz'
        np.savez_compressed(self.npz1, np.random.rand(5))
        self.npz2 = root / 'foo.bar.npz'
        np.savez_compressed(self.npz2, np.random.rand(5), np.random.rand(5))

    def test_load_file_content(self):
        """Test for one.alf.io.load_file_content."""
        self.assertIsNone(alfio.load_file_content(self.empty))
        self.assertIsInstance(alfio.load_file_content(self.npy), np.ndarray)

        # csv / ssv / tsv files
        for text_file in (self.csv, self.ssv, self.tsv):
            with self.subTest('Loading text files', delim=text_file.suffix):
                table = alfio.load_file_content(text_file)
                self.assertEqual(3, table.size)
                self.assertCountEqual(table.columns, ['a', 'b', 'c'])

        # a single column file should be squeezed
        squeezed = alfio.load_file_content(self.csvuids)
        self.assertEqual(squeezed.shape, (6, ))

        # JSON: valid, malformed and jsonable
        parsed = alfio.load_file_content(self.json1)
        self.assertCountEqual(parsed.keys(), ['a', 'b'])
        self.assertIsNone(alfio.load_file_content(self.json2))
        parsed = alfio.load_file_content(self.json3)
        self.assertCountEqual(parsed, ['a', 'b'])

        # Load a parquet file
        fixtures = Path(__file__).parents[1].joinpath('fixtures')
        pqt = next(fixtures.glob('*.pqt'))
        self.assertIsInstance(alfio.load_file_content(pqt), pd.DataFrame)

        # Unknown file should return ALFPath
        unknown = alfio.load_file_content(str(self.xyz))
        self.assertEqual(unknown, self.xyz)
        self.assertIsInstance(unknown, ALFPath)
        self.assertIsNone(alfio.load_file_content(None))

        # Load YAML file
        parsed = alfio.load_file_content(str(self.yaml))
        self.assertCountEqual(parsed.keys(), ['a', 'b'])

        # Load npz file
        archive = alfio.load_file_content(str(self.npz1))
        self.assertIsInstance(archive, np.ndarray, 'failed to unpack')
        self.assertEqual(archive.shape, (5,))
        archive = alfio.load_file_content(str(self.npz2))
        self.assertIsInstance(archive, np.lib.npyio.NpzFile, 'failed to return npz array')
        self.assertEqual(archive['arr_0'].shape, (5,))

    def tearDown(self) -> None:
        """Remove the temporary directory and the fixture files within it."""
        self.tmpdir.cleanup()
@unittest.skipIf(SKIP_SPARSE, 'pydata sparse package not installed')
class TestsLoadFileNonStandard(unittest.TestCase):
    """Tests for one.alf.io.load_file_content function with non-standard libraries."""

    def setUp(self) -> None:
        """Save a random sparse array to a temporary 'sparse_npz' file."""
        self.tmpdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tmpdir.cleanup)
        self.sparse_npz = Path(self.tmpdir.name) / 'foo.baz.sparse_npz'
        array = sparse.random((2, 2, 2))
        with open(self.sparse_npz, 'wb') as fp:
            sparse.save_npz(fp, array)

    def test_load_sparse_npz(self):
        """Test loading a sparse_npz file, with and without a working sparse loader."""
        file_path = str(self.sparse_npz)
        loaded = alfio.load_file_content(file_path)
        self.assertIsInstance(loaded, sparse.COO)

        # When the loader raises ModuleNotFoundError, expect a warning and the path returned
        failing_loader = unittest.mock.patch(
            'sparse.load_npz', side_effect=ModuleNotFoundError)
        with failing_loader, self.assertWarns(UserWarning):
            loaded = alfio.load_file_content(file_path)
            self.assertEqual(loaded, self.sparse_npz)
class TestUUID_Files(unittest.TestCase):
    """Tests for removing UUID parts from ALF file names."""

    def test_remove_uuid_recusive(self):
        """Test for one.alf.io.remove_uuid_recursive.

        Creates files with and without a UUID part, at the top level and within a
        sub-folder, then checks that no file name contains the UUID afterwards.
        """
        # NB: named `uid` / `tmp` so as not to shadow the `uuid` module or the `dir` builtin
        uid = '30c09473-4d3d-4f51-9910-c89a6840096e'
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            files = [
                root.joinpath(f'tutu.part1.part1.{uid}.json'),
                root.joinpath('tata.part1.part1.json'),
                root.joinpath('toto.json'),
                root.joinpath('collection', f'tutu.part1.part1.{uid}.json'),
            ]
            for file in files:
                file.parent.mkdir(exist_ok=True)  # creates the 'collection' sub-folder
                file.touch()
            alfio.remove_uuid_recursive(root)
            self.assertEqual([], list(root.rglob(f'*{uid}*')))
class TestALFFolders(unittest.TestCase):
    """Tests for the one.alf.io functions that operate on session folder trees.

    Each test gets its own temporary session tree.  The tests create folders outside of the
    session folder itself ('bad_session', extra numbered folders, 'glob_dir'), so a fixture
    shared across the class would make the results depend on test execution order.
    """
    tempdir = None  # tempfile.TemporaryDirectory holding the tree for the current test
    session_path = None  # <tempdir>/fakelab/Subjects/fakemouse/1900-01-01/001

    def setUp(self) -> None:
        """Create a fresh temporary directory containing a single empty session folder."""
        self.tempdir = tempfile.TemporaryDirectory()
        # Registered as a clean-up so the tree is removed even when the test errors
        self.addCleanup(self.tempdir.cleanup)
        self.session_path = (Path(self.tempdir.name)
                             .joinpath('fakelab', 'Subjects', 'fakemouse', '1900-01-01', '001'))
        self.session_path.mkdir(parents=True)

    def test_next_num_folder(self):
        """Test for one.alf.io.next_num_folder."""
        date_path = self.session_path.parent
        self.session_path.rmdir()  # Remove '001' folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('001', next_num)

        date_path.rmdir()  # Remove date folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('001', next_num)

        date_path.joinpath(next_num).mkdir(parents=True)  # Add '001' folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('002', next_num)

        date_path.joinpath('053').mkdir()  # Add '053' folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('054', next_num)

        date_path.joinpath('099').mkdir()  # Add '099' folder
        next_num = alfio.next_num_folder(date_path)
        self.assertEqual('100', next_num)

        date_path.joinpath('999').mkdir()  # Add '999' folder
        with self.assertRaises(AssertionError):
            alfio.next_num_folder(date_path)

    def test_remove_empty_folders(self):
        """Test for one.alf.io.remove_empty_folders."""
        root = Path(self.tempdir.name) / 'glob_dir'
        root.mkdir()
        root.joinpath('empty0').mkdir(exist_ok=True)
        root.joinpath('full0').mkdir(exist_ok=True)
        root.joinpath('full0', 'file.txt').touch()
        self.assertEqual(2, len(list(root.glob('*'))))
        alfio.remove_empty_folders(root)
        self.assertEqual(1, len(list(root.glob('*'))))

    def test_iter_sessions(self):
        """Test for one.alf.io.iter_sessions."""
        # Create invalid session folder
        self.session_path.parent.parent.joinpath('bad_session').mkdir()

        valid_sessions = alfio.iter_sessions(self.tempdir.name)
        self.assertEqual(next(valid_sessions), self.session_path)
        self.assertFalse(next(valid_sessions, False))
        # makes sure that the session path returns itself on the iterator
        path = next(alfio.iter_sessions(self.session_path))
        self.assertEqual(self.session_path, path)
        self.assertIsInstance(path, ALFPath)

        # test pattern arg
        valid_sessions = list(alfio.iter_sessions(
            self.tempdir.name, pattern='*/Subjects/*/????-??-??/*'))
        self.assertEqual([ALFPath(self.session_path)], valid_sessions)

        subjects_path = Path(self.tempdir.name, 'fakelab', 'Subjects')
        valid_sessions = alfio.iter_sessions(subjects_path, pattern='*/????-??-??/*')
        self.assertEqual(self.session_path, next(valid_sessions))
        valid_sessions = alfio.iter_sessions(subjects_path, pattern='*/Subjects/*/????-??-??/*')
        self.assertFalse(next(valid_sessions, False))

    def test_iter_datasets(self):
        """Test for one.alf.io.iter_datasets."""
        # Create valid dataset
        dset = self.session_path.joinpath('collection', 'object.attribute.ext')
        dset.parent.mkdir()
        dset.touch()

        # Create invalid dataset
        self.session_path.joinpath('somefile.txt').touch()

        ses_files = list(alfio.iter_datasets(self.session_path))
        # Expect only the valid dataset, as a path relative to the session folder
        self.assertEqual([Path(*dset.parts[-2:])], ses_files)
        self.assertIsInstance(ses_files[0], ALFPath)
class TestFindVariants(unittest.TestCase):
    """Tests for the one.alf.io.find_variants function."""

    def setUp(self):
        """Create a temporary session folder containing datasets and their variants."""
        tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(tempdir.cleanup)
        self.tmp = Path(tempdir.name)

        # Create tree
        self.session_path = self.tmp / 'subject' / '2020-01-01' / '001'
        relative_paths = (
            ('_x_foo.bar.npy',),
            ('#2021-01-01#', 'foo.bar.npy'),
            (f'bar.baz.{uuid.uuid4()}.npy',),
            (f'bar.baz_y.{uuid.uuid4()}.npy',),
            ('#2021-01-01#', f'bar.baz.{uuid.uuid4()}.npy'),
            ('task_00', 'x.y.z'),
            ('x.y.z',),
        )
        self.dsets = [self.session_path.joinpath(*parts) for parts in relative_paths]
        for dataset in self.dsets:
            dataset.parent.mkdir(exist_ok=True, parents=True)
            dataset.touch()

    def test_unique(self):
        """Test for one.alf.io.find_variants function."""
        variants = alfio.find_variants(self.dsets)
        self.assertCountEqual(self.dsets, variants.keys(), 'expected keys to match input files')
        self.assertFalse(any(map(any, variants.values())), 'expected no duplicates')
        paths = filter(None, (*variants.keys(), *variants.values()))
        self.assertTrue(all(isinstance(x, ALFPath) for x in paths))

        # With extra=False should treat files with extra parts as a variant
        variants = alfio.find_variants(self.dsets, extra=False)
        # 'bar.baz.abc.npy' is a variant of '#revision#/bar.baz.def.npy' and vice versa
        plain, revised = self.dsets[2], self.dsets[4]
        self.assertEqual(variants[plain], [revised])
        self.assertEqual(variants[revised], [plain])
        # Expect all other datasets to be considered unique
        others = [v for k, v in variants.items() if k not in (plain, revised)]
        self.assertFalse(any(map(any, others)))

        # Treat other file parts as variants
        files = [self.dsets[0], self.dsets[2], self.dsets[-1]]
        variants = alfio.find_variants(files, namespace=False, timescale=False, extra=False)
        expected_files = (self.dsets[1:2], self.dsets[3:5], [])  # expected variants for each file
        for key, expected in zip(files, expected_files):
            with self.subTest(key=key):
                self.assertCountEqual(variants[self.session_path.joinpath(key)], expected)
# Run the tests in this module, with verbose output, when the file is executed as a script
if __name__ == '__main__': unittest.main(exit=False, verbosity=2)