one.remote.globus

A module for handling file operations through the Globus SDK.

Setup

To set up Globus, instantiate the Globus class for the first time and follow the prompts. Passing a client name string to the constructor lets you set up multiple Globus clients (e.g. when switching between different Globus client IDs).

In order to use this function you need:

  1. The client ID of an existing Globus Client (see this tutorial).

  2. Set up Globus Connect on your local device.

  3. Register your local device as an endpoint in your Globus Client.

To modify the settings for a pre-established client, call the Globus.setup method with the client name:

>>> globus = Globus.setup('default')

You can update the list of endpoints using the fetch_endpoints_from_alyx method:

>>> globus = Globus('admin')
>>> remote_endpoints = globus.fetch_endpoints_from_alyx(alyx=AlyxClient())

The endpoints are stored in the endpoints property:

>>> print(globus.endpoints.keys())
>>> print(globus.endpoints['local'])

Examples

Get the full Globus file path

>>> relative_path = 'subject/2020-01-01/001/alf/_ibl_trials.table.pqt'
>>> full_path = globus.to_address(relative_path, 'flatiron_cortexlab')

Log in with a limited time token

>>> globus = Globus('admin')
>>> globus.login(stay_logged_in=False)

Log out of Globus, revoking and deleting all tokens

>>> globus.logout()
>>> assert not globus.is_logged_in

Asynchronously transfer data between Alyx repositories

>>> alyx = AlyxClient()
>>> glo = Globus('admin')
>>> glo.add_endpoint('flatiron_cortexlab', alyx=alyx)
>>> glo.add_endpoint('cortex_lab_SR', alyx=alyx)
>>> task_id = glo.transfer_data('path/to/file', 'flatiron_cortexlab', 'cortex_lab_SR')

Synchronously transfer data to an alternate local location

>>> from functools import partial
>>> root_path = '/path/to/new/location'
>>> glo.add_endpoint(get_local_endpoint_id(), label='alternate_local', root_path=root_path)
>>> folder = 'camera/ZFM-01867/2021-03-23/002'  # An example folder to download
>>> task = partial(glo.transfer_data, folder, 'integration', 'integration_local',
...                label='alternate data', recursive=True)
>>> task_id = glo.run_task(task)  # Submit task to Globus and await completion

Temporarily change local data root path and synchronously download file

>>> glo.endpoints['local']['root_path'] = '/path/to/new/location'
>>> file = glo.download_file('path/to/file.ext', 'source_endpoint')
Path('/path/to/new/location/path/to/file.ext')

Await multiple tasks to complete by passing a list of Globus transfer IDs

>>> import asyncio
>>> async def wait_all(task_ids):
...     return await asyncio.gather(*(globus.task_wait_async(task_id) for task_id in task_ids))
>>> success = asyncio.run(wait_all(task_ids))
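
The concurrent waiting pattern above can be tried without a Globus connection. The following is a minimal sketch only: `fake_task_wait` is a hypothetical stand-in for `Globus.task_wait_async`, and the task IDs, delays and timeout are made up for illustration.

```python
import asyncio


async def fake_task_wait(task_id, delay, timeout=0.2):
    """Hypothetical stand-in for Globus.task_wait_async (not the real method).

    Returns True if the simulated task finishes within `timeout` seconds, else False.
    """
    try:
        await asyncio.wait_for(asyncio.sleep(delay), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def wait_all(tasks):
    # gather runs the coroutines concurrently and preserves the input order
    return await asyncio.gather(*(fake_task_wait(tid, delay) for tid, delay in tasks))


# Made-up task IDs paired with simulated completion times in seconds
tasks = [('task-a', 0.0), ('task-b', 0.01), ('task-c', 5.0)]
success = asyncio.run(wait_all(tasks))
```

Because the waits run concurrently, the total time is bounded by the longest single wait rather than the sum of all waits.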

Module attributes

CLIENT_KEY

The default key in the remote settings file

DEFAULT_PAR

The default Globus parameter fields

STATUS_MAP

A map of Globus status to "nice" status

Functions

as_globus_path

Convert a path into one suitable for the Globus TransferClient.

create_globus_client

Creates a Globus transfer client based on existing parameter file.

ensure_logged_in

Decorator for the Globus methods.

get_lab_from_endpoint_id

Extracts lab names associated with a given endpoint UUID.

get_local_endpoint_id

Extracts the ID of the local Globus Connect endpoint.

get_local_endpoint_paths

Extracts the local endpoint paths accessible by Globus Connect.

get_token

Get a Globus authentication token.

Classes

Globus

class Globus(client_name='default', connect=True, headless=False)

Bases: DownloadClient

property is_logged_in

Check if client exists and is authenticated

Type:

bool

login(stay_logged_in=None)

Authenticate Globus client.

Parameters:

stay_logged_in (bool, optional) – If True, use refresh token to remain logged in for longer. If False, use an auth token without the option of refreshing when expired. If not specified, uses the refresh token if available.

logout()

Revoke any tokens and delete them from the client and parameter file.

fetch_endpoints_from_alyx(alyx=None, overwrite=False)

Update endpoints property with Alyx Globus data repositories.

Parameters:
  • alyx (one.webclient.AlyxClient) – An optional AlyxClient.

  • overwrite (bool) – Whether existing endpoint with the same label should be replaced.

Returns:

The endpoints added from Alyx.

Return type:

dict

to_address(data_path, endpoint)

Get full path for a given endpoint.

Parameters:
  • data_path (Path, PurePath, str) – An absolute or relative POSIX path

  • endpoint (str, uuid.UUID) – An endpoint label or UUID.

Returns:

A complete path string formatted for Globus.

Return type:

str

Examples

>>> glo = Globus()
>>> glo.add_endpoint('0ec47586-3a19-11eb-b173-0ee0d5d9299f',
...                  label='foobar', root_path='/foo')
>>> glo.to_address('bar/baz.ext', 'foobar')
'/foo/bar/baz.ext'
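
The result above is consistent with joining the relative path onto the endpoint's root path. The sketch below illustrates that idea with the standard library only. `join_address` is a hypothetical helper, not the library's implementation, and returning absolute paths unchanged is an assumption.

```python
from pathlib import PurePosixPath


def join_address(root_path, data_path):
    """Hypothetical helper: join a relative POSIX path onto an endpoint root path."""
    data_path = PurePosixPath(data_path)
    if data_path.is_absolute():
        # Assumption: an absolute path is already complete, so return it unchanged
        return data_path.as_posix()
    return (PurePosixPath(root_path) / data_path).as_posix()


address = join_address('/foo', 'bar/baz.ext')
```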

download_file(file_address, source_endpoint, recursive=False, **kwargs)

Download one or more files via Globus.

Parameters:
  • file_address (str, list of str) – One or more relative POSIX paths to download.

  • source_endpoint (str, uuid.UUID) – The source endpoint name or uuid.

  • recursive (bool) – If true, transfer the contents of nested directories (NB: all data_paths must be directories).

  • **kwargs – See Globus.transfer_data.

Returns:

The downloaded file path(s). If recursive is True, a list is always returned.

Return type:

pathlib.Path, list of pathlib.Path

Notes

  • Assumes that the local endpoint root path is NOT POSIX style on Windows.

TODO Return None for failed files

Examples

Download a single file

>>> file = Globus().download_file('path/to/file', '0ec47586-3a19-11eb-b173-0ee0d5d9299f')

Download multiple files and verify checksum

>>> files = ['relative/file/path.ext', 'foo.bar']
>>> files = Globus().download_file(files, 'source_endpoint_name', verify_checksum=True)

Download a folder

>>> files = Globus().download_file('folder/path', 'source_endpoint_name', recursive=True)

static setup(client_name='default', **kwargs)

Set up a Globus client.

In order to use this function you need:

  1. The client ID of an existing Globus Client (see this tutorial).

  2. Set up Globus Connect on your local device.

  3. Register your local device as an endpoint in your Globus Client.

Parameters:
  • client_name (str) – Parameter profile name to set up e.g. ‘default’, ‘admin’.

  • **kwargs – Optional Globus constructor arguments.

Returns:

A new Globus client object.

Return type:

Globus

add_endpoint(endpoint, label=None, root_path=None, overwrite=False, alyx=None)

Add an endpoint to the Globus instance to be used by other functions.

Parameters:
  • endpoint (uuid.UUID, str) – The endpoint UUID or database repository name of the endpoint.

  • label (str) – Label to access the endpoint. If endpoint is a UUID this must be set; otherwise it is optional.

  • root_path (str, pathlib.Path, pathlib.PurePath) – File path to be accessed by Globus on the endpoint.

  • overwrite (bool) – Whether existing endpoint with the same label should be replaced.

  • alyx (one.webclient.AlyxClient) – An AlyxClient instance for looking up repository information.
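
The rule for the label parameter (required for a UUID, optional for a repository name) can be illustrated as follows. This is a sketch under stated assumptions: `resolve_label` is a hypothetical helper, and both the `ValueError` and the fallback to the repository name are assumed behaviour rather than documented behaviour.

```python
import uuid


def resolve_label(endpoint, label=None):
    """Hypothetical helper returning the key under which an endpoint would be stored."""
    try:
        uuid.UUID(str(endpoint))
        is_uuid = True
    except ValueError:
        is_uuid = False
    if is_uuid and label is None:
        # A bare UUID carries no human-readable name, so a label is required
        raise ValueError('a label is required when the endpoint is a UUID')
    # Assumption: for a repository name, the name itself can serve as the label
    return label or str(endpoint)


by_name = resolve_label('flatiron_cortexlab')
by_uuid = resolve_label('0ec47586-3a19-11eb-b173-0ee0d5d9299f', label='foobar')
```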

transfer_data(data_path, source_endpoint, destination_endpoint, recursive=False, **kwargs)

Transfer one or more paths between endpoints.

At least one of the endpoints must be a server endpoint. Both file and directory paths may be provided, however if recursive is true, all paths must be directories.

Parameters:
  • data_path (str, list of str) – One or more data paths, relative to the endpoint root path.

  • source_endpoint (str, uuid.UUID) – The name or UUID of the source endpoint.

  • destination_endpoint (str, uuid.UUID) – The name or UUID of the destination endpoint.

  • recursive (bool) – If true, transfer the contents of nested directories (NB: all data_paths must be directories).

  • **kwargs – See globus_sdk.TransferData.

Returns:

The Globus transfer ID.

Return type:

uuid.UUID

Examples

Transfer two files (asynchronous)

>>> glo = Globus()
>>> files = ['file.ext', 'foo.bar']
>>> task_id = glo.transfer_data(files, 'source_endpoint', 'destination_endpoint')

Transfer a file (synchronous)

>>> file = 'file.ext'
>>> task_id = glo.run_task(lambda: glo.transfer_data(file, 'src_endpoint', 'dst_endpoint'))

Transfer a folder (asynchronous)

>>> folder = 'path/to/folder'
>>> task_id = glo.transfer_data(
...    folder, 'source_endpoint', 'destination_endpoint', recursive=True)

delete_data(data_path, endpoint, recursive=False, **kwargs)

Delete one or more paths within an endpoint.

Both file and directory paths may be provided, however if recursive is true, all paths must be directories.

Parameters:
  • data_path (str, list of str) – One or more data paths, relative to the endpoint root path.

  • endpoint (str, uuid.UUID) – The name or UUID of the endpoint.

  • recursive (bool) – If true, delete the contents of nested directories (NB: all data_paths must be directories).

  • **kwargs – See globus_sdk.DeleteData.

Returns:

The Globus task ID.

Return type:

uuid.UUID

Examples

Delete two files, ignoring those that don’t exist (asynchronous)

>>> glo = Globus()
>>> files = ['file.ext', 'foo.bar']
>>> task_id = glo.delete_data(files, 'endpoint_name', ignore_missing=True)

Delete a file (synchronous)

>>> task_id = glo.run_task(lambda: glo.delete_data('file.ext', 'endpoint_name'))

Recursively delete a folder (asynchronous)

>>> folder = 'path/to/folder'
>>> task_id = glo.delete_data(folder, 'endpoint_name', recursive=True)

ls(endpoint, path, remove_uuid=False, return_size=False, max_retries=1)

Return the list of (filename, filesize) in a given endpoint directory. NB: If you’re using ls routinely when transferring or deleting files you’re probably doing something wrong!

Parameters:
  • endpoint (uuid.UUID, str) – The Globus endpoint. May be a UUID or a key in the Globus.endpoints attribute.

  • path (Path, PurePath, str) – The absolute or relative Globus path to list. Note: if endpoint is a UUID, the path must be absolute.

  • remove_uuid (bool) – If True, remove the UUID from the returned filenames.

  • return_size (bool) – If True, return the size of each listed file in bytes.

  • max_retries (int) – The number of times to retry the remote operation before raising. Increasing this may mitigate unstable network issues.

Returns:

A list of PurePosixPath objects of the files and folders listed, or if return_size is True, tuples of PurePosixPath objects and the corresponding file sizes.

Return type:

list
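
To make the effect of remove_uuid and return_size concrete, here is a small sketch that post-processes a made-up listing. It assumes the UUID sits between the file stem and its extension; `strip_uuid`, `UUID_PATTERN`, the file names and the sizes are all hypothetical and not part of the module.

```python
import re
from pathlib import PurePosixPath

# Assumed convention: '<stem>.<uuid>.<extension>'
UUID_PATTERN = re.compile(
    r'\.[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}(?=\.[^.]+$)', re.IGNORECASE)


def strip_uuid(path):
    """Hypothetical helper: drop a UUID part from the file name, if present."""
    path = PurePosixPath(path)
    return path.with_name(UUID_PATTERN.sub('', path.name))


# Made-up (filename, size in bytes) pairs, shaped like a listing with sizes
listing = [
    ('_ibl_trials.table.0ec47586-3a19-11eb-b173-0ee0d5d9299f.pqt', 1024),
    ('notes.txt', 12),
]
with_size = [(strip_uuid(name), size) for name, size in listing]
```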

mv(source_endpoint, target_endpoint, source_paths, target_paths, timeout=None, **kwargs)

Move files from one endpoint to another.

Parameters:
  • source_endpoint (uuid.UUID, str) – The Globus source endpoint. May be a UUID or a key in the Globus.endpoints attribute.

  • target_endpoint (uuid.UUID, str) – The Globus destination endpoint. May be a UUID or a key in the Globus.endpoints attribute.

  • source_paths (list of str, pathlib.Path or pathlib.PurePath) – The absolute or relative Globus paths of the source files to move. Note: if source_endpoint is a UUID, the paths must be absolute.

  • target_paths (list of str, Path or PurePath) – The absolute or relative Globus paths of the destination files. Note: if target_endpoint is a UUID, the paths must be absolute.

  • timeout (int) – Maximum time in seconds to wait for the task to complete.

  • **kwargs – Optional arguments for globus_sdk.TransferData.

Returns:

A Globus task ID.

Return type:

uuid.UUID

run_task(globus_func, retries=3, timeout=None)

Block until a Globus task finishes and retry upon Network or REST Errors. globus_func needs to submit a task to the client and return a task_id.

Parameters:
  • globus_func (function, Callable) – A function that returns a Globus task ID, typically it will submit a transfer.

  • retries (int) – The number of times to call globus_func if it raises a Globus error.

  • timeout (int) – Maximum time in seconds to wait for the task to complete.

Returns:

Globus task ID.

Return type:

uuid.UUID

Raises:
  • IOError – Timed out waiting for task to complete.

  • TODO Add a quick fail option that returns when files missing, etc.

  • TODO Add status logging
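
The submit, retry and block pattern described for run_task can be sketched without Globus. This is an illustration under assumptions, not the method's implementation: `run_with_retries`, `flaky_submit` and the `is_done` callback are hypothetical, and `ConnectionError` stands in for a Globus network or REST error.

```python
import time


def run_with_retries(submit, is_done, retries=3, timeout=None, poll=0.01):
    """Hypothetical sketch: submit a task, retry on errors, then block until done."""
    last_error = None
    for _ in range(retries):
        try:
            task_id = submit()
            break
        except ConnectionError as ex:  # stand-in for a Globus network/REST error
            last_error = ex
    else:
        # Every attempt failed (or retries was not positive)
        raise last_error or ValueError('retries must be a positive integer')
    start = time.monotonic()
    while not is_done(task_id):
        if timeout is not None and time.monotonic() - start > timeout:
            raise IOError('timed out waiting for task to complete')
        time.sleep(poll)
    return task_id


attempts = []


def flaky_submit():
    """Fail on the first call, then return a made-up task ID."""
    attempts.append(1)
    if len(attempts) < 2:
        raise ConnectionError('simulated network error')
    return 'made-up-task-id'


polls = iter([False, False, True])  # simulated completion checks
task_id = run_with_retries(flaky_submit, lambda _: next(polls))
```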

task_wait_async(task_id, polling_interval=10, timeout=10)

Asynchronously wait until a Task is complete or fails, with a time limit.

If the task status is still ACTIVE after the timeout, returns False; otherwise returns True.

Parameters:
  • task_id (str, uuid.UUID) – A Globus task UUID to wait on for completion.

  • polling_interval (float) – Number of seconds between queries to Globus about the task status. Minimum 1 second.

  • timeout (float) – Number of seconds to wait in total. Minimum 1 second.

Returns:

True if status not ACTIVE before timeout. False if status still ACTIVE at timeout.

Return type:

bool

Examples

Asynchronously await a task to complete

>>> await Globus().task_wait_async(task_id)
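
The return semantics described above (False only if the status is still ACTIVE at the timeout) can be illustrated with a self-contained polling loop. This is a sketch, not the method itself: `wait_while_active` and the `get_status` callback are hypothetical, and the one-second minimums are deliberately omitted so that the example runs quickly.

```python
import asyncio


async def wait_while_active(get_status, polling_interval=0.01, timeout=0.2):
    """Hypothetical sketch of polling a task status with a time limit."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while get_status() == 'ACTIVE':
        if loop.time() >= deadline:
            return False  # still ACTIVE at timeout
        await asyncio.sleep(polling_interval)
    return True  # status changed from ACTIVE before the timeout


statuses = iter(['ACTIVE', 'ACTIVE', 'SUCCEEDED'])  # simulated status queries
finished = asyncio.run(wait_while_active(lambda: next(statuses)))
stuck = asyncio.run(wait_while_active(lambda: 'ACTIVE'))
```

Note that a True result only means the task is no longer ACTIVE; in this sketch a FAILED status would also return True.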

get_lab_from_endpoint_id(endpoint=None, alyx=None)

Extracts lab names associated with a given endpoint UUID.

Finds the lab names that are associated to data repositories with the provided Globus endpoint UUID.

Parameters:
  • endpoint (uuid.UUID, str) – The endpoint UUID. Optional: if not given, an attempt is made to find the local endpoint UUID.

  • alyx (one.webclient.AlyxClient) – An instance of AlyxClient to use.

Returns:

The lab names associated with the endpoint UUID.

Return type:

list

as_globus_path(path)

Convert a path into one suitable for the Globus TransferClient.

Parameters:

path (pathlib.Path, pathlib.PurePath, str) – A path to convert to a Globus-compliant path string.

Returns:

A formatted path string.

Return type:

str

Notes

  • If using a tilde in the path, the home folder of your Globus Connect instance must be the same as the OS home directory.

  • If validating a path for another system, ensure the input path is a PurePath. In particular, on a Linux computer a remote Windows path should first be made into a PureWindowsPath.

Examples

A Windows path (on Windows OS)

>>> as_globus_path(r'E:\FlatIron\integration')
'/E/FlatIron/integration'

When explicitly a POSIX path, remains unchanged

>>> as_globus_path(PurePosixPath(r'E:\FlatIron\integration'))
'E:\FlatIron\integration'

A relative POSIX path (on *nix OS)

>>> as_globus_path('../data/integration')
'/mnt/data/integration'

A valid Globus path remains unchanged

>>> as_globus_path('/E/FlatIron/integration')
'/E/FlatIron/integration'
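
The Windows case above can be reproduced on any operating system with `pathlib.PureWindowsPath`. The sketch below covers only absolute paths with a drive letter. `windows_to_globus` is a hypothetical helper, not the module's implementation, and it ignores relative paths, tildes and UNC shares.

```python
from pathlib import PureWindowsPath


def windows_to_globus(path):
    """Hypothetical helper: convert an absolute Windows path to a Globus-style string."""
    path = PureWindowsPath(path)
    if not path.is_absolute():
        raise ValueError('this sketch only handles absolute Windows paths')
    drive = path.drive.rstrip(':')  # 'E:' -> 'E'
    # path.parts[0] is the anchor, e.g. 'E:\\'; the rest are folder and file names
    return '/' + '/'.join([drive, *path.parts[1:]])


globus_path = windows_to_globus(r'E:\FlatIron\integration')
```

Using a pure path class here follows the note above about validating paths for another system: the conversion does not depend on the OS running the code.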