Source code for iblutil.tests.test_io

import unittest
from unittest import mock
import uuid
import tempfile
import os
from pathlib import Path
import json
import asyncio

import numpy as np

from iblutil.io.parquet import uuid2np, np2uuid, np2str, str2np
from iblutil.io import params
import iblutil.io.jsonable as jsonable
from iblutil.numerical import intersect2d, ismember2d, ismember


[docs] class TestParquet(unittest.TestCase):
[docs] def test_uuids_conversions(self): str_uuid = 'a3df91c8-52a6-4afa-957b-3479a7d0897c' one_np_uuid = np.array([-411333541468446813, 8973933150224022421]) two_np_uuid = np.tile(one_np_uuid, [2, 1]) # array gives a list self.assertTrue(all(map(lambda x: x == str_uuid, np2str(two_np_uuid)))) # single uuid gives a string self.assertTrue(np2str(one_np_uuid) == str_uuid) # list uuids with some None entries uuid_list = ['bc74f49f33ec0f7545ebc03f0490bdf6', 'c5779e6d02ae6d1d6772df40a1a94243', None, '643371c81724378d34e04a60ef8769f4'] assert np.all(str2np(uuid_list)[2, :] == 0)
[docs] def test_uuids_intersections(self): ntotal = 500 nsub = 17 nadd = 3 eids = uuid2np([uuid.uuid4() for _ in range(ntotal)]) np.random.seed(42) isel = np.floor(np.argsort(np.random.random(nsub)) / nsub * ntotal).astype(np.int16) sids = np.r_[eids[isel, :], uuid2np([uuid.uuid4() for _ in range(nadd)])] np.random.shuffle(sids) # check the intersection v, i0, i1 = intersect2d(eids, sids) assert np.all(eids[i0, :] == sids[i1, :]) assert np.all(np.sort(isel) == np.sort(i0)) v_, i0_, i1_ = np.intersect1d(eids[:, 0], sids[:, 0], return_indices=True) assert np.setxor1d(v_, v[:, 0]).size == 0 assert np.setxor1d(i0, i0_).size == 0 assert np.setxor1d(i1, i1_).size == 0 for a, b in zip(ismember2d(sids, eids), ismember(sids[:, 0], eids[:, 0])): assert np.all(a == b) # check conversion to numpy back and forth uuids = [uuid.uuid4() for _ in np.arange(4)] np_uuids = uuid2np(uuids) assert np2uuid(np_uuids) == uuids
[docs] class TestParams(unittest.TestCase):
[docs] @mock.patch('sys.platform', 'linux') def test_set_hidden(self): with tempfile.TemporaryDirectory() as td: file = Path(td).joinpath('file') file.touch() hidden_file = params.set_hidden(file, True) self.assertFalse(file.exists()) self.assertTrue(hidden_file.exists()) self.assertEqual(hidden_file.name, '.file') params.set_hidden(hidden_file, False) self.assertFalse(hidden_file.exists()) self.assertTrue(file.exists())
[docs] class TestFileLock(unittest.IsolatedAsyncioTestCase): tmp = None
[docs] @classmethod def setUpClass(cls): tmp = tempfile.TemporaryDirectory() cls.tmp = Path(tmp.name) cls.addClassCleanup(tmp.cleanup)
[docs] def setUp(self): self.file = self.tmp / 'foo.bar' self.addCleanup(self.file.unlink, missing_ok=True) self.lock_file = self.file.with_suffix('.lock') self.addCleanup(self.lock_file.unlink, missing_ok=True)
[docs] @mock.patch('iblutil.io.params.time.sleep') def test_file_lock_sync(self, sleep_mock): """Test synchronous FileLock context manager.""" # Check input validation self.assertRaises(ValueError, params.FileLock, self.file, timeout_action='foo') # Check behaviour when lock file doesn't exist (i.e. no other process writing to file) assert not self.lock_file.exists() with params.FileLock(self.file, timeout_action='raise'): self.assertTrue(self.lock_file.exists(), 'Failed to create lock file') self.assertFalse(self.lock_file.exists(), 'Failed to remove lock file upon exit of context manager') sleep_mock.assert_not_called() # no file present so no need to sleep # Check behaviour when lock file present and not removed by other process self.lock_file.touch() assert self.lock_file.exists() lock = params.FileLock(self.file, timeout_action='raise') with self.assertLogs('iblutil.io.params', 10) as lg: self.assertRaises(TimeoutError, lock.__enter__) msg = next((x.getMessage() for x in lg.records if x.levelno == 10), None) self.assertEqual('file lock contents: <empty>', msg) # should try 5 attempts by default; default total timeout is 10 seconds so should sleep 5x for 2 seconds each expected_attempts = 5 sleep_mock.assert_called_with(2) self.assertEqual(expected_attempts, sleep_mock.call_count) self.assertEqual(expected_attempts, len([x for x in lg.records if x.levelno == 20])) msg = next(x.getMessage() for x in lg.records if x.levelno == 20) self.assertRegex(msg, 'file lock found, waiting 2.00 seconds') # Check delete timeout action assert self.lock_file.exists() with self.assertLogs('iblutil.io.params', 10) as lg, \ params.FileLock(self.file, timeout_action='delete'): # Should have replaced empty lock file with timestamped one self.assertTrue(self.lock_file.exists()) with open(self.lock_file, 'r') as fp: lock_info = json.load(fp) self.assertCountEqual(('datetime', 'hostname'), lock_info) self.assertFalse(self.lock_file.exists(), 'Failed to remove lock file upon exit of context manager') self.assertRegex(lg.records[-1].getMessage(), 'stale file lock found, deleting')
async def _mock(self, obj): """ Add side effect to mock object that awaits a future. This is required because async lambdas are not supported. Parameters ---------- obj : unittest.mock.AsyncMock An asynchronous mock object to install side effect for. Returns ------- asyncio.Future A future awaited by input mock object. """ fut = asyncio.get_event_loop().create_future() self.addCleanup(fut.cancel) async def wait(_): return await fut obj.side_effect = wait return fut
[docs] @mock.patch('iblutil.io.params.asyncio.sleep') async def test_file_lock_async(self, sleep_mock): """Test asynchronous FileLock context manager.""" # Check behaviour when lock file doesn't exist (i.e. no other process writing to file) assert not self.lock_file.exists() async with params.FileLock(self.file, timeout_action='raise'): self.assertTrue(self.lock_file.exists(), 'Failed to create lock file') self.assertFalse(self.lock_file.exists(), 'Failed to remove lock file upon exit of context manager') sleep_mock.assert_not_called() # no file present so no need to sleep # Check behaviour when lock file present and not removed by other process self.lock_file.touch() assert self.lock_file.exists() lock = params.FileLock(self.file, timeout=1e-3, timeout_action='raise') # The loop that checks the lock file is too fast when async.sleep is mocked so adding a side # effect that awaits a future that's never set allows the timeout code to execute. await self._mock(sleep_mock) with self.assertLogs('iblutil.io.params', 10) as lg, self.assertRaises(asyncio.TimeoutError): await lock.__aenter__() # fut = asyncio.get_running_loop().create_future() # with mock.patch.object(lock, '_lock_check_async', return_value) as m: # async with params.FileLock(self.file, timeout=1e-3, timeout_action='raise') as lock: # ... sleep_mock.assert_awaited_with(lock._async_poll_freq) msg = next((x.getMessage() for x in lg.records if x.levelno == 10), None) self.assertEqual('file lock contents: <empty>', msg) # Check remove timeout action assert self.lock_file.exists() await self._mock(sleep_mock) with self.assertLogs('iblutil.io.params', 10) as lg: async with params.FileLock(self.file, timeout=1e-5, timeout_action='delete'): # Should have replaced empty lock file with timestamped one self.assertTrue(self.lock_file.exists()) with open(self.lock_file, 'r') as fp: lock_info = json.load(fp) self.assertCountEqual(('datetime', 'hostname', 'pid'), lock_info) self.assertFalse(self.lock_file.exists(), 'Failed to remove lock file upon exit of context manager')
[docs] class TestsJsonable(unittest.TestCase):
[docs] def setUp(self) -> None: self.tfile = tempfile.NamedTemporaryFile(delete=False)
[docs] def testReadWrite(self): data = [{'a': 'thisisa', 'b': 1, 'c': [1, 2, 3]}, {'a': 'thisisb', 'b': 2, 'c': [2, 3, 4]}] jsonable.write(self.tfile.name, data) data2 = jsonable.read(self.tfile.name) self.assertEqual(data, data2) jsonable.append(self.tfile.name, data) data3 = jsonable.read(self.tfile.name) self.assertEqual(data + data, data3)
[docs] def tearDown(self) -> None: self.tfile.close() os.unlink(self.tfile.name)
[docs] class TestLoadTaskData(unittest.TestCase):
[docs] def test_load_task_jsonable(self): jsonable_file = Path(__file__).parent.joinpath('fixtures', 'task_data_short.jsonable') trials_table, bpod_data = jsonable.load_task_jsonable(jsonable_file) assert trials_table.shape[0] == 2 assert len(bpod_data) == 2
[docs] def test_load_task_jsonable_partial(self): jsonable_file = Path(__file__).parent.joinpath('fixtures', 'task_data_short.jsonable') with open(jsonable_file) as fp: fp.readline() offset = fp.tell() trials_table, bpod_data = jsonable.load_task_jsonable(jsonable_file, offset=offset) trials_table_full, bpod_data_full = jsonable.load_task_jsonable(jsonable_file) for c in trials_table.columns: if not np.isnan(trials_table[c][0]): np.testing.assert_equal(trials_table_full[c].values[-1], trials_table[c][0]) assert bpod_data_full[-1] == bpod_data[0]
if __name__ == '__main__': unittest.main(exit=False, verbosity=2)