import unittest
from unittest import mock
import tempfile
from pathlib import PurePosixPath, Path
import json
import datetime
import random
import string
import uuid
from itertools import chain
from uuid import uuid4
from requests import HTTPError
import numpy as np
import pandas as pd
from one.api import ONE
from one.webclient import AlyxClient
import one.alf.exceptions as alferr
from one.util import QC_TYPE
import iblutil.io.params as iopar
from ibllib.oneibl import patcher, registration, data_handlers as handlers
import ibllib.io.extractors.base
from ibllib.pipes.behavior_tasks import ChoiceWorldTrialsBpod
from ibllib.tests import TEST_DB
from ibllib.io import session_params
class TestUtils(unittest.TestCase):
"""Test helper functions in ibllib.oneibl.registration module."""
def test_get_lab(self):
"""Test ibllib.oneibl.registration.get_lab function."""
alyx = AlyxClient(**TEST_DB)
session_path = Path.home().joinpath('foo', '2020-01-01', '001')
with mock.patch.object(alyx, 'rest', return_value=[{'lab': 'bar'}]):
self.assertEqual('bar', registration.get_lab(session_path, alyx))
# Should validate and raise error when session path invalid
self.assertRaises(ValueError, registration.get_lab, 'invalid/path', alyx)
with mock.patch.object(alyx, 'rest', return_value=[]):
self.assertRaises(alferr.AlyxSubjectNotFound, registration.get_lab, session_path, alyx)
# Should find intersection based on labs with endpoint ID
subjects = [{'lab': 'bar'}, {'lab': 'baz'}]
data_repo_labs = [{'name': 'baz'}, {'name': 'foobar'}]
with mock.patch.object(alyx, 'rest', side_effect=[subjects, data_repo_labs]), \
mock.patch('one.remote.globus.get_local_endpoint_id'):
self.assertEqual('baz', registration.get_lab(session_path, alyx))
class TestFTPPatcher(unittest.TestCase):
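"""Tests for the ibllib.oneibl.patcher.FTPPatcher class."""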
def setUp(self) -> None:
self.one = ONE(**TEST_DB)
def reset_params(self):
"""Remove the FTP parameters from the AlyxClient"""
par = iopar.as_dict(self.one.alyx._par)
self.one.alyx._par = iopar.from_dict({k: v for k, v in par.items()
if not k.startswith('FTP')})
@mock.patch('ftplib.FTP_TLS')
def test_setup(self, _):
self.reset_params()
# Test silent setup (the ONE instance is in silent mode)
patcher.FTPPatcher(one=self.one)
keys = ('FTP_DATA_SERVER', 'FTP_DATA_SERVER_LOGIN', 'FTP_DATA_SERVER_PWD')
self.assertTrue(all(k in self.one.alyx._par.as_dict() for k in keys))
# Silent mode off
self.reset_params()
self.one.alyx.silent = False
with mock.patch('builtins.input', new=self.mock_input), \
mock.patch('ibllib.oneibl.patcher.getpass', return_value='foobar'):
patcher.FTPPatcher(one=self.one)
self.assertEqual(self.one.alyx._par.FTP_DATA_SERVER_LOGIN, 'usr')
self.assertEqual(self.one.alyx._par.FTP_DATA_SERVER_PWD, 'foobar')
class TestGlobusPatcher(unittest.TestCase):
"""Tests for the ibllib.oneibl.patcher.GlobusPatcher class."""
globus_sdk_mock = None
"""unittest.mock._patch: Mock object for globus_sdk package."""
@mock.patch('one.remote.globus._setup')
def setUp(self, _) -> None:
# Create a temp dir for writing datasets to
self.tempdir = tempfile.TemporaryDirectory()
# The GitHub CI root dir contains an alias/symlink so we must resolve it
self.root_path = Path(self.tempdir.name).resolve()
self.addCleanup(self.tempdir.cleanup)
# Mock the Globus setup process so the parameters aren't overwritten
self.pars = iopar.from_dict({
'GLOBUS_CLIENT_ID': '123',
'refresh_token': '456',
'local_endpoint': str(uuid.uuid1()),
'local_path': str(self.root_path),
'access_token': 'abc',
'expires_at_seconds': datetime.datetime.now().timestamp() + 60**2
})
# Mock the globus SDK so that no actual tasks are submitted
self.globus_sdk_mock = mock.patch('one.remote.globus.globus_sdk')
self.globus_sdk_mock.start()
self.addCleanup(self.globus_sdk_mock.stop)
self.one = ONE(**TEST_DB)
with mock.patch('one.remote.globus.load_client_params', return_value=self.pars):
self.globus_patcher = patcher.GlobusPatcher(one=self.one)
def test_patch_datasets(self):
"""Tests for GlobusPatcher.patch_datasets and GlobusPatcher.launch_transfers methods."""
# Create a couple of datasets to patch
file_list = ['ZFM-01935/2021-02-05/001/alf/_ibl_wheelMoves.intervals.npy',
'ZM_1743/2019-06-14/001/alf/_ibl_wheel.position.npy']
dids = ['80fabd30-9dc8-4778-b349-d175af63e1bd', 'fede964f-55cd-4267-95e0-327454e68afb']
# These exist on the test database, so get their info in order to mock registration response
for r in (responses := self.one.alyx.rest('datasets', 'list', django=f'pk__in,{dids}')):
r['id'] = r['url'].split('/')[-1]
assert len(responses) == 2, f'one or both datasets {dids} not on database'
# Create the files on disk
for file in (file_list := list(map(self.root_path.joinpath, file_list))):
file.parent.mkdir(exist_ok=True, parents=True)
file.touch()
# Mock the post method of AlyxClient and assert that it was called during registration
with mock.patch.object(self.one.alyx, 'post') as rest_mock:
rest_mock.side_effect = [[r] for r in responses]
self.globus_patcher.patch_datasets(file_list)
self.assertEqual(rest_mock.call_count, 2)
for call, file in zip(rest_mock.call_args_list, file_list):
self.assertEqual(call.args[0], '/register-file')
path = file.relative_to(self.root_path).as_posix()
self.assertTrue(path.startswith(call.kwargs['data']['path']))
self.assertTrue(path.endswith(call.kwargs['data']['filenames'][0]))
# Check whether the globus transfers were updated
self.assertIsNotNone(self.globus_patcher.globus_transfer)
transfer_data = self.globus_patcher.globus_transfer['DATA']
self.assertEqual(len(transfer_data), len(file_list))
for data, file, did in zip(transfer_data, file_list, dids):
path = file.relative_to(self.root_path).as_posix()
self.assertTrue(data['source_path'].endswith(path))
self.assertIn(did, data['destination_path'], 'failed to add UUID to destination file name')
# Check added local server transfers
self.assertTrue(len(self.globus_patcher.globus_transfers_locals))
transfer_data = list(chain(*[x['DATA'] for x in self.globus_patcher.globus_transfers_locals.values()]))
for data, file, did in zip(transfer_data, file_list, dids):
path = file.relative_to(self.root_path).as_posix()
self.assertEqual(data['destination_path'], '/mnt/s0/Data/Subjects/' + path)
self.assertIn(did, data['source_path'], 'failed to add UUID to source file name')
# Check behaviour when tasks submitted
self.globus_patcher.client.get_task.return_value = {'completion_time': 0, 'fatal_error': None}
self.globus_patcher.launch_transfers(local_servers=True)
self.globus_patcher.client.submit_transfer.assert_called()
class TestAlyx2Path(unittest.TestCase):
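"""Tests for the dataset-to-path conversion functions in ibllib.oneibl.patcher."""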
dset = {
'url': 'https://alyx.internationalbrainlab.org/'
'datasets/00059298-1b33-429c-a802-fa51bb662d72',
'name': 'channels.localCoordinates.npy',
'collection': 'alf/probe00',
'session': ('https://alyx.internationalbrainlab.org/'
'sessions/7cffad38-0f22-4546-92b5-fd6d2e8b2be9'),
'file_records': [
{'id': 'c9ae1b6e-03a6-41c9-9e1b-4a7f9b5cfdbf', 'data_repository': 'ibl_floferlab_SR',
'data_repository_path': '/mnt/s0/Data/Subjects/',
'relative_path': 'SWC_014/2019-12-11/001/alf/probe00/channels.localCoordinates.npy',
'data_url': None, 'exists': True},
{'id': 'f434a638-bc61-4695-884e-70fd1e521d60', 'data_repository': 'flatiron_hoferlab',
'data_repository_path': '/hoferlab/Subjects/',
'relative_path': 'SWC_014/2019-12-11/001/alf/probe00/channels.localCoordinates.npy',
'data_url': (
'https://ibl.flatironinstitute.org/hoferlab/Subjects/SWC_014/2019-12-11/001/'
'alf/probe00/channels.localCoordinates.00059298-1b33-429c-a802-fa51bb662d72.npy'),
'exists': True}],
}
def test_dsets_2_path(self):
self.assertEqual(len(patcher.globus_path_from_dataset([self.dset] * 3)), 3)
sdsc_path = ('/mnt/ibl/hoferlab/Subjects/SWC_014/2019-12-11/001/alf/probe00/'
'channels.localCoordinates.00059298-1b33-429c-a802-fa51bb662d72.npy')
globus_path_sdsc = ('/hoferlab/Subjects/SWC_014/2019-12-11/001/alf/probe00/'
'channels.localCoordinates.00059298-1b33-429c-a802-fa51bb662d72.npy')
globus_path_sr = ('/mnt/s0/Data/Subjects/SWC_014/2019-12-11/001/alf/probe00/'
'channels.localCoordinates.npy')
# Test sdsc_path_from_dataset
testable = patcher.sdsc_path_from_dataset(self.dset)
self.assertEqual(str(testable), sdsc_path)
self.assertIsInstance(testable, PurePosixPath)
# Test sdsc_globus_path_from_dataset
testable = patcher.sdsc_globus_path_from_dataset(self.dset)
self.assertEqual(str(testable), globus_path_sdsc)
self.assertIsInstance(testable, PurePosixPath)
# Test globus_path_from_dataset
testable = patcher.globus_path_from_dataset(self.dset, repository='ibl_floferlab_SR')
self.assertEqual(str(testable), globus_path_sr)
self.assertIsInstance(testable, PurePosixPath)
def get_mock_session_settings(subject='clns0730', user='test_user'):
"""Create a basic session settings file for testing."""
return {
'SESSION_DATE': '2018-04-01',
'SESSION_DATETIME': '2018-04-01T12:48:26.795526',
'PYBPOD_CREATOR': [user, 'f092c2d5-c98a-45a1-be7c-df05f129a93c', 'local'],
'SESSION_NUMBER': '002',
'SUBJECT_NAME': subject,
'PYBPOD_BOARD': '_iblrig_mainenlab_behavior_1',
'PYBPOD_PROTOCOL': '_iblrig_tasks_ephysChoiceWorld',
'IBLRIG_VERSION': '5.4.1',
'SUBJECT_WEIGHT': 22,
}
class TestRegistration(unittest.TestCase):
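"""Tests for dataset and session registration against the test Alyx database."""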
subject = ''
"""str: The name of the subject under which to create sessions."""
one = None
"""one.api.OneAlyx: An instance of ONE connected to a test database."""
@classmethod
def setUpClass(cls):
"""Create a random new subject."""
cls.one = ONE(**TEST_DB, cache_rest=None)
cls.subject = ''.join(random.choices(string.ascii_letters, k=10))
cls.one.alyx.rest('subjects', 'create', data={'lab': 'mainenlab', 'nickname': cls.subject})
def setUp(self) -> None:
self.settings = get_mock_session_settings(self.subject)
self.session_dict = {
'subject': self.subject,
'start_time': '2018-04-01T12:48:26.795526',
'number': 2,
'users': [self.settings['PYBPOD_CREATOR'][0]]
}
# Make sure tests start without a session created
self.td = tempfile.TemporaryDirectory()
self.session_path = Path(self.td.name).joinpath(self.subject, '2018-04-01', '002')
self.alf_path = self.session_path.joinpath('alf')
self.alf_path.mkdir(parents=True)
np.save(self.alf_path.joinpath('spikes.times.npy'), np.random.random(500))
np.save(self.alf_path.joinpath('spikes.amps.npy'), np.random.random(500))
self.revision = ''.join(random.choices(string.ascii_letters, k=3))
self.rev_path = self.alf_path.joinpath(f'#{self.revision}#')
self.rev_path.mkdir(parents=True)
np.save(self.rev_path.joinpath('spikes.times.npy'), np.random.random(300))
np.save(self.rev_path.joinpath('spikes.amps.npy'), np.random.random(300))
self.today_revision = datetime.datetime.today().strftime('%Y-%m-%d')
# Create a revision if it doesn't already exist
try:
self.rev = self.one.alyx.rest('revisions', 'read', id=self.revision)
except HTTPError:
self.rev = self.one.alyx.rest('revisions', 'create', data={'name': self.revision})
# Create a new tag
tag_name = 'test_tag_' + ''.join(random.choices(string.ascii_letters, k=5))
tag_data = {'name': tag_name, 'protected': True}
self.tag = self.one.alyx.rest('tags', 'create', data=tag_data)
def test_registration_datasets(self):
# Register a single file
ses = self.one.alyx.rest('sessions', 'create', data=self.session_dict)
st_file = self.alf_path.joinpath('spikes.times.npy')
registration.register_dataset(file_list=st_file, one=self.one)
dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:])
self.assertTrue(len(dsets) == 1)
# Register a list of files
flist = list(self.alf_path.glob('*.npy'))
r = registration.register_dataset(file_list=flist, one=self.one)
dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:])
self.assertTrue(len(dsets) == 2)
self.assertTrue(all(not d['revision'] for d in r))
self.assertTrue(all(d['default'] for d in r))
self.assertTrue(all(d['collection'] == 'alf' for d in r))
# Simulate that all the datasets exist, re-register and assert that exists is set to True
# as the files haven't changed
frs = self.one.alyx.rest('files', 'list', django=f"dataset__session,{ses['url'][-36:]}")
for fr in frs:
self.one.alyx.rest('files', 'partial_update',
id=fr['url'][-36:], data={'exists': True})
r = registration.register_dataset(file_list=flist, one=self.one)
self.assertTrue(all(all(fr['exists'] for fr in rr['file_records']) for rr in r))
# Now that the files have changed, make sure the exists flags are set to False
np.save(self.alf_path.joinpath('spikes.times.npy'), np.random.random(500))
np.save(self.alf_path.joinpath('spikes.amps.npy'), np.random.random(500))
r = registration.register_dataset(file_list=flist, one=self.one)
self.assertTrue(all(all(not fr['exists'] for fr in rr['file_records']) for rr in r))
# Add a protected tag to all the datasets
dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:])
for d in dsets:
self.one.alyx.rest('datasets', 'partial_update',
id=d['url'][-36:], data={'tags': [self.tag['name']]})
# Check that we get an exception unless force=True
flist = list(self.rev_path.glob('*.npy'))
with self.assertRaises(FileExistsError):
registration.register_dataset(file_list=flist, one=self.one)
# Test registering with a revision already in the file path; this should be used rather than creating one with today's date
flist = list(self.rev_path.glob('*.npy'))
r = registration.register_dataset(file_list=flist, one=self.one, force=True)
self.assertTrue(all(d['revision'] == self.revision for d in r))
self.assertTrue(all(d['default'] for d in r))
self.assertTrue(all(d['collection'] == 'alf' for d in r))
# Add a protected tag to all the datasets
dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:])
for d in dsets:
self.one.alyx.rest('datasets', 'partial_update',
id=d['url'][-36:], data={'tags': [self.tag['name']]})
# Register again with the revision in the file path; it should register to self.revision + 'a'
flist = list(self.rev_path.glob('*.npy'))
r = registration.register_dataset(file_list=flist, one=self.one, force=True)
self.assertTrue(all(d['revision'] == f'{self.revision}a' for d in r))
self.assertTrue(self.alf_path.joinpath(f'#{self.revision}a#', 'spikes.times.npy').exists())
self.assertTrue(self.alf_path.joinpath(f'#{self.revision}a#', 'spikes.amps.npy').exists())
self.assertFalse(self.alf_path.joinpath(f'#{self.revision}#', 'spikes.times.npy').exists())
self.assertFalse(self.alf_path.joinpath(f'#{self.revision}#', 'spikes.amps.npy').exists())
# When we re-register the original files they should be moved into a revision with today's date
flist = list(self.alf_path.glob('*.npy'))
r = registration.register_dataset(file_list=flist, one=self.one, force=True)
self.assertTrue(all(d['revision'] == self.today_revision for d in r))
self.assertTrue(self.alf_path.joinpath(f'#{self.today_revision}#', 'spikes.times.npy').exists())
self.assertTrue(self.alf_path.joinpath(f'#{self.today_revision}#', 'spikes.amps.npy').exists())
self.assertFalse(self.alf_path.joinpath('spikes.times.npy').exists())
self.assertFalse(self.alf_path.joinpath('spikes.amps.npy').exists())
# Protect the latest datasets
dsets = self.one.alyx.rest('datasets', 'list', session=ses['url'][-36:], no_cache=True)
for d in dsets:
self.one.alyx.rest('datasets', 'partial_update',
id=d['url'][-36:], data={'tags': [self.tag['name']]})
# Same day revision
# Need to remake the original files
np.save(self.alf_path.joinpath('spikes.times.npy'), np.random.random(500))
np.save(self.alf_path.joinpath('spikes.amps.npy'), np.random.random(500))
flist = list(self.alf_path.glob('*.npy'))
r = registration.register_dataset(file_list=flist, one=self.one, force=True)
self.assertTrue(all(d['revision'] == self.today_revision + 'a' for d in r))
def _write_settings_file(self):
behavior_path = self.session_path.joinpath('raw_behavior_data')
behavior_path.mkdir()
settings_file = behavior_path.joinpath('_iblrig_taskSettings.raw.json')
with open(settings_file, 'w') as fid:
json.dump(self.settings, fid)
return settings_file
def test_create_sessions(self):
flag_file = self.session_path.joinpath('create_me.flag')
flag_file.touch()
rc = registration.IBLRegistrationClient(one=self.one)
# Should raise an error if settings file does not exist
self.assertRaises(ValueError, rc.create_sessions, self.session_path)
self._write_settings_file()
# Test dry
sessions, records = rc.create_sessions(self.session_path, dry=True)
self.assertEqual(1, len(sessions))
self.assertEqual(sessions[0], self.session_path)
self.assertIsNone(records[0])
self.assertTrue(flag_file.exists())
# Test not dry
sessions, records = rc.create_sessions(self.session_path, dry=False)
self.assertEqual(1, len(sessions))
self.assertEqual(sessions[0], self.session_path)
self.assertFalse(flag_file.exists())
def test_registration_session(self):
"""Test IBLRegistrationClient.register_session method."""
settings_file = self._write_settings_file()
rc = registration.IBLRegistrationClient(one=self.one)
rc.register_session(str(self.session_path), procedures=['Ephys recording with acute probe(s)'])
eid = self.one.search(subject=self.subject, date_range=['2018-04-01', '2018-04-01'],
query_type='remote')[0]
datasets = self.one.alyx.rest('datasets', 'list', session=eid)
for ds in datasets:
self.assertTrue(ds['hash'] is not None)
self.assertTrue(ds['file_size'] is not None)
self.assertTrue(ds['version'] == ibllib.__version__)
# Check the procedures of the session
ses_info = self.one.alyx.rest('sessions', 'read', id=eid)
self.assertTrue(ses_info['procedures'] == ['Ephys recording with acute probe(s)'])
self.one.alyx.rest('sessions', 'delete', id=eid)
# Re-register the session as behaviour this time
self.settings['PYBPOD_PROTOCOL'] = '_iblrig_tasks_trainingChoiceWorld6.3.1'
with open(settings_file, 'w') as fid:
json.dump(self.settings, fid)
rc.register_session(self.session_path)
eid = self.one.search(subject=self.subject, date_range=['2018-04-01', '2018-04-01'],
query_type='remote')[0]
ses_info = self.one.alyx.rest('sessions', 'read', id=eid)
self.assertTrue(ses_info['procedures'] == [])
# Re-register the session with an unknown protocol, this time without removing the session first
self.settings['PYBPOD_PROTOCOL'] = 'gnagnagna'
# Also add an end time
start = datetime.datetime.fromisoformat(self.settings['SESSION_DATETIME'])
self.settings['SESSION_START_TIME'] = rc.ensure_ISO8601(start)
self.settings['SESSION_END_TIME'] = rc.ensure_ISO8601(start + datetime.timedelta(hours=1))
with open(settings_file, 'w') as fid:
json.dump(self.settings, fid)
rc.register_session(self.session_path)
eid = self.one.search(subject=self.subject, date_range=['2018-04-01', '2018-04-01'],
query_type='remote')[0]
ses_info = self.one.alyx.rest('sessions', 'read', id=eid)
self.assertTrue(ses_info['procedures'] == [])
self.assertEqual(self.settings['SESSION_END_TIME'], ses_info['end_time'])
self.one.alyx.rest('sessions', 'delete', id=eid)
def test_registration_session_passive(self):
"""Test IBLRegistrationClient.register_session method when there is no iblrig bpod data.
For truly passive sessions there is no Bpod data (no raw_behavior_data or raw_task_data folders).
In this situation there must already be a session on Alyx, manually created by the experimenter,
which needs to contain the start time, location, lab, and user data.
"""
rc = registration.IBLRegistrationClient(one=self.one)
experiment_description = {
'procedures': ['Ephys recording with acute probe(s)'],
'sync': {'nidq': {'collection': 'raw_ephys_data'}}}
session_params.write_params(self.session_path, experiment_description)
# Should fail because the session doesn't exist on Alyx
self.assertRaises(AssertionError, rc.register_session, self.session_path)
# Create the session
ses_ = {
'subject': self.subject, 'users': [self.one.alyx.user],
'type': 'Experiment', 'number': int(self.session_path.name),
'start_time': rc.ensure_ISO8601(self.session_path.parts[-2]),
'n_correct_trials': 100, 'n_trials': 200
}
session = self.one.alyx.rest('sessions', 'create', data=ses_)
# Should fail because the session lacks critical information
self.assertRaisesRegex(
AssertionError, 'missing session information: location', rc.register_session, self.session_path)
session = self.one.alyx.rest(
'sessions', 'partial_update', id=session['url'][-36:], data={'location': self.settings['PYBPOD_BOARD']})
# Should now register
ses, dsets = rc.register_session(self.session_path)
# Check that the session was updated, namely the trial counts and procedures
self.assertEqual(session['url'], ses['url'])
self.assertTrue(ses['n_correct_trials'] == ses['n_trials'] == 0)
self.assertEqual(experiment_description['procedures'], ses['procedures'])
self.assertEqual(5, len(dsets))
registered = [d['file_records'][0]['relative_path'] for d in dsets]
expected = [
f'{self.subject}/2018-04-01/002/_ibl_experiment.description.yaml',
f'{self.subject}/2018-04-01/002/alf/spikes.amps.npy',
f'{self.subject}/2018-04-01/002/alf/spikes.times.npy',
f'{self.subject}/2018-04-01/002/alf/#{self.revision}#/spikes.amps.npy',
f'{self.subject}/2018-04-01/002/alf/#{self.revision}#/spikes.times.npy'
]
self.assertCountEqual(expected, registered)
def test_register_chained_session(self):
"""Tests for registering a session with chained (multiple) protocols."""
behaviour_paths = [self.session_path.joinpath(f'raw_task_data_{i:02}') for i in range(2)]
for p in behaviour_paths:
p.mkdir()
# Set the collections
params_path = Path(__file__).parent.joinpath('fixtures', 'io', '_ibl_experiment.description.yaml')
experiment_description = session_params.read_params(params_path)
assert experiment_description
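# Each entry in experiment_description['tasks'] is a single-key dict of {protocol: params};
# pull out the params dicts so their collections can be overwritten below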
collections = map(lambda x: next(iter(x.values())), experiment_description['tasks'])
for collection, d in zip(map(lambda x: x.parts[-1], behaviour_paths), collections):
d['collection'] = collection
# Save experiment description
session_params.write_params(self.session_path, experiment_description)
self.settings['POOP_COUNT'] = 10
with open(behaviour_paths[1].joinpath('_iblrig_taskSettings.raw.json'), 'w') as fid:
json.dump(self.settings, fid)
settings = self.settings.copy()
settings['PYBPOD_PROTOCOL'] = '_iblrig_tasks_passiveChoiceWorld'
settings['POOP_COUNT'] = 53
start_time = (datetime.datetime.fromisoformat(settings['SESSION_DATETIME']) -
datetime.timedelta(hours=1, minutes=2, seconds=12))
settings['SESSION_DATETIME'] = start_time.isoformat()
with open(behaviour_paths[0].joinpath('_iblrig_taskSettings.raw.json'), 'w') as fid:
json.dump(settings, fid)
rc = registration.IBLRegistrationClient(one=self.one)
session, recs = rc.register_session(self.session_path)
self.assertEqual(7, len(recs))
ses_info = self.one.alyx.rest('sessions', 'read', id=session['id'])
self.assertCountEqual(experiment_description['procedures'], ses_info['procedures'])
self.assertCountEqual(experiment_description['projects'], ses_info['projects'])
# Poop count should be the sum of the values in both settings files
expected = {'IS_MOCK': False, 'IBLRIG_VERSION': '5.4.1', 'POOP_COUNT': 63}
self.assertDictEqual(expected, ses_info['json'])
self.assertEqual('2018-04-01T11:46:14.795526', ses_info['start_time'])
# Test task protocol
expected = '_iblrig_tasks_passiveChoiceWorld5.4.1/_iblrig_tasks_ephysChoiceWorld5.4.1'
self.assertEqual(expected, ses_info['task_protocol'])
# Test weighings created on Alyx
w = self.one.alyx.rest('subjects', 'read', id=self.subject)['weighings']
self.assertEqual(2, len(w))
self.assertCountEqual({22.}, {x['weight'] for x in w})
weight_dates = {x['date_time'] for x in w}
self.assertEqual(2, len(weight_dates))
self.assertIn(ses_info['start_time'], weight_dates)
def tearDown(self) -> None:
self.td.cleanup()
self.one.alyx.rest('revisions', 'delete', id=self.rev['name'])
self.one.alyx.rest('tags', 'delete', id=self.tag['id'])
today_revision = self.one.alyx.rest('revisions', 'list', id=self.today_revision)
today_rev = [rev for rev in today_revision if self.today_revision in rev['name']]
for rev in today_rev:
self.one.alyx.rest('revisions', 'delete', id=rev['name'])
v1_rev = [rev for rev in today_revision if self.revision in rev['name']]
for rev in v1_rev:
self.one.alyx.rest('revisions', 'delete', id=rev['name'])
# Delete weighings
for w in self.one.alyx.rest('subjects', 'read', id=self.subject)['weighings']:
self.one.alyx.rest('weighings', 'delete', id=w['url'].split('/')[-1])
# Delete sessions
eid = self.one.search(subject=self.subject, date_range='2018-04-01', query_type='remote')
for ei in eid:
self.one.alyx.rest('sessions', 'delete', id=ei)
@classmethod
def tearDownClass(cls) -> None:
# Note: sessions and datasets deleted in cascade
cls.one.alyx.rest('subjects', 'delete', id=cls.subject)
class TestDataHandlers(unittest.TestCase):
"""Tests for ibllib.oneibl.data_handlers.DataHandler classes."""
@mock.patch('ibllib.oneibl.data_handlers.register_dataset')
def test_server_upload_data(self, register_dataset_mock):
"""A test for ServerDataHandler.uploadData method."""
one = mock.MagicMock()
one.alyx = None
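# Each file entry in the signature is a (filename pattern, collection, required) tuple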
signature = {
'input_files': [],
'output_files': [
('_iblrig_taskData.raw.*', 'raw_task_data_00', True),
('_iblrig_taskSettings.raw.*', 'raw_task_data_00', True),
('_iblrig_encoderEvents.raw*', 'raw_task_data_00', False),
]
}
handler = handlers.ServerDataHandler('subject/2023-01-01/001', signature, one=one)
register_dataset_mock.return_value = [{'id': 1}, {'id': 2}, {'id': 3}] # toy alyx dataset records
# In reality these would be pathlib.Path objects
files = ['_iblrig_taskData.raw.jsonable', '_iblrig_taskSettings.raw.json', '_iblrig_encoderEvents.raw.json']
# Check dry=True (shouldn't update the processed map)
out = handler.uploadData(files, '0.0.0', dry=True)
self.assertEqual(register_dataset_mock.return_value, out)
self.assertEqual({}, handler.processed)
register_dataset_mock.assert_called_once_with(files, one=one, versions=['0.0.0'] * 3, repository=None, dry=True)
register_dataset_mock.reset_mock()
# Check dry=False
out = handler.uploadData(files, '0.0.0', dry=False)
self.assertEqual(register_dataset_mock.return_value, out)
register_dataset_mock.assert_called_once_with(files, one=one, versions=['0.0.0'] * 3, repository=None, dry=False)
expected = dict(zip(files, register_dataset_mock.return_value))
self.assertDictEqual(expected, handler.processed)
# With clobber=False, register_dataset should be called with no new files to register
expected = out
register_dataset_mock.return_value = None
out = handler.uploadData(files, '0.0.0', dry=False)
self.assertEqual([], register_dataset_mock.call_args.args[0])
self.assertEqual(expected, out, 'failed to return previously registered dataset records')
# Try with 1 new file
register_dataset_mock.return_value = [{'id': 4}]
files.append('_misc_taskSettings.raw.json')
out = handler.uploadData(files, '0.0.0')
self.assertEqual([files[-1]], register_dataset_mock.call_args.args[0])
expected += register_dataset_mock.return_value
self.assertEqual(expected, out)
expected = dict(zip(files, expected))
self.assertDictEqual(expected, handler.processed)
# Try with clobber=True
register_dataset_mock.return_value = out
out = handler.uploadData(files, '0.0.0', clobber=True)
self.assertEqual(files, register_dataset_mock.call_args.args[0], 'failed to re-register files')
self.assertEqual(4, len(out))
self.assertDictEqual(expected, handler.processed)
def test_getData(self):
"""Test for DataHandler.getData method."""
one = ONE(**TEST_DB, mode='auto')
session_path = Path('KS005/2019-04-01/001')
task = ChoiceWorldTrialsBpod(session_path, one=one, collection='raw_behavior_data')
task.get_signatures()
handler = handlers.ServerDataHandler(session_path, task.signature, one=one)
# Check getData returns data frame of signature inputs
df = handler.getData()
self.assertIsInstance(df, pd.DataFrame)
self.assertEqual(len(task.input_files), len(df))
# Check with no ONE
handler.one = None
self.assertIsNone(handler.getData())
# Check when no inputs
handler.signature['input_files'] = []
self.assertTrue(handler.getData(one=one).empty)
def test_dataset_from_name(self):
"""Test dataset_from_name function."""
I = handlers.ExpectedDataset.input # noqa
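# 'I' is shorthand for constructing expected input datasets; noqa suppresses the ambiguous-name lint warning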
dA = I('foo.bar.*', 'alf', True)
dB = I('bar.baz.*', 'alf', True)
input_files = [I('obj.attr.ext', 'collection', True), dA | dB]
dsets = handlers.dataset_from_name('obj.attr.ext', input_files)
self.assertEqual(input_files[0:1], dsets)
dsets = handlers.dataset_from_name('bar.baz.*', input_files)
self.assertEqual([dB], dsets)
dsets = handlers.dataset_from_name('foo.baz.ext', input_files)
self.assertEqual([], dsets)
def test_update_collections(self):
"""Test update_collections function."""
I = handlers.ExpectedDataset.input # noqa
dataset = I('foo.bar.ext', 'alf/probe??', True, unique=False)
dset = handlers.update_collections(dataset, 'alf/probe00')
self.assertIsNot(dset, dataset)
self.assertEqual('alf/probe00', dset.identifiers[0])
self.assertEqual(dataset.register, dset.register)
self.assertTrue(dset.unique)
# Check replacing with non-unique collection
dset = handlers.update_collections(dataset, 'probe??')
self.assertFalse(dset.unique)
self.assertEqual('probe??', dset.identifiers[0])
# Check replacing with multiple collections
collections = ['alf/probe00', 'alf/probe01']
dset = handlers.update_collections(dataset, collections)
self.assertIsInstance(dset, handlers.Input)
self.assertEqual('and', dset.operator)
self.assertCountEqual(collections, [x[0] for x in dset.identifiers])
# Check with register values and compound datasets
dataset = (I('foo.bar.*', 'alf/probe??', True, unique=False) |
I('baz.bar.*', 'alf/probe??', False, True, unique=False))
dset = handlers.update_collections(dataset, collections)
self.assertIsInstance(dset, handlers.Input)
self.assertEqual('or', dset.operator)
for d in dset._identifiers:
self.assertIsInstance(d, handlers.Input)
self.assertEqual('and', d.operator)
self.assertCountEqual(collections, [x[0] for x in d.identifiers])
self.assertFalse(any(dd.unique for dd in d._identifiers))
# Check behaviour with unique and substring args
dset = handlers.update_collections(
dataset, ['probe00', 'probe01'], substring='probe??', unique=True)
for d in dset._identifiers:
self.assertCountEqual(collections, [x[0] for x in d.identifiers])
self.assertTrue(all(dd.unique for dd in d._identifiers))
# Check behaviour with None collections and optional dataset
dataset = (I('foo.bar.*', 'alf/probe00', True, unique=False) |
I('baz.bar.*', None, False, True, unique=False))
dset = handlers.update_collections(dataset, 'foo', substring='alf')
expected_types = (handlers.Input, handlers.OptionalInput)
expected_collections = ('foo/probe00', None)
for d, typ, col in zip(dset._identifiers, expected_types, expected_collections):
self.assertIsInstance(d, typ)
self.assertEqual(col, d.identifiers[0])
# Check with revisions
dataset = I(None, None, True)
dataset._identifiers = ('alf', '#2020-01-01#', 'foo.bar.ext')
self.assertRaises(NotImplementedError, handlers.update_collections, dataset, None)
class TestSDSCDataHandler(unittest.TestCase):
"""Test for SDSCDataHandler class."""
def setUp(self):
tmp = tempfile.TemporaryDirectory()
self.addCleanup(tmp.cleanup)
self.one = ONE(**TEST_DB, mode='auto')
self.patch_path = Path(tmp.name, 'patch')
self.root_path = Path(tmp.name, 'root')
self.root_path.mkdir()
self.patch_path.mkdir()
self.session_path = self.root_path.joinpath('KS005/2019-04-01/001')
def test_handler(self):
"""Test for SDSCDataHandler.setUp and cleanUp methods."""
# Create a task in order to check how the signature files are symlinked by handler
task = ChoiceWorldTrialsBpod(self.session_path, one=self.one, collection='raw_behavior_data')
task.get_signatures()
handler = handlers.SDSCDataHandler(self.session_path, task.signature, self.one)
handler.patch_path = self.patch_path
handler.root_path = self.root_path
# Add some files on disk to check they are symlinked by setUp method
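# (on the SDSC/FlatIron server, dataset files are stored with the dataset UUID appended to the stem,
# e.g. spikes.times.<uuid>.npy)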
for uid, rel_path in handler.getData().rel_path.items():
filepath = self.session_path.joinpath(rel_path)
filepath.parent.mkdir(exist_ok=True, parents=True)
filepath.with_stem(f'{filepath.stem}.{uid}').touch()
# Check setUp does the symlinks and updates the task session path
handler.setUp(task=task)
expected = self.patch_path.joinpath('ChoiceWorldTrialsBpod', *self.session_path.parts[-3:])
self.assertEqual(expected, task.session_path, 'failed to update task session path')
linked = list(expected.glob('raw_behavior_data/*.*.*'))
self.assertTrue(all(f.is_symlink() for f in linked), 'failed to symlink input files')
self.assertEqual(len(task.input_files), len(linked), 'unexpected number of linked patch files')
# Check all links to root session
session_path = self.session_path.resolve() # NB: GitHub CI uses linked temp dir so we resolve here
self.assertTrue(all(f.resolve().is_relative_to(session_path) for f in linked))
# Check the symlink name doesn't contain the UUID
self.assertTrue(len(linked[0].resolve().stem.split('.')[-1]) == 36, 'source path missing UUID')
self.assertFalse(len(linked[0].stem.split('.')[-1]) == 36, 'symlink contains UUID')
# Check cleanUp removes links
handler.cleanUp(task=task)
self.assertFalse(any(map(Path.exists, linked)))
# Check no root files were deleted
self.assertEqual(len(task.input_files), len(list(self.session_path.glob('raw_behavior_data/*.*.*'))))
class TestExpectedDataset(unittest.TestCase):
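"""Tests for the ExpectedDataset class and its logical operators (&, |, ^)."""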
def setUp(self):
tmp = tempfile.TemporaryDirectory()
self.addCleanup(tmp.cleanup)
self.tmp = Path(tmp.name)
self.session_path = self.tmp / 'subject' / '2020-01-01' / '001'
self.session_path.mkdir(parents=True)
self.img_path = self.session_path / 'raw_imaging_data_00'
self.img_path.mkdir()
for i in range(5):
self.img_path.joinpath(f'foo_{i:05}.tif').touch()
self.img_path.joinpath('imaging.frames.tar.bz2').touch()
def test_and(self):
I = handlers.ExpectedDataset.input # noqa
sig = I('*.tif', 'raw_imaging_data_[0-9]*') & I('imaging.frames.tar.bz2', 'raw_imaging_data_[0-9]*')
self.assertEqual('and', sig.operator)
ok, files, missing = sig.find_files(self.session_path)
self.assertTrue(ok)
self.assertEqual(6, len(files))
self.assertEqual('foo_00000.tif', files[0].name)
self.assertEqual('imaging.frames.tar.bz2', files[-1].name)
self.assertEqual(set(), missing)
# Deleting one tif file shouldn't affect the signature
files[0].unlink()
ok, files, missing = sig.find_files(self.session_path)
self.assertTrue(ok)
self.assertTrue(len(files))
self.assertEqual('foo_00001.tif', files[0].name)
self.assertEqual(set(), missing)
# Deleting the tar file should make the signature fail
files[-1].unlink()
ok, files, missing = sig.find_files(self.session_path)
self.assertFalse(ok)
self.assertNotEqual('imaging.frames.tar.bz2', files[-1].name)
self.assertEqual({'raw_imaging_data_[0-9]*/imaging.frames.tar.bz2'}, missing)
def test_or(self):
I = handlers.ExpectedDataset.input # noqa
sig = I('*.tif', 'raw_imaging_data_[0-9]*') | I('imaging.frames.tar.bz2', 'raw_imaging_data_[0-9]*')
self.assertEqual('or', sig.operator)
ok, files, missing = sig.find_files(self.session_path)
self.assertTrue(ok)
self.assertEqual(5, len(files))
self.assertEqual({'.tif'}, set(x.suffix for x in files))
self.assertEqual(set(), missing)
for f in files:
f.unlink()
ok, files, missing = sig.find_files(self.session_path)
self.assertTrue(ok)
self.assertEqual(1, len(files))
self.assertEqual('imaging.frames.tar.bz2', files[0].name)
self.assertEqual({'raw_imaging_data_[0-9]*/*.tif'}, missing)
files[0].unlink()
ok, files, missing = sig.find_files(self.session_path)
self.assertFalse(ok)
self.assertEqual([], files)
self.assertEqual({'raw_imaging_data_[0-9]*/*.tif', 'raw_imaging_data_[0-9]*/imaging.frames.tar.bz2'}, missing)
def test_xor(self):
I = handlers.ExpectedDataset.input # noqa
sig = I('*.tif', 'raw_imaging_data_[0-9]*') ^ I('imaging.frames.tar.bz2', 'raw_imaging_data_[0-9]*')
self.assertEqual('xor', sig.operator)
ok, files, missing = sig.find_files(self.session_path)
self.assertFalse(ok)
self.assertEqual(6, len(files))
self.assertEqual({'.tif', '.bz2'}, set(x.suffix for x in files))
expected_missing = {'raw_imaging_data_[0-9]*/*.tif', 'raw_imaging_data_[0-9]*/imaging.frames.tar.bz2'}
self.assertEqual(expected_missing, missing)
for f in filter(lambda x: x.suffix == '.tif', files):
f.unlink()
ok, files, missing = sig.find_files(self.session_path)
self.assertTrue(ok)
self.assertEqual(1, len(files))
self.assertEqual('imaging.frames.tar.bz2', files[0].name)
self.assertEqual(set(), missing)
files[0].unlink()
ok, files, missing = sig.find_files(self.session_path)
self.assertFalse(ok)
self.assertEqual([], files)
self.assertEqual(expected_missing, missing)
def test_filter(self):
"""Test for ExpectedDataset.filter method.
This test can be extended to support AND operators, e.g. (dset1 AND dset2) OR (dset2 AND dset3).
"""
I = handlers.ExpectedDataset.input # noqa
# Optional datasets 1
column_names = ['session_path', 'id', 'rel_path', 'file_size', 'hash', 'exists']
session_path = '/'.join(self.session_path.parts[-3:])
dsets = [
[session_path, uuid4(), f'raw_ephys_data/_spikeglx_sync.{attr}.npy', 1024, None, True]
for attr in ('channels', 'polarities', 'times')
]
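# Add a 'qc' column using the ONE categorical QC dtype; code 0 maps to the dtype's first category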
qc = pd.Categorical.from_codes(np.zeros(len(dsets), dtype=int), dtype=QC_TYPE)
df1 = pd.DataFrame(dsets, columns=column_names).set_index('id').sort_index().assign(qc=qc)
# Optional datasets 2
dsets = [
[session_path, uuid4(), f'raw_ephys_data/probe{p:02}/_spikeglx_sync.{attr}.probe{p:02}.npy', 1024, None, True]
for attr in ('channels', 'polarities', 'times') for p in range(2)
]
qc = pd.Categorical.from_codes(np.zeros(len(dsets), dtype=int), dtype=QC_TYPE)
df2 = pd.DataFrame(dsets, columns=column_names).set_index('id').sort_index().assign(qc=qc)
# Test simple input (no op)
d = I('_*_sync.channels.npy', 'raw_ephys_data', True)
ok, filtered_df = d.filter(df1)
self.assertTrue(ok)
expected = df1.index[df1['rel_path'] == 'raw_ephys_data/_spikeglx_sync.channels.npy']
self.assertCountEqual(expected, filtered_df.index)
# Test inverted (assert absence)
d.inverted = True
ok, filtered_df = d.filter(df1)
self.assertFalse(ok)
expected = df1.index[df1['rel_path'] == 'raw_ephys_data/_spikeglx_sync.channels.npy']
self.assertCountEqual(expected, filtered_df.index)
ok, filtered_df = d.filter(df2)
self.assertTrue(ok)
# Test OR
d = (I('_spikeglx_sync.channels.npy', 'raw_ephys_data', True) |
I('_spikeglx_sync.channels.probe??.npy', 'raw_ephys_data/probe??', True, unique=False))
merged = pd.concat([df1, df2])
ok, filtered_df = d.filter(merged)
self.assertTrue(ok)
self.assertCountEqual(expected, filtered_df.index)
ok, filtered_df = d.filter(df2)
self.assertTrue(ok)
expected = df2.index[df2['rel_path'].str.contains('channels')]
self.assertCountEqual(expected, filtered_df.index)
# Test XOR
d = (I('_spikeglx_sync.channels.npy', 'raw_ephys_data', True) ^
I('_spikeglx_sync.channels.probe??.npy', 'raw_ephys_data/probe??', True, unique=False))
ok, filtered_df = d.filter(merged)
self.assertFalse(ok)
expected = merged.index[merged['rel_path'].str.contains('channels')]
self.assertCountEqual(expected, filtered_df.index)
ok, filtered_df = d.filter(df1)
self.assertTrue(ok)
expected = df1.index[df1['rel_path'] == 'raw_ephys_data/_spikeglx_sync.channels.npy']
self.assertCountEqual(expected, filtered_df.index)
ok, filtered_df = d.filter(df2)
self.assertTrue(ok)
expected = df2.index[df2['rel_path'].str.contains('channels')]
self.assertCountEqual(expected, filtered_df.index)
def test_identifiers(self):
"""Test identifiers property getter."""
I = handlers.ExpectedDataset.input # noqa
dataset = I('foo.bar.baz', 'collection', True)
self.assertEqual(('collection', None, 'foo.bar.baz'), dataset.identifiers)
# Ensure that nested identifiers are returned as flat tuple of tuples
dataset = (I('dataset_a', None, True, unique=False) |
I('dataset_b', 'foo', True) |
I('dataset_c', None, True, unique=False))
self.assertEqual(3, len(dataset.identifiers))
self.assertEqual({3}, set(map(len, dataset.identifiers)))
expected = ((None, None, 'dataset_a'), ('foo', None, 'dataset_b'), (None, None, 'dataset_c'))
self.assertEqual(expected, dataset.identifiers)
if __name__ == '__main__':
unittest.main()