"""A backend to download IBL data from AWS Buckets.
Examples
--------
Without any credentials, to download a public file from the IBL public bucket:
>>> from one.remote import aws
>>> source = 'caches/unit_test/cache_info.json'
>>> destination = '/home/olivier/scratch/cache_info.json'
>>> aws.s3_download_file(source, destination)
To download a folder:
>>> source = 'caches/unit_test'
>>> destination = '/home/olivier/scratch/caches/unit_test'
>>> local_files = aws.s3_download_folder(source, destination)
"""
import re
from pathlib import Path, PurePosixPath
import logging
import urllib.parse
from tqdm import tqdm
import boto3
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError
_logger = logging.getLogger(__name__)
REPO_DEFAULT = 'aws_cortexlab'
S3_BUCKET_IBL = 'ibl-brain-wide-map-public'
REGION_NAME = 'us-east-1'
def _callback_hook(t):
"""A callback hook for boto3.download_file to update the progress bar.
Parameters
----------
t : tqdm.tqdm
A tqdm instance used as the progress bar.
See Also
--------
https://gist.github.com/wy193777/e7607d12fad13459e8992d4f69b53586
For an example that uses the actual file size:
https://boto3.amazonaws.com/v1/documentation/api/latest/_modules/boto3/s3/transfer.html
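Examples
--------
A minimal sketch of how the hook is wired into a boto3 download; `file_object` is
assumed to be an s3.Object and `filesize` its content length (both hypothetical here):
>>> with tqdm(total=filesize, unit='B', unit_scale=True) as t:
...     file_object.download_file(Filename='/tmp/out.bin', Callback=_callback_hook(t))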
"""
def inner(bytes_amount):
t.update(bytes_amount)
return inner
def get_s3_virtual_host(uri, region) -> str:
"""
Convert a given bucket URI to a generic Amazon virtual host URL.
URI may be the bucket (+ path) or a full URI starting with 's3://'
.. _S3 documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html#virtual-host-style-url-ex
Parameters
----------
uri : str
The bucket name or full path URI.
region : str
The region, e.g. eu-west-1.
Returns
-------
str
The Web URL (virtual host name and https scheme).
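Examples
--------
A bare bucket path and a full S3 URI resolve to the same virtual-host URL
(the bucket name below is hypothetical):
>>> get_s3_virtual_host('my-bucket/path/to/file.bin', 'eu-west-1')
'https://my-bucket.s3.eu-west-1.amazonaws.com/path/to/file.bin'
>>> get_s3_virtual_host('s3://my-bucket/path/to/file.bin', 'eu-west-1')
'https://my-bucket.s3.eu-west-1.amazonaws.com/path/to/file.bin'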
"""
assert region and re.match(r'\w{2}-\w+-[1-3]', region), 'Invalid region'
parsed = urllib.parse.urlparse(uri) # remove scheme if necessary
key = parsed.path.strip('/').split('/')
bucket = parsed.netloc or key.pop(0)
hostname = f"{bucket}.{parsed.scheme or 's3'}.{region}.amazonaws.com"
return 'https://' + '/'.join((hostname, *key))
def url2uri(data_path, return_location=False):
"""
Convert a generic Amazon virtual host URL to an S3 URI.
Parameters
----------
data_path : str
An Amazon virtual host URL to convert.
return_location : bool
If true, additionally returns the location string.
Returns
-------
str
An S3 URI with scheme 's3://'.
str
If return_location is true, returns the bucket location, e.g. 'eu-west-1'.
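Examples
--------
A round trip from a virtual-host URL (hypothetical bucket) back to an S3 URI:
>>> url2uri('https://my-bucket.s3.eu-west-1.amazonaws.com/path/to/file.bin')
's3://my-bucket/path/to/file.bin'
>>> url2uri('https://my-bucket.s3.eu-west-1.amazonaws.com/path/to/file.bin', return_location=True)
('s3://my-bucket/path/to/file.bin', 'eu-west-1')
"""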
"""
parsed = urllib.parse.urlparse(data_path)
assert parsed.netloc and parsed.scheme and parsed.path
bucket_name, _, loc, *_ = parsed.netloc.split('.')
uri = f's3://{bucket_name}{parsed.path}'
return (uri, loc) if return_location else uri
def is_folder(obj_summary) -> bool:
"""
Given an S3 ObjectSummary instance, returns true if the associated object is a directory.
Parameters
----------
obj_summary : s3.ObjectSummary
An S3 ObjectSummary instance to test.
Returns
-------
bool
True if object is a directory.
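Examples
--------
A sketch that skips directory placeholders when listing a prefix (the prefix is taken
from the module-level example and may not exist in your bucket):
>>> s3, bucket_name = get_s3_public()
>>> objects = s3.Bucket(bucket_name).objects.filter(Prefix='caches/unit_test')
>>> files = [obj for obj in objects if not is_folder(obj)]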
"""
return obj_summary.key.endswith('/') and obj_summary.size == 0
def get_aws_access_keys(alyx, repo_name=REPO_DEFAULT):
"""
Query the Alyx database for the S3 credentials stored in the JSON field of an AWS data repository.
Parameters
----------
alyx : one.webclient.AlyxClient
An instance of AlyxClient.
repo_name : str
The data repository name in Alyx from which to fetch the S3 access keys.
Returns
-------
dict
The API access keys and region name to use with boto3.
str
The name of the S3 bucket associated with the Alyx data repository.
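Examples
--------
A sketch assuming `alyx` is an authenticated AlyxClient instance:
>>> session_keys, bucket_name = get_aws_access_keys(alyx, repo_name='aws_cortexlab')
>>> session = boto3.Session(**session_keys)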
"""
repo_json = alyx.rest('data-repository', 'read', id=repo_name)['json']
bucket_name = repo_json['bucket_name']
session_keys = {
'aws_access_key_id': repo_json.get('Access key ID', None),
'aws_secret_access_key': repo_json.get('Secret access key', None),
'region_name': repo_json.get('region_name', None)
}
return session_keys, bucket_name
def get_s3_public():
"""
Retrieve the IBL public S3 service resource.
Returns
-------
s3.ServiceResource
An S3 ServiceResource instance configured for anonymous (unsigned) access.
str
The name of the S3 bucket.
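Examples
--------
Anonymous access to the IBL public bucket, e.g. to check an object's size
(the key below is the one used in the module-level example):
>>> s3, bucket_name = get_s3_public()
>>> obj = s3.Object(bucket_name, 'caches/unit_test/cache_info.json')
>>> obj.content_length  # size in bytes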
"""
session = boto3.Session(region_name=REGION_NAME)
s3 = session.resource('s3', config=Config(signature_version=UNSIGNED))
return s3, S3_BUCKET_IBL
def get_s3_allen():
"""
Retrieve the Allen public S3 service resource.
Returns
-------
s3.ServiceResource
An S3 ServiceResource instance configured for anonymous (unsigned) access.
str
The name of the S3 bucket.
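Examples
--------
A sketch listing a few object keys under a hypothetical prefix of the Allen bucket:
>>> s3, bucket_name = get_s3_allen()
>>> keys = [obj.key for obj in s3.Bucket(bucket_name).objects.filter(Prefix='metadata/').limit(10)]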
"""
S3_BUCKET_ALLEN = 'allen-brain-cell-atlas'
session = boto3.Session(region_name='us-west-2')
s3 = session.resource('s3', config=Config(signature_version=UNSIGNED))
return s3, S3_BUCKET_ALLEN
def get_s3_from_alyx(alyx, repo_name=REPO_DEFAULT):
"""
Create an S3 resource instance using credentials from an Alyx data repository.
Parameters
----------
alyx : one.webclient.AlyxClient
An instance of AlyxClient.
repo_name : str
The data repository name in Alyx from which to fetch the S3 access keys.
Returns
-------
s3.ServiceResource
An S3 ServiceResource instance with the provided credentials.
str
The name of the S3 bucket.
Notes
-----
- If no credentials are present in the database, boto3 will use environment config or default
AWS profile settings instead.
- If there are no credentials for the bucket and the bucket has 'public' in the name, the
returned resource will use an unsigned signature.
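Examples
--------
A sketch assuming `alyx` is an AlyxClient instance with access to the data repository;
the destination path is illustrative only:
>>> s3, bucket_name = get_s3_from_alyx(alyx, repo_name='aws_cortexlab')
>>> local_file = s3_download_file('atlas/dorsal_cortex_50.nrrd', '/tmp/dorsal_cortex_50.nrrd',
...                               s3=s3, bucket_name=bucket_name)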
"""
session_keys, bucket_name = get_aws_access_keys(alyx, repo_name)
no_creds = not any(filter(None, (v for k, v in session_keys.items() if 'key' in k.lower())))
session = boto3.Session(**session_keys)
if no_creds and 'public' in bucket_name.lower():
config = Config(signature_version=UNSIGNED)
else:
config = None
s3 = session.resource('s3', config=config)
return s3, bucket_name
def s3_download_file(source, destination, s3=None, bucket_name=None, overwrite=False):
"""
Download a file from an S3 bucket to a local file path.
Parameters
----------
source : str, pathlib.Path, pathlib.PurePosixPath
Relative path (key) within the bucket, for example: 'atlas/dorsal_cortex_50.nrrd'.
destination : str, pathlib.Path
The full file path on local machine.
s3 : s3.ServiceResource
An S3 ServiceResource instance. Defaults to the IBL public instance.
bucket_name : str
The name of the bucket to access. Defaults to the public IBL repository.
overwrite : bool
If True, will re-download files even if the file sizes match.
Returns
-------
pathlib.Path
The local file path of the downloaded file.
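Examples
--------
Download a single file from the public IBL bucket, forcing a re-download
(the destination path is illustrative):
>>> local_file = s3_download_file('caches/unit_test/cache_info.json',
...                               '/tmp/cache_info.json', overwrite=True)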
"""
destination = Path(destination)
destination.parent.mkdir(parents=True, exist_ok=True)
if s3 is None:
s3, bucket_name = get_s3_public()
try:
file_object = s3.Object(bucket_name, Path(source).as_posix())
filesize = file_object.content_length
if not overwrite and destination.exists() and filesize == destination.stat().st_size:
_logger.debug(f"{destination} exists and matches size -- skipping")
return destination
with tqdm(total=filesize, unit='B',
unit_scale=True, desc=f'(S3) {destination}') as t:
file_object.download_file(Filename=str(destination), Callback=_callback_hook(t))
except (NoCredentialsError, PartialCredentialsError) as ex:
raise ex # Credentials need updating in Alyx # pragma: no cover
except ClientError as ex:
if ex.response.get('Error', {}).get('Code', None) == '404':
_logger.error(f'File {source} not found on {bucket_name}')
return None
else:
raise ex
return destination
def s3_download_folder(source, destination, s3=None, bucket_name=S3_BUCKET_IBL, overwrite=False):
"""
Downloads S3 folder content to a local folder.
Parameters
----------
source : str
Relative path (key) within the bucket, for example: 'spikesorting/benchmark'.
destination : str, pathlib.Path
Local folder path. Note: The contents of the source folder will be downloaded to
`destination`, not the folder itself.
s3 : s3.ServiceResource
An S3 ServiceResource instance. Defaults to the IBL public instance.
bucket_name : str
The name of the bucket to access. Defaults to the public IBL repository.
overwrite : bool
If True, will re-download files even if the file sizes match.
Returns
-------
list of pathlib.Path
The local file paths.
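Examples
--------
Mirror a bucket folder locally, re-downloading existing files (paths are illustrative):
>>> local_files = s3_download_folder('caches/unit_test', '/tmp/caches/unit_test', overwrite=True)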
"""
destination = Path(destination)
if destination.exists():
assert destination.is_dir(), 'destination must be a folder'
if s3 is None:
s3, bucket_name = get_s3_public()
local_files = []
objects = s3.Bucket(name=bucket_name).objects.filter(Prefix=source)
for obj_summary in filter(lambda x: not is_folder(x), objects):
# we can only filter an object collection by prefix, so we need to make sure the file
# is in the subpath of the source folder
# for example, if source is '/toto/tata' and obj_summary.key is
# '/toto/tata_alaternate/titi.txt', we need to exclude it
if PurePosixPath(source) not in PurePosixPath(obj_summary.key).parents:
continue
local_file = Path(destination).joinpath(Path(obj_summary.key).relative_to(source))
lf = s3_download_file(obj_summary.key, local_file, s3=s3, bucket_name=bucket_name,
overwrite=overwrite)
local_files.append(lf)
return local_files