Source code for ibllib.oneibl.data_handlers

"""Downloading of task dependent datasets and registration of task output datasets.

The DataHandler class is used by the pipes.tasks.Task class to ensure dependent datasets are
present and to register and upload the output datasets.  For examples on how to run a task using
specific data handlers, see :func:`ibllib.pipes.tasks`.
"""
import logging
import pandas as pd
from pathlib import Path, PurePosixPath
import shutil
import os
import abc
from time import time
from copy import copy

from one.api import ONE
from one.webclient import AlyxClient
from one.util import filter_datasets
from one.alf.path import add_uuid_string, session_path_parts, get_alf_path
from one.alf.cache import _make_datasets_df
from iblutil.util import flatten, ensure_list

from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository
from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher


_logger = logging.getLogger(__name__)


[docs] class ExpectedDataset: """An expected input or output dataset.""" inverted = False def __init__(self, name, collection, register=None, revision=None, unique=True): """ An expected input or output dataset. NB: This should not be instantiated directly, but rather via the `input` or `output` static method. Parameters ---------- name : str, None A dataset name or glob pattern. collection : str, None An ALF collection or pattern. register : bool Whether to register the file. Default is False for input files, True for output files. revision : str An optional revision. unique : bool Whether identifier pattern is expected to match a single dataset or several. NB: This currently does not affect the output of `find_files`. """ if not (collection is None or isinstance(collection, str)): collection = '/'.join(collection) self._identifiers = (collection, revision, name) self.operator = None self._register = register or False self.inverted = False self.name = None self.unique = unique @property def register(self): """bool: whether to register the output file.""" return self._register @register.setter def register(self, value): """bool: whether to register the output file.""" if self.operator is not None: raise AttributeError('cannot set register attribute for operator datasets') self._register = value @property def identifiers(self): """tuple: the identifying parts of the dataset. If no operator is applied, the identifiers are (collection, revision, name). If an operator is applied, a tuple of 3-element tuples is returned. """ if self.operator is None: return self._identifiers # Flatten nested identifiers into tuple of 3-element tuples identifiers = [] for x in self._identifiers: add = identifiers.extend if x.operator else identifiers.append add(x.identifiers) return tuple(identifiers) @property def glob_pattern(self): """str, tuple of str: one or more glob patterns.""" if self.operator is None: return str(PurePosixPath(*filter(None, self._identifiers))) else: return tuple(flatten(x.glob_pattern for x in self._identifiers)) def __repr__(self): """Represent the dataset object as a string. If the `name` property is not None, it is returned, otherwise the identifies are used to format the name. """ name = self.__class__.__name__ if self.name: return f'<{name}({self.name})>' if self.operator: sym = {'or': '|', 'and': '&', 'xor': '^'} patterns = [d.__repr__() for d in self._identifiers] pattern = f'{sym[self.operator]:^3}'.join(patterns) if self.inverted: pattern = f'~({pattern})' else: pattern = ('~' if self.inverted else '') + self.glob_pattern return f'<{name}({pattern})>'
[docs] def find_files(self, session_path, register=False): """Find files on disk. Uses glob patterns to find dataset(s) on disk. Parameters ---------- session_path : pathlib.Path, str A session path within which to glob for the dataset(s). register : bool Only return files intended to be registered. Returns ------- bool True if the dataset is found on disk or is optional. list of pathlib.Path A list of matching dataset files. missing, None, str, set of str One or more glob patterns that either didn't yield files (or did in the case of inverted datasets). Notes ----- - Currently if `unique` is true and multiple files are found, all files are returned without an exception raised although this may change in the future. - If `register` is false, all files are returned regardless of whether they are intended to be registered. - If `inverted` is true, and files are found, the glob pattern is returned as missing. - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing patterns. - Missing (or unexpectedly found) patterns are returned despite the dataset being optional. """ session_path = Path(session_path) ok, actual_files, missing = False, [], None if self.operator is None: if register and not self.register: return True, actual_files, missing actual_files = sorted(session_path.rglob(self.glob_pattern)) # If no revision pattern provided and no files found, search for any revision if self._identifiers[1] is None and not any(actual_files): glob_pattern = str(PurePosixPath(self._identifiers[0], '#*#', self._identifiers[2])) actual_files = sorted(session_path.rglob(glob_pattern)) ok = any(actual_files) != self.inverted if not ok: missing = self.glob_pattern elif self.operator == 'and': assert len(self._identifiers) == 2 _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path), self._identifiers)) ok = all(_ok) actual_files = flatten(_actual_files) missing = set(filter(None, flatten(_missing))) elif self.operator == 'or': assert len(self._identifiers) == 2 missing = set() for d in self._identifiers: ok, actual_files, _missing = d.find_files(session_path) if ok: break if missing is not None: missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing) elif self.operator == 'xor': assert len(self._identifiers) == 2 _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path), self._identifiers)) ok = sum(_ok) == 1 # and sum(map(bool, map(len, _actual_files))) == 1 # Return only those datasets that are complete if OK actual_files = _actual_files[_ok.index(True)] if ok else flatten(_actual_files) if ok: missing = set() elif all(_ok): # return all patterns if all present when only one should be, otherwise return all missing missing = set(flatten(self.glob_pattern)) elif not any(_ok): # return all missing glob patterns if none present missing = set(filter(None, flatten(_missing))) elif not isinstance(self.operator, str): raise TypeError(f'Unrecognized operator type "{type(self.operator)}"') else: raise NotImplementedError(f'logical {self.operator.upper()} not implemented') return ok, actual_files, missing
[docs] def filter(self, session_datasets, **kwargs): """Filter dataset frame by expected datasets. Parameters ---------- session_datasets : pandas.DataFrame An data frame of session datasets. kwargs Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, and ignore_qc_not_set. Returns ------- bool True if the required dataset(s) are present in the data frame. pandas.DataFrame A filtered data frame of containing the expected dataset(s). """ # ok, datasets = False, session_datasets.iloc[0:0] if self.operator is None: collection, revision, file = self._identifiers if self._identifiers[1] is not None: raise NotImplementedError('revisions not yet supported') datasets = filter_datasets(session_datasets, file, collection, wildcards=True, assert_unique=self.unique, **kwargs) ok = datasets.empty == self.inverted elif self.operator == 'or': assert len(self._identifiers) == 2 for d in self._identifiers: ok, datasets = d.filter(session_datasets, **kwargs) if ok: break elif self.operator == 'xor': assert len(self._identifiers) == 2 _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers)) ok = sum(_ok) == 1 if ok: # Return only those datasets that are complete. datasets = _datasets[_ok.index(True)] else: datasets = pd.concat(_datasets) elif self.operator == 'and': assert len(self._identifiers) == 2 _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers)) ok = all(_ok) datasets = pd.concat(_datasets) elif not isinstance(self.operator, str): raise TypeError(f'Unrecognized operator type "{type(self.operator)}"') else: raise NotImplementedError(f'logical {self.operator.upper()} not implemented') return ok, datasets
def _apply_op(self, op, other): """Apply an operation between two datasets.""" # Assert both instances of Input or both instances of Output if not isinstance(other, (self.__class__, tuple)): raise TypeError(f'logical operations not supported between objects of type ' f'{self.__class__.__name__} and {other.__class__.__name__}') # Assert operation supported if op not in {'or', 'xor', 'and'}: raise ValueError(op) # Convert tuple to ExpectDataset instance if isinstance(other, tuple): D = (self.input if isinstance(self, Input) else self.output) other = D(*other) # Returned instance should only be optional if both datasets are optional is_input = isinstance(self, Input) if all(isinstance(x, OptionalDataset) for x in (self, other)): D = OptionalInput if is_input else OptionalOutput else: D = Input if is_input else Output # Instantiate 'empty' object d = D(None, None) d._identifiers = (self, other) d.operator = op return d def __invert__(self): """Assert dataset doesn't exist on disk.""" obj = copy(self) obj.inverted = not self.inverted return obj def __or__(self, b): """Assert either dataset exists or another does, or both exist.""" return self._apply_op('or', b) def __xor__(self, b): """Assert either dataset exists or another does, not both.""" return self._apply_op('xor', b) def __and__(self, b): """Assert that a second dataset exists together with the first.""" return self._apply_op('and', b)
[docs] @staticmethod def input(name, collection, required=True, register=False, **kwargs): """ Create an expected input dataset. By default, expected input datasets are not automatically registered. Parameters ---------- name : str A dataset name or glob pattern. collection : str, None An ALF collection or pattern. required : bool Whether file must always be present, or is an optional dataset. Default is True. register : bool Whether to register the input file. Default is False for input files, True for output files. revision : str An optional revision. unique : bool Whether identifier pattern is expected to match a single dataset or several. Returns ------- Input, OptionalInput An instance of an Input dataset if required is true, otherwise an OptionalInput. """ Class = Input if required else OptionalInput obj = Class(name, collection, register=register, **kwargs) return obj
[docs] @staticmethod def output(name, collection, required=True, register=True, **kwargs): """ Create an expected output dataset. By default, expected output datasets are automatically registered. Parameters ---------- name : str A dataset name or glob pattern. collection : str, None An ALF collection or pattern. required : bool Whether file must always be present, or is an optional dataset. Default is True. register : bool Whether to register the output file. Default is False for input files, True for output files. revision : str An optional revision. unique : bool Whether identifier pattern is expected to match a single dataset or several. Returns ------- Output, OptionalOutput An instance of an Output dataset if required is true, otherwise an OptionalOutput. """ Class = Output if required else OptionalOutput obj = Class(name, collection, register=register, **kwargs) return obj
[docs] class OptionalDataset(ExpectedDataset): """An expected dataset that is not strictly required."""
[docs] def find_files(self, session_path, register=False): """Find files on disk. Uses glob patterns to find dataset(s) on disk. Parameters ---------- session_path : pathlib.Path, str A session path within which to glob for the dataset(s). register : bool Only return files intended to be registered. Returns ------- True Always True as dataset is optional. list of pathlib.Path A list of matching dataset files. missing, None, str, set of str One or more glob patterns that either didn't yield files (or did in the case of inverted datasets). Notes ----- - Currently if `unique` is true and multiple files are found, all files are returned without an exception raised although this may change in the future. - If `register` is false, all files are returned regardless of whether they are intended to be registered. - If `inverted` is true, and files are found, the glob pattern is returned as missing. - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing patterns. - Missing (or unexpectedly found) patterns are returned despite the dataset being optional. """ ok, actual_files, missing = super().find_files(session_path, register=register) return True, actual_files, missing
[docs] def filter(self, session_datasets, **kwargs): """Filter dataset frame by expected datasets. Parameters ---------- session_datasets : pandas.DataFrame An data frame of session datasets. kwargs Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, ignore_qc_not_set, and assert_unique. Returns ------- True Always True as dataset is optional. pandas.DataFrame A filtered data frame of containing the expected dataset(s). """ ok, datasets = super().filter(session_datasets, **kwargs) return True, datasets
[docs] class Input(ExpectedDataset): """An expected input dataset.""" pass
[docs] class OptionalInput(Input, OptionalDataset): """An optional expected input dataset.""" pass
[docs] class Output(ExpectedDataset): """An expected output dataset.""" pass
[docs] class OptionalOutput(Output, OptionalDataset): """An optional expected output dataset.""" pass
def _parse_signature(signature): """ Ensure all a signature's expected datasets are instances of ExpectedDataset. Parameters ---------- signature : Dict[str, list] A dict with keys {'input_files', 'output_files'} containing lists of tuples and/or ExpectedDataset instances. Returns ------- Dict[str, list of ExpectedDataset] A dict containing all tuples converted to ExpectedDataset instances. """ I, O = ExpectedDataset.input, ExpectedDataset.output # noqa inputs = [i if isinstance(i, ExpectedDataset) else I(*i) for i in signature['input_files']] outputs = [o if isinstance(o, ExpectedDataset) else O(*o) for o in signature['output_files']] return {'input_files': inputs, 'output_files': outputs}
[docs] def dataset_from_name(name, datasets): """ From a list of ExpectedDataset instances, return those that match a given name. Parameters ---------- name : str The name of the dataset. datasets : list of ExpectedDataset A list of ExpectedDataset instances. Returns ------- list of ExpectedDataset The ExpectedDataset instances that match the given name. """ matches = [] for dataset in datasets: if dataset.operator is None: if dataset._identifiers[2] == name: matches.append(dataset) else: matches.extend(dataset_from_name(name, dataset._identifiers)) return matches
[docs] def update_collections(dataset, new_collection, substring=None, unique=None): """ Update the collection of a dataset. This updates all nested ExpectedDataset instances with the new collection and returns copies. Parameters ---------- dataset : ExpectedDataset The dataset to update. new_collection : str, list of str The new collection or collections. substring : str, optional An optional substring in the collection to replace with new collection(s). If None, the entire collection will be replaced. Returns ------- ExpectedDataset A copy of the dataset with the updated collection(s). """ after = ensure_list(new_collection) D = ExpectedDataset.input if isinstance(dataset, Input) else ExpectedDataset.output if dataset.operator is None: collection, revision, name = dataset.identifiers if revision is not None: raise NotImplementedError if substring: after = [(collection or '').replace(substring, x) or None for x in after] if unique is None: unique = [not set(name + (x or '')).intersection('*[?') for x in after] else: unique = [unique] * len(after) register = dataset.register updated = D(name, after[0], not isinstance(dataset, OptionalDataset), register, unique=unique[0]) if len(after) > 1: for folder, unq in zip(after[1:], unique[1:]): updated &= D(name, folder, not isinstance(dataset, OptionalDataset), register, unique=unq) else: updated = copy(dataset) updated._identifiers = [update_collections(dd, new_collection, substring, unique) for dd in updated._identifiers] return updated
[docs] class DataHandler(abc.ABC): def __init__(self, session_path, signature, one=None): """ Base data handler class :param session_path: path to session :param signature: input and output file signatures :param one: ONE instance """ self.session_path = session_path self.signature = _parse_signature(signature) self.one = one self.processed = {} # Map of filepaths and their processed records (e.g. upload receipts or Alyx records)
[docs] def setUp(self, **kwargs): """Function to optionally overload to download required data to run task.""" pass
[docs] def getData(self, one=None): """Finds the datasets required for task based on input signatures. Parameters ---------- one : one.api.One, optional An instance of ONE to use. Returns ------- pandas.DataFrame, None A data frame of required datasets. An empty frame is returned if no registered datasets are required, while None is returned if no instance of ONE is set. """ if self.one is None and one is None: return one = one or self.one session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True) dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']] return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates()
[docs] def getOutputFiles(self): """ Return a data frame of output datasets found on disk. Returns ------- pandas.DataFrame A dataset data frame of datasets on disk that were specified in signature['output_files']. """ assert self.session_path # Next convert datasets to frame # Create dataframe of all ALF datasets df = _make_datasets_df(self.session_path, hash_files=False).set_index(['eid', 'id']) # Filter outputs if len(self.signature['output_files']) == 0: return pd.DataFrame() present = [file.filter(df)[1] for file in self.signature['output_files']] return pd.concat(present).droplevel('eid')
[docs] def uploadData(self, outputs, version): """ Function to optionally overload to upload and register data :param outputs: output files from task to register :param version: ibllib version :return: """ if isinstance(outputs, list): versions = [version for _ in outputs] else: versions = [version] return versions
[docs] def cleanUp(self, **kwargs): """Function to optionally overload to clean up files after running task.""" pass
[docs] class LocalDataHandler(DataHandler): def __init__(self, session_path, signatures, one=None): """ Data handler for running tasks locally, with no architecture or db connection :param session_path: path to session :param signature: input and output file signatures :param one: ONE instance """ super().__init__(session_path, signatures, one=one)
[docs] class ServerDataHandler(DataHandler): def __init__(self, session_path, signatures, one=None): """ Data handler for running tasks on lab local servers when all data is available locally :param session_path: path to session :param signature: input and output file signatures :param one: ONE instance """ super().__init__(session_path, signatures, one=one)
[docs] def uploadData(self, outputs, version, clobber=False, **kwargs): """ Upload and/or register output data. This is typically called by :meth:`ibllib.pipes.tasks.Task.register_datasets`. Parameters ---------- outputs : list of pathlib.Path A set of ALF paths to register to Alyx. version : str, list of str The version of ibllib used to generate these output files. clobber : bool If True, re-upload outputs that have already been passed to this method. kwargs Optional keyword arguments for one.registration.RegistrationClient.register_files. Returns ------- list of dicts, dict A list of newly created Alyx dataset records or the registration data if dry. """ versions = super().uploadData(outputs, version) data_repo = get_local_data_repository(self.one.alyx) # If clobber = False, do not re-upload the outputs that have already been processed outputs = ensure_list(outputs) to_upload = list(filter(None if clobber else lambda x: x not in self.processed, outputs)) records = register_dataset(to_upload, one=self.one, versions=versions, repository=data_repo, **kwargs) or [] if kwargs.get('dry', False): return records # Store processed outputs self.processed.update({k: v for k, v in zip(to_upload, records) if v}) return [self.processed[x] for x in outputs if x in self.processed]
[docs] def cleanUp(self, **_): """Empties and returns the processed dataset mep.""" super().cleanUp() processed = self.processed self.processed = {} return processed
[docs] class ServerGlobusDataHandler(DataHandler): def __init__(self, session_path, signatures, one=None): """ Data handler for running tasks on lab local servers. Will download missing data from SDSC using Globus :param session_path: path to session :param signatures: input and output file signatures :param one: ONE instance """ from one.remote.globus import Globus, get_lab_from_endpoint_id # noqa super().__init__(session_path, signatures, one=one) self.globus = Globus(client_name='server', headless=True) # on local servers set up the local root path manually as some have different globus config paths self.globus.endpoints['local']['root_path'] = '/mnt/s0/Data/Subjects' # Find the lab self.lab = get_lab(self.session_path, self.one.alyx) # For cortex lab we need to get the endpoint from the ibl alyx if self.lab == 'cortexlab': alyx = AlyxClient(base_url='https://alyx.internationalbrainlab.org', cache_rest=None) self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=alyx) else: self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=self.one.alyx) self.local_paths = []
[docs] def setUp(self, **_): """Function to download necessary data to run tasks using globus-sdk.""" if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url: df = super().getData(one=ONE(base_url='https://alyx.internationalbrainlab.org', cache_rest=self.one.alyx.cache_mode)) else: df = super().getData(one=self.one) if len(df) == 0: # If no datasets found in the cache only work off local file system do not attempt to # download any missing data using Globus return # Check for space on local server. If less that 500 GB don't download new data space_free = shutil.disk_usage(self.globus.endpoints['local']['root_path'])[2] if space_free < 500e9: _logger.warning('Space left on server is < 500GB, won\'t re-download new data') return rel_sess_path = '/'.join(self.session_path.parts[-3:]) target_paths = [] source_paths = [] for i, d in df.iterrows(): sess_path = Path(rel_sess_path).joinpath(d['rel_path']) full_local_path = Path(self.globus.endpoints['local']['root_path']).joinpath(sess_path) if not full_local_path.exists(): uuid = i self.local_paths.append(full_local_path) target_paths.append(sess_path) source_paths.append(add_uuid_string(sess_path, uuid)) if len(target_paths) != 0: ts = time() for sp, tp in zip(source_paths, target_paths): _logger.info(f'Downloading {sp} to {tp}') self.globus.mv(f'flatiron_{self.lab}', 'local', source_paths, target_paths) _logger.debug(f'Complete. Time elapsed {time() - ts}')
[docs] def uploadData(self, outputs, version, **kwargs): """ Function to upload and register data of completed task :param outputs: output files from task to register :param version: ibllib version :return: output info of registered datasets """ versions = super().uploadData(outputs, version) data_repo = get_local_data_repository(self.one.alyx) return register_dataset(outputs, one=self.one, versions=versions, repository=data_repo, **kwargs)
[docs] def cleanUp(self, **_): """Clean up, remove the files that were downloaded from Globus once task has completed.""" for file in self.local_paths: os.unlink(file)
[docs] class RemoteEC2DataHandler(DataHandler): def __init__(self, session_path, signature, one=None): """ Data handler for running tasks on remote compute node. Will download missing data via http using ONE :param session_path: path to session :param signature: input and output file signatures :param one: ONE instance """ super().__init__(session_path, signature, one=one)
[docs] def setUp(self, **_): """ Function to download necessary data to run tasks using ONE :return: """ df = super().getData() self.one._check_filesystem(df)
[docs] def uploadData(self, outputs, version, **kwargs): """ Function to upload and register data of completed task via S3 patcher :param outputs: output files from task to register :param version: ibllib version :return: output info of registered datasets """ versions = super().uploadData(outputs, version) s3_patcher = S3Patcher(one=self.one) return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user, versions=versions, **kwargs)
[docs] class RemoteHttpDataHandler(DataHandler): def __init__(self, session_path, signature, one=None): """ Data handler for running tasks on remote compute node. Will download missing data via http using ONE :param session_path: path to session :param signature: input and output file signatures :param one: ONE instance """ super().__init__(session_path, signature, one=one)
[docs] def setUp(self, **_): """ Function to download necessary data to run tasks using ONE :return: """ df = super().getData() self.one._check_filesystem(df)
[docs] def uploadData(self, outputs, version, **kwargs): """ Function to upload and register data of completed task via FTP patcher :param outputs: output files from task to register :param version: ibllib version :return: output info of registered datasets """ versions = super().uploadData(outputs, version) ftp_patcher = FTPPatcher(one=self.one) return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user, versions=versions, **kwargs)
[docs] class RemoteAwsDataHandler(DataHandler): def __init__(self, session_path, signature, one=None): """ Data handler for running tasks on remote compute node. This will download missing data from the private IBL S3 AWS data bucket. New datasets are uploaded via Globus. :param session_path: path to session :param signature: input and output file signatures :param one: ONE instance """ super().__init__(session_path, signature, one=one) self.local_paths = []
[docs] def setUp(self, **_): """Function to download necessary data to run tasks using AWS boto3.""" df = super().getData() self.local_paths = self.one._download_aws(map(lambda x: x[1], df.iterrows()))
[docs] def uploadData(self, outputs, version, **kwargs): """ Function to upload and register data of completed task via FTP patcher :param outputs: output files from task to register :param version: ibllib version :return: output info of registered datasets """ # Set up Globus from one.remote.globus import Globus # noqa self.globus = Globus(client_name='server', headless=True) self.lab = session_path_parts(self.session_path, as_dict=True)['lab'] if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url: base_url = 'https://alyx.internationalbrainlab.org' _logger.warning('Changing Alyx client to %s', base_url) ac = AlyxClient(base_url=base_url, cache_rest=self.one.alyx.cache_mode) else: ac = self.one.alyx self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=ac) # register datasets versions = super().uploadData(outputs, version) response = register_dataset(outputs, one=self.one, server_only=True, versions=versions, **kwargs) # upload directly via globus source_paths = [] target_paths = [] collections = {} for dset, out in zip(response, outputs): assert Path(out).name == dset['name'] # set flag to false fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository']) collection = '/'.join(fr['relative_path'].split('/')[:-1]) if collection in collections.keys(): collections[collection].update({f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}}) else: collections[collection] = {f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}} # Set all exists status to false for server file records self.one.alyx.rest('files', 'partial_update', id=fr['id'], data={'exists': False}) source_paths.append(out) target_paths.append(add_uuid_string(fr['relative_path'], dset['id'])) if len(target_paths) != 0: ts = time() for sp, tp in zip(source_paths, target_paths): _logger.info(f'Uploading {sp} to {tp}') self.globus.mv('local', f'flatiron_{self.lab}', source_paths, target_paths) _logger.debug(f'Complete. Time elapsed {time() - ts}') for collection, files in collections.items(): globus_files = self.globus.ls(f'flatiron_{self.lab}', collection, remove_uuid=True, return_size=True) file_names = [str(gl[0]) for gl in globus_files] file_sizes = [gl[1] for gl in globus_files] for name, details in files.items(): try: idx = file_names.index(name) size = file_sizes[idx] if size == details['size']: # update the file record if sizes match self.one.alyx.rest('files', 'partial_update', id=details['fr_id'], data={'exists': True}) else: _logger.warning(f'File {name} found on SDSC but sizes do not match') except ValueError: _logger.warning(f'File {name} not found on SDSC') return response
# ftp_patcher = FTPPatcher(one=self.one) # return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user, # versions=versions, **kwargs)
[docs] def cleanUp(self, task): """Clean up, remove the files that were downloaded from globus once task has completed.""" if task.status == 0: for file in self.local_paths: os.unlink(file)
[docs] class RemoteGlobusDataHandler(DataHandler): """ Data handler for running tasks on remote compute node. Will download missing data using Globus. :param session_path: path to session :param signature: input and output file signatures :param one: ONE instance """ def __init__(self, session_path, signature, one=None): super().__init__(session_path, signature, one=one)
[docs] def setUp(self, **_): """Function to download necessary data to run tasks using globus.""" # TODO pass
[docs] def uploadData(self, outputs, version, **kwargs): """ Function to upload and register data of completed task via FTP patcher :param outputs: output files from task to register :param version: ibllib version :return: output info of registered datasets """ versions = super().uploadData(outputs, version) ftp_patcher = FTPPatcher(one=self.one) return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user, versions=versions, **kwargs)
[docs] class SDSCDataHandler(DataHandler): """ Data handler for running tasks on SDSC compute node :param session_path: path to session :param signature: input and output file signatures :param one: ONE instance """ def __init__(self, session_path, signatures, one=None): super().__init__(session_path, signatures, one=one) self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH) self.root_path = SDSC_ROOT_PATH
[docs] def setUp(self, task): """Function to create symlinks to necessary data to run tasks.""" df = super().getData() SDSC_TMP = Path(self.patch_path.joinpath(task.__class__.__name__)) session_path = Path(get_alf_path(self.session_path)) for uuid, d in df.iterrows(): file_path = session_path / d['rel_path'] file_uuid = add_uuid_string(file_path, uuid) file_link = SDSC_TMP.joinpath(file_path) file_link.parent.mkdir(exist_ok=True, parents=True) try: file_link.symlink_to( Path(self.root_path.joinpath(file_uuid))) except FileExistsError: pass task.session_path = SDSC_TMP.joinpath(session_path)
[docs] def uploadData(self, outputs, version, **kwargs): """ Function to upload and register data of completed task via SDSC patcher :param outputs: output files from task to register :param version: ibllib version :return: output info of registered datasets """ versions = super().uploadData(outputs, version) sdsc_patcher = SDSCPatcher(one=self.one) return sdsc_patcher.patch_datasets(outputs, dry=False, versions=versions, **kwargs)
[docs] def cleanUp(self, task): """Function to clean up symlinks created to run task.""" assert self.patch_path.parts[0:4] == task.session_path.parts[0:4] shutil.rmtree(task.session_path)
[docs] class PopeyeDataHandler(SDSCDataHandler): def __init__(self, session_path, signatures, one=None): super().__init__(session_path, signatures, one=one) self.patch_path = Path(os.getenv('SDSC_PATCH_PATH', "/mnt/sdceph/users/ibl/data/quarantine/tasks/")) self.root_path = Path("/mnt/sdceph/users/ibl/data")
[docs] def uploadData(self, outputs, version, **kwargs): raise NotImplementedError( "Cannot register data from Popeye. Login as Datauser and use the RegisterSpikeSortingSDSC task." )
[docs] def cleanUp(self, **_): """Symlinks are preserved until registration.""" pass