Source code for one.util

"""Decorators and small standalone functions for api module"""
import logging
import urllib.parse
from functools import wraps
from typing import Sequence, Union, Iterable, Optional, List
from collections.abc import Mapping
import fnmatch

import pandas as pd
from iblutil.io import parquet
import numpy as np

import one.alf.exceptions as alferr
from one.alf.files import rel_path_parts, get_session_path
from one.alf.spec import FILE_SPEC, regex as alf_regex
import one.alf.io as alfio

logger = logging.getLogger(__name__)


def Listable(t):
    return Union[t, Sequence[t]]

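# Example for Listable (illustrative sketch): it builds a type hint that accepts either a single
# value or a sequence of values of the given type, so a parameter annotated `Listable(str)`
# accepts both 'spikes.times' and ['spikes.times', 'spikes.clusters'], e.g.
#   def load_datasets(datasets: Listable(str)): ...
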
def ses2records(ses: dict) -> [pd.Series, pd.DataFrame]:
    """Extract session cache record and datasets cache from a remote session data record
    TODO Fix for new tables; use to update caches from remote queries
    :param ses: session dictionary from rest endpoint
    :return: session record, datasets frame
    """
    # Extract session record
    eid = parquet.str2np(ses['url'][-36:])
    session_keys = ('subject', 'start_time', 'lab', 'number', 'task_protocol', 'project')
    session_data = {k: v for k, v in ses.items() if k in session_keys}
    # session_data['id_0'], session_data['id_1'] = eid.flatten().tolist()
    session = (
        (pd.Series(data=session_data, name=tuple(eid.flatten()))
         .rename({'start_time': 'date'}, axis=1))
    )
    session['date'] = session['date'][:10]

    # Extract datasets table
    def _to_record(d):
        rec = dict(file_size=d['file_size'], hash=d['hash'], exists=True)
        rec['id_0'], rec['id_1'] = parquet.str2np(d['id']).flatten().tolist()
        rec['eid_0'], rec['eid_1'] = session.name
        file_path = urllib.parse.urlsplit(d['data_url'], allow_fragments=False).path.strip('/')
        file_path = alfio.remove_uuid_file(file_path, dry=True).as_posix()
        rec['session_path'] = get_session_path(file_path).as_posix()
        rec['rel_path'] = file_path[len(rec['session_path']):].strip('/')
        if 'default_revision' in d:
            rec['default_revision'] = d['default_revision'] == 'True'
        return rec

    records = map(_to_record, ses['data_dataset_session_related'])
    datasets = pd.DataFrame(records).set_index(['id_0', 'id_1']).sort_index()
    return session, datasets

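# Usage sketch for ses2records (assumes a connected OneAlyx instance `one`; the endpoint call
# below is illustrative, not prescriptive):
#   ses = one.alyx.rest('sessions', 'read', id=eid)
#   session_record, datasets_frame = ses2records(ses)
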
def datasets2records(datasets) -> pd.DataFrame:
    """Extract datasets DataFrame from one or more Alyx dataset records

    Parameters
    ----------
    datasets : dict, list
        One or more records from the Alyx 'datasets' endpoint

    Returns
    -------
    Datasets frame

    Examples
    --------
        datasets = one.alyx.rest('datasets', 'list', subject='foobar')
        df = datasets2records(datasets)
    """
    records = []

    for d in ensure_list(datasets):
        file_record = next((x for x in d['file_records'] if x['data_url'] and x['exists']), None)
        if not file_record:
            continue  # Ignore files that are not accessible
        rec = dict(file_size=d['file_size'], hash=d['hash'], exists=True)
        rec['id_0'], rec['id_1'] = parquet.str2np(d['url'][-36:]).flatten().tolist()
        rec['eid_0'], rec['eid_1'] = parquet.str2np(d['session'][-36:]).flatten().tolist()
        data_url = urllib.parse.urlsplit(file_record['data_url'], allow_fragments=False)
        file_path = data_url.path.strip('/')
        file_path = alfio.remove_uuid_file(file_path, dry=True).as_posix()
        rec['session_path'] = get_session_path(file_path).as_posix()
        rec['rel_path'] = file_path[len(rec['session_path']):].strip('/')
        rec['default_revision'] = d['default_dataset']
        records.append(rec)

    if not records:
        keys = ('id_0', 'id_1', 'eid_0', 'eid_1', 'file_size', 'hash',
                'session_path', 'rel_path', 'default_revision')
        return pd.DataFrame(columns=keys).set_index(['id_0', 'id_1'])
    return pd.DataFrame(records).set_index(['id_0', 'id_1']).sort_index()

def parse_id(method):
    """
    Ensures the input experiment identifier is an experiment UUID string

    Parameters
    ----------
    method : function
        An ONE method whose second arg is an experiment ID

    Returns
    -------
    A wrapper function that parses the ID to the expected string
    """

    @wraps(method)
    def wrapper(self, id, *args, **kwargs):
        eid = self.to_eid(id)
        if eid is None:
            raise ValueError(f'Cannot parse session ID "{id}" (session may not exist)')
        return method(self, eid, *args, **kwargs)

    return wrapper

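# Usage sketch for parse_id (hypothetical method shown): the decorator resolves whatever
# identifier the caller passes to an eid string via self.to_eid before the wrapped method runs.
#   class One:
#       @parse_id
#       def get_details(self, eid):
#           ...
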
def refresh(method):
    """
    Refresh cache depending on query_type kwarg
    """

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        mode = kwargs.get('query_type', None)
        if not mode or mode == 'auto':
            mode = self.mode
        self.refresh_cache(mode=mode)
        return method(self, *args, **kwargs)

    return wrapper

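# Usage sketch for refresh (hypothetical method shown): the cache is refreshed according to the
# `query_type` keyword before the query executes; a missing or 'auto' value falls back to the
# instance's default mode (self.mode).
#   class One:
#       @refresh
#       def search(self, query_type=None, **kwargs):
#           ...
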
def validate_date_range(date_range):
    """
    Validates and arranges a date range into a two-element list

    Examples:
        validate_date_range('2020-01-01')  # On this day
        validate_date_range(datetime.date(2020, 1, 1))
        validate_date_range(np.array(['2022-01-30', '2022-01-30'], dtype='datetime64[D]'))
        validate_date_range(pd.Timestamp(2020, 1, 1))
        validate_date_range(np.datetime64('2021-03-11'))
        validate_date_range(['2020-01-01'])  # from date
        validate_date_range(['2020-01-01', None])  # from date
        validate_date_range([None, '2020-01-01'])  # up to date
    """
    if date_range is None:
        return

    # Ensure we have exactly two values
    if isinstance(date_range, str) or not isinstance(date_range, Iterable):
        # date_range = (date_range, pd.Timestamp(date_range) + pd.Timedelta(days=1))
        dt = pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1)
        date_range = (date_range, pd.Timestamp(date_range) + dt)
    elif len(date_range) == 1:
        date_range = [date_range[0], pd.Timestamp.max]
    elif len(date_range) != 2:
        raise ValueError

    # For comparisons, ensure both values are pd.Timestamp (datetime, date and datetime64
    # objects will be converted)
    start, end = date_range
    start = start or pd.Timestamp.min  # Convert None to lowest possible date
    end = end or pd.Timestamp.max  # Convert None to highest possible date

    # Convert to timestamp
    if not isinstance(start, pd.Timestamp):
        start = pd.Timestamp(start)
    if not isinstance(end, pd.Timestamp):
        end = pd.Timestamp(end)

    return start, end

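# Example outputs for validate_date_range (sketch): inputs are normalised to a (start, end)
# pair of pd.Timestamp objects.
#   validate_date_range('2020-01-01')
#   # -> (Timestamp('2020-01-01 00:00:00'), Timestamp('2020-01-01 23:59:59.999000'))
#   validate_date_range(['2020-01-01', None])
#   # -> (Timestamp('2020-01-01 00:00:00'), Timestamp('2262-04-11 23:47:16.854775807'))
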
def _collection_spec(collection=None, revision=None):
    """
    Return a template string for a collection/revision regular expression.  Because both are
    optional in the ALF spec, None will match any (including absent), while an empty string
    will match absent.

    Parameters
    ----------
    collection : None, str
        An optional collection regular expression
    revision : None, str
        An optional revision regular expression

    Returns
    -------
    A string format for matching the collection/revision
    """
    spec = ''
    for value, default in zip((collection, revision), ('{collection}/', '#{revision}#/')):
        if not value:
            default = f'({default})?' if value is None else ''
        spec += default
    return spec

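# Example outputs for _collection_spec (sketch): the returned template is later filled in by
# one.alf.spec.regex.
#   _collection_spec()             # '({collection}/)?(#{revision}#/)?'  (both optional)
#   _collection_spec('', None)     # '(#{revision}#/)?'  (collection absent, any revision)
#   _collection_spec('alf', 'v1')  # '{collection}/#{revision}#/'  (both required)
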
def filter_datasets(all_datasets, filename=None, collection=None, revision=None,
                    revision_last_before=True, assert_unique=True, wildcards=False):
    """
    Filter the datasets cache table by the relative path (dataset name, collection and
    revision).  When None is passed, all values will match.  To match on empty parts, use an
    empty string.  When revision_last_before is true, None means return latest revision.

    Parameters
    ----------
    all_datasets : pandas.DataFrame
        A datasets cache table
    filename : str, dict, None
        A filename str or a dict of alf parts.  Regular expressions permitted.
    collection : str, None
        A collection string.  Regular expressions permitted.
    revision : str, None
        A revision string to match.  If revision_last_before is true, regular expressions are
        not permitted.
    revision_last_before : bool
        When true and no exact match exists, the (lexicographically) previous revision is used
        instead.  When false the revision string is matched like collection and filename,
        with regular expressions permitted.
    assert_unique : bool
        When true an error is raised if multiple collections or datasets are found
    wildcards : bool
        If true, use unix shell style matching instead of regular expressions

    Returns
    -------
    A slice of all_datasets that match the filters

    Examples
    --------
        # Filter by dataset name and collection
        datasets = filter_datasets(all_datasets, '*.spikes.times.*', 'alf/probe00')

        # Filter datasets not in a collection
        datasets = filter_datasets(all_datasets, collection='')

        # Filter by matching revision
        datasets = filter_datasets(all_datasets, 'spikes.times.npy',
                                   revision='2020-01-12', revision_last_before=False)

        # Filter by filename parts
        datasets = filter_datasets(all_datasets, {'object': 'spikes', 'attribute': 'times'})
    """
    # Create a regular expression string to match relative path against
    filename = filename or {}
    regex_args = {'collection': collection}
    spec_str = _collection_spec(collection, None if revision_last_before else revision)

    if isinstance(filename, dict):
        spec_str += FILE_SPEC
        regex_args.update(**filename)
    else:
        # Convert to regex if necessary and assert end of string
        spec_str += fnmatch.translate(filename) if wildcards else filename + '$'

    # If matching revision name, add to regex string
    if not revision_last_before:
        regex_args.update(revision=revision)

    for k, v in regex_args.items():
        if v is None:
            continue
        if wildcards:
            # Convert to regex, remove \\Z which asserts end of string
            v = (fnmatch.translate(x).replace('\\Z', '') for x in ensure_list(v))
        if not isinstance(v, str):
            regex_args[k] = '|'.join(v)  # logical OR

    # Build regex string
    pattern = alf_regex('^' + spec_str, **regex_args)
    match = all_datasets[all_datasets['rel_path'].str.match(pattern)]
    if len(match) == 0 or not (revision_last_before or assert_unique):
        return match

    revisions = [rel_path_parts(x)[1] or '' for x in match.rel_path.values]
    if assert_unique:
        collections = set(rel_path_parts(x)[0] or '' for x in match.rel_path.values)
        if len(collections) > 1:
            _list = '"' + '", "'.join(collections) + '"'
            raise alferr.ALFMultipleCollectionsFound(_list)
        if filename and len(match) > 1:
            _list = '"' + '", "'.join(match['rel_path']) + '"'
            raise alferr.ALFMultipleObjectsFound(_list)
        if not revision_last_before:
            if len(set(revisions)) > 1:
                _list = '"' + '", "'.join(set(revisions)) + '"'
                raise alferr.ALFMultipleRevisionsFound(_list)
            else:
                return match

    return filter_revision_last_before(match, revision, assert_unique=assert_unique)

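# Additional example for filter_datasets (sketch): with wildcards=True the filename and
# collection are unix-shell patterns rather than regular expressions.
#   filter_datasets(all_datasets, '*spikes.times*', 'alf/probe??', wildcards=True)
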
def filter_revision_last_before(datasets, revision=None, assert_unique=True):
    """
    Filter datasets by revision, returning previous revision in ordered list if revision
    doesn't exactly match.

    Parameters
    ----------
    datasets : pandas.DataFrame
        A datasets cache table
    revision : str
        A revision string to match (regular expressions not permitted)
    assert_unique : bool
        When true an alferr.ALFMultipleRevisionsFound exception is raised when multiple
        default revisions are found; an alferr.ALFError when no default revision is found

    Returns
    -------
    A datasets DataFrame with 0 or 1 row per unique dataset
    """
    def _last_before(df):
        """Takes a DataFrame with only one dataset and multiple revisions, returns matching row"""
        if revision is None and 'default_revision' in df.columns:
            if assert_unique and sum(df.default_revision) > 1:
                revisions = df['revision'][df.default_revision.values]
                rev_list = '"' + '", "'.join(revisions) + '"'
                raise alferr.ALFMultipleRevisionsFound(rev_list)
            if sum(df.default_revision) == 1:
                return df[df.default_revision]
            # default_revision column all False; default isn't copied to remote repository
            dset_name = df['rel_path'].iloc[0]
            if assert_unique:
                raise alferr.ALFError(f'No default revision for dataset {dset_name}')
            else:
                logger.warning(f'No default revision for dataset {dset_name}; using most recent')
        # Compare revisions lexicographically
        if assert_unique and len(df['revision'].unique()) > 1:
            rev_list = '"' + '", "'.join(df['revision'].unique()) + '"'
            raise alferr.ALFMultipleRevisionsFound(rev_list)
        # Square brackets forces 1 row DataFrame returned instead of Series
        idx = index_last_before(df['revision'].tolist(), revision)
        # return df.iloc[slice(0, 0) if idx is None else [idx], :]
        return df.iloc[slice(0, 0) if idx is None else [idx], :]

    with pd.option_context('mode.chained_assignment', None):  # FIXME Explicitly copy?
        datasets['revision'] = [rel_path_parts(x)[1] or '' for x in datasets.rel_path]
    groups = datasets.rel_path.str.replace('#.*#/', '', regex=True).values
    grouped = datasets.groupby(groups, group_keys=False)
    return grouped.apply(_last_before)

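# Example for filter_revision_last_before (sketch, hypothetical frame):
#   df = pd.DataFrame({'rel_path': ['alf/#2020-01-01#/spikes.times.npy',
#                                   'alf/#2021-06-01#/spikes.times.npy']})
#   filter_revision_last_before(df, '2021-01-01', assert_unique=False)
#   # -> the '#2020-01-01#' row, i.e. the last revision before the one requested
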
def index_last_before(revisions: List[str], revision: Optional[str]) -> Optional[int]:
    """
    Returns the index of the string that occurs directly before the provided revision string
    when sorted lexicographically.  If revision is None, the index of the most recent revision
    is returned.

    Parameters
    ----------
    revisions : list of strings
        A list of revision strings
    revision : None, str
        The revision string to match on

    Returns
    -------
    Index of revision before matching string in sorted list or None

    Examples
    --------
        idx = index_last_before([], '2020-08-01')
    """
    if len(revisions) == 0:
        return  # No revisions, just return
    revisions_sorted = sorted(revisions, reverse=True)
    if revision is None:  # Return most recent revision
        return revisions.index(revisions_sorted[0])
    lt = np.array(revisions_sorted) <= revision
    return revisions.index(revisions_sorted[lt.argmax()]) if any(lt) else None

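# Example outputs for index_last_before (sketch):
#   index_last_before(['2021-01-01', '2020-08-01', '2021-05-01'], '2021-01-01')  # -> 0
#   index_last_before(['2021-01-01', '2020-08-01', '2021-05-01'], None)          # -> 2 (latest)
#   index_last_before(['2021-01-01'], '2020-01-01')                              # -> None
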
def autocomplete(term, search_terms):
    """
    Validate search term and return complete name, e.g. autocomplete('subj') == 'subject'
    """
    term = term.lower()
    # Check if term already complete
    if term in search_terms:
        return term
    full_key = (x for x in search_terms if x.lower().startswith(term))
    key_ = next(full_key, None)
    if not key_:
        raise ValueError(f'Invalid search term "{term}", see `one.search_terms()`')
    elif next(full_key, None):
        raise ValueError(f'Ambiguous search term "{term}"')
    return key_

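# Example for autocomplete (sketch, hypothetical search terms):
#   autocomplete('sub', ('subject', 'date_range', 'number'))  # -> 'subject'
#   autocomplete('d', ('date_range', 'dataset'))              # raises ValueError (ambiguous)
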
def ensure_list(value):
    """Ensure input is a list"""
    return [value] if isinstance(value, (str, dict)) or not isinstance(value, Iterable) else value

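# Example for ensure_list (sketch): strings and dicts are wrapped; other iterables pass through.
#   ensure_list('spikes.times')        # -> ['spikes.times']
#   ensure_list({'object': 'spikes'})  # -> [{'object': 'spikes'}]
#   ensure_list(['a', 'b'])            # -> ['a', 'b']
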
class LazyId(Mapping):
    """
    Using a paginated response object or list of session records, extracts eid string when
    required
    """

    def __init__(self, pg):
        self._pg = pg

    def __getitem__(self, item):
        return self.ses2eid(self._pg.__getitem__(item))

    def __len__(self):
        return self._pg.__len__()

    def __iter__(self):
        return map(self.ses2eid, self._pg.__iter__())

    @staticmethod
    def ses2eid(ses):
        if isinstance(ses, list):
            return [LazyId.ses2eid(x) for x in ses]
        else:
            return ses.get('id', None) or ses['url'].split('/').pop()
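
# Example for LazyId.ses2eid (sketch, hypothetical record): the eid is taken from the 'id' field
# when present, otherwise from the last segment of the session URL.
#   LazyId.ses2eid({'url': 'https://alyx.example.org/sessions/d3372b15-f696-4279-9be5-98f15783b5bb'})
#   # -> 'd3372b15-f696-4279-9be5-98f15783b5bb'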