Source code for oneibl.patcher

import abc
import ftplib
from pathlib import Path, PurePosixPath, WindowsPath
import subprocess
import logging

import globus_sdk

from ibllib.io import globus
import alf.io
from oneibl.registration import register_dataset

_logger = logging.getLogger('ibllib')

FLAT_IRON_GLOBUS_ID = 'ab2d064c-413d-11eb-b188-0ee0d5d9299f'
FLATIRON_HOST = 'ibl.flatironinstitute.org'
FLATIRON_PORT = 61022
FLATIRON_USER = 'datauser'
FLATIRON_MOUNT = '/mnt/ibl'
FTP_HOST = 'test.alyx.internationalbrainlab.org'
FTP_PORT = 21
DMZ_REPOSITORY = 'ibl_patcher'  # in alyx, the repository name containing the patched filerecords


def _run_command(cmd, dry=True):
    _logger.info(cmd)
    if dry:
        return 0, '', ''
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    info, error = p.communicate()
    if p.returncode != 0:
        _logger.error(error)
        raise RuntimeError(error)
    return p.returncode, info, error


class Patcher(abc.ABC):
    def __init__(self, one=None):
        assert one
        self.one = one

    def _patch_dataset(self, path, dset_id=None, dry=False, ftp=False):
        """
        This private method gets the dataset information from alyx, computes the local
        and remote paths and initiates the file copy
        """
        path = Path(path)
        if dset_id is None:
            dset_id = path.name.split('.')[-2]
            if not alf.io.is_uuid_string(dset_id):
                dset_id = None
        assert dset_id
        assert alf.io.is_uuid_string(dset_id)
        assert path.exists()
        dset = self.one.alyx.rest('datasets', "read", id=dset_id)
        fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
        remote_path = Path(fr['data_repository_path']).joinpath(fr['relative_path'])
        remote_path = alf.io.add_uuid_string(remote_path, dset_id).as_posix()
        if remote_path.startswith('/'):
            full_remote_path = PurePosixPath(FLATIRON_MOUNT + remote_path)
        else:
            full_remote_path = PurePosixPath(FLATIRON_MOUNT, remote_path)
        if isinstance(path, WindowsPath) and not ftp:
            # On Windows replace drive map with Globus uri, e.g. C:/ -> /~/C/
            path = '/~/' + path.as_posix().replace(':', '')
        status = self._scp(path, full_remote_path, dry=dry)[0]
        return status
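
    # Note (added, illustrative): when dset_id is not passed, _patch_dataset expects the dataset
    # UUID to be embedded in the file name as the second-to-last dot-separated token, e.g. a
    # made-up 'spikes.times.01234567-89ab-cdef-0123-456789abcdef.npy', which is what
    # path.name.split('.')[-2] extracts above.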

    def register_dataset(self, file_list, **kwargs):
        """
        Registers a set of files belonging to a session only on the server
        :param file_list: (list of pathlib.Path)
        :param created_by: (string) name of user in Alyx (defaults to 'root')
        :param repository: optional: (string) name of the server repository in Alyx
        :param versions: optional (list of strings): versions tags (defaults to ibllib version)
        :param dry: (bool) False by default
        :return:
        """
        return register_dataset(file_list, one=self.one, server_only=True, **kwargs)

    def register_datasets(self, file_list, **kwargs):
        """
        Same as register_dataset but works with files belonging to different sessions
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = alf.io.get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f"{i}/{nses} {label}, registering {len(_files)} files")
            responses.append(self.register_dataset(_files, **kwargs))
        return responses

    def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs):
        """
        Creates a new dataset on FlatIron and uploads it from an arbitrary location.
        Rules for creation/patching are the same that apply for registration via Alyx
        as this uses the registration endpoint to get the dataset.
        An existing file (same session and path relative to session) will be patched.
        :param path: full file path. Must be within an ALF session folder (subject/date/number);
         can also be a list of full file paths belonging to the same session.
        :param server_repository: Alyx server repository name
        :param created_by: alyx username for the dataset (optional, defaults to root)
        :param ftp: flag for case when using ftppatcher. Don't adjust windows path in
         _patch_dataset when ftp=True
        :return: the registrations response, a list of dataset records
        """
        # first register the file
        if not isinstance(file_list, list):
            file_list = [Path(file_list)]
        assert len(set([alf.io.get_session_path(f) for f in file_list])) == 1
        assert all([Path(f).exists() for f in file_list])
        response = self.register_dataset(file_list, dry=dry, **kwargs)
        if dry:
            return
        # from the dataset info, set flatIron flag to exists=True
        for p, d in zip(file_list, response):
            self._patch_dataset(p, dset_id=d['id'], dry=dry, ftp=ftp)
        return response
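
    # Usage sketch (added, not part of the module): with any concrete patcher (e.g. the
    # SSHPatcher or FTPPatcher defined below) and a connected ONE instance, a file within an
    # ALF session folder could be patched like this (the path is a made-up example):
    #   patcher.patch_dataset('/data/Subjects/ZM_1150/2021-01-01/001/alf/spikes.times.npy',
    #                         created_by='root', dry=True)  # dry run to preview the registration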

    def patch_datasets(self, file_list, **kwargs):
        """
        Same as the patch_dataset method but works with files belonging to several sessions
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = alf.io.get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f"{i}/{nses} {label}, registering {len(_files)} files")
            responses.extend(self.patch_dataset(_files, **kwargs))
        return responses

    @abc.abstractmethod
    def _scp(self, *args, **kwargs):
        pass

    @abc.abstractmethod
    def _rm(self, *args, **kwargs):
        pass


class GlobusPatcher(Patcher):
    """
    Requires GLOBUS keys access
    """
    def __init__(self, one=None, globus_client_id=None, local_endpoint=None, label='ibllib patch'):
        assert globus_client_id
        assert one
        self.local_endpoint = local_endpoint or globus.get_local_endpoint()
        self.label = label
        self.transfer_client = globus.login_auto(
            globus_client_id=globus_client_id, str_app='globus/admin')
        # transfers/delete from the current computer to the flatiron: mandatory and executed first
        self.globus_transfer = globus_sdk.TransferData(
            self.transfer_client, self.local_endpoint, FLAT_IRON_GLOBUS_ID,
            verify_checksum=True, sync_level='checksum', label=label)
        self.globus_delete = globus_sdk.DeleteData(
            self.transfer_client, FLAT_IRON_GLOBUS_ID,
            verify_checksum=True, sync_level='checksum', label=label)
        # get a dictionary of data repositories from Alyx (with globus ids)
        self.repos = {r['name']: r for r in one.alyx.rest('data-repository', 'list')}
        # transfers/delete from flatiron to optional third parties to synchronize / delete
        self.globus_transfers_locals = {}
        self.globus_deletes_locals = {}
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        _logger.info(f"Globus copy {local_path} to {remote_path}")
        if not dry:
            if isinstance(self.globus_transfer, globus_sdk.transfer.data.TransferData):
                self.globus_transfer.add_item(local_path, remote_path)
            else:
                self.globus_transfer.path_src.append(local_path)
                self.globus_transfer.path_dest.append(remote_path)
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        flatiron_path = Path('/').joinpath(flatiron_path.relative_to(Path(FLATIRON_MOUNT)))
        _logger.info(f"Globus del {flatiron_path}")
        if not dry:
            if isinstance(self.globus_delete, globus_sdk.transfer.data.DeleteData):
                self.globus_delete.add_item(flatiron_path)
            else:
                self.globus_delete.path.append(flatiron_path)
        return 0, ''

    def patch_datasets(self, file_list, **kwargs):
        """
        Calls the super method that registers the datasets and queues the transfer from the
        current computer to FlatIron. Then creates individual transfer items for each local
        server so that, after the update on FlatIron, local server files are also updated
        :param file_list:
        :param kwargs:
        :return:
        """
        responses = super().patch_datasets(file_list, **kwargs)
        for dset in responses:
            # get the flatiron path
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            flatiron_path = self.repos[fr['data_repository']]['globus_path']
            flatiron_path = Path(flatiron_path).joinpath(fr['relative_path'])
            flatiron_path = alf.io.add_uuid_string(flatiron_path, dset['id']).as_posix()
            # loop over the remaining repositories (local servers) and create a transfer
            # from flatiron to the local server
            for fr in dset['file_records']:
                if fr['data_repository'] == DMZ_REPOSITORY:
                    continue
                repo_gid = self.repos[fr['data_repository']]['globus_endpoint_id']
                if repo_gid == FLAT_IRON_GLOBUS_ID:
                    continue
                # if there is no transfer already created, initialize it
                if repo_gid not in self.globus_transfers_locals:
                    self.globus_transfers_locals[repo_gid] = globus_sdk.TransferData(
                        self.transfer_client, FLAT_IRON_GLOBUS_ID, repo_gid, verify_checksum=True,
                        sync_level='checksum', label=f"{self.label} on {fr['data_repository']}")
                # get the local server path and create the transfer item
                local_server_path = self.repos[fr['data_repository']]['globus_path']
                local_server_path = Path(local_server_path).joinpath(fr['relative_path'])
                self.globus_transfers_locals[repo_gid].add_item(flatiron_path, local_server_path)
        return responses

    def launch_transfers(self, local_servers=False):
        """
        patcher.launch_transfers()
        Launches the globus transfer and delete from the local patch computer to the flatiron
        :param local_servers: (False) if True, sync the local servers after the main transfer
        :return: None
        """
        gtc = self.transfer_client

        def _wait_for_task(resp):
            # patcher.transfer_client.get_task(task_id='364fbdd2-4deb-11eb-8ffb-0a34088e79f9')
            # on a good status:
            # Out[22]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 3011090432, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': None, 'deadline': '2021-01-06T18:10:05+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 873268, 'encrypt_data': False, 'fatal_error': None, 'faults': 6, 'files': 186, 'files_skipped': 12, 'files_transferred': 76, 'history_deleted': False, 'is_ok': True, 'is_paused': False, 'key': 'active,2021-01-03T17:52:34.427087', 'label': '3B analog sync patch', 'nice_status': 'OK', 'nice_status_details': None, 'nice_status_expires_in': -1, 'nice_status_short_description': 'OK', 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'ACTIVE', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 98, 'subtasks_retrying': 0, 'subtasks_succeeded': 274, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True})  # noqa
            # on a checksum error
            # Out[26]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 3715901232, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': None, 'deadline': '2021-01-06T18:10:05+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 912410, 'encrypt_data': False, 'fatal_error': None, 'faults': 7, 'files': 186, 'files_skipped': 12, 'files_transferred': 102, 'history_deleted': False, 'is_ok': False, 'is_paused': False, 'key': 'active,2021-01-03T17:52:34.427087', 'label': '3B analog sync patch', 'nice_status': 'VERIFY_CHECKSUM', 'nice_status_details': None, 'nice_status_expires_in': -1, 'nice_status_short_description': 'checksum verification failed', 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'ACTIVE', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 72, 'subtasks_retrying': 0, 'subtasks_succeeded': 300, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True})  # noqa
            # on a finished task
            # Out[4]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 4998806664, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': '2021-01-03T20:04:50+00:00', 'deadline': '2021-01-06T19:11:00+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 629960, 'encrypt_data': False, 'fatal_error': None, 'faults': 15, 'files': 186, 'files_skipped': 12, 'files_transferred': 174, 'history_deleted': False, 'is_ok': None, 'is_paused': False, 'key': 'complete,2021-01-03T20:04:49.540956', 'label': '3B analog sync patch', 'nice_status': None, 'nice_status_details': None, 'nice_status_expires_in': None, 'nice_status_short_description': None, 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'SUCCEEDED', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 0, 'subtasks_retrying': 0, 'subtasks_succeeded': 372, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True})  # noqa
            # on an errored task
            # Out[10]: TransferResponse({'bytes_checksummed': 0, 'bytes_transferred': 0, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': '2021-01-03T17:39:00+00:00', 'deadline': '2021-01-04T17:37:34+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 0, 'encrypt_data': False, 'fatal_error': {'code': 'CANCELED', 'description': 'canceled'}, 'faults': 2, 'files': 6, 'files_skipped': 0, 'files_transferred': 0, 'history_deleted': False, 'is_ok': None, 'is_paused': False, 'key': 'complete,2021-01-03T17:38:59.697413', 'label': 'test 3B analog sync patch', 'nice_status': None, 'nice_status_details': None, 'nice_status_expires_in': None, 'nice_status_short_description': None, 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:37:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'FAILED', 'subtasks_canceled': 6, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 0, 'subtasks_retrying': 0, 'subtasks_succeeded': 6, 'subtasks_total': 12, 'symlinks': 0, 'sync_level': 3, 'task_id': '5706dd2c-4dea-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True})  # noqa
            while True:
                tinfo = gtc.get_task(task_id=resp['task_id'])
                if tinfo['completion_time'] is not None:
                    break
                _ = gtc.task_wait(task_id=resp['task_id'], timeout=30)
            if tinfo['fatal_error'] is not None:
                raise ConnectionError(f"Globus transfer failed \n {tinfo}")

        # handles the transfers first
        if len(self.globus_transfer['DATA']) > 0:
            # launch the transfer
            _wait_for_task(gtc.submit_transfer(self.globus_transfer))
            # re-initialize the globus_transfer property
            self.globus_transfer = globus_sdk.TransferData(
                gtc,
                self.globus_transfer['source_endpoint'],
                self.globus_transfer['destination_endpoint'],
                label=self.globus_transfer['label'],
                verify_checksum=True, sync_level='checksum')

        # do the same for deletes
        if len(self.globus_delete['DATA']) > 0:
            _wait_for_task(gtc.submit_delete(self.globus_delete))
            self.globus_delete = globus_sdk.DeleteData(
                gtc,
                endpoint=self.globus_delete['endpoint'],
                label=self.globus_delete['label'],
                verify_checksum=True, sync_level='checksum')

        # launch the local transfers and local deletes
        if local_servers:
            self.launch_transfers_secondary()

    def launch_transfers_secondary(self):
        """
        patcher.launch_transfers_secondary()
        Launches the globus transfers from flatiron to third-party repositories (local servers)
        This should run after the main transfer from patch computer to the flatiron
        :return: None
        """
        for lt in self.globus_transfers_locals:
            transfer = self.globus_transfers_locals[lt]
            if len(transfer['DATA']) > 0:
                self.transfer_client.submit_transfer(transfer)
        for ld in self.globus_deletes_locals:
            delete = self.globus_deletes_locals[ld]
            if len(delete['DATA']) > 0:
                self.transfer_client.submit_delete(delete)
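

# Usage sketch (added, illustrative only; the client id and file path are made-up examples):
#   from oneibl.one import ONE
#   patcher = GlobusPatcher(one=ONE(), globus_client_id='<globus-client-uuid>',
#                           label='example patch')
#   patcher.patch_datasets(['/data/Subjects/ZM_1150/2021-01-01/001/alf/spikes.times.npy'])
#   patcher.launch_transfers(local_servers=True)  # submits the queued transfers and deletes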


class SSHPatcher(Patcher):
    """
    Requires SSH keys access on the FlatIron
    """
    def __init__(self, one=None, globus_client=None):
        res = _run_command(f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} ls")
        if res[0] != 0:
            raise PermissionError("Could not connect to the Flatiron via SSH. Check your RSA keys")
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST}" \
              f" mkdir -p {remote_path.parent}; "
        cmd += f"scp -P {FLATIRON_PORT} {local_path} {FLATIRON_USER}@{FLATIRON_HOST}:{remote_path}"
        return _run_command(cmd, dry=dry)

    def _rm(self, flatiron_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} rm {flatiron_path}"
        return _run_command(cmd, dry=dry)
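

# Usage sketch (added, illustrative only; the file path is a made-up example):
#   from oneibl.one import ONE
#   patcher = SSHPatcher(one=ONE())  # requires working SSH/RSA keys for the FlatIron datauser
#   patcher.patch_datasets(['/data/Subjects/ZM_1150/2021-01-01/001/alf/spikes.times.npy'])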


class FTPPatcher(Patcher):
    """
    This is used to register from anywhere without write access to FlatIron
    """
    def __init__(self, one=None, globus_client=None):
        super().__init__(one=one)
        self.ftp = ftplib.FTP_TLS(host=FTP_HOST,
                                  user=one._par.FTP_DATA_SERVER_LOGIN,
                                  passwd=one._par.FTP_DATA_SERVER_PWD)
        # self.ftp.ssl_version = ssl.PROTOCOL_TLSv1
        # self.ftp.auth()
        self.ftp.prot_p()
        self.ftp.login(one._par.FTP_DATA_SERVER_LOGIN, one._par.FTP_DATA_SERVER_PWD)
        # pre-fetch the repositories so as not to query them for every file registered
        self.repositories = self.one.alyx.rest("data-repository", "list")

    def create_dataset(self, path, created_by='root', dry=False, repository=DMZ_REPOSITORY):
        # overrides the superclass just to remove the server repository argument
        response = super().patch_dataset(path, created_by=created_by, dry=dry,
                                         repository=repository, ftp=True)
        # need to patch the file records to be consistent
        for ds in response:
            frs = ds['file_records']
            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
            fr_ftp = next(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                            fr['relative_path'] == fr_server['relative_path'],
                                 frs))
            reposerver = next(filter(lambda rep: rep['name'] == fr_server['data_repository'],
                                     self.repositories))
            relative_path = str(PurePosixPath(reposerver['globus_path']).joinpath(
                PurePosixPath(fr_ftp['relative_path'])))[1:]
            # 1) if there was already a file, the registration created a duplicate
            fr_2del = list(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                             fr['relative_path'] == relative_path, frs))  # NOQA
            if len(fr_2del) == 1:
                self.one.alyx.rest('files', 'delete', id=fr_2del[0]['id'])
            # 2) the patch ftp file needs to be prepended with the server repository path
            self.one.alyx.rest('files', 'partial_update', id=fr_ftp['id'],
                               data={'relative_path': relative_path, 'exists': True})
            # 3) the server file is labeled as not existing
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})
        return response
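
    # Usage sketch (added, illustrative only; FTP credentials are read from the ONE parameters
    # and the file path is a made-up example):
    #   from oneibl.one import ONE
    #   patcher = FTPPatcher(one=ONE())
    #   patcher.create_dataset('/data/Subjects/ZM_1150/2021-01-01/001/alf/spikes.times.npy')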

    def _scp(self, local_path, remote_path, dry=True):
        # e.g. remote_path = '/mnt/ibl/zadorlab/Subjects/flowers/2018-07-13/001'
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        # local_path
        self.mktree(remote_path.parent)
        self.ftp.pwd()
        _logger.info(f"FTP upload {local_path}")
        with open(local_path, 'rb') as fid:
            self.ftp.storbinary(f'STOR {local_path.name}', fid)
        return 0, ''

    def mktree(self, remote_path):
        """ Browse to the tree on the ftp server, making directories on the way"""
        if str(remote_path) != '.':
            try:
                self.ftp.cwd(str(remote_path))
            except ftplib.error_perm:
                self.mktree(PurePosixPath(remote_path.parent))
                self.ftp.mkd(str(remote_path))
                self.ftp.cwd(str(remote_path))

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server.")