one.remote.globus
A module for handling file operations through the Globus SDK.
Setup
To set up Globus simply instantiate the Globus class for the first time and follow the prompts. Providing a client name string to the constructor allows one to set up multiple Globus clients (i.e. when switching between different Globus client IDs).
In order to use this function you need:
The client ID of an existing Globus Client (see this tutorial).
Set up Globus Connect on your local device.
Register your local device as an endpoint in your Globus Client.
To modify the settings for a pre-established client, call the Globus.setup method with the client name:
>>> globus = Globus.setup('default')
You can update the list of endpoints using the fetch_endpoints_from_alyx method:
>>> globus = Globus('admin')
>>> remote_endpoints = globus.fetch_endpoints_from_alyx(alyx=AlyxClient())
The endpoints are stored in the endpoints property:
>>> print(globus.endpoints.keys())
>>> print(globus.endpoints['local'])
Examples
Get the full Globus file path
>>> relative_path = 'subject/2020-01-01/001/alf/_ibl_trials.table.pqt'
>>> full_path = globus.to_address(relative_path, 'flatiron_cortexlab')
Log in with a limited time token
>>> globus = Globus('admin')
>>> globus.login(stay_logged_in=False)
Log out of Globus, revoking and deleting all tokens
>>> globus.logout()
>>> assert not globus.is_logged_in
Asynchronously transfer data between Alyx repositories
>>> alyx = AlyxClient()
>>> glo = Globus('admin')
>>> glo.add_endpoint('flatiron_cortexlab', alyx=alyx)
>>> glo.add_endpoint('cortex_lab_SR', alyx=alyx)
>>> task_id = glo.transfer_data('path/to/file', 'flatiron_cortexlab', 'cortex_lab_SR')
Synchronously transfer data to an alternate local location
>>> from functools import partial
>>> root_path = '/path/to/new/location'
>>> glo.add_endpoint(get_local_endpoint_id(), label='alternate_local', root_path=root_path)
>>> folder = 'camera/ZFM-01867/2021-03-23/002' # An example folder to download
>>> task = partial(glo.transfer_data, folder, 'integration', 'integration_local',
... label='alternate data', recursive=True)
>>> task_id = glo.run_task(task) # Submit task to Globus and await completion
Temporarily change local data root path and synchronously download file
>>> glo.endpoints['local']['root_path'] = '/path/to/new/location'
>>> file = glo.download_file('path/to/file.ext', 'source_endpoint')
Path('/path/to/new/location/path/to/file.ext')
Await multiple tasks to complete by passing a list of Globus transfer IDs
>>> import asyncio
>>> async def wait_all(task_ids):
...     return await asyncio.gather(*(globus.task_wait_async(task_id) for task_id in task_ids))
>>> success = asyncio.run(wait_all(task_ids))
Module attributes
The default key in the remote settings file
The default Globus parameter fields
A map of Globus status to "nice" status
Functions
Convert a path into one suitable for the Globus TransferClient.
Creates a Globus transfer client based on an existing parameter file.
Decorator for the Globus methods.
Extracts lab names associated with a given endpoint UUID.
Extracts the ID of the local Globus Connect endpoint.
Extracts the local endpoint paths accessible by Globus Connect.
Get a Globus authentication token.
Classes
- class Globus(client_name='default', connect=True, headless=False)[source]
Bases:
DownloadClient
- property is_logged_in
Check if client exists and is authenticated
- Type:
bool
- login(stay_logged_in=None)[source]
Authenticate Globus client.
- Parameters:
stay_logged_in (bool, optional) – If True, use refresh token to remain logged in for longer. If False, use an auth token without the option of refreshing when expired. If not specified, uses the refresh token if available.
- fetch_endpoints_from_alyx(alyx=None, overwrite=False)[source]
Update endpoints property with Alyx Globus data repositories.
- Parameters:
alyx (one.webclient.AlyxClient) – An optional AlyxClient.
overwrite (bool) – Whether existing endpoint with the same label should be replaced.
- Returns:
The endpoints added from Alyx.
- Return type:
dict
- to_address(data_path, endpoint)[source]
Get full path for a given endpoint.
- Parameters:
data_path (Path, PurePath, str) – An absolute or relative POSIX path
endpoint (str, uuid.UUID) – An endpoint label or UUID.
- Returns:
A complete path string formatted for Globus.
- Return type:
str
Examples
>>> glo = Globus()
>>> glo.add_endpoint('0ec47586-3a19-11eb-b173-0ee0d5d9299f',
...                  label='foobar', root_path='/foo')
>>> glo.to_address('bar/baz.ext', 'foobar')
'/foo/bar/baz.ext'
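The example above amounts to prefixing a relative POSIX path with the endpoint's root path. A minimal sketch of that idea follows, using only the standard library; `join_endpoint_path` is a hypothetical helper written for illustration and is not part of one.remote.globus, and the real method may handle cases (UUID endpoints, absolute paths) differently.

```python
from pathlib import PurePosixPath


def join_endpoint_path(root_path, data_path):
    """Hypothetical helper: prefix a relative POSIX path with an endpoint root path."""
    # PurePosixPath never touches the file system, so this works on any OS.
    # Note: with pathlib, an absolute data_path replaces root_path entirely;
    # this is pathlib behaviour and is only assumed here, not taken from the library.
    return (PurePosixPath(root_path) / data_path).as_posix()


full_path = join_endpoint_path('/foo', 'bar/baz.ext')
print(full_path)
```

Using a pure path class keeps the separator as a forward slash even when the code runs on Windows.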
- download_file(file_address, source_endpoint, recursive=False, **kwargs)[source]
Download one or more files via Globus.
- Parameters:
file_address (str, list of str) – One or more relative POSIX paths to download.
source_endpoint (str, uuid.UUID) – The source endpoint name or uuid.
recursive (bool) – If true, transfer the contents of nested directories (NB: all data_paths must be directories).
**kwargs – See Globus.transfer_data.
- Returns:
The downloaded file path(s). If recursive is True, a list is always returned.
- Return type:
pathlib.Path, list of pathlib.Path
Notes
Assumes that the local endpoint root path is NOT POSIX style on Windows.
TODO Return None for failed files
Examples
Download a single file
>>> file = Globus().download_file('path/to/file', '0ec47586-3a19-11eb-b173-0ee0d5d9299f')
Download multiple files and verify checksum
>>> files = ['relative/file/path.ext', 'foo.bar']
>>> files = Globus().download_file(files, 'source_endpoint_name', verify_checksum=True)
Download a folder
>>> files = Globus().download_file('folder/path', 'source_endpoint_name', recursive=True)
- static setup(client_name='default', **kwargs)[source]
Set up a Globus client.
In order to use this function you need:
The client ID of an existing Globus Client (see this tutorial).
Set up Globus Connect on your local device.
Register your local device as an endpoint in your Globus Client.
- Parameters:
client_name (str) – Parameter profile name to set up e.g. ‘default’, ‘admin’.
**kwargs – Optional Globus constructor arguments.
- Returns:
A new Globus client object.
- Return type:
Globus
- add_endpoint(endpoint, label=None, root_path=None, overwrite=False, alyx=None)[source]
Add an endpoint to the Globus instance to be used by other functions.
- Parameters:
endpoint (uuid.UUID, str) – The endpoint UUID or database repository name of the endpoint.
label (str) – Label to access the endpoint. If endpoint is a UUID this must be set; otherwise it is optional.
root_path (str, pathlib.Path, pathlib.PurePath) – File path to be accessed by Globus on the endpoint.
overwrite (bool) – Whether existing endpoint with the same label should be replaced.
alyx (one.webclient.AlyxClient) – An AlyxClient instance for looking up repository information.
- transfer_data(data_path, source_endpoint, destination_endpoint, recursive=False, **kwargs)[source]
Transfer one or more paths between endpoints.
At least one of the endpoints must be a server endpoint. Both file and directory paths may be provided, however if recursive is true, all paths must be directories.
- Parameters:
data_path (str, list of str) – One or more data paths, relative to the endpoint root path.
source_endpoint (str, uuid.UUID) – The name or UUID of the source endpoint.
destination_endpoint (str, uuid.UUID) – The name or UUID of the destination endpoint.
recursive (bool) – If true, transfer the contents of nested directories (NB: all data_paths must be directories).
**kwargs – See globus_sdk.TransferData.
- Returns:
The Globus transfer ID.
- Return type:
uuid.UUID
Examples
Transfer two files (asynchronous)
>>> glo = Globus()
>>> files = ['file.ext', 'foo.bar']
>>> task_id = glo.transfer_data(files, 'source_endpoint', 'destination_endpoint')
Transfer a file (synchronous)
>>> file = 'file.ext'
>>> task_id = glo.run_task(lambda: glo.transfer_data(file, 'src_endpoint', 'dst_endpoint'))
Transfer a folder (asynchronous)
>>> folder = 'path/to/folder'
>>> task_id = glo.transfer_data(
...     folder, 'source_endpoint', 'destination_endpoint', recursive=True)
- delete_data(data_path, endpoint, recursive=False, **kwargs)[source]
Delete one or more paths within an endpoint.
Both file and directory paths may be provided, however if recursive is true, all paths must be directories.
- Parameters:
data_path (str, list of str) – One or more data paths, relative to the endpoint root path.
endpoint (str, uuid.UUID) – The name or UUID of the endpoint.
recursive (bool) – If true, delete the contents of nested directories (NB: all data_paths must be directories).
**kwargs – See globus_sdk.DeleteData.
- Returns:
The Globus transfer ID.
- Return type:
uuid.UUID
Examples
Delete two files, ignoring those that don't exist (asynchronous)
>>> glo = Globus()
>>> files = ['file.ext', 'foo.bar']
>>> task_id = glo.delete_data(files, 'endpoint_name', ignore_missing=True)
Delete a file (synchronous)
>>> task_id = glo.run_task(lambda: glo.delete_data('file.ext', 'endpoint_name'))
Recursively delete a folder (asynchronous)
>>> folder = 'path/to/folder'
>>> task_id = glo.delete_data(folder, 'endpoint_name', recursive=True)
- ls(endpoint, path, remove_uuid=False, return_size=False, max_retries=1)[source]
Return the list of (filename, filesize) in a given endpoint directory. NB: If you’re using ls routinely when transferring or deleting files you’re probably doing something wrong!
- Parameters:
endpoint (uuid.UUID, str) – The Globus endpoint. May be a UUID or a key in the Globus.endpoints attribute.
path (Path, PurePath, str) – The absolute or relative Globus path to list. Note: if endpoint is a UUID, the path must be absolute.
remove_uuid (bool) – If True, remove the UUID from the returned filenames.
return_size (bool) – If True, return the size of each listed file in bytes.
max_retries (int) – The number of times to retry the remote operation before raising. Increasing this may mitigate unstable network issues.
- Returns:
A list of PurePosixPath objects of the files and folders listed, or if return_size is True, tuples of PurePosixPath objects and the corresponding file sizes.
- Return type:
list
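To illustrate what removing a UUID from a returned filename could look like, here is a rough sketch. It assumes, and this is only an assumption, that the UUID sits between the file stem and its extension (e.g. `name.<uuid>.ext`); `strip_uuid` and `UUID_PART` are hypothetical names, not part of this module, and the `remove_uuid` option may be implemented differently.

```python
import re
from pathlib import PurePosixPath

# Assumed convention: '.<uuid>' immediately precedes the final file extension
UUID_PART = re.compile(
    r'\.[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12}(?=\.[^.]+$)', re.IGNORECASE)


def strip_uuid(path):
    """Hypothetical helper: drop a UUID part from the file name, if one is present."""
    path = PurePosixPath(path)
    return path.with_name(UUID_PART.sub('', path.name))


with_uuid = 'alf/_ibl_trials.table.0ec47586-3a19-11eb-b173-0ee0d5d9299f.pqt'
cleaned = strip_uuid(with_uuid)
untouched = strip_uuid('alf/_ibl_trials.table.pqt')
```

Names without a UUID part pass through unchanged, so the helper can be applied to every listed path.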
- mv(source_endpoint, target_endpoint, source_paths, target_paths, timeout=None, **kwargs)[source]
Move files from one endpoint to another.
- Parameters:
source_endpoint (uuid.UUID, str) – The Globus source endpoint. May be a UUID or a key in the Globus.endpoints attribute.
target_endpoint (uuid.UUID, str) – The Globus destination endpoint. May be a UUID or a key in the Globus.endpoints attribute.
source_paths (list of str, pathlib.Path or pathlib.PurePath) – The absolute or relative Globus paths of the source files to move. Note: if endpoint is a UUID, the path must be absolute.
target_paths (list of str, Path or PurePath) – The absolute or relative Globus paths of the destination files. Note: if endpoint is a UUID, the path must be absolute.
timeout (int) – Maximum time in seconds to wait for the task to complete.
**kwargs – Optional arguments for globus_sdk.TransferData.
- Returns:
A Globus task ID.
- Return type:
uuid.UUID
- run_task(globus_func, retries=3, timeout=None)[source]
Block until a Globus task finishes and retry upon Network or REST Errors. globus_func needs to submit a task to the client and return a task_id.
- Parameters:
globus_func (function, Callable) – A function that returns a Globus task ID, typically it will submit a transfer.
retries (int) – The number of times to call globus_func if it raises a Globus error.
timeout (int) – Maximum time in seconds to wait for the task to complete.
- Returns:
Globus task ID.
- Return type:
uuid.UUID
- Raises:
IOError – Timed out waiting for task to complete.
TODO Add a quick fail option that returns when files are missing, etc.
TODO Add status logging
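The submit-then-block pattern described for run_task can be sketched without any Globus dependency. In the sketch below, `run_with_retries`, `flaky_submit` and the `wait` callable are all hypothetical stand-ins; the error types that trigger a retry and the exact retry semantics of the real method are assumptions.

```python
from functools import partial


def run_with_retries(submit, wait, retries=3, timeout=None, errors=(ConnectionError,)):
    """Hypothetical blocking runner: submit a task, then wait for it to finish.

    submit: zero-argument callable returning a task ID.
    wait: callable (task_id, timeout) -> bool, True if the task completed in time.
    """
    if retries < 1:
        raise ValueError('retries must be at least 1')
    last_error = None
    for _ in range(retries):
        try:
            task_id = submit()
        except errors as ex:  # assumed retryable errors
            last_error = ex
            continue
        if wait(task_id, timeout):
            return task_id
        raise IOError(f'Timed out waiting for task {task_id}')
    raise last_error


attempts = []


def flaky_submit(path):
    """Simulated submission that fails once before succeeding."""
    attempts.append(path)
    if len(attempts) < 2:
        raise ConnectionError('simulated network error')
    return 'task-123'


# partial turns a function with arguments into the zero-argument callable expected
task_id = run_with_retries(partial(flaky_submit, 'path/to/file'),
                           wait=lambda task_id, timeout: True)
```

Passing a callable rather than a task ID is what allows the submission itself to be repeated after a failure.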
- task_wait_async(task_id, polling_interval=10, timeout=10)[source]
Asynchronously wait until a Task is complete or fails, with a time limit.
If the task status is ACTIVE after timeout, returns False, otherwise returns True.
- Parameters:
task_id (str, uuid.UUID) – A Globus task UUID to wait on for completion.
polling_interval (float) – Number of seconds between queries to Globus about the task status. Minimum 1 second.
timeout (float) – Number of seconds to wait in total. Minimum 1 second.
- Returns:
True if status not ACTIVE before timeout. False if status still ACTIVE at timeout.
- Return type:
bool
Examples
Asynchronously await a task to complete
>>> await Globus().task_wait_async(task_id)
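The polling behaviour described above (query the status at intervals, give up after a time limit) can be illustrated with a standalone coroutine. This is a sketch under stated assumptions: `wait_until_inactive` and `make_status` are hypothetical, the status source is simulated, and the short intervals are chosen only so the example runs quickly.

```python
import asyncio


async def wait_until_inactive(get_status, polling_interval=0.01, timeout=0.2):
    """Poll get_status() until it stops returning 'ACTIVE' or the time limit passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while get_status() == 'ACTIVE':
        if loop.time() >= deadline:
            return False  # still ACTIVE at timeout
        await asyncio.sleep(polling_interval)
    return True  # status changed before the timeout


def make_status(n_active):
    """Return a callable reporting 'ACTIVE' n_active times, then 'SUCCEEDED'."""
    remaining = [n_active]

    def get_status():
        if remaining[0] > 0:
            remaining[0] -= 1
            return 'ACTIVE'
        return 'SUCCEEDED'
    return get_status


async def main():
    # Both waiters poll concurrently; results come back in argument order
    return await asyncio.gather(
        wait_until_inactive(make_status(1)),
        wait_until_inactive(make_status(10**6)))


finished, stuck = asyncio.run(main())
```

Because each waiter yields control while sleeping, many tasks can be awaited together without blocking one another.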
- get_lab_from_endpoint_id(endpoint=None, alyx=None)[source]
Extracts lab names associated with a given endpoint UUID.
Finds the lab names that are associated to data repositories with the provided Globus endpoint UUID.
- Parameters:
endpoint (uuid.UUID, str) – Endpoint UUID. Optional; if not given, an attempt is made to find the local endpoint UUID.
alyx (one.webclient.AlyxClient) – An instance of AlyxClient to use.
- Returns:
The lab names associated with the endpoint UUID.
- Return type:
list
- as_globus_path(path)[source]
Convert a path into one suitable for the Globus TransferClient.
- Parameters:
path (pathlib.Path, pathlib.PurePath, str) – A path to convert to a Globus-compliant path string.
- Returns:
A formatted path string.
- Return type:
str
Notes
If using a tilde in the path, the home folder of your Globus Connect instance must be the same as the OS home directory.
If validating a path for another system, ensure the input path is a PurePath. In particular, on a Linux computer a remote Windows path should first be made into a PureWindowsPath.
Examples
A Windows path (on Windows OS)
>>> as_globus_path('E:\FlatIron\integration')
'/E/FlatIron/integration'
When explicitly a POSIX path, remains unchanged
>>> as_globus_path(PurePosixPath('E:\FlatIron\integration'))
'E:\FlatIron\integration'
A relative POSIX path (on *nix OS)
>>> as_globus_path('../data/integration')
'/mnt/data/integration'
A valid Globus path remains unchanged
>>> as_globus_path('/E/FlatIron/integration')
'/E/FlatIron/integration'
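The Windows case above (drive letter folded into the path, forward slashes throughout) can be approximated with the standard library alone. The following is a simplified sketch, not the function's actual implementation: `windows_to_globus` is a hypothetical name, and it only covers absolute Windows paths with a drive letter.

```python
from pathlib import PureWindowsPath


def windows_to_globus(path):
    """Hypothetical helper: turn a drive-letter Windows path into '/<drive>/<parts>' form."""
    path = PureWindowsPath(path)  # pure path, so this also runs on Linux and macOS
    if not path.drive:
        raise ValueError('expected a Windows path with a drive letter')
    drive = path.drive.rstrip(':')  # 'E:' -> 'E'
    # parts[0] is the drive/root anchor; the rest are the folder and file names
    return '/' + '/'.join([drive, *path.parts[1:]])


globus_path = windows_to_globus(r'E:\FlatIron\integration')
print(globus_path)
```

Wrapping the input in PureWindowsPath is what makes the conversion independent of the operating system the code runs on, which matches the note about validating paths for another system.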