Source code for one.remote.globus

"""A module for handling file operations through the Globus SDK.

Setup
-----

To set up Globus simply instantiate the `Globus` class for the first time and follow the prompts.
Providing a client name string to the constructor allows one to set up multiple Globus clients
(i.e. when switching between different Globus client IDs).

In order to use this function you need:

1. The client ID of an existing Globus Client (`see this tutorial`_).
2. Set up `Global Connect`_ on your local device.
3. Register your local device as an `endpoint`_ in your Globus Client.


To modify the settings for a pre-established client, call the `Globus.setup` method with the client
name:

>>> globus = Globus.setup('default')

You can update the list of endpoints using the `fetch_endpoints_from_alyx` method:

>>> globus = Globus('admin')
>>> remote_endpoints = globus.fetch_endpoints_from_alyx(alyx=AlyxClient())

The endpoints are stored in the `endpoints` property

>>> print(globus.endpoints.keys())
>>> print(globus.endpoints['local'])

.. _see this tutorial: https://globus-sdk-python.readthedocs.io/en/stable/tutorial.html
.. _Global Connect: https://www.globus.org/globus-connect-personal
.. _endpoint: https://app.globus.org/


Examples
--------
Get the full Globus file path

>>> relative_path = 'subject/2020-01-01/001/alf/_ibl_trials.table.pqt'
>>> full_path = globus.to_address(relative_path, 'flatiron_cortexlab')

Log in with a limited time token

>>> globus = Globus('admin')
>>> globus.login(stay_logged_in=False)

Log out of Globus, revoking and deleting all tokens

>>> globus.logout()
>>> assert not globus.is_logged_in

Asynchronously transfer data between Alyx repositories

>>> alyx = AlyxClient()
>>> glo = Globus('admin')
>>> glo.add_endpoint('flatiron_cortexlab', alyx=alyx)
>>> glo.add_endpoint('cortex_lab_SR', alyx=alyx)
>>> task_id = glo.transfer_data('path/to/file', 'flatiron_cortexlab', 'cortex_lab_SR')

Synchronously transfer data to an alternate local location

>>> from functools import partial
>>> root_path = '/path/to/new/location'
>>> glo.add_endpoint(get_local_endpoint_id(), label='alternate_local', root_path=root_path)
>>> folder = 'camera/ZFM-01867/2021-03-23/002'  # An example folder to download
>>> task = partial(glo.transfer_data, folder, 'integration', 'integration_local',
...                label='alternate data', recursive=True)
>>> task_id = glo.run_task(task)  # Submit task to Globus and await completion

Temporarily change local data root path and synchronously download file

>>> glo.endpoints['local']['root_path'] = '/path/to/new/location'
>>> file = glo.download_file('path/to/file.ext', 'source_endpoint')
Path('/path/to/new/location/path/to/file.ext')

Await multiple tasks to complete by passing a list of Globus transfer IDs

>>> import asyncio
>>> tasks = [asyncio.create_task(globus.task_wait_async(task_id)) for task_id in task_ids]
>>> success = asyncio.run(asyncio.gather(*tasks))

"""
import os
import re
import sys
import asyncio
import logging
from uuid import UUID
from datetime import datetime
from pathlib import Path, PurePosixPath, PurePath, PureWindowsPath
import warnings
from functools import partial, wraps

import globus_sdk
from globus_sdk import TransferAPIError, GlobusAPIError, NetworkError, GlobusTimeoutError, \
    GlobusConnectionError, GlobusConnectionTimeoutError, GlobusSDKUsageError, NullAuthorizer
from iblutil.io import params as iopar

from one.alf.spec import is_uuid
from one.alf.files import remove_uuid_string
import one.params
from one.webclient import AlyxClient
from one.util import ensure_list
from .base import DownloadClient, load_client_params, save_client_params

# Public names exported by a star-import of this module
__all__ = ['Globus', 'get_lab_from_endpoint_id', 'as_globus_path']
_logger = logging.getLogger(__name__)  # module-level logger
CLIENT_KEY = 'globus'
"""str: The default key in the remote settings file"""

DEFAULT_PAR = {'GLOBUS_CLIENT_ID': None, 'local_endpoint': None, 'local_path': None}
"""dict: The default Globus parameter fields"""

# NOTE(review): the 'INACTIVE' value below is a bare str whereas the other values are tuples, so
# a membership test such as `status in STATUS_MAP['INACTIVE']` would perform substring matching.
# The code consuming this map is not visible here - confirm before changing.
STATUS_MAP = {
    'ACTIVE': ('QUEUED', 'ACTIVE', 'GC_NOT_CONNECTED', 'UNKNOWN'),
    'FAILED': ('ENDPOINT_ERROR', 'PERMISSION_DENIED', 'CONNECT_FAILED'),
    'INACTIVE': 'PAUSED_BY_ADMIN'}
"""dict: A map of Globus status to "nice" status"""


def ensure_logged_in(func):
    """Wrap a Globus method so that a login is attempted before it runs.

    The returned wrapper first calls the instance's `login` method with no arguments, then
    forwards every positional and keyword argument to the wrapped method and returns its result
    unchanged.

    Parameters
    ----------
    func : function
        Method to wrap (e.g. Globus.transfer_data).  Its first argument must be an object that
        provides a `login` method.

    Returns
    -------
    function
        Handle to wrapped method, carrying the metadata (name, docstring) of `func`.
    """
    def _login_then_call(self, *args, **kwargs):
        self.login()  # attempt authentication before delegating
        result = func(self, *args, **kwargs)
        return result

    return wraps(func)(_login_then_call)


def _setup(par_id=None, login=True, refresh_tokens=True):
    """
    Sets up Globus as a backend for ONE functions.

    Interactively prompts for the Globus client ID, the local endpoint ID and the local endpoint
    path, using any previously saved values for the profile as defaults, then saves the result
    to the client parameter file.

    Parameters
    ----------
    par_id : str
        Parameter profile name to set up e.g. 'default', 'admin'.  If empty, the user is prompted
        for a name ('default' is used when nothing is entered).
    login : bool
        If true, the user is also prompted to authenticate (see `get_token`) and the returned
        token fields are stored along with the other parameters.
    refresh_tokens : bool
        Passed to `get_token`; if true a refresh token is requested.  Only used if `login` is
        true.

    Returns
    -------
    IBLParams
        A set of Globus parameters.

    Raises
    ------
    ValueError
        No Globus client ID was entered, the client ID is not a valid UUID, or the local
        endpoint ID is not a version 1 or 2 UUID.
    """

    print('Setting up Globus parameter file. See docstring for help.')
    if not par_id:
        default_par_id = 'default'
        par_id = input(
            f'Enter name for this client or press Enter to keep value "{default_par_id}": '
        )
        par_id = par_id.strip() or default_par_id

    # Read existing globus params if present
    globus_pars = iopar.as_dict(load_client_params(CLIENT_KEY, assert_present=False) or {})
    pars = {**DEFAULT_PAR, **globus_pars.get(par_id, {})}

    # Set GLOBUS_CLIENT_ID
    current_id = pars['GLOBUS_CLIENT_ID']
    if current_id:
        prompt = (f'Found Globus client ID in parameter file ({current_id}). '
                  'Press Enter to keep it, or enter a new ID here: ')
        pars['GLOBUS_CLIENT_ID'] = input(prompt).strip() or current_id
    else:
        new_id = input('Please enter the Globus client ID: ').strip()
        if not new_id:
            raise ValueError('Globus client ID is a required field')
        pars['GLOBUS_CLIENT_ID'] = new_id
    if not is_uuid(pars['GLOBUS_CLIENT_ID']):
        # Format the message explicitly: exceptions do not apply %-style arguments themselves
        raise ValueError(f'Invalid Globus client ID "{pars["GLOBUS_CLIENT_ID"]}"')

    # Find and set local ID
    message = 'Please enter the local endpoint ID'
    try:
        default_endpoint = str(pars['local_endpoint'] or get_local_endpoint_id())
        message += f' (default: {default_endpoint})'
    except (AssertionError, KeyError, ValueError):
        # Auto-detection is best-effort: a missing ID file (AssertionError), a missing
        # environment variable (KeyError) or a malformed ID (ValueError) should not abort the
        # setup because the user can still enter the ID manually below.
        default_endpoint = ''
        warnings.warn(
            'Cannot find local endpoint ID. Beware that this might mean that Globus Connect '
            'is not set up properly.')
    pars['local_endpoint'] = input(message + ':').strip() or default_endpoint
    if not is_uuid(pars['local_endpoint'], (1, 2)):
        raise ValueError('Globus local endpoint ID must be a UUID version 1 or 2')

    # Check for local path
    message = 'Please enter the local endpoint path'
    local_path = pars['local_path'] or one.params.get(silent=True).CACHE_DIR
    message += f' (default: {local_path})'
    pars['local_path'] = input(message + ':').strip() or local_path

    if login:
        # Log in manually and get refresh token to avoid having to login repeatedly
        token = get_token(pars['GLOBUS_CLIENT_ID'], refresh_tokens=refresh_tokens)
        pars.update(token)

    globus_pars[par_id] = pars
    save_client_params(globus_pars, client_key=CLIENT_KEY)
    print('Finished setup.')
    return iopar.from_dict(pars)


def get_token(client_id, refresh_tokens=True):
    """
    Interactively obtain Globus transfer tokens for a client.

    Prints an authorization URL, then prompts the user for the code obtained after logging in
    via that URL and exchanges it for tokens.

    Parameters
    ----------
    client_id : str
        A Globus client ID.
    refresh_tokens : bool
        If true, requests a refresh token for repeat logins.

    Returns
    -------
    dict
        A dict containing the keys {'refresh_token', 'access_token', 'expires_at_seconds'}.  All
        values are None if the user entered nothing or cancelled with "c".
    """
    token_keys = ('refresh_token', 'access_token', 'expires_at_seconds')
    auth_client = globus_sdk.NativeAppAuthClient(client_id)
    auth_client.oauth2_start_flow(refresh_tokens=bool(refresh_tokens))
    url = auth_client.oauth2_get_authorize_url()
    print('To get a new token, go to this URL and login: {0}'.format(url))
    code = input('Enter the code you get after login here (press "c" to cancel): ').strip()

    # Guard clause: an empty response or "c" means the user cancelled
    if not code or code.lower() == 'c':
        return dict.fromkeys(token_keys)

    response = auth_client.oauth2_exchange_code_for_tokens(code)
    transfer_tokens = response.by_resource_server['transfer.api.globus.org']
    return {key: transfer_tokens.get(key) for key in token_keys}


def create_globus_client(client_name='default'):
    """
    Build a Globus transfer client from a saved parameter profile (deprecated).

    Emits a FutureWarning on every call.  If the profile cannot be loaded, the interactive setup
    is run first.  A refresh token authorizer is used when the profile holds a refresh token,
    otherwise an access token authorizer.

    Parameters
    ----------
    client_name : str
        Defines the parameter name to use (globus.client_name), e.g. 'default', 'admin'.

    Returns
    -------
    globus_sdk.TransferClient
        Globus transfer client instance.

    Raises
    ------
    ValueError
        The loaded parameters are empty or lack a valid UUID under 'GLOBUS_CLIENT_ID'.
    """
    warnings.warn(
        'create_globus_client will be removed in the future, use Globus class instead.',
        FutureWarning)
    par_key = f'{CLIENT_KEY}.{client_name}'
    try:
        globus_pars = load_client_params(par_key)
    except (AttributeError, FileNotFoundError):
        # No usable profile: run the interactive setup, then try loading once more
        _setup(client_name, login=True, refresh_tokens=True)
        globus_pars = load_client_params(par_key, assert_present=False) or {}

    # Assert valid Globus client ID found
    has_valid_id = (
        globus_pars
        and 'GLOBUS_CLIENT_ID' in iopar.as_dict(globus_pars)
        and is_uuid(globus_pars.GLOBUS_CLIENT_ID)
    )
    if not has_valid_id:
        raise ValueError(
            'Invalid Globus client ID in client parameter file. Rerun Globus.setup'
        )

    if iopar.as_dict(globus_pars).get('refresh_token'):
        auth_client = globus_sdk.NativeAppAuthClient(globus_pars.GLOBUS_CLIENT_ID)
        auth_client.oauth2_start_flow(refresh_tokens=True)
        authorizer = globus_sdk.RefreshTokenAuthorizer(globus_pars.refresh_token, auth_client)
    else:
        authorizer = globus_sdk.AccessTokenAuthorizer(globus_pars.access_token)
    return globus_sdk.TransferClient(authorizer=authorizer)


def _remove_token_fields(pars):
    """
    Remove the token fields from a parameters object.

    Parameters
    ----------
    pars : IBLParams, dict
        The Globus parameters containing token fields.

    Returns
    -------
    IBLParams
        A copy of the params without the token fields.
    """
    if pars is None:
        return pars
    fields = ('refresh_token', 'access_token', 'expires_at_seconds')
    return iopar.from_dict({k: v for k, v in iopar.as_dict(pars).items() if k not in fields})


def _save_globus_params(pars, client_name):
    """Store the parameters of one Globus client in the client parameter file.

    The parameters of all Globus clients are loaded, the entry for `client_name` is added or
    replaced, and the whole set is saved again.

    Parameters
    ----------
    pars : IBLParams, dict
        The Globus client parameters to save.
    client_name : str
        The Globus client name, e.g. 'default'.

    """
    stored = load_client_params(CLIENT_KEY, assert_present=False) or {}
    all_clients = iopar.as_dict(stored)
    all_clients[client_name] = iopar.as_dict(pars)
    save_client_params(all_clients, CLIENT_KEY)


def get_local_endpoint_id():
    """
    Extracts the ID of the local Globus Connect endpoint.

    The ID is read from the file 'client-id.txt', located in '%LOCALAPPDATA%/Globus Connect' on
    Windows/Cygwin and in '~/.globusonline/lta' on other platforms.

    Returns
    -------
    uuid.UUID
        The local Globus endpoint ID.

    Raises
    ------
    AssertionError
        The ID file does not exist or is empty.
    KeyError
        On Windows/Cygwin, the LOCALAPPDATA environment variable is not set.
    ValueError
        The ID file content is not a valid UUID string.
    """
    msg = ('Cannot find local endpoint ID, check if Globus Connect is set up correctly, '
           '{} exists and contains a UUID.')
    if sys.platform in ('win32', 'cygwin'):
        id_path = Path(os.environ['LOCALAPPDATA']).joinpath('Globus Connect')
    else:
        id_path = Path.home().joinpath('.globusonline', 'lta')

    id_file = id_path.joinpath('client-id.txt')
    # AssertionError is raised explicitly (instead of using `assert` statements) so that the
    # checks still run under `python -O` while callers catching AssertionError keep working.
    if not id_file.exists():
        raise AssertionError(msg.format(id_file))
    local_id = id_file.read_text().strip()
    if not local_id:  # an empty file cannot contain a UUID
        raise AssertionError(msg.format(id_file))
    _logger.debug('Found local endpoint ID in Globus Connect settings %s', local_id)
    return UUID(local_id)


def get_local_endpoint_paths():
    """
    Read the local paths listed in the Globus Connect 'config-paths' file.

    The file '~/.globusonline/lta/config-paths' is read and split on commas; empty entries are
    dropped.  NB: On Windows/Cygwin a message is printed and an empty list is returned.

    Returns
    -------
    list of pathlib.Path
        Local endpoint paths set in Globus Connect.  Empty (with a warning) if the file does not
        exist.
    """
    if sys.platform in ('win32', 'cygwin'):
        print('On windows the local Globus path needs to be entered manually')
        return []

    path_file = Path.home().joinpath('.globusonline', 'lta', 'config-paths')
    if not path_file.exists():
        msg = ('Cannot find local endpoint path, check if Globus Connect is set up correctly, '
               '{} exists and contains a valid path.')
        warnings.warn(msg.format(path_file))
        return []

    entries = path_file.read_text().strip().split(',')
    _logger.debug('Found local endpoint paths in Globus Connect settings')
    return [Path(entry) for entry in entries if entry]


def get_lab_from_endpoint_id(endpoint=None, alyx=None):
    """
    Extracts lab names associated with a given endpoint UUID.

    Finds the lab names that are associated to data repositories with the provided Globus
    endpoint UUID.

    Parameters
    ----------
    endpoint : uuid.UUID, str
        Endpoint UUID, optional; if not given an attempt is made to find the local endpoint UUID.
    alyx : one.webclient.AlyxClient
        An instance of AlyxClient to use.  If not given, a silent AlyxClient is instantiated.

    Returns
    -------
    list
        The lab names associated with the endpoint UUID.  The list is empty if no lab is
        associated with the endpoint.
    """
    alyx = alyx or AlyxClient(silent=True)
    if not endpoint:
        endpoint = get_local_endpoint_id()
    labs = alyx.rest('labs', 'list', django=f'repositories__globus_endpoint_id,{endpoint}')
    # Always return a list, as documented, even when the query yields no labs
    return [lab['name'] for lab in labs]
def as_globus_path(path):
    r"""
    Format a path as a string suitable for the Globus TransferClient.

    Parameters
    ----------
    path : pathlib.Path, pathlib.PurePath, str
        A path to convert to a Globus-compliant path string.

    Returns
    -------
    str
        A formatted path string.

    Notes
    -----
    - Only paths given as strings are resolved against the file system; `pathlib` objects are
      formatted as they are.
    - If using tilde in path, the home folder of your Globus Connect instance must be the same
      as the OS home dir.
    - If validating a path for another system ensure the input path is a PurePath, in
      particular, on a Linux computer a remote Windows path should first be made into a
      PureWindowsPath.

    Examples
    --------
    A Windows path (on Windows OS)

    >>> as_globus_path('E:\\FlatIron\\integration')
    '/E/FlatIron/integration'

    When explicitly a POSIX path, remains unchanged

    >>> as_globus_path(PurePosixPath('E:\\FlatIron\\integration'))
    'E:\\FlatIron\\integration'

    A relative POSIX path (on *nix OS; the result depends on the working directory)

    >>> as_globus_path('../data/integration')
    '/mnt/data/integration'

    A valid Globus path remains unchanged

    >>> as_globus_path('/E/FlatIron/integration')
    '/E/FlatIron/integration'
    """
    # Must be determined before a str input is cast to a (non-pure) Path below
    was_path_object = isinstance(path, PurePath)
    windows_style = isinstance(path, PureWindowsPath) or sys.platform in ('win32', 'cygwin')
    if isinstance(path, str):
        path = Path(path)

    # On Windows a Globus path looks like "/E/..."; elsewhere any absolute path is valid as is
    if windows_style:
        already_valid = re.match(r'/[A-Z]($|/)', path.as_posix())
    else:
        already_valid = path.is_absolute()
    if already_valid:
        return path.as_posix()

    if not was_path_object:
        path = path.resolve()
    if not path.drive:
        return str(path)
    # Turn a drive such as "E:" into a leading "/E"
    return '/' + str(path.as_posix().replace(':', '', 1))
class Globus(DownloadClient):
    def __init__(self, client_name='default', connect=True, headless=False):
        """
        Wrapper for managing files on Globus endpoints.

        Parameters
        ----------
        client_name : str
            Parameter profile name to load e.g. 'default', 'admin'.
        connect : bool
            Whether to create the Globus SDK client on init.
        headless : bool
            If true, raises RuntimeError if no parameters are saved for this client. Otherwise
            the user is prompted to enter information.

        Raises
        ------
        RuntimeError
            In headless mode, no parameters were found for `client_name`.

        Examples
        --------
        Instantiate without authentication

        >>> globus = Globus(connect=False)

        Instantiate without user prompts

        >>> globus = Globus('server', headless=True)
        """
        # Setting up transfer client
        super().__init__()
        self.client = None
        self.client_name = client_name
        self.headless = headless
        self._pars = load_client_params(f'{CLIENT_KEY}.{client_name}', assert_present=False)

        # If no parameters, Globus must be set up for this client
        if self._pars is None:
            if self.headless:
                raise RuntimeError(f'Globus not set up for client "{self.client_name}"')
            self._pars = _setup(self.client_name, login=False)

        if connect:
            self.login()

        # Try adding local endpoint
        self.endpoints = {'local': {'id': UUID(self._pars.local_endpoint)}}
        _logger.info('Adding local endpoint.')
        self.endpoints['local']['root_path'] = self._pars.local_path

    @property
    def is_logged_in(self):
        """bool: Check if client exists and is authenticated"""
        has_token = self.client and self.client.authorizer.get_authorization_header() is not None
        # Cast so that a bool is returned even when no client has been instantiated
        return bool(has_token and not self._token_expired)

    @property
    def _token_expired(self):
        """bool: True if token absent or expired (or expiring within 60 seconds); False if valid

        Note the 'expires_at_seconds' may be greater than `Globus.client.authorizer.expires_at`
        if using refresh tokens.  The `login` method will always refresh the token if still
        valid.
        """
        try:
            authorizer = getattr(self.client, 'authorizer', None)
            has_refresh_token = self._pars.as_dict().get('refresh_token') is not None
            if has_refresh_token and isinstance(authorizer, globus_sdk.RefreshTokenAuthorizer):
                self.client.authorizer.ensure_valid_token()  # Fetch new refresh token if needed
        except Exception as ex:
            # Best effort: a failed refresh simply leaves the stored expiry time unchanged
            _logger.debug('Failed to refresh token: %s', ex)

        # The field may be absent or explicitly None (e.g. after a cancelled login)
        expires_at_seconds = getattr(self._pars, 'expires_at_seconds', 0) or 0
        # NB: `datetime.utcnow().timestamp()` would interpret the naive UTC time as local time,
        # offsetting the result by the UTC offset; `datetime.now().timestamp()` is the POSIX time
        return expires_at_seconds - datetime.now().timestamp() < 60
def login(self, stay_logged_in=None):
    """
    Authenticate Globus client.

    Returns immediately if already logged in.  If the saved parameters lack the token fields,
    or the token is deemed expired, a new token is requested from the user (unless in headless
    mode) and saved before authenticating.

    Parameters
    ----------
    stay_logged_in : bool, optional
        If True, use refresh token to remain logged in for longer. If False, use an auth token
        without the option of refreshing when expired. If not specified, treated as True.

    Raises
    ------
    RuntimeError
        In headless mode, a new token would have to be requested from the user.
    """
    if self.is_logged_in:
        _logger.debug('Already logged in')
        return

    if stay_logged_in is None:
        stay_logged_in = True
    if stay_logged_in:
        # Staying logged in requires a refresh token; without one a new token is requested
        expired = bool(self._pars.as_dict().get('refresh_token') is None)
    else:
        expired = bool(self._token_expired)

    # If no tokens in parameters, Globus must be authenticated
    token_fields = {'refresh_token', 'access_token', 'expires_at_seconds'}
    has_token_fields = token_fields.issubset(iopar.as_dict(self._pars))
    if not has_token_fields or expired:
        if self.headless:
            raise RuntimeError(f'Globus not authenticated for client "{self.client_name}"')
        token = get_token(self._pars.GLOBUS_CLIENT_ID, refresh_tokens=stay_logged_in)
        if not any(token.values()):
            _logger.debug('Login cancelled by user')
            return
        merged = {**self._pars.as_dict(), **token}
        self._pars = iopar.from_dict(merged)
        _save_globus_params(self._pars, self.client_name)

    # Ready to authenticate
    self._authenticate(stay_logged_in)
def logout(self):
    """Revoke any tokens and delete them from the client and parameter file.

    If a client with a non-null authorizer exists, its token is revoked and the authorizer is
    replaced with a `NullAuthorizer`.  The token fields are then removed from both the saved
    parameters (if any) and the parameters held by this instance.
    """
    client = self.client
    authorizer = client.authorizer if client else None
    if authorizer and not isinstance(authorizer, NullAuthorizer):
        # NOTE(review): called without a token argument and assumes the authorizer has an
        # `auth_client` attribute - confirm against the Globus SDK version in use.
        authorizer.auth_client.oauth2_revoke_token()
        del client.authorizer
        client.authorizer = NullAuthorizer()

    saved = load_client_params(f'{CLIENT_KEY}.{self.client_name}', assert_present=False)
    if saved:
        _save_globus_params(_remove_token_fields(saved), self.client_name)
    self._pars = _remove_token_fields(self._pars)
def _authenticate(self, stay_logged_in=None): """Authenticate and instantiate Globus SDK client.""" if self._pars.as_dict().get('refresh_token') and stay_logged_in is not False: client = globus_sdk.NativeAppAuthClient(self._pars.GLOBUS_CLIENT_ID) client.oauth2_start_flow(refresh_tokens=True) authorizer = globus_sdk.RefreshTokenAuthorizer( self._pars.refresh_token, client, on_refresh=self._save_refresh_token_callback) else: if stay_logged_in is True: warnings.warn('No refresh token. Please log out and back in to remain logged in.') if self._token_expired is not False: raise RuntimeError(f'token no longer valid for client "{self.client_name}"') authorizer = globus_sdk.AccessTokenAuthorizer(self._pars.access_token) self.client = globus_sdk.TransferClient(authorizer=authorizer) def _save_refresh_token_callback(self, res): """ Save a token fetched by the refresh token authorizer. This is a callback for the globus_sdk.RefreshTokenAuthorizer to update the parameters. Parameters ---------- res : globus_sdk.services.auth.OAuthTokenResponse An Open Authorization response object. """ if not res or not (token := next(iter(res.by_resource_server.values()), None)): return token_fields = {'refresh_token', 'access_token', 'expires_at_seconds'} self._pars = iopar.from_dict( {**self._pars.as_dict(), **{k: v for k, v in token.items() if k in token_fields}}) _save_globus_params(self._pars, self.client_name)
def fetch_endpoints_from_alyx(self, alyx=None, overwrite=False):
    """
    Update endpoints property with Alyx Globus data repositories.

    Every data repository returned by Alyx that has a Globus endpoint ID is passed to
    `add_endpoint`, labelled with the repository name.

    Parameters
    ----------
    alyx : one.webclient.AlyxClient
        An optional AlyxClient.
    overwrite : bool
        Whether existing endpoint with the same label should be replaced.

    Returns
    -------
    dict
        The entries of the `endpoints` property whose label matches a repository name
        returned by Alyx.
    """
    alyx = alyx or AlyxClient()
    repositories = alyx.rest('data-repository', 'list')
    for repo in repositories:
        globus_id = repo['globus_endpoint_id']
        if globus_id:  # skip repositories without a Globus endpoint
            self.add_endpoint(
                UUID(globus_id), label=repo['name'], root_path=repo['globus_path'],
                overwrite=overwrite
            )
    repo_names = {repo['name'] for repo in repositories}
    return {label: ep for label, ep in self.endpoints.items() if label in repo_names}
def to_address(self, data_path, endpoint):
    """
    Build the full Globus path of a file on a given endpoint.

    The root path of the endpoint is looked up and combined with `data_path`.

    Parameters
    ----------
    data_path : Path, PurePath, str
        An absolute or relative POSIX path
    endpoint : str, uuid.UUID
        An endpoint label or UUID.

    Returns
    -------
    str
        A complete path string formatted for Globus.

    Examples
    --------
    >>> glo = Globus()
    >>> glo.add_endpoint('0ec47586-3a19-11eb-b173-0ee0d5d9299f',
    ...                  label='foobar', root_path='/foo')
    >>> glo.to_address('bar/baz.ext', 'foobar')
    '/foo/bar/baz.ext'
    """
    _endpoint_id, endpoint_root = self._endpoint_id_root(endpoint)  # the ID is not needed
    address = self._endpoint_path(data_path, endpoint_root)
    return address
@ensure_logged_in
def download_file(self, file_address, source_endpoint, recursive=False, **kwargs):
    """
    Download one or more files via Globus.

    A transfer from the source endpoint to the 'local' endpoint is run as a task, and the
    local paths of the successfully transferred files are returned.

    Parameters
    ----------
    file_address : str, list of str
        One or more relative POSIX paths to download.
    source_endpoint : str, uuid.UUID
        The source endpoint name or uuid.
    recursive : bool
        If true, transfer the contents of nested directories (NB: all data_paths must be
        directories).
    **kwargs
        See Globus.transfer_data.  The transfer label defaults to 'ONE download'.

    Returns
    -------
    pathlib.Path, list of pathlib.Path
        The downloaded file path(s). If recursive is True, a list is always returned.

    Notes
    -----
    - Assumes that the local endpoint root path is NOT POSIX style on Windows.

    TODO Return None for failed files

    Examples
    --------
    Download a single file

    >>> file = Globus().download_file('path/to/file', '0ec47586-3a19-11eb-b173-0ee0d5d9299f')

    Download multiple files and verify checksum

    >>> files = ['relative/file/path.ext', 'foo.bar']
    >>> files = Globus().download_file(files, 'source_endpoint_name', verify_checksum=True)

    Download a folder

    >>> files = Globus().download_file('folder/path', 'source_endpoint_name', recursive=True)
    """
    single_file = isinstance(file_address, str) and recursive is False
    kwargs.setdefault('label', 'ONE download')
    transfer = partial(self.transfer_data, file_address, source_endpoint, 'local',
                       recursive=recursive, **kwargs)
    task_id = self.run_task(transfer)

    local_root = Path(self.endpoints['local']['root_path'])
    # Length of the Globus-formatted local root, used to strip it from destination paths
    prefix_len = len(self._endpoint_path(PurePosixPath(as_globus_path(local_root))))
    relative_paths = [
        info['destination_path'][prefix_len:].strip('/')
        for info in self.client.task_successful_transfers(task_id)
    ]
    if single_file:
        file = local_root / relative_paths[0]
        assert file.exists()
        return file

    # Order files by input
    def _input_rank(downloaded):
        """Return the index of the input file that best matches downloaded file."""
        scores = []
        for fragment in ensure_list(file_address):
            matched = fragment in downloaded
            scores.append(len(fragment) / len(downloaded) if matched else 0)
        return scores.index(max(scores))

    ordered = sorted(relative_paths, key=_input_rank)
    files = [local_root.joinpath(rel_path) for rel_path in ordered]
    assert all(map(Path.exists, filter(None, files)))
    return files
@staticmethod
def setup(client_name='default', **kwargs):
    """
    Run the interactive Globus setup for a client profile and instantiate it.

    The setup is run without logging in; a `Globus` object is then constructed for the same
    profile.

    In order to use this function you need:

    1. The client ID of an existing Globus Client (`see this tutorial`_).
    2. Set up `Globus Connect`_ on your local device.
    3. Register your local device as an `endpoint`_ in your Globus Client.

    .. _see this tutorial: https://globus-sdk-python.readthedocs.io/en/stable/tutorial.html
    .. _Globus Connect: https://www.globus.org/globus-connect-personal
    .. _endpoint: https://app.globus.org/

    Parameters
    ----------
    client_name : str
        Parameter profile name to set up e.g. 'default', 'admin'.
    **kwargs
        Optional Globus constructor arguments.

    Returns
    -------
    Globus
        A new Globus client object.
    """
    _setup(client_name, login=False)  # (re)write the parameters before they are loaded below
    globus = Globus(client_name, **kwargs)
    return globus
def add_endpoint(self, endpoint, label=None, root_path=None, overwrite=False, alyx=None):
    """
    Register an endpoint with this Globus instance for use by other methods.

    If `endpoint` is not a version 1 or 2 UUID it is treated as a repository name and its
    endpoint ID (and root path, unless one is given) are looked up via `repo_from_alyx`.  If
    the label is already taken and `overwrite` is False, an error is logged and nothing is
    added.

    Parameters
    ----------
    endpoint : uuid.UUID, str
        The endpoint UUID or database repository name of the endpoint.
    label : str
        Label to access the endpoint. If endpoint is UUID this has to be set, otherwise is
        optional (the repository name is then used).
    root_path : str, pathlib.Path, pathlib.PurePath
        File path to be accessed by Globus on the endpoint.
    overwrite : bool
        Whether existing endpoint with the same label should be replaced.
    alyx : one.webclient.AlyxClient
        An AlyxClient instance for looking up repository information.

    Raises
    ------
    ValueError
        `endpoint` is a UUID and no label was provided.
    """
    if not is_uuid(endpoint, versions=(1, 2)):
        # Not a (MAC address) UUID: look the repository up by name
        repo = self.repo_from_alyx(endpoint, alyx=alyx)
        endpoint_id = UUID(repo['globus_endpoint_id'])
        root_path = root_path or repo['globus_path']
        label = label or endpoint
    elif label is None:
        raise ValueError('If "endpoint" is a UUID, "label" cannot be None.')
    else:
        endpoint_id = self._ensure_uuid(endpoint)

    if label in self.endpoints.keys() and overwrite is False:
        _logger.error(f'An endpoint called "{label}" already exists. Choose a different label '
                      'or set overwrite=True')
        return
    self.endpoints[label] = {'id': endpoint_id}
    if root_path:
        self.endpoints[label]['root_path'] = root_path
@staticmethod
def _endpoint_path(path, root_path=None):
    """
    Given an absolute path or relative path with a root path, return a Globus path str.

    Note: Paths must be POSIX or Globus-compliant paths. In other words for Windows systems
    the input root_path or absolute path must be passed through `as_globus_path` before
    calling this method.

    TODO include globus_path_from_dataset

    Parameters
    ----------
    path : Path, PurePath, str
        An absolute or relative POSIX path.
    root_path : Path, PurePath, str
        A root path to prepend. Optional if `path` is absolute.

    Returns
    -------
    str
        A path string formatted for Globus (the output of `as_globus_path`).

    See Also
    --------
    as_globus_path

    Raises
    ------
    ValueError
        Path was not absolute and no root path was given. An absolute path must start with
        a slash on *nix systems.
    """
    if isinstance(path, str):
        path = PurePosixPath(path)
    # Only prepend the root when the path string does not already begin with it
    if root_path and not str(path).startswith(str(root_path)):
        path = PurePosixPath(root_path) / path
    if not path.is_absolute():
        raise ValueError(f'{path} is relative and no root_path defined')
    return as_globus_path(path)


@staticmethod
def _ensure_uuid(uid):
    """
    Ensures UUID object returned.

    Parameters
    ----------
    uid : str, uuid.UUID
        A UUID to cast to UUID object.

    Returns
    -------
    uuid.UUID
        A UUID object; the input itself if it already was one.
    """
    return uid if isinstance(uid, UUID) else UUID(uid)


def _endpoint_id_root(self, endpoint):
    """
    Return endpoint UUID and root path from a given endpoint identifier.

    Registered labels take precedence: the input is first looked up as a key of
    `self.endpoints` and only then interpreted as a UUID.

    Parameters
    ----------
    endpoint : str, uuid.UUID
        An endpoint label or UUID.

    Returns
    -------
    uuid.UUID
        The endpoint UUID.
    str, None
        The POSIX-style endpoint root path (if defined).

    Raises
    ------
    ValueError
        `endpoint` is neither a registered label nor a UUID.

    Warnings
    --------
    UserWarning
        If endpoint UUID is associated with multiple root paths, it is better to provide
        the endpoint label to avoid this warning and to ensure the intended root path is
        returned.

    See Also
    --------
    Globus._sanitize_local
    """
    if endpoint in self.endpoints:
        record = self.endpoints[endpoint]
        return self._sanitize_local(record['id'], record.get('root_path'))
    if is_uuid(endpoint, range(1, 5)):
        # If a UUID was provided, find the first endpoint with a root path with the UUID
        endpoint_id = self._ensure_uuid(endpoint)
        matching = (
            k for k, v in self.endpoints.items()
            if v['id'] == endpoint_id and 'root_path' in v
        )
        root_path = None
        if name := next(matching, None):
            # Warn of ambiguity if multiple endpoints share a UUID
            if next(matching, None) is not None:
                warnings.warn(
                    f'Multiple endpoints added with the same UUID, '
                    f'using root path from "{name}"')
            root_path = self.endpoints[name]['root_path']
        return self._sanitize_local(endpoint_id, root_path)
    # The registration method is called `add_endpoint` (singular)
    raise ValueError(
        '"endpoint" must be a UUID or the label of an endpoint registered in this '
        'Globus instance. You can add endpoints via the add_endpoint method')


def _sanitize_local(self, endpoint_id, root_path):
    """
    Ensure local root path on Windows is POSIX-style.

    Parameters
    ----------
    endpoint_id : uuid.UUID
        The endpoint UUID to determine if root path is local.
    root_path : pathlib.Path, str, None
        The root path to sanitize.

    Returns
    -------
    endpoint_id : uuid.UUID
        The endpoint UUID, returned unchanged to match `Globus._endpoint_id_root` signature.
    str, None
        The root path as converted by `as_globus_path`, or None if root_path is empty or
        None.

    Examples
    --------
    Providing a local root path on Windows

    >>> glo = Globus()
    >>> uid = glo.endpoints['local']['id']
    >>> glo._sanitize_local(uid, 'C:\\Data')
    (UUID('50282ed5-3124-11ee-b977-482ae33bf6ca'), '/C/Data')

    Path left unchanged on *nix systems or when endpoint ID is not local

    >>> uid = UUID('c7c46cec-3124-11ee-bf50-482ae33bf6ca')
    >>> glo._sanitize_local(uid, 'C:\\Data')
    (UUID('c7c46cec-3124-11ee-bf50-482ae33bf6ca'), 'C:\\Data')
    """
    if not root_path:
        return endpoint_id, None
    # If the local root path is not explicitly a Windows Path and we're on windows, make sure
    # it's converted correctly to a POSIX style path
    if isinstance(root_path, str):
        is_win = sys.platform in ('win32', 'cygwin')
        if endpoint_id == self.endpoints['local']['id'] and is_win:
            root_path = PureWindowsPath(root_path)
        else:
            root_path = PurePosixPath(root_path)
    return endpoint_id, as_globus_path(root_path)
@ensure_logged_in
def transfer_data(self, data_path, source_endpoint, destination_endpoint,
                  recursive=False, **kwargs):
    """
    Transfer one or more paths between endpoints.

    At least one of the endpoints must be a server endpoint. Both file and directory paths
    may be provided, however if recursive is true, all paths must be directories.

    Parameters
    ----------
    data_path : str, list of str
        One or more data paths, relative to the endpoint root path.
    source_endpoint : str, uuid.UUID
        The name or UUID of the source endpoint.
    destination_endpoint : str, uuid.UUID
        The name or UUID of the destination endpoint.
    recursive : bool
        If true, transfer the contents of nested directories (NB: all data_paths must be
        directories).
    **kwargs
        See globus_sdk.TransferData.

    Returns
    -------
    uuid.UUID
        The Globus transfer ID.

    Raises
    ------
    ValueError
        An endpoint is neither a UUID nor the label of a registered endpoint, or a data
        path is relative and the corresponding endpoint has no root path.

    Examples
    --------
    Transfer two files (asynchronous)

    >>> glo = Globus()
    >>> files = ['file.ext', 'foo.bar']
    >>> task_id = glo.transfer_data(files, 'source_endpoint', 'destination_endpoint')

    Transfer a file (synchronous)

    >>> file = 'file.ext'
    >>> task_id = glo.run_task(lambda: glo.transfer_data(file, 'src_endpoint', 'dst_endpoint'))

    Transfer a folder (asynchronous)

    >>> folder = 'path/to/folder'
    >>> task_id = glo.transfer_data(
    ...     folder, 'source_endpoint', 'destination_endpoint', recursive=True)
    """
    # Resolve each endpoint exactly once. Using _endpoint_id_root for the ID as well as the
    # root path keeps UUID/label handling consistent and raises a descriptive ValueError for
    # unknown endpoints instead of failing on a None lookup.
    source_id, source_root = self._endpoint_id_root(source_endpoint)
    destination_id, destination_root = self._endpoint_id_root(destination_endpoint)
    kwargs['source_endpoint'] = source_id
    kwargs['destination_endpoint'] = destination_id
    transfer_object = globus_sdk.TransferData(self.client, **kwargs)

    # add any number of items to the submission data
    for path in ensure_list(data_path):
        src = self._endpoint_path(path, source_root)
        dst = self._endpoint_path(path, destination_root)
        transfer_object.add_item(src, dst, recursive=recursive)
    response = self.client.submit_transfer(transfer_object)
    return UUID(response.data['task_id'])
@ensure_logged_in
def delete_data(self, data_path, endpoint, recursive=False, **kwargs):
    """
    Delete one or more paths within an endpoint.

    Both file and directory paths may be provided, however if recursive is true, all paths
    must be directories.

    Parameters
    ----------
    data_path : str, list of str
        One or more data paths, relative to the endpoint root path.
    endpoint : str, uuid.UUID
        The name or UUID of the endpoint.
    recursive : bool
        If true, delete the contents of nested directories (NB: all data_paths must be
        directories).
    **kwargs
        See globus_sdk.DeleteData.

    Returns
    -------
    uuid.UUID
        The Globus delete task ID.

    Raises
    ------
    ValueError
        The endpoint is neither a UUID nor the label of a registered endpoint, or a data
        path is relative and the endpoint has no root path.

    Examples
    --------
    Delete two files, ignoring those that don't exist (asynchronous)

    >>> glo = Globus()
    >>> files = ['file.ext', 'foo.bar']
    >>> task_id = glo.delete_data(files, 'endpoint_name', ignore_missing=True)

    Delete a file (synchronous)

    >>> task_id = glo.run_task(lambda: glo.delete_data('file.ext', 'endpoint_name'))

    Recursively delete a folder (asynchronous)

    >>> folder = 'path/to/folder'
    >>> task_id = glo.delete_data(folder, 'endpoint_name', recursive=True)
    """
    # Resolve the endpoint once; _endpoint_id_root yields both the UUID and the root path
    # and raises a descriptive ValueError for unknown endpoints.
    endpoint_id, root_path = self._endpoint_id_root(endpoint)
    kwargs['endpoint'] = endpoint_id
    delete_object = globus_sdk.DeleteData(self.client, recursive=recursive, **kwargs)

    # add any number of items to the submission data
    for path in ensure_list(data_path):
        delete_object.add_item(self._endpoint_path(path, root_path))
    response = self.client.submit_delete(delete_object)
    return UUID(response.data['task_id'])
@ensure_logged_in
def ls(self, endpoint, path, remove_uuid=False, return_size=False, max_retries=1):
    """
    List the contents of a directory on a Globus endpoint.

    NB: If you're using ls routinely when transferring or deleting files you're probably
    doing something wrong!

    Parameters
    ----------
    endpoint : uuid.UUID, str
        The Globus endpoint. May be a UUID or a key in the Globus.endpoints attribute.
    path : Path, PurePath, str
        The absolute or relative Globus path to list. Note: if endpoint is a UUID, the path
        must be absolute.
    remove_uuid : bool
        If True, remove the UUID from the returned filenames.
    return_size : bool
        If True, return the size of each listed file in bytes.
    max_retries : int
        The number of times to retry the remote operation before raising. Increasing this
        may mitigate unstable network issues.

    Returns
    -------
    list
        A list of PurePosixPath objects of the files and folders listed, or if return_size
        is True, tuples of PurePosixPath objects and the corresponding file sizes (None for
        entries that are not files).
    """
    # Resolve the endpoint to its UUID and root path, then build the full Globus path
    endpoint_id, root_path = self._endpoint_id_root(endpoint)
    path = self._endpoint_path(path, root_path)

    # Query the remote listing, allowing up to max_retries further attempts on Globus errors
    listing = []
    attempt = 0
    while attempt <= max_retries:
        try:
            listing = self.client.operation_ls(endpoint_id, path=path)
        except (GlobusConnectionError, GlobusAPIError) as ex:
            if attempt == max_retries:
                raise ex
            attempt += 1
        else:
            break

    def _as_path(entry):
        """Return the entry name as a PurePosixPath, optionally without its UUID part."""
        raw_name = entry['name']
        return PurePosixPath(remove_uuid_string(raw_name) if remove_uuid else raw_name)

    if return_size:
        # Only entries of type 'file' carry a size; everything else is reported as None
        return [
            (_as_path(entry), entry['size'] if entry['type'] == 'file' else None)
            for entry in listing
        ]
    return [_as_path(entry) for entry in listing]
# TODO: allow moving the entire contents of a directory via the 'recursive' keyword of add_item
@ensure_logged_in
def mv(self, source_endpoint, target_endpoint, source_paths, target_paths,
       timeout=None, **kwargs):
    """
    Transfer files from one endpoint to another and wait for the task to finish.

    Each source path is paired with the target path at the same position. This method only
    submits a transfer; nothing here deletes the source files afterwards.

    Parameters
    ----------
    source_endpoint : uuid.UUID, str
        The Globus source endpoint. May be a UUID or a key in the Globus.endpoints attribute.
    target_endpoint : uuid.UUID, str
        The Globus destination endpoint. May be a UUID or a key in the Globus.endpoints
        attribute.
    source_paths : list of str, pathlib.Path or pathlib.PurePath
        The absolute or relative Globus paths of source files to move. Note: if endpoint is
        a UUID, the path must be absolute.
    target_paths : list of str, Path or PurePath
        The absolute or relative Globus paths of destination files. Must contain as many
        items as `source_paths`. Note: if endpoint is a UUID, the path must be absolute.
    timeout : int
        Maximum time in seconds to wait for the task to complete.
    **kwargs
        Optional arguments for globus_sdk.TransferData. These take precedence over the
        defaults used here (verify_checksum=True, sync_level='checksum', label='ONE globus').

    Returns
    -------
    uuid.UUID
        A Globus task ID.

    Raises
    ------
    ValueError
        The number of source and target paths differ.
    """
    source_endpoint, source_root = self._endpoint_id_root(source_endpoint)
    target_endpoint, target_root = self._endpoint_id_root(target_endpoint)
    source_paths = [str(self._endpoint_path(path, source_root)) for path in source_paths]
    target_paths = [str(self._endpoint_path(path, target_root)) for path in target_paths]
    # zip would silently drop the surplus paths, so refuse mismatched inputs up front
    if len(source_paths) != len(target_paths):
        raise ValueError(
            f'Number of source paths ({len(source_paths)}) does not match number of '
            f'target paths ({len(target_paths)})')

    # Merge the defaults with user options so that passing e.g. label= overrides the default
    # rather than raising a duplicate keyword argument TypeError
    options = {'verify_checksum': True, 'sync_level': 'checksum', 'label': 'ONE globus',
               **kwargs}
    tdata = globus_sdk.TransferData(self.client, source_endpoint, target_endpoint, **options)
    for source_path, target_path in zip(source_paths, target_paths):
        tdata.add_item(source_path, target_path)

    def wrapper():
        """Function to submit Globus transfer and return the resulting task ID"""
        response = self.client.submit_transfer(tdata)
        task_id = response.get('task_id', None)
        return task_id

    return self.run_task(wrapper, timeout=timeout)
@ensure_logged_in
def run_task(self, globus_func, retries=3, timeout=None):
    """
    Block until a Globus task finishes and retry upon Network or REST Errors.

    globus_func needs to submit a task to the client and return a task_id.

    Parameters
    ----------
    globus_func : function, Callable
        A function that returns a Globus task ID, typically it will submit a transfer.
    retries : int
        The number of times to call globus_func again if a Globus error is raised.
    timeout : int
        Maximum time in seconds to wait for the task to complete. The task is waited on in
        5 second intervals, so the limit is checked with that granularity.

    Returns
    -------
    uuid.UUID
        Globus task ID (of the attempt that completed, if retries were needed).

    Raises
    ------
    IOError
        Timed out waiting for task to complete, or the task finished with a status other
        than SUCCEEDED.
    AssertionError
        globus_func did not return a valid task UUID.

    TODO Add a quick fail option that returns when files missing, etc.
    TODO Add status logging
    """
    # Each task_wait call below blocks for at most this many seconds
    wait_interval = 5
    try:
        task_id = globus_func()
        assert is_uuid(task_id, versions=(1, 2)), 'invalid UUID returned'
        print(f'Waiting for Globus task {task_id} to complete')
        # While the task is active, print a dot after each unsuccessful wait. The elapsed
        # time is accumulated in seconds so that `timeout` is honoured as documented.
        waited = 0
        while not self.client.task_wait(task_id, timeout=wait_interval, polling_interval=1):
            print('.', end='')
            waited += wait_interval
            if timeout and waited >= timeout:
                task = self.client.get_task(task_id)
                raise IOError(f'Globus task {task_id} timed out after {timeout} seconds, '
                              f'with task status {task["status"]}')
        task = self.client.get_task(task_id)
        if task['status'] != 'SUCCEEDED':
            raise IOError(f'Globus task finished unsuccessfully with status {task["status"]}')
        # Sometime Globus sets the status to SUCCEEDED but doesn't truly finish.
        # Handle error thrown when querying task_successful_transfers too early
        try:
            # Materialize the responses once so they can be both counted and iterated
            successful = list(self.client.task_successful_transfers(task_id))
            skipped = list(self.client.task_skipped_errors(task_id))
            print(f'\nGlobus task {task_id} completed.'
                  f'\nSkipped transfers: {len(skipped)}'
                  f'\nSuccessful transfers: {len(successful)}')
            for info in successful:
                _logger.debug(f'{info["source_path"]} -> {info["destination_path"]}')
        except TransferAPIError:
            _logger.warning(f'\nGlobus task {task_id} SUCCEEDED but querying transfers was '
                            f'unsuccessful')
        return self._ensure_uuid(task_id)
    except (GlobusAPIError, NetworkError, GlobusTimeoutError, GlobusConnectionError,
            GlobusConnectionTimeoutError) as e:
        if retries < 1:
            _logger.error('\nMax retries exceeded.')
            raise e
        _logger.debug('\nGlobus experienced a network error', exc_info=True)
        _logger.warning('\nGlobus experienced a network error, retrying.')
        # Return the retry's result; without this a successful retry would yield None
        return self.run_task(globus_func, retries=(retries - 1), timeout=timeout)
@ensure_logged_in
async def task_wait_async(self, task_id, polling_interval=10, timeout=10):
    """
    Asynchronously wait until a Task is complete or fails, with a time limit.

    If the task status is ACTIVE after timeout, returns False, otherwise returns True.

    Parameters
    ----------
    task_id : str, uuid.UUID
        A Globus task UUID to wait on for completion.
    polling_interval : float
        Number of seconds between queries to Globus about the task status. Minimum 1 second.
        Capped at `timeout`.
    timeout : float
        Number of seconds to wait in total. Minimum 1 second.

    Returns
    -------
    bool
        True if status not ACTIVE before timeout. False if status still ACTIVE at timeout.

    Raises
    ------
    globus_sdk.GlobusSDKUsageError
        `polling_interval` or `timeout` is less than 1 second.

    Examples
    --------
    Asynchronously await a task to complete

    >>> await Globus().task_wait_async(task_id)
    """
    if polling_interval < 1:
        raise GlobusSDKUsageError('polling_interval must be at least 1 second')
    if timeout < 1:
        raise GlobusSDKUsageError('timeout must be at least 1 second')

    polling_interval = min(timeout, polling_interval)
    waited_time = 0
    while True:
        task = self.client.get_task(task_id)
        status = task['status']
        if status != 'ACTIVE':
            return True

        # check if we timed out before sleeping again
        # NOTE(review): the interval is added before the comparison, so when
        # polling_interval >= timeout (as with the defaults) an ACTIVE task yields False
        # after a single status query without any sleep - confirm this is intended.
        waited_time += polling_interval
        if waited_time >= timeout:
            return False

        await asyncio.sleep(polling_interval)