"""Decorators and small standalone functions for api module"""
import logging
import urllib.parse
from functools import wraps
from typing import Sequence, Union, Iterable, Optional, List
from collections.abc import Mapping
import fnmatch
import pandas as pd
from iblutil.io import parquet
import numpy as np
import one.alf.exceptions as alferr
from one.alf.files import rel_path_parts, get_session_path
from one.alf.spec import FILE_SPEC, regex as alf_regex
import one.alf.io as alfio
logger = logging.getLogger(__name__)
def Listable(t):
    """Return a type hint that accepts either a single ``t`` or a sequence of them."""
    return Union[t, Sequence[t]]
def ses2records(ses: dict) -> 'tuple[pd.Series, pd.DataFrame]':
    """Extract session cache record and datasets cache from a remote session data record

    TODO Fix for new tables; use to update caches from remote queries

    :param ses: session dictionary from rest endpoint; expects a 'url' ending in the 36-char
        session UUID and a 'data_dataset_session_related' list of dataset records
        (schema assumed from usage here -- confirm against the Alyx sessions endpoint)
    :return: session record (pd.Series), datasets frame (pd.DataFrame) indexed by the
        dataset UUID split into two int64s (id_0, id_1)
    """
    # Extract session record; the session UUID is the last 36 characters of the URL,
    # converted by parquet.str2np into a pair of int64s used as the Series name
    eid = parquet.str2np(ses['url'][-36:])
    session_keys = ('subject', 'start_time', 'lab', 'number', 'task_protocol', 'project')
    session_data = {k: v for k, v in ses.items() if k in session_keys}
    # session_data['id_0'], session_data['id_1'] = eid.flatten().tolist()
    # NOTE(review): rename on a Series maps index labels ('start_time' -> 'date');
    # axis=1 appears to be a DataFrame-compat leftover -- confirm it is ignored here
    session = (
        (pd.Series(data=session_data, name=tuple(eid.flatten()))
         .rename({'start_time': 'date'}, axis=1))
    )
    # Keep only the date part of the timestamp (assumes an ISO datetime string -- TODO confirm)
    session['date'] = session['date'][:10]

    # Extract datasets table
    def _to_record(d):
        """Convert one related-dataset record into a flat cache-row dict."""
        rec = dict(file_size=d['file_size'], hash=d['hash'], exists=True)
        # Split dataset and session UUIDs into int64 pairs for the parquet cache index
        rec['id_0'], rec['id_1'] = parquet.str2np(d['id']).flatten().tolist()
        rec['eid_0'], rec['eid_1'] = session.name
        # Derive the relative ALF path from the dataset's HTTP URL
        file_path = urllib.parse.urlsplit(d['data_url'], allow_fragments=False).path.strip('/')
        # Strip the dataset UUID embedded in the remote file name (dry=True: path-only, no I/O)
        file_path = alfio.remove_uuid_file(file_path, dry=True).as_posix()
        rec['session_path'] = get_session_path(file_path).as_posix()
        rec['rel_path'] = file_path[len(rec['session_path']):].strip('/')
        if 'default_revision' in d:
            # Remote field is the string 'True'/'False'; store as a real bool
            rec['default_revision'] = d['default_revision'] == 'True'
        return rec

    records = map(_to_record, ses['data_dataset_session_related'])
    datasets = pd.DataFrame(records).set_index(['id_0', 'id_1']).sort_index()
    return session, datasets
def datasets2records(datasets) -> pd.DataFrame:
    """Extract datasets DataFrame from one or more Alyx dataset records

    Parameters
    ----------
    datasets : dict, list
        One or more records from the Alyx 'datasets' endpoint

    Returns
    -------
    pd.DataFrame
        Datasets frame indexed by the dataset UUID split into two int64s (id_0, id_1);
        datasets with no accessible file record are omitted

    Examples
    --------
    datasets = one.alyx.rest('datasets', 'list', subject='foobar')
    df = datasets2records(datasets)
    """
    records = []

    for d in ensure_list(datasets):
        # Use the first file record that has a URL and is marked as existing
        file_record = next((x for x in d['file_records'] if x['data_url'] and x['exists']), None)
        if not file_record:
            continue  # Ignore files that are not accessible
        rec = dict(file_size=d['file_size'], hash=d['hash'], exists=True)
        # Dataset and session UUIDs (last 36 chars of their URLs) split into int64 pairs
        rec['id_0'], rec['id_1'] = parquet.str2np(d['url'][-36:]).flatten().tolist()
        rec['eid_0'], rec['eid_1'] = parquet.str2np(d['session'][-36:]).flatten().tolist()
        # Derive the relative ALF path from the file record's HTTP URL
        data_url = urllib.parse.urlsplit(file_record['data_url'], allow_fragments=False)
        file_path = data_url.path.strip('/')
        # Strip the dataset UUID embedded in the remote file name (dry=True: path-only, no I/O)
        file_path = alfio.remove_uuid_file(file_path, dry=True).as_posix()
        rec['session_path'] = get_session_path(file_path).as_posix()
        rec['rel_path'] = file_path[len(rec['session_path']):].strip('/')
        rec['default_revision'] = d['default_dataset']
        records.append(rec)

    if not records:
        # No accessible datasets: return an empty frame with the expected columns/index
        keys = ('id_0', 'id_1', 'eid_0', 'eid_1', 'file_size', 'hash', 'session_path',
                'rel_path', 'default_revision')
        return pd.DataFrame(columns=keys).set_index(['id_0', 'id_1'])
    return pd.DataFrame(records).set_index(['id_0', 'id_1']).sort_index()
def parse_id(method):
    """
    Decorator that converts the wrapped method's second argument into an experiment
    UUID string before the call.

    Parameters
    ----------
    method : function
        An ONE method whose second arg is an experiment ID

    Returns
    -------
    A wrapper function that parses the ID to the expected string

    Raises
    ------
    ValueError
        (At call time) when the identifier cannot be resolved to an eid
    """
    @wraps(method)
    def wrapper(self, id, *args, **kwargs):
        # Resolve whatever identifier form was given into a canonical eid
        parsed = self.to_eid(id)
        if parsed is None:
            raise ValueError(f'Cannot parse session ID "{id}" (session may not exist)')
        return method(self, parsed, *args, **kwargs)
    return wrapper
def refresh(method):
    """
    Decorator that refreshes the instance cache before calling the wrapped method.

    The refresh mode is taken from the call's ``query_type`` keyword argument; when
    absent, falsy or ``'auto'``, the instance's own ``mode`` attribute is used instead.
    """
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        requested = kwargs.get('query_type', None)
        # Fall back to the instance default for missing/'auto' modes
        mode = self.mode if (not requested or requested == 'auto') else requested
        self.refresh_cache(mode=mode)
        # The original kwargs (including query_type) are forwarded untouched
        return method(self, *args, **kwargs)
    return wrapper
def validate_date_range(date_range):
    """
    Validates and arranges a date range into a (start, end) pair of pd.Timestamp.

    A single date matches that whole day (end = start + 24h - 1ms); a one-element
    list means 'from that date onwards'; None at either position of a pair means
    the lowest/highest possible date respectively.

    Parameters
    ----------
    date_range : None, str, datetime-like, list
        A single date (str, datetime.date/datetime, pd.Timestamp, np.datetime64),
        or a 1- or 2-element iterable of such values (None permitted in a pair)

    Returns
    -------
    tuple of pd.Timestamp, None
        (start, end) pair, or None when date_range is None

    Raises
    ------
    ValueError
        When more than two values are provided

    Examples
    --------
    validate_date_range('2020-01-01')  # On this day
    validate_date_range(datetime.date(2020, 1, 1))
    validate_date_range(np.array(['2022-01-30', '2022-01-30'], dtype='datetime64[D]'))
    validate_date_range(pd.Timestamp(2020, 1, 1))
    validate_date_range(np.datetime64('2021-03-11'))
    validate_date_range(['2020-01-01'])  # from date
    validate_date_range(['2020-01-01', None])  # from date
    validate_date_range([None, '2020-01-01'])  # up to date
    """
    if date_range is None:
        return
    # Ensure we have exactly two values
    if isinstance(date_range, str) or not isinstance(date_range, Iterable):
        # A single date: match the whole day, i.e. [date, date + 24h - 1ms]
        dt = pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1)
        date_range = (date_range, pd.Timestamp(date_range) + dt)
    elif len(date_range) == 1:
        date_range = [date_range[0], pd.Timestamp.max]
    elif len(date_range) != 2:
        raise ValueError(
            f'date_range must be a single date or a 1-2 element iterable, got {len(date_range)}')
    # For comparisons, ensure both values are pd.Timestamp (datetime, date and datetime64
    # objects will be converted)
    start, end = date_range
    start = start or pd.Timestamp.min  # Convert None to lowest possible date
    end = end or pd.Timestamp.max  # Convert None to highest possible date
    # Convert to timestamp
    if not isinstance(start, pd.Timestamp):
        start = pd.Timestamp(start)
    if not isinstance(end, pd.Timestamp):
        end = pd.Timestamp(end)
    return start, end
def _collection_spec(collection=None, revision=None):
"""
Return a template string for a collection/revision regular expression. Because both are
optional in the ALF spec, None will match any (including absent), while an empty string will
match absent.
Parameters
----------
collection : None, str
An optional collection regular expression
revision : None, str
An optional revision regular expression
Returns
-------
A string format for matching the collection/revision
"""
spec = ''
for value, default in zip((collection, revision), ('{collection}/', '#{revision}#/')):
if not value:
default = f'({default})?' if value is None else ''
spec += default
return spec
def filter_datasets(all_datasets, filename=None, collection=None, revision=None,
                    revision_last_before=True, assert_unique=True, wildcards=False):
    """
    Filter the datasets cache table by the relative path (dataset name, collection and revision).

    When None is passed, all values will match. To match on empty parts, use an empty string.
    When revision_last_before is true, None means return latest revision.

    Parameters
    ----------
    all_datasets : pandas.DataFrame
        A datasets cache table
    filename : str, dict, None
        A filename str or a dict of alf parts. Regular expressions permitted.
    collection : str, None
        A collection string. Regular expressions permitted.
    revision : str, None
        A revision string to match. If revision_last_before is true, regular expressions are
        not permitted.
    revision_last_before : bool
        When true and no exact match exists, the (lexicographically) previous revision is used
        instead. When false the revision string is matched like collection and filename,
        with regular expressions permitted.
    assert_unique : bool
        When true an error is raised if multiple collections or datasets are found
    wildcards : bool
        If true, use unix shell style matching instead of regular expressions

    Returns
    -------
    A slice of all_datasets that match the filters

    Raises
    ------
    alferr.ALFMultipleCollectionsFound, alferr.ALFMultipleObjectsFound,
    alferr.ALFMultipleRevisionsFound
        When assert_unique is true and the matches are not unique

    Examples
    --------
    # Filter by dataset name and collection
    datasets = filter_datasets(all_datasets, '*.spikes.times.*', 'alf/probe00')

    # Filter datasets not in a collection
    datasets = filter_datasets(all_datasets, collection='')

    # Filter by matching revision
    datasets = filter_datasets(all_datasets, 'spikes.times.npy',
                               revision='2020-01-12', revision_last_before=False)

    # Filter by filename parts
    datasets = filter_datasets(all_datasets, dict(object='spikes', attribute='times'))
    """
    # Create a regular expression string to match relative path against
    filename = filename or {}
    regex_args = {'collection': collection}
    # When filtering by last-before revision, the revision is excluded from the regex
    # and handled by filter_revision_last_before instead
    spec_str = _collection_spec(collection, None if revision_last_before else revision)

    if isinstance(filename, dict):
        # Match individual ALF name parts via the generic file spec
        spec_str += FILE_SPEC
        regex_args.update(**filename)
    else:
        # Convert to regex if necessary and assert end of string
        spec_str += fnmatch.translate(filename) if wildcards else filename + '$'

    # If matching revision name, add to regex string
    if not revision_last_before:
        regex_args.update(revision=revision)

    for k, v in regex_args.items():
        if v is None:
            continue
        if wildcards:
            # Convert to regex, remove \\Z which asserts end of string
            v = (fnmatch.translate(x).replace('\\Z', '') for x in ensure_list(v))
        if not isinstance(v, str):
            regex_args[k] = '|'.join(v)  # logical OR

    # Build regex string
    pattern = alf_regex('^' + spec_str, **regex_args)
    match = all_datasets[all_datasets['rel_path'].str.match(pattern)]
    if len(match) == 0 or not (revision_last_before or assert_unique):
        return match

    # Revision of each matched dataset ('' when the path has no #revision# part)
    revisions = [rel_path_parts(x)[1] or '' for x in match.rel_path.values]
    if assert_unique:
        collections = set(rel_path_parts(x)[0] or '' for x in match.rel_path.values)
        if len(collections) > 1:
            _list = '"' + '", "'.join(collections) + '"'
            raise alferr.ALFMultipleCollectionsFound(_list)
        if filename and len(match) > 1:
            _list = '"' + '", "'.join(match['rel_path']) + '"'
            raise alferr.ALFMultipleObjectsFound(_list)
        if not revision_last_before:
            if len(set(revisions)) > 1:
                _list = '"' + '", "'.join(set(revisions)) + '"'
                raise alferr.ALFMultipleRevisionsFound(_list)
            else:
                # Exactly one revision matched; no last-before filtering required
                return match

    return filter_revision_last_before(match, revision, assert_unique=assert_unique)
def filter_revision_last_before(datasets, revision=None, assert_unique=True):
    """
    Filter datasets by revision, returning previous revision in ordered list if revision
    doesn't exactly match.

    Parameters
    ----------
    datasets : pandas.DataFrame
        A datasets cache table
    revision : str
        A revision string to match (regular expressions not permitted)
    assert_unique : bool
        When true an alferr.ALFMultipleRevisionsFound exception is raised when multiple
        default revisions are found; an alferr.ALFError when no default revision is found

    Returns
    -------
    A datasets DataFrame with 0 or 1 row per unique dataset
    """
    def _last_before(df):
        """Takes a DataFrame with only one dataset and multiple revisions, returns matching row"""
        # With no requested revision, prefer the dataset flagged as the default revision
        if revision is None and 'default_revision' in df.columns:
            if assert_unique and sum(df.default_revision) > 1:
                revisions = df['revision'][df.default_revision.values]
                rev_list = '"' + '", "'.join(revisions) + '"'
                raise alferr.ALFMultipleRevisionsFound(rev_list)
            if sum(df.default_revision) == 1:
                return df[df.default_revision]
            # default_revision column all False; the default wasn't copied to the remote repository
            dset_name = df['rel_path'].iloc[0]
            if assert_unique:
                raise alferr.ALFError(f'No default revision for dataset {dset_name}')
            else:
                logger.warning(f'No default revision for dataset {dset_name}; using most recent')
        # Compare revisions lexicographically
        if assert_unique and len(df['revision'].unique()) > 1:
            rev_list = '"' + '", "'.join(df['revision'].unique()) + '"'
            raise alferr.ALFMultipleRevisionsFound(rev_list)
        # Square brackets forces 1 row DataFrame returned instead of Series
        idx = index_last_before(df['revision'].tolist(), revision)
        return df.iloc[slice(0, 0) if idx is None else [idx], :]

    # Add a 'revision' column extracted from each rel_path ('' when absent)
    with pd.option_context('mode.chained_assignment', None):  # FIXME Explicitly copy?
        datasets['revision'] = [rel_path_parts(x)[1] or '' for x in datasets.rel_path]
    # Group rows by their revision-less relative path so each group is one dataset
    groups = datasets.rel_path.str.replace('#.*#/', '', regex=True).values
    grouped = datasets.groupby(groups, group_keys=False)
    return grouped.apply(_last_before)
def index_last_before(revisions: List[str], revision: Optional[str]) -> Optional[int]:
    """
    Returns the index of string that occurs directly before the provided revision string when
    lexicographic sorted. If revision is None, the index of the most recent revision is returned.

    Parameters
    ----------
    revisions : list of strings
        A list of revision strings
    revision : None, str
        The revision string to match on

    Returns
    -------
    Index of revision before matching string in sorted list or None

    Examples
    --------
    idx = index_last_before([], '2020-08-01')
    """
    if not revisions:
        return None  # No revisions, just return
    newest_first = sorted(revisions, reverse=True)
    if revision is None:
        # Return most recent revision
        return revisions.index(newest_first[0])
    # Candidates at or before the requested revision, newest first
    eligible = [r for r in newest_first if r <= revision]
    return revisions.index(eligible[0]) if eligible else None
def autocomplete(term, search_terms):
    """
    Validate search term and return complete name, e.g. autocomplete('subj') == 'subject'
    """
    needle = term.lower()
    # An exact match needs no completion
    if needle in search_terms:
        return needle
    # Lazily collect terms that start with the (lowercased) prefix
    candidates = (full for full in search_terms if full.lower().startswith(needle))
    first = next(candidates, None)
    if not first:
        raise ValueError(f'Invalid search term "{needle}", see `one.search_terms()`')
    if next(candidates, None):
        # More than one candidate: the prefix does not identify a unique term
        raise ValueError(f'Ambiguous search term "{needle}"')
    return first
def ensure_list(value):
    """Ensure input is a list"""
    # Strings and dicts are iterable but are treated as scalar values here
    if isinstance(value, (str, dict)):
        return [value]
    return value if isinstance(value, Iterable) else [value]
class LazyId(Mapping):
    """
    Using a paginated response object or list of session records, extracts eid string when required
    """

    def __init__(self, pg):
        # The underlying paginated response (or plain list) of session records
        self._pg = pg

    def __getitem__(self, item):
        return LazyId.ses2eid(self._pg[item])

    def __len__(self):
        return len(self._pg)

    def __iter__(self):
        # Lazily convert each record as it is consumed
        return (LazyId.ses2eid(rec) for rec in self._pg)

    @staticmethod
    def ses2eid(ses):
        """Extract the eid from one session record, or from each record in a list."""
        if isinstance(ses, list):
            return [LazyId.ses2eid(x) for x in ses]
        # Prefer an explicit 'id' field; otherwise take the last URL path segment
        return ses.get('id', None) or ses['url'].split('/').pop()