"""Unit tests for the one.registration module."""
import json
import logging
import unittest
import unittest.mock
import string
import random
import datetime
import fnmatch
from io import StringIO
from pathlib import Path
from iblutil.util import Bunch
from requests.exceptions import HTTPError
from requests.models import Response
from one.api import ONE
from one import registration
import one.alf.exceptions as alferr
from one.alf.io import next_num_folder
from . import TEST_DB_1, OFFLINE_ONLY, util
class TestDatasetTypes(unittest.TestCase):
"""Tests for dataset type validation."""
def test_get_dataset_type(self):
"""Test one.registration.get_dataset_type function."""
dtypes = [
{'name': 'obj.attr', 'filename_pattern': ''},
{'name': 'foo.bar', 'filename_pattern': '*FOO.b?r*'},
{'name': 'bar.baz', 'filename_pattern': ''},
{'name': 'some_file', 'filename_pattern': 'some_file.*'}
]
dtypes = list(map(Bunch, dtypes))
filename_typename = (
('foo.bar.npy', 'foo.bar'), ('foo.bir.npy', 'foo.bar'),
('_ns_obj.attr_clock.extra.npy', 'obj.attr'),
('bar.baz.ext', 'bar.baz'), ('some_file.ext', 'some_file'))
for filename, dataname in filename_typename:
with self.subTest(filename=filename):
dtype = registration.get_dataset_type(filename, dtypes)
self.assertEqual(dtype.name, dataname)
self.assertRaises(ValueError, registration.get_dataset_type, 'no_match', dtypes)
# Add an ambiguous dataset type pattern
dtypes.append(Bunch({'name': 'foo.bor', 'filename_pattern': 'foo.bor*'}))
self.assertRaises(ValueError, registration.get_dataset_type, 'foo.bor', dtypes)
@unittest.skipIf(OFFLINE_ONLY, 'online only test')
class TestRegistrationClient(unittest.TestCase):
"""Test class for RegistrationClient class."""
one = None
subject = None
temp_dir = None
tag = None
@classmethod
def setUpClass(cls) -> None:
cls.temp_dir = util.set_up_env()
cls.one = ONE(**TEST_DB_1, cache_dir=cls.temp_dir.name)
cls.subject = ''.join(random.choices(string.ascii_letters, k=10))
cls.one.alyx.rest('subjects', 'create', data={'lab': 'mainenlab', 'nickname': cls.subject})
        # Create a randomly named protected tag for these tests
tag = ''.join(random.choices(string.ascii_letters, k=5))
cls.tag = cls.one.alyx.rest('tags', 'create', data={'name': tag, 'protected': True})
# Create some files for this subject
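        # These mirror dataset paths already in the ONE cache so later tests have files to find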
session_path = cls.one.alyx.cache_dir / cls.subject / str(datetime.date.today()) / '001'
cls.session_path = session_path
for rel_path in cls.one.list_datasets():
filepath = session_path.joinpath(rel_path)
filepath.parent.mkdir(exist_ok=True, parents=True)
filepath.touch()
def setUp(self) -> None:
self.client = registration.RegistrationClient(one=self.one)
def test_water_administration(self):
"""Test for RegistrationClient.register_water_administration."""
record = self.client.register_water_administration(self.subject, 35.10000000235)
self.assertEqual(record['subject'], self.subject)
self.assertEqual(record['water_administered'], 35.1)
self.assertEqual(record['water_type'], 'Water')
self.assertEqual(record['user'], self.one.alyx.user)
# Create session to associate
d = {'subject': self.subject,
'procedures': ['Behavior training/tasks'],
'type': 'Base',
'users': [self.one.alyx.user],
}
ses = self.one.alyx.rest('sessions', 'create', data=d)
volume = random.random()
record = self.client.register_water_administration(self.subject, volume,
session=ses['url'])
self.assertEqual(record['subject'], self.subject)
self.assertEqual(record['session'], ses['url'][-36:])
# Check validations
with self.assertRaises(ValueError):
self.client.register_water_administration(self.subject, volume, session='NaN')
with self.assertRaises(ValueError):
self.client.register_water_administration(self.subject, .0)
with unittest.mock.patch.object(self.client.one, 'to_eid', return_value=None), \
self.assertRaises(ValueError):
self.client.register_water_administration(self.subject, 3.6, session=ses['url'])
def test_register_weight(self):
"""Test for RegistrationClient.register_weight."""
record = self.client.register_weight(self.subject, 35.10000000235)
self.assertEqual(record['subject'], self.subject)
self.assertEqual(record['weight'], 35.1)
self.assertEqual(record['user'], self.one.alyx.user)
# Check validations
with self.assertRaises(ValueError):
self.client.register_weight(self.subject, 0.0)
def test_ensure_ISO8601(self):
"""Test for RegistrationClient.ensure_ISO8601."""
date = datetime.datetime(2021, 7, 14, 15, 53, 15, 525119)
self.assertEqual(self.client.ensure_ISO8601(date), '2021-07-14T15:53:15.525119')
self.assertEqual(self.client.ensure_ISO8601(date.date()), '2021-07-14T00:00:00')
date_str = '2021-07-14T15:53:15.525119'
self.assertEqual(self.client.ensure_ISO8601(date_str), date_str)
with self.assertRaises(ValueError):
self.client.ensure_ISO8601(f'{date:%D}')
def test_exists(self):
"""Test for RegistrationClient.assert_exists."""
        # Check subjects endpoint
with self.assertRaises(alferr.AlyxSubjectNotFound):
self.client.assert_exists('foobar', 'subjects')
self.client.assert_exists(self.subject, 'subjects')
# Check user endpoint with list
with self.assertRaises(alferr.ALFError) as ex:
self.client.assert_exists([self.one.alyx.user, 'foobar'], 'users')
self.assertIn('foobar', str(ex.exception))
# Check raises non-404
err = HTTPError()
err.response = Bunch({'status_code': 500})
with unittest.mock.patch.object(self.one.alyx, 'get', side_effect=err), \
self.assertRaises(HTTPError):
self.client.assert_exists('foobar', 'subjects')
def test_find_files(self):
"""Test for RegistrationClient.find_files."""
# Remove a dataset type from the client to check that the dataset(s) are ignored
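        # Find the first dataset type whose filename pattern matches a file in the session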
existing = (x['filename_pattern'] and any(self.session_path.rglob(x['filename_pattern']))
for x in self.client.dtypes)
removed = self.client.dtypes.pop(next(i for i, x in enumerate(existing) if x))
files = list(self.client.find_files(self.session_path))
self.assertEqual(6, len(files))
self.assertTrue(all(map(Path.is_file, files)))
# Check removed file pattern not in file list
self.assertFalse(fnmatch.filter([x.name for x in files], removed['filename_pattern']))
def test_create_new_session(self):
"""Test for RegistrationClient.create_new_session."""
# Check register = True
session_path, eid = self.client.create_new_session(
self.subject, date='2020-01-01', projects='ibl_neuropixel_brainwide_01'
)
expected = self.one.alyx.cache_dir.joinpath(self.subject, '2020-01-01', '001').as_posix()
self.assertEqual(session_path.as_posix(), expected)
self.assertIsNotNone(eid)
# Check projects added to Alyx session
projects = self.one.get_details(eid, full=True)['projects']
self.assertEqual(projects, ['ibl_neuropixel_brainwide_01'])
# Check register = False
session_path, eid = self.client.create_new_session(
self.subject, date='2020-01-01', register=False)
expected = self.one.alyx.cache_dir.joinpath(self.subject, '2020-01-01', '002').as_posix()
self.assertEqual(session_path.as_posix(), expected)
self.assertIsNone(eid)
def test_register_session(self):
"""Test for RegistrationClient.register_session."""
# Find some datasets to create
datasets = self.one.list_datasets(self.one.search(dataset='raw')[0])
session_path = self.one.alyx.cache_dir.joinpath(
'mainenlab', 'Subjects', self.subject, '2020-01-01', '001'
)
        # Build the dataset file paths; creating them below ensures the session folder exists
file_list = [session_path.joinpath(x) for x in datasets]
# Create the files before registering
for x in file_list:
x.parent.mkdir(exist_ok=True, parents=True)
x.touch()
ses, recs = self.client.register_session(str(session_path),
end_time='2020-01-02',
procedures='Behavior training/tasks')
self.assertTrue(len(ses['data_dataset_session_related']))
self.assertEqual(len(ses['data_dataset_session_related']), len(recs))
self.assertTrue(isinstance(ses['procedures'], list) and len(ses['procedures']) == 1)
self.assertEqual(ses['end_time'], '2020-01-02T00:00:00')
# Check value error raised when provided lab name doesn't match
with self.assertRaises(ValueError):
self.client.register_session(str(session_path), lab='cortexlab')
# Check updating existing session
start_time = '2020-01-01T10:36:10'
ses, _ = self.client.register_session(str(session_path), start_time=start_time)
self.assertEqual(start_time, ses['start_time'])
def test_create_sessions(self):
"""Test for RegistrationClient.create_sessions."""
session_path = self.session_path.parent / next_num_folder(self.session_path.parent)
session_path.mkdir(parents=True)
session_path.joinpath('create_me.flag').touch()
# Should print session path in dry mode
with unittest.mock.patch('sys.stdout', new_callable=StringIO) as stdout:
session_paths, ses = self.client.create_sessions(self.one.alyx.cache_dir, dry=True)
self.assertTrue(str(session_path) in stdout.getvalue())
self.assertTrue(len(ses) == 1 and ses[0] is None)
self.assertTrue(session_path.joinpath('create_me.flag').exists())
# Should find and register session
session_paths, ses = self.client.create_sessions(self.one.alyx.cache_dir)
self.assertTrue(len(ses) == 1 and len(session_paths) == 1)
self.assertFalse(session_path.joinpath('create_me.flag').exists())
self.assertEqual(ses[0]['number'], int(session_path.parts[-1]))
self.assertEqual(session_paths[0], session_path)
def test_prepare_files(self):
"""Test for RegistrationClient.prepare_files method."""
session_path = self.session_path.parent / next_num_folder(self.session_path.parent)
session_path_2 = session_path.parent / next_num_folder(session_path)
file_list = [session_path.joinpath('wheel.position.npy'),
session_path.joinpath('wheel.timestamps.npy'),
session_path_2.joinpath('wheel.position.npy')]
# Test with file list and version is None
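        # prepare_files groups the files (F) and versions (V) by session path;
        # the remaining two return values are not needed for these checks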
F, V, _, _ = self.client.prepare_files(file_list)
        self.assertEqual(len(F), 2)
self.assertListEqual(sorted(list(F.keys())), sorted([session_path, session_path_2]))
for sess, n in zip([session_path, session_path_2], [2, 1]):
            self.assertEqual(len(F[sess]), n)
            self.assertEqual(len(V[sess]), n)
self.assertIsNone(V[session_path][0])
# Test with specifying version
versions = ['1.2.2', 'v1.2', '1.3.4']
_, V, _, _ = self.client.prepare_files(file_list, versions=versions)
self.assertListEqual(V[session_path], versions[:-1])
self.assertListEqual(V[session_path_2], [versions[-1]])
def test_check_protected(self):
"""Test for RegistrationClient.check_protected_files method."""
session_path, eid = self.client.create_new_session(self.subject)
file_name = session_path.joinpath('wheel.timestamps.npy')
file_name.touch()
# register a dataset
rec = self.client.register_files(str(file_name))
        # The dataset should not be protected yet (expect status code 200)
protected = self.client.check_protected_files(str(file_name))
self.assertEqual(protected['status_code'], 200)
        # Add the protected tag to the dataset
tag = self.tag['name']
self.one.alyx.rest('datasets', 'partial_update', id=rec['id'], data={'tags': [tag]})
# check protected
protected = self.client.check_protected_files(str(file_name))
self.assertEqual(protected['status_code'], 403)
self.assertEqual(protected['error'], 'One or more datasets is protected')
def test_register_files(self):
"""Test for RegistrationClient.register_files."""
# Test a few things not checked in register_session
session_path, eid = self.client.create_new_session(self.subject)
# Check registering single file, dry run, default False
file_name = session_path.joinpath('wheel.position.npy')
file_name.touch()
labs = 'mainenlab,cortexlab'
rec = self.client.register_files(str(file_name), default=False, dry=True, labs=labs)
self.assertIsInstance(rec, dict)
self.assertFalse(rec['default'])
self.assertNotIn('id', rec)
self.assertEqual(labs, rec['labs'], 'failed to include optional kwargs in request')
# Add ambiguous dataset type to types list
self.client.dtypes.append(self.client.dtypes[-1].copy())
self.client.dtypes[-1]['name'] += '1'
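        # Two dataset types now share the same filename pattern, making matches ambiguous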
# Try registering ambiguous / invalid datasets
ambiguous = self.client.dtypes[-1]['filename_pattern'].replace('*', 'npy')
files = [session_path.joinpath('wheel.position.xxx'), # Unknown ext
session_path.joinpath('foo.bar.npy'), # Unknown dtype
session_path.joinpath(ambiguous), # Ambiguous dtype
session_path.with_name('foo').joinpath('spikes.times.npy') # Invalid session
]
version = ['1.2.9'] * len(files)
with self.assertLogs('one.registration', logging.DEBUG) as dbg:
rec = self.client.register_files(files, versions=version)
self.assertIn('wheel.position.xxx: No matching extension', dbg.records[0].message)
self.assertRegex(dbg.records[1].message, 'No dataset type .* "foo.bar.npy"')
self.assertRegex(dbg.records[2].message, f'Multiple matching .* "{ambiguous}"')
self.assertEqual([None] * len(files), rec)
# Check the handling of revisions
rec = self.client.register_files(str(file_name))
        # Protect the newly registered dataset by adding the protected tag
tag = self.tag['name']
self.one.alyx.rest('datasets', 'partial_update', id=rec['id'], data={'tags': [tag]})
        # Test registering with a revision already in the file path;
        # this revision should be used rather than creating one with today's date
rev = self.one.alyx.rest('revisions', 'create', data={'name': f'{tag}1'})
self.addCleanup(self.one.alyx.rest, 'revisions', 'delete', id=rev['name'])
file = file_name.parent.joinpath(f'#{rev["name"]}#', file_name.name)
        file.parent.mkdir()  # Create file in new revision folder
        file.touch()
r, = self.client.register_files(file_list=[file])
self.assertEqual(r['revision'], rev['name'])
self.assertTrue(r['default'])
self.assertIsNone(r['collection'])
        # Register the exact dataset revision again - it should append an 'a'.
        # Re-registering the original should move it into a revision named with today's date.
self.one.alyx.rest('datasets', 'partial_update', id=r['id'], data={'tags': [tag]})
files = [file,
session_path.joinpath('wheel.position.npy'),
session_path.joinpath('wheel.position.ssv')] # Last dataset unprotected
files[-1].touch()
r1, r2, r3 = self.client.register_files(file_list=files)
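        # r1: protected revision dataset bumped to '<rev>a'; r2: protected original moved into a
        # revision named with today's date; r3: unprotected dataset registered in place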
self.assertEqual(r1['revision'], rev['name'] + 'a')
self.assertTrue(file.parents[1].joinpath(f'#{r1["revision"]}#', file.name).exists())
self.assertFalse(file.exists(), 'failed to move protected dataset to new revision')
self.assertEqual(r2['revision'], str(datetime.date.today()))
self.assertFalse(files[1].exists(), 'failed to move protected dataset to new revision')
file = files[1].parent.joinpath(f'#{r2["revision"]}#', file.name)
self.assertTrue(file.exists())
self.assertIsNone(r3['revision'])
self.assertTrue(files[2].exists())
        # Protect the latest (today's date) revision dataset
self.one.alyx.rest('datasets', 'partial_update', id=r2['id'], data={'tags': [tag]})
# Same day revision
today = datetime.date.today()
r, = self.client.register_files(file_list=[file])
self.assertEqual(r['revision'], f'{today}a')
self.assertTrue(file.parents[1].joinpath(f'#{r["revision"]}#', file.name).exists())
self.assertFalse(file.exists(), 'failed to move protected dataset to new revision')
        # Re-register the dataset under today's revision; since that revision and revision 'a'
        # are both protected, we expect revision 'b' to be created
file = session_path.joinpath(f'#{today}#', file_name.name)
file.touch()
self.one.alyx.rest('datasets', 'partial_update', id=r['id'], data={'tags': [tag]})
r, = self.client.register_files(file_list=[file])
self.assertEqual(r['revision'], str(datetime.date.today()) + 'b')
# For this logic it's simpler to spoof the response from Alyx
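        # The mocked first POST raises a 403 whose JSON body lists, per dataset, the existing
        # revisions and whether each is protected; register_files should then retry with the
        # next available revision folder names (checked against `expected` below)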
with unittest.mock.patch.object(self.one.alyx, 'post') as alyx_mock:
mock_resp = Response()
mock_resp.status_code = 403
response = {'status_code': 403, 'error': 'One or more datasets is protected',
'details': [
# Dataset protected but latest (blank) revision is unprotected
{'wheel.position.ssv': [
{'': False},
{'puHIt1': True}]},
# Dataset protected but latest (date) revision is unprotected
{'wheel.position.npy': [
{f'{today}': False},
{'puHIt1': True}]},
# Dataset protected and latest (date) revision is today
{'wheel.timestamps.npy': [
{str(today): True},
{'puHIt1': True}]},
# Dataset protected and latest (date) revision was updated twice
{'wheel.timestamps.ssv': [
{f'{today}a': True},
{str(today): True}]},
# Dataset protected but latest (misc) revision not protected
{'wheel.velocity.ssv': [
{'puHIt1': False},
{'foo': True}]},
                            # Dataset protected and latest (date) revision ends with a different character
{'wheel.velocity.npy': [
{f'{today}A': True},
{str(today): True}]},
]}
mock_resp._content = bytes(json.dumps(response), 'utf-8')
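            # The first call raises the 403 above; the retried call returns a minimal success response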
alyx_mock.side_effect = [HTTPError(response=mock_resp), [{}]]
files = [session_path.joinpath(next(iter(x.keys()))) for x in response['details']]
            for x in files:
                x.touch()
self.client.register_files(file_list=files)
self.assertEqual(2, alyx_mock.call_count)
actual = alyx_mock.call_args.kwargs.get('data', {}).get('filenames', [])
expected = [
'wheel.position.ssv',
f'#{today}#/wheel.position.npy',
f'#{today}a#/wheel.timestamps.npy',
f'#{today}b#/wheel.timestamps.ssv',
f'#{today}#/wheel.velocity.ssv',
f'#{today}B#/wheel.velocity.npy']
self.assertEqual(expected, actual)
# Finally, simply check that the exception is re-raised for non-revision related errors
# NB Unfortunately this passes even if the exception isn't re-raised after being caught
response['status_code'] = 404
mock_resp._content = bytes(json.dumps(response), 'utf-8')
alyx_mock.side_effect = HTTPError(response=mock_resp)
self.assertRaises(HTTPError, self.client.register_files, file_list=[file])
def test_next_revision(self):
"""Test RegistrationClient._next_revision method."""
self.assertEqual('2020-01-01a', self.client._next_revision('2020-01-01'))
reserved = ['2020-01-01a', '2020-01-01b']
self.assertEqual('2020-01-01c', self.client._next_revision('2020-01-01', reserved))
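        # chr(945) is Greek lowercase alpha; the revision suffix sequence should start from it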
new_revision = self.client._next_revision('2020-01-01', alpha=chr(945))
self.assertEqual(945, ord(new_revision[-1]))
self.assertRaises(TypeError, self.client._next_revision, '2020-01-01', alpha='do')
def test_instantiation(self):
"""Test RegistrationClient.__init__ with no args."""
with unittest.mock.patch('one.registration.ONE') as mk:
client = registration.RegistrationClient()
self.assertIsInstance(client.one, unittest.mock.MagicMock)
mk.assert_called_with(cache_rest=None)
@classmethod
def tearDownClass(cls) -> None:
for admin in cls.one.alyx.rest('water-administrations', 'list',
django='subject__nickname,' + cls.subject, no_cache=True):
cls.one.alyx.rest('water-administrations', 'delete', id=admin['url'][-36:])
# Note: datasets deleted in cascade
for ses in cls.one.alyx.rest('sessions', 'list', subject=cls.subject, no_cache=True):
cls.one.alyx.rest('sessions', 'delete', id=ses['url'][-36:])
cls.one.alyx.rest('subjects', 'delete', id=cls.subject)
cls.one.alyx.rest('tags', 'delete', id=cls.tag['id'])
cls.temp_dir.cleanup()
if __name__ == '__main__':
unittest.main(exit=False)