import unittest
from unittest import mock
import tempfile
from pathlib import PurePosixPath, Path
import json
import datetime
import random
import string
import uuid
from itertools import chain
from requests import HTTPError
import numpy as np
from one.api import ONE
from one.webclient import AlyxClient
import one.alf.exceptions as alferr
import iblutil.io.params as iopar
from ibllib.oneibl import patcher, registration
import ibllib.io.extractors.base
from ibllib.tests import TEST_DB
from ibllib.io import session_params
class TestUtils(unittest.TestCase):
    """Test helper functions in ibllib.oneibl.registration module."""

    def test_get_lab(self):
        """Test ibllib.oneibl.registration.get_lab function."""
        alyx = AlyxClient(**TEST_DB)
        session_path = Path.home().joinpath('foo', '2020-01-01', '001')
        # With a single matching subject, its lab is returned directly
        with mock.patch.object(alyx, 'rest', return_value=[{'lab': 'bar'}]):
            self.assertEqual('bar', registration.get_lab(session_path, alyx))
        # An invalid session path should be rejected with a ValueError
        self.assertRaises(ValueError, registration.get_lab, 'invalid/path', alyx)
        # No subject record on Alyx -> AlyxSubjectNotFound
        with mock.patch.object(alyx, 'rest', return_value=[]):
            self.assertRaises(alferr.AlyxSubjectNotFound, registration.get_lab, session_path, alyx)
        # With several candidate labs, the one whose data repository matches the
        # local Globus endpoint ID should be selected (intersection of the two lists)
        subjects = [{'lab': 'bar'}, {'lab': 'baz'}]
        data_repo_labs = [{'name': 'baz'}, {'name': 'foobar'}]
        with mock.patch.object(alyx, 'rest', side_effect=[subjects, data_repo_labs]), \
                mock.patch('one.remote.globus.get_local_endpoint_id'):
            self.assertEqual('baz', registration.get_lab(session_path, alyx))
[docs]
class TestFTPPatcher(unittest.TestCase):
[docs]
def setUp(self) -> None:
self.one = ONE(**TEST_DB)
[docs]
def reset_params(self):
"""Remove the FTP parameters from the AlyxClient"""
par = iopar.as_dict(self.one.alyx._par)
self.one.alyx._par = iopar.from_dict({k: v for k, v in par.items()
if not k.startswith('FTP')})
[docs]
@mock.patch('ftplib.FTP_TLS')
def test_setup(self, _):
self.reset_params()
# Test silent setup (one instance is in silent mode)
patcher.FTPPatcher(one=self.one)
keys = ('FTP_DATA_SERVER', 'FTP_DATA_SERVER_LOGIN', 'FTP_DATA_SERVER_PWD')
self.assertTrue(all(k in self.one.alyx._par.as_dict() for k in keys))
# Silent mode off
self.reset_params()
self.one.alyx.silent = False
with mock.patch('builtins.input', new=self.mock_input), \
mock.patch('ibllib.oneibl.patcher.getpass', return_value='foobar'):
patcher.FTPPatcher(one=self.one)
self.assertEqual(self.one.alyx._par.FTP_DATA_SERVER_LOGIN, 'usr')
self.assertEqual(self.one.alyx._par.FTP_DATA_SERVER_PWD, 'foobar')
class TestGlobusPatcher(unittest.TestCase):
    """Tests for the ibllib.oneibl.patcher.GlobusPatcher class."""

    # unittest.mock._patch: Mock object for globus_sdk package.
    globus_sdk_mock = None

    @mock.patch('one.remote.globus._setup')
    def setUp(self, _) -> None:
        # Create a temp dir for writing datasets to
        self.tempdir = tempfile.TemporaryDirectory()
        # The github CI root dir contains an alias/symlink so we must resolve it
        self.root_path = Path(self.tempdir.name).resolve()
        self.addCleanup(self.tempdir.cleanup)
        # Mock the Globus setup process so the parameters aren't overwritten
        self.pars = iopar.from_dict({
            'GLOBUS_CLIENT_ID': '123',
            'refresh_token': '456',
            'local_endpoint': str(uuid.uuid1()),
            'local_path': str(self.root_path),
            'access_token': 'abc',
            'expires_at_seconds': datetime.datetime.now().timestamp() + 60**2
        })
        # Patch the globus SDK so that no actual transfer tasks are submitted
        self.globus_sdk_mock = mock.patch('one.remote.globus.globus_sdk')
        self.globus_sdk_mock.start()
        self.addCleanup(self.globus_sdk_mock.stop)
        self.one = ONE(**TEST_DB)
        with mock.patch('one.remote.globus.load_client_params', return_value=self.pars):
            self.globus_patcher = patcher.GlobusPatcher(one=self.one)

    def test_patch_datasets(self):
        """Tests for GlobusPatcher.patch_datasets and GlobusPatcher.launch_transfers methods."""
        # Two datasets to patch
        file_list = ['ZFM-01935/2021-02-05/001/alf/_ibl_wheelMoves.intervals.npy',
                     'ZM_1743/2019-06-14/001/alf/_ibl_wheel.position.npy']
        dids = ['80fabd30-9dc8-4778-b349-d175af63e1bd', 'fede964f-55cd-4267-95e0-327454e68afb']
        # These exist on the test database, so get their info in order to mock registration response
        for r in (responses := self.one.alyx.rest('datasets', 'list', django=f'pk__in,{dids}')):
            r['id'] = r['url'].split('/')[-1]
        assert len(responses) == 2, f'one or both datasets {dids} not on database'
        # Create the files on disk
        for file in (file_list := list(map(self.root_path.joinpath, file_list))):
            file.parent.mkdir(exist_ok=True, parents=True)
            file.touch()
        # Mock the post method of AlyxClient and assert that it was called during registration
        with mock.patch.object(self.one.alyx, 'post') as rest_mock:
            rest_mock.side_effect = responses
            self.globus_patcher.patch_datasets(file_list)
            self.assertEqual(rest_mock.call_count, 2)
            for call, file in zip(rest_mock.call_args_list, file_list):
                self.assertEqual(call.args[0], '/register-file')
                path = file.relative_to(self.root_path).as_posix()
                self.assertTrue(path.startswith(call.kwargs['data']['path']))
                self.assertTrue(path.endswith(call.kwargs['data']['filenames'][0]))
        # The main globus transfer object should now contain both datasets
        self.assertIsNotNone(self.globus_patcher.globus_transfer)
        transfer_data = self.globus_patcher.globus_transfer['DATA']
        self.assertEqual(len(transfer_data), len(file_list))
        for data, file, did in zip(transfer_data, file_list, dids):
            path = file.relative_to(self.root_path).as_posix()
            self.assertTrue(data['source_path'].endswith(path))
            self.assertIn(did, data['destination_path'], 'failed to add UUID to destination file name')
        # Check added local server transfers
        self.assertTrue(len(self.globus_patcher.globus_transfers_locals))
        transfer_data = list(chain(*[x['DATA'] for x in self.globus_patcher.globus_transfers_locals.values()]))
        for data, file, did in zip(transfer_data, file_list, dids):
            path = file.relative_to(self.root_path).as_posix()
            self.assertEqual(data['destination_path'], '/mnt/s0/Data/Subjects/' + path)
            self.assertIn(did, data['source_path'], 'failed to add UUID to source file name')
        # Check behaviour when tasks submitted
        self.globus_patcher.client.get_task.return_value = {'completion_time': 0, 'fatal_error': None}
        self.globus_patcher.launch_transfers(local_servers=True)
        self.globus_patcher.client.submit_transfer.assert_called()
class TestAlyx2Path(unittest.TestCase):
    """Tests for the dataset-record-to-path helpers in ibllib.oneibl.patcher."""

    # A representative Alyx dataset record with one SDSC (flatiron) and one
    # local-server file record.
    dset = {
        'url': 'https://alyx.internationalbrainlab.org/'
               'datasets/00059298-1b33-429c-a802-fa51bb662d72',
        'name': 'channels.localCoordinates.npy',
        'collection': 'alf/probe00',
        'session': ('https://alyx.internationalbrainlab.org/'
                    'sessions/7cffad38-0f22-4546-92b5-fd6d2e8b2be9'),
        'file_records': [
            {'id': 'c9ae1b6e-03a6-41c9-9e1b-4a7f9b5cfdbf', 'data_repository': 'ibl_floferlab_SR',
             'data_repository_path': '/mnt/s0/Data/Subjects/',
             'relative_path': 'SWC_014/2019-12-11/001/alf/probe00/channels.localCoordinates.npy',
             'data_url': None, 'exists': True},
            {'id': 'f434a638-bc61-4695-884e-70fd1e521d60', 'data_repository': 'flatiron_hoferlab',
             'data_repository_path': '/hoferlab/Subjects/',
             'relative_path': 'SWC_014/2019-12-11/001/alf/probe00/channels.localCoordinates.npy',
             'data_url': (
                 'https://ibl.flatironinstitute.org/hoferlab/Subjects/SWC_014/2019-12-11/001/'
                 'alf/probe00/channels.localCoordinates.00059298-1b33-429c-a802-fa51bb662d72.npy'),
             'exists': True}],
    }

    def test_dsets_2_path(self):
        # A list input yields one path per dataset
        self.assertEqual(len(patcher.globus_path_from_dataset([self.dset] * 3)), 3)
        sdsc_path = ('/mnt/ibl/hoferlab/Subjects/SWC_014/2019-12-11/001/alf/probe00/'
                     'channels.localCoordinates.00059298-1b33-429c-a802-fa51bb662d72.npy')
        globus_path_sdsc = ('/hoferlab/Subjects/SWC_014/2019-12-11/001/alf/probe00/'
                            'channels.localCoordinates.00059298-1b33-429c-a802-fa51bb662d72.npy')
        globus_path_sr = ('/mnt/s0/Data/Subjects/SWC_014/2019-12-11/001/alf/probe00/'
                          'channels.localCoordinates.npy')
        # Each helper should return the expected POSIX path
        checks = [
            (patcher.sdsc_path_from_dataset(self.dset), sdsc_path),
            (patcher.sdsc_globus_path_from_dataset(self.dset), globus_path_sdsc),
            (patcher.globus_path_from_dataset(self.dset, repository='ibl_floferlab_SR'), globus_path_sr),
        ]
        for actual, expected in checks:
            self.assertEqual(str(actual), expected)
            self.assertIsInstance(actual, PurePosixPath)
def get_mock_session_settings(subject='clns0730', user='test_user'):
    """Create a basic session settings file for testing.

    Parameters
    ----------
    subject : str
        The subject name to record in the settings.
    user : str
        The experimenter name recorded as the session creator.

    Returns
    -------
    dict
        A minimal _iblrig_taskSettings-style dictionary.
    """
    return {
        'SUBJECT_NAME': subject,
        'SUBJECT_WEIGHT': 22,
        'SESSION_DATE': '2018-04-01',
        'SESSION_DATETIME': '2018-04-01T12:48:26.795526',
        'SESSION_NUMBER': '002',
        'PYBPOD_CREATOR': [user, 'f092c2d5-c98a-45a1-be7c-df05f129a93c', 'local'],
        'PYBPOD_BOARD': '_iblrig_mainenlab_behavior_1',
        'PYBPOD_PROTOCOL': '_iblrig_tasks_ephysChoiceWorld',
        'IBLRIG_VERSION': '5.4.1',
    }
class TestRegistrationEndpoint(unittest.TestCase):
    """Tests for the Alyx registration endpoint.

    NOTE(review): the class statement had no body (a bare class header is a
    SyntaxError); the original test methods appear to have been lost during
    extraction. `pass` added so the module remains importable — restore the
    original body from version control.
    """
    pass
class TestRegistration(unittest.TestCase):
    """Tests for session and dataset registration against the test database."""

    # str: The name of the subject under which to create sessions.
    subject = ''
    # one.api.OneAlyx: An instance of ONE connected to a test database.
    one = None

    @classmethod
    def setUpClass(cls):
        """Create a random new subject."""
        cls.one = ONE(**TEST_DB, cache_rest=None)
        cls.subject = ''.join(random.choices(string.ascii_letters, k=10))
        cls.one.alyx.rest('subjects', 'create', data={'lab': 'mainenlab', 'nickname': cls.subject})

    def setUp(self) -> None:
        self.settings = get_mock_session_settings(self.subject)
        self.session_dict = {
            'subject': self.subject,
            'start_time': '2018-04-01T12:48:26.795526',
            'number': 2,
            'users': [self.settings['PYBPOD_CREATOR'][0]]
        }
        # Work in a temporary directory so tests start without a created session
        self.td = tempfile.TemporaryDirectory()
        self.session_path = Path(self.td.name).joinpath(self.subject, '2018-04-01', '002')
        self.alf_path = self.session_path.joinpath('alf')
        self.alf_path.mkdir(parents=True)
        # Two datasets in the default (unrevised) location...
        np.save(self.alf_path.joinpath('spikes.times.npy'), np.random.random(500))
        np.save(self.alf_path.joinpath('spikes.amps.npy'), np.random.random(500))
        # ...and the same two datasets under a random revision folder (#xyz#)
        self.revision = ''.join(random.choices(string.ascii_letters, k=3))
        self.rev_path = self.alf_path.joinpath(f'#{self.revision}#')
        self.rev_path.mkdir(parents=True)
        np.save(self.rev_path.joinpath('spikes.times.npy'), np.random.random(300))
        np.save(self.rev_path.joinpath('spikes.amps.npy'), np.random.random(300))
        self.today_revision = datetime.datetime.today().strftime('%Y-%m-%d')
        # Create the revision record on Alyx if it doesn't already exist
        try:
            self.rev = self.one.alyx.rest('revisions', 'read', id=self.revision)
        except HTTPError:
            self.rev = self.one.alyx.rest('revisions', 'create', data={'name': self.revision})
        # Create a new protected tag for testing dataset protection
        tag_name = 'test_tag_' + ''.join(random.choices(string.ascii_letters, k=5))
        tag_data = {'name': tag_name, 'protected': True}
        self.tag = self.one.alyx.rest('tags', 'create', data=tag_data)

    def test_registration_datasets(self):
        """Test registering datasets, including revision and protection behaviour."""
        # Registering a single file should create one dataset record
        ses = self.one.alyx.rest('sessions', 'create', data=self.session_dict)
        st_file = self.alf_path.joinpath('spikes.times.npy')
        registration.register_dataset(file_list=st_file, one=self.one)
        dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:])
        self.assertTrue(len(dsets) == 1)
        # Registering a list of files should create one record per file
        flist = list(self.alf_path.glob('*.npy'))
        r = registration.register_dataset(file_list=flist, one=self.one)
        dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:])
        self.assertTrue(len(dsets) == 2)
        self.assertTrue(all(not d['revision'] for d in r))
        self.assertTrue(all(d['default'] for d in r))
        self.assertTrue(all(d['collection'] == 'alf' for d in r))
        # Simulate that all the datasets exist, re-register and assert that 'exists'
        # stays True as the files haven't changed
        frs = self.one.alyx.rest('files', 'list', django=f"dataset__session,{ses['url'][-36:]}")
        for fr in frs:
            self.one.alyx.rest('files', 'partial_update',
                               id=fr['url'][-36:], data={'exists': True})
        r = registration.register_dataset(file_list=flist, one=self.one)
        self.assertTrue(all(all(fr['exists'] for fr in rr['file_records']) for rr in r))
        # Once the files have changed, the exists flags must be reset to False
        np.save(self.alf_path.joinpath('spikes.times.npy'), np.random.random(500))
        np.save(self.alf_path.joinpath('spikes.amps.npy'), np.random.random(500))
        r = registration.register_dataset(file_list=flist, one=self.one)
        self.assertTrue(all(all(not fr['exists'] for fr in rr['file_records']) for rr in r))
        # Add a protected tag to all the datasets
        dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:])
        for d in dsets:
            self.one.alyx.rest('datasets', 'partial_update',
                               id=d['url'][-36:], data={'tags': [self.tag['name']]})
        # Registering with a revision already in the file path should use that
        # revision rather than creating one with today's date
        flist = list(self.rev_path.glob('*.npy'))
        r = registration.register_dataset(file_list=flist, one=self.one)
        self.assertTrue(all(d['revision'] == self.revision for d in r))
        self.assertTrue(all(d['default'] for d in r))
        self.assertTrue(all(d['collection'] == 'alf' for d in r))
        # Add a protected tag to all the datasets
        dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:])
        for d in dsets:
            self.one.alyx.rest('datasets', 'partial_update',
                               id=d['url'][-36:], data={'tags': [self.tag['name']]})
        # Registering again with the (now protected) revision in the file path
        # should bump the revision to self.revision + 'a' and move the files
        flist = list(self.rev_path.glob('*.npy'))
        r = registration.register_dataset(file_list=flist, one=self.one)
        self.assertTrue(all(d['revision'] == f'{self.revision}a' for d in r))
        self.assertTrue(self.alf_path.joinpath(f'#{self.revision}a#', 'spikes.times.npy').exists())
        self.assertTrue(self.alf_path.joinpath(f'#{self.revision}a#', 'spikes.amps.npy').exists())
        self.assertFalse(self.alf_path.joinpath(f'#{self.revision}#', 'spikes.times.npy').exists())
        self.assertFalse(self.alf_path.joinpath(f'#{self.revision}#', 'spikes.amps.npy').exists())
        # Re-registering the original files should move them into a revision
        # folder named after today's date
        flist = list(self.alf_path.glob('*.npy'))
        r = registration.register_dataset(file_list=flist, one=self.one)
        self.assertTrue(all(d['revision'] == self.today_revision for d in r))
        self.assertTrue(self.alf_path.joinpath(f'#{self.today_revision}#', 'spikes.times.npy').exists())
        self.assertTrue(self.alf_path.joinpath(f'#{self.today_revision}#', 'spikes.amps.npy').exists())
        self.assertFalse(self.alf_path.joinpath('spikes.times.npy').exists())
        self.assertFalse(self.alf_path.joinpath('spikes.amps.npy').exists())
        # Protect the latest datasets
        dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:], no_cache=True)
        for d in dsets:
            self.one.alyx.rest('datasets', 'partial_update',
                               id=d['url'][-36:], data={'tags': [self.tag['name']]})
        # Same-day revision: remake the original files and register again;
        # the revision should become today's date + 'a'
        np.save(self.alf_path.joinpath('spikes.times.npy'), np.random.random(500))
        np.save(self.alf_path.joinpath('spikes.amps.npy'), np.random.random(500))
        flist = list(self.alf_path.glob('*.npy'))
        r = registration.register_dataset(file_list=flist, one=self.one)
        self.assertTrue(all(d['revision'] == self.today_revision + 'a' for d in r))

    def _write_settings_file(self):
        """Write a raw task settings JSON file into the session and return its path."""
        behavior_path = self.session_path.joinpath('raw_behavior_data')
        behavior_path.mkdir()
        settings_file = behavior_path.joinpath('_iblrig_taskSettings.raw.json')
        with open(settings_file, 'w') as fid:
            json.dump(self.settings, fid)
        return settings_file

    def test_create_sessions(self):
        """Test IBLRegistrationClient.create_sessions with and without dry run."""
        flag_file = self.session_path.joinpath('create_me.flag')
        flag_file.touch()
        rc = registration.IBLRegistrationClient(one=self.one)
        # Should raise an error if settings file does not exist
        self.assertRaises(ValueError, rc.create_sessions, self.session_path)
        self._write_settings_file()
        # Dry run: session returned but nothing registered, flag file kept
        sessions, records = rc.create_sessions(self.session_path, dry=True)
        self.assertEqual(1, len(sessions))
        self.assertEqual(sessions[0], self.session_path)
        self.assertIsNone(records[0])
        self.assertTrue(flag_file.exists())
        # Real run: flag file should be removed
        sessions, records = rc.create_sessions(self.session_path, dry=False)
        self.assertEqual(1, len(sessions))
        self.assertEqual(sessions[0], self.session_path)
        self.assertFalse(flag_file.exists())

    def test_registration_session(self):
        """Test session registration for different task protocols."""
        settings_file = self._write_settings_file()
        rc = registration.IBLRegistrationClient(one=self.one)
        rc.register_session(str(self.session_path))
        eid = self.one.search(subject=self.subject, date_range=['2018-04-01', '2018-04-01'],
                              query_type='remote')[0]
        datasets = self.one.alyx.rest('datasets', 'list', session=eid)
        for ds in datasets:
            self.assertTrue(ds['hash'] is not None)
            self.assertTrue(ds['file_size'] is not None)
            self.assertTrue(ds['version'] == ibllib.__version__)
        # An ephys protocol should map to the ephys procedure
        ses_info = self.one.alyx.rest('sessions', 'read', id=eid)
        self.assertTrue(ses_info['procedures'] == ['Ephys recording with acute probe(s)'])
        self.one.alyx.rest('sessions', 'delete', id=eid)
        # Re-register the session as behaviour this time
        self.settings['PYBPOD_PROTOCOL'] = '_iblrig_tasks_trainingChoiceWorld6.3.1'
        with open(settings_file, 'w') as fid:
            json.dump(self.settings, fid)
        rc.register_session(self.session_path)
        eid = self.one.search(subject=self.subject, date_range=['2018-04-01', '2018-04-01'],
                              query_type='remote')[0]
        ses_info = self.one.alyx.rest('sessions', 'read', id=eid)
        self.assertTrue(ses_info['procedures'] == ['Behavior training/tasks'])
        self.one.alyx.rest('sessions', 'delete', id=eid)
        # Re-register the session with an unknown protocol: no procedures expected
        self.settings['PYBPOD_PROTOCOL'] = 'gnagnagna'
        with open(settings_file, 'w') as fid:
            json.dump(self.settings, fid)
        rc.register_session(self.session_path)
        eid = self.one.search(subject=self.subject, date_range=['2018-04-01', '2018-04-01'],
                              query_type='remote')[0]
        ses_info = self.one.alyx.rest('sessions', 'read', id=eid)
        self.assertTrue(ses_info['procedures'] == [])
        self.one.alyx.rest('sessions', 'delete', id=eid)

    def test_register_chained_session(self):
        """Tests for registering a session with chained (multiple) protocols"""
        behaviour_paths = [self.session_path.joinpath(f'raw_task_data_{i:02}') for i in range(2)]
        for p in behaviour_paths:
            p.mkdir()
        # Load the experiment description fixture and point its tasks at our collections
        params_path = Path(__file__).parent.joinpath('fixtures', 'io', '_ibl_experiment.description.yaml')
        experiment_description = session_params.read_params(params_path)
        assert experiment_description
        collections = map(lambda x: next(iter(x.values())), experiment_description['tasks'])
        for collection, d in zip(map(lambda x: x.parts[-1], behaviour_paths), collections):
            d['collection'] = collection
        # Save experiment description
        session_params.write_params(self.session_path, experiment_description)
        # Second task keeps the default (ephys) settings...
        with open(behaviour_paths[1].joinpath('_iblrig_taskSettings.raw.json'), 'w') as fid:
            json.dump(self.settings, fid)
        # ...while the first task is a passive protocol started earlier
        settings = self.settings.copy()
        settings['PYBPOD_PROTOCOL'] = '_iblrig_tasks_passiveChoiceWorld'
        start_time = (datetime.datetime.fromisoformat(settings['SESSION_DATETIME']) -
                      datetime.timedelta(hours=1, minutes=2, seconds=12))
        settings['SESSION_DATETIME'] = start_time.isoformat()
        with open(behaviour_paths[0].joinpath('_iblrig_taskSettings.raw.json'), 'w') as fid:
            json.dump(settings, fid)
        rc = registration.IBLRegistrationClient(one=self.one)
        session, recs = rc.register_session(self.session_path)
        ses_info = self.one.alyx.rest('sessions', 'read', id=session['id'])
        self.assertCountEqual(experiment_description['procedures'], ses_info['procedures'])
        self.assertCountEqual(experiment_description['projects'], ses_info['projects'])
        self.assertCountEqual({'IS_MOCK': False, 'IBLRIG_VERSION': None}, ses_info['json'])
        # The session start time should be the earliest protocol's start time
        self.assertEqual('2018-04-01T11:46:14.795526', ses_info['start_time'])
        # Test task protocol
        expected = '_iblrig_tasks_passiveChoiceWorld5.4.1/_iblrig_tasks_ephysChoiceWorld5.4.1'
        self.assertEqual(expected, ses_info['task_protocol'])
        # Test weighings created on Alyx (one per protocol, same weight)
        w = self.one.alyx.rest('subjects', 'read', id=self.subject)['weighings']
        self.assertEqual(2, len(w))
        self.assertCountEqual({22.}, {x['weight'] for x in w})
        weight_dates = {x['date_time'] for x in w}
        self.assertEqual(2, len(weight_dates))
        self.assertIn(ses_info['start_time'], weight_dates)

    def tearDown(self) -> None:
        self.td.cleanup()
        self.one.alyx.rest('revisions', 'delete', id=self.rev['name'])
        self.one.alyx.rest('tags', 'delete', id=self.tag['id'])
        # Remove any revisions created during the tests (today's date and bumped names)
        today_revision = self.one.alyx.rest('revisions', 'list', id=self.today_revision)
        today_rev = [rev for rev in today_revision if self.today_revision in rev['name']]
        for rev in today_rev:
            self.one.alyx.rest('revisions', 'delete', id=rev['name'])
        v1_rev = [rev for rev in today_revision if self.revision in rev['name']]
        for rev in v1_rev:
            self.one.alyx.rest('revisions', 'delete', id=rev['name'])
        # Delete weighings
        for w in self.one.alyx.rest('subjects', 'read', id=self.subject)['weighings']:
            self.one.alyx.rest('weighings', 'delete', id=w['url'].split('/')[-1])
        # Delete sessions
        eid = self.one.search(subject=self.subject, date_range='2018-04-01', query_type='remote')
        for ei in eid:
            self.one.alyx.rest('sessions', 'delete', id=ei)

    @classmethod
    def tearDownClass(cls) -> None:
        # Note: sessions and datasets deleted in cascade
        cls.one.alyx.rest('subjects', 'delete', id=cls.subject)
# Allow running this test module directly as a script
if __name__ == '__main__':
    unittest.main()