"""Downloading of task dependent datasets and registration of task output datasets.
The DataHandler class is used by the pipes.tasks.Task class to ensure dependent datasets are
present and to register and upload the output datasets. For examples on how to run a task using
specific data handlers, see :func:`ibllib.pipes.tasks`.
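Examples
--------
An illustrative sketch of declaring a signature and driving a data handler directly (the
dataset names, collections, session path, ONE instance and output file list below are
hypothetical):

>>> signature = {
...     'input_files': [ExpectedDataset.input('*trials.table.pqt', 'alf')],
...     'output_files': [ExpectedDataset.output('*trials.goCueTrigger_times.npy', 'alf')]}
>>> handler = ServerDataHandler(session_path, signature, one=one)
>>> handler.setUp()  # ensure the required input datasets are present
>>> # ... run the task, then register its output datasets ...
>>> records = handler.uploadData(output_files, version)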
"""
import logging
import pandas as pd
from pathlib import Path, PurePosixPath
import shutil
import os
import abc
from time import time
from copy import copy
from one.api import ONE
from one.webclient import AlyxClient
from one.util import filter_datasets
from one.alf.path import add_uuid_string, session_path_parts, get_alf_path
from one.alf.cache import _make_datasets_df
from iblutil.util import flatten, ensure_list
from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository
from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher
_logger = logging.getLogger(__name__)
class ExpectedDataset:
"""An expected input or output dataset."""
inverted = False
def __init__(self, name, collection, register=None, revision=None, unique=True):
"""
An expected input or output dataset.
NB: This should not be instantiated directly, but rather via the `input` or `output`
static method.
Parameters
----------
name : str, None
A dataset name or glob pattern.
collection : str, None
An ALF collection or pattern.
register : bool
Whether to register the file. Default is False for input files, True for output
files.
revision : str
An optional revision.
unique : bool
Whether the identifier pattern is expected to match a single dataset or several. NB: This currently does not
affect the output of `find_files`.
"""
if not (collection is None or isinstance(collection, str)):
collection = '/'.join(collection)
self._identifiers = (collection, revision, name)
self.operator = None
self._register = register or False
self.inverted = False
self.name = None
self.unique = unique
@property
def register(self):
"""bool: whether to register the output file."""
return self._register
@register.setter
def register(self, value):
"""bool: whether to register the output file."""
if self.operator is not None:
raise AttributeError('cannot set register attribute for operator datasets')
self._register = value
@property
def identifiers(self):
"""tuple: the identifying parts of the dataset.
If no operator is applied, the identifiers are (collection, revision, name).
If an operator is applied, a tuple of 3-element tuples is returned.
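Examples
--------
An illustrative sketch (the dataset names and collection are hypothetical):

>>> dset = ExpectedDataset.input('spikes.times.npy', 'alf/probe00')
>>> dset.identifiers
('alf/probe00', None, 'spikes.times.npy')
>>> (dset & ExpectedDataset.input('spikes.clusters.npy', 'alf/probe00')).identifiers
(('alf/probe00', None, 'spikes.times.npy'), ('alf/probe00', None, 'spikes.clusters.npy'))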
"""
if self.operator is None:
return self._identifiers
# Flatten nested identifiers into tuple of 3-element tuples
identifiers = []
for x in self._identifiers:
add = identifiers.extend if x.operator else identifiers.append
add(x.identifiers)
return tuple(identifiers)
@property
def glob_pattern(self):
"""str, tuple of str: one or more glob patterns."""
if self.operator is None:
return str(PurePosixPath(*filter(None, self._identifiers)))
else:
return tuple(flatten(x.glob_pattern for x in self._identifiers))
def __repr__(self):
"""Represent the dataset object as a string.
If the `name` property is not None, it is returned, otherwise the identifiers are used to
format the name.
"""
name = self.__class__.__name__
if self.name:
return f'<{name}({self.name})>'
if self.operator:
sym = {'or': '|', 'and': '&', 'xor': '^'}
patterns = [d.__repr__() for d in self._identifiers]
pattern = f'{sym[self.operator]:^3}'.join(patterns)
if self.inverted:
pattern = f'~({pattern})'
else:
pattern = ('~' if self.inverted else '') + self.glob_pattern
return f'<{name}({pattern})>'
def find_files(self, session_path, register=False):
"""Find files on disk.
Uses glob patterns to find dataset(s) on disk.
Parameters
----------
session_path : pathlib.Path, str
A session path within which to glob for the dataset(s).
register : bool
Only return files intended to be registered.
Returns
-------
bool
True if the dataset is found on disk or is optional.
list of pathlib.Path
A list of matching dataset files.
missing, None, str, set of str
One or more glob patterns that either didn't yield files (or did in the case of inverted datasets).
Notes
-----
- Currently, if `unique` is true and multiple files are found, all files are returned without an exception
being raised, although this may change in the future.
- If `register` is false, all files are returned regardless of whether they are intended to be registered.
- If `inverted` is true, and files are found, the glob pattern is returned as missing.
- If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing
patterns.
- Missing (or unexpectedly found) patterns are returned despite the dataset being optional.
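Examples
--------
A minimal sketch, assuming a session folder on disk containing 'alf/_ibl_trials.table.pqt'
(the session path and dataset name are hypothetical):

>>> dset = ExpectedDataset.input('_ibl_trials.table.pqt', 'alf')
>>> ok, files, missing = dset.find_files('/data/Subjects/KS022/2019-12-10/001')
>>> ok, missing
(True, None)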
"""
session_path = Path(session_path)
ok, actual_files, missing = False, [], None
if self.operator is None:
if register and not self.register:
return True, actual_files, missing
actual_files = sorted(session_path.rglob(self.glob_pattern))
# If no revision pattern provided and no files found, search for any revision
if self._identifiers[1] is None and not any(actual_files):
glob_pattern = str(PurePosixPath(self._identifiers[0], '#*#', self._identifiers[2]))
actual_files = sorted(session_path.rglob(glob_pattern))
ok = any(actual_files) != self.inverted
if not ok:
missing = self.glob_pattern
elif self.operator == 'and':
assert len(self._identifiers) == 2
_ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path), self._identifiers))
ok = all(_ok)
actual_files = flatten(_actual_files)
missing = set(filter(None, flatten(_missing)))
elif self.operator == 'or':
assert len(self._identifiers) == 2
missing = set()
for d in self._identifiers:
ok, actual_files, _missing = d.find_files(session_path)
if ok:
break
if missing is not None:
missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing)
elif self.operator == 'xor':
assert len(self._identifiers) == 2
_ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path), self._identifiers))
ok = sum(_ok) == 1 # and sum(map(bool, map(len, _actual_files))) == 1
# Return only those datasets that are complete if OK
actual_files = _actual_files[_ok.index(True)] if ok else flatten(_actual_files)
if ok:
missing = set()
elif all(_ok): # return all patterns if all present when only one should be, otherwise return all missing
missing = set(flatten(self.glob_pattern))
elif not any(_ok): # return all missing glob patterns if none present
missing = set(filter(None, flatten(_missing)))
elif not isinstance(self.operator, str):
raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
else:
raise NotImplementedError(f'logical {self.operator.upper()} not implemented')
return ok, actual_files, missing
def filter(self, session_datasets, **kwargs):
"""Filter dataset frame by expected datasets.
Parameters
----------
session_datasets : pandas.DataFrame
A data frame of session datasets.
kwargs
Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, and
ignore_qc_not_set.
Returns
-------
bool
True if the required dataset(s) are present in the data frame.
pandas.DataFrame
A filtered data frame containing the expected dataset(s).
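Examples
--------
A minimal sketch, assuming a ONE instance `one`, a session `eid`, and an ExpectedDataset
instance `dset`:

>>> session_datasets = one.list_datasets(eid, details=True)
>>> ok, datasets = dset.filter(session_datasets)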
"""
# ok, datasets = False, session_datasets.iloc[0:0]
if self.operator is None:
collection, revision, file = self._identifiers
if self._identifiers[1] is not None:
raise NotImplementedError('revisions not yet supported')
datasets = filter_datasets(session_datasets, file, collection, wildcards=True, assert_unique=self.unique, **kwargs)
ok = datasets.empty == self.inverted
elif self.operator == 'or':
assert len(self._identifiers) == 2
for d in self._identifiers:
ok, datasets = d.filter(session_datasets, **kwargs)
if ok:
break
elif self.operator == 'xor':
assert len(self._identifiers) == 2
_ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
ok = sum(_ok) == 1
if ok:
# Return only those datasets that are complete.
datasets = _datasets[_ok.index(True)]
else:
datasets = pd.concat(_datasets)
elif self.operator == 'and':
assert len(self._identifiers) == 2
_ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
ok = all(_ok)
datasets = pd.concat(_datasets)
elif not isinstance(self.operator, str):
raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
else:
raise NotImplementedError(f'logical {self.operator.upper()} not implemented')
return ok, datasets
def _apply_op(self, op, other):
"""Apply an operation between two datasets."""
# Assert both instances of Input or both instances of Output
if not isinstance(other, (self.__class__, tuple)):
raise TypeError(f'logical operations not supported between objects of type '
f'{self.__class__.__name__} and {other.__class__.__name__}')
# Assert operation supported
if op not in {'or', 'xor', 'and'}:
raise ValueError(op)
# Convert tuple to ExpectedDataset instance
if isinstance(other, tuple):
D = (self.input if isinstance(self, Input) else self.output)
other = D(*other)
# Returned instance should only be optional if both datasets are optional
is_input = isinstance(self, Input)
if all(isinstance(x, OptionalDataset) for x in (self, other)):
D = OptionalInput if is_input else OptionalOutput
else:
D = Input if is_input else Output
# Instantiate 'empty' object
d = D(None, None)
d._identifiers = (self, other)
d.operator = op
return d
def __invert__(self):
"""Assert dataset doesn't exist on disk."""
obj = copy(self)
obj.inverted = not self.inverted
return obj
def __or__(self, b):
"""Assert either dataset exists or another does, or both exist."""
return self._apply_op('or', b)
def __xor__(self, b):
"""Assert either dataset exists or another does, not both."""
return self._apply_op('xor', b)
def __and__(self, b):
"""Assert that a second dataset exists together with the first."""
return self._apply_op('and', b)
@staticmethod
def output(name, collection, required=True, register=True, **kwargs):
"""
Create an expected output dataset.
By default, expected output datasets are automatically registered.
Parameters
----------
name : str
A dataset name or glob pattern.
collection : str, None
An ALF collection or pattern.
required : bool
Whether the file must always be present, or is an optional dataset. Default is True.
register : bool
Whether to register the output file. Default is True.
revision : str
An optional revision.
unique : bool
Whether the identifier pattern is expected to match a single dataset or several.
Returns
-------
Output, OptionalOutput
An instance of an Output dataset if required is true, otherwise an OptionalOutput.
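Examples
--------
An illustrative sketch (the dataset names and collection are hypothetical):

>>> ExpectedDataset.output('*trials.table.pqt', 'alf').register
True
>>> ExpectedDataset.output('*wheel.position.npy', 'alf', required=False)
<OptionalOutput(alf/*wheel.position.npy)>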
"""
Class = Output if required else OptionalOutput
obj = Class(name, collection, register=register, **kwargs)
return obj
class OptionalDataset(ExpectedDataset):
"""An expected dataset that is not strictly required."""
def find_files(self, session_path, register=False):
"""Find files on disk.
Uses glob patterns to find dataset(s) on disk.
Parameters
----------
session_path : pathlib.Path, str
A session path within which to glob for the dataset(s).
register : bool
Only return files intended to be registered.
Returns
-------
True
Always True, as the dataset is optional.
list of pathlib.Path
A list of matching dataset files.
missing, None, str, set of str
One or more glob patterns that either didn't yield files (or did in the case of inverted datasets).
Notes
-----
- Currently, if `unique` is true and multiple files are found, all files are returned without an exception
being raised, although this may change in the future.
- If `register` is false, all files are returned regardless of whether they are intended to be registered.
- If `inverted` is true, and files are found, the glob pattern is returned as missing.
- If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing
patterns.
- Missing (or unexpectedly found) patterns are returned despite the dataset being optional.
"""
ok, actual_files, missing = super().find_files(session_path, register=register)
return True, actual_files, missing
def filter(self, session_datasets, **kwargs):
"""Filter dataset frame by expected datasets.
Parameters
----------
session_datasets : pandas.DataFrame
A data frame of session datasets.
kwargs
Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc,
ignore_qc_not_set, and assert_unique.
Returns
-------
True
Always True, as the dataset is optional.
pandas.DataFrame
A filtered data frame containing the expected dataset(s).
"""
ok, datasets = super().filter(session_datasets, **kwargs)
return True, datasets
class Output(ExpectedDataset):
"""An expected output dataset."""
pass
class OptionalOutput(Output, OptionalDataset):
"""An optional expected output dataset."""
pass
def _parse_signature(signature):
"""
Ensure all of a signature's expected datasets are instances of ExpectedDataset.
Parameters
----------
signature : Dict[str, list]
A dict with keys {'input_files', 'output_files'} containing lists of tuples and/or
ExpectedDataset instances.
Returns
-------
Dict[str, list of ExpectedDataset]
A dict containing all tuples converted to ExpectedDataset instances.
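Examples
--------
An illustrative sketch, assuming tuples of the form (name, collection, required); the dataset
names and collections are hypothetical:

>>> signature = {'input_files': [('_iblrig_taskData.raw.*', 'raw_task_data_00', True)],
...              'output_files': [('*trials.table.pqt', 'alf', True)]}
>>> parsed = _parse_signature(signature)
>>> all(isinstance(x, ExpectedDataset) for x in parsed['input_files'] + parsed['output_files'])
True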
"""
I, O = ExpectedDataset.input, ExpectedDataset.output # noqa
inputs = [i if isinstance(i, ExpectedDataset) else I(*i) for i in signature['input_files']]
outputs = [o if isinstance(o, ExpectedDataset) else O(*o) for o in signature['output_files']]
return {'input_files': inputs, 'output_files': outputs}
def dataset_from_name(name, datasets):
"""
From a list of ExpectedDataset instances, return those that match a given name.
Parameters
----------
name : str
The name of the dataset.
datasets : list of ExpectedDataset
A list of ExpectedDataset instances.
Returns
-------
list of ExpectedDataset
The ExpectedDataset instances that match the given name.
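Examples
--------
An illustrative sketch (the dataset names and collection are hypothetical):

>>> datasets = [ExpectedDataset.input('spikes.times.npy', 'alf/probe00'),
...             ExpectedDataset.input('spikes.clusters.npy', 'alf/probe00')]
>>> len(dataset_from_name('spikes.times.npy', datasets))
1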
"""
matches = []
for dataset in datasets:
if dataset.operator is None:
if dataset._identifiers[2] == name:
matches.append(dataset)
else:
matches.extend(dataset_from_name(name, dataset._identifiers))
return matches
def update_collections(dataset, new_collection, substring=None, unique=None):
"""
Update the collection of a dataset.
This updates all nested ExpectedDataset instances with the new collection and returns copies.
Parameters
----------
dataset : ExpectedDataset
The dataset to update.
new_collection : str, list of str
The new collection or collections.
substring : str, optional
An optional substring in the collection to replace with new collection(s). If None, the
entire collection will be replaced.
unique : bool, optional
Whether the identifier pattern is expected to match a single dataset. If None, this is
inferred from the presence of wildcard characters in the name and new collection(s).
Returns
-------
ExpectedDataset
A copy of the dataset with the updated collection(s).
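Examples
--------
An illustrative sketch (the dataset name and collections are hypothetical):

>>> dset = ExpectedDataset.input('*spikes.times.npy', 'alf/probe00')
>>> update_collections(dset, 'alf/probe00/pykilosort').identifiers
('alf/probe00/pykilosort', None, '*spikes.times.npy')
>>> update_collections(dset, ['alf/probe00', 'alf/probe01']).identifiers
(('alf/probe00', None, '*spikes.times.npy'), ('alf/probe01', None, '*spikes.times.npy'))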
"""
after = ensure_list(new_collection)
D = ExpectedDataset.input if isinstance(dataset, Input) else ExpectedDataset.output
if dataset.operator is None:
collection, revision, name = dataset.identifiers
if revision is not None:
raise NotImplementedError
if substring:
after = [(collection or '').replace(substring, x) or None for x in after]
if unique is None:
unique = [not set(name + (x or '')).intersection('*[?') for x in after]
else:
unique = [unique] * len(after)
register = dataset.register
updated = D(name, after[0], not isinstance(dataset, OptionalDataset), register, unique=unique[0])
if len(after) > 1:
for folder, unq in zip(after[1:], unique[1:]):
updated &= D(name, folder, not isinstance(dataset, OptionalDataset), register, unique=unq)
else:
updated = copy(dataset)
updated._identifiers = [update_collections(dd, new_collection, substring, unique)
for dd in updated._identifiers]
return updated
class DataHandler(abc.ABC):
def __init__(self, session_path, signature, one=None):
"""
Base data handler class
:param session_path: path to session
:param signature: input and output file signatures
:param one: ONE instance
"""
self.session_path = session_path
self.signature = _parse_signature(signature)
self.one = one
self.processed = {} # Map of filepaths and their processed records (e.g. upload receipts or Alyx records)
def setUp(self, **kwargs):
"""Function to optionally overload to download required data to run task."""
pass
def getData(self, one=None):
"""Finds the datasets required for task based on input signatures.
Parameters
----------
one : one.api.One, optional
An instance of ONE to use.
Returns
-------
pandas.DataFrame, None
A data frame of required datasets. An empty frame is returned if no registered datasets are required,
while None is returned if no instance of ONE is set.
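Examples
--------
A minimal sketch, assuming a handler constructed with a session path, a signature and a ONE
instance:

>>> df = handler.getData()  # data frame of registered datasets required by the input signature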
"""
if self.one is None and one is None:
return
one = one or self.one
session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True)
dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']]
return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs)
def getOutputFiles(self):
"""
Return a data frame of output datasets found on disk.
Returns
-------
pandas.DataFrame
A data frame of the datasets found on disk that were specified in signature['output_files'].
"""
assert self.session_path
# Next convert datasets to frame
# Create dataframe of all ALF datasets
df = _make_datasets_df(self.session_path, hash_files=False).set_index(['eid', 'id'])
# Filter outputs
if len(self.signature['output_files']) == 0:
return pd.DataFrame()
present = [file.filter(df)[1] for file in self.signature['output_files']]
return pd.concat(present).droplevel('eid')
def uploadData(self, outputs, version):
"""
Function to optionally overload to upload and register data
:param outputs: output files from task to register
:param version: ibllib version
:return:
"""
if isinstance(outputs, list):
versions = [version for _ in outputs]
else:
versions = [version]
return versions
def cleanUp(self, **kwargs):
"""Function to optionally overload to clean up files after running task."""
pass
class LocalDataHandler(DataHandler):
def __init__(self, session_path, signatures, one=None):
"""
Data handler for running tasks locally, with no architecture or db connection
:param session_path: path to session
:param signatures: input and output file signatures
:param one: ONE instance
"""
super().__init__(session_path, signatures, one=one)
class ServerDataHandler(DataHandler):
def __init__(self, session_path, signatures, one=None):
"""
Data handler for running tasks on lab local servers when all data is available locally
:param session_path: path to session
:param signatures: input and output file signatures
:param one: ONE instance
"""
super().__init__(session_path, signatures, one=one)
def uploadData(self, outputs, version, clobber=False, **kwargs):
"""
Upload and/or register output data.
This is typically called by :meth:`ibllib.pipes.tasks.Task.register_datasets`.
Parameters
----------
outputs : list of pathlib.Path
A set of ALF paths to register to Alyx.
version : str, list of str
The version of ibllib used to generate these output files.
clobber : bool
If True, re-upload outputs that have already been passed to this method.
kwargs
Optional keyword arguments for one.registration.RegistrationClient.register_files.
Returns
-------
list of dicts, dict
A list of newly created Alyx dataset records or the registration data if dry.
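Examples
--------
A minimal sketch, assuming `handler` is a ServerDataHandler and `out_files` is a list of ALF
paths produced by the task:

>>> records = handler.uploadData(out_files, version=ibllib.__version__)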
"""
versions = super().uploadData(outputs, version)
data_repo = get_local_data_repository(self.one.alyx)
# If clobber = False, do not re-upload the outputs that have already been processed
outputs = ensure_list(outputs)
to_upload = list(filter(None if clobber else lambda x: x not in self.processed, outputs))
records = register_dataset(to_upload, one=self.one, versions=versions, repository=data_repo, **kwargs) or []
if kwargs.get('dry', False):
return records
# Store processed outputs
self.processed.update({k: v for k, v in zip(to_upload, records) if v})
return [self.processed[x] for x in outputs if x in self.processed]
def cleanUp(self, **_):
"""Empties and returns the processed dataset mep."""
super().cleanUp()
processed = self.processed
self.processed = {}
return processed
class ServerGlobusDataHandler(DataHandler):
def __init__(self, session_path, signatures, one=None):
"""
Data handler for running tasks on lab local servers. Will download missing data from SDSC using Globus
:param session_path: path to session
:param signatures: input and output file signatures
:param one: ONE instance
"""
from one.remote.globus import Globus, get_lab_from_endpoint_id # noqa
super().__init__(session_path, signatures, one=one)
self.globus = Globus(client_name='server', headless=True)
# on local servers set up the local root path manually as some have different globus config paths
self.globus.endpoints['local']['root_path'] = '/mnt/s0/Data/Subjects'
# Find the lab
self.lab = get_lab(self.session_path, self.one.alyx)
# For cortexlab we need to get the endpoint from the IBL Alyx
if self.lab == 'cortexlab':
alyx = AlyxClient(base_url='https://alyx.internationalbrainlab.org', cache_rest=None)
self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=alyx)
else:
self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=self.one.alyx)
self.local_paths = []
def setUp(self, **_):
"""Function to download necessary data to run tasks using globus-sdk."""
if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
df = super().getData(one=ONE(base_url='https://alyx.internationalbrainlab.org', cache_rest=self.one.alyx.cache_mode))
else:
df = super().getData(one=self.one)
if len(df) == 0:
# If no datasets are found in the cache, work off the local file system only; do not
# attempt to download any missing data using Globus
return
# Check for space on local server. If less than 500 GB, don't download new data
space_free = shutil.disk_usage(self.globus.endpoints['local']['root_path'])[2]
if space_free < 500e9:
_logger.warning('Space left on server is < 500GB, won\'t re-download new data')
return
rel_sess_path = '/'.join(self.session_path.parts[-3:])
target_paths = []
source_paths = []
for i, d in df.iterrows():
sess_path = Path(rel_sess_path).joinpath(d['rel_path'])
full_local_path = Path(self.globus.endpoints['local']['root_path']).joinpath(sess_path)
if not full_local_path.exists():
uuid = i
self.local_paths.append(full_local_path)
target_paths.append(sess_path)
source_paths.append(add_uuid_string(sess_path, uuid))
if len(target_paths) != 0:
ts = time()
for sp, tp in zip(source_paths, target_paths):
_logger.info(f'Downloading {sp} to {tp}')
self.globus.mv(f'flatiron_{self.lab}', 'local', source_paths, target_paths)
_logger.debug(f'Complete. Time elapsed {time() - ts}')
def uploadData(self, outputs, version, **kwargs):
"""
Function to upload and register data of completed task
:param outputs: output files from task to register
:param version: ibllib version
:return: output info of registered datasets
"""
versions = super().uploadData(outputs, version)
data_repo = get_local_data_repository(self.one.alyx)
return register_dataset(outputs, one=self.one, versions=versions, repository=data_repo, **kwargs)
def cleanUp(self, **_):
"""Clean up, remove the files that were downloaded from Globus once task has completed."""
for file in self.local_paths:
os.unlink(file)
class RemoteEC2DataHandler(DataHandler):
def __init__(self, session_path, signature, one=None):
"""
Data handler for running tasks on remote compute node. Will download missing data via http using ONE
:param session_path: path to session
:param signature: input and output file signatures
:param one: ONE instance
"""
super().__init__(session_path, signature, one=one)
def setUp(self, **_):
"""
Function to download necessary data to run tasks using ONE
:return:
"""
df = super().getData()
self.one._check_filesystem(df)
def uploadData(self, outputs, version, **kwargs):
"""
Function to upload and register data of completed task via S3 patcher
:param outputs: output files from task to register
:param version: ibllib version
:return: output info of registered datasets
"""
versions = super().uploadData(outputs, version)
s3_patcher = S3Patcher(one=self.one)
return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user,
versions=versions, **kwargs)
class RemoteHttpDataHandler(DataHandler):
def __init__(self, session_path, signature, one=None):
"""
Data handler for running tasks on remote compute node. Will download missing data via http using ONE
:param session_path: path to session
:param signature: input and output file signatures
:param one: ONE instance
"""
super().__init__(session_path, signature, one=one)
def setUp(self, **_):
"""
Function to download necessary data to run tasks using ONE
:return:
"""
df = super().getData()
self.one._check_filesystem(df)
def uploadData(self, outputs, version, **kwargs):
"""
Function to upload and register data of completed task via FTP patcher
:param outputs: output files from task to register
:param version: ibllib version
:return: output info of registered datasets
"""
versions = super().uploadData(outputs, version)
ftp_patcher = FTPPatcher(one=self.one)
return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
versions=versions, **kwargs)
class RemoteAwsDataHandler(DataHandler):
def __init__(self, session_path, signature, one=None):
"""
Data handler for running tasks on remote compute node.
This will download missing data from the private IBL S3 AWS data bucket. New datasets are
uploaded via Globus.
:param session_path: path to session
:param signature: input and output file signatures
:param one: ONE instance
"""
super().__init__(session_path, signature, one=one)
self.local_paths = []
def setUp(self, **_):
"""Function to download necessary data to run tasks using AWS boto3."""
df = super().getData()
self.local_paths = self.one._download_aws(map(lambda x: x[1], df.iterrows()))
def uploadData(self, outputs, version, **kwargs):
"""
Function to upload and register data of completed task via Globus
:param outputs: output files from task to register
:param version: ibllib version
:return: output info of registered datasets
"""
# Set up Globus
from one.remote.globus import Globus # noqa
self.globus = Globus(client_name='server', headless=True)
self.lab = session_path_parts(self.session_path, as_dict=True)['lab']
if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
base_url = 'https://alyx.internationalbrainlab.org'
_logger.warning('Changing Alyx client to %s', base_url)
ac = AlyxClient(base_url=base_url, cache_rest=self.one.alyx.cache_mode)
else:
ac = self.one.alyx
self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=ac)
# register datasets
versions = super().uploadData(outputs, version)
response = register_dataset(outputs, one=self.one, server_only=True, versions=versions, **kwargs)
# upload directly via globus
source_paths = []
target_paths = []
collections = {}
for dset, out in zip(response, outputs):
assert Path(out).name == dset['name']
# set flag to false
fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
collection = '/'.join(fr['relative_path'].split('/')[:-1])
if collection in collections.keys():
collections[collection].update({f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}})
else:
collections[collection] = {f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}}
# Set all exists status to false for server file records
self.one.alyx.rest('files', 'partial_update', id=fr['id'], data={'exists': False})
source_paths.append(out)
target_paths.append(add_uuid_string(fr['relative_path'], dset['id']))
if len(target_paths) != 0:
ts = time()
for sp, tp in zip(source_paths, target_paths):
_logger.info(f'Uploading {sp} to {tp}')
self.globus.mv('local', f'flatiron_{self.lab}', source_paths, target_paths)
_logger.debug(f'Complete. Time elapsed {time() - ts}')
for collection, files in collections.items():
globus_files = self.globus.ls(f'flatiron_{self.lab}', collection, remove_uuid=True, return_size=True)
file_names = [str(gl[0]) for gl in globus_files]
file_sizes = [gl[1] for gl in globus_files]
for name, details in files.items():
try:
idx = file_names.index(name)
size = file_sizes[idx]
if size == details['size']:
# update the file record if sizes match
self.one.alyx.rest('files', 'partial_update', id=details['fr_id'], data={'exists': True})
else:
_logger.warning(f'File {name} found on SDSC but sizes do not match')
except ValueError:
_logger.warning(f'File {name} not found on SDSC')
return response
# ftp_patcher = FTPPatcher(one=self.one)
# return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
# versions=versions, **kwargs)
def cleanUp(self, task):
"""Clean up, remove the files that were downloaded from globus once task has completed."""
if task.status == 0:
for file in self.local_paths:
os.unlink(file)
class RemoteGlobusDataHandler(DataHandler):
"""
Data handler for running tasks on remote compute node. Will download missing data using Globus.
:param session_path: path to session
:param signature: input and output file signatures
:param one: ONE instance
"""
def __init__(self, session_path, signature, one=None):
super().__init__(session_path, signature, one=one)
def setUp(self, **_):
"""Function to download necessary data to run tasks using globus."""
# TODO
pass
def uploadData(self, outputs, version, **kwargs):
"""
Function to upload and register data of completed task via FTP patcher
:param outputs: output files from task to register
:param version: ibllib version
:return: output info of registered datasets
"""
versions = super().uploadData(outputs, version)
ftp_patcher = FTPPatcher(one=self.one)
return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
versions=versions, **kwargs)
class SDSCDataHandler(DataHandler):
"""
Data handler for running tasks on SDSC compute node
:param session_path: path to session
:param signatures: input and output file signatures
:param one: ONE instance
"""
def __init__(self, session_path, signatures, one=None):
super().__init__(session_path, signatures, one=one)
self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH)
self.root_path = SDSC_ROOT_PATH
def setUp(self, task):
"""Function to create symlinks to necessary data to run tasks."""
df = super().getData()
SDSC_TMP = Path(self.patch_path.joinpath(task.__class__.__name__))
session_path = Path(get_alf_path(self.session_path))
for uuid, d in df.iterrows():
file_path = session_path / d['rel_path']
file_uuid = add_uuid_string(file_path, uuid)
file_link = SDSC_TMP.joinpath(file_path)
file_link.parent.mkdir(exist_ok=True, parents=True)
try:
file_link.symlink_to(
Path(self.root_path.joinpath(file_uuid)))
except FileExistsError:
pass
task.session_path = SDSC_TMP.joinpath(session_path)
def uploadData(self, outputs, version, **kwargs):
"""
Function to upload and register data of completed task via SDSC patcher
:param outputs: output files from task to register
:param version: ibllib version
:return: output info of registered datasets
"""
versions = super().uploadData(outputs, version)
sdsc_patcher = SDSCPatcher(one=self.one)
return sdsc_patcher.patch_datasets(outputs, dry=False, versions=versions, **kwargs)
def cleanUp(self, task):
"""Function to clean up symlinks created to run task."""
assert self.patch_path.parts[0:4] == task.session_path.parts[0:4]
shutil.rmtree(task.session_path)
class PopeyeDataHandler(SDSCDataHandler):
def __init__(self, session_path, signatures, one=None):
super().__init__(session_path, signatures, one=one)
self.patch_path = Path(os.getenv('SDSC_PATCH_PATH', "/mnt/sdceph/users/ibl/data/quarantine/tasks/"))
self.root_path = Path("/mnt/sdceph/users/ibl/data")
def uploadData(self, outputs, version, **kwargs):
raise NotImplementedError(
"Cannot register data from Popeye. Login as Datauser and use the RegisterSpikeSortingSDSC task."
)
def cleanUp(self, **_):
"""Symlinks are preserved until registration."""
pass