Source code for ibllib.oneibl.patcher

"""A module for ad-hoc dataset modification and registration.

Unlike the DataHandler class in oneibl.data_handlers, the Patcher class allows one to fully remove
datasets (delete them from the database and repositories), and to overwrite datasets on both the
main repositories and the local servers.  Additionally the Patcher can handle datasets from
multiple sessions at once.

Examples
--------
Delete a dataset from Alyx and all associated repositories.

>>> dataset_id = 'f4aafe6c-a7ab-4390-82cd-2c0e245322a5'
>>> task_ids, files_by_repo = IBLGlobusPatcher(AlyxClient(), 'admin').delete_dataset(dataset_id)

Patch some local datasets using Globus

>>> from one.api import ONE
>>> patcher = GlobusPatcher('admin', ONE(), label='UCLA audio times patch')
>>> responses = patcher.patch_datasets(file_paths)  # register the new datasets to Alyx
>>> patcher.launch_transfers(local_servers=True)  # transfer to all remote repositories

"""
import abc
import ftplib
from pathlib import Path, PurePosixPath, WindowsPath
from collections import defaultdict
from itertools import starmap
from subprocess import Popen, PIPE, STDOUT
import subprocess
import logging
from getpass import getpass
import shutil

import globus_sdk
import iblutil.io.params as iopar
from iblutil.util import ensure_list
from one.alf.path import get_session_path, add_uuid_string, full_path_parts
from one.alf.spec import is_uuid_string, is_uuid
from one import params
from one.webclient import AlyxClient
from one.converters import path_from_dataset
from one.remote import globus
from one.remote.aws import url2uri, get_s3_from_alyx

from ibllib.oneibl.registration import register_dataset

_logger = logging.getLogger(__name__)

FLATIRON_HOST = 'ibl.flatironinstitute.org'
FLATIRON_PORT = 61022
FLATIRON_USER = 'datauser'
FLATIRON_MOUNT = '/mnt/ibl'
FTP_HOST = 'test.alyx.internationalbrainlab.org'
FTP_PORT = 21
DMZ_REPOSITORY = 'ibl_patcher'  # in alyx, the repository name containing the patched filerecords
SDSC_ROOT_PATH = PurePosixPath('/mnt/ibl')
SDSC_PATCH_PATH = PurePosixPath('/home/datauser/temp')


def _run_command(cmd, dry=True):
    _logger.info(cmd)
    if dry:
        return 0, '', ''
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    info, error = p.communicate()
    if p.returncode != 0:
        _logger.error(error)
        raise RuntimeError(error)
    return p.returncode, info, error


def sdsc_globus_path_from_dataset(dset):
    """
    Returns the SDSC Globus file path from a dataset record or a list of dataset records from REST.

    :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=True)


def sdsc_path_from_dataset(dset, root_path=SDSC_ROOT_PATH):
    """
    Returns the SDSC file path from a dataset record or a list of dataset records from REST.

    :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint
    :param root_path: (optional) the prefix path such as one download directory or the SDSC root
    """
    return path_from_dataset(dset, root_path=root_path, uuid=True)


def globus_path_from_dataset(dset, repository=None, uuid=False):
    """
    Returns the local ONE file path from a dataset record or a list of dataset records from REST.

    :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint
    :param repository: (optional) repository name of the file record (if None, takes the first
     file record with a URL)
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid)
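# Usage sketch for the path helpers above (the dataset ID is hypothetical and `alyx` is assumed
# to be an instantiated one.webclient.AlyxClient):
#
# >>> dset = alyx.rest('datasets', 'read', id='f4aafe6c-a7ab-4390-82cd-2c0e245322a5')
# >>> sdsc_path_from_dataset(dset)         # /mnt/ibl/<lab>/Subjects/... with the UUID in the filename
# >>> sdsc_globus_path_from_dataset(dset)  # the same path relative to the SDSC Globus root '/'
# >>> globus_path_from_dataset(dset)       # path of the first file record with a URL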

class Patcher(abc.ABC):
    def __init__(self, one=None):
        assert one
        self.one = one

    def _patch_dataset(self, path, dset_id=None, revision=None, dry=False, ftp=False):
        """
        This private method gets the dataset information from Alyx, computes the local and
        remote paths and initiates the file copy.
        """
        path = Path(path)
        if dset_id is None:
            dset_id = path.name.split('.')[-2]
            if not is_uuid_string(dset_id):
                dset_id = None
        assert dset_id
        assert is_uuid_string(dset_id)
        # If the revision is not None then we need to add the revision into the path. Note that
        # moving the file is handled by the ONE registration client
        if revision and f'#{revision}' not in str(path):
            path = path.parent.joinpath(f'#{revision}#', path.name)
        assert path.exists()
        dset = self.one.alyx.rest('datasets', 'read', id=dset_id)
        fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
        remote_path = Path(fr['data_repository_path']).joinpath(fr['relative_path'])
        remote_path = add_uuid_string(remote_path, dset_id).as_posix()
        if remote_path.startswith('/'):
            full_remote_path = PurePosixPath(FLATIRON_MOUNT + remote_path)
        else:
            full_remote_path = PurePosixPath(FLATIRON_MOUNT, remote_path)
        if isinstance(path, WindowsPath) and not ftp:
            # On Windows replace drive map with Globus uri, e.g. C:/ -> /~/C/
            path = globus.as_globus_path(path)
        status = self._scp(path, full_remote_path, dry=dry)[0]
        return status

    def register_dataset(self, file_list, **kwargs):
        """
        Registers a set of files belonging to a session only on the server.

        :param file_list: (list of pathlib.Path)
        :param created_by: (string) name of user in Alyx (defaults to 'root')
        :param repository: optional (string) name of the server repository in Alyx
        :param versions: optional (list of strings) version tags (defaults to ibllib version)
        :param dry: (bool) False by default
        :return:
        """
        return register_dataset(file_list, one=self.one, server_only=True, exists=True, **kwargs)

    def register_datasets(self, file_list, **kwargs):
        """
        Same as register_dataset but works with files belonging to different sessions.
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f"{i + 1}/{nses} {label}, registering {len(_files)} files")
            responses.append(self.register_dataset(_files, **kwargs))
        return responses

    def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs):
        """
        Creates a new dataset on FlatIron and uploads it from an arbitrary location.
        Rules for creation/patching are the same as those that apply for registration via Alyx,
        as this uses the registration endpoint to get the dataset.
        An existing file (same session and path relative to session) will be patched.

        :param file_list: full file path. Must be within an ALF session folder
         (subject/date/number); can also be a list of full file paths belonging to the same session
        :param server_repository: Alyx server repository name
        :param created_by: Alyx username for the dataset (optional, defaults to root)
        :param ftp: flag for the case when using the FTPPatcher. Don't adjust the Windows path
         in _patch_dataset when ftp=True
        :return: the registration response, a list of dataset records
        """
        # first register the file
        if not isinstance(file_list, list):
            file_list = [Path(file_list)]
        assert len(set(map(get_session_path, file_list))) == 1
        assert all(Path(f).exists() for f in file_list)
        response = ensure_list(self.register_dataset(file_list, dry=dry, **kwargs))
        if dry:
            return
        # from the dataset info, set the FlatIron flag to exists=True
        for p, d in zip(file_list, response):
            self._patch_dataset(p, dset_id=d['id'], revision=d['revision'], dry=dry, ftp=ftp)
        return response

    def patch_datasets(self, file_list, **kwargs):
        """Same as the patch_dataset method but works with several sessions."""
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f'{i + 1}/{nses} {label}, registering {len(_files)} files')
            responses.extend(self.patch_dataset(_files, **kwargs))
        return responses

    @abc.abstractmethod
    def _scp(self, *args, **kwargs):
        pass

    @abc.abstractmethod
    def _rm(self, *args, **kwargs):
        pass
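
# Usage sketch for the concrete Patcher subclasses defined below (SSHPatcher shown; the file path
# and username are hypothetical and the files must live within an ALF session folder):
#
# >>> from one.api import ONE
# >>> patcher = SSHPatcher(one=ONE())
# >>> files = [Path('/data/churchlandlab/Subjects/CSHL047/2020-01-20/001/alf/probes.description.json')]
# >>> responses = patcher.patch_datasets(files, created_by='admin')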


class GlobusPatcher(Patcher, globus.Globus):
    """
    Requires GLOBUS keys access
    """

    def __init__(self, client_name='default', one=None, label='ibllib patch'):
        assert one and not one.offline
        Patcher.__init__(self, one=one)
        globus.Globus.__init__(self, client_name)
        self.label = label
        # get a dictionary of data repositories from Alyx (with globus ids)
        self.fetch_endpoints_from_alyx(one.alyx)
        flatiron_id = self.endpoints['flatiron_cortexlab']['id']
        if 'flatiron' not in self.endpoints:
            self.add_endpoint(flatiron_id, 'flatiron', root_path='/')
            self.endpoints['flatiron'] = self.endpoints['flatiron_cortexlab']
        # transfers/delete from the current computer to the flatiron: mandatory and executed first
        local_id = self.endpoints['local']['id']
        self.globus_transfer = globus_sdk.TransferData(
            self.client, local_id, flatiron_id, verify_checksum=True,
            sync_level='checksum', label=label)
        self.globus_delete = globus_sdk.DeleteData(self.client, flatiron_id, label=label)
        # transfers/delete from flatiron to optional third parties to synchronize / delete
        self.globus_transfers_locals = {}
        self.globus_deletes_locals = {}
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        _logger.info(f"Globus copy {local_path} to {remote_path}")
        local_path = globus.as_globus_path(local_path)
        if not dry:
            if isinstance(self.globus_transfer, globus_sdk.TransferData):
                self.globus_transfer.add_item(local_path, remote_path.as_posix())
            else:
                self.globus_transfer.path_src.append(local_path)
                self.globus_transfer.path_dest.append(remote_path.as_posix())
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        flatiron_path = Path('/').joinpath(flatiron_path.relative_to(Path(FLATIRON_MOUNT)))
        _logger.info(f'Globus del {flatiron_path}')
        if not dry:
            if isinstance(self.globus_delete, globus_sdk.DeleteData):
                self.globus_delete.add_item(flatiron_path.as_posix())
            else:
                self.globus_delete.path.append(flatiron_path.as_posix())
        return 0, ''

    def patch_datasets(self, file_list, **kwargs):
        """
        Calls the super method that registers the datasets and queues the transfer from the
        current computer to FlatIron. Then creates individual transfer items for each local
        server so that, after the update on FlatIron, local server files are also updated.

        :param file_list:
        :param kwargs:
        :return:
        """
        responses = super().patch_datasets(file_list, **kwargs)
        for dset in responses:
            # get the flatiron path
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            relative_path = add_uuid_string(fr['relative_path'], dset['id']).as_posix()
            flatiron_path = self.to_address(relative_path, fr['data_repository'])
            # loop over the remaining repositories (local servers) and create a transfer
            # from flatiron to the local server
            for fr in dset['file_records']:
                if fr['data_repository'] == DMZ_REPOSITORY:
                    continue
                if fr['data_repository'] not in self.endpoints:
                    continue
                repo_gid = self.endpoints[fr['data_repository']]['id']
                flatiron_id = self.endpoints['flatiron']['id']
                if repo_gid == flatiron_id:
                    continue
                # if there is no transfer already created, initialize it
                if repo_gid not in self.globus_transfers_locals:
                    self.globus_transfers_locals[repo_gid] = globus_sdk.TransferData(
                        self.client, flatiron_id, repo_gid, verify_checksum=True,
                        sync_level='checksum', label=f"{self.label} on {fr['data_repository']}")
                # get the local server path and create the transfer item
                local_server_path = self.to_address(fr['relative_path'], fr['data_repository'])
                self.globus_transfers_locals[repo_gid].add_item(flatiron_path, local_server_path)
        return responses

    def launch_transfers(self, local_servers=False):
        """
        patcher.launch_transfers()

        Launches the Globus transfer and delete from the local patch computer to FlatIron.

        :param local_servers: (False) if True, sync the local servers after the main transfer
        :return: None
        """
        gtc = self.client

        def _wait_for_task(resp):
            # patcher.transfer_client.get_task(task_id='364fbdd2-4deb-11eb-8ffb-0a34088e79f9')
            # Example TransferResponse payloads, abridged to the fields used by the logic below:
            # on a good status: {'completion_time': None, 'fatal_error': None, 'is_ok': True,
            #                    'nice_status': 'OK', 'status': 'ACTIVE', ...}
            # on a checksum error: {'completion_time': None, 'fatal_error': None, 'is_ok': False,
            #                       'nice_status': 'VERIFY_CHECKSUM', 'status': 'ACTIVE', ...}
            # on a finished task: {'completion_time': '2021-01-03T20:04:50+00:00',
            #                      'fatal_error': None, 'status': 'SUCCEEDED', ...}
            # on an errored task: {'completion_time': '2021-01-03T17:39:00+00:00',
            #                      'fatal_error': {'code': 'CANCELED', 'description': 'canceled'},
            #                      'status': 'FAILED', ...}
            while True:
                tinfo = gtc.get_task(task_id=resp['task_id'])
                if tinfo and tinfo['completion_time'] is not None:
                    break
                _ = gtc.task_wait(task_id=resp['task_id'], timeout=30)
            if tinfo and tinfo['fatal_error'] is not None:
                raise ConnectionError(f"Globus transfer failed \n {tinfo}")

        # handles the transfers first
        if len(self.globus_transfer['DATA']) > 0:
            # launch the transfer
            _wait_for_task(gtc.submit_transfer(self.globus_transfer))
            # re-initialize the globus_transfer property
            self.globus_transfer = globus_sdk.TransferData(
                gtc,
                self.globus_transfer['source_endpoint'],
                self.globus_transfer['destination_endpoint'],
                label=self.globus_transfer['label'],
                verify_checksum=True, sync_level='checksum')

        # do the same for deletes
        if len(self.globus_delete['DATA']) > 0:
            _wait_for_task(gtc.submit_delete(self.globus_delete))
            self.globus_delete = globus_sdk.DeleteData(
                gtc,
                endpoint=self.globus_delete['endpoint'],
                label=self.globus_delete['label'])

        # launch the local transfers and local deletes
        if local_servers:
            self.launch_transfers_secondary()

    def launch_transfers_secondary(self):
        """
        patcher.launch_transfers_secondary()

        Launches the Globus transfers from FlatIron to third-party repositories (local servers).
        This should run after the main transfer from the patch computer to FlatIron.

        :return: None
        """
        for lt in self.globus_transfers_locals:
            transfer = self.globus_transfers_locals[lt]
            if len(transfer['DATA']) > 0:
                self.client.submit_transfer(transfer)
        for ld in self.globus_deletes_locals:
            delete = self.globus_deletes_locals[ld]
            if len(delete['DATA']) > 0:
                self.client.submit_delete(delete)
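
# End-to-end sketch for the GlobusPatcher (the file paths are hypothetical): register, push to
# FlatIron, then sync the local servers.
#
# >>> from one.api import ONE
# >>> patcher = GlobusPatcher('admin', ONE(), label='patch example')
# >>> responses = patcher.patch_datasets(file_paths)   # registers to Alyx, queues Globus transfers
# >>> patcher.launch_transfers(local_servers=False)    # local computer -> FlatIron, blocks until done
# >>> patcher.launch_transfers_secondary()             # FlatIron -> local servers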


class IBLGlobusPatcher(Patcher, globus.Globus):
    """This is a replacement for the GlobusPatcher class, utilizing the ONE Globus class.

    The GlobusPatcher class is more complicated but has the advantage of being able to launch
    transfers independently of registration, although it remains to be seen whether this is
    useful.
    """

    def __init__(self, alyx=None, client_name='default'):
        """
        Parameters
        ----------
        alyx : one.webclient.AlyxClient
            An instance of Alyx to use.
        client_name : str, default='default'
            The Globus client name.
        """
        self.alyx = alyx or AlyxClient()
        # NB we don't init Patcher as we're not using ONE
        globus.Globus.__init__(self, client_name=client_name)

    def delete_dataset(self, dataset, dry=False):
        """
        Delete a dataset off Alyx and remove file record from all Globus repositories.

        Parameters
        ----------
        dataset : uuid.UUID, str, dict
            The dataset record or ID to delete.
        dry : bool
            If true, dataset is not deleted and file paths that would be removed are returned.

        Returns
        -------
        list of uuid.UUID
            A list of Globus delete task IDs if dry is false.
        dict of str
            A map of data repository names and relative paths of the deleted files.
        """
        if is_uuid(dataset):
            did = dataset
            dataset = self.alyx.rest('datasets', 'read', id=did)
        else:
            did = dataset['url'].split('/')[-1]

        def is_aws(repository_name):
            return repository_name.startswith('aws_')

        files_by_repo = defaultdict(list)  # str -> [pathlib.PurePosixPath]
        s3_files = []
        file_records = filter(lambda x: x['exists'], dataset['file_records'])
        for record in file_records:
            repo = self.repo_from_alyx(record['data_repository'], self.alyx)
            # Handle S3 files
            if not repo['globus_endpoint_id'] or repo['repository_type'] != 'Fileserver':
                if is_aws(repo['name']):
                    s3_files.append(url2uri(record['data_url']))
                    files_by_repo[repo['name']].append(PurePosixPath(record['relative_path']))
                else:
                    _logger.error('Unable to delete from %s', repo['name'])
            else:
                # Handle Globus files
                if repo['name'] not in self.endpoints:
                    self.add_endpoint(repo['name'], alyx=self.alyx)
                filepath = PurePosixPath(record['relative_path'])
                if 'flatiron' in repo['name']:
                    filepath = add_uuid_string(filepath, did)
                files_by_repo[repo['name']].append(filepath)

        # Remove S3 files
        if s3_files:
            cmd = ['aws', 's3', 'rm', *s3_files, '--profile', 'ibladmin']
            if dry:
                cmd.append('--dryrun')
            if _logger.level > logging.DEBUG:
                log_function = _logger.error
                cmd.append('--only-show-errors')  # Suppress verbose output
            else:
                log_function = _logger.debug
                cmd.append('--no-progress')  # Suppress progress info, estimated time, etc.
            _logger.debug(' '.join(cmd))
            process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
            with process.stdout:
                for line in iter(process.stdout.readline, b''):
                    log_function(line.decode().strip())
            assert process.wait() == 0

        if dry:
            return [], files_by_repo

        # Remove Globus files
        globus_files_map = filter(lambda x: not is_aws(x[0]), files_by_repo.items())
        task_ids = list(starmap(self.delete_data, map(reversed, globus_files_map)))

        # Delete the dataset from Alyx
        self.alyx.rest('datasets', 'delete', id=did)
        return task_ids, files_by_repo
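
# Dry-run sketch for delete_dataset: report which files would be removed without deleting
# anything (the dataset ID is hypothetical).
#
# >>> patcher = IBLGlobusPatcher(AlyxClient(), 'admin')
# >>> task_ids, files_by_repo = patcher.delete_dataset(
# ...     'f4aafe6c-a7ab-4390-82cd-2c0e245322a5', dry=True)
# >>> files_by_repo  # {repository name: [relative paths that would be deleted]}
# With dry=True the task ID list is empty and nothing is removed from Alyx or the repositories.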


class SSHPatcher(Patcher):
    """
    Requires SSH key access on FlatIron
    """

    def __init__(self, one=None):
        # dry=False so the connection check actually executes
        res = _run_command(f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} ls", dry=False)
        if res[0] != 0:
            raise PermissionError("Could not connect to FlatIron via SSH. Check your RSA keys")
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST}" \
              f" mkdir -p {remote_path.parent}; "
        cmd += f"scp -P {FLATIRON_PORT} {local_path} {FLATIRON_USER}@{FLATIRON_HOST}:{remote_path}"
        return _run_command(cmd, dry=dry)

    def _rm(self, flatiron_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} rm {flatiron_path}"
        return _run_command(cmd, dry=dry)


class FTPPatcher(Patcher):
    """
    This is used to register from anywhere without write access to FlatIron
    """

    def __init__(self, one=None):
        super().__init__(one=one)
        alyx = self.one.alyx
        if not getattr(alyx._par, 'FTP_DATA_SERVER_LOGIN', False):
            alyx._par = self.setup(par=alyx._par, silent=alyx.silent)
        login, pwd = (one.alyx._par.FTP_DATA_SERVER_LOGIN, one.alyx._par.FTP_DATA_SERVER_PWD)
        self.ftp = ftplib.FTP_TLS(host=FTP_HOST, user=login, passwd=pwd)
        # self.ftp.ssl_version = ssl.PROTOCOL_TLSv1
        # self.ftp.auth()
        self.ftp.prot_p()
        self.ftp.login(login, pwd)
        # pre-fetch the repositories so as not to query them for every file registered
        self.repositories = self.one.alyx.rest("data-repository", "list")

    @staticmethod
    def setup(par=None, silent=False):
        """
        Set up (and save) FTP login parameters.

        :param par: A parameters object to modify; if None the default Webclient parameters
         are loaded
        :param silent: If true, the defaults are used with no user input prompt
        :return: the modified parameters object
        """
        DEFAULTS = {
            "FTP_DATA_SERVER": "ftp://ibl.flatironinstitute.org",
            "FTP_DATA_SERVER_LOGIN": "iblftp",
            "FTP_DATA_SERVER_PWD": None
        }
        if par is None:
            par = params.get(silent=silent)
        par = iopar.as_dict(par)

        if silent:
            DEFAULTS.update(par)
            par = DEFAULTS
        else:
            for k in DEFAULTS.keys():
                cpar = par.get(k, DEFAULTS[k])
                # Iterate through non-password pars; skip url if client url already provided
                if 'PWD' not in k:
                    par[k] = input(f'Param {k}, current value is ["{cpar}"]:') or cpar
                else:
                    prompt = f'Param {k} (leave empty to leave unchanged):'
                    par[k] = getpass(prompt) or cpar

        # Get the client key
        client = par.get('ALYX_URL', None)
        client_key = params._key_from_url(client) if client else params.get_default_client()
        # Save the parameters
        params.save(par, client_key)  # Client params
        return iopar.from_dict(par)

    def create_dataset(self, path, created_by='root', dry=False, repository=DMZ_REPOSITORY,
                       **kwargs):
        # overrides the superclass just to remove the server repository argument
        response = super().patch_dataset(path, created_by=created_by, dry=dry,
                                         repository=repository, ftp=True, **kwargs)
        # need to patch the file records to be consistent
        for ds in response:
            frs = ds['file_records']
            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
            fr_ftp = next(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                            fr['relative_path'] == fr_server['relative_path'], frs))
            reposerver = next(filter(lambda rep: rep['name'] == fr_server['data_repository'],
                                     self.repositories))
            relative_path = str(PurePosixPath(reposerver['globus_path']).joinpath(
                PurePosixPath(fr_ftp['relative_path'])))[1:]
            # 1) if there was already a file, the registration created a duplicate
            fr_2del = list(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                             fr['relative_path'] == relative_path, frs))  # NOQA
            if len(fr_2del) == 1:
                self.one.alyx.rest('files', 'delete', id=fr_2del[0]['id'])
            # 2) the patch ftp file needs to be prepended with the server repository path
            self.one.alyx.rest('files', 'partial_update', id=fr_ftp['id'],
                               data={'relative_path': relative_path, 'exists': True})
            # 3) the server file is labeled as not existing
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})
        return response

    def _scp(self, local_path, remote_path, dry=True):
        # remote_path = '/mnt/ibl/zadorlab/Subjects/flowers/2018-07-13/001'
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        # local_path
        self.mktree(remote_path.parent)
        self.ftp.pwd()
        _logger.info(f"FTP upload {local_path}")
        with open(local_path, 'rb') as fid:
            self.ftp.storbinary(f'STOR {local_path.name}', fid)
        return 0, ''

    def mktree(self, remote_path):
        """Browse to the tree on the FTP server, making directories on the way."""
        if str(remote_path) != '.':
            try:
                self.ftp.cwd(str(remote_path))
            except ftplib.error_perm:
                self.mktree(PurePosixPath(remote_path.parent))
                self.ftp.mkd(str(remote_path))
                self.ftp.cwd(str(remote_path))

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server.")
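
# Usage sketch for the FTPPatcher: on first use, setup() prompts for the FTP credentials and
# saves them to the ONE parameters; create_dataset then uploads to the DMZ repository and fixes
# up the file records. The file path below is hypothetical.
#
# >>> from one.api import ONE
# >>> ftp_patcher = FTPPatcher(one=ONE())
# >>> ftp_patcher.create_dataset(
# ...     path='/data/zadorlab/Subjects/flowers/2018-07-13/001/alf/_ibl_trials.goCue_times.npy',
# ...     created_by='admin')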


class SDSCPatcher(Patcher):
    """
    This is used to patch data on the SDSC server
    """

    def __init__(self, one=None):
        assert one
        super().__init__(one=one)

    def patch_datasets(self, file_list, **kwargs):
        response = super().patch_datasets(file_list, **kwargs)
        # TODO check the file records to see if they have local server ones
        #  If they do then need to remove file record and delete file from local server??
        return response

    def _scp(self, local_path, remote_path, dry=True):
        _logger.info(f"Copy {local_path} to {remote_path}")
        if not dry:
            if not Path(remote_path).parent.exists():
                Path(remote_path).parent.mkdir(exist_ok=True, parents=True)
            shutil.copy(local_path, remote_path)
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server")
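
# Usage sketch for the SDSCPatcher: intended to run on the SDSC/FlatIron machine itself, since
# _scp is a plain local copy onto the /mnt/ibl mount. The file path below is hypothetical.
#
# >>> from one.api import ONE
# >>> patcher = SDSCPatcher(one=ONE())
# >>> patcher.patch_datasets(
# ...     [Path('/home/datauser/temp/zadorlab/Subjects/flowers/2018-07-13/001/alf/spikes.times.npy')])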


class S3Patcher(Patcher):

    def __init__(self, one=None):
        assert one
        super().__init__(one=one)
        self.s3_repo = 's3_patcher'
        self.s3_path = 'patcher'
        # Instantiate boto connection
        self.s3, self.bucket = get_s3_from_alyx(self.one.alyx, repo_name=self.s3_repo)

    def check_datasets(self, file_list):
        # Here we want to check if the datasets exist, if they do we don't want to patch unless we force.
        exists = []
        for file in file_list:
            collection = full_path_parts(file, as_dict=True)['collection']
            dset = self.one.alyx.rest(
                'datasets', 'list', session=self.one.path2eid(file), name=file.name,
                collection=collection, clobber=True)
            if len(dset) > 0:
                exists.append(file)
        return exists

    def patch_dataset(self, file_list, dry=False, ftp=False, force=False, **kwargs):
        exists = self.check_datasets(file_list)
        if len(exists) > 0 and not force:
            _logger.error(f'Files: {", ".join([f.name for f in file_list])} already exist; '
                          f'to overwrite them set force=True')
            return

        response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False,
                                         **kwargs)
        # TODO in an ideal case the flatiron file record won't be altered when we register this
        #  dataset. This requires changing the alyx.data.register_view
        for ds in response:
            frs = ds['file_records']
            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
            # Update the flatiron file record to exists=False
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})

    def _scp(self, local_path, remote_path, dry=True):
        aws_remote_path = Path(self.s3_path).joinpath(remote_path.relative_to(FLATIRON_MOUNT))
        _logger.info(f'Transferring file {local_path} to {aws_remote_path}')
        self.s3.Bucket(self.bucket).upload_file(str(PurePosixPath(local_path)),
                                                str(PurePosixPath(aws_remote_path)))
        return 0, ''

    def _rm(self, *args, **kwargs):
        raise PermissionError("This Patcher does not have admin permissions to remove data.")
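
# Usage sketch for the S3Patcher: registers against the 's3_patcher' repository, uploads to the
# S3 bucket configured in Alyx, and marks the FlatIron file record as not existing. The file path
# below is hypothetical.
#
# >>> from one.api import ONE
# >>> patcher = S3Patcher(one=ONE())
# >>> patcher.patch_dataset(
# ...     [Path('/data/zadorlab/Subjects/flowers/2018-07-13/001/alf/spikes.times.npy')],
# ...     created_by='admin')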