Source code for one.tests.remote.test_globus

import logging
import tempfile
import unittest
from unittest import mock
from pathlib import Path, PurePosixPath, PureWindowsPath
import shutil
from functools import partial
from tempfile import TemporaryDirectory
from datetime import datetime
import io
import sys
import uuid

try:
    import globus_sdk
    import globus_sdk.services.auth
except ModuleNotFoundError:
    raise unittest.skip('globus_sdk module not installed')
from iblutil.io import params as iopar

from one.tests.util import get_file, setup_rest_cache
from one.tests import TEST_DB_1
from one.webclient import AlyxClient
from one.remote import globus

ENDPOINT_ID = uuid.uuid1()


[docs] class TestGlobus(unittest.TestCase): """Tests for the one.remote.globus module.""" """unittest.mock._patch: Mock object for setting parameter location as temporary directory.""" path_mock = None """tempfile.TemporaryDirectory: The temporary location of remote parameters file.""" tempdir = None
[docs] @classmethod def setUpClass(cls) -> None: cls.tempdir = TemporaryDirectory() cls.path_mock = mock.patch('one.remote.base.iopar.getfile', new=partial(get_file, cls.tempdir.name))
[docs] def setUp(self) -> None: self.path_mock.start()
[docs] @unittest.mock.patch('sys.stdout', new_callable=io.StringIO) def test_setup(self, _): """Tests for one.remote.globus._setup function.""" local_id = str(ENDPOINT_ID) gc_id = str(uuid.uuid4()) # Check behaviour when no parameters file exists, local endpoint ID found ans = ('', gc_id, '', 'new_path/to/thing', 'c') with mock.patch('builtins.input', side_effect=ans), \ mock.patch('one.remote.globus.get_local_endpoint_id', return_value=local_id): globus._setup(login=False) p = globus.load_client_params('globus.default').as_dict() expected = { 'GLOBUS_CLIENT_ID': gc_id, 'local_endpoint': local_id, 'local_path': 'new_path/to/thing' } self.assertDictEqual(p, expected) # Set up again with globus login ans = ('', '', '', '', 'abc') d = dict(refresh_token=1, access_token=2, expires_at_seconds=3) with mock.patch('builtins.input', side_effect=ans), \ mock.patch('one.remote.globus.globus_sdk.NativeAppAuthClient') as client, \ mock.patch('one.remote.globus.get_local_endpoint_id'): ( client(). oauth2_exchange_code_for_tokens(). by_resource_server. __getitem__ ).return_value = d globus._setup() p = globus.load_client_params('globus.default').as_dict() expected.update(d) self.assertDictEqual(p, expected) # Check for input validations # 1. New profile and no Globus ID inputted with mock.patch('builtins.input', side_effect=['']), \ self.assertRaises(ValueError) as ex: globus._setup(par_id='foo') self.assertIn('Globus client ID', str(ex)) # 2. New profile, Globus ID invalid with mock.patch('builtins.input', side_effect=['bar', '123']), \ self.assertRaises(ValueError) as ex: globus._setup(par_id='foo') self.assertIn('Invalid Globus client ID', str(ex)) # 3. New profile, no local endpoint ID found and none inputted with mock.patch('builtins.input', side_effect=['foo', gc_id, '']), \ mock.patch('one.remote.globus.get_local_endpoint_id', side_effect=AssertionError), \ self.assertRaises(ValueError) as ex, self.assertWarns(Warning): globus._setup() self.assertIn('local endpoint ID', str(ex))
[docs] def test_as_globus_path(self): """Tests for one.remote.globus.as_globus_path.""" # A Windows path # "/E/FlatIron/integration" # Only test this on windows if sys.platform == 'win32': actual = globus.as_globus_path('/foo/bar') self.assertEqual(actual, f'/{Path.cwd().drive[0]}/foo/bar') # On all systems an explicit Windows path should be converted to a POSIX one actual = globus.as_globus_path(PureWindowsPath('E:\\FlatIron\\integration')) self.assertTrue(actual.startswith('/E/')) # On all systems an explicit POSIX path should be left unchanged actual = globus.as_globus_path(PurePosixPath('E:\\FlatIron\\integration')) self.assertEqual(actual, 'E:\\FlatIron\\integration') # A valid globus path should be unchanged path = '/mnt/ibl' actual = globus.as_globus_path(PurePosixPath(path)) self.assertEqual(actual, path) path = '/E/FlatIron/integration' actual = globus.as_globus_path(path) self.assertEqual(actual, path)
[docs] def test_get_local_endpoint_id(self): """Test for one.remote.globus.get_local_endpoint_id function.""" def _check_path(x): self.assertTrue(str(x).endswith('client-id.txt')) return True # Function should check for path existence with self.assertRaises(AssertionError), \ mock.patch.object(Path, 'exists', return_value=False): globus.get_local_endpoint_id() # Function should look for 'client-id.txt' file and return contents with mock.patch.object(Path, 'exists', _check_path), \ mock.patch.object(Path, 'read_text', return_value=f' {ENDPOINT_ID} '): x = globus.get_local_endpoint_id() self.assertEqual(ENDPOINT_ID, x)
[docs] def test_get_local_endpoint_paths(self): """Tests for one.remote.globus.get_local_endpoint_paths function.""" with mock.patch('one.remote.globus.sys.platform', 'win32'): self.assertEqual([], globus.get_local_endpoint_paths()) with mock.patch('one.remote.globus.sys.platform', 'linux'), \ mock.patch.object(Path, 'exists', return_value=False), self.assertWarns(Warning): self.assertEqual([], globus.get_local_endpoint_paths()) expected = [Path('path', 'one'), Path('path', 'two')] with mock.patch('one.remote.globus.sys.platform', 'linux'), \ mock.patch.object(Path, 'exists', return_value=True), \ mock.patch.object(Path, 'read_text', return_value='path/one,path/two '): self.assertCountEqual(expected, globus.get_local_endpoint_paths())
[docs] def test_get_lab_from_endpoint_id(self): """Tests for one.remote.globus.get_lab_from_endpoint_id function.""" # Set up REST cache fixtures ac = AlyxClient(**TEST_DB_1) setup_rest_cache(ac.cache_dir) endpoint_id = '2dc8ccc6-2f8e-11e9-9351-0e3d676669f4' name = globus.get_lab_from_endpoint_id(endpoint_id, ac)[0] self.assertEqual(name, 'mainenlab') # Check behaviour when unknown UUID with mock.patch.object(ac, 'rest', return_value=[]): self.assertIsNone(globus.get_lab_from_endpoint_id('123', ac)) # Check behaviour when multiple labs returned with mock.patch.object(ac, 'rest', return_value=[{'name': 'lab_A'}, {'name': 'lab_B'}]): self.assertEqual(len(globus.get_lab_from_endpoint_id('123', ac)), 2) # Check behaviour when no input ID returned with mock.patch('one.remote.globus.get_local_endpoint_id', return_value=endpoint_id): name = globus.get_lab_from_endpoint_id(alyx=ac)[0] self.assertEqual(name, 'mainenlab')
[docs] @mock.patch('one.remote.globus.globus_sdk') def test_create_globus_client(self, globus_mock): """Tests for one.remote.globus.create_globus_client function.""" # Check setup run when no params exist, check raises exception when missing params gc_id = str(uuid.uuid4()) incomplete_pars = iopar.from_dict({'GLOBUS_CLIENT_ID': gc_id}) with mock.patch('one.remote.globus._setup') as setup_mock, \ self.assertRaises(ValueError), \ mock.patch('one.remote.base.load_client_params', side_effect=[AssertionError, incomplete_pars]): globus.create_globus_client() setup_mock.assert_called() # Check behaviour with complete params pars = iopar.from_dict({'GLOBUS_CLIENT_ID': gc_id, 'refresh_token': 456}) with mock.patch('one.remote.globus.load_client_params', return_value=pars) as par_mock: client = globus.create_globus_client('admin') par_mock.assert_called_once_with('globus.admin') globus_mock.NativeAppAuthClient.assert_called_once_with(gc_id) globus_mock.RefreshTokenAuthorizer.assert_called() self.assertEqual(client, globus_mock.TransferClient()) # Check without refresh tokens pars = pars.set('refresh_token', None).set('access_token', 456) globus_mock.RefreshTokenAuthorizer.reset_mock() with mock.patch('one.remote.globus.load_client_params', return_value=pars) as par_mock: client = globus.create_globus_client('admin') par_mock.assert_called_once_with('globus.admin') globus_mock.AccessTokenAuthorizer.assert_called_once_with(456) globus_mock.RefreshTokenAuthorizer.assert_not_called() self.assertEqual(client, globus_mock.TransferClient())
[docs] def test_remove_token_fields(self): """Test for one.remote.globus._remove_token_fields function.""" par = iopar.from_dict({ 'local_path': 'foo', 'GLOBUS_CLIENT_ID': ENDPOINT_ID, 'refresh_token': None, 'access_token': str(uuid.uuid4()), 'expires_at_seconds': 12345678}) newpar = globus._remove_token_fields(par) self.assertTrue(hasattr(newpar, '_fields')) self.assertEqual(newpar._fields, ('local_path', 'GLOBUS_CLIENT_ID')) # Check works with a dict newpar = globus._remove_token_fields(par.as_dict()) self.assertTrue(hasattr(newpar, '_fields')) self.assertEqual(newpar._fields, ('local_path', 'GLOBUS_CLIENT_ID')) self.assertIsNone(globus._remove_token_fields(None))
[docs] def test_get_token(self): """Test for one.remote.globus.get_token function.""" auth_code = 'a1b2c3d4e5f6g7h8' # Test without refresh tokens with mock.patch('builtins.input', return_value=auth_code), \ mock.patch('one.remote.globus.globus_sdk.NativeAppAuthClient') as client: token = globus.get_token(str(ENDPOINT_ID), refresh_tokens=False) client().oauth2_start_flow.assert_called_with(refresh_tokens=False) client().oauth2_exchange_code_for_tokens.assert_called_with('a1b2c3d4e5f6g7h8') self.assertIsInstance(token, dict) expected = ('refresh_token', 'access_token', 'expires_at_seconds') self.assertCountEqual(expected, token.keys()) # Test with refresh tokens with mock.patch('builtins.input', return_value=auth_code), \ mock.patch('one.remote.globus.globus_sdk.NativeAppAuthClient') as client: token = globus.get_token(str(ENDPOINT_ID), refresh_tokens=True) client().oauth2_start_flow.assert_called_with(refresh_tokens=True) # Test cancel with mock.patch('builtins.input', return_value='c '), \ mock.patch('one.remote.globus.globus_sdk.NativeAppAuthClient') as client: token = globus.get_token(str(ENDPOINT_ID), refresh_tokens=True) client().oauth2_exchange_code_for_tokens.assert_not_called() self.assertCountEqual(token.keys(), expected) self.assertFalse(any(token.values()))
[docs] def tearDown(self) -> None: par_path = Path(iopar.getfile('.one')) assert str(par_path).startswith(self.tempdir.name) if par_path.exists(): shutil.rmtree(par_path) self.path_mock.stop()
[docs] @classmethod def tearDownClass(cls) -> None: cls.tempdir.cleanup()
class _GlobusClientTest(unittest.TestCase): """Globus Client test setup routines.""" """unittest.mock._patch: Mock object for globus_sdk package.""" globus_sdk_mock = None @mock.patch('one.remote.globus._setup') def setUp(self, _) -> None: self.tempdir = tempfile.TemporaryDirectory() # The github CI root dir contains an alias/symlink so we must resolve it self.root_path = Path(self.tempdir.name).resolve() self.addCleanup(self.tempdir.cleanup) self. pars = iopar.from_dict({ 'GLOBUS_CLIENT_ID': '123', 'refresh_token': '456', 'local_endpoint': str(ENDPOINT_ID), 'local_path': str(self.root_path), 'access_token': 'abc', 'expires_at_seconds': datetime.now().timestamp() + 60**2 }) self.globus_sdk_mock = mock.patch('one.remote.globus.globus_sdk') self.globus_sdk_mock.start() self.addCleanup(self.globus_sdk_mock.stop) with mock.patch('one.remote.globus.load_client_params', return_value=self.pars): self.globus = globus.Globus()
[docs] class TestGlobusClient(_GlobusClientTest): """Tests for the GlobusClient class."""
[docs] def test_constructor(self): """Test for Globus.__init__ method.""" # self.assertEqual(self.client.client, self.globus_sdk_mock.TransferClient()) expected = {'local': {'id': ENDPOINT_ID, 'root_path': str(self.root_path)}} self.assertDictEqual(self.globus.endpoints, expected)
[docs] def test_setup(self): """Test for Globus.setup static method. TestGlobus.test_setup tests the setup function. Here we just check it's called. """ with mock.patch('one.remote.globus._setup') as setup_mock, \ mock.patch('one.remote.globus.create_globus_client'), \ mock.patch('one.remote.globus.load_client_params', return_value=self.pars): self.assertIsInstance(globus.Globus.setup(), globus.Globus) setup_mock.assert_called_once()
[docs] def test_to_address(self): """Test for Globus.to_address method.""" # Check with Windows path root_path = PureWindowsPath(r'C:\root\path') uid = uuid.uuid1() # Check works with endpoint label self.globus.add_endpoint(uid, label='data-repo', root_path=root_path) addr = self.globus.to_address('path/to/file', 'data-repo') self.assertEqual(addr, '/C/root/path/path/to/file') # Check works with UUID addr = self.globus.to_address('foo/bar', str(uid)) self.assertEqual(addr, '/C/root/path/foo/bar')
[docs] def test_add_endpoint(self): """Test for Globus.add_endpoint method.""" # Test with UUID # 1. Should raise exception when label not defined endpoint_id = '2dc8ccc6-2f8e-11e9-9351-0e3d676669f4' with self.assertRaises(ValueError): self.globus.add_endpoint(endpoint_id) # 2. Should add UUID to endpoints along with root path name = 'lab1' self.globus.add_endpoint(endpoint_id, label=name, root_path='/mnt') self.assertIn(name, self.globus.endpoints) expected = {'id': uuid.UUID(endpoint_id), 'root_path': '/mnt'} self.assertDictEqual(self.globus.endpoints[name], expected) # Test with Alyx repo name # Set up REST cache fixtures ac = AlyxClient(**TEST_DB_1) setup_rest_cache(ac.cache_dir) name = 'mainenlab' self.globus.add_endpoint(name, alyx=ac) self.assertIn(name, self.globus.endpoints) expected = { 'id': uuid.UUID('0b6f5a7c-a7a9-11e8-96fa-0a6d4e044368'), 'root_path': '/mnt/globus/mainenlab/Subjects' } self.assertDictEqual(self.globus.endpoints[name], expected) # Test behaviour when label exists with self.assertLogs(logging.getLogger('one.remote.globus'), logging.ERROR): self.globus.add_endpoint(name, root_path='/', alyx=ac) self.assertNotEqual(self.globus.endpoints[name]['root_path'], '/') self.globus.add_endpoint(name, root_path='/', overwrite=True, alyx=ac) self.assertEqual(self.globus.endpoints[name]['root_path'], '/')
[docs] def test_fetch_endpoints_from_alyx(self): """Test for Globus.fetch_endpoints_from_alyx method.""" alyx = AlyxClient(**TEST_DB_1) uid = uuid.uuid1() repos = [{'name': 'foo', 'globus_endpoint_id': '', 'globus_path': '/some/path'}, {'name': 'bar', 'globus_endpoint_id': str(uid), 'globus_path': '/foo/path'}] with mock.patch.object(alyx, 'rest', return_value=repos): added = self.globus.fetch_endpoints_from_alyx(alyx) # Repos without an endpoint ID should have been ignored self.assertEqual(added, {'bar': {'id': uid, 'root_path': '/foo/path'}}) self.assertIn('bar', self.globus.endpoints)
[docs] def test_endpoint_path(self): """Test for Globus._endpoint_path method.""" expected = PurePosixPath('/mnt/foo/bar') self.assertEqual(str(expected), self.globus._endpoint_path(expected)) expected = '/foo/bar/baz' self.assertEqual(expected, self.globus._endpoint_path('bar/baz', root_path='/foo')) with self.assertRaises(ValueError): self.globus._endpoint_path('bar', root_path='foo')
[docs] def test_endpoint_id_root(self): """Test for Globus._endpoint_id_root method.""" id, path = self.globus._endpoint_id_root('local') self.assertEqual(ENDPOINT_ID, id) self.assertEqual(path, globus.as_globus_path(self.root_path)) # Check behaviour when endpoint not in list expected = uuid.uuid4() id, path = self.globus._endpoint_id_root(expected) self.assertEqual(expected, id) self.assertIsNone(path) # Should warn when ambiguous self.globus.add_endpoint(ENDPOINT_ID, label='foo', root_path='/foo/bar') with self.assertWarns(UserWarning): id, path = self.globus._endpoint_id_root(ENDPOINT_ID) self.assertEqual(ENDPOINT_ID, id) self.assertEqual(path, globus.as_globus_path(self.root_path)) with self.assertRaises(ValueError): self.globus._endpoint_id_root('remote')
[docs] def test_ls(self): """Test for Globus.ls method.""" response = dict( name=Path(self.root_path, f'some.{uuid.uuid4()}.file').as_posix(), type='file', size=1024) err = globus_sdk.GlobusConnectionError('', ConnectionError) self.globus.client.operation_ls.side_effect = (err, [response]) path = globus.as_globus_path(self.root_path) out = self.globus.ls('local', path) self.assertEqual(2, self.globus.client.operation_ls.call_count) self.assertIsInstance(out[0], PurePosixPath) self.assertEqual(response['name'], out[0].as_posix()) # Remove uuid and return size args self.globus.client.operation_ls.side_effect = ([response],) out, = self.globus.ls('local', path, remove_uuid=True, return_size=True) self.assertEqual(response['size'], out[1]) self.assertNotIn(response['name'].split('.')[-2], str(out[0])) self.globus.client.operation_ls.reset_mock() self.globus.client.operation_ls.side_effect = err with self.assertRaises(globus_sdk.GlobusConnectionError): self.globus.ls('local', path, max_retries=2) self.assertEqual(3, self.globus.client.operation_ls.call_count)
[docs] def test_mv(self): """Test for Globus.mv and Globus.run_task methods.""" source = ('some.file', 'some2.file') destination = ('new.file', 'new2.file') # Mock transfer output task_id = uuid.uuid1() self.globus.client.submit_transfer.return_value = {'task_id': str(task_id)} self.globus.client.get_task.return_value = {'status': 'SUCCEEDED'} self.globus.client.task_successful_transfers.return_value = \ [dict(source_path=src, destination_path=dst) for src, dst in zip(source, destination)] self.globus.client.task_skipped_errors.return_value = [] task_response = self.globus.mv('local', 'local', source, destination) self.assertEqual(task_id, task_response) # Test errors # Check timeout behaviour self.globus.client.task_wait.reset_mock() self.globus.client.task_wait.return_value = False timeout = 10 with self.assertRaises(IOError) as ex: self.globus.mv('local', 'local', source, destination, timeout=timeout) self.assertIn(str(task_id), str(ex)) self.assertEqual(timeout, self.globus.client.task_wait.call_count) # Check status check error behaviour self.globus.client.task_wait.return_value = True self.globus.client.task_successful_transfers.side_effect = \ globus_sdk.TransferAPIError(mock.MagicMock()) with self.assertLogs(logging.getLogger('one.remote.globus'), logging.WARNING): self.globus.mv('local', 'local', source, destination) # Check failed transfer self.globus.client.get_task.return_value = {'status': 'FAILED'} self.globus.client.task_successful_transfers.reset_mock() with self.assertRaises(IOError) as ex: self.globus.mv('local', 'local', source, destination) self.assertIn(self.globus.client.get_task.return_value['status'], str(ex)) # Check submission error behaviour self.globus.client.submit_transfer.side_effect = \ globus_sdk.GlobusConnectionError('', ConnectionError) default_n_retries = 3 with self.assertLogs(logging.getLogger('one.remote.globus')) as log, \ self.assertRaises(globus_sdk.GlobusConnectionError): self.globus.mv('local', 'local', source, destination) warnings = filter(lambda x: x.levelno == 30, log.records) self.assertEqual(default_n_retries, len(list(warnings))) self.assertRegex(log.records[-1].msg, 'Max retries') self.assertEqual('ERROR', log.records[-1].levelname)
[docs] def test_transfer_data(self): """Test for Globus.transfer_data method.""" src_id, dst_id = uuid.uuid1(), uuid.uuid1() self.globus.endpoints['repo_01'] = {'id': dst_id, 'root_path': '/mnt/s0/'} self.globus.endpoints['repo_00'] = {'id': src_id, 'root_path': '/mnt/h0/Data'} sdk_mock, _ = self.globus_sdk_mock.get_original() response_mock = mock.create_autospec(globus_sdk.response.GlobusHTTPResponse) response_mock.data = {'task_id': str(uuid.uuid1())} self.globus.client.submit_transfer.return_value = response_mock out = self.globus.transfer_data('path/to/file', 'repo_00', 'repo_01', foo='bar') # SDK should be called with endpoint IDs and optional kwargs sdk_mock.TransferData.assert_called_once_with( self.globus.client, foo='bar', source_endpoint=src_id, destination_endpoint=dst_id) sdk_mock.TransferData().add_item.assert_called_once_with( '/mnt/h0/Data/path/to/file', '/mnt/s0/path/to/file', recursive=False) self.globus.client.submit_transfer.assert_called_once() self.assertEqual(out, uuid.UUID(response_mock.data['task_id'])) # Test passing list of files sdk_mock.reset_mock() files = ['path/to/file', 'foo/bar.baz'] self.globus.transfer_data(files, 'repo_00', 'repo_01') self.assertEqual(sdk_mock.TransferData().add_item.call_count, len(files))
[docs] def test_download_file(self): """Test for Globus.download_file method.""" self.globus.endpoints['repo_01'] = {'id': ENDPOINT_ID, 'root_path': '/mnt/s0/'} task_id = 'abc123' files = ['foo/bar.file', 'foo/foo/bar.file', 'baz.file'] # create files on disk transferred = [] for f in reversed(files): # reversed to check files reordered full_path = self.root_path / f full_path.parent.mkdir(parents=True, exist_ok=True) full_path.touch() transferred.append({'destination_path': globus.as_globus_path(full_path)}) self.globus.client.task_successful_transfers.return_value = transferred with mock.patch.object(self.globus, 'run_task', return_value=task_id): downloaded = self.globus.download_file(files, 'repo_01') expected = list(map(self.root_path.joinpath, files)) self.assertEqual(downloaded, expected) # asserts order of list identical # Behaviour should be similar when a folder is downloaded (i.e. recursive is True) with mock.patch.object(self.globus, 'run_task', return_value=task_id): downloaded = self.globus.download_file('folder', 'repo_01', recursive=True) self.assertCountEqual(downloaded, expected) # Should return a single element if one file downloaded self.globus.client.task_successful_transfers.return_value = [transferred[0]] with mock.patch.object(self.globus, 'run_task', return_value=task_id): downloaded = self.globus.download_file(files[-1], 'repo_01', recursive=False) self.assertIsInstance(downloaded, Path) # Should raise assertion error if file doesn't exist on disk (unlikely!) self.root_path.joinpath(files[0]).unlink() self.globus.client.task_successful_transfers.return_value = transferred with mock.patch.object(self.globus, 'run_task', return_value=task_id): self.assertRaises(AssertionError, self.globus.download_file, files, 'repo_01')
[docs] def test_delete_data(self): """Test for Globus.delete_data method.""" globus_id = uuid.uuid1() self.globus.endpoints['repo_00'] = {'id': globus_id, 'root_path': '/mnt/h0/Data'} sdk_mock, _ = self.globus_sdk_mock.get_original() response_mock = mock.create_autospec(globus_sdk.response.GlobusHTTPResponse) response_mock.data = {'task_id': str(uuid.uuid1())} self.globus.client.submit_delete.return_value = response_mock out = self.globus.delete_data('path/to/file', 'repo_00', foo='bar') # SDK should be called with endpoint IDs and optional kwargs sdk_mock.DeleteData.assert_called_once_with( self.globus.client, recursive=False, foo='bar', endpoint=globus_id) sdk_mock.DeleteData().add_item.assert_called_once_with('/mnt/h0/Data/path/to/file') self.globus.client.submit_delete.assert_called_once() self.assertEqual(out, uuid.UUID(response_mock.data['task_id'])) # Test passing list of files sdk_mock.reset_mock() files = ['path/to/file', 'foo/bar.baz'] self.globus.delete_data(files, 'repo_00') self.assertEqual(sdk_mock.DeleteData().add_item.call_count, len(files))
[docs] def test_globus_headless(self): """Test for Globus object in headless mode.""" self.assertRaises(RuntimeError, globus.Globus, 'foobar', headless=True) pars = self.globus._pars with mock.patch('one.remote.globus._setup', return_value=pars) as setup_function: globus.Globus('foobar', headless=False, connect=False) setup_function.assert_called()
[docs] def test_login_logout(self): """Test for Globus.login and Globus.logout methods.""" assert self.globus.is_logged_in sdk_mock, _ = self.globus_sdk_mock.get_original() with self.assertLogs('one.remote.globus', 10): # Token validator checks token auth class, which is mocked, so here we set the # RefreshTokenAuthorizer to a MagicMock so that the types match sdk_mock.RefreshTokenAuthorizer = mock.MagicMock self.globus.login() self.globus.client.authorizer.ensure_valid_token.assert_called() # Log out # Change client name in order to avoid overwriting parameters with mock.patch('one.remote.globus.save_client_params') as save_func, \ mock.patch('one.remote.globus.load_client_params', return_value=self.pars): self.globus.logout() save_func.assert_called() (all_pars, *_), _ = save_func.call_args self.assertNotIn('access_token', all_pars[self.globus.client_name]) self.globus.logout() # check repeat calls don't raise errors self.assertIsNone(self.globus.client.authorizer.get_authorization_header()) self.assertFalse(hasattr(self.globus.client.authorizer, 'access_token')) self.assertFalse(hasattr(self.globus._pars, 'access_token')) self.assertFalse(self.globus.is_logged_in) self.assertTrue(self.globus._token_expired) # Test what happens when authenticate called with invalid token self.assertRaises(RuntimeError, self.globus._authenticate) # Check login in headless mode self.globus.headless = True self.assertRaises(RuntimeError, self.globus.login) self.globus.headless = False # Test login cancel with mock.patch('one.remote.globus.load_client_params', return_value=self.pars), \ mock.patch('builtins.input', return_value='c'), \ self.assertLogs('one.remote.globus', 10): self.globus.login() self.assertFalse(self.globus.is_logged_in) token = {'refresh_token': None, 'expires_at_seconds': datetime.now().timestamp() + 60**2, 'access_token': 'a1b2c3d4e5f6g7h8'} # Stop and start mock in order to reset MagicMock attributes self.globus_sdk_mock.stop() self.globus_sdk_mock = self.globus_sdk_mock.start() self.addCleanup(self.globus_sdk_mock.stop) with mock.patch('one.remote.globus.save_client_params') as save_func, \ mock.patch('one.remote.globus.get_token', return_value=token), \ mock.patch('one.remote.globus.load_client_params', return_value=self.pars): # Expected refresh token warning as stay_logged_in is True # In reality this will only happen when loading saved taken where refresh_token = False self.assertWarns(UserWarning, self.globus.login, stay_logged_in=True) self.assertTrue(self.globus.is_logged_in)
[docs] def test_save_refresh_token_callback(self): """Test for Globus._save_refresh_token_callback method.""" assert hasattr(self.globus._pars, 'refresh_token') token = {'refresh_token': '567', 'access_token': 'abc', 'expires_at_seconds': 100000000} res = mock.MagicMock(spec=globus_sdk.services.auth.OAuthTokenResponse) res.by_resource_server = dict(server=token) # Check behaviour when called with Globus auth response with mock.patch('one.remote.globus.save_client_params') as client_params_mock: self.globus._save_refresh_token_callback(res) client_params_mock.assert_called_once() (pars, name), _ = client_params_mock.call_args self.assertEqual(name, 'globus') self.assertIn(self.globus.client_name, pars) self.assertTrue(set(pars[self.globus.client_name]) >= set(globus.DEFAULT_PAR)) for k, v in token.items(): with self.subTest(k): self.assertEqual(pars[self.globus.client_name].get(k), v) # Obj params should be modified par_vals = map(partial(getattr, self.globus._pars), token.keys()) self.assertCountEqual(par_vals, token.values()) # Check behaviour when called with empty auth response res.by_resource_server = dict() with mock.patch('one.remote.globus.save_client_params') as client_params_mock: self.globus._save_refresh_token_callback(res) client_params_mock.assert_not_called()
[docs] class TestGlobusAsync(unittest.IsolatedAsyncioTestCase, _GlobusClientTest): """Asynchronous Globus method tests."""
[docs] async def test_task_wait_async(self): """Test for Globus.task_wait_async method.""" task_id = uuid.uuid4() statuses = ({'status': 'ACTIVE'}, {'status': 'SUCCESSFUL'}) with mock.patch('asyncio.sleep', new_callable=mock.AsyncMock) as sleep_mock, \ mock.patch.object(self.globus.client, 'get_task', side_effect=statuses): self.assertTrue(await self.globus.task_wait_async(task_id, polling_interval=5)) sleep_mock.assert_awaited_once_with(5) # polling_interval value # Check timeout behaviour status = statuses[0] with mock.patch('asyncio.sleep', new_callable=mock.AsyncMock) as sleep_mock, \ mock.patch.object(self.globus.client, 'get_task', return_value=status): self.assertFalse(await self.globus.task_wait_async(task_id, polling_interval=3)) sleep_mock.assert_awaited_with(3) # polling_interval value self.assertEqual(round(10 / 3), sleep_mock.await_count) # timeout = 10 # Check input validation with self.assertRaises(globus_sdk.GlobusSDKUsageError): await self.globus.task_wait_async(task_id, polling_interval=.5) with self.assertRaises(globus_sdk.GlobusSDKUsageError): await self.globus.task_wait_async(task_id, timeout=.5)
if __name__ == '__main__': unittest.main()