"""A module for ad-hoc dataset modification and registration.
Unlike the DataHandler class in oneibl.data_handlers, the Patcher class allows one to fully remove
datasets (delete them from the database and repositories), and to overwrite datasets on both the
main repositories and the local servers. Additionally the Patcher can handle datasets from
multiple sessions at once.
Examples
--------
Delete a dataset from Alyx and all associated repositories.
>>> dataset_id = 'f4aafe6c-a7ab-4390-82cd-2c0e245322a5'
>>> task_ids, files_by_repo = IBLGlobusPatcher(AlyxClient(), 'admin').delete_dataset(dataset_id)
Patch some local datasets using Globus.
>>> from one.api import ONE
>>> patcher = GlobusPatcher('admin', ONE(), label='UCLA audio times patch')
>>> responses = patcher.patch_datasets(file_paths) # register the new datasets to Alyx
>>> patcher.launch_transfers(local_servers=True) # transfer to all remote repositories
"""
import abc
import ftplib
from pathlib import Path, PurePosixPath, WindowsPath
from collections import defaultdict
from itertools import starmap
from subprocess import Popen, PIPE, STDOUT
import subprocess
import logging
from getpass import getpass
import shutil
import globus_sdk
import iblutil.io.params as iopar
from iblutil.util import ensure_list
from one.alf.path import get_session_path, add_uuid_string, full_path_parts
from one.alf.spec import is_uuid_string, is_uuid
from one import params
from one.webclient import AlyxClient
from one.converters import path_from_dataset
from one.remote import globus
from one.remote.aws import url2uri, get_s3_from_alyx
from ibllib.oneibl.registration import register_dataset
_logger = logging.getLogger(__name__)
FLATIRON_HOST = 'ibl.flatironinstitute.org'
FLATIRON_PORT = 61022
FLATIRON_USER = 'datauser'
FLATIRON_MOUNT = '/mnt/ibl'
FTP_HOST = 'test.alyx.internationalbrainlab.org'
FTP_PORT = 21
DMZ_REPOSITORY = 'ibl_patcher'  # in Alyx, the repository name containing the patched file records
SDSC_ROOT_PATH = PurePosixPath('/mnt/ibl')
SDSC_PATCH_PATH = PurePosixPath('/home/datauser/temp')
def _run_command(cmd, dry=True):
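    """Log and run a shell command, returning (returncode, stdout, stderr); if dry, only log and return (0, '', '')."""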
_logger.info(cmd)
if dry:
return 0, '', ''
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
info, error = p.communicate()
if p.returncode != 0:
_logger.error(error)
raise RuntimeError(error)
return p.returncode, info, error
def sdsc_globus_path_from_dataset(dset):
"""
:param dset: dset dictionary or list of dictionaries from ALyx rest endpoint
Returns SDSC globus file path from a dset record or a list of dsets records from REST
"""
return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=True)
def sdsc_path_from_dataset(dset, root_path=SDSC_ROOT_PATH):
"""
    Returns the SDSC file path from a dset record or a list of dset records from REST.

    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param root_path: (optional) the prefix path, such as a local download directory or the SDSC root
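
    Example (a sketch; the dataset ID is the one from the module-level example):

    >>> dset = AlyxClient().rest('datasets', 'read', id='f4aafe6c-a7ab-4390-82cd-2c0e245322a5')
    >>> path = sdsc_path_from_dataset(dset)  # path under /mnt/ibl with the dataset UUID in the file name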
"""
return path_from_dataset(dset, root_path=root_path, uuid=True)
def globus_path_from_dataset(dset, repository=None, uuid=False):
"""
    Returns the local ONE file path from a dset record or a list of dset records from REST.

    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param repository: (optional) repository name of the file record (if None, takes
     the first file record with a URL)
    :param uuid: (optional) if True, the dataset UUID is appended to the file name
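
    Example (a sketch; the dataset must have a file record on the named repository):

    >>> dset = AlyxClient().rest('datasets', 'read', id='f4aafe6c-a7ab-4390-82cd-2c0e245322a5')
    >>> path = globus_path_from_dataset(dset, repository='flatiron_cortexlab')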
"""
return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid)
[docs]
class Patcher(abc.ABC):
def __init__(self, one=None):
assert one
self.one = one
def _patch_dataset(self, path, dset_id=None, revision=None, dry=False, ftp=False):
"""
        This private method gets the dataset information from Alyx, computes the local
        and remote paths and initiates the file copy.
"""
path = Path(path)
if dset_id is None:
dset_id = path.name.split('.')[-2]
if not is_uuid_string(dset_id):
dset_id = None
assert dset_id
assert is_uuid_string(dset_id)
        # If the revision is not None then we need to add the revision into the path. Note that the
        # moving of the file is handled by the ONE registration client
if revision and f'#{revision}' not in str(path):
path = path.parent.joinpath(f'#{revision}#', path.name)
assert path.exists()
dset = self.one.alyx.rest('datasets', 'read', id=dset_id)
fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
remote_path = Path(fr['data_repository_path']).joinpath(fr['relative_path'])
remote_path = add_uuid_string(remote_path, dset_id).as_posix()
if remote_path.startswith('/'):
full_remote_path = PurePosixPath(FLATIRON_MOUNT + remote_path)
else:
full_remote_path = PurePosixPath(FLATIRON_MOUNT, remote_path)
if isinstance(path, WindowsPath) and not ftp:
# On Windows replace drive map with Globus uri, e.g. C:/ -> /~/C/
path = globus.as_globus_path(path)
status = self._scp(path, full_remote_path, dry=dry)[0]
return status
def register_dataset(self, file_list, **kwargs):
"""
        Registers a set of files belonging to a single session, on the server repository only.

        :param file_list: (list of pathlib.Path)
        :param created_by: (string) name of user in Alyx (defaults to 'root')
        :param repository: optional: (string) name of the server repository in Alyx
        :param versions: optional (list of strings): version tags (defaults to the ibllib version)
        :param dry: (bool) False by default
        :return: the registration response from Alyx
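
        Example (a sketch; file_paths is a hypothetical list of files within one session folder):

        >>> patcher.register_dataset(file_paths, created_by='admin', dry=True)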
"""
return register_dataset(file_list, one=self.one, server_only=True, exists=True, **kwargs)
def register_datasets(self, file_list, **kwargs):
"""
Same as register_dataset but works with files belonging to different sessions
"""
register_dict = {}
# creates a dictionary of sessions with one file list per session
for f in file_list:
session_path = get_session_path(f)
label = '_'.join(session_path.parts[-3:])
if label in register_dict:
register_dict[label]['files'].append(f)
else:
register_dict[label] = {'session_path': session_path, 'files': [f]}
responses = []
nses = len(register_dict)
for i, label in enumerate(register_dict):
_files = register_dict[label]['files']
_logger.info(f"{i + 1}/{nses} {label}, registering {len(_files)} files")
responses.append(self.register_dataset(_files, **kwargs))
return responses
def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs):
"""
        Creates a new dataset on FlatIron and uploads it from an arbitrary location.

        Rules for creation/patching are the same as those that apply for registration via Alyx,
        as this uses the registration endpoint to get the dataset.
        An existing file (same session and path relative to session) will be patched.

        :param file_list: full file path; must be within an ALF session folder (subject/date/number).
         Can also be a list of full file paths belonging to the same session.
        :param repository: Alyx server repository name
        :param created_by: Alyx username for the dataset (optional, defaults to 'root')
        :param ftp: flag for when using the FTPPatcher; when ftp=True, the Windows path is not
         adjusted in _patch_dataset
        :return: the registration response, a list of dataset records
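
        Example (a sketch; file_paths is a hypothetical list of files from a single session):

        >>> patcher.patch_dataset(file_paths, created_by='admin', dry=True)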
"""
# first register the file
if not isinstance(file_list, list):
file_list = [Path(file_list)]
assert len(set(map(get_session_path, file_list))) == 1
assert all(Path(f).exists() for f in file_list)
response = ensure_list(self.register_dataset(file_list, dry=dry, **kwargs))
if dry:
return
# from the dataset info, set flatIron flag to exists=True
for p, d in zip(file_list, response):
self._patch_dataset(p, dset_id=d['id'], revision=d['revision'], dry=dry, ftp=ftp)
return response
def patch_datasets(self, file_list, **kwargs):
"""Same as create_dataset method but works with several sessions."""
register_dict = {}
# creates a dictionary of sessions with one file list per session
for f in file_list:
session_path = get_session_path(f)
label = '_'.join(session_path.parts[-3:])
if label in register_dict:
register_dict[label]['files'].append(f)
else:
register_dict[label] = {'session_path': session_path, 'files': [f]}
responses = []
nses = len(register_dict)
for i, label in enumerate(register_dict):
_files = register_dict[label]['files']
_logger.info(f'{i + 1}/{nses} {label}, registering {len(_files)} files')
responses.extend(self.patch_dataset(_files, **kwargs))
return responses
@abc.abstractmethod
def _scp(self, *args, **kwargs):
pass
@abc.abstractmethod
def _rm(self, *args, **kwargs):
pass
class GlobusPatcher(Patcher, globus.Globus):
"""
    Requires Globus access keys (see one.remote.globus).
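
    Example (a sketch, mirroring the module-level example; file_paths is a hypothetical list of
    local files to patch):

    >>> patcher = GlobusPatcher('admin', ONE(), label='UCLA audio times patch')
    >>> responses = patcher.patch_datasets(file_paths)
    >>> patcher.launch_transfers(local_servers=True)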
"""
def __init__(self, client_name='default', one=None, label='ibllib patch'):
assert one and not one.offline
Patcher.__init__(self, one=one)
globus.Globus.__init__(self, client_name)
self.label = label
# get a dictionary of data repositories from Alyx (with globus ids)
self.fetch_endpoints_from_alyx(one.alyx)
flatiron_id = self.endpoints['flatiron_cortexlab']['id']
if 'flatiron' not in self.endpoints:
self.add_endpoint(flatiron_id, 'flatiron', root_path='/')
self.endpoints['flatiron'] = self.endpoints['flatiron_cortexlab']
# transfers/delete from the current computer to the flatiron: mandatory and executed first
local_id = self.endpoints['local']['id']
self.globus_transfer = globus_sdk.TransferData(
self.client, local_id, flatiron_id, verify_checksum=True, sync_level='checksum', label=label)
self.globus_delete = globus_sdk.DeleteData(self.client, flatiron_id, label=label)
# transfers/delete from flatiron to optional third parties to synchronize / delete
self.globus_transfers_locals = {}
self.globus_deletes_locals = {}
super().__init__(one=one)
def _scp(self, local_path, remote_path, dry=True):
remote_path = PurePosixPath('/').joinpath(
remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
)
_logger.info(f"Globus copy {local_path} to {remote_path}")
local_path = globus.as_globus_path(local_path)
if not dry:
if isinstance(self.globus_transfer, globus_sdk.TransferData):
self.globus_transfer.add_item(local_path, remote_path.as_posix())
else:
self.globus_transfer.path_src.append(local_path)
self.globus_transfer.path_dest.append(remote_path.as_posix())
return 0, ''
def _rm(self, flatiron_path, dry=True):
flatiron_path = Path('/').joinpath(flatiron_path.relative_to(Path(FLATIRON_MOUNT)))
_logger.info(f'Globus del {flatiron_path}')
if not dry:
if isinstance(self.globus_delete, globus_sdk.DeleteData):
self.globus_delete.add_item(flatiron_path.as_posix())
else:
self.globus_delete.path.append(flatiron_path.as_posix())
return 0, ''
def patch_datasets(self, file_list, **kwargs):
"""
        Calls the super method that registers the datasets and queues the transfers from the
        current computer to FlatIron. Then creates individual transfer items for each local
        server so that, after the update on FlatIron, the local server files are also updated.

        :param file_list: list of full file paths to patch
        :param kwargs: passed on to Patcher.patch_datasets
        :return: the registration responses, a list of dataset records
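
        Example (a sketch; the per-server transfers can be inspected before launching them):

        >>> responses = patcher.patch_datasets(file_paths)
        >>> {gid: len(td['DATA']) for gid, td in patcher.globus_transfers_locals.items()}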
"""
responses = super().patch_datasets(file_list, **kwargs)
for dset in responses:
# get the flatiron path
fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
relative_path = add_uuid_string(fr['relative_path'], dset['id']).as_posix()
flatiron_path = self.to_address(relative_path, fr['data_repository'])
# loop over the remaining repositories (local servers) and create a transfer
# from flatiron to the local server
for fr in dset['file_records']:
if fr['data_repository'] == DMZ_REPOSITORY:
continue
if fr['data_repository'] not in self.endpoints:
continue
repo_gid = self.endpoints[fr['data_repository']]['id']
flatiron_id = self.endpoints['flatiron']['id']
if repo_gid == flatiron_id:
continue
# if there is no transfer already created, initialize it
if repo_gid not in self.globus_transfers_locals:
self.globus_transfers_locals[repo_gid] = globus_sdk.TransferData(
self.client, flatiron_id, repo_gid, verify_checksum=True,
sync_level='checksum', label=f"{self.label} on {fr['data_repository']}")
# get the local server path and create the transfer item
local_server_path = self.to_address(fr['relative_path'], fr['data_repository'])
self.globus_transfers_locals[repo_gid].add_item(flatiron_path, local_server_path)
return responses
def launch_transfers(self, local_servers=False):
"""
        patcher.launch_transfers()

        Launches the Globus transfer and delete from the local patch computer to FlatIron.

        :param local_servers: (bool) if True, synchronize the local servers after the main transfer
        :return: None
"""
gtc = self.client
def _wait_for_task(resp):
# patcher.transfer_client.get_task(task_id='364fbdd2-4deb-11eb-8ffb-0a34088e79f9')
# on a good status:
# Out[22]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 3011090432, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': None, 'deadline': '2021-01-06T18:10:05+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 873268, 'encrypt_data': False, 'fatal_error': None, 'faults': 6, 'files': 186, 'files_skipped': 12, 'files_transferred': 76, 'history_deleted': False, 'is_ok': True, 'is_paused': False, 'key': 'active,2021-01-03T17:52:34.427087', 'label': '3B analog sync patch', 'nice_status': 'OK', 'nice_status_details': None, 'nice_status_expires_in': -1, 'nice_status_short_description': 'OK', 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'ACTIVE', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 98, 'subtasks_retrying': 0, 'subtasks_succeeded': 274, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True}) # noqa
# on a checksum error
# Out[26]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 3715901232, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': None, 'deadline': '2021-01-06T18:10:05+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 912410, 'encrypt_data': False, 'fatal_error': None, 'faults': 7, 'files': 186, 'files_skipped': 12, 'files_transferred': 102, 'history_deleted': False, 'is_ok': False, 'is_paused': False, 'key': 'active,2021-01-03T17:52:34.427087', 'label': '3B analog sync patch', 'nice_status': 'VERIFY_CHECKSUM', 'nice_status_details': None, 'nice_status_expires_in': -1, 'nice_status_short_description': 'checksum verification failed', 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'ACTIVE', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 72, 'subtasks_retrying': 0, 'subtasks_succeeded': 300, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True}) # noqa
# on a finished task
# Out[4]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 4998806664, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': '2021-01-03T20:04:50+00:00', 'deadline': '2021-01-06T19:11:00+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 629960, 'encrypt_data': False, 'fatal_error': None, 'faults': 15, 'files': 186, 'files_skipped': 12, 'files_transferred': 174, 'history_deleted': False, 'is_ok': None, 'is_paused': False, 'key': 'complete,2021-01-03T20:04:49.540956', 'label': '3B analog sync patch', 'nice_status': None, 'nice_status_details': None, 'nice_status_expires_in': None, 'nice_status_short_description': None, 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'SUCCEEDED', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 0, 'subtasks_retrying': 0, 'subtasks_succeeded': 372, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True}) # noqa
# on an errored task
# Out[10]: TransferResponse({'bytes_checksummed': 0, 'bytes_transferred': 0, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': '2021-01-03T17:39:00+00:00', 'deadline': '2021-01-04T17:37:34+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 0, 'encrypt_data': False, 'fatal_error': {'code': 'CANCELED', 'description': 'canceled'}, 'faults': 2, 'files': 6, 'files_skipped': 0, 'files_transferred': 0, 'history_deleted': False, 'is_ok': None, 'is_paused': False, 'key': 'complete,2021-01-03T17:38:59.697413', 'label': 'test 3B analog sync patch', 'nice_status': None, 'nice_status_details': None, 'nice_status_expires_in': None, 'nice_status_short_description': None, 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:37:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'FAILED', 'subtasks_canceled': 6, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 0, 'subtasks_retrying': 0, 'subtasks_succeeded': 6, 'subtasks_total': 12, 'symlinks': 0, 'sync_level': 3, 'task_id': '5706dd2c-4dea-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True}) # noqa
while True:
tinfo = gtc.get_task(task_id=resp['task_id'])
if tinfo and tinfo['completion_time'] is not None:
break
_ = gtc.task_wait(task_id=resp['task_id'], timeout=30)
if tinfo and tinfo['fatal_error'] is not None:
raise ConnectionError(f"Globus transfer failed \n {tinfo}")
# handles the transfers first
if len(self.globus_transfer['DATA']) > 0:
# launch the transfer
_wait_for_task(gtc.submit_transfer(self.globus_transfer))
# re-initialize the globus_transfer property
self.globus_transfer = globus_sdk.TransferData(
gtc,
self.globus_transfer['source_endpoint'],
self.globus_transfer['destination_endpoint'],
label=self.globus_transfer['label'],
verify_checksum=True, sync_level='checksum')
# do the same for deletes
if len(self.globus_delete['DATA']) > 0:
_wait_for_task(gtc.submit_delete(self.globus_delete))
self.globus_delete = globus_sdk.DeleteData(
gtc,
endpoint=self.globus_delete['endpoint'],
label=self.globus_delete['label'])
# launch the local transfers and local deletes
if local_servers:
self.launch_transfers_secondary()
def launch_transfers_secondary(self):
"""
        patcher.launch_transfers_secondary()

        Launches the Globus transfers from FlatIron to third-party repositories (local servers).
        This should run after the main transfer from the patch computer to FlatIron.

        :return: None
"""
for lt in self.globus_transfers_locals:
transfer = self.globus_transfers_locals[lt]
if len(transfer['DATA']) > 0:
self.client.submit_transfer(transfer)
for ld in self.globus_deletes_locals:
delete = self.globus_deletes_locals[ld]
            if len(delete['DATA']) > 0:
self.client.submit_delete(delete)
class IBLGlobusPatcher(Patcher, globus.Globus):
"""This is a replacement for the GlobusPatcher class, utilizing the ONE Globus class.
    The GlobusPatcher class is more complicated but has the advantage of being able to launch
    transfers independently of registration, although it remains to be seen whether this is useful.
"""
def __init__(self, alyx=None, client_name='default'):
"""
Parameters
----------
alyx : one.webclient.AlyxClient
An instance of Alyx to use.
client_name : str, default='default'
The Globus client name.
"""
self.alyx = alyx or AlyxClient()
        globus.Globus.__init__(self, client_name=client_name)  # NB we don't init Patcher as we're not using ONE
def delete_dataset(self, dataset, dry=False):
"""
Delete a dataset off Alyx and remove file record from all Globus repositories.
Parameters
----------
dataset : uuid.UUID, str, dict
The dataset record or ID to delete.
dry : bool
If true, dataset is not deleted and file paths that would be removed are returned.
Returns
-------
list of uuid.UUID
A list of Globus delete task IDs if dry is false.
dict of str
A map of data repository names and relative paths of the deleted files.
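
        Example (a sketch, using the dataset ID from the module-level example; with dry=True nothing
        is deleted and the file paths that would be removed are returned):

        >>> patcher = IBLGlobusPatcher(AlyxClient(), 'admin')
        >>> task_ids, files_by_repo = patcher.delete_dataset('f4aafe6c-a7ab-4390-82cd-2c0e245322a5', dry=True)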
"""
if is_uuid(dataset):
did = dataset
dataset = self.alyx.rest('datasets', 'read', id=did)
else:
did = dataset['url'].split('/')[-1]
def is_aws(repository_name):
return repository_name.startswith('aws_')
files_by_repo = defaultdict(list) # str -> [pathlib.PurePosixPath]
s3_files = []
file_records = filter(lambda x: x['exists'], dataset['file_records'])
for record in file_records:
repo = self.repo_from_alyx(record['data_repository'], self.alyx)
# Handle S3 files
if not repo['globus_endpoint_id'] or repo['repository_type'] != 'Fileserver':
if is_aws(repo['name']):
s3_files.append(url2uri(record['data_url']))
files_by_repo[repo['name']].append(PurePosixPath(record['relative_path']))
else:
_logger.error('Unable to delete from %s', repo['name'])
else:
# Handle Globus files
if repo['name'] not in self.endpoints:
self.add_endpoint(repo['name'], alyx=self.alyx)
filepath = PurePosixPath(record['relative_path'])
if 'flatiron' in repo['name']:
filepath = add_uuid_string(filepath, did)
files_by_repo[repo['name']].append(filepath)
# Remove S3 files
if s3_files:
cmd = ['aws', 's3', 'rm', *s3_files, '--profile', 'ibladmin']
if dry:
cmd.append('--dryrun')
if _logger.level > logging.DEBUG:
log_function = _logger.error
cmd.append('--only-show-errors') # Suppress verbose output
else:
log_function = _logger.debug
cmd.append('--no-progress') # Suppress progress info, estimated time, etc.
_logger.debug(' '.join(cmd))
process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
with process.stdout:
for line in iter(process.stdout.readline, b''):
log_function(line.decode().strip())
assert process.wait() == 0
if dry:
return [], files_by_repo
# Remove Globus files
globus_files_map = filter(lambda x: not is_aws(x[0]), files_by_repo.items())
task_ids = list(starmap(self.delete_data, map(reversed, globus_files_map)))
# Delete the dataset from Alyx
self.alyx.rest('datasets', 'delete', id=did)
return task_ids, files_by_repo
class SSHPatcher(Patcher):
"""
    Requires SSH key access on the FlatIron server.
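
    Example (a sketch; assumes working SSH keys for the FlatIron data server and that file_paths
    is a hypothetical list of local files to patch):

    >>> patcher = SSHPatcher(one=ONE())
    >>> responses = patcher.patch_datasets(file_paths)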
"""
def __init__(self, one=None):
res = _run_command(f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} ls")
if res[0] != 0:
raise PermissionError("Could not connect to the Flatiron via SSH. Check your RSA keys")
super().__init__(one=one)
def _scp(self, local_path, remote_path, dry=True):
cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST}" \
f" mkdir -p {remote_path.parent}; "
cmd += f"scp -P {FLATIRON_PORT} {local_path} {FLATIRON_USER}@{FLATIRON_HOST}:{remote_path}"
return _run_command(cmd, dry=dry)
def _rm(self, flatiron_path, dry=True):
cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} rm {flatiron_path}"
return _run_command(cmd, dry=dry)
class FTPPatcher(Patcher):
"""
    This is used to register and patch datasets from anywhere, without write access to FlatIron.
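
    Example (a sketch; FTP credentials are prompted for on first use via FTPPatcher.setup, and
    file_paths is a hypothetical list of files from a single session):

    >>> patcher = FTPPatcher(one=ONE())
    >>> patcher.create_dataset(file_paths, created_by='admin')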
"""
def __init__(self, one=None):
super().__init__(one=one)
alyx = self.one.alyx
if not getattr(alyx._par, 'FTP_DATA_SERVER_LOGIN', False):
alyx._par = self.setup(par=alyx._par, silent=alyx.silent)
login, pwd = (one.alyx._par.FTP_DATA_SERVER_LOGIN, one.alyx._par.FTP_DATA_SERVER_PWD)
self.ftp = ftplib.FTP_TLS(host=FTP_HOST, user=login, passwd=pwd)
# self.ftp.ssl_version = ssl.PROTOCOL_TLSv1
# self.ftp.auth()
self.ftp.prot_p()
self.ftp.login(login, pwd)
# pre-fetch the repositories so as not to query them for every file registered
self.repositories = self.one.alyx.rest("data-repository", "list")
@staticmethod
def setup(par=None, silent=False):
"""
        Set up (and save) the FTP login parameters.

        :param par: a parameters object to modify; if None, the default web client parameters
         are loaded
        :param silent: if True, the defaults are used with no user input prompt
        :return: the modified parameters object
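
        Example (a sketch; with silent=True no prompts are shown and any saved values take
        precedence over the defaults):

        >>> pars = FTPPatcher.setup(silent=True)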
"""
DEFAULTS = {
"FTP_DATA_SERVER": "ftp://ibl.flatironinstitute.org",
"FTP_DATA_SERVER_LOGIN": "iblftp",
"FTP_DATA_SERVER_PWD": None
}
if par is None:
par = params.get(silent=silent)
par = iopar.as_dict(par)
if silent:
DEFAULTS.update(par)
par = DEFAULTS
else:
for k in DEFAULTS.keys():
cpar = par.get(k, DEFAULTS[k])
# Iterate through non-password pars; skip url if client url already provided
if 'PWD' not in k:
par[k] = input(f'Param {k}, current value is ["{cpar}"]:') or cpar
else:
prompt = f'Param {k} (leave empty to leave unchanged):'
par[k] = getpass(prompt) or cpar
# Get the client key
client = par.get('ALYX_URL', None)
client_key = params._key_from_url(client) if client else params.get_default_client()
# Save the parameters
params.save(par, client_key) # Client params
return iopar.from_dict(par)
def create_dataset(self, path, created_by='root', dry=False, repository=DMZ_REPOSITORY,
**kwargs):
# overrides the superclass just to remove the server repository argument
response = super().patch_dataset(path, created_by=created_by, dry=dry,
repository=repository, ftp=True, **kwargs)
# need to patch the file records to be consistent
for ds in response:
frs = ds['file_records']
fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
fr_ftp = next(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
fr['relative_path'] == fr_server['relative_path'], frs))
reposerver = next(filter(lambda rep: rep['name'] == fr_server['data_repository'],
self.repositories))
relative_path = str(PurePosixPath(reposerver['globus_path']).joinpath(
PurePosixPath(fr_ftp['relative_path'])))[1:]
# 1) if there was already a file, the registration created a duplicate
fr_2del = list(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
fr['relative_path'] == relative_path, frs)) # NOQA
if len(fr_2del) == 1:
self.one.alyx.rest('files', 'delete', id=fr_2del[0]['id'])
# 2) the patch ftp file needs to be prepended with the server repository path
self.one.alyx.rest('files', 'partial_update', id=fr_ftp['id'],
data={'relative_path': relative_path, 'exists': True})
# 3) the server file is labeled as not existing
self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
data={'exists': False})
return response
def _scp(self, local_path, remote_path, dry=True):
        # remote_path = '/mnt/ibl/zadorlab/Subjects/flowers/2018-07-13/001'
remote_path = PurePosixPath('/').joinpath(
remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
)
# local_path
self.mktree(remote_path.parent)
self.ftp.pwd()
_logger.info(f"FTP upload {local_path}")
with open(local_path, 'rb') as fid:
self.ftp.storbinary(f'STOR {local_path.name}', fid)
return 0, ''
def mktree(self, remote_path):
""" Browse to the tree on the ftp server, making directories on the way"""
if str(remote_path) != '.':
try:
self.ftp.cwd(str(remote_path))
except ftplib.error_perm:
self.mktree(PurePosixPath(remote_path.parent))
self.ftp.mkd(str(remote_path))
self.ftp.cwd(str(remote_path))
def _rm(self, flatiron_path, dry=True):
raise PermissionError("This Patcher does not have admin permissions to remove data "
"from the FlatIron server. ")
class SDSCPatcher(Patcher):
"""
This is used to patch data on the SDSC server
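
    Example (a sketch; this must run on a machine where the FlatIron mount /mnt/ibl is writable,
    and file_paths is a hypothetical list of local files to patch):

    >>> patcher = SDSCPatcher(one=ONE())
    >>> responses = patcher.patch_datasets(file_paths)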
"""
def __init__(self, one=None):
assert one
super().__init__(one=one)
def patch_datasets(self, file_list, **kwargs):
response = super().patch_datasets(file_list, **kwargs)
# TODO check the file records to see if they have local server ones
# If they do then need to remove file record and delete file from local server??
return response
def _scp(self, local_path, remote_path, dry=True):
_logger.info(f"Copy {local_path} to {remote_path}")
if not dry:
if not Path(remote_path).parent.exists():
Path(remote_path).parent.mkdir(exist_ok=True, parents=True)
shutil.copy(local_path, remote_path)
return 0, ''
def _rm(self, flatiron_path, dry=True):
raise PermissionError("This Patcher does not have admin permissions to remove data "
"from the FlatIron server")
class S3Patcher(Patcher):
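    """Patcher for uploading datasets to the S3 patcher bucket.

    Files are registered against the 's3_patcher' repository and uploaded under the 'patcher'
    prefix of the S3 bucket configured in Alyx; the corresponding FlatIron file records are
    then marked as not existing.
    """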
def __init__(self, one=None):
assert one
super().__init__(one=one)
self.s3_repo = 's3_patcher'
self.s3_path = 'patcher'
# Instantiate boto connection
self.s3, self.bucket = get_s3_from_alyx(self.one.alyx, repo_name=self.s3_repo)
def check_datasets(self, file_list):
        # Check whether the datasets already exist on Alyx; if they do, don't patch unless force=True.
exists = []
for file in file_list:
collection = full_path_parts(file, as_dict=True)['collection']
dset = self.one.alyx.rest('datasets', 'list', session=self.one.path2eid(file), name=file.name,
collection=collection, clobber=True)
if len(dset) > 0:
exists.append(file)
return exists
def patch_dataset(self, file_list, dry=False, ftp=False, force=False, **kwargs):
exists = self.check_datasets(file_list)
if len(exists) > 0 and not force:
            _logger.error(f'Files: {", ".join([f.name for f in file_list])} already exist; to overwrite, set force=True')
return
response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False, **kwargs)
        # TODO in an ideal case the flatiron file record won't be altered when we register this dataset. This requires
        # changing the alyx.data.register_view
for ds in response:
frs = ds['file_records']
fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
# Update the flatiron file record to be false
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})
        return response
def _scp(self, local_path, remote_path, dry=True):
aws_remote_path = Path(self.s3_path).joinpath(remote_path.relative_to(FLATIRON_MOUNT))
_logger.info(f'Transferring file {local_path} to {aws_remote_path}')
self.s3.Bucket(self.bucket).upload_file(str(PurePosixPath(local_path)), str(PurePosixPath(aws_remote_path)))
return 0, ''
def _rm(self, *args, **kwargs):
raise PermissionError("This Patcher does not have admin permissions to remove data.")