Source code for one.api

"""Classes for searching, listing and (down)loading ALyx Files
TODO Document
TODO Add sig to ONE Light uuids
TODO Save changes to cache
TODO Fix update cache in OneAlyx - save parquet table
TODO save parquet in update_filesystem

Points of discussion:
    - Module structure: oneibl is too restrictive; naming the module `one` means the object
    should have a different name
    - Download datasets timeout
    - Support for pids?
    - Need to check performance of 1. (re)setting index, 2. converting object array to 2D int array
    - NB: The sessions table is date-ordered, so indexing by eid is O(N), but this is not done
    in the code.  The datasets table has a sorted index.
    - Conceivably you could have a subclass for Figshare, etc., not just Alyx
"""
import collections.abc
import concurrent.futures
import warnings
import logging
import os
from datetime import datetime, timedelta
from functools import lru_cache, reduce
from inspect import unwrap
from pathlib import Path
from typing import Any, Union, Optional, List
from uuid import UUID

import pandas as pd
import numpy as np
import requests.exceptions
from iblutil.io import parquet, hashfile
from iblutil.util import Bunch
from iblutil.numerical import ismember2d

import one.params
import one.webclient as wc
import one.alf.io as alfio
import one.alf.exceptions as alferr
from .alf.cache import make_parquet_db
from .alf.files import rel_path_parts, get_session_path, get_alf_path
from .alf.spec import is_uuid_string
from one.converters import ConversionMixin
import one.util as util

_logger = logging.getLogger(__name__)

N_THREADS = 4  # number of download threads


class One(ConversionMixin):
    _search_terms = (
        'dataset', 'date_range', 'laboratory', 'number', 'project', 'subject', 'task_protocol'
    )

    def __init__(self, cache_dir=None, mode='auto', wildcards=True):
        """An API for searching and loading data on a local filesystem

        Parameters
        ----------
        cache_dir : str, Path
            Path to the data files.  If Alyx parameters have been set up for this location,
            an OneAlyx instance is returned.  If data_dir and base_url are None, the default
            location is used.
        mode : str
            Query mode, options include 'auto' (reload cache daily), 'local' (offline) and
            'refresh' (always reload cache tables).  Most methods have a `query_type` parameter
            that can override the class mode.
        """
        # get parameters override if inputs provided
        super().__init__()
        if not getattr(self, '_cache_dir', None):  # May already be set by subclass
            self._cache_dir = cache_dir or one.params.get_cache_dir()
        self.cache_expiry = timedelta(hours=24)
        self.mode = mode
        self.wildcards = wildcards  # Flag indicating whether to use regex or wildcards
        # init the cache file
        self._cache = Bunch({'_meta': {
            'expired': False,
            'created_time': None,
            'loaded_time': None,
            'raw': {}  # map of original table metadata
        }})
        self._load_cache()

    def __repr__(self):
        return f'One ({"off" if self.offline else "on"}line, {self._cache_dir})'

    @property
    def offline(self):
        return self.mode == 'local' or not getattr(self, '_web_client', False)

    @util.refresh
    def search_terms(self, query_type=None):
        return self._search_terms

    def _load_cache(self, cache_dir=None, **kwargs):
        meta = self._cache['_meta']
        INDEX_KEY = 'id'
        for cache_file in Path(cache_dir or self._cache_dir).glob('*.pqt'):
            table = cache_file.stem
            # we need to keep this part fast enough for transient objects
            cache, meta['raw'][table] = parquet.load(cache_file)
            if 'date_created' not in meta['raw'][table]:
                _logger.warning(f"{cache_file} does not appear to be a valid table. Skipping")
                continue
            created = datetime.fromisoformat(meta['raw'][table]['date_created'])
            meta['created_time'] = min([meta['created_time'] or datetime.max, created])
            meta['loaded_time'] = datetime.now()
            meta['expired'] |= datetime.now() - created > self.cache_expiry

            # Set the appropriate index if none already set
            if isinstance(cache.index, pd.RangeIndex):
                num_index = [f'{INDEX_KEY}_{n}' for n in range(2)]
                try:
                    int_eids = cache[num_index].any(axis=None)
                except KeyError:
                    int_eids = False
                cache.set_index(num_index if int_eids else INDEX_KEY, inplace=True)

            # Check sorted
            is_sorted = (cache.index.is_monotonic_increasing
                         if isinstance(cache.index, pd.MultiIndex)
                         else True)
            # Sorting makes MultiIndex indexing O(N) -> O(1)
            if table == 'datasets' and not is_sorted:
                cache.sort_index(inplace=True)

            self._cache[table] = cache

        if len(self._cache) == 1:  # No tables present
            meta['expired'] = True
            self._cache.update({'datasets': pd.DataFrame(), 'sessions': pd.DataFrame()})
        self._cache['_meta'] = meta
        return self._cache['_meta']['loaded_time']

    def refresh_cache(self, mode='auto'):
        """Check and reload cache tables

        Parameters
        ----------
        mode : str
            Options are 'local' (don't reload); 'refresh' (reload); 'auto' (reload if expired);
            'remote' (don't reload)

        Returns
        -------
        Loaded timestamp
        """
        if mode in ('local', 'remote'):
            pass
        elif mode == 'auto':
            if datetime.now() - self._cache['_meta']['loaded_time'] >= self.cache_expiry:
                _logger.info('Cache expired, refreshing')
                self._load_cache()
        elif mode == 'refresh':
            _logger.debug('Forcing reload of cache')
            self._load_cache(clobber=True)
        else:
            raise ValueError(f'Unknown refresh type "{mode}"')
        return self._cache['_meta']['loaded_time']

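    # Illustrative usage sketch (not part of the API): how the refresh modes map onto cache
    # reloading.  The cache directory below is a hypothetical example value.
    #
    #   one = One(cache_dir='/data/one_cache', mode='auto')
    #   one.refresh_cache('auto')     # reload only if the tables are older than cache_expiry
    #   one.refresh_cache('refresh')  # force a reload of the parquet tables
    #   one.refresh_cache('local')    # never reload
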
    def _download_datasets(self, dsets, **kwargs) -> List[Path]:
        """
        Download several datasets given a set of datasets

        Parameters
        ----------
        dsets : list
            List of dataset dictionaries from an Alyx REST query OR URL strings

        Returns
        -------
        A local file path list
        """
        out_files = []
        if hasattr(dsets, 'iterrows'):
            dsets = list(map(lambda x: x[1], dsets.iterrows()))
        # Timeout based on a download speed of 5 Mb/s
        timeout = reduce(lambda x, y: x + (y.get('file_size', 0) or 0), dsets, 0) / 625000
        with concurrent.futures.ThreadPoolExecutor(max_workers=N_THREADS) as executor:
            # TODO Subclass can just call web client method directly, no need to pass hash, etc.
            futures = [executor.submit(self._download_dataset, dset, file_size=dset['file_size'],
                                       hash=dset['hash'], **kwargs) for dset in dsets]
            concurrent.futures.wait(futures, timeout=np.ceil(timeout) + 10)
            for future in futures:
                out_files.append(future.result())
        return out_files

    def _download_dataset(self, dset, cache_dir=None, **kwargs) -> Path:
        """
        Download a dataset from an Alyx REST dictionary

        Parameters
        ----------
        dset : pandas.Series, dict, str
            A single dataset dictionary from an Alyx REST query OR URL string
        cache_dir : str, pathlib.Path
            The root directory to save the data in (home/downloads by default)

        Returns
        -------
        The local file path
        """
        pass  # pragma: no cover

    def search(self, details=False, query_type=None, **kwargs):
        """
        Searches sessions matching the given criteria and returns a list of matching eids

        For a list of search terms, use the method one.search_terms()

        For all of the search parameters, a single value or list may be provided.  For dataset,
        the sessions returned will contain all listed datasets.  For the other parameters, the
        session must contain at least one of the entries.  NB: Wildcards are not permitted,
        however if the wildcards property is False, regular expressions may be used for all but
        number and date_range.

        Parameters
        ----------
        dataset : str, list
            list of dataset names. Returns sessions containing all these datasets.
            A dataset matches if it contains the search string e.g. 'wheel.position' matches
            '_ibl_wheel.position.npy'
        date_range : str, list, datetime.datetime, datetime.date, pandas.timestamp
            A single date to search or a list of 2 dates that define the range (inclusive).  To
            define only the upper or lower date bound, set the other element to None.
        lab : str
            A str or list of lab names, returns sessions from any of these labs
        number : str, int
            Number of session to be returned, i.e. number in sequence for a given date
        subject : str, list
            A list of subject nicknames, returns sessions for any of these subjects
        task_protocol : str
            The task protocol name (can be partial, i.e. any task protocol containing that str
            will be found)
        project : str
            The project name (can be partial, i.e. any project containing that str will be found)
        details : bool
            If true also returns a dict of dataset details
        query_type : str, None
            Query cache ('local') or Alyx database ('remote')

        Returns
        -------
        List of eids and, if details is True, a list of dictionaries, each entry corresponding to
        a matching session
        """
        def all_present(x, dsets, exists=True):
            """Returns true if all datasets present in Series"""
            return all(any(x.str.contains(y, regex=self.wildcards) & exists) for y in dsets)

        # Iterate over search filters, reducing the sessions table
        sessions = self._cache['sessions']

        # Ensure sessions filtered in a particular order, with datasets last
        search_order = ('date_range', 'number', 'dataset')

        def sort_fcn(itm):
            return -1 if itm[0] not in search_order else search_order.index(itm[0])

        # Validate and get full name for queries
        search_terms = self.search_terms(query_type='local')
        queries = {util.autocomplete(k, search_terms): v for k, v in kwargs.items()}
        for key, value in sorted(queries.items(), key=sort_fcn):
            # key = util.autocomplete(key)  # Validate and get full name
            # No matches; short circuit
            if sessions.size == 0:
                return ([], None) if details else []
            # String fields
            elif key in ('subject', 'task_protocol', 'laboratory', 'project'):
                query = '|'.join(util.ensure_list(value))
                key = 'lab' if key == 'laboratory' else key
                mask = sessions[key].str.contains(query, regex=self.wildcards)
                sessions = sessions[mask.astype(bool, copy=False)]
            elif key == 'date_range':
                start, end = util.validate_date_range(value)
                session_date = pd.to_datetime(sessions['date'])
                sessions = sessions[(session_date >= start) & (session_date <= end)]
            elif key == 'number':
                query = util.ensure_list(value)
                sessions = sessions[sessions[key].isin(map(int, query))]
            # Dataset check is biggest so this should be done last
            elif key == 'dataset':
                index = ['eid_0', 'eid_1'] if self._index_type('datasets') is int else 'eid'
                query = util.ensure_list(value)
                datasets = self._cache['datasets']
                if self._index_type() is int:
                    isin, _ = ismember2d(datasets[['eid_0', 'eid_1']].values,
                                         np.array(sessions.index.values.tolist()))
                else:
                    isin = datasets['eid'].isin(sessions.index.values)
                # For each session check any dataset both contains query and exists
                mask = (
                    datasets[isin]
                    .groupby(index, sort=False)
                    .apply(lambda x: all_present(x['rel_path'], query, x['exists']))
                )
                # eids of matching dataset records
                idx = mask[mask].index

                # Reduce sessions table by datasets mask
                sessions = sessions.loc[idx]

        # Return results
        if sessions.size == 0:
            return ([], None) if details else []
        eids = sessions.index.to_list()
        if self._index_type() is int:
            eids = parquet.np2str(np.array(eids))

        if details:
            return eids, sessions.reset_index().iloc[:, 2:].to_dict('records', Bunch)
        else:
            return eids

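    # Illustrative usage sketch: searching the local cache tables.  The subject name, dates and
    # dataset names below are hypothetical example values.
    #
    #   eids = one.search(subject='SW_001', dataset=['spikes.times', 'spikes.clusters'])
    #   eids, info = one.search(subject='SW_001', date_range=['2020-01-01', '2020-03-01'],
    #                           details=True)
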
    def _update_filesystem(self, datasets, offline=None, update_exists=True, clobber=False):
        """Update the local filesystem for the given datasets

        Given a set of datasets, check whether records correctly reflect the filesystem.  Called
        by load methods, this returns a list of file paths to load and return.

        TODO This needs changing; overload for downloading?
        TODO change name to check_files, check_present, present_datasets, check_local_files?
         check_filesystem?  This changes datasets frame, calls _update_cache(sessions=None,
         datasets=None) to update and save tables.  Download_datasets can also call this function.
        TODO Remove clobber

        Parameters
        ----------
        datasets : pandas.Series, pandas.DataFrame, list of dicts
            A list or DataFrame of dataset records
        offline : bool, None
            If false and Web client present, downloads the missing datasets from a remote
            repository
        update_exists : bool
            If true, the cache is updated to reflect the filesystem
        clobber : bool
            If true and not offline, datasets are re-downloaded regardless of local filesystem

        Returns
        -------
        A list of file paths for the datasets (None elements for non-existent datasets)
        """
        if offline or self.offline:
            files = []
            if isinstance(datasets, pd.Series):
                datasets = pd.DataFrame([datasets])
            elif not isinstance(datasets, pd.DataFrame):
                # Cast set of dicts (i.e. from REST datasets endpoint)
                datasets = pd.DataFrame(list(datasets))
            for i, rec in datasets.iterrows():
                file = Path(self._cache_dir, *rec[['session_path', 'rel_path']])
                if file.exists():
                    # TODO Factor out; hash & file size also checked in _download_file;
                    #  see _update_cache - we need to save changed cache
                    files.append(file)
                    new_hash = hashfile.md5(file)
                    new_size = file.stat().st_size
                    hash_mismatch = rec['hash'] and new_hash != rec['hash']
                    size_mismatch = rec['file_size'] and new_size != rec['file_size']
                    # TODO clobber and tag mismatched
                    if hash_mismatch or size_mismatch:
                        # the local file hash doesn't match the dataset table cached hash
                        # datasets.at[i, ['hash', 'file_size']] = new_hash, new_size
                        # Raise warning if size changed or hash changed and wasn't empty
                        if size_mismatch or (hash_mismatch and rec['hash']):
                            _logger.warning('local md5 or size mismatch')
                else:
                    files.append(None)
                if rec['exists'] != file.exists():
                    datasets.at[i, 'exists'] = not rec['exists']
                    if update_exists:
                        self._cache['datasets'].loc[i, 'exists'] = rec['exists']
        else:
            # TODO deal with clobber and exists here?
            files = self._download_datasets(datasets, update_cache=update_exists, clobber=clobber)
        return files

    def _index_type(self, table='sessions'):
        idx_0 = self._cache[table].index.values[0]
        if len(self._cache[table].index.names) == 2 and all(isinstance(x, int) for x in idx_0):
            return int
        elif len(self._cache[table].index.names) == 1 and isinstance(idx_0, str):
            return str
        else:
            raise IndexError

    @util.refresh
    @util.parse_id
    def get_details(self, eid: Union[str, Path, UUID], full: bool = False):
        # Int ids return DataFrame, making str eid a list ensures Series not returned
        int_ids = self._index_type() is int
        idx = parquet.str2np(eid).tolist() if int_ids else [eid]
        try:
            det = self._cache['sessions'].loc[idx]
            assert len(det) == 1
        except KeyError:
            raise alferr.ALFObjectNotFound(eid)
        except AssertionError:
            raise alferr.ALFMultipleObjectsFound(f'Multiple sessions in cache for eid {eid}')
        if not full:
            return det.iloc[0]
        # to_drop = 'eid' if int_ids else ['eid_0', 'eid_1']
        # det = det.drop(to_drop, axis=1)
        column = ['eid_0', 'eid_1'] if int_ids else 'eid'
        return self._cache['datasets'].join(det, on=column, how='right')

    @util.refresh
    def list_subjects(self) -> List[str]:
        """
        List all subjects in database

        Returns
        -------
        Sorted list of subject names
        """
        return self._cache['sessions']['subject'].sort_values().unique()

    @util.refresh
    def list_datasets(self, eid=None, collection=None, details=False,
                      query_type=None) -> Union[np.ndarray, pd.DataFrame]:
        """
        Given an eid, return the datasets for those sessions.  If no eid is provided, a list of
        all datasets is returned.  When details is false, a sorted array of unique datasets is
        returned (their relative paths).

        Parameters
        ----------
        eid : str, UUID, pathlib.Path, dict
            Experiment session identifier; may be a UUID, URL, experiment reference string
            details dict or Path.
        collection : str
            The collection to which the object belongs, e.g. 'alf/probe01'.
            This is the relative path of the file from the session root.
            Supports asterisks as wildcards.
        details : bool
            When true, a pandas DataFrame is returned, otherwise a numpy array of
            relative paths (collection/revision/filename) - see one.alf.spec.describe for details.
        query_type : str
            Query cache ('local') or Alyx database ('remote')

        Returns
        -------
        Slice of datasets table or numpy array if details is False
        """
        datasets = self._cache['datasets']
        filter_args = dict(collection=collection, wildcards=self.wildcards,
                           revision_last_before=False, assert_unique=False)
        if not eid:
            datasets = util.filter_datasets(datasets, **filter_args)
            return datasets.copy() if details else datasets['rel_path'].unique()
        eid = self.to_eid(eid)  # Ensure we have a UUID str list
        if not eid:
            return datasets.iloc[0:0]  # Return empty
        if self._index_type() is int:
            eid_num = parquet.str2np(eid)
            index = ['eid_0', 'eid_1']
            isin, _ = ismember2d(datasets[index].to_numpy(), eid_num)
            datasets = datasets[isin]
        else:
            session_match = datasets['eid'] == eid
            datasets = datasets[session_match]
        datasets = util.filter_datasets(datasets, **filter_args)
        # Return only the relative path
        return datasets if details else datasets['rel_path'].sort_values().values

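    # Illustrative usage sketch: listing datasets from the cache.  The eid and collection below
    # are hypothetical example values.
    #
    #   all_dsets = one.list_datasets()                             # unique relative paths
    #   session_dsets = one.list_datasets(eid, collection='alf*')   # one session, ALF collections
    #   dsets_table = one.list_datasets(eid, details=True)          # slice of the datasets table
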
    @util.refresh
    def list_collections(self, eid=None, details=False) -> Union[np.ndarray, dict]:
        """
        List the collections for a given experiment.  If no experiment id is given, all
        collections are returned.

        Parameters
        ----------
        eid : [str, UUID, Path, dict]
            Experiment session identifier; may be a UUID, URL, experiment reference string
            details dict or Path
        details : bool
            If true a dict of pandas datasets tables is returned with collections as keys,
            otherwise a numpy array of unique collections

        Returns
        -------
        A numpy array of unique collections or dict of datasets tables
        """
        datasets = self.list_datasets(eid, details=True).copy()
        datasets['collection'] = datasets.rel_path.apply(
            lambda x: rel_path_parts(x, assert_valid=False)[0] or ''
        )
        if details:
            return {k: table.drop('collection', axis=1)
                    for k, table in datasets.groupby('collection')}
        else:
            return np.sort(datasets['collection'].unique())

    @util.refresh
    def list_revisions(self, eid=None, dataset=None, collection=None, details=False):
        """
        List the revisions for a given experiment.  If no experiment id is given, all revisions
        are returned.

        Parameters
        ----------
        eid : [str, UUID, Path, dict]
            Experiment session identifier; may be a UUID, URL, experiment reference string
            details dict or Path
        details : bool
            If true a dict of pandas datasets tables is returned with revisions as keys,
            otherwise a numpy array of unique revisions

        Returns
        -------
        A numpy array of unique revisions or dict of datasets tables
        """
        datasets = self.list_datasets(eid, collection, details=True).copy()
        if dataset:
            match = datasets.rel_path.apply(lambda x: x.split('/')[-1]).str.match(dataset)
            datasets = datasets[match]
        datasets['revision'] = datasets.rel_path.apply(
            lambda x: (rel_path_parts(x, assert_valid=False)[1] or '').strip('#')
        )
        if details:
            return {k: table.drop('revision', axis=1)
                    for k, table in datasets.groupby('revision')}
        else:
            return np.sort(datasets['revision'].unique())

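    # Illustrative usage sketch: browsing collections and revisions for a hypothetical eid.
    #
    #   collections = one.list_collections(eid)                 # e.g. array(['alf', ...])
    #   revisions = one.list_revisions(eid, dataset='spikes*')  # revisions of spikes datasets
    #   tables = one.list_collections(eid, details=True)        # dict of datasets tables
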
    @util.refresh
    @util.parse_id
    def load_object(self,
                    eid: Union[str, Path, UUID],
                    obj: str,
                    collection: Optional[str] = None,
                    revision: Optional[str] = None,
                    query_type: Optional[str] = None,
                    download_only: bool = False,
                    **kwargs) -> Union[alfio.AlfBunch, List[Path]]:
        """
        Load all attributes of an ALF object from a Session ID and an object name.  Any datasets
        with matching object name will be loaded.

        Parameters
        ----------
        eid : str, UUID, pathlib.Path, dict
            Experiment session identifier; may be a UUID, URL, experiment reference string
            details dict or Path.
        obj : str
            The ALF object to load.  Supports asterisks as wildcards.
        collection : str
            The collection to which the object belongs, e.g. 'alf/probe01'.
            This is the relative path of the file from the session root.
            Supports asterisks as wildcards.
        revision : str
            The dataset revision (typically an ISO date).  If no exact match, the previous
            revision (ordered lexicographically) is returned.  If None, the default revision is
            returned (usually the most recent revision).  Regular expressions/wildcards not
            permitted.
        query_type : str
            Query cache ('local') or Alyx database ('remote')
        download_only : bool
            When true the data are downloaded and the file path is returned.
        kwargs : dict
            Additional filters for datasets, including namespace and timescale.  For full list
            see the one.alf.spec.describe function.

        Returns
        -------
        An ALF bunch or, if download_only is True, a list of Path objects

        Examples
        --------
        load_object(eid, 'moves')
        load_object(eid, 'trials')
        load_object(eid, 'spikes', collection='*probe01')  # wildcards is True
        load_object(eid, 'spikes', collection='.*probe01')  # wildcards is False
        load_object(eid, 'spikes', namespace='ibl')
        load_object(eid, 'spikes', timescale='ephysClock')
        # Load specific attributes
        load_object(eid, 'spikes', attribute=['times*', 'clusters'])
        """
        query_type = query_type or self.mode
        datasets = self.list_datasets(eid, details=True, query_type=query_type)

        if len(datasets) == 0:
            raise alferr.ALFObjectNotFound(obj)

        dataset = {'object': obj, **kwargs}
        datasets = util.filter_datasets(datasets, dataset, collection, revision,
                                        assert_unique=False, wildcards=self.wildcards)

        # Validate result before loading
        if len(datasets) == 0:
            raise alferr.ALFObjectNotFound(obj)
        parts = [rel_path_parts(x) for x in datasets.rel_path]
        unique_objects = set(x[3] or '' for x in parts)
        unique_collections = set(x[0] or '' for x in parts)
        if len(unique_objects) > 1:
            raise alferr.ALFMultipleObjectsFound(*unique_objects)
        if len(unique_collections) > 1:
            raise alferr.ALFMultipleCollectionsFound(*unique_collections)

        # For those that don't exist, download them
        offline = None if query_type == 'auto' else self.mode == 'local'
        files = self._update_filesystem(datasets, offline=offline)
        files = [x for x in files if x]
        if not files:
            raise alferr.ALFObjectNotFound(f'ALF object "{obj}" not found on disk')

        if download_only:
            return files

        return alfio.load_object(files, wildcards=self.wildcards, **kwargs)

    @util.refresh
    @util.parse_id
    def load_dataset(self,
                     eid: Union[str, Path, UUID],
                     dataset: str,
                     collection: Optional[str] = None,
                     revision: Optional[str] = None,
                     query_type: Optional[str] = None,
                     download_only: bool = False,
                     **kwargs) -> Any:
        """
        Load a single dataset for a given session id and dataset name

        Parameters
        ----------
        eid : str, UUID, pathlib.Path, dict
            Experiment session identifier; may be a UUID, URL, experiment reference string
            details dict or Path.
        dataset : str, dict
            The ALF dataset to load.  May be a string or dict of ALF parts.  Supports asterisks
            as wildcards.
        collection : str
            The collection to which the object belongs, e.g. 'alf/probe01'.
            This is the relative path of the file from the session root.
            Supports asterisks as wildcards.
        revision : str
            The dataset revision (typically an ISO date).  If no exact match, the previous
            revision (ordered lexicographically) is returned.  If None, the default revision is
            returned (usually the most recent revision).  Regular expressions/wildcards not
            permitted.
        query_type : str
            Query cache ('local') or Alyx database ('remote')
        download_only : bool
            When true the data are downloaded and the file path is returned.

        Returns
        -------
        Dataset or a Path object if download_only is true.

        Examples
        --------
        intervals = one.load_dataset(eid, '_ibl_trials.intervals.npy')
        # Load dataset without specifying extension
        intervals = one.load_dataset(eid, 'trials.intervals')  # wildcard mode only
        intervals = one.load_dataset(eid, '*trials.intervals*')  # wildcard mode only
        filepath = one.load_dataset(eid, '_ibl_trials.intervals.npy', download_only=True)
        spike_times = one.load_dataset(eid, 'spikes.times.npy', collection='alf/probe01')
        old_spikes = one.load_dataset(eid, 'spikes.times.npy',
                                      collection='alf/probe01', revision='2020-08-31')
        """
        datasets = self.list_datasets(eid, details=True, query_type=query_type or self.mode)
        # If only two parts and wildcards are on, append ext wildcard
        if self.wildcards and isinstance(dataset, str) and len(dataset.split('.')) == 2:
            dataset += '.*'
            _logger.info('Appending extension wildcard: ' + dataset)
        datasets = util.filter_datasets(datasets, dataset, collection, revision,
                                        wildcards=self.wildcards)
        if len(datasets) == 0:
            raise alferr.ALFObjectNotFound(f'Dataset "{dataset}" not found')

        # Check files exist / download remote files
        file, = self._update_filesystem(datasets, **kwargs)
        if not file:
            raise alferr.ALFObjectNotFound('Dataset not found')
        elif download_only:
            return file
        return alfio.load_file_content(file)

    @util.refresh
    @util.parse_id
    def load_datasets(self,
                      eid: Union[str, Path, UUID],
                      datasets: List[str],
                      collections: Optional[str] = None,
                      revisions: Optional[str] = None,
                      query_type: Optional[str] = None,
                      assert_present=True,
                      download_only: bool = False,
                      **kwargs) -> Any:
        """
        Load datasets for a given session id.  Returns two lists the length of datasets.  The
        first is the data (or file paths if download_data is false), the second is a list of
        meta data Bunches.  If assert_present is false, missing data will be returned as None.

        Parameters
        ----------
        eid : str, UUID, pathlib.Path, dict
            Experiment session identifier; may be a UUID, URL, experiment reference string
            details dict or Path.
        datasets : list of strings
            The ALF datasets to load.  May be a string or dict of ALF parts.  Supports asterisks
            as wildcards.
        collections : str, list
            The collection(s) to which the object(s) belong, e.g. 'alf/probe01'.
            This is the relative path of the file from the session root.
            Supports asterisks as wildcards.
        revisions : str, list
            The dataset revision (typically an ISO date).  If no exact match, the previous
            revision (ordered lexicographically) is returned.  If None, the default revision is
            returned (usually the most recent revision).  Regular expressions/wildcards not
            permitted.
        query_type : str
            Query cache ('local') or Alyx database ('remote')
        assert_present : bool
            If true, missing datasets raise an error, otherwise None is returned
        download_only : bool
            When true the data are downloaded and the file path is returned.

        Returns
        -------
        Returns a list of data (or file paths) the length of datasets, and a list of meta data
        Bunches.  If assert_present is False, missing data will be None
        """
        def _verify_specifiers(specifiers):
            """Ensure specifiers lists matching datasets length"""
            out = []
            for spec in specifiers:
                if not spec or isinstance(spec, str):
                    out.append([spec] * len(datasets))
                elif len(spec) != len(datasets):
                    raise ValueError(
                        'Collection and revision specifiers must match number of datasets')
                else:
                    out.append(spec)
            return out

        if isinstance(datasets, str):
            raise TypeError('`datasets` must be a non-string iterable')
        # Check input args
        collections, revisions = _verify_specifiers([collections, revisions])

        # Short circuit
        query_type = query_type or self.mode
        all_datasets = self.list_datasets(eid, details=True, query_type=query_type)
        if len(all_datasets) == 0:
            if assert_present:
                raise alferr.ALFObjectNotFound(f'No datasets found for session {eid}')
            else:
                _logger.warning(f'No datasets found for session {eid}')
                return None, all_datasets
        if len(datasets) == 0:
            return None, all_datasets.iloc[0:0]  # Return empty

        # Filter and load missing
        if self.wildcards:  # Append extension wildcard if 'object.attribute' string
            datasets = [x + ('.*' if isinstance(x, str) and len(x.split('.')) == 2 else '')
                        for x in datasets]
        slices = [util.filter_datasets(all_datasets, x, y, z, wildcards=self.wildcards)
                  for x, y, z in zip(datasets, collections, revisions)]
        present = [len(x) == 1 for x in slices]
        present_datasets = pd.concat(slices)

        if not all(present):
            missing_list = ', '.join(x for x, y in zip(datasets, present) if not y)
            # FIXME include collection and revision also
            message = f'The following datasets are not in the cache: {missing_list}'
            if assert_present:
                raise alferr.ALFObjectNotFound(message)
            else:
                _logger.warning(message)

        # Check files exist / download remote files
        files = self._update_filesystem(present_datasets, **kwargs)
        if any(x is None for x in files):
            missing_list = ', '.join(x for x, y in zip(present_datasets.rel_path, files) if not y)
            message = f'The following datasets were not downloaded: {missing_list}'
            if assert_present:
                raise alferr.ALFObjectNotFound(message)
            else:
                _logger.warning(message)

        # Make list of metadata Bunches out of the table
        records = (present_datasets
                   .reset_index()
                   .drop(['eid_0', 'eid_1'], axis=1)
                   .to_dict('records', Bunch))

        # Ensure result same length as input datasets list
        files = [None if not here else files.pop(0) for here in present]
        records = [None if not here else records.pop(0) for here in files]
        if download_only:
            return files, records
        return [alfio.load_file_content(x) for x in files], records

    @util.refresh
    def load_dataset_from_id(self,
                             dset_id: Union[str, UUID],
                             download_only: bool = False,
                             details: bool = False) -> Any:
        """
        Load a dataset given a dataset UUID

        Parameters
        ----------
        dset_id : uuid.UUID, str
            A dataset UUID to load
        download_only : bool
            If true the dataset is downloaded (if necessary) and the filepath returned
        details : bool
            If true a pandas Series is returned in addition to the data

        Returns
        -------
        Dataset data (or filepath if download_only) and dataset record if details is True
        """
        int_idx = self._index_type('datasets') is int
        if isinstance(dset_id, str) and int_idx:
            dset_id = parquet.str2np(dset_id)
        elif isinstance(dset_id, UUID):
            dset_id = parquet.uuid2np([dset_id]) if int_idx else str(dset_id)
        try:
            if int_idx:
                dataset = self._cache['datasets'].loc[dset_id.tolist()].iloc[0]
            else:
                dataset = self._cache['datasets'].loc[[dset_id]]
            assert isinstance(dataset, pd.Series) or len(dataset) == 1
        except KeyError:
            raise alferr.ALFObjectNotFound('Dataset not found')
        except AssertionError:
            raise alferr.ALFMultipleObjectsFound('Duplicate dataset IDs')
        filepath, = self._update_filesystem(dataset)
        if not filepath:
            raise alferr.ALFObjectNotFound('Dataset not found')
        output = filepath if download_only else alfio.load_file_content(filepath)
        if details:
            return output, dataset
        else:
            return output

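    # Illustrative usage sketch: loading by dataset UUID.  The UUID below is a made-up example
    # value, not a real dataset ID.
    #
    #   dset_uuid = 'd4b02a03-1d5e-4e6f-8f4a-0123456789ab'
    #   data, record = one.load_dataset_from_id(dset_uuid, details=True)
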
    @staticmethod
    def setup(cache_dir, **kwargs):
        """
        Interactive command tool that populates parameter file for ONE IBL.
        FIXME See subclass
        """
        make_parquet_db(cache_dir, **kwargs)
        return One(cache_dir, mode='local')


@lru_cache(maxsize=1)
def ONE(*, mode='auto', wildcards=True, **kwargs):
    """ONE API factory
    Determine which class to instantiate depending on parameters passed.

    Parameters
    ----------
    mode : str
        Query mode, options include 'auto', 'local' (offline) and 'remote' (online only).  Most
        methods have a `query_type` parameter that can override the class mode.
    wildcards : bool
        If true all methods use unix shell style pattern matching, otherwise regular expressions
        are used.
    cache_dir : str, Path
        Path to the data files.  If Alyx parameters have been set up for this location, an
        OneAlyx instance is returned.  If data_dir and base_url are None, the default location
        is used.
    base_url : str
        An Alyx database URL.  The URL must start with 'http'.
    username : str
        An Alyx database login username.
    password : str
        An Alyx database password.
    cache_rest : str
        If not in 'local' mode, this determines which http request types to cache.  Default is
        'GET'.  Use None to deactivate cache (not recommended).

    Returns
    -------
    A One instance if mode is 'local', otherwise an OneAlyx instance.
    """
    if kwargs.pop('offline', False):
        _logger.warning('the offline kwarg will probably be removed. '
                        'ONE is now offline by default anyway')
        warnings.warn('"offline" param will be removed; use mode="local"', DeprecationWarning)
        mode = 'local'

    if (any(x in kwargs for x in ('base_url', 'username', 'password')) or
            not kwargs.get('cache_dir', False)):
        return OneAlyx(mode=mode, wildcards=wildcards, **kwargs)

    # If cache dir was provided and corresponds to one configured with an Alyx client, use OneAlyx
    try:
        one.params._check_cache_conflict(kwargs.get('cache_dir'))
        return One(mode=mode, wildcards=wildcards, **kwargs)
    except AssertionError:
        # Cache dir corresponds to an Alyx repo, call OneAlyx
        return OneAlyx(mode=mode, wildcards=wildcards, **kwargs)


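# Illustrative usage sketch: how the factory chooses between One and OneAlyx.  The URL and
# directory below are hypothetical example values.
#
#   one = ONE(base_url='https://alyx.example.org')        # database params given -> OneAlyx
#   one = ONE(cache_dir='/data/one_cache', mode='local')  # One, unless the directory is
#                                                         # already tied to an Alyx database

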
class OneAlyx(One):
    def __init__(self, username=None, password=None, base_url=None, cache_dir=None,
                 mode='auto', wildcards=True, **kwargs):
        """An API for searching and loading data through the Alyx database

        Parameters
        ----------
        mode : str
            Query mode, options include 'auto', 'local' (offline) and 'remote' (online only).
            Most methods have a `query_type` parameter that can override the class mode.
        wildcards : bool
            If true, methods allow unix shell style pattern matching, otherwise regular
            expressions are supported
        cache_dir : str, Path
            Path to the data files.  If Alyx parameters have been set up for this location, an
            OneAlyx instance is returned.  If data_dir and base_url are None, the default
            location is used.
        base_url : str
            An Alyx database URL.  The URL must start with 'http'.
        username : str
            An Alyx database login username.
        password : str
            An Alyx database password.
        cache_rest : str
            If not in 'local' mode, this determines which http request types to cache.  Default
            is 'GET'.  Use None to deactivate cache (not recommended).
        """
        # Load Alyx Web client
        self._web_client = wc.AlyxClient(username=username,
                                         password=password,
                                         base_url=base_url,
                                         cache_dir=cache_dir,
                                         **kwargs)
        self._search_endpoint = 'sessions'
        # get parameters override if inputs provided
        super(OneAlyx, self).__init__(mode=mode, wildcards=wildcards, cache_dir=cache_dir)

    def __repr__(self):
        return f'One ({"off" if self.offline else "on"}line, {self.alyx.base_url})'

    def _load_cache(self, cache_dir=None, clobber=False):
        cache_meta = self._cache['_meta']
        if not clobber:
            super(OneAlyx, self)._load_cache(self._cache_dir)  # Load any present cache
            if (self._cache and not cache_meta['expired']) or self.mode == 'local':
                return

        # Warn user if expired
        if (
            cache_meta['expired'] and
            cache_meta.get('created_time', False) and
            not self.alyx.silent
        ):
            age = datetime.now() - cache_meta['created_time']
            t_str = (f'{age.days} day(s)'
                     if age.days >= 1
                     else f'{np.floor(age.seconds / (60 * 60))} hour(s)')
            _logger.info(f'cache over {t_str} old')

        try:
            # Determine whether a newer cache is available
            cache_info = self.alyx.get('cache/info', expires=True)
            remote_created = datetime.fromisoformat(cache_info['date_created'])
            local_created = cache_meta.get('created_time', None)
            if local_created and (remote_created - local_created) < timedelta(minutes=1):
                _logger.info('No newer cache available')
                return

            # Download the remote cache files
            _logger.info('Downloading remote caches...')
            files = self.alyx.download_cache_tables()
            assert any(files)
            super(OneAlyx, self)._load_cache(self._cache_dir)  # Reload cache after download
        except requests.exceptions.HTTPError:
            _logger.error('Failed to load the remote cache file')
            self.mode = 'remote'
        except (ConnectionError, requests.exceptions.ConnectionError):
            _logger.error('Failed to connect to Alyx')
            self.mode = 'local'

    @property
    def alyx(self):
        return self._web_client

    @property
    def _cache_dir(self):
        return self._web_client.cache_dir

    @util.refresh
    def search_terms(self, query_type=None):
        """
        Returns a list of search terms to be passed as kwargs to the search method

        Parameters
        ----------
        query_type : str
            If 'remote', the search terms are largely determined by the REST endpoint used

        Returns
        -------
        Tuple of search strings
        """
        if (query_type or self.mode) != 'remote':
            return self._search_terms
        # Return search terms from REST schema
        fields = self.alyx.rest_schemes[self._search_endpoint]['list']['fields']
        excl = ('lab',)  # 'laboratory' already in search terms
        return tuple({*self._search_terms,
                      *(x['name'] for x in fields if x['name'] not in excl)})

    def describe_dataset(self, dataset_type=None):
        # TODO Move to AlyxClient?; add to rest examples; rename to describe?
        if not dataset_type:
            return self.alyx.rest('dataset-types', 'list')
        try:
            assert isinstance(dataset_type, str) and not is_uuid_string(dataset_type)
            _logger.disabled = True
            out = self.alyx.rest('dataset-types', 'read', dataset_type)
        except (AssertionError, requests.exceptions.HTTPError):
            # Try to get dataset type from dataset name
            out = self.alyx.rest('dataset-types', 'read', self.dataset2type(dataset_type))
        finally:
            _logger.disabled = False
        print(out['description'])
        return out

    @util.refresh
    def list_datasets(self, eid=None, collection=None, details=False,
                      query_type=None) -> Union[np.ndarray, pd.DataFrame]:
        if (query_type or self.mode) != 'remote':
            return super().list_datasets(eid=eid, collection=collection,
                                         details=details, query_type=query_type)
        elif not eid:
            warnings.warn('Unable to list all remote datasets')
            return super().list_datasets(collection=collection,
                                         details=details, query_type=query_type)
        eid = self.to_eid(eid)  # Ensure we have a UUID str list
        if not eid:
            return self._cache['datasets'].iloc[0:0]  # Return empty
        _, datasets = util.ses2records(self.alyx.rest('sessions', 'read', id=eid))
        # Return only the relative path
        return datasets if details else datasets['rel_path'].sort_values().values

    @util.refresh
    def load_collection(self, eid, collection):
        raise NotImplementedError()

    @util.refresh
    def pid2eid(self, pid: str, query_type=None) -> (str, str):
        """
        Given an Alyx probe UUID string, returns the session id string and the probe label
        (i.e. the ALF collection)

        Parameters
        ----------
        pid : str, uuid.UUID
            A probe UUID
        query_type : str
            Query mode - options include 'remote', and 'refresh'

        Returns
        -------
        experiment ID, probe label
        """
        query_type = query_type or self.mode
        if query_type != 'remote':
            self.refresh_cache(query_type)
        if query_type == 'local' and 'insertions' not in self._cache.keys():
            raise NotImplementedError('Converting probe IDs requires a remote connection')
        rec = self.alyx.rest('insertions', 'read', id=str(pid))
        return rec['session'], rec['name']

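    # Illustrative usage sketch: converting a probe insertion UUID to a session eid and probe
    # label.  The UUID below is a made-up example value.
    #
    #   pid = 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee'
    #   eid, pname = one.pid2eid(pid, query_type='remote')  # e.g. (<session eid>, 'probe00')
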
    def search(self, details=False, query_type=None, **kwargs):
        """
        Searches sessions matching the given criteria and returns a list of matching eids

        For a list of search terms, use the method one.search_terms(query_type='remote')

        For all of the search parameters, a single value or list may be provided.  For dataset,
        the sessions returned will contain all listed datasets.  For the other parameters, the
        session must contain at least one of the entries.  NB: Wildcards are not permitted,
        however if the wildcards property is False, regular expressions may be used for all but
        number and date_range.

        Parameters
        ----------
        dataset : str, list
            list of dataset names. Returns sessions containing all these datasets.
            A dataset matches if it contains the search string e.g. 'wheel.position' matches
            '_ibl_wheel.position.npy'
        date_range : str, list, datetime.datetime, datetime.date, pandas.timestamp
            A single date to search or a list of 2 dates that define the range (inclusive).  To
            define only the upper or lower date bound, set the other element to None.
        lab : str, list
            A str or list of lab names, returns sessions from any of these labs
        number : str, int
            Number of session to be returned, i.e. number in sequence for a given date
        subject : str, list
            A list of subject nicknames, returns sessions for any of these subjects
        task_protocol : str, list
            The task protocol name (can be partial, i.e. any task protocol containing that str
            will be found)
        project : str, list
            The project name (can be partial, i.e. any project containing that str will be found)
        performance_lte / performance_gte : float
            search only for sessions whose performance is less equal or greater equal than a
            pre-defined threshold as a percentage (0-100)
        users : str, list
            A list of users
        location : str, list
            a str or list of lab location (as per Alyx definition) name
            Note: this corresponds to the specific rig, not the lab geographical location per se
        dataset_types : str, list
            One or more of dataset_types
        details : bool
            If true also returns a dict of dataset details
        query_type : str, None
            Query cache ('local') or Alyx database ('remote')
        limit : int
            The number of results to fetch in one go (if pagination enabled on server)

        Returns
        -------
        List of eids and, if details is True, also returns a list of dictionaries, each entry
        corresponding to a matching session
        """
        query_type = query_type or self.mode
        if query_type != 'remote':
            return super(OneAlyx, self).search(details=details, query_type=query_type, **kwargs)

        # loop over input arguments and build the url
        search_terms = self.search_terms(query_type=query_type)
        params = {'django': kwargs.pop('django', '')}
        for key, value in sorted(kwargs.items()):
            field = util.autocomplete(key, search_terms)  # Validate and get full name
            # check that the input matches one of the defined filters
            if field == 'date_range':
                params[field] = [x.date().isoformat() for x in util.validate_date_range(value)]
            elif field == 'dataset':
                query = ('data_dataset_session_related__dataset_type__name__icontains,' +
                         ','.join(util.ensure_list(value)))
                params['django'] += (',' if params['django'] else '') + query
            elif field == 'laboratory':
                params['lab'] = value
            else:
                params[field] = value

        # Make GET request
        ses = self.alyx.rest(self._search_endpoint, 'list', **params)
        # LazyId only transforms records when indexed
        eids = util.LazyId(ses)
        return (eids, ses) if details else eids

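    # Illustrative usage sketch: a remote (Alyx REST) query using search terms that are only
    # available in remote mode.  The lab and project names are hypothetical example values.
    #
    #   eids = one.search(lab='examplelab', project='example_project',
    #                     performance_gte=70, query_type='remote')
    #   eids, recs = one.search(dataset_types='spikes.times', details=True, query_type='remote')
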
    def _download_dataset(self, dset, cache_dir=None, update_cache=True, **kwargs):
        """
        Download a dataset from an Alyx REST dictionary

        Parameters
        ----------
        dset : dict, str
            A single dataset dictionary from an Alyx REST query OR URL string
        cache_dir : str, pathlib.Path
            The root directory to save the data to (default taken from ONE parameters)
        update_cache : bool
            If true, the cache is updated when filesystem discrepancies are encountered

        Returns
        -------
        A local file path
        """
        if isinstance(dset, str) and dset.startswith('http'):
            url = dset
        elif isinstance(dset, (str, Path)):
            url = self.path2url(dset)
            if not url:
                _logger.warning('Dataset not found in cache')
                return
        else:
            if 'data_url' in dset:  # data_dataset_session_related dict
                url = dset['data_url']
                did = dset['id']
            elif 'file_records' not in dset:  # Convert dataset Series to alyx dataset dict
                url = self.record2url(dset)
                did = dset.index
            else:  # from datasets endpoint
                url = next((fr['data_url'] for fr in dset['file_records']
                            if fr['data_url'] and fr['exists']), None)
                did = dset['url'][-36:]
            if not url:
                dset_str = f"{dset['session'][-36:]}: {dset['collection']} / {dset['name']}"
                _logger.warning(f"{dset_str} Dataset not found")
                if update_cache:
                    if isinstance(did, str) and self._index_type('datasets') is int:
                        did = parquet.str2np(did)
                    elif self._index_type('datasets') is str and not isinstance(did, str):
                        did = parquet.np2str(did)
                    try:
                        self._cache['datasets'].loc[did, 'exists'] = False
                    except KeyError as e:
                        _logger.warning(f"{dset_str} couldn't update exist status in cache. {e}")
                return

        target_dir = Path(cache_dir or self._cache_dir, get_alf_path(url)).parent
        return self._download_file(url=url, target_dir=target_dir, **kwargs)

    def _tag_mismatched_file_record(self, url):
        fr = self.alyx.rest('files', 'list',
                            django=f'dataset,{Path(url).name.split(".")[-2]},'
                                   f'data_repository__globus_is_personal,False',
                            no_cache=True)
        if len(fr) > 0:
            json_field = fr[0]['json']
            if json_field is None:
                json_field = {'mismatch_hash': True}
            else:
                json_field.update({'mismatch_hash': True})
            self.alyx.rest('files', 'partial_update', id=fr[0]['url'][-36:],
                           data={'json': json_field})

    def _download_file(self, url, target_dir, clobber=False, offline=None, keep_uuid=False,
                       file_size=None, hash=None):
        """
        Downloads a single file from an HTTP webserver.  The webserver in question is set by the
        AlyxClient object.

        Parameters
        ----------
        url : str
            An absolute or relative URL for a remote dataset
        target_dir : str, pathlib.Path
            The root directory to download file to
        clobber : bool
            If true, overwrites local dataset if any
        offline : bool, None
            If true, the file path is returned only if the file exists.  No download will take
            place
        keep_uuid : bool
            If true, the UUID is not removed from the file name (default is False)
        file_size : int
            The expected file size to compare with downloaded file
        hash : str
            The expected file hash to compare with downloaded file

        Returns
        -------
        The file path of the downloaded file
        """
        if offline is None:
            offline = self.mode == 'local'
        Path(target_dir).mkdir(parents=True, exist_ok=True)
        local_path = target_dir / os.path.basename(url)
        if not keep_uuid:
            local_path = alfio.remove_uuid_file(local_path, dry=True)

        if Path(local_path).exists():
            # the local file hash doesn't match the dataset table cached hash
            hash_mismatch = hash and hashfile.md5(Path(local_path)) != hash
            file_size_mismatch = file_size and Path(local_path).stat().st_size != file_size
            if (hash_mismatch or file_size_mismatch) and not offline:
                clobber = True
                if not self.alyx.silent:
                    _logger.warning(f'local md5 or size mismatch, re-downloading {local_path}')
        # if there is no cached file, download
        else:
            clobber = True

        if clobber and not offline:
            local_path, md5 = self.alyx.download_file(
                url, cache_dir=str(target_dir), clobber=clobber, return_md5=True)
            # TODO If 404 update JSON on Alyx for data record
            # post download, if there is a mismatch between Alyx and the newly downloaded file
            # size or hash flag the offending file record in Alyx for database maintenance
            hash_mismatch = hash and md5 != hash
            file_size_mismatch = file_size and Path(local_path).stat().st_size != file_size
            if hash_mismatch or file_size_mismatch:
                self._tag_mismatched_file_record(url)
                # TODO Update cache here

        if keep_uuid:
            return local_path
        else:
            return alfio.remove_uuid_file(local_path)

    @staticmethod
    def setup(**kwargs):
        """
        TODO Interactive command tool that sets up cache for ONE.
        """
        root_dir = input('Select a directory from which to build cache')
        if root_dir:
            print('Building ONE cache from filesystem...')
            from one.alf import onepqt
            onepqt.make_parquet_db(root_dir, **kwargs)
            return One(cache_dir=root_dir)

    @util.refresh
    @util.parse_id
    def eid2path(self, eid: str, query_type=None) -> util.Listable(Path):
        """
        From an experiment ID gets the local session path

        Parameters
        ----------
        eid : str, UUID, pathlib.Path, dict
            Experiment session identifier; may be a UUID, URL, experiment reference string
            details dict or Path.
        query_type : str
            If set to 'remote', will force database connection

        Returns
        -------
        A session path or list of session paths
        """
        # first try avoid hitting the database
        mode = query_type or self.mode
        if mode != 'remote':
            cache_path = super().eid2path(eid)
            if cache_path or mode == 'local':
                return cache_path

        # if it wasn't successful, query Alyx
        ses = self.alyx.rest('sessions', 'list', django=f'pk,{eid}')
        if len(ses) == 0:
            return None
        else:
            return Path(self._cache_dir).joinpath(
                ses[0]['lab'], 'Subjects', ses[0]['subject'],
                ses[0]['start_time'][:10], str(ses[0]['number']).zfill(3))

    @util.refresh
    def path2eid(self, path_obj: Union[str, Path], query_type=None) -> util.Listable(Path):
        """
        From a local path, gets the experiment ID

        Parameters
        ----------
        path_obj : str, pathlib.Path, list
            Local path or list of local paths
        query_type : str
            If set to 'remote', will force database connection

        Returns
        -------
        An eid or list of eids
        """
        # If path_obj is a list recurse through it and return a list
        if isinstance(path_obj, list):
            path_obj = [Path(x) for x in path_obj]
            eid_list = []
            unwrapped = unwrap(self.path2eid)
            for p in path_obj:
                eid_list.append(unwrapped(self, p))
            return eid_list
        # else ensure the path ends with mouse, date, number
        path_obj = Path(path_obj)

        # try the cached info to possibly avoid hitting database
        mode = query_type or self.mode
        if mode != 'remote':
            cache_eid = super().path2eid(path_obj)
            if cache_eid or mode == 'local':
                return cache_eid

        session_path = get_session_path(path_obj)
        # if path does not have a date and a number return None
        if session_path is None:
            return None

        # if not search for subj, date, number XXX: hits the DB
        search = unwrap(self.search)
        uuid = search(subject=session_path.parts[-3],
                      date_range=session_path.parts[-2],
                      number=session_path.parts[-1],
                      query_type='remote')

        # Return the uuid if any
        return uuid[0] if uuid else None

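    # Illustrative usage sketch: converting between a local session path and an experiment ID.
    # The path below is a hypothetical example value.
    #
    #   session_path = '/data/one_cache/examplelab/Subjects/SW_001/2020-01-01/001'
    #   eid = one.path2eid(session_path)   # returns the session UUID string (or None)
    #   path = one.eid2path(eid)           # round-trips back to the session path
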
    @util.refresh
    def path2url(self, filepath, query_type=None):
        """
        Given a local file path, returns the URL of the remote file.

        Parameters
        ----------
        filepath : str, pathlib.Path
            A local file path
        query_type : str
            If set to 'remote', will force database connection

        Returns
        -------
        A URL string
        """
        query_type = query_type or self.mode
        if query_type != 'remote':
            return super(OneAlyx, self).path2url(filepath)
        eid = self.path2eid(filepath)
        try:
            dataset, = self.alyx.rest('datasets', 'list', session=eid, name=Path(filepath).name)
            return next(
                r['data_url'] for r in dataset['file_records'] if r['data_url'] and r['exists'])
        except (ValueError, StopIteration):
            raise alferr.ALFObjectNotFound(f'File record for {filepath} not found on Alyx')

    @util.parse_id
    def type2datasets(self, eid, dataset_type, details=False):
        """
        Get list of datasets belonging to a given dataset type for a given session

        Parameters
        ----------
        eid : str, UUID, pathlib.Path, dict
            Experiment session identifier; may be a UUID, URL, experiment reference string
            details dict or Path.
        dataset_type : str, list
            An Alyx dataset type, e.g. camera.times or a list of dtypes
        details : bool
            If True, a datasets DataFrame is returned

        Returns
        -------
        A numpy array of data, or DataFrame if details is true
        """
        if isinstance(dataset_type, str):
            restriction = f'session__id,{eid},dataset_type__name,{dataset_type}'
        elif isinstance(dataset_type, collections.abc.Sequence):
            restriction = f'session__id,{eid},dataset_type__name__in,{dataset_type}'
        else:
            raise TypeError('dataset_type must be a str or str list')
        datasets = util.datasets2records(self.alyx.rest('datasets', 'list', django=restriction))
        return datasets if details else datasets['rel_path'].sort_values().values

    def dataset2type(self, dset):
        """Return dataset type from dataset"""
        # Ensure dset is a str uuid
        if isinstance(dset, str) and not is_uuid_string(dset):
            dset = self._dataset_name2id(dset)
        if isinstance(dset, np.ndarray):
            dset = parquet.np2str(dset)[0]
        if isinstance(dset, tuple) and all(isinstance(x, int) for x in dset):
            dset = parquet.np2str(np.array(dset))
        if not is_uuid_string(dset):
            raise ValueError('Unrecognized name or UUID')
        return self.alyx.rest('datasets', 'read', id=dset)['dataset_type']

    def describe_revision(self, revision):
        raise NotImplementedError('Requires changes to revisions endpoint')
        rec = self.alyx.rest('revisions', 'list', name=revision)
        # if rec := self.alyx.rest('revisions', 'list', name=revision):  # py 3.8
        if rec:
            print(rec[0]['description'])
        else:
            print(f'Revision "{revision}" not found')

    def _dataset_name2id(self, dset_name, eid=None):
        # TODO finish function
        datasets = self.list_datasets(eid) if eid else self._cache['datasets']
        # Get ID of first matching dset
        for idx, rel_path in datasets['rel_path'].items():
            if rel_path.endswith(dset_name):
                return idx
        raise ValueError(f'Dataset {dset_name} not found in cache')

    @util.refresh
    @util.parse_id
    def get_details(self, eid: str, full: bool = False, query_type=None):
        """Returns details of eid like from one.search; optionally return full session details.
        """
        if (query_type or self.mode) == 'local':
            return super().get_details(eid, full=full)
        # If eid is a list of eIDs recurse through list and return the results
        if isinstance(eid, list):
            details_list = []
            for p in eid:
                details_list.append(self.get_details(p, full=full))
            return details_list
        # If not valid return None
        if not is_uuid_string(eid):
            print(eid, " is not a valid eID/UUID string")
            return
        # load all details
        dets = self.alyx.rest("sessions", "read", eid)
        if full:
            return dets
        # If it's not full return the normal output like from a one.search
        det_fields = ["subject", "start_time", "number", "lab", "project",
                      "url", "task_protocol", "local_path"]
        out = {k: v for k, v in dets.items() if k in det_fields}
        out.update({'local_path': self.eid2path(eid)})
        return out

    # def _update_cache(self, ses, dataset_types):
    #     """
    #     TODO move to One; currently unused
    #     :param ses: session details dictionary as per Alyx response
    #     :param dataset_types:
    #     :return: is_updated (bool): if the cache was updated or not
    #     """
    #     save = False
    #     pqt_dsets = _ses2pandas(ses, dtypes=dataset_types)
    #     # if the dataframe is empty, return
    #     if pqt_dsets.size == 0:
    #         return
    #     # if the cache is empty create the cache variable
    #     elif self._cache.size == 0:
    #         self._cache = pqt_dsets
    #         save = True
    #     # the cache is not empty and there are datasets in the query
    #     else:
    #         isin, icache = ismember2d(pqt_dsets[['id_0', 'id_1']].to_numpy(),
    #                                   self._cache[['id_0', 'id_1']].to_numpy())
    #         # check if the hash / filesize fields have changed on patching
    #         heq = (self._cache['hash'].iloc[icache].to_numpy() ==
    #                pqt_dsets['hash'].iloc[isin].to_numpy())
    #         feq = np.isclose(self._cache['file_size'].iloc[icache].to_numpy(),
    #                          pqt_dsets['file_size'].iloc[isin].to_numpy(),
    #                          rtol=0, atol=0, equal_nan=True)
    #         eq = np.logical_and(heq, feq)
    #         # update new hash / filesizes
    #         if not np.all(eq):
    #             self._cache.iloc[icache, 4:6] = pqt_dsets.iloc[np.where(isin)[0], 4:6].to_numpy()
    #             save = True
    #         # append datasets that haven't been found
    #         if not np.all(isin):
    #             self._cache = self._cache.append(pqt_dsets.iloc[np.where(~isin)[0]])
    #             self._cache = self._cache.reindex()
    #             save = True
    #     if save:
    #         # before saving makes sure pandas did not cast uuids in float
    #         typs = [t for t, k in zip(self._cache.dtypes, self._cache.keys()) if 'id_' in k]
    #         assert (all(map(lambda t: t == np.int64, typs)))
    #         # if this gets too big, look into saving only when destroying the ONE object
    #         parquet.save(self._cache_file, self._cache)