Source code for one.tests.test_one

"""Unit tests for the one.api module.

Wherever possible the ONE tests should not rely on an internet connection.

Fixture locations:

- The cache tables for the public test instance are in tests/fixtures/
- The test db parameters can be found in tests/fixtures/params/
- Some REST GET requests can be found in tests/fixtures/rest_responses/
- These can be copied over to a temporary directory using the functions in tests/util.py;
  ONE can then be constructed with that directory as cache_dir, mode='local' and silent=True.
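
For example, a minimal sketch of the pattern used by the setUp methods in this module::

    from one.api import ONE
    from one.tests import util

    tempdir = util.set_up_env()  # copy the fixture cache tables to a temp directory
    one = ONE(cache_dir=tempdir.name, mode='local', silent=True)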

Imported constants:

- For tests that do require a remote connection use the tests.OFFLINE_ONLY flag in the skipIf
  decorator.
- For testing REST POST requests use TEST_DB_1 (test.alyx.internationalbrainlab.org).
- For testing download functions, use TEST_DB_2 (openalyx.internationalbrainlab.org).
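
For example, as used by the online test classes in this module::

    @unittest.skipIf(OFFLINE_ONLY, 'online only test')
    class TestOneAlyx(unittest.TestCase):
        ...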

Note ONE and AlyxClient use caching:

- When verifying remote changes via the rest method, use the no_cache flag to ensure the remote
  database is queried.  You can clear the cache using AlyxClient.clear_rest_cache(),
  or mock iblutil.io.params.getfile to return a temporary cache directory.
- For a One object created through the one.api.ONE function, make sure you restore its
  properties to their original state on teardown, or call one.api.ONE.cache_clear().
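
For example, a minimal sketch assuming a OneAlyx instance named ``one`` and a revision
name in ``name``::

    # Bypass the REST response cache for a single query
    record = one.alyx.rest('revisions', 'read', id=name, no_cache=True)
    # Forget ONE instances created through the one.api.ONE factory function
    ONE.cache_clear()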

"""
import datetime
import logging
import time
from pathlib import Path
from itertools import permutations, combinations_with_replacement
from functools import partial
import unittest
from unittest import mock
import tempfile
from uuid import UUID, uuid4
import io

import numpy as np
import pandas as pd
from requests.exceptions import HTTPError
from iblutil.io import parquet
from iblutil.util import Bunch

from one import __version__
from one.api import ONE, One, OneAlyx
from one.util import (
    validate_date_range, index_last_before, filter_datasets, _collection_spec,
    filter_revision_last_before, parse_id, autocomplete, LazyId, ensure_list
)
import one.params
import one.alf.exceptions as alferr
from one.converters import datasets2records
from one.alf import spec
from one.alf.path import get_alf_path
from one.alf.cache import EMPTY_DATASETS_FRAME, EMPTY_SESSIONS_FRAME
from . import util
from . import OFFLINE_ONLY, TEST_DB_1, TEST_DB_2  # 1 = TestAlyx; 2 = OpenAlyx


class TestONECache(unittest.TestCase):
    """Test methods that use sessions and datasets tables.

    This class loads the parquet tables from the fixtures and builds a file tree
    in a temp folder.
    """

    tempdir = None

    def setUp(self) -> None:
        self.tempdir = util.set_up_env()
        # Create ONE object with temp cache dir
        self.one = ONE(mode='local', cache_dir=self.tempdir.name)
        # Create dset files from cache
        util.create_file_tree(self.one)
        # Here we create some variations to get coverage over more cases:
        # the first 10 records will have the right file size (0) but the wrong hash;
        # the next 10 records will have the right file size (0) but the correct empty file md5;
        # all remaining records have NaN in file_size and None in hash (default cache table)
        cols = self.one._cache['datasets'].columns
        self.one._cache['datasets'].iloc[:20, cols.get_loc('file_size')] = 0
        self.one._cache['datasets'].iloc[:20, cols.get_loc('hash')] \
            = 'd41d8cd98f00b204e9800998ecf8427e'  # empty hash correct
        self.one._cache['datasets'].iloc[:10, cols.get_loc('hash')] \
            = 'd41d8cda454aaaa4e9800998ecf8497e'  # wrong hash

    def tearDown(self) -> None:
        while Path(self.one.cache_dir).joinpath('.cache.lock').exists():
            time.sleep(.1)
        self.tempdir.cleanup()

    def test_list_subjects(self):
        """Test One.list_subjects."""
        subjects = self.one.list_subjects()
        expected = ['KS005', 'ZFM-01935', 'ZM_1094', 'ZM_1150',
                    'ZM_1743', 'ZM_335', 'clns0730', 'flowers']
        self.assertCountEqual(expected, subjects)

    def test_offline_repr(self):
        """Test for One.offline property."""
        self.assertTrue('offline' in str(self.one))
        self.assertTrue(str(self.tempdir.name) in str(self.one))

[docs] def test_filter(self): """Test one.util.filter_datasets""" datasets = self.one._cache.datasets.iloc[:5].copy() # Test identity verifiable = filter_datasets(datasets, None, None, None, assert_unique=False, revision_last_before=False) self.assertEqual(len(datasets), len(verifiable)) # Test collection filter verifiable = filter_datasets(datasets, None, 'alf', None, assert_unique=False, revision_last_before=False) self.assertEqual(2, len(verifiable)) with self.assertRaises(alferr.ALFMultipleCollectionsFound): filter_datasets(datasets, None, 'raw.*', None, revision_last_before=False) # Test filter empty collection datasets.rel_path[-1] = '_ibl_trials.rewardVolume.npy' verifiable = filter_datasets(datasets, None, '', None, revision_last_before=False) self.assertTrue(len(verifiable), 1) # Test dataset filter verifiable = filter_datasets(datasets, '_iblrig_.*', None, None, assert_unique=False, revision_last_before=False) self.assertEqual(3, len(verifiable)) with self.assertRaises(alferr.ALFMultipleObjectsFound): filter_datasets(datasets, '_iblrig_.*', 'raw_video_data', None, revision_last_before=False) # Test list as logical OR verifiable = filter_datasets(datasets, ['alf/_ibl_wheel.*', '_ibl_trials.*'], None, None, assert_unique=False, revision_last_before=False) self.assertTrue(all(x.startswith('_ibl_trials') or x.startswith('alf/_ibl_wheel') for x in verifiable.rel_path)) # Test as dict dataset = dict(namespace='ibl', object='trials') verifiable = filter_datasets(datasets, dataset, None, None, assert_unique=False, revision_last_before=False) self.assertEqual(1, len(verifiable)) dataset = dict(timescale='bpod', object='trials') verifiable = filter_datasets(self.one._cache.datasets, dataset, None, None, assert_unique=False, revision_last_before=False) self.assertEqual(verifiable['rel_path'].values[0], 'alf/_ibl_trials.intervals_bpod.npy') # As dict with list (should act as logical OR) dataset = dict(attribute=['time.+', 'raw']) verifiable = filter_datasets(datasets, dataset, None, None, assert_unique=False, revision_last_before=False) self.assertEqual(4, len(verifiable)) # QC datasets.loc[:, 'qc'] = ['NOT_SET', 'PASS', 'WARNING', 'FAIL', 'CRITICAL'] verifiable = filter_datasets(datasets, assert_unique=False) self.assertEqual(4, len(verifiable), 'failed to filter QC value') self.assertTrue(all(verifiable.qc < 'CRITICAL'), 'did not exclude CRITICAL QC by default') # 'ignore_qc_not_set' kwarg should ignore records without QC verifiable = filter_datasets(datasets, assert_unique=False, ignore_qc_not_set=True) self.assertEqual(3, len(verifiable), 'failed to filter QC value') self.assertTrue(all(verifiable.qc > 'NOT_SET'), 'did not exclude NOT_SET QC datasets') # Check QC input types verifiable = filter_datasets(datasets, assert_unique=False, qc='PASS') self.assertEqual(2, len(verifiable), 'failed to filter QC value') self.assertTrue(all(verifiable.qc < 'WARNING')) verifiable = filter_datasets(datasets, assert_unique=False, qc=30) self.assertEqual(3, len(verifiable), 'failed to filter QC value') self.assertTrue(all(verifiable.qc < 'FAIL')) verifiable = filter_datasets(datasets, qc=spec.QC.PASS, ignore_qc_not_set=True) self.assertEqual(1, len(verifiable)) self.assertTrue(all(verifiable['qc'] == 'PASS')) datasets.iat[-1, -1] = 'PASS' # set CRITICAL dataset to PASS so not excluded by default # Revisions revisions = [ 'alf/probe00/#2020-01-01#/spikes.times.npy', 'alf/probe00/#2020-08-31#/spikes.times.npy', 'alf/probe00/spikes.times.npy', 'alf/probe00/#2021-xx-xx#/spikes.times.npy', 
'alf/probe01/#2020-01-01#/spikes.times.npy' ] datasets['rel_path'] = revisions # Should return last revision before date for each collection/dataset # These comprise mixed revisions which should trigger ALF warning revision = '2020-09-06' expected_warn = 'Multiple revisions: "2020-08-31", "2020-01-01"' with self.assertWarnsRegex(alferr.ALFWarning, expected_warn): verifiable = filter_datasets(datasets, None, None, revision, assert_unique=False) self.assertEqual(2, len(verifiable)) self.assertTrue(all(x.split('#')[1] < revision for x in verifiable['rel_path'])) # with no default_revisions column there should be a warning about return latest revision # when no revision is provided. with self.assertWarnsRegex(alferr.ALFWarning, 'No default revision for dataset'): verifiable = filter_datasets( datasets, '*spikes.times*', 'alf/probe00', None, assert_unique=True, wildcards=True, revision_last_before=True) self.assertEqual(1, len(verifiable)) self.assertTrue(verifiable['rel_path'].str.contains('#2021-xx-xx#').all()) # Should return matching revision verifiable = filter_datasets(datasets, None, None, r'2020-08-\d{2}', assert_unique=False, revision_last_before=False) self.assertEqual(1, len(verifiable)) self.assertTrue(verifiable['rel_path'].str.contains('#2020-08-31#').all()) # Matches more than one revision; should raise error with self.assertRaises(alferr.ALFMultipleRevisionsFound): filter_datasets(datasets, None, '.*probe00', r'2020-0[18]-\d{2}', revision_last_before=False) # Should return revision that's lexicographically first for each dataset verifiable = filter_datasets(datasets, None, None, None, assert_unique=False) self.assertEqual(2, len(verifiable)) actual = tuple(x.split('#')[1] for x in verifiable['rel_path']) self.assertEqual(('2021-xx-xx', '2020-01-01'), actual) # Should return those without revision verifiable = filter_datasets(datasets, None, None, '', assert_unique=False) self.assertFalse(verifiable['rel_path'].str.contains('#').any()) # Should return empty verifiable = filter_datasets(datasets, None, '.*01', '', assert_unique=False) self.assertEqual(0, len(verifiable)) verifiable = filter_datasets(datasets, None, '.*01', None, assert_unique=False) self.assertEqual(1, len(verifiable)) self.assertTrue(verifiable['rel_path'].str.contains('#2020-01-01#').all()) # Should return dataset marked as default datasets['default_revision'] = [True] + [False] * 4 verifiable = filter_datasets(datasets, None, None, None, assert_unique=False) self.assertEqual(revisions[0], verifiable.rel_path.values[0]) # Load dataset with default revision verifiable = filter_datasets(datasets, '*spikes.times*', 'alf/probe00', None, assert_unique=True, wildcards=True, revision_last_before=True) self.assertEqual(verifiable.rel_path.to_list(), ['alf/probe00/#2020-01-01#/spikes.times.npy']) # When revision_last_before is false, expect multiple revisions error with self.assertRaises(alferr.ALFMultipleRevisionsFound): filter_datasets(datasets, '*spikes.times*', 'alf/probe00', None, assert_unique=True, wildcards=True, revision_last_before=False)
[docs] def test_filter_wildcards(self): """Test one.util.filter_datasets with wildcards flag set to True""" datasets = self.one._cache.datasets.iloc[:5].copy() # Test identity verifiable = filter_datasets(datasets, '_ibl_*', '*lf', None, assert_unique=False, wildcards=True) self.assertTrue(len(verifiable) == 2) # As dict with list (should act as logical OR) kwargs = dict(assert_unique=False, revision_last_before=False, wildcards=True) dataset = dict(attribute=['timestamp?', 'raw']) verifiable = filter_datasets(datasets, dataset, None, None, **kwargs) self.assertEqual(4, len(verifiable)) # Test handling greedy captures of collection parts when there are wildcards at the start # of the filename patten. # Add some identical files that exist in collections and sub-collections # (i.e. raw_ephys_data, raw_ephys_data/probe00, raw_ephys_data/probe01) all_datasets = self.one._cache.datasets meta_datasets = all_datasets[all_datasets.rel_path.str.contains('meta')].copy() datasets = pd.concat([datasets, meta_datasets]) # Matching *meta should not capture raw_ephys_data/probe00, etc. verifiable = filter_datasets(datasets, '*.meta', 'raw_ephys_data', None, **kwargs) expected = ['raw_ephys_data/_spikeglx_ephysData_g0_t0.nidq.meta'] self.assertCountEqual(expected, verifiable.rel_path) verifiable = filter_datasets(datasets, '*.meta', 'raw_ephys_data/probe??', None, **kwargs) self.assertEqual(2, len(verifiable)) verifiable = filter_datasets(datasets, '*.meta', 'raw_ephys_data*', None, **kwargs) self.assertEqual(3, len(verifiable))
[docs] def test_list_datasets(self): """Test One.list_datasets""" # test filename dsets = self.one.list_datasets(filename='_ibl_trials*') self.assertEqual(len(dsets), 18) dsets = self.one.list_datasets(filename='gnagnag') self.assertEqual(len(dsets), 0) # Test no eid dsets = self.one.list_datasets(details=True) self.assertEqual(len(dsets), len(self.one._cache.datasets)) self.assertFalse(dsets is self.one._cache.datasets) self.assertEqual(2, dsets.index.nlevels, 'details data frame should be with eid index') # Test list for eid dsets = self.one.list_datasets('KS005/2019-04-02/001', details=True) self.assertEqual(27, len(dsets)) self.assertEqual(1, dsets.index.nlevels, 'details data frame should be without eid index') # Test keep_eid_index parameter dsets = self.one.list_datasets('KS005/2019-04-02/001', details=True, keep_eid_index=True) self.assertEqual(27, len(dsets)) self.assertEqual(2, dsets.index.nlevels, 'details data frame should be with eid index') # Test filters filename = {'attribute': ['times', 'intervals'], 'extension': 'npy'} dsets = self.one.list_datasets('ZFM-01935/2021-02-05/001', filename) self.assertEqual(10, len(dsets)) self.assertTrue(all(any(y in x for y in ('.times.', '.intervals')) for x in dsets)) filename['attribute'][0] += '*' # Include wildcard to match both times and timestamps dsets = self.one.list_datasets('ZFM-01935/2021-02-05/001', filename) self.assertEqual(13, len(dsets)) self.assertEqual(3, sum('.timestamps.' in x for x in dsets)) # Test empty dsets = self.one.list_datasets('FMR019/2021-03-18/002', details=True) self.assertIsInstance(dsets, pd.DataFrame) self.assertEqual(len(dsets), 0) # Test details=False, with and without eid for eid in [None, 'KS005/2019-04-02/001']: dsets = self.one.list_datasets(eid, details=False) self.assertIsInstance(dsets, list) self.assertTrue(len(dsets) == np.unique(dsets).size) # Test default_revisions_only=True with self.assertRaises(alferr.ALFError): # should raise as no 'default_revision' column self.one.list_datasets('KS005/2019-04-02/001', default_revisions_only=True) # Add the column and add some alternates datasets = util.revisions_datasets_table(collections=['alf'], revisions=['', '2023-01-01']) datasets['default_revision'] = [False, True] * 2 self.one._cache.datasets['default_revision'] = True self.one._cache.datasets = pd.concat([self.one._cache.datasets, datasets]).sort_index() eid, *_ = datasets.index.get_level_values(0) dsets = self.one.list_datasets(eid, 'spikes.*', default_revisions_only=False) self.assertEqual(4, len(dsets)) dsets = self.one.list_datasets(eid, 'spikes.*', default_revisions_only=True) self.assertEqual(2, len(dsets)) self.assertTrue(all('#2023-01-01#' in x for x in dsets)) # Should be the same with details=True dsets = self.one.list_datasets(eid, 'spikes.*', default_revisions_only=True, details=True) self.assertEqual(2, len(dsets)) self.assertTrue(all('#2023-01-01#' in x for x in dsets.rel_path))
    def test_list_collections(self):
        """Test One.list_collections."""
        # Test no eid
        dsets = self.one.list_collections()
        expected = [
            '', 'alf', 'alf/ks2', 'alf/probe00', 'raw_behavior_data', 'raw_ephys_data',
            'raw_ephys_data/probe00', 'raw_passive_data', 'raw_video_data'
        ]
        self.assertCountEqual(expected, dsets)
        # Test details for eid
        dsets = self.one.list_collections('KS005/2019-04-02/001', details=True)
        self.assertIsInstance(dsets, dict)
        self.assertTrue(set(dsets.keys()) <= set(expected))
        self.assertIsInstance(dsets['alf'], pd.DataFrame)
        self.assertTrue(dsets['alf'].rel_path.str.startswith('alf').all())
        # Test empty
        self.assertFalse(len(self.one.list_collections('FMR019/2021-03-18/002', details=True)))
        self.assertFalse(len(self.one.list_collections('FMR019/2021-03-18/002', details=False)))

[docs] def test_list_revisions(self): """Test One.list_revisions""" # No revisions in cache fixture so generate our own revisions_datasets = util.revisions_datasets_table() self.one._cache.datasets = pd.concat([self.one._cache.datasets, revisions_datasets]) eid, _ = revisions_datasets.index[0] # Test no eid dsets = self.one.list_revisions() expected = ['', '2020-01-08', '2021-07-06'] self.assertCountEqual(expected, dsets) # Test details for eid dsets = self.one.list_revisions(eid, details=True) self.assertIsInstance(dsets, dict) self.assertTrue(set(dsets.keys()) <= set(expected)) self.assertIsInstance(dsets['2020-01-08'], pd.DataFrame) self.assertTrue(dsets['2020-01-08'].rel_path.str.contains('#2020-01-08#').all()) # Test dataset filter dsets = self.one.list_revisions(eid, filename='spikes.times.npy', details=True) self.assertTrue(dsets['2020-01-08'].rel_path.str.endswith('spikes.times.npy').all()) # Test collections filter dsets = self.one.list_revisions(eid, collection='alf/probe01', details=True) self.assertTrue(dsets['2020-01-08'].rel_path.str.startswith('alf/probe01').all()) # Test revision filter revisions = self.one.list_revisions(eid, revision=['202[01]*']) self.assertCountEqual(['2020-01-08', '2021-07-06'], revisions) # Test empty self.assertFalse(len(self.one.list_revisions('FMR019/2021-03-18/002', details=True))) self.assertFalse(len(self.one.list_revisions('FMR019/2021-03-18/002', details=False)))
    def test_get_details(self):
        """Test One.get_details."""
        eid = 'aaf101c3-2581-450a-8abd-ddb8f557a5ad'
        det = self.one.get_details(eid)
        self.assertIsInstance(det, pd.Series)
        self.assertEqual('KS005', det.subject)
        self.assertEqual('2019-04-04', str(det.date))
        self.assertEqual(4, det.number)
        # Test details flag
        det2 = self.one.get_details(eid, full=True)
        self.assertIsInstance(det2, pd.DataFrame)
        self.assertTrue('rel_path' in det2.columns)
        # Test errors
        with self.assertRaises(alferr.ALFObjectNotFound):
            self.one.get_details(eid.replace('a', 'b'))
        sessions = self.one._cache.sessions
        self.one._cache.sessions = pd.concat([sessions, det.to_frame().T]).sort_index()
        with self.assertRaises(alferr.ALFMultipleObjectsFound):
            self.one.get_details(eid)

[docs] def test_check_filesystem(self): """Test for One._check_filesystem. Most is already covered by other tests, this checks that it can deal with dataset frame without eid index and without a session_path column. """ # Get two eids to test eids = self.one._cache['datasets'].index.get_level_values(0)[[0, -1]] datasets = self.one._cache['datasets'].loc[eids] files = self.one._check_filesystem(datasets) self.assertEqual(53, len(files)) # Expect same number of unique session paths as eids session_paths = set(map(lambda p: p.parents[1], files)) self.assertEqual(len(eids), len(session_paths)) expected = map(lambda x: '/'.join(x.parts[-3:]), session_paths) session_parts = self.one._cache['sessions'].loc[eids, ['subject', 'date', 'number']].values self.assertCountEqual(expected, map(lambda x: f'{x[0]}/{x[1]}/{x[2]:03}', session_parts)) # Test a very rare occurence of a missing dataset with eid index missing # but session_path column present idx = self.one._cache.datasets.index[(i := 5)] # pick a random dataset to make vanish _eid2path = { e: self.one.eid2path(e).relative_to(self.one.cache_dir).as_posix() for e in eids } session_paths = list(map(_eid2path.get, datasets.index.get_level_values(0))) datasets['session_path'] = session_paths datasets = datasets.droplevel(0) files[(i := 5)].unlink() # For this check the current state should be exists==True in the main cache assert self.one._cache.datasets.loc[idx, 'exists'].all() _files = self.one._check_filesystem(datasets, update_exists=True) self.assertIsNone(_files[i]) self.assertFalse( self.one._cache.datasets.loc[idx, 'exists'].all(), 'failed to update cache exists') files[i].touch() # restore file for next check # Attempt to load datasets with both eid index # and session_path column missing (most common) datasets = datasets.drop('session_path', axis=1) self.assertEqual(files, self.one._check_filesystem(datasets)) # Test with uuid_filenames as True self.one.uuid_filenames = True try: for file, (uuid, _) in zip(files, datasets.iterrows()): file.rename(file.with_suffix(f'.{uuid}{file.suffix}')) files = self.one._check_filesystem(datasets) self.assertTrue(all(files)) self.assertIn(datasets.index[0], files[0].name) finally: self.one.uuid_filenames = False
[docs] def test_load_dataset(self): """Test One.load_dataset""" eid = 'KS005/2019-04-02/001' # Check download only file = self.one.load_dataset(eid, '_ibl_wheel.position.npy', download_only=True) self.assertIsInstance(file, Path) # Check loading data np.save(str(file), np.arange(3)) # Make sure we have something to load dset = self.one.load_dataset(eid, '_ibl_wheel.position.npy') self.assertTrue(np.all(dset == np.arange(3))) # Check collection filter file = self.one.load_dataset(eid, '_iblrig_leftCamera.timestamps.ssv', download_only=True, collection='raw_video_data') self.assertIsNotNone(file) # Test errors # ID not in cache fake_id = self.one.to_eid(eid).replace('b', 'a') with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_dataset(fake_id, '_iblrig_leftCamera.timestamps.ssv') file.unlink() with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_dataset(eid, '_iblrig_leftCamera.timestamps.ssv') with self.assertRaises(ValueError): self.one.load_dataset(eid, 'alf/_ibl_trials.choice', collection='') # Check loading without extension file = self.one.load_dataset(eid, '_ibl_wheel.position', download_only=True) self.assertTrue(str(file).endswith('wheel.position.npy')) # Check loading with relative path file = self.one.load_dataset(eid, 'alf/_ibl_trials.choice.npy', download_only=True) self.assertIsNotNone(file) with self.assertWarns(UserWarning): self.one.load_dataset(eid, 'alf*/_ibl_trials.choice.npy', download_only=True)
[docs] def test_load_datasets(self): """Test One.load_datasets""" eid = 'KS005/2019-04-02/001' # Check download only dsets = ['_ibl_wheel.position.npy', '_ibl_wheel.timestamps.npy'] files, meta = self.one.load_datasets(eid, dsets, download_only=True, assert_present=False) self.assertIsInstance(files, list) self.assertTrue(all(isinstance(x, Path) for x in files)) # Check loading data and missing dataset dsets = ['_ibl_wheel.position.npy', '_ibl_wheel.timestamps_bpod.npy'] np.save(str(files[0]), np.arange(3)) # Make sure we have something to load data, meta = self.one.load_datasets(eid, dsets, download_only=False, assert_present=False) self.assertEqual(2, len(data)) self.assertEqual(2, len(meta)) self.assertTrue(np.all(data[0] == np.arange(3))) # Check assert_present raises error with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_datasets(eid, dsets, assert_present=True) # Check collection and revision filters dsets = ['_ibl_wheel.position.npy', '_ibl_wheel.timestamps.npy'] files, meta = self.one.load_datasets(eid, dsets, collections='alf', revisions=[None, None], download_only=True, assert_present=False) self.assertTrue(all(files)) files, meta = self.one.load_datasets(eid, dsets, collections=['alf', ''], download_only=True, assert_present=False) self.assertIsNone(files[-1]) # Check validations with self.assertRaises(ValueError): self.one.load_datasets(eid, dsets, collections=['alf', '', 'foo']) with self.assertRaises(ValueError): self.one.load_datasets(eid, [f'alf/{x}' for x in dsets], collections='alf') with self.assertRaises(TypeError): self.one.load_datasets(eid, 'spikes.times') with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_datasets('ff812ca5-ce60-44ac-b07e-66c2c37e98eb', dsets) with self.assertLogs(logging.getLogger('one.api'), 'WARNING'): data, meta = self.one.load_datasets('ff812ca5-ce60-44ac-b07e-66c2c37e98eb', dsets, assert_present=False) self.assertIsNone(data) self.assertEqual(0, len(meta)) self.assertIsNone(self.one.load_datasets(eid, [])[0]) with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_datasets(eid, dsets, collections='none', assert_present=True) # Check behaviour when data are not downloaded for any reason with mock.patch.object(self.one, '_check_filesystem', side_effect=lambda x, **kwargs: [None] * len(x)): with self.assertLogs(logging.getLogger('one.api'), 'WARNING'): self.one.load_datasets(eid, dsets, collections='alf', assert_present=False) with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_datasets(eid, dsets, collections='alf', assert_present=True) # Check loading without extensions # Check download only dsets = ['_ibl_wheel.position.npy', '_ibl_wheel.timestamps'] files, meta = self.one.load_datasets(eid, dsets, download_only=True) self.assertTrue(all(isinstance(x, Path) for x in files)) # Check behaviour when loading with a data frame (undocumented) eid = '01390fcc-4f86-4707-8a3b-4d9309feb0a1' datasets = self.one._cache.datasets.loc[([eid],), :].iloc[:3, :] files, meta = self.one.load_datasets(eid, datasets, download_only=True) self.assertTrue(all(isinstance(x, Path) for x in files)) # Should raise when data frame contains a different eid self.assertRaises(AssertionError, self.one.load_datasets, uuid4(), datasets) # Mix of str and dict # Check download only dsets = [ spec.regex(spec.FILE_SPEC).match('_ibl_wheel.position.npy').groupdict(), '_ibl_wheel.timestamps.npy' ] with self.assertRaises(ValueError): self.one.load_datasets('KS005/2019-04-02/001', dsets, assert_present=False) # Loading of non default 
revisions without using the revision kwarg causes user warning. # With relative paths provided as input, dataset uniqueness validation is suppressed. eid = self.one._cache.sessions.iloc[0].name datasets = util.revisions_datasets_table( revisions=('', '2020-01-08'), attributes=('times',), touch_path=self.one.eid2path(eid)) datasets['default_revision'] = [False, True] * 3 datasets.index = datasets.index.set_levels([eid], level=0) self.one._cache.datasets = datasets with self.assertWarns(alferr.ALFWarning): self.one.load_datasets(eid, datasets['rel_path'].to_list(), download_only=True) # When loading without collections in the dataset list (i.e. just the dataset names) # an exception should be raised when datasets belong to multiple collections. self.assertRaises( alferr.ALFMultipleCollectionsFound, self.one.load_datasets, eid, ['spikes.times']) # Ensure that when rel paths are passed, a null collection/revision is not interpreted as # an ANY. NB this means the output of 'spikes.times.npy' will be different depending on # weather other datasets in list include a collection or revision. self.one._cache.datasets = datasets.iloc[:2, :].copy() # only two datasets, one default (file,), _ = self.one.load_datasets(eid, ['spikes.times.npy', ], download_only=True) self.assertTrue(file.as_posix().endswith('001/#2020-01-08#/spikes.times.npy')) (file, _), _ = self.one.load_datasets( eid, ['spikes.times.npy', 'xx/obj.attr.ext'], download_only=True, assert_present=False) self.assertTrue(file.as_posix().endswith('001/spikes.times.npy'))
[docs] def test_load_dataset_from_id(self): """Test One.load_dataset_from_id""" uid = np.array([[-9204203870374650458, -6411285612086772563]]) file = self.one.load_dataset_from_id(uid, download_only=True) self.assertIsInstance(file, Path) expected = 'ZFM-01935/2021-02-05/001/alf/probe00/_phy_spikes_subset.waveforms.npy' self.assertTrue(file.as_posix().endswith(expected)) # Details _, details = self.one.load_dataset_from_id(uid, download_only=True, details=True) self.assertIsInstance(details, pd.Series) # Load file content with str id s_id, = parquet.np2str(uid) data = np.arange(3) np.save(str(file), data) # Ensure data to load dset = self.one.load_dataset_from_id(s_id) self.assertTrue(np.array_equal(dset, data)) # Load file content with UUID dset = self.one.load_dataset_from_id(UUID(s_id)) self.assertTrue(np.array_equal(dset, data)) # Test errors # ID not in cache with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_dataset_from_id(s_id.replace('a', 'b')) # File missing file.unlink() with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_dataset_from_id(s_id) # Duplicate ids in cache details.name = ('d3372b15-f696-4279-9be5-98f15783b5bb', s_id) datasets = self.one._cache.datasets self.one._cache.datasets = pd.concat([datasets, details.to_frame().T]).sort_index() with self.assertRaises(alferr.ALFMultipleObjectsFound): self.one.load_dataset_from_id(s_id)
[docs] def test_load_object(self): """Test One.load_object""" eid = 'aaf101c3-2581-450a-8abd-ddb8f557a5ad' files = self.one.load_object(eid, 'wheel', download_only=True) self.assertEqual(len(files), 3) self.assertTrue(all(isinstance(x, Path) for x in files)) # Save some data into the files N = 10 # length of data for f in files: np.save(str(f), np.random.rand(N)) wheel = self.one.load_object(eid, 'wheel') self.assertIsInstance(wheel, dict) self.assertCountEqual(wheel.keys(), ('position', 'velocity', 'timestamps')) self.assertTrue( all(x.size == N for x in wheel.values()) ) # Test errors with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_object(eid, 'spikes') # Test behaviour with missing session with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_object(eid.replace('a', 'b'), 'wheel') # Test missing files on disk [f.unlink() for f in files] with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_object(eid, 'wheel') # Check the three wheel datasets set to exist = False in cache self.assertFalse(self.one._cache.datasets.loc[(eid,), 'exists'].all()) eid = 'ZFM-01935/2021-02-05/001' with self.assertRaises(alferr.ALFMultipleCollectionsFound): self.one.load_object(eid, 'ephysData_g0_t0') with self.assertRaises(alferr.ALFMultipleObjectsFound): self.one.load_object(eid, '*Camera')
[docs] def test_load_collection(self): """Test One.load_collection""" # Check download_only output eid = 'aaf101c3-2581-450a-8abd-ddb8f557a5ad' files = self.one.load_collection(eid, 'alf', download_only=True) self.assertEqual(len(files), 18) self.assertTrue(all(isinstance(x, Path) for x in files)) # Check load alf = self.one.load_collection(eid, 'alf', attribute='*time*') self.assertCountEqual(alf.keys(), ('trials', 'wheel')) self.assertIsInstance(alf, Bunch) self.assertIn('feedback_times', alf.trials) # Check object filter alf = self.one.load_collection(eid, 'alf', object='trials') self.assertNotIn('wheel', alf) # Check errors with self.assertRaises(alferr.ALFError): self.one.load_collection(eid, '') with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_collection(eid, 'alf', object='foo') # Should raise error when no objects found on disk with mock.patch.object(self.one, '_check_filesystem', side_effect=lambda x, **kwargs: [None] * len(x)), \ self.assertRaises(alferr.ALFObjectNotFound) as ex: self.one.load_collection(eid, 'alf') self.assertIn('not found on disk', str(ex))
[docs] def test_load_cache(self): """Test One._load_cache""" # Test loading unsorted table with no id index set df = self.one._cache['datasets'].reset_index() info = self.one._cache['_meta']['raw']['datasets'] with tempfile.TemporaryDirectory() as tdir: # Loading from empty dir # In offline mode, load_cache will not raise, but should warn when no tables found self.assertWarns(UserWarning, self.one.load_cache, tdir) self.assertTrue(self.one._cache['_meta']['expired']) # Save unindexed parquet.save(Path(tdir) / 'datasets.pqt', df, info) del self.one._cache['datasets'] self.one.load_cache(tdir) self.assertIsInstance(self.one._cache['datasets'].index, pd.MultiIndex) # Save shuffled id_keys = ['eid', 'id'] df[id_keys] = np.random.permutation(df[id_keys]) assert not df.set_index(id_keys).index.is_monotonic_increasing parquet.save(Path(tdir) / 'datasets.pqt', df, info) del self.one._cache['datasets'] self.one.load_cache(tdir) self.assertTrue(self.one._cache['datasets'].index.is_monotonic_increasing) # Save a parasitic table that will not be loaded pd.DataFrame().to_parquet(Path(tdir).joinpath('gnagna.pqt')) with self.assertLogs(logging.getLogger('one.api'), logging.WARNING) as log: self.one.load_cache(tdir) self.assertIn('gnagna.pqt', log.output[0]) # Save table with missing id columns df.drop(id_keys, axis=1, inplace=True) parquet.save(Path(tdir) / 'datasets.pqt', df, info) with self.assertRaises(KeyError): self.one.load_cache(tdir)
    def test_refresh_cache(self):
        """Test One.refresh_cache."""
        self.one._cache.datasets = self.one._cache.datasets.iloc[0:0].copy()
        prev_loaded = self.one._cache['_meta']['loaded_time']
        for mode in ('auto', 'local', 'remote'):
            with self.subTest("Refresh modes", mode=mode):
                loaded = self.one.refresh_cache(mode)
                self.assertFalse(len(self.one._cache.datasets))
                self.assertEqual(prev_loaded, loaded)
        loaded = self.one.refresh_cache('refresh')
        self.assertTrue(len(self.one._cache.datasets))
        self.assertTrue(loaded > prev_loaded)
        self.one.cache_expiry = datetime.timedelta()  # Immediately expire
        self.one._cache.datasets = self.one._cache.datasets.iloc[0:0].copy()
        self.one.refresh_cache('auto')
        self.assertTrue(len(self.one._cache.datasets))
        with self.assertRaises(ValueError):
            self.one.refresh_cache('double')

[docs] def test_save_cache(self): """Test one.util.save_cache""" self.one._cache['_meta'].pop('modified_time', None) # Should be no cache save as it's not been modified with tempfile.TemporaryDirectory() as tdir: self.one._save_cache(save_dir=tdir) self.assertFalse(any(Path(tdir).glob('*'))) # Should save two tables self.one._cache['_meta']['modified_time'] = datetime.datetime.now() self.one._save_cache(save_dir=tdir) self.assertEqual(2, len(list(Path(tdir).glob('*.pqt')))) # Load with One and check modified time is preserved raw_modified = One(cache_dir=tdir)._cache['_meta']['raw']['datasets']['date_modified'] expected = self.one._cache['_meta']['modified_time'].strftime('%Y-%m-%d %H:%M') self.assertEqual(raw_modified, expected) # Test file lock t_now = time.time() with mock.patch('one.api.time') as time_mock: lock_file = Path(self.one.cache_dir).joinpath('.cache.lock') lock_file.touch() # We expect the process to sleep at first, then after skipping time, # the stale lock file should be removed. time_mock.time.side_effect = (t_now, t_now + 30) self.one._save_cache(save_dir=tdir, force=True) # force flag ignores modified time self.assertFalse(lock_file.exists(), 'failed to remove stale lock file') time_mock.sleep.assert_called()
    def test_reset_cache(self):
        """Test One._reset_cache method, namely that cache types are correct."""
        # Assert cache dtypes are indeed as expected
        self.one._reset_cache()
        self.assertCountEqual(['datasets', 'sessions', '_meta'], self.one._cache.keys())
        self.assertTrue(self.one._cache.datasets.empty)
        self.assertCountEqual(EMPTY_SESSIONS_FRAME.columns, self.one._cache.sessions.columns)
        self.assertTrue(self.one._cache.sessions.empty)
        self.assertCountEqual(EMPTY_DATASETS_FRAME.columns, self.one._cache.datasets.columns)
        # Check sessions data frame types
        sessions_types = EMPTY_SESSIONS_FRAME.reset_index().dtypes.to_dict()
        s_types = self.one._cache.sessions.reset_index().dtypes.to_dict()
        self.assertDictEqual(sessions_types, s_types)
        # Check datasets data frame types
        datasets_types = EMPTY_DATASETS_FRAME.reset_index().dtypes.to_dict()
        d_types = self.one._cache.datasets.reset_index().dtypes.to_dict()
        self.assertDictEqual(datasets_types, d_types)

[docs] def test_update_cache_from_records(self): """Test One._update_cache_from_records""" sessions_types = self.one._cache.sessions.reset_index().dtypes.to_dict() datasets_types = self.one._cache.datasets.reset_index().dtypes.to_dict() # Update with single record (pandas.Series), one exists, one doesn't session = self.one._cache.sessions.iloc[0].squeeze() session.name = str(uuid4()) # New record dataset = self.one._cache.datasets.iloc[0].squeeze() dataset['exists'] = not dataset['exists'] self.one._update_cache_from_records(sessions=session, datasets=dataset) self.assertTrue(session.name in self.one._cache.sessions.index) updated, = dataset['exists'] == self.one._cache.datasets.loc[dataset.name, 'exists'] self.assertTrue(updated) # Check that the updated data frame has kept its original dtypes types = self.one._cache.sessions.reset_index().dtypes.to_dict() self.assertDictEqual(sessions_types, types) types = self.one._cache.datasets.reset_index().dtypes.to_dict() self.assertDictEqual(datasets_types, types) # Update a number of records datasets = self.one._cache.datasets.iloc[:3].copy() datasets.loc[:, 'exists'] = ~datasets.loc[:, 'exists'] # Make one of the datasets a new record idx = datasets.index.values idx[-1] = (idx[-1][0], str(uuid4())) datasets.index = pd.MultiIndex.from_tuples(idx, names=('eid', 'id')) self.one._update_cache_from_records(datasets=datasets) self.assertTrue(idx[-1] in self.one._cache.datasets.index) verifiable = self.one._cache.datasets.loc[datasets.index.values, 'exists'] self.assertTrue(np.all(verifiable == datasets.loc[:, 'exists'])) # Check that the updated data frame has kept its original dtypes types = self.one._cache.datasets.reset_index().dtypes.to_dict() self.assertDictEqual(datasets_types, types) # Check behaviour when columns don't match datasets.loc[:, 'exists'] = ~datasets.loc[:, 'exists'] datasets['extra_column'] = True self.one._cache.datasets['foo_bar'] = 12 # this column is missing in our new records self.one._cache.datasets['new_column'] = False expected_datasets_types = self.one._cache.datasets.reset_index().dtypes.to_dict() self.addCleanup(self.one._cache.datasets.drop, 'foo_bar', axis=1, inplace=True) # An exception is exists_* as the Alyx cache contains exists_aws and exists_flatiron # These should simply be filled with the values of exists as Alyx won't return datasets # that don't exist on FlatIron and if they don't exist on AWS it falls back to this. 
self.one._cache.datasets['exists_aws'] = False with self.assertRaises(AssertionError): self.one._update_cache_from_records(datasets=datasets, strict=True) self.one._update_cache_from_records(datasets=datasets) verifiable = self.one._cache.datasets.loc[datasets.index.values, 'exists'] self.assertTrue(np.all(verifiable == datasets.loc[:, 'exists'])) self.one._update_cache_from_records(datasets=datasets) verifiable = self.one._cache.datasets.loc[datasets.index.values, 'exists_aws'] self.assertTrue(np.all(verifiable == datasets.loc[:, 'exists'])) # If the extra column does not start with 'exists' it should be set to NaN verifiable = self.one._cache.datasets.loc[datasets.index.values, 'foo_bar'] self.assertTrue(np.isnan(verifiable).all()) # Check that the missing columns were updated to nullable fields expected_datasets_types.update( foo_bar=pd.Int64Dtype(), exists_aws=pd.BooleanDtype(), new_column=pd.BooleanDtype()) types = self.one._cache.datasets.reset_index().dtypes.to_dict() self.assertDictEqual(expected_datasets_types, types) # Check fringe cases with self.assertRaises(KeyError): self.one._update_cache_from_records(unknown=datasets) self.assertIsNone(self.one._update_cache_from_records(datasets=None)) # Absent cache table self.one.load_cache(tables_dir='/foo') sessions_types = self.one._cache.sessions.reset_index().dtypes.to_dict() datasets_types = self.one._cache.datasets.reset_index().dtypes.to_dict() self.one._update_cache_from_records(sessions=session, datasets=dataset) self.assertTrue(all(self.one._cache.sessions == pd.DataFrame([session]))) self.assertEqual(1, len(self.one._cache.datasets)) self.assertEqual(self.one._cache.datasets.squeeze().name, dataset.name) self.assertCountEqual(self.one._cache.datasets.squeeze().to_dict(), dataset.to_dict()) types = self.one._cache.datasets.reset_index().dtypes.to_dict() self.assertDictEqual(datasets_types, types)
[docs] def test_save_loaded_ids(self): """Test One.save_loaded_ids and logic within One._check_filesystem""" self.one.record_loaded = True # Turn on saving UUIDs self.one._cache.pop('_loaded_datasets', None) # Ensure we start with a clean slate eid = 'd3372b15-f696-4279-9be5-98f15783b5bb' files = self.one.load_object(eid, 'trials', download_only=True) # Check datasets added to list self.assertIn('_loaded_datasets', self.one._cache) loaded = self.one._cache['_loaded_datasets'] self.assertEqual(len(files), len(loaded)) # Ensure all are from the same session eids = self.one._cache.datasets.loc[(slice(None), loaded), ].index.get_level_values(0) self.assertTrue(np.all(eids == eid)) # Test loading a dataset that doesn't exist dset = self.one.list_datasets(eid, filename='*trials*', details=True).iloc[-1] dset['rel_path'] = dset['rel_path'].replace('.npy', '.pqt') dset.name = (eid, str(uuid4())) old_cache = self.one._cache['datasets'] try: datasets = [self.one._cache.datasets, dset.to_frame().T] datasets = pd.concat(datasets).astype(old_cache.dtypes).sort_index() datasets.index.set_names(('eid', 'id'), inplace=True) self.one._cache['datasets'] = datasets dsets = [dset['rel_path'], '_ibl_trials.feedback_times.npy'] new_files, rec = self.one.load_datasets(eid, dsets, assert_present=False) loaded = self.one._cache['_loaded_datasets'] # One dataset is already in the list (test for duplicates) and other doesn't exist self.assertEqual(len(files), len(loaded), 'No new UUIDs should have been added') self.assertEqual(len(loaded), len(np.unique(loaded))) self.assertNotIn(dset.name[1], loaded) # Wasn't loaded as doesn't exist on disk finally: self.one._cache['datasets'] = old_cache # Test saving the loaded datasets list ids, filename = self.one.save_loaded_ids(clear_list=False) self.assertTrue(loaded is ids) self.assertTrue(filename.exists()) # Load from file ids = pd.read_csv(filename) self.assertCountEqual(ids['dataset_uuid'], loaded) self.assertTrue(self.one._cache['_loaded_datasets'].size, 'List unexpectedly cleared') # Test as session UUIDs ids, filename = self.one.save_loaded_ids(sessions_only=True, clear_list=False) self.assertCountEqual([eid], ids) self.assertEqual(pd.read_csv(filename)['session_uuid'][0], eid) # Check dataset int IDs and clear list ids, _ = self.one.save_loaded_ids(clear_list=True) self.assertCountEqual(loaded, ids) self.assertFalse(self.one._cache['_loaded_datasets'].size, 'List not cleared') # Test clear list and warn on empty with self.assertWarns(Warning): ids, filename = self.one.save_loaded_ids() self.assertEqual(ids, []) self.assertIsNone(filename)
    def test_remove_cache_table_files(self):
        """Test One._remove_cache_table_files method."""
        root = self.one._tables_dir
        for name in ('cache_info.json', 'foo.pqt'):
            root.joinpath(name).touch()
        removed = self.one._remove_cache_table_files()
        expected = ['sessions.pqt', 'datasets.pqt', 'cache_info.json']
        self.assertCountEqual(expected, [str(x.relative_to(root)) for x in removed])
        with self.assertLogs('one.alf.cache', 30) as cm:
            removed = self.one._remove_cache_table_files(tables=('foo',))
        self.assertEqual([root / 'foo.pqt'], removed)
        self.assertIn('cache_info.json not found', cm.records[0].message)


@unittest.skipIf(OFFLINE_ONLY, 'online only test')
class TestOneAlyx(unittest.TestCase):
    """
    This could be an offline test.  Would need to add /docs REST cache fixture.
    """

    tempdir = None
    one = None

    @classmethod
    def setUpClass(cls) -> None:
        cls.tempdir = util.set_up_env()
        with mock.patch('one.params.iopar.getfile',
                        new=partial(util.get_file, cls.tempdir.name)):
            # util.setup_test_params(token=True)
            cls.one = OneAlyx(
                **TEST_DB_1,
                cache_dir=cls.tempdir.name,
                mode='local'
            )

    def tearDown(self) -> None:
        self.one.mode = 'local'

[docs] def test_type2datasets(self): """Test OneAlyx.type2datasets""" self.one.mode = 'remote' eid = 'cf264653-2deb-44cb-aa84-89b82507028a' # when the dataset is at the root, there shouldn't be the separator dsets = self.one.type2datasets(eid, 'eye.blink') self.assertCountEqual(dsets, ['eye.blink.npy']) # test multiples datasets with collections eid = '8dd0fcb0-1151-4c97-ae35-2e2421695ad7' dtypes = ['trials.feedback_times', '_iblrig_codeFiles.raw'] dsets = self.one.type2datasets(eid, dtypes) expected = ['alf/_ibl_trials.feedback_times.npy', 'raw_behavior_data/_iblrig_codeFiles.raw.zip'] self.assertCountEqual(dsets, expected) # this returns a DataFrame dsets = self.one.type2datasets(eid, dtypes, details=True) self.assertIsInstance(dsets, pd.DataFrame) # check validation with self.assertRaises(TypeError): self.one.type2datasets(eid, 14)
[docs] def test_dataset2type(self): """Test for OneAlyx.dataset2type""" # Test dataset ID did = np.array([[-1058666951852871669, -6012002505064768322]]) # Check assertion in local mode with self.assertRaises(AssertionError): self.one.dataset2type(did) self.one.mode = 'remote' dset_type = self.one.dataset2type(did) self.assertEqual('wheelMoves.peakAmplitude', dset_type) # Check with tuple int dset_type = self.one.dataset2type(tuple(did.squeeze().tolist())) self.assertEqual('wheelMoves.peakAmplitude', dset_type) # Check with str id did, = parquet.np2str(did) dset_type = self.one.dataset2type(did) self.assertEqual('wheelMoves.peakAmplitude', dset_type) dset_type = self.one.dataset2type('_ibl_wheelMoves.peakAmplitude.npy') self.assertEqual('wheelMoves.peakAmplitude', dset_type) bad_id = np.array([[-1058666951852871669, -6012002505064768312]]) with self.assertRaises(ValueError): self.one.dataset2type(bad_id)
    def test_pid2eid(self):
        """Test OneAlyx.pid2eid."""
        pid = 'b529f2d8-cdae-4d59-aba2-cbd1b5572e36'
        eid, collection = self.one.pid2eid(pid, query_type='remote')
        self.assertEqual('fc737f3c-2a57-4165-9763-905413e7e341', eid)
        self.assertEqual('probe00', collection)
        with self.assertRaises(NotImplementedError):
            self.one.pid2eid(pid, query_type='local')

[docs] @unittest.mock.patch('sys.stdout', new_callable=io.StringIO) def test_describe_revision(self, mock_stdout): """Test OneAlyx.describe_revision""" self.one.mode = 'remote' # Choose a date in the past so as to not conflict with registration tests record = { 'name': str(datetime.date.today() - datetime.timedelta(days=5)) + 'a', 'description': 'Lorem ipsum dolor sit amet, consectetur adipiscing elit.' } try: self.one.alyx.rest('revisions', 'read', id=record['name'], no_cache=True) except HTTPError: self.one.alyx.rest('revisions', 'create', data=record) self.one.describe_revision(record['name']) self.assertEqual(record['description'], mock_stdout.getvalue().strip()) self.one.describe_revision('foobar') self.assertTrue('not found' in mock_stdout.getvalue()) # Check full kwarg full = self.one.describe_revision(record['name'], full=True) self.assertIsInstance(full, dict) # Check raises non-404 error err = HTTPError() err.response = Bunch({'status_code': 500}) with mock.patch.object(self.one.alyx, 'get', side_effect=err), \ self.assertRaises(HTTPError): self.one.describe_revision(record['name'])
[docs] @unittest.mock.patch('sys.stdout', new_callable=io.StringIO) def test_describe_dataset(self, mock_stdout): """Test OneAlyx.describe_dataset. NB This could be offline: REST responses in fixtures. """ self.one.mode = 'remote' # Test all datasets dset_types = self.one.describe_dataset() self.assertEqual(7, len(dset_types)) self.assertEqual('unknown', dset_types[0]['name']) # Test dataset type out = self.one.describe_dataset('wheel.velocity') expected = 'Signed velocity of wheel' self.assertTrue(expected in mock_stdout.getvalue()) self.assertEqual(expected, out['description']) # Test dataset name expected = 'amplitude of the wheel move' out = self.one.describe_dataset('_ibl_wheelMoves.peakAmplitude.npy') self.assertTrue(expected in mock_stdout.getvalue()) self.assertEqual(expected, out['description']) # Test unknown dataset name with self.assertRaises(ValueError): self.one.describe_dataset('_ibl_foo.bar.baz')
[docs] def test_url_from_path(self): """Test OneAlyx.path2url""" file = Path(self.tempdir.name).joinpath('cortexlab', 'Subjects', 'KS005', '2019-04-04', '004', 'alf', '_ibl_wheel.position.npy') url = self.one.path2url(file) self.assertTrue(url.startswith(self.one.alyx._par.HTTP_DATA_SERVER)) self.assertTrue('91546fc6-b67c-4a69-badc-5e66088519c4' in url) # Check remote mode url = self.one.path2url(file, query_type='remote') # Local data server path ends with '/public'; remote path does not self.assertTrue(url.startswith(self.one.alyx._par.HTTP_DATA_SERVER.rsplit('/', 1)[0])) file = file.parent / '_fake_obj.attr.npy' self.assertIsNone(self.one.path2url(file)) # Check remote mode FIXME Different behaviour between remote and local modes with self.assertRaises(alferr.ALFObjectNotFound): self.one.path2url(file, query_type='remote')
[docs] def test_url_from_record(self): """Test ConversionMixin.record2url""" idx = (slice(None), '91546fc6-b67c-4a69-badc-5e66088519c4') dataset = self.one._cache['datasets'].loc[idx, :] url = self.one.record2url(dataset.squeeze()) expected = ('https://ibl.flatironinstitute.org/public/' 'cortexlab/Subjects/KS005/2019-04-04/004/alf/' '_ibl_wheel.position.91546fc6-b67c-4a69-badc-5e66088519c4.npy') self.assertEqual(expected, url)
[docs] def test_load_cache(self): """Test loading the remote cache""" self.one.alyx.silent = False # For checking log self.one._cache._meta['expired'] = True try: with self.assertLogs(logging.getLogger('one.api'), logging.INFO) as lg: with mock.patch.object(self.one.alyx, 'get', side_effect=HTTPError): self.one.load_cache(clobber=True) self.assertEqual('remote', self.one.mode) self.assertRegex(lg.output[0], 'cache over .+ old') self.assertTrue('Failed to load' in lg.output[1]) with mock.patch.object(self.one.alyx, 'get', side_effect=ConnectionError): self.one.load_cache(clobber=True) self.assertEqual('local', self.one.mode) self.assertTrue('Failed to connect' in lg.output[-1]) # In online mode, the error cause should suggest re-running setup with mock.patch.object(self.one.alyx, 'get', side_effect=FileNotFoundError), \ self.assertRaises(FileNotFoundError) as cm: self.one.load_cache(clobber=True) self.assertIn('run ONE.setup', str(cm.exception.__cause__)) cache_info = {'min_api_version': '200.0.0'} # Check version verification with mock.patch.object(self.one.alyx, 'get', return_value=cache_info), \ self.assertWarns(UserWarning): self.one.load_cache(clobber=True) # Check cache tags raw_meta = self.one._cache._meta['raw'] raw_meta['sessions']['database_tags'] = 'Q3-2020-TAG' raw_meta['datasets']['database_tags'] = 'ANOTHER_TAG' cache_info['database_tags'] = ['ANOTHER_TAG'] with mock.patch.object(self.one.alyx, 'get', return_value=cache_info): self.assertRaises(NotImplementedError, self.one.load_cache) # Check for warning when origins are mixed raw_meta['sessions']['origin'] = raw_meta['datasets']['origin'] = 'alyx' cache_info['origin'] = 'public' cache_info['min_api_version'] = __version__ cache_info['date_created'] = datetime.datetime.now().isoformat() raw_meta['datasets']['database_tags'] = 'Q3-2020-TAG' files = list(self.one.cache_dir.glob('*.pqt')) with mock.patch.object(self.one.alyx, 'download_cache_tables', return_value=files), \ mock.patch.object(self.one.alyx, 'get', return_value=cache_info), \ self.assertWarns(UserWarning, msg='another origin'): self.one.load_cache(tag='ANOTHER_TAG') self.assertTrue(str(self.one._tables_dir).endswith('ANOTHER_TAG')) self.assertTrue(self.one._tables_dir.exists(), 'failed to create tag dir') # Check table_dir behaviour prev_loc = self.one._tables_dir # should be same location as previous with mock.patch.object(self.one.alyx, 'download_cache_tables', return_value=files): self.one.load_cache(clobber=True) self.assertEqual(prev_loc, self.one._tables_dir) new_loc = prev_loc.parent # user input should override default with mock.patch.object(self.one.alyx, 'download_cache_tables', return_value=files): self.one.load_cache(tables_dir=new_loc, clobber=True) self.assertEqual(new_loc, self.one._tables_dir) finally: # Restore properties self.one.mode = 'auto' self.one.alyx.silent = True
[docs] def test_check_filesystem(self): """Test for One._check_filesystem. Most is already covered by other tests, this just checks that it can deal with dataset dicts as input. """ eid = 'd3372b15-f696-4279-9be5-98f15783b5bb' dataset_type = 'probes.description' dsets = self.one.alyx.rest('datasets', 'list', session=eid, dataset_type=dataset_type) # Create file on disk file_ = self.one.eid2path(eid).joinpath('alf', 'probes.description.json') file_.parent.mkdir(parents=True) file_.touch() # Test method file, = self.one._check_filesystem(dsets) self.assertIsNotNone(file)
[docs] @mock.patch('boto3.Session') def test_download_aws(self, boto3_mock): """ Tests for the OneAlyx._download_aws method""" N = 5 dsets = self.one._cache['datasets'].iloc[:N].copy() dsets['exists_aws'] = True self.one.mode = 'auto' # Can't download in local mode # Return a file size so progress bar callback hook functions file_object = mock.MagicMock() file_object.content_length = 1024 boto3_mock().resource().Object.return_value = file_object # Mock _download_dataset for safety: method should not be called with mock.patch.object(self.one, '_download_dataset') as fallback_method: out_paths = self.one._download_datasets(dsets) fallback_method.assert_not_called() self.assertEqual(len(out_paths), N, 'Returned list should equal length of input datasets') self.assertTrue(all(isinstance(x, Path) for x in out_paths)) # These values come from REST cache fixture boto3_mock.assert_called_with(aws_access_key_id='ABCDEF', aws_secret_access_key='shhh', region_name=None) ((bucket, path), _), *_ = boto3_mock().resource().Object.call_args_list self.assertEqual(bucket, 's3_bucket') self.assertTrue(dsets['rel_path'][0].split('.')[0] in path) self.assertTrue(dsets.index[0][1] in path, 'Dataset UUID not in filepath') # Should fall back to usual method if any datasets do not exist on AWS dsets['exists_aws'] = False with mock.patch.object(self.one, '_download_dataset') as fallback_method: self.one._download_datasets(dsets) fallback_method.assert_called()
[docs] def test_list_aggregates(self): """Test OneAlyx.list_aggregates""" # Test listing by relation datasets = self.one.list_aggregates('subjects') self.assertTrue(all(datasets['rel_path'].str.startswith('aggregates/Subjects'))) self.assertTrue(self.one.list_aggregates('foobar').empty) # Test filtering with an identifier datasets = self.one.list_aggregates('subjects', 'ZM_1085') expected = 'aggregates/Subjects/mainenlab/ZM_1085' self.assertTrue(all(datasets['rel_path'].str.startswith(expected))) self.assertTrue(self.one.list_aggregates('subjects', 'foobar').empty) # Test that additional parts of data path are correctly removed # specifically /public in FI openalyx file rec mock_ds = [ {'url': '3ef042c6-82a4-426c-9aa9-be3b48d86652', 'session': None, 'file_size': None, 'hash': '', 'file_records': [{'data_url': 'https://ibl-brain-wide-map-public.s3.amazonaws.com/' 'aggregates/Subjects/cortexlab/KS051/' 'trials.table.3ef042c6-82a4-426c-9aa9-be3b48d86652.pqt', 'data_repository': 'aws_aggregates', 'exists': True}], 'default_dataset': True, 'qc': 'NOT_SET'}, {'url': '7bdb08d6-b166-43d8-8981-816cf616d291', 'session': None, 'file_size': None, 'hash': '', 'file_records': [{'data_url': 'https://ibl.flatironinstitute.org/' 'public/aggregates/Subjects/mrsicflogellab/IBL_005/' 'trials.table.7bdb08d6-b166-43d8-8981-816cf616d291.pqt', 'data_repository': 'flatiron_aggregates', 'exists': True}], 'default_dataset': True, 'qc': 'NOT_SET'}, ] with mock.patch.object(self.one.alyx, 'rest', return_value=mock_ds): dsets = self.one.list_aggregates('subjects') self.assertEqual(len(dsets), 2) # Should handle null file_size values correctly self.assertIsInstance(dsets.file_size.dtype, pd.UInt64Dtype) self.assertTrue(dsets.file_size.isna().all())
[docs] def test_load_aggregate(self): """Test OneAlyx.load_aggregate""" # Test object not found on disk assert self.one.offline with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_aggregate('subjects', 'ZM_1085', '_ibl_subjectTraining.table.pqt') # Touch a file to ensure that we do not try downloading expected = self.one.cache_dir.joinpath( 'aggregates/Subjects/mainenlab/ZM_1085/_ibl_subjectTraining.table.pqt') expected.parent.mkdir(parents=True), expected.touch() # Test loading with different input dataset formats datasets = ['_ibl_subjectTraining.table.pqt', '_ibl_subjectTraining.table', {'object': 'subjectTraining', 'attribute': 'table'}] for dset in datasets: with self.subTest(dataset=dset): file = self.one.load_aggregate('subjects', 'ZM_1085', dset, download_only=True) self.assertEqual(expected, file) # Test object not found with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_aggregate('subjects', 'ZM_1085', 'foo.bar')
    @classmethod
    def tearDownClass(cls) -> None:
        cls.tempdir.cleanup()


@unittest.skipIf(OFFLINE_ONLY, 'online only test')
class TestOneRemote(unittest.TestCase):
    """Test remote queries using OpenAlyx."""

    def setUp(self) -> None:
        self.one = OneAlyx(**TEST_DB_2, mode='auto')
        self.eid = '4ecb5d24-f5cc-402c-be28-9d0f7cb14b3a'
        self.pid = 'da8dfec1-d265-44e8-84ce-6ae9c109b8bd'
        # Set cache directory to a temp dir to ensure that we re-download files
        self.tempdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)
        self.one.alyx._par = self.one.alyx._par.set('CACHE_DIR', Path(self.tempdir.name))

    def test_online_repr(self):
        """Tests OneAlyx.__repr__."""
        self.assertTrue('online' in str(self.one))
        self.assertTrue(TEST_DB_2['base_url'] in str(self.one))

[docs] def test_list_datasets(self): """Test OneAlyx.list_datasets""" # Test list for eid # Ensure remote by making local datasets table empty self.addCleanup(self.one.load_cache) self.one._cache['datasets'] = self.one._cache['datasets'].iloc[0:0].copy() dsets = self.one.list_datasets(self.eid, details=True, query_type='remote') self.assertEqual(183, len(dsets)) # this may change after a BWM release or patch self.assertEqual(1, dsets.index.nlevels, 'details data frame should be without eid index') # Test keep_eid_index dsets = self.one.list_datasets( self.eid, details=True, query_type='remote', keep_eid_index=True) self.assertEqual(2, dsets.index.nlevels, 'details data frame should be with eid index') # Test missing eid dsets = self.one.list_datasets('FMR019/2021-03-18/008', details=True, query_type='remote') self.assertIsInstance(dsets, pd.DataFrame) self.assertEqual(len(dsets), 0) # Test empty datasets with mock.patch('one.api.ses2records', return_value=(pd.DataFrame(), pd.DataFrame())): dsets = self.one.list_datasets(self.eid, details=True, query_type='remote') self.assertIsInstance(dsets, pd.DataFrame) self.assertEqual(len(dsets), 0) # Test details=False, with eid dsets = self.one.list_datasets(self.eid, details=False, query_type='remote') self.assertIsInstance(dsets, list) self.assertEqual(183, len(dsets)) # this may change after a BWM release or patch # Test with other filters dsets = self.one.list_datasets(self.eid, collection='*probe*', filename='*channels*', details=False, query_type='remote') self.assertEqual(24, len(dsets)) self.assertTrue(all(x in y for x in ('probe', 'channels') for y in dsets)) with self.assertWarns(Warning): self.one.list_datasets(query_type='remote')
[docs] def test_search_insertions(self): """Test OneAlyx.search_insertion method in remote mode.""" # Test search on subject pids = self.one.search_insertions(subject='SWC_043', query_type='remote') self.assertIn(self.pid, list(pids)) # Test search on session with details pids, det = self.one.search_insertions(session=self.eid, query_type='remote', details=True) self.assertIn(self.pid, list(pids)) correct = len(det) == len(pids) and 'id' in det[0] and det[0]['id'] == self.pid self.assertTrue(correct) # Test search on session and insertion name pids = self.one.search_insertions(session=self.eid, name='probe00', query_type='remote') self.assertEqual(pids[0], self.pid) # Test search with acronym # Should work in auto mode but with debug message with self.assertLogs('one.api', 10): pids = self.one.search_insertions(atlas_acronym='STR', query_type='auto') self.assertIn(self.pid, list(pids)) # Expect this list of acronyms to return nothing pids = self.one.search_insertions(atlas_acronym=['STR', 'CA3'], query_type='remote') self.assertEqual(0, len(pids)) # Test 'special' params (these have different names on the REST side) # - full 'laboratory' word # - 'dataset' as singular with fuzzy match # - 'number' -> 'experiment_number' lab = 'cortexlab' _, det = self.one.search_insertions( laboratory=lab, number=1, dataset='_ibl_log.info', atlas_acronym='STR', query_type='remote', details=True) self.assertEqual(14, len(det)) self.assertEqual({lab}, {x['session_info']['lab'] for x in det}) # Test mode and field validation self.assertRaises(NotImplementedError, self.one.search_insertions, query_type='local') self.assertRaises(TypeError, self.one.search_insertions, dataset=['wheel.times'], query_type='remote')
[docs] def test_search_terms(self): """Test OneAlyx.search_terms.""" assert self.one.mode != 'remote' search1 = self.one.search_terms() self.assertIn('dataset', search1) search2 = self.one.search_terms(endpoint='sessions') self.assertEqual(search1, search2) search3 = self.one.search_terms(query_type='remote', endpoint='sessions') self.assertIn('django', search3) search4 = self.one.search_terms(endpoint='insertions') self.assertIsNone(search4) search5 = self.one.search_terms(query_type='remote', endpoint='insertions') self.assertIn('model', search5)
[docs] def test_load_dataset(self): """Test OneAlyx.load_dataset.""" file = self.one.load_dataset(self.eid, '_spikeglx_sync.channels.npy', collection='raw_ephys_data', query_type='remote', download_only=True) self.assertIsInstance(file, Path) self.assertTrue(file.as_posix().endswith('raw_ephys_data/_spikeglx_sync.channels.npy')) # Test validations with self.assertRaises(alferr.ALFMultipleCollectionsFound): self.one.load_dataset(self.eid, 'spikes.clusters', query_type='remote') with self.assertRaises(alferr.ALFMultipleObjectsFound): self.one.load_dataset(self.eid, '_iblrig_*Camera.raw', query_type='remote') with self.assertRaises(alferr.ALFObjectNotFound): self.one.load_dataset(self.eid, '_iblrig_encoderEvents.raw.ssv', collection='alf', query_type='remote')
[docs] def test_load_object(self): """Test OneAlyx.load_object.""" files = self.one.load_object(self.eid, 'wheel', collection='alf', query_type='remote', download_only=True) self.assertIsInstance(files[0], Path) self.assertTrue( files[0].as_posix().endswith('SWC_043/2020-09-21/001/alf/_ibl_wheel.position.npy') )
[docs] def test_get_details(self): """Test OneAlyx.get_details.""" det = self.one.get_details(self.eid, query_type='remote') self.assertIsInstance(det, dict) self.assertEqual('SWC_043', det['subject']) self.assertEqual('ibl_neuropixel_brainwide_01', det['projects']) self.assertEqual('2020-09-21', str(det['date'])) self.assertEqual(1, det['number']) self.assertNotIn('data_dataset_session_related', det) # Test list det = self.one.get_details([self.eid, self.eid], full=True) self.assertIsInstance(det, list) self.assertIn('data_dataset_session_related', det[0])
[docs] @unittest.skipIf(OFFLINE_ONLY, 'online only test') class TestOneDownload(unittest.TestCase): """Test downloading datasets using OpenAlyx.""" tempdir = None one = None
[docs] def setUp(self) -> None: self.tempdir = tempfile.TemporaryDirectory() self.patch = mock.patch('one.params.iopar.getfile', new=partial(util.get_file, self.tempdir.name)) self.patch.start() self.one = OneAlyx(**TEST_DB_2, cache_dir=self.tempdir.name, mode='auto') self.fid = '6f175a7a-e20b-4622-81fc-08947a4fd1d3' # File record of wiring.json self.eid = 'aad23144-0e52-4eac-80c5-c4ee2decb198' self.did = 'd693fbf9-2f90-4123-839e-41474c44742d'
[docs]    def test_download_datasets(self):
        """Test OneAlyx._download_dataset, _download_file and _dset2url."""
        det = self.one.get_details(self.eid, True)
        rec = next(x for x in det['data_dataset_session_related'] if x['id'] == self.did)
        # FIXME hack because data_url may be AWS
        rec['data_url'] = self.one.alyx.rel_path2url(get_alf_path(rec['data_url']))
        file = self.one._download_dataset(rec)
        self.assertIsInstance(file, Path)
        self.assertTrue(file.exists())

        url = rec['data_url']
        file = self.one._download_dataset(url)
        self.assertIsNotNone(file)

        rec = self.one.alyx.get(rec['url'])
        file = self.one._download_dataset(rec)
        self.assertIsNotNone(file)

        # Check behaviour when hash mismatch
        self.one.alyx.silent = False  # So we can check for warning
        file_hash = rec['hash'].replace('a', 'd')
        # Check three things:
        # 1. The mismatch should be logged at the debug level
        # 2. As we don't have permission to update this db we should see a failure warning
        with self.assertLogs(logging.getLogger('one.api'), logging.DEBUG), \
                self.assertWarns(Warning, msg=f'files/{self.fid}'):
            self.one._download_dataset(rec, hash=file_hash)
        self.one.alyx.silent = True  # Remove console clutter

        # Check JSON field added
        # 3. The files endpoint should be called with a 'mismatch_hash' json key
        fr = [{'url': f'files/{self.fid}', 'json': None}]
        with mock.patch.object(self.one.alyx, '_generic_request', return_value=fr) as patched:
            self.one._download_dataset(rec, hash=file_hash)
            args, kwargs = patched.call_args
            self.assertEqual(kwargs.get('data', {}), {'json': {'mismatch_hash': True}})

        # Check keep_uuid kwarg
        # FIXME Another hack: for this to work the file records order must be correct.
        fi = next(i for i, x in enumerate(rec['file_records'])
                  if x['data_url'].startswith(self.one.alyx._par.HTTP_DATA_SERVER))
        if fi != 0:
            rec['file_records'] = [rec['file_records'].pop(fi), *rec['file_records']]
        file = self.one._download_dataset(rec, keep_uuid=True)
        self.assertEqual(file.stem.split('.')[-1], rec['url'].split('/')[-1])

        # Check list input
        recs = [rec, sorted(det['data_dataset_session_related'], key=lambda x: x['file_size'])[1]]
        files = self.one._download_dataset(recs)
        self.assertIsInstance(files, list)
        self.assertTrue(all(isinstance(x, Path) for x in files))

        # Check Series input
        r_ = datasets2records(rec).squeeze()
        file = self.one._download_dataset(r_)
        self.assertIn('imec0.wiring.json', file.name)

        # Check behaviour when URL invalid
        did = rec['url'].split('/')[-1]
        self.assertTrue(self.one._cache.datasets.loc[(slice(None), did), 'exists'].all())
        for fr in rec['file_records']:
            fr['data_url'] = None
        file = self.one._download_dataset(rec)
        self.assertIsNone(file)
        self.assertFalse(self.one._cache.datasets.loc[(slice(None), did), 'exists'].all())

        # With multiple dsets
        files = self.one._download_dataset([rec, rec])
        self.assertTrue(all(x is None for x in files))
        self.one._cache.datasets.loc[(slice(None), did), 'exists'] = True  # Reset values

        # Check with invalid path
        path = self.one.cache_dir.joinpath(
            'lab', 'Subjects', 'subj', '2020-01-01', '001', 'spikes.times.npy')
        with self.assertLogs(logging.getLogger('one.api'), logging.WARNING):
            file = self.one._download_dataset(path)
        self.assertIsNone(file)

        # Check data frame record
        rec = self.one.list_datasets(self.eid, details=True)
        rec = rec[rec.rel_path.str.contains('00/_spikeglx_ephysData_g0_t0.imec0.wiring')]
        rec.loc[self.did, 'exists_aws'] = False  # Ensure we use FlatIron for this
        rec = pd.concat({str(self.eid): rec}, names=['eid'])  # Add back eid index
        files = self.one._download_datasets(rec)
        self.assertFalse(None in files)

        # Check behaviour when dataset missing
        with mock.patch.object(self.one, 'record2url', return_value=None):
            self.assertIsNone(self.one._download_dataset(rec.squeeze()))
[docs] def test_download_aws(self): """Test for OneAlyx._download_aws method.""" # Test download datasets via AWS dsets = self.one.list_datasets( self.eid, filename='*wiring.json', collection='raw_ephys_data/probe??', details=True) dsets = pd.concat({str(self.eid): dsets}, names=['eid']) assert len(dsets) == 2 file = self.one.eid2path(self.eid) / dsets['rel_path'].values[0] with mock.patch('one.remote.aws.get_s3_from_alyx', return_value=(None, None)), \ mock.patch('one.remote.aws.s3_download_file', return_value=file) as method: self.one._download_datasets(dsets) self.assertEqual(len(dsets), method.call_count) # Check output filename _, local = method.call_args.args self.assertTrue(local.as_posix().startswith(self.one.cache_dir.as_posix())) self.assertTrue(local.as_posix().endswith(dsets.iloc[-1]['rel_path'])) # Check keep_uuid = True self.one._download_datasets(dsets, keep_uuid=True) _, local = method.call_args.args self.assertIn(dsets.iloc[-1].name[1], local.name) # Test behaviour when dataset not remotely accessible dsets = dsets[:1].copy() rec = self.one.alyx.rest('datasets', 'read', id=dsets.index[0][1]) # need to find the index of matching aws repo, this is not constant across releases iaws = list(map(lambda x: x['data_repository'].startswith('aws'), rec['file_records'])).index(True) rec['file_records'][iaws]['exists'] = False # Set AWS file record to non-existent self.one._cache.datasets['exists_aws'] = True # Only changes column if exists with mock.patch('one.remote.aws.get_s3_from_alyx', return_value=(None, None)), \ mock.patch.object(self.one.alyx, 'rest', return_value=[rec]), \ self.assertLogs('one.api', logging.DEBUG) as log: # should still download file via fallback method self.assertEqual([file], self.one._download_datasets(dsets)) self.assertRegex(log.output[1], 'Updating exists field') datasets = self.one._cache['datasets'] self.assertFalse( datasets.loc[pd.IndexSlice[:, dsets.index[0][1]], 'exists_aws'].any() ) # Check falls back to HTTP when error raised with mock.patch('one.remote.aws.get_s3_from_alyx', side_effect=RuntimeError), \ mock.patch.object(self.one, '_download_dataset') as mock_method: self.one._download_datasets(dsets) mock_method.assert_called_with(dsets, keep_uuid=False) # Test type check (download_aws only works with data frames) with mock.patch.object(self.one, '_download_dataset') as mock_method: self.one._download_datasets(dsets.to_dict('records')) mock_method.assert_called()
[docs] def test_tag_mismatched_file_record(self): """Test for OneAlyx._tag_mismatched_file_record. This method is also tested in test_download_datasets. """ did = '4a1500c2-60f3-418f-afa2-c752bb1890f0' url = f'https://example.com/channels.brainLocationIds_ccf_2017.{did}.npy' data = [{'json': {'mismatch_hash': False}, 'url': f'https://example.com/files/{did}'}] with mock.patch.object(self.one.alyx, 'rest', return_value=data) as mk: self.one._tag_mismatched_file_record(url) data[0]['json']['mismatch_hash'] = True mk.assert_called_with('files', 'partial_update', id=did, data={'json': data[0]['json']})
[docs] def tearDown(self) -> None: try: # In case we did indeed have remote REST permissions, try resetting the json field self.one.alyx.rest('files', 'partial_update', id=self.fid, data={'json': None}) except HTTPError as ex: if ex.errno != 403: raise ex self.patch.stop() self.tempdir.cleanup()
[docs] class TestOneSetup(unittest.TestCase): """Test parameter setup upon ONE instantiation and calling setup methods."""
[docs] def setUp(self) -> None: self.tempdir = tempfile.TemporaryDirectory() self.addCleanup(self.tempdir.cleanup) self.get_file = partial(util.get_file, self.tempdir.name) # Change default cache dir to temporary directory patch = mock.patch('one.params.CACHE_DIR_DEFAULT', Path(self.tempdir.name)) patch.start() self.addCleanup(patch.stop)
[docs] def test_local_cache_setup_prompt(self): """Test One.setup.""" path = Path(self.tempdir.name).joinpath('subject', '2020-01-01', '1', 'spikes.times.npy') path.parent.mkdir(parents=True) path.touch() with mock.patch('builtins.input', return_value=self.tempdir.name): one_obj = One.setup() self.assertCountEqual(one_obj.list_datasets(), ['spikes.times.npy']) # Check prompts warns about cache existing path.parent.joinpath('spikes.clusters.npy').touch() one_obj = One.setup(cache_dir=self.tempdir.name, silent=True) self.assertEqual(1, len(one_obj.list_datasets())) with mock.patch('builtins.input', side_effect=['n', 'y']): one_obj = One.setup(cache_dir=self.tempdir.name) # Reply no to cache overwrite self.assertEqual(1, len(one_obj.list_datasets())) one_obj = One.setup(cache_dir=self.tempdir.name) # Reply yes to cache overwrite self.assertEqual(2, len(one_obj.list_datasets())) # Test with no prompt one_obj = One.setup(cache_dir=self.tempdir.name, silent=True) assert len(one_obj.list_datasets()) == 2
[docs]    def test_local_tables_only(self):
        """Test first-time instantiation of ONE with only the tables_dir arg.

        Previously this would have raised an AttributeError as cache_map.DEFAULT
        doesn't exist on a new install.
        """
        # Expect warning as tables_dir is empty
        with mock.patch('iblutil.io.params.getfile', new=self.get_file), \
                self.assertWarns(UserWarning):
            # Should instantiate without error
            one_obj = One(tables_dir=Path(self.tempdir.name, 'tables'))
        self.assertEqual(self.tempdir.name, str(one_obj.cache_dir))
        self.assertEqual('tables', one_obj._tables_dir.name)
[docs] def test_setup_silent(self): """Test setting up parameters with silent flag. - Mock getfile to return temp dir as param file location - Mock input function as fail safe in case function erroneously prompts user for input """ with mock.patch('iblutil.io.params.getfile', new=self.get_file), \ mock.patch('one.params.input', new=self.assertFalse): one_obj = ONE(silent=True, mode='local', password=TEST_DB_2['password']) self.assertEqual(one_obj.alyx.base_url, one.params.default().ALYX_URL) # Check param files were saved self.assertEqual(len(list(Path(self.tempdir.name).rglob('.caches'))), 1) client_pars = Path(self.tempdir.name).rglob(f'.{one_obj.alyx.base_url.split("/")[-1]}') self.assertEqual(len(list(client_pars)), 1) with mock.patch('iblutil.io.params.getfile', new=self.get_file): # Check uses defaults on second instantiation one_obj = ONE(mode='local') url = one.params.default().ALYX_URL self.assertEqual(url, one_obj.alyx.base_url) # With the default database in silent mode the defaults should be used one.params.save(one_obj.alyx._par.set('ALYX_LOGIN', 'foobar'), url) one.params.setup(url, silent=True) one_obj = ONE(mode='local') self.assertEqual(one.params.default().ALYX_LOGIN, one_obj.alyx._par.ALYX_LOGIN) # Check saves base_url arg with self.subTest('Test setup with base URL'): if OFFLINE_ONLY: self.skipTest('Requires remote db connection') with mock.patch('iblutil.io.params.getfile', new=self.get_file): one_obj = ONE(**TEST_DB_1) self.assertEqual(one_obj.alyx.base_url, TEST_DB_1['base_url']) params_url = one.params.get(client=TEST_DB_1['base_url']).ALYX_URL self.assertEqual(params_url, one_obj.alyx.base_url) # With non-default database in silent mode the previous pars should be used one.params.setup(params_url, silent=True) user = one.params.get(client=params_url).ALYX_LOGIN self.assertEqual(TEST_DB_1['username'], user)
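    # For orientation, the rglob assertions above expect the mocked parameter location to
    # contain exactly one '.caches' client map plus one parameter file per database named
    # after its base URL (e.g. '.openalyx.internationalbrainlab.org'); the exact nesting
    # of these files is not assumed by the test.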
[docs] def test_setup_username(self): """Test setting up parameters with a provided username. - Mock getfile to return temp dir as param file location - Mock input function as fail safe in case function erroneously prompts user for input - Mock requests.post returns a fake user authentication response """ credentials = {'username': 'foobar', 'password': '123'} with mock.patch('iblutil.io.params.getfile', new=self.get_file), \ mock.patch('one.params.input', new=self.assertFalse), \ mock.patch('one.webclient.requests.post') as req_mock: req_mock().json.return_value = {'token': 'shhh'} # In remote mode the cache endpoint will not be queried one_obj = ONE(base_url='https://test.alyx.internationalbrainlab.org', silent=True, mode='remote', **credentials) params_username = one.params.get(client=TEST_DB_1['base_url']).ALYX_LOGIN self.assertEqual(params_username, one_obj.alyx.user) self.assertEqual(credentials['username'], one_obj.alyx.user) self.assertEqual(req_mock.call_args.kwargs.get('data', {}), credentials) # Reinstantiate as a different user one_obj = ONE(base_url='https://test.alyx.internationalbrainlab.org', username='baz', password='123', mode='remote') self.assertEqual(one_obj.alyx.user, 'baz') self.assertEqual(one_obj.alyx.user, one_obj.alyx._par.ALYX_LOGIN) # After initial set up the username in the pars file should remain unchanged params_username = one.params.get(client=TEST_DB_1['base_url']).ALYX_LOGIN self.assertNotEqual(one_obj.alyx.user, params_username)
[docs] @unittest.skipIf(OFFLINE_ONLY, 'online only test') def test_static_setup(self): """Test OneAlyx.setup.""" with mock.patch('iblutil.io.params.getfile', new=self.get_file), \ mock.patch('one.webclient.getpass', return_value='international'): one_obj = OneAlyx.setup(silent=True) self.assertEqual(one_obj.alyx.base_url, one.params.default().ALYX_URL)
[docs] def test_setup(self): """Test one.params.setup.""" url = TEST_DB_1['base_url'] def mock_input(prompt): if prompt.lower().startswith('warning'): if not getattr(mock_input, 'conflict_warn', False): # Checks both responses mock_input.conflict_warn = True return 'y' return 'n' elif 'download cache' in prompt.lower(): return Path(self.tempdir.name).joinpath('downloads').as_posix() elif 'url' in prompt.lower(): return url else: return 'mock_input' one.params.input = mock_input one.params.getpass = lambda prompt: 'mock_pwd' one.params.print = lambda text: 'mock_print' # Mock getfile function to return a path to non-existent file instead of usual one pars with mock.patch('iblutil.io.params.getfile', new=self.get_file): one_obj = OneAlyx(mode='local', username=TEST_DB_1['username'], password=TEST_DB_1['password']) pars = one.params.get(url) self.assertFalse('ALYX_PWD' in pars.as_dict()) self.assertEqual(one_obj.alyx._par.ALYX_URL, url) client = f'.{one_obj.alyx.base_url.split("/")[-1]}'.replace(':', '_') client_pars = Path(self.tempdir.name).rglob(client) self.assertEqual(len(list(client_pars)), 1) # Save ALYX_PWD into params and see if setup modifies it with mock.patch('iblutil.io.params.getfile', new=self.get_file): one.params.save(pars.set('ALYX_PWD', 'foobar'), url) one.params.setup(url) self.assertEqual(one.params.get(url).ALYX_PWD, 'mock_pwd') # Check conflict warning with mock.patch('iblutil.io.params.getfile', new=self.get_file): OneAlyx(mode='local', base_url=TEST_DB_2['base_url'], username=TEST_DB_2['username'], password=TEST_DB_2['password']) self.assertTrue(getattr(mock_input, 'conflict_warn', False))
[docs] def test_patch_params(self): """Test patching legacy params to the new location.""" # Save some old-style params old_pars = one.params.default().set('HTTP_DATA_SERVER', 'openalyx.org') # Save a REST query in the old location old_rest = Path(self.tempdir.name, '.one', '.rest', old_pars.ALYX_URL[8:], 'https') old_rest.mkdir(parents=True, exist_ok=True) old_rest.joinpath('1baff95c2d0e31059720a3716ad5b5a34b61a207').touch() with mock.patch('iblutil.io.params.getfile', new=self.get_file): one.params.setup(silent=True) one.params.save(old_pars, old_pars.ALYX_URL) one_obj = ONE(base_url=old_pars.ALYX_URL, mode='local') self.assertEqual(one_obj.alyx._par.HTTP_DATA_SERVER, one.params.default().HTTP_DATA_SERVER) self.assertFalse(Path(self.tempdir.name, '.rest').exists()) self.assertTrue(any(one_obj.alyx.cache_dir.joinpath('.rest').glob('*')))
[docs] def test_one_factory(self): """Tests the ONE class factory.""" with mock.patch('iblutil.io.params.getfile', new=self.get_file), \ mock.patch('one.params.input', new=self.assertFalse): # Cache dir not in client cache map; use One (light) one_obj = ONE(cache_dir=self.tempdir.name) self.assertIsInstance(one_obj, One) # The offline param was given, raise deprecation warning (via log) with self.assertWarns(FutureWarning): ONE(offline=True, cache_dir=self.tempdir.name) # Test setup with virtual ONE method assert ONE.cache_info().currsize > 0 ONE.setup(silent=True, make_default=True) self.assertFalse(ONE.cache_info().currsize, 'failed to reset LRU cache') with self.subTest('ONE setup with database URL'): if OFFLINE_ONLY: self.skipTest('Requires remote db connection') # No cache dir provided; use OneAlyx (silent setup mode) one_obj = ONE(silent=True, mode='local', password=TEST_DB_2['password']) self.assertIsInstance(one_obj, OneAlyx) # The cache dir is in client cache map; use OneAlyx one_obj = ONE(cache_dir=one_obj.alyx.cache_dir, mode='local') self.assertIsInstance(one_obj, OneAlyx) # A db URL was provided; use OneAlyx # mode = 'local' ensures we don't download cache (could also set cache_dir) one_obj = ONE(**TEST_DB_1, mode='local') self.assertIsInstance(one_obj, OneAlyx)
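    # Condensed view of the factory behaviour exercised above (a sketch of the expected
    # routing, not the implementation): a cache_dir that is absent from the client cache
    # map yields a lightweight One instance, whereas a database URL, or a cache_dir
    # already registered in the cache map, yields an OneAlyx instance, e.g.
    #   ONE(cache_dir='/some/unregistered/dir')    # -> One
    #   ONE(base_url='https://alyx.example.org')   # -> OneAlyx (hypothetical URL)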
[docs] class TestOneMisc(unittest.TestCase): """Test functions in one.util."""
[docs]    def test_validate_date_range(self):
        """Test one.util.validate_date_range."""
        # Single string date
        actual = validate_date_range('2020-01-01')  # On this day
        expected = (pd.Timestamp('2020-01-01 00:00:00'),
                    pd.Timestamp('2020-01-01 23:59:59.999000'))
        self.assertEqual(actual, expected)

        # Single datetime.date object
        actual = validate_date_range(pd.Timestamp('2020-01-01 00:00:00').date())
        self.assertEqual(actual, expected)

        # Single pandas Timestamp
        actual = validate_date_range(pd.Timestamp(2020, 1, 1))
        self.assertEqual(actual, expected)

        # Array of two datetime64
        actual = validate_date_range(np.array(['2022-01-30', '2022-01-30'],
                                              dtype='datetime64[D]'))
        expected = (pd.Timestamp('2022-01-30 00:00:00'), pd.Timestamp('2022-01-30 00:00:00'))
        self.assertEqual(actual, expected)

        # From date (lower bound)
        actual = validate_date_range(['2020-01-01'])  # from date
        self.assertEqual(actual[0], pd.Timestamp('2020-01-01 00:00:00'))
        dt = actual[1] - pd.Timestamp.now()
        self.assertTrue(dt.days > 10 * 365)

        actual = validate_date_range(['2020-01-01', None])  # from date
        self.assertEqual(actual[0], pd.Timestamp('2020-01-01 00:00:00'))
        dt = actual[1] - pd.Timestamp.now()
        self.assertTrue(dt.days > 10 * 365)  # Upper bound at least 10 years in the future

        # To date (upper bound)
        actual = validate_date_range([None, '2020-01-01'])  # up to date
        self.assertEqual(actual[1], pd.Timestamp('2020-01-01 00:00:00'))
        dt = pd.Timestamp.now().date().year - actual[0].date().year
        self.assertTrue(dt > 60)  # Lower bound at least 60 years in the past

        self.assertIsNone(validate_date_range(None))

        with self.assertRaises(ValueError):
            validate_date_range(['2020-01-01', '2019-09-06', '2021-10-04'])
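    # Summary of the mapping asserted above (bounds shown approximately): a single date
    # expands to that whole day, a one-sided range is padded with a far-future or
    # far-past bound, and None passes straight through:
    #   '2020-01-01'          -> (2020-01-01 00:00:00, 2020-01-01 23:59:59.999)
    #   ['2020-01-01']        -> (2020-01-01 00:00:00, <far future>)
    #   [None, '2020-01-01']  -> (<far past>, 2020-01-01 00:00:00)
    #   None                  -> None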
[docs] def test_index_last_before(self): """Test one.util.index_last_before.""" revisions = ['2021-01-01', '2020-08-01', '', '2020-09-30'] verifiable = index_last_before(revisions, '2021-01-01') self.assertEqual(0, verifiable) verifiable = index_last_before(revisions, '2020-09-15') self.assertEqual(1, verifiable) verifiable = index_last_before(revisions, '') self.assertEqual(2, verifiable) self.assertIsNone(index_last_before([], '2009-01-01')) verifiable = index_last_before(revisions, None) self.assertEqual(0, verifiable, 'should return most recent')
[docs] def test_collection_spec(self): """Test one.util._collection_spec.""" # Test every combination of input inputs = [] _collection = {None: '({collection}/)?', '': '', '-': '{collection}/'} _revision = {None: '(#{revision}#/)?', '': '', '-': '#{revision}#/'} combs = combinations_with_replacement((None, '', '-'), 2) [inputs.extend(set(permutations(x))) for x in combs] for collection, revision in inputs: with self.subTest(collection=collection, revision=revision): verifiable = _collection_spec(collection, revision) expected = _collection[collection] + _revision[revision] self.assertEqual(expected, verifiable)
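    # Concrete expected outputs for a few of the combinations above, taken from the
    # _collection/_revision lookup tables used in this test:
    #   _collection_spec(None, None) -> '({collection}/)?(#{revision}#/)?'
    #   _collection_spec('-', '')    -> '{collection}/'
    #   _collection_spec('', '-')    -> '#{revision}#/'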
[docs] def test_revision_last_before(self): """Test one.util.filter_revision_last_before.""" datasets = util.revisions_datasets_table() df = datasets[datasets.rel_path.str.startswith('alf/probe00')].copy() verifiable = filter_revision_last_before(df, revision='2020-09-01', assert_unique=False) self.assertTrue(len(verifiable) == 2) # Remove one of the datasets' revisions to test assert unique on mixed revisions df_mixed = df.drop((df['revision'] == '2020-01-08').idxmax()) with self.assertRaises(alferr.ALFMultipleRevisionsFound): filter_revision_last_before(df_mixed, revision='2020-09-01', assert_consistent=True) # Test with default revisions df['default_revision'] = False with self.assertWarnsRegex(alferr.ALFWarning, 'No default revision for dataset'): verifiable = filter_revision_last_before(df.copy(), assert_unique=False) self.assertTrue(len(verifiable) == 2) # Should have fallen back on lexicographical ordering self.assertTrue(verifiable.rel_path.str.contains('#2021-07-06#').all()) with self.assertRaises(alferr.ALFError): filter_revision_last_before(df.copy(), assert_unique=True) # Add unique default revisions df.iloc[[0, 4], -1] = True # Should log mixed revisions with self.assertWarnsRegex(alferr.ALFWarning, 'Multiple revisions'): verifiable = filter_revision_last_before(df.copy(), assert_unique=True) self.assertEqual(2, len(verifiable)) self.assertCountEqual(verifiable['rel_path'], df['rel_path'].iloc[[0, 4]]) # Add multiple default revisions df['default_revision'] = True with self.assertRaises(alferr.ALFMultipleRevisionsFound): filter_revision_last_before(df.copy(), assert_unique=True)
[docs] def test_parse_id(self): """Test one.util.parse_id.""" obj = unittest.mock.MagicMock() # Mock object to decorate obj.to_eid.return_value = 'parsed_id' # Method to be called input = 'subj/date/num' # Input id to pass to `to_eid` parse_id(obj.method)(obj, input) obj.to_eid.assert_called_with(input) obj.method.assert_called_with(obj, 'parsed_id') # Test raises value error when None returned obj.to_eid.return_value = None # Simulate failure to parse id with self.assertRaises(ValueError): parse_id(obj.method)(obj, input)
[docs] def test_autocomplete(self): """Test one.util.autocomplete.""" search_terms = ('subject', 'date_range', 'dataset', 'dataset_type') self.assertEqual('subject', autocomplete('Subj', search_terms)) self.assertEqual('dataset', autocomplete('dataset', search_terms)) with self.assertRaises(ValueError): autocomplete('dtypes', search_terms) with self.assertRaises(ValueError): autocomplete('dat', search_terms)
[docs]    def test_LazyID(self):
        """Test one.util.LazyId."""
        uuids = [
            'c1a2758d-3ce5-4fa7-8d96-6b960f029fa9',
            '0780da08-a12b-452a-b936-ebc576aa7670',
            'ff812ca5-ce60-44ac-b07e-66c2c37e98eb'
        ]
        ses = [{'url': f'https://website.org/foo/{x}'} for x in uuids]
        ez = LazyId(ses)
        self.assertEqual(len(uuids), len(ez))
        self.assertCountEqual(map(str, ez), uuids)
        self.assertEqual(ez[0], uuids[0])
        self.assertEqual(ez[0:2], uuids[0:2])
        ez = LazyId([{'id': x} for x in uuids])
        self.assertCountEqual(map(str, ez), uuids)
[docs]    def test_ensure_list(self):
        """Test one.util.ensure_list.

        This function has now moved; we therefore simply check for a deprecation warning.
        """
        with self.assertWarns(DeprecationWarning):
            self.assertEqual(['123'], ensure_list('123'))
            self.assertIs(x := ['123'], ensure_list(x))