Source code for oneibl.one

import abc
import concurrent.futures
import json
import logging
import os
import fnmatch
import re
from functools import wraps
from pathlib import Path, PurePath
import shutil
from typing import Any, Sequence, Union, Optional, List, Dict
from uuid import UUID

import requests
import tqdm
import pandas as pd
import numpy as np

import oneibl.params
import oneibl.webclient as wc
import alf.io as alfio
from alf.files import is_valid, alf_parts
# from ibllib.misc.exp_ref import is_exp_ref
from ibllib.exceptions import \
    ALFMultipleObjectsFound, ALFObjectNotFound, ALFMultipleCollectionsFound
from ibllib.io import hashfile, spikeglx
from ibllib.misc import pprint
from oneibl.dataclass import SessionDataInfo
from brainbox.io import parquet
from brainbox.numerical import ismember, ismember2d, find_first_2d

_logger = logging.getLogger('ibllib')


def Listable(t): return Union[t, Sequence[t]]  # noqa


NTHREADS = 4  # number of download threads

_ENDPOINTS = {  # keynames are possible input arguments and values are actual endpoints
    'data': 'dataset-types',
    'dataset': 'datasets',
    'datasets': 'datasets',
    'dataset-types': 'dataset-types',
    'dataset_types': 'dataset-types',
    'dataset-type': 'dataset-types',
    'dataset_type': 'dataset-types',
    'dtypes': 'dataset-types',
    'dtype': 'dataset-types',
    'users': 'users',
    'user': 'users',
    'subject': 'subjects',
    'subjects': 'subjects',
    'labs': 'labs',
    'lab': 'labs'}

_SESSION_FIELDS = {  # keynames are possible input arguments and values are actual fields
    'subjects': 'subject',
    'subject': 'subject',
    'user': 'users',
    'users': 'users',
    'lab': 'lab',
    'labs': 'lab',
    'type': 'type',
    'start_time': 'start_time',
    'start-time': 'start_time',
    'end_time': 'end_time',
    'end-time': 'end_time'}

SEARCH_TERMS = {  # keynames are possible input arguments and values are actual fields
    'data': 'dataset_types',
    'dataset': 'dataset_types',
    'datasets': 'dataset_types',
    'dataset-types': 'dataset_types',
    'dataset_types': 'dataset_types',
    'users': 'users',
    'user': 'users',
    'subject': 'subject',
    'subjects': 'subject',
    'date_range': 'date_range',
    'date-range': 'date_range',
    'date': 'date_range',
    'labs': 'lab',
    'lab': 'lab',
    'task': 'task_protocol',
    'task_protocol': 'task_protocol',
    'number': 'number',
    'location': 'location',
    'lab_location': 'location',
    'performance_lte': 'performance_lte',
    'performance_gte': 'performance_gte',
    'project': 'project',
}


def _ses2pandas(ses, dtypes=None):
    """
    :param ses: session dictionary from rest endpoint
    :param dtypes: list of dataset types
    :return:
    """
    # selection: get relevant dtypes only if there is a url associated
    rec = list(filter(lambda x: x['url'], ses['data_dataset_session_related']))
    if dtypes == ['__all__'] or dtypes == '__all__':
        dtypes = None
    if dtypes is not None:
        rec = list(filter(lambda x: x['dataset_type'] in dtypes, rec))
    include = ['id', 'hash', 'dataset_type', 'name', 'file_size', 'collection']
    uuid_fields = ['id', 'eid']
    join = {'subject': ses['subject'], 'lab': ses['lab'], 'eid': ses['url'][-36:],
            'start_time': np.datetime64(ses['start_time']), 'number': ses['number'],
            'task_protocol': ses['task_protocol']}
    col = parquet.rec2col(rec, include=include, uuid_fields=uuid_fields, join=join,
                          types={'file_size': np.double}).to_df()
    return col


def parse_id(method):
    """
    Ensures the input experiment identifier is an experiment UUID string

    :param method: An ONE method whose second arg is an experiment id
    :return: A wrapper function that parses the id to the expected string
    """

    @wraps(method)
    def wrapper(self, id, *args, **kwargs):
        id = self.to_eid(id)
        return method(self, id, *args, **kwargs)

    return wrapper


class OneAbstract(abc.ABC):
    def __init__(self, username=None, password=None, base_url=None, cache_dir=None, silent=None):
        # get parameters override if inputs provided
        self._par = oneibl.params.get(silent=silent)
        # can delete those 2 lines from mid January 2021
        if self._par.HTTP_DATA_SERVER == 'http://ibl.flatironinstitute.org':
            self._par = self._par.set("HTTP_DATA_SERVER", "https://ibl.flatironinstitute.org")
        self._par = self._par.set('ALYX_LOGIN', username or self._par.ALYX_LOGIN)
        self._par = self._par.set('ALYX_URL', base_url or self._par.ALYX_URL)
        self._par = self._par.set('ALYX_PWD', password or self._par.ALYX_PWD)
        self._par = self._par.set('CACHE_DIR', cache_dir or self._par.CACHE_DIR)
        # init the cache file
        self._cache_file = Path(self._par.CACHE_DIR).joinpath('.one_cache.parquet')
        if self._cache_file.exists():
            # we need to keep this part fast enough for transient objects
            self._cache = parquet.load(self._cache_file)
        else:
            self._cache = pd.DataFrame()

    def _load(self, eid, dataset_types=None, dclass_output=False, download_only=False,
              offline=False, **kwargs):
        """
        From a Session ID and dataset types, queries Alyx database, downloads the data
        from Globus, and loads into numpy array. Single session only
        """
        if alfio.is_uuid_string(eid):
            eid = '/sessions/' + eid
        eid_str = eid[-36:]
        # if no dataset_type is provided:
        # a) force the output to be a dictionary that provides context to the data
        # b) download all types that have a data url specified within the alf folder
        dataset_types = [dataset_types] if isinstance(dataset_types, str) else dataset_types
        if not dataset_types or dataset_types == ['__all__']:
            dclass_output = True
        if offline:
            dc = self._make_dataclass_offline(eid_str, dataset_types, **kwargs)
        else:
            dc = self._make_dataclass(eid_str, dataset_types, **kwargs)
        # load the files content in variables if requested
        if not download_only:
            for ind, fil in enumerate(dc.local_path):
                dc.data[ind] = alfio.load_file_content(fil)
        # parse output arguments
        if dclass_output:
            return dc
        # if required, parse the output as a list that matches dataset_types requested
        list_out = []
        for dt in dataset_types:
            if dt not in dc.dataset_type:
                _logger.warning('dataset ' + dt + ' not found for session: ' + eid_str)
                list_out.append(None)
                continue
            for i, x in enumerate(dc.dataset_type):
                if dt == x:
                    if dc.data[i] is not None:
                        list_out.append(dc.data[i])
                    else:
                        list_out.append(dc.local_path[i])
        return list_out

    def _get_cache_dir(self, cache_dir):
        if not cache_dir:
            cache_dir = self._par.CACHE_DIR
        # if empty in parameter file, do not allow and set default
        if not cache_dir:
            cache_dir = str(PurePath(Path.home(), "Downloads", "FlatIron"))
        assert cache_dir
        return cache_dir

    def _make_dataclass_offline(self, eid, dataset_types=None, cache_dir=None, **kwargs):
        if self._cache.size == 0:
            return SessionDataInfo()
        # select the session
        npeid = parquet.str2np(eid)[0]
        df = self._cache[self._cache['eid_0'] == npeid[0]]
        df = df[df['eid_1'] == npeid[1]]
        # select datasets
        df = df[ismember(df['dataset_type'], dataset_types)[0]]
        return SessionDataInfo.from_pandas(df, self._get_cache_dir(cache_dir))

    def path_from_eid(self, eid: str) -> Optional[Listable(Path)]:
        """
        From an experiment id or a list of experiment ids, gets the local cache path

        :param eid: eid (UUID) or list of UUIDs
        :return: local session path or list of paths
        """
        # If eid is a list of eIDs recurse through list and return the results
        if isinstance(eid, list):
            path_list = []
            for p in eid:
                path_list.append(self.path_from_eid(p))
            return path_list
        # If not valid return None
        if not alfio.is_uuid_string(eid):
            print(eid, " is not a valid eID/UUID string")
            return
        if self._cache.size == 0:
            return
        # load path from cache
        ic = find_first_2d(
            self._cache[['eid_0', 'eid_1']].to_numpy(), parquet.str2np(eid))
        if ic is not None:
            ses = self._cache.iloc[ic]
            return Path(self._par.CACHE_DIR).joinpath(
                ses['lab'], 'Subjects', ses['subject'],
                ses['start_time'].isoformat()[:10],
                str(ses['number']).zfill(3))

    def eid_from_path(self, path_obj):
        """
        From a local path, gets the experiment id

        :param path_obj: local path or list of local paths
        :return: eid or list of eids
        """
        # If path_obj is a list recurse through it and return a list
        if isinstance(path_obj, list):
            path_obj = [Path(x) for x in path_obj]
            eid_list = []
            for p in path_obj:
                eid_list.append(self.eid_from_path(p))
            return eid_list
        # else ensure the path ends with mouse, date, number
        path_obj = Path(path_obj)
        session_path = alfio.get_session_path(path_obj)
        # if path does not have a date and a number, or cache is empty, return None
        if session_path is None or self._cache.size == 0:
            return None
        # fetch eid from cache
        ind = ((self._cache['subject'] == session_path.parts[-3]) &
               (self._cache['start_time'].apply(
                   lambda x: x.isoformat()[:10] == session_path.parts[-2])) &
               (self._cache['number'] == int(session_path.parts[-1])))
        ind = np.where(ind.to_numpy())[0]
        if ind.size > 0:
            return parquet.np2str(self._cache[['eid_0', 'eid_1']].iloc[ind[0]])

    @abc.abstractmethod
    def _make_dataclass(self, eid, dataset_types=None, cache_dir=None, **kwargs):
        pass

    @abc.abstractmethod
    def load(self, **kwargs):
        pass

    @abc.abstractmethod
    def list(self, **kwargs):
        pass

    @abc.abstractmethod
    def search(self, **kwargs):
        pass


def ONE(offline=False, **kwargs):
    if offline:
        return OneOffline(**kwargs)
    else:
        return OneAlyx(**kwargs)
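
# Usage sketch (editor's addition, not part of the module): instantiating ONE.
# Assumes Alyx credentials are already stored in the oneibl parameter file; the
# explicit base_url below is only an illustration.
#   >>> from oneibl.one import ONE
#   >>> one = ONE()                   # online mode, connects to the configured Alyx server
#   >>> one_off = ONE(offline=True)   # offline mode, uses only the local parquet cache
#   >>> one = ONE(base_url='https://alyx.internationalbrainlab.org')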


class OneOffline(OneAbstract):
    def _make_dataclass(self, *args, **kwargs):
        return self._make_dataclass_offline(*args, **kwargs)

    def load(self, eid, **kwargs):
        return self._load(eid, **kwargs)

    def list(self, **kwargs):
        pass

    def search(self, **kwargs):
        pass


class OneAlyx(OneAbstract):
    def __init__(self, **kwargs):
        # get parameters override if inputs provided
        super(OneAlyx, self).__init__(**kwargs)
        try:
            self._alyxClient = wc.AlyxClient(username=self._par.ALYX_LOGIN,
                                             password=self._par.ALYX_PWD,
                                             base_url=self._par.ALYX_URL)
            # Display output when instantiating ONE
            print(f"Connected to {self._par.ALYX_URL} as {self._par.ALYX_LOGIN}")
        except requests.exceptions.ConnectionError:
            raise ConnectionError(
                f"Can't connect to {self._par.ALYX_URL}.\n" +
                "IP addresses are filtered on IBL database servers. \n" +
                "Are you connecting from an IBL participating institution?"
            )

    @property
    def alyx(self):
        return self._alyxClient

    def help(self, dataset_type=None):
        if not dataset_type:
            return self.alyx.rest('dataset-types', 'list')
        if isinstance(dataset_type, list):
            for dt in dataset_type:
                self.help(dataset_type=dt)
            return
        if not isinstance(dataset_type, str):
            print('No dataset_type provided or wrong type. Should be str')
            return
        out = self.alyx.rest('dataset-types', 'read', dataset_type)
        print(out['description'])

    def list(self, eid: Optional[Union[str, Path, UUID]] = None, details=False
             ) -> Union[List, Dict[str, str]]:
        """
        From a Session ID, queries Alyx database for datasets related to a session.

        :param eid: Experiment session uuid str
        :type eid: str

        :param details: If false returns a list of paths, otherwise returns the REST dictionary
        :type details: bool

        :return: list of strings or dict of lists if details is True
        :rtype: list, dict
        """
        if not eid:
            return [x['name'] for x in self.alyx.rest('dataset-types', 'list')]

        # Session specific list
        dsets = self.alyx.rest('datasets', 'list', session=eid, exists=True)
        if not details:
            dsets = sorted([Path(dset['collection']).joinpath(dset['name']) for dset in dsets])
        return dsets
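
    # Example (editor's sketch): listing datasets. `eid` is assumed to be a valid
    # session UUID string, e.g. one returned by one.search().
    #   >>> one.list()                   # all dataset types registered on Alyx
    #   >>> one.list(eid)                # collection-qualified dataset paths for one session
    #   >>> one.list(eid, details=True)  # full REST dictionaries for each dataset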

    @parse_id
    def load(self, eid, dataset_types=None, dclass_output=False, dry_run=False, cache_dir=None,
             download_only=False, clobber=False, offline=False, keep_uuid=False):
        """
        From a Session ID and dataset types, queries Alyx database, downloads the data
        from Globus, and loads into numpy array.

        :param eid: Experiment ID, for IBL this is the UUID of the Session as per Alyx
         database. Could be a full Alyx URL:
         'http://localhost:8000/sessions/698361f6-b7d0-447d-a25d-42afdef7a0da' or only the UUID:
         '698361f6-b7d0-447d-a25d-42afdef7a0da'. Can also be a list of the above for multiple eids.
        :type eid: str
        :param dataset_types: [None]: Alyx dataset types to be returned.
        :type dataset_types: list
        :param dclass_output: [False]: forces the output as dataclass to provide context.
         If None or an empty dataset_type is specified, the output will be a dataclass by default.
        :type dclass_output: bool
        :param cache_dir: temporarily overrides the cache_dir from the parameter file
        :type cache_dir: str
        :param download_only: do not attempt to load data in memory, just download the files
        :type download_only: bool
        :param clobber: force downloading even if files exist locally
        :type clobber: bool
        :param keep_uuid: keeps the UUID at the end of the filename (defaults to False)
        :type keep_uuid: bool

        :return: List of numpy arrays matching the size of dataset_types parameter, OR
         a dataclass containing arrays and context data.
        :rtype: list, dict, dataclass SessionDataInfo
        """
        # this is a wrapping function to keep the signature and docstring accessible for IDEs
        return self._load_recursive(eid, dataset_types=dataset_types, dclass_output=dclass_output,
                                    dry_run=dry_run, cache_dir=cache_dir, keep_uuid=keep_uuid,
                                    download_only=download_only, clobber=clobber, offline=offline)
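
    # Example (editor's sketch): loading specific dataset types for a session. The
    # dataset type names below are illustrative IBL types.
    #   >>> pos, t = one.load(eid, dataset_types=['wheel.position', 'wheel.timestamps'])
    #   >>> dc = one.load(eid, download_only=True)  # SessionDataInfo with local paths only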

    @parse_id
    def load_dataset(self,
                     eid: Union[str, Path, UUID],
                     dataset: str,
                     collection: Optional[str] = None,
                     download_only: bool = False) -> Any:
        """
        Load a single dataset from a Session ID and a dataset type.

        :param eid: Experiment session identifier; may be a UUID, URL, experiment reference string
         details dict or Path
        :param dataset: The ALF dataset to load.  Supports asterisks as wildcards.
        :param collection: The collection to which the object belongs, e.g. 'alf/probe01'.
         For IBL this is the relative path of the file from the session root.
         Supports asterisks as wildcards.
        :param download_only: When true the data are downloaded and the file path is returned
        :return: dataset or a Path object if download_only is true

        Examples:
            intervals = one.load_dataset(eid, '_ibl_trials.intervals.npy')
            intervals = one.load_dataset(eid, '*trials.intervals*')
            filepath = one.load_dataset(eid, '_ibl_trials.intervals.npy', download_only=True)
            spikes = one.load_dataset(eid, 'spikes.times.npy', collection='alf/probe01')
        """
        search_str = 'name__regex,' + dataset.replace('.', r'\.').replace('*', '.*')
        if collection:
            search_str += ',collection__regex,' + collection.replace('*', '.*')
        results = self.alyx.rest('datasets', 'list', session=eid, django=search_str, exists=True)

        # Get filenames of returned ALF files
        collection_set = {x['collection'] for x in results}
        if len(collection_set) > 1:
            raise ALFMultipleCollectionsFound('Matching dataset belongs to multiple collections:' +
                                              ', '.join(collection_set))
        if len(results) > 1:
            raise ALFMultipleObjectsFound('The following matching datasets were found: ' +
                                          ', '.join(x['name'] for x in results))
        if len(results) == 0:
            raise ALFObjectNotFound(f'Dataset "{dataset}" not found on Alyx')

        filename = self.download_dataset(results[0])
        assert filename is not None, 'failed to download dataset'

        return filename if download_only else alfio.load_file_content(filename)

    @parse_id
    def load_object(self,
                    eid: Union[str, Path, UUID],
                    obj: str,
                    collection: Optional[str] = 'alf',
                    download_only: bool = False,
                    **kwargs) -> Union[alfio.AlfBunch, List[Path]]:
        """
        Load all attributes of an ALF object from a Session ID and an object name.

        :param eid: Experiment session identifier; may be a UUID, URL, experiment reference string
         details dict or Path
        :param obj: The ALF object to load.  Supports asterisks as wildcards.
        :param collection: The collection to which the object belongs, e.g. 'alf/probe01'.
         Supports asterisks as wildcards.
        :param download_only: When true the data are downloaded and the file paths are returned
        :param kwargs: Optional filters for the ALF objects, including namespace and timescale
        :return: An ALF bunch or if download_only is True, a list of Paths objects

        Examples:
            load_object(eid, '*moves')
            load_object(eid, 'trials')
            load_object(eid, 'spikes', collection='*probe01')
        """
        # Filter server-side by collection and dataset name
        search_str = 'name__regex,' + obj.replace('*', '.*')
        if collection and collection != 'all':
            search_str += ',collection__regex,' + collection.replace('*', '.*')
        results = self.alyx.rest('datasets', 'list', exists=True, session=eid, django=search_str)
        pattern = re.compile(fnmatch.translate(obj))

        # Further refine by matching object part of ALF datasets
        def match(r):
            return is_valid(r['name']) and pattern.match(alf_parts(r['name'])[1])

        # Get filenames of returned ALF files
        returned_obj = {alf_parts(x['name'])[1] for x in results if match(x)}

        # Validate result before loading
        if len(returned_obj) > 1:
            raise ALFMultipleObjectsFound('The following matching objects were found: ' +
                                          ', '.join(returned_obj))
        elif len(returned_obj) == 0:
            raise ALFObjectNotFound(f'ALF object "{obj}" not found on Alyx')

        collection_set = {x['collection'] for x in results if match(x)}
        if len(collection_set) > 1:
            raise ALFMultipleCollectionsFound('Matching object belongs to multiple collections:' +
                                              ', '.join(collection_set))

        # Download and optionally load the datasets
        out_files = self.download_datasets(x for x in results if match(x))
        assert not any(x is None for x in out_files), 'failed to download object'
        if download_only:
            return out_files
        else:
            return alfio.load_object(out_files[0].parent, obj, **kwargs)

    def _load_recursive(self, eid, **kwargs):
        """
        From a Session ID and dataset types, queries Alyx database, downloads the data
        from Globus, and loads into numpy array. Supports multiple sessions
        """
        if isinstance(eid, str):
            return self._load(eid, **kwargs)
        if isinstance(eid, list):
            # dataclass output requested
            if kwargs.get('dclass_output', False):
                for i, e in enumerate(eid):
                    if i == 0:
                        out = self._load(e, **kwargs)
                    else:
                        out.append(self._load(e, **kwargs))
            else:  # list output requested
                out = []
                for e in eid:
                    out.append(self._load(e, **kwargs)[0])
            return out

    def to_eid(self,
               id: Listable(Union[str, Path, UUID, dict]) = None,
               cache_dir: Optional[Union[str, Path]] = None) -> Listable(str):
        if isinstance(id, (list, tuple)):  # Recurse
            return [self.to_eid(i, cache_dir) for i in id]
        if isinstance(id, UUID):
            return str(id)
        # elif is_exp_ref(id):
        #     return ref2eid(id, one=self)
        elif isinstance(id, dict):
            assert {'subject', 'number', 'start_time', 'lab'}.issubset(id)
            root = Path(self._get_cache_dir(cache_dir))
            id = root.joinpath(
                id['lab'],
                'Subjects', id['subject'],
                id['start_time'][:10],
                ('%03d' % id['number']))

        if alfio.is_session_path(id):
            return self.eid_from_path(id)
        elif isinstance(id, str):
            if len(id) > 36:
                id = id[-36:]
            if not alfio.is_uuid_string(id):
                raise ValueError('Invalid experiment ID')
            else:
                return id
        else:
            raise ValueError('Unrecognized experiment ID')
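
    # Example (editor's sketch): to_eid accepts several identifier forms and normalises
    # them to a UUID string. The URL and session path below are illustrative placeholders;
    # the UUID is the one used in the load() docstring.
    #   >>> one.to_eid('698361f6-b7d0-447d-a25d-42afdef7a0da')
    #   >>> one.to_eid('http://localhost:8000/sessions/698361f6-b7d0-447d-a25d-42afdef7a0da')
    #   >>> one.to_eid(Path('/data/FlatIron/somelab/Subjects/ZM_1150/2019-05-07/001'))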

    def _make_dataclass(self, eid, dataset_types=None, cache_dir=None, dry_run=False,
                        clobber=False, offline=False, keep_uuid=False):
        # if the input is a UUID, add the beginning of the URL to it
        cache_dir = self._get_cache_dir(cache_dir)
        # get session json information as a dictionary from the alyx API
        try:
            ses = self.alyx.rest('sessions', 'read', id=eid)
        except requests.HTTPError:
            raise requests.HTTPError('Session ' + eid + ' does not exist')
        # filter by dataset types
        dc = SessionDataInfo.from_session_details(ses, dataset_types=dataset_types, eid=eid)
        # loop over each dataset and download if necessary
        with concurrent.futures.ThreadPoolExecutor(max_workers=NTHREADS) as executor:
            futures = []
            for ind in range(len(dc)):
                if dc.url[ind] is None or dry_run:
                    futures.append(None)
                else:
                    futures.append(executor.submit(
                        self.download_dataset, dc.url[ind], cache_dir=cache_dir, clobber=clobber,
                        offline=offline, keep_uuid=keep_uuid, file_size=dc.file_size[ind],
                        hash=dc.hash[ind]))
            concurrent.futures.wait(list(filter(lambda x: x is not None, futures)))
            for ind, future in enumerate(futures):
                if future is None:
                    continue
                dc.local_path[ind] = future.result()
        # filter by dataset types and update the cache
        self._update_cache(ses, dataset_types=dataset_types)
        return dc

    def _ls(self, table=None, verbose=False):
        """
        Queries the database for a list of 'users' and/or 'dataset-types' and/or 'subjects' fields

        :param table: the table(s) to query among: 'dataset-types', 'users' and 'subjects';
         if empty or None assumes all tables
        :type table: str
        :param verbose: [False] prints the list in the current window
        :type verbose: bool

        :return: list of names to query, list of full raw output in json serialized format
        :rtype: list, list
        """
        assert (isinstance(table, str))
        table_field_names = {
            'dataset-types': 'name',
            'datasets': 'name',
            'users': 'username',
            'subjects': 'nickname',
            'labs': 'name'}
        if not table or table not in list(set(_ENDPOINTS.keys())):
            raise KeyError("The attribute/endpoint: " + table + " doesn't exist \n" +
                           "possible values are " + str(set(_ENDPOINTS.values())))
        field_name = table_field_names[_ENDPOINTS[table]]
        full_out = self.alyx.get('/' + _ENDPOINTS[table])
        list_out = [f[field_name] for f in full_out]
        if verbose:
            pprint(list_out)
        return list_out, full_out

    # def search(self, dataset_types=None, users=None, subjects=None, date_range=None,
    #            lab=None, number=None, task_protocol=None, details=False):

    def search(self, details=False, limit=None, **kwargs):
        """
        Applies a filter to the sessions (eid) table and returns a list of json dictionaries
        corresponding to sessions.

        For a list of search terms, use the method

        >>> one.search_terms()

        :param dataset_types: list of dataset_types
        :type dataset_types: list of str

        :param date_range: list of 2 strings or list of 2 dates that define the range
        :type date_range: list

        :param details: default False, returns also the session details as per the REST response
        :type details: bool

        :param lab: a str or list of lab names
        :type lab: list or str

        :param limit: default None, limits results (if pagination enabled on server)
        :type limit: int

        List of possible search terms

        :param location: a str or list of lab location (as per Alyx definition) name
                         Note: this corresponds to the specific rig, not the lab geographical
                         location per se
        :type location: str

        :param number: number of session to be returned; will take the first n sessions found
        :type number: str or int

        :param performance_lte / performance_gte: search only for sessions whose performance is
         less equal or greater equal than a pre-defined threshold as a percentage (0-100)
        :type performance_gte: float

        :param subjects: a list of subject nicknames
        :type subjects: list or str

        :param task_protocol: a str or list of task protocol names (can be partial, i.e. any task
         protocol containing that str will be found)
        :type task_protocol: str

        :param users: a list of users
        :type users: list or str

        :return: list of eids, if details is True, also returns a list of json dictionaries,
         each entry corresponding to a matching session
        :rtype: list, list
        """
        # small function to make sure string inputs are interpreted as lists
        def validate_input(inarg):
            if isinstance(inarg, str):
                return [inarg]
            elif isinstance(inarg, int):
                return [str(inarg)]
            else:
                return inarg
        # loop over input arguments and build the url
        url = '/sessions?'
        for k in kwargs.keys():
            # check that the input matches one of the defined filters
            if k not in SEARCH_TERMS:
                _logger.error(f'"{k}" is not a valid search keyword' + '\n' +
                              "Valid keywords are: " + str(set(SEARCH_TERMS.values())))
                return
            # then make sure the field is formatted properly
            field = SEARCH_TERMS[k]
            if field == 'date_range':
                query = _validate_date_range(kwargs[k])
            else:
                query = validate_input(kwargs[k])
            # at last append to the URL
            url = url + f"&{field}=" + ','.join(query)
        # the REST pagination argument has to be the last one
        if limit:
            url += f'&limit={limit}'
        # implements the loading itself
        ses = self.alyx.get(url)
        if len(ses) > 2500:
            eids = [s['url'] for s in tqdm.tqdm(ses)]  # flattens session info
        else:
            eids = [s['url'] for s in ses]
        eids = [e.split('/')[-1] for e in eids]  # remove url to make it portable
        if details:
            for s in ses:
                if all([s.get('lab'), s.get('subject'), s.get('start_time')]):
                    s['local_path'] = str(Path(self._par.CACHE_DIR, s['lab'], 'Subjects',
                                               s['subject'], s['start_time'][:10],
                                               str(s['number']).zfill(3)))
                else:
                    s['local_path'] = None
            return eids, ses
        else:
            return eids
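
    # Example (editor's sketch): searching for sessions. The subject nickname and dates
    # below are placeholders; see one.search_terms() for the accepted keywords.
    #   >>> eids = one.search(subject='ZM_1150', date_range=['2019-05-01', '2019-05-31'])
    #   >>> eids, ses_info = one.search(task_protocol='ephys', details=True)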

    def download_datasets(self, dsets, **kwargs):
        """
        Download several datasets through a list of Alyx REST dictionaries

        :param dsets: list of dataset dictionaries from an Alyx REST query OR list of URL strings
        :return: list of local file paths
        """
        out_files = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=NTHREADS) as executor:
            futures = [executor.submit(self.download_dataset, dset, file_size=dset['file_size'],
                                       hash=dset['hash'], **kwargs) for dset in dsets]
            concurrent.futures.wait(futures)
            for future in futures:
                out_files.append(future.result())
        return out_files

    def download_dataset(self, dset, cache_dir=None, **kwargs):
        """
        Download a dataset from an Alyx REST dictionary

        :param dset: single dataset dictionary from an Alyx REST query OR URL string
        :param cache_dir: (optional) root directory to save the data in (home/downloads by default)
        :return: local file path
        """
        if isinstance(dset, str):
            url = dset
        else:
            url = next((fr['data_url'] for fr in dset['file_records']
                        if fr['data_url'] and fr['exists']), None)
            if not url:
                str_dset = Path(dset['collection']).joinpath(dset['name'])
                _logger.warning(f"{str_dset} exist flag or url not found in Alyx")
                return
        assert url.startswith(self._par.HTTP_DATA_SERVER), \
            ('remote protocol and/or hostname does not match HTTP_DATA_SERVER parameter:\n' +
             f'"{url[:40]}..." should start with "{self._par.HTTP_DATA_SERVER}"')
        relpath = Path(url.replace(self._par.HTTP_DATA_SERVER, '.')).parents[0]
        target_dir = Path(self._get_cache_dir(cache_dir), relpath)
        return self._download_file(url=url, target_dir=target_dir, **kwargs)
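
    # Example (editor's sketch): download_dataset accepts either a dataset dictionary
    # from an Alyx REST query or a direct URL on the HTTP data server.
    #   >>> dsets = one.alyx.rest('datasets', 'list', session=eid, exists=True)
    #   >>> local_file = one.download_dataset(dsets[0])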

    def _tag_mismatched_file_record(self, url):
        fr = self.alyx.rest('files', 'list', django=f"dataset,{Path(url).name.split('.')[-2]},"
                                                    f"data_repository__globus_is_personal,False")
        if len(fr) > 0:
            json_field = fr[0]['json']
            if json_field is None:
                json_field = {'mismatch_hash': True}
            else:
                json_field.update({'mismatch_hash': True})
            self.alyx.rest('files', 'partial_update', id=fr[0]['url'][-36:],
                           data={'json': json_field})

    def _download_file(self, url, target_dir, clobber=False, offline=False, keep_uuid=False,
                       file_size=None, hash=None):
        """
        Downloads a single file from an HTTP webserver

        :param url:
        :param clobber: (bool: False) overwrites local dataset if any
        :param offline:
        :param keep_uuid:
        :param file_size:
        :param hash:
        :return:
        """
        Path(target_dir).mkdir(parents=True, exist_ok=True)
        local_path = str(target_dir) + os.sep + os.path.basename(url)
        if not keep_uuid:
            local_path = alfio.remove_uuid_file(local_path, dry=True)
        if Path(local_path).exists():
            # the local file hash doesn't match the dataset table cached hash
            hash_mismatch = hash and hashfile.md5(Path(local_path)) != hash
            file_size_mismatch = file_size and Path(local_path).stat().st_size != file_size
            if hash_mismatch or file_size_mismatch:
                clobber = True
                _logger.warning(f" local md5 or size mismatch, re-downloading {local_path}")
        # if there is no cached file, download
        else:
            clobber = True
        if clobber and not offline:
            local_path, md5 = wc.http_download_file(
                url, username=self._par.HTTP_DATA_SERVER_LOGIN,
                password=self._par.HTTP_DATA_SERVER_PWD,
                cache_dir=str(target_dir), clobber=clobber, return_md5=True)
            # post-download, if there is a mismatch between Alyx and the newly downloaded file
            # size or hash, flag the offending file record in Alyx for database maintenance
            hash_mismatch = hash and md5 != hash
            file_size_mismatch = file_size and Path(local_path).stat().st_size != file_size
            if hash_mismatch or file_size_mismatch:
                self._tag_mismatched_file_record(url)
        if keep_uuid:
            return local_path
        else:
            return alfio.remove_uuid_file(local_path)

    @staticmethod
    def search_terms():
        """
        Returns possible search terms to be used in the one.search method.

        :return: a sorted list of possible search terms
        :rtype: list
        """
        return sorted(list(set(SEARCH_TERMS.values())))

    @staticmethod
    def keywords():
        """
        Returns possible keywords to be used in the one.list method

        :return: a sorted list of possible keywords
        :rtype: list
        """
        return sorted(list(set(_ENDPOINTS.values())))

    @staticmethod
    def setup():
        """
        Interactive command tool that populates parameter file for ONE IBL.
        """
        oneibl.params.setup()

    def path_from_eid(self, eid: str, use_cache: bool = True) -> Listable(Path):
        """
        From an experiment id or a list of experiment ids, gets the local cache path

        :param eid: eid (UUID) or list of UUIDs
        :param use_cache: if set to False, will force database connection
        :return: local session path or list of paths
        """
        # If eid is a list of eIDs recurse through list and return the results
        if isinstance(eid, list):
            path_list = []
            for p in eid:
                path_list.append(self.path_from_eid(p))
            return path_list
        # If not valid return None
        if not alfio.is_uuid_string(eid):
            print(eid, " is not a valid eID/UUID string")
            return

        # first try avoid hitting the database
        if self._cache.size > 0 and use_cache:
            cache_path = super().path_from_eid(eid)
            if cache_path:
                return cache_path

        # if it wasn't successful, query Alyx
        ses = self.alyx.rest('sessions', 'list', django=f'pk,{eid}')
        if len(ses) == 0:
            return None
        else:
            return Path(self._par.CACHE_DIR).joinpath(
                ses[0]['lab'], 'Subjects', ses[0]['subject'],
                ses[0]['start_time'][:10], str(ses[0]['number']).zfill(3))

    def eid_from_path(self, path_obj: Union[str, Path], use_cache: bool = True) -> Listable(Path):
        """
        From a local path, gets the experiment id

        :param path_obj: local path or list of local paths
        :param use_cache: if set to False, will force database connection
        :return: eid or list of eids
        """
        # If path_obj is a list recurse through it and return a list
        if isinstance(path_obj, list):
            path_obj = [Path(x) for x in path_obj]
            eid_list = []
            for p in path_obj:
                eid_list.append(self.eid_from_path(p))
            return eid_list
        # else ensure the path ends with mouse, date, number
        path_obj = Path(path_obj)
        session_path = alfio.get_session_path(path_obj)
        # if path does not have a date and a number return None
        if session_path is None:
            return None

        # try the cached info to possibly avoid hitting database
        cache_eid = super().eid_from_path(path_obj)
        if cache_eid:
            return cache_eid

        # if not, search for subject, date, number. XXX: hits the DB
        uuid = self.search(subjects=session_path.parts[-3],
                           date_range=session_path.parts[-2],
                           number=session_path.parts[-1])
        # Return the uuid if any
        return uuid[0] if uuid else None
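
    # Example (editor's sketch): round-tripping between an eid and its local session path;
    # the layout is <CACHE_DIR>/<lab>/Subjects/<subject>/<date>/<number>. `eid` is assumed
    # to be a session already present in the local cache or on Alyx.
    #   >>> session_path = one.path_from_eid(eid)
    #   >>> one.eid_from_path(session_path)  # -> eid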

    def url_from_path(self, filepath):
        """
        Given a local file path, returns the URL of the remote file.

        :param filepath: A local file path
        :return: A URL string
        """
        eid = self.eid_from_path(filepath)
        try:
            dataset, = self.alyx.rest('datasets', 'list', session=eid, name=Path(filepath).name)
        except ValueError:
            raise ALFObjectNotFound(f'File record for {filepath} not found on Alyx')
        return next(
            r['data_url'] for r in dataset['file_records'] if r['data_url'] and r['exists'])
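
    # Example (editor's sketch): mapping a local ALF file back to its remote URL. The path
    # below is a placeholder under the local cache directory.
    #   >>> one.url_from_path('/data/FlatIron/somelab/Subjects/ZM_1150/2019-05-07/001/'
    #   ...                   'alf/_ibl_trials.intervals.npy')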

    @parse_id
    def datasets_from_type(self, eid, dataset_type, full=False):
        """
        Get list of datasets belonging to a given dataset type for a given session

        :param eid: Experiment session identifier; may be a UUID, URL, experiment reference string
         details dict or Path
        :param dataset_type: A dataset type, e.g. camera.times
        :param full: If True, a dictionary of details is returned for each dataset
        :return: A list of datasets belonging to that session's dataset type
        """
        restriction = f'session__id,{eid},dataset_type__name,{dataset_type}'
        datasets = self.alyx.rest('datasets', 'list', django=restriction)
        return datasets if full else [d['name'] for d in datasets]

    def get_details(self, eid: str, full: bool = False):
        """
        Returns details of eid like from one.search, optional return full session details.
        """
        # If eid is a list of eIDs recurse through list and return the results
        if isinstance(eid, list):
            details_list = []
            for p in eid:
                details_list.append(self.get_details(p, full=full))
            return details_list
        # If not valid return None
        if not alfio.is_uuid_string(eid):
            print(eid, " is not a valid eID/UUID string")
            return
        # load all details
        dets = self.alyx.rest("sessions", "read", eid)
        if full:
            return dets
        # If it's not full return the normal output like from a one.search
        det_fields = ["subject", "start_time", "number", "lab", "project",
                      "url", "task_protocol", "local_path"]
        out = {k: v for k, v in dets.items() if k in det_fields}
        out.update({'local_path': self.path_from_eid(eid)})
        return out

    def _update_cache(self, ses, dataset_types):
        """
        :param ses: session details dictionary as per Alyx response
        :param dataset_types:
        :return: is_updated (bool): if the cache was updated or not
        """
        save = False
        pqt_dsets = _ses2pandas(ses, dtypes=dataset_types)
        # if the dataframe is empty, return
        if pqt_dsets.size == 0:
            return
        # if the cache is empty create the cache variable
        elif self._cache.size == 0:
            self._cache = pqt_dsets
            save = True
        # the cache is not empty and there are datasets in the query
        else:
            isin, icache = ismember2d(pqt_dsets[['id_0', 'id_1']].to_numpy(),
                                      self._cache[['id_0', 'id_1']].to_numpy())
            # check if the hash / filesize fields have changed on patching
            heq = (self._cache['hash'].iloc[icache].to_numpy() ==
                   pqt_dsets['hash'].iloc[isin].to_numpy())
            feq = np.isclose(self._cache['file_size'].iloc[icache].to_numpy(),
                             pqt_dsets['file_size'].iloc[isin].to_numpy(),
                             rtol=0, atol=0, equal_nan=True)
            eq = np.logical_and(heq, feq)
            # update new hash / filesizes
            if not np.all(eq):
                self._cache.iloc[icache, 4:6] = pqt_dsets.iloc[np.where(isin)[0], 4:6].to_numpy()
                save = True
            # append datasets that haven't been found
            if not np.all(isin):
                self._cache = self._cache.append(pqt_dsets.iloc[np.where(~isin)[0]])
                self._cache = self._cache.reindex()
                save = True
        if save:
            # before saving make sure pandas did not cast the uuids to float
            typs = [t for t, k in zip(self._cache.dtypes, self._cache.keys()) if 'id_' in k]
            assert (all(map(lambda t: t == np.int64, typs)))
            # if this gets too big, look into saving only when destroying the ONE object
            parquet.save(self._cache_file, self._cache)

    def download_raw_partial(self, url_cbin, url_ch, first_chunk=0, last_chunk=0):
        assert str(url_cbin).endswith('.cbin')
        assert str(url_ch).endswith('.ch')

        relpath = Path(url_cbin.replace(self._par.HTTP_DATA_SERVER, '.')).parents[0]
        target_dir = Path(self._get_cache_dir(None), relpath)
        Path(target_dir).mkdir(parents=True, exist_ok=True)

        # First, download the .ch file if necessary
        if isinstance(url_ch, Path):
            ch_file = url_ch
        else:
            ch_file = Path(wc.http_download_file(
                url_ch, username=self._par.HTTP_DATA_SERVER_LOGIN,
                password=self._par.HTTP_DATA_SERVER_PWD,
                cache_dir=target_dir, clobber=True, return_md5=False))
            ch_file = alfio.remove_uuid_file(ch_file)
        ch_file_stream = ch_file.with_suffix('.stream.ch')

        # Load the .ch file.
        with open(ch_file, 'r') as f:
            cmeta = json.load(f)

        # Get the first byte and number of bytes to download.
        i0 = cmeta['chunk_bounds'][first_chunk]
        ns_stream = cmeta['chunk_bounds'][last_chunk + 1] - i0

        # if the cached version happens to be the same as the one on disk, just load it
        if ch_file_stream.exists():
            with open(ch_file_stream, 'r') as f:
                cmeta_stream = json.load(f)
            if (cmeta_stream.get('chopped_first_sample', None) == i0 and
                    cmeta_stream.get('chopped_total_samples', None) == ns_stream):
                return spikeglx.Reader(ch_file_stream.with_suffix('.cbin'))
        else:
            shutil.copy(ch_file, ch_file_stream)
        assert ch_file_stream.exists()

        # prepare the metadata file
        cmeta['chunk_bounds'] = cmeta['chunk_bounds'][first_chunk:last_chunk + 2]
        cmeta['chunk_bounds'] = [_ - i0 for _ in cmeta['chunk_bounds']]
        assert len(cmeta['chunk_bounds']) >= 2
        assert cmeta['chunk_bounds'][0] == 0

        first_byte = cmeta['chunk_offsets'][first_chunk]
        cmeta['chunk_offsets'] = cmeta['chunk_offsets'][first_chunk:last_chunk + 2]
        cmeta['chunk_offsets'] = [_ - first_byte for _ in cmeta['chunk_offsets']]
        assert len(cmeta['chunk_offsets']) >= 2
        assert cmeta['chunk_offsets'][0] == 0
        n_bytes = cmeta['chunk_offsets'][-1]
        assert n_bytes > 0

        # Save the chopped chunk bounds and offsets.
        cmeta['sha1_compressed'] = None
        cmeta['sha1_uncompressed'] = None
        cmeta['chopped'] = True
        cmeta['chopped_first_sample'] = i0
        cmeta['chopped_total_samples'] = ns_stream
        with open(ch_file_stream, 'w') as f:
            json.dump(cmeta, f, indent=2, sort_keys=True)

        # Download the requested chunks
        cbin_local_path = wc.http_download_file(
            url_cbin, username=self._par.HTTP_DATA_SERVER_LOGIN,
            password=self._par.HTTP_DATA_SERVER_PWD,
            cache_dir=target_dir, clobber=True, return_md5=False,
            chunks=(first_byte, n_bytes))
        cbin_local_path = alfio.remove_uuid_file(cbin_local_path)
        cbin_local_path_renamed = cbin_local_path.with_suffix('.stream.cbin')
        cbin_local_path.rename(cbin_local_path_renamed)
        assert cbin_local_path_renamed.exists()

        shutil.copy(cbin_local_path.with_suffix('.meta'),
                    cbin_local_path_renamed.with_suffix('.meta'))

        reader = spikeglx.Reader(cbin_local_path_renamed)
        return reader
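
    # Example (editor's sketch): streaming a slice of a compressed ephys file without
    # downloading it whole. `url_cbin` and `url_ch` are placeholders for the .cbin file
    # and its .ch companion on the HTTP data server; chunks 0 to 2 are fetched and the
    # result is a spikeglx.Reader over the streamed .stream.cbin file.
    #   >>> reader = one.download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=2)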


def _validate_date_range(date_range):
    """
    Validates and arranges the date range in a 2-element list
    """
    if isinstance(date_range, str):
        date_range = [date_range, date_range]
    if len(date_range) == 1:
        date_range = [date_range[0], date_range[0]]
    return date_range
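
# Example (editor's addition): this private helper duplicates a single value so that
# downstream code can always assume a [start, end] pair.
#   >>> _validate_date_range('2020-01-01')    # -> ['2020-01-01', '2020-01-01']
#   >>> _validate_date_range(['2020-01-01'])  # -> ['2020-01-01', '2020-01-01']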