Source code for oneibl.webclient

import json
import logging
import math
import os
import re
import urllib.error
import urllib.request
from collections.abc import Mapping
from pathlib import Path, PurePosixPath
import hashlib

import requests

from ibllib.misc import pprint, print_progress

SDSC_ROOT_PATH = PurePosixPath('/mnt/ibl')
_logger = logging.getLogger('ibllib')


class _PaginatedResponse(Mapping):
    """
    This class allows to emulate a list from a paginated response.
    Provides cache functionality
    PaginatedResponse(alyx, response)
    """

    def __init__(self, alyx, rep):
        self.alyx = alyx
        self.count = rep['count']
        self.limit = len(rep['results'])
        # warning: the offset and limit filters are not necessarily the last ones
        lquery = [q for q in rep['next'].split('&')
                  if not (q.startswith('offset=') or q.startswith('limit='))]
        self.query = '&'.join(lquery)
        # init the cache, list with None with count size
        self._cache = [None for _ in range(self.count)]
        # fill the cache with results of the query
        for i in range(self.limit):
            self._cache[i] = rep['results'][i]

    def __len__(self):
        return self.count

    def __getitem__(self, item):
        if self._cache[item] is None:
            offset = self.limit * math.floor(item / self.limit)
            query = f'{self.query}&limit={self.limit}&offset={offset}'
            res = self.alyx._generic_request(requests.get, query)
            for i, r in enumerate(res['results']):
                self._cache[i + offset] = r
        return self._cache[item]

    def __iter__(self):
        for i in range(self.count):
            yield self.__getitem__(i)
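

# A minimal usage sketch for _PaginatedResponse, assuming a hypothetical `alyx`
# client and a first-page response `rep` of the shape handled above; not part of
# the original module, purely illustrative.
def _example_paginated_response(alyx, rep):
    sessions = _PaginatedResponse(alyx, rep)
    print(len(sessions))   # total record count across all pages
    first = sessions[0]    # served from the cache filled by the first page
    later = sessions[500]  # triggers one GET for the page containing item 500
    return first, later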


def sdsc_globus_path_from_dataset(dset):
    """
    Returns the SDSC Globus file path from a dataset record or a list of dataset records.

    :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint
    """
    return _path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=True)


def globus_path_from_dataset(dset, repository=None, uuid=False):
    """
    Returns the Globus file path from a dataset record or a list of dataset records.

    :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint
    :param repository: (optional) name of the file record's data repository (if None, the
     first file record with a URL is used)
    """
    return _path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid)


def one_path_from_dataset(dset, one_cache):
    """
    Returns the local ONE file path from a dataset record or a list of dataset records.

    :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint
    :param one_cache: the ONE cache directory
    """
    return _path_from_dataset(dset, root_path=one_cache, uuid=False)


def sdsc_path_from_dataset(dset, root_path=SDSC_ROOT_PATH):
    """
    Returns the SDSC file path from a dataset record or a list of dataset records.

    :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint
    :param root_path: (optional) the prefix path, such as a download directory or the SDSC root
    """
    return _path_from_dataset(dset, root_path=root_path, uuid=True)


def _path_from_dataset(dset, root_path=None, repository=None, uuid=False):
    """
    Returns the local file path from a dataset record from a REST query.

    :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint
    :param root_path: (optional) the prefix path, such as a download directory or the SDSC root
    :param repository: (optional) name of the file record's data repository
    :param uuid: (optional bool) if True, the dataset UUID is added before the file extension
    :return: Path or list of Path
    """
    if isinstance(dset, list):
        # NB: the keyword arguments must be passed through to each element of the list
        return [_path_from_dataset(d, root_path=root_path, repository=repository, uuid=uuid)
                for d in dset]
    if repository:
        fr = next(fr for fr in dset['file_records'] if fr['data_repository'] == repository)
    else:
        fr = next(fr for fr in dset['file_records'] if fr['data_url'])
    uuid = dset['url'][-36:] if uuid else None
    return _path_from_filerecord(fr, root_path=root_path, uuid=uuid)


def _path_from_filerecord(fr, root_path=SDSC_ROOT_PATH, uuid=None):
    """
    Returns a data file Path constructed from an Alyx file record.

    The type of the returned path depends on the type of root_path: if root_path is a
    string, a Path object is returned; otherwise, if root_path is a PurePath, the same
    path type is returned.

    :param fr: An Alyx file record dict
    :param root_path: An optional root path
    :param uuid: An optional UUID to add to the file name
    :return: A file path as a pathlib object
    """
    import alf.io
    if isinstance(fr, list):
        return [_path_from_filerecord(f, root_path=root_path, uuid=uuid) for f in fr]
    repo_path = fr['data_repository_path']
    repo_path = repo_path[repo_path.startswith('/'):]  # remove starting / if any
    # repo_path = (p := fr['data_repository_path'])[p[0] == '/':]  # py3.8 remove slash at start
    file_path = PurePosixPath(repo_path, fr['relative_path'])
    if root_path:
        # NB: by checking for string we won't cast any PurePaths
        if isinstance(root_path, str):
            root_path = Path(root_path)
        file_path = root_path / file_path
    if uuid:
        file_path = alf.io.add_uuid_string(file_path, uuid)
    return file_path
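

# A hedged example of the path helpers above, using a hand-written file record;
# the repository and file names are invented for illustration and do not refer
# to a real dataset.
def _example_path_from_filerecord():
    fr = {'data_repository_path': '/mainenlab/Subjects',  # hypothetical
          'relative_path': 'clns0730/2018-08-24/002/alf/spikes.times.npy'}
    # with the default SDSC root this yields
    # /mnt/ibl/mainenlab/Subjects/clns0730/2018-08-24/002/alf/spikes.times.npy
    return _path_from_filerecord(fr, root_path=SDSC_ROOT_PATH)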


def http_download_file_list(links_to_file_list, **kwargs):
    """
    Downloads a list of files from FlatIron from a list of links.
    Same options behaviour as http_download_file.

    :param links_to_file_list: list of HTTP links to files
    :type links_to_file_list: list
    :return: (list) the local full paths of the downloaded files
    """
    file_names_list = []
    for link_str in links_to_file_list:
        file_names_list.append(http_download_file(link_str, **kwargs))
    return file_names_list


def http_download_file(full_link_to_file, chunks=None, *, clobber=False, username='',
                       password='', cache_dir='', return_md5=False, headers=None):
    """
    :param full_link_to_file: HTTP link to the file
    :type full_link_to_file: str
    :param chunks: [None] a (first_byte, n_bytes) tuple for a partial (Range) download
    :type chunks: tuple
    :param clobber: [False] If True, force overwrite the existing file
    :type clobber: bool
    :param username: [''] authentication for a password-protected file server
    :type username: str
    :param password: [''] authentication for a password-protected file server
    :type password: str
    :param cache_dir: [''] directory in which files are cached; defaults to the user's
     Downloads directory
    :type cache_dir: str
    :param return_md5: [False] If True, also return the MD5 checksum of the downloaded file
    :type return_md5: bool
    :param headers: [None] additional headers to add to the request (auth tokens etc.)
    :type headers: dict
    :return: (str) the local full path of the downloaded file
    """
    from ibllib.io import hashfile
    if not full_link_to_file:
        return ''
    # default cache directory is the home dir
    if not cache_dir:
        cache_dir = str(Path.home().joinpath('Downloads'))
    # this is the local file name
    file_name = str(cache_dir) + os.sep + os.path.basename(full_link_to_file)
    # do not overwrite an existing file unless specified
    if not clobber and os.path.exists(file_name):
        return (file_name, hashfile.md5(file_name)) if return_md5 else file_name
    # this should be the base url you wanted to access
    baseurl = os.path.split(str(full_link_to_file))[0]
    # create a password manager
    manager = urllib.request.HTTPPasswordMgrWithDefaultRealm()
    if username and password:
        manager.add_password(None, baseurl, username, password)
    # create an authentication handler using the password manager
    auth = urllib.request.HTTPBasicAuthHandler(manager)
    # create an opener that will replace the default urlopen method on further calls
    opener = urllib.request.build_opener(auth)
    urllib.request.install_opener(opener)
    # support for partial download
    req = urllib.request.Request(full_link_to_file)
    if chunks is not None:
        first_byte, n_bytes = chunks
        req.add_header('Range', 'bytes=%d-%d' % (first_byte, first_byte + n_bytes - 1))
    # add additional headers if any
    if headers is not None:
        for k in headers:
            req.add_header(k, headers[k])
    # open the url and get the length
    try:
        u = urllib.request.urlopen(req)
    except urllib.error.HTTPError as e:
        _logger.error(f'{str(e)} {full_link_to_file}')
        raise e
    file_size = int(u.getheader('Content-length'))
    print(f'Downloading: {file_name} Bytes: {file_size}')
    file_size_dl = 0
    block_sz = 8192 * 64 * 8
    md5 = hashlib.md5()
    with open(file_name, 'wb') as f:
        while True:
            buffer = u.read(block_sz)
            if not buffer:
                break
            file_size_dl += len(buffer)
            f.write(buffer)
            if return_md5:
                md5.update(buffer)
            print_progress(file_size_dl, file_size, prefix='', suffix='')
    return (file_name, md5.hexdigest()) if return_md5 else file_name
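

# A usage sketch for the Range-request support above; the URL is a placeholder,
# not a real FlatIron link.
def _example_partial_download():
    url = 'https://example.org/path/to/file.bin'  # hypothetical
    # download bytes 0..1023 only, overwriting any cached copy
    local_path = http_download_file(url, chunks=(0, 1024), clobber=True)
    return local_path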


def file_record_to_url(file_records, urls=None):
    """
    Translates JSON file records into usable HTTP URLs for downloading files.

    :param file_records: JSON containing a 'data_url' field
    :type file_records: dict
    :param urls: a list of strings containing previous data_urls, onto which new urls are appended
    :type urls: list
    :return: urls: (list) a list of strings representing full data urls
    """
    if urls is None:
        urls = []  # avoid the mutable default argument pitfall
    for fr in file_records:
        if fr['data_url'] is not None:
            urls.append(fr['data_url'])
    return urls


def dataset_record_to_url(dataset_record):
    """
    Extracts a list of file urls from a list of dataset queries.

    :param dataset_record: dataset JSON from a REST request
    :type dataset_record: list
    :return: (list) a list of strings representing file urls corresponding to the dataset records
    """
    urls = []
    if isinstance(dataset_record, dict):
        dataset_record = [dataset_record]
    for ds in dataset_record:
        urls = file_record_to_url(ds['file_records'], urls)
    return urls
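

# Illustration of the two URL helpers on a minimal, hand-made dataset record;
# all values are invented for the example.
def _example_dataset_record_to_url():
    dset = {'file_records': [
        {'data_url': 'https://example.org/data/spikes.times.npy'},  # hypothetical
        {'data_url': None},  # records without a URL are skipped
    ]}
    return dataset_record_to_url(dset)  # -> ['https://example.org/data/spikes.times.npy']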


class UniqueSingletons(type):
    """
    Metaclass that returns the same instance for identical constructor arguments.
    """
    _instances: list = []

    def __call__(cls, *args, **kwargs):
        for inst in UniqueSingletons._instances:
            if cls in inst and inst.get(cls, None).get('args') == (args, kwargs):
                return inst[cls].get('instance')
        new_instance = super(UniqueSingletons, cls).__call__(*args, **kwargs)
        # Optional rerun of constructor:
        # new_instance.__init__(*args, **kwargs)
        new_instance_record = {cls: {'args': (args, kwargs), 'instance': new_instance}}
        UniqueSingletons._instances.append(new_instance_record)
        return new_instance
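

# A small demonstration of the metaclass above: two constructions with the same
# arguments share one instance, while different arguments yield a new one. The
# Toy class is purely illustrative.
def _example_unique_singletons():
    class Toy(metaclass=UniqueSingletons):
        def __init__(self, base_url=''):
            self.base_url = base_url

    a = Toy(base_url='https://example.org')
    b = Toy(base_url='https://example.org')
    c = Toy(base_url='https://other.example.org')
    assert a is b and a is not c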


class AlyxClient(metaclass=UniqueSingletons):
    """
    Class that implements simple GET/POST wrappers for the Alyx REST API
    http://alyx.readthedocs.io/en/latest/api.html
    """

    def __init__(self, **kwargs):
        """
        Create a client instance that allows GET and POST requests to the Alyx server.
        For oneibl, the constructor attempts to authenticate with credentials in params.py;
        for standalone cases, use AlyxClient(username='', password='', base_url='').

        :param username: Alyx database user
        :type username: str
        :param password: Alyx database password
        :type password: str
        :param base_url: Alyx server address, including port and protocol
        :type base_url: str
        """
        self.authenticate(**kwargs)
        self._headers['Accept'] = 'application/coreapi+json'
        self._rest_schemes = self.get('/docs')
        # the mixed Accept application may sometimes cause errors; it is only necessary for /docs
        self._headers['Accept'] = 'application/json'
        self._obj_id = id(self)

    def _generic_request(self, reqfunction, rest_query, data=None, files=None):
        # make sure the base url is the one from the instance
        rest_query = rest_query.replace(self._base_url, '')
        if not rest_query.startswith('/'):
            rest_query = '/' + rest_query
        _logger.debug(f"{self._base_url + rest_query}, headers: {self._headers}")
        headers = self._headers.copy()
        if files is None:
            data = json.dumps(data) if isinstance(data, (dict, list)) else data
            headers['Content-Type'] = 'application/json'
        r = reqfunction(self._base_url + rest_query, stream=True, headers=headers,
                        data=data, files=files)
        if r and r.status_code in (200, 201):
            return json.loads(r.text)
        elif r and r.status_code == 204:
            return
        else:
            _logger.error(self._base_url + rest_query)
            _logger.error(r.text)
            raise requests.HTTPError(r)

    def authenticate(self, username='', password='', base_url=''):
        """
        Gets a security token from the Alyx REST API to create request headers.
        Credentials are in the params_secret_template.py file.

        :param username: Alyx database user
        :type username: str
        :param password: Alyx database password
        :type password: str
        :param base_url: Alyx server address, including port and protocol
        :type base_url: str
        """
        self._base_url = base_url
        rep = requests.post(base_url + '/auth-token',
                            data=dict(username=username, password=password))
        # assign the token or raise an exception on internal server error
        self._token = rep.json() if rep.ok else rep.raise_for_status()
        if not (list(self._token.keys()) == ['token']):
            _logger.error(rep)
            raise Exception('Alyx authentication error. Check your credentials')
        self._headers = {
            'Authorization': 'Token {}'.format(list(self._token.values())[0]),
            'Accept': 'application/json'}

    def delete(self, rest_query):
        """
        Sends a DELETE request to the Alyx server. Will raise an exception on any
        status_code other than 200, 201.

        :param rest_query: examples:
         '/weighings/c617562d-c107-432e-a8ee-682c17f9e698'
         'https://test.alyx.internationalbrainlab.org/weighings/c617562d-c107-432e-a8ee-682c17f9e698'
        :type rest_query: str
        :return: (dict/list) json interpreted dictionary from response
        """
        return self._generic_request(requests.delete, rest_query)

    def download_file(self, url, **kwargs):
        """
        Downloads a file from the Alyx server, given a file record REST field URL.

        :param url: full url of the file
        :param kwargs: webclient.http_download_file parameters
        :return: local path of the downloaded file
        """
        return http_download_file(url, headers=self._headers, **kwargs)

    def get(self, rest_query):
        """
        Sends a GET request to the Alyx server. Will raise an exception on any status_code
        other than 200, 201. For the dictionary contents and list of endpoints, refer to:
        https://alyx.internationalbrainlab.org/docs

        :param rest_query: example: '/sessions?user=Hamish'
        :type rest_query: str
        :return: (dict/list) json interpreted dictionary from response
        """
        rep = self._generic_request(requests.get, rest_query)
        _logger.debug(rest_query)
        if isinstance(rep, dict) and list(rep.keys()) == ['count', 'next', 'previous', 'results']:
            if len(rep['results']) < rep['count']:
                rep = _PaginatedResponse(self, rep)
            else:
                rep = rep['results']
        return rep

    def patch(self, rest_query, data=None, files=None):
        """
        Sends a PATCH request to the Alyx server.
        For the dictionary contents, refer to:
        https://alyx.internationalbrainlab.org/docs

        :param rest_query: (required) the endpoint as a full or relative URL
        :type rest_query: str
        :param data: json encoded string or dictionary (cf. requests)
        :type data: None, dict or str
        :param files: dictionary / tuple (cf. requests)
        :return: response object
        """
        return self._generic_request(requests.patch, rest_query, data=data, files=files)

    def post(self, rest_query, data=None, files=None):
        """
        Sends a POST request to the Alyx server.
        For the dictionary contents, refer to:
        https://alyx.internationalbrainlab.org/docs

        :param rest_query: (required) the endpoint as a full or relative URL
        :type rest_query: str
        :param data: dictionary or json encoded string
        :type data: None, dict or str
        :param files: dictionary / tuple (cf. requests)
        :return: response object
        """
        return self._generic_request(requests.post, rest_query, data=data, files=files)

    def put(self, rest_query, data=None, files=None):
        """
        Sends a PUT request to the Alyx server.
        For the dictionary contents, refer to:
        https://alyx.internationalbrainlab.org/docs

        :param rest_query: (required) the endpoint as a full or relative URL
        :type rest_query: str
        :param data: dictionary or json encoded string
        :type data: None, dict or str
        :param files: dictionary / tuple (cf. requests)
        :return: response object
        """
        return self._generic_request(requests.put, rest_query, data=data, files=files)

    def rest(self, url=None, action=None, id=None, data=None, files=None, **kwargs):
        """
        alyx_client.rest(): lists endpoints
        alyx_client.rest(endpoint): lists actions for endpoint
        alyx_client.rest(endpoint, action): lists fields and URL

        Example of a REST endpoint with all actions:

        >>> alyx.client.rest('subjects', 'list')
            alyx.client.rest('subjects', 'list', field_filter1='filterval')
            alyx.client.rest('subjects', 'create', data=sub_dict)
            alyx.client.rest('subjects', 'read', id='nickname')
            alyx.client.rest('subjects', 'update', id='nickname', data=sub_dict)
            alyx.client.rest('subjects', 'partial_update', id='nickname', data=sub_dict)
            alyx.client.rest('subjects', 'delete', id='nickname')
            alyx.client.rest('notes', 'create', data=nd, files={'image': open(image_file, 'rb')})

        :param url: endpoint name
        :param action: 'list', 'create', 'read', 'update', 'partial_update', 'delete'
        :param id: lookup string for actions 'read', 'update', 'partial_update' and 'delete'
        :param data: data dictionary for actions 'update', 'partial_update' and 'create'
        :param files: if file upload
        :param ``**kwargs``: filter as per the Alyx REST documentation,
            cf. https://alyx.internationalbrainlab.org/docs/
        :return: list of queried dicts ('list') or dict (other actions)
        """
        # if endpoint is None, list available endpoints
        if not url:
            pprint([k for k in self._rest_schemes.keys() if not k.startswith('_') and k])
            return
        # remove the beginning slash if any
        if url.startswith('/'):
            url = url[1:]
        # and split to the next slash or question mark
        endpoint = re.findall("^/*[^?/]*", url)[0].replace('/', '')
        # make sure the queried endpoint exists, if not throw an informative error
        if endpoint not in self._rest_schemes.keys():
            av = [k for k in self._rest_schemes.keys() if not k.startswith('_') and k]
            raise ValueError('REST endpoint "' + endpoint + '" does not exist. Available ' +
                             'endpoints are \n       ' + '\n       '.join(av))
        endpoint_scheme = self._rest_schemes[endpoint]
        # on a filter request, override the default action parameter
        if '?' in url:
            action = 'list'
        # if action is None, list available actions for the required endpoint
        if not action:
            pprint(list(endpoint_scheme.keys()))
            return
        # make sure the desired action exists, if not throw an informative error
        if action not in endpoint_scheme.keys():
            raise ValueError('Action "' + action + '" for REST endpoint "' + endpoint +
                             '" does not exist. Available actions are: ' +
                             '\n       ' + '\n       '.join(endpoint_scheme.keys()))
        # the actions below require an id in the URL, warn and help the user
        if action in ['read', 'update', 'partial_update', 'delete'] and not id:
            _logger.warning('REST action "' + action + '" requires an ID in the URL: ' +
                            endpoint_scheme[action]['url'])
            return
        # the actions below require a data dictionary, warn and help the user with a field list
        if action in ['create', 'update', 'partial_update'] and not data:
            pprint(endpoint_scheme[action]['fields'])
            for act in endpoint_scheme[action]['fields']:
                print("'" + act['name'] + "': ...,")
            _logger.warning('REST action "' + action + '" requires a data dict with above keys')
            return
        if action == 'list':
            # list requires neither an id nor a data dict
            assert endpoint_scheme[action]['action'] == 'get'
            if id:
                # special case of list where we query a uuid; usually 'read' is better
                if 'django' in kwargs.keys():
                    kwargs['django'] = kwargs['django'] + ','
                else:
                    kwargs['django'] = ""
                kwargs['django'] = f"{kwargs['django']}pk,{id}"
            # otherwise, look for a dictionary of filter terms
            if kwargs:
                url += '?'
                for k in kwargs.keys():
                    if isinstance(kwargs[k], str):
                        query = kwargs[k]
                    elif isinstance(kwargs[k], list):
                        query = ','.join(kwargs[k])
                    else:
                        query = str(kwargs[k])
                    url = url + f"&{k}=" + query
            return self.get('/' + url)
        if action == 'read':
            assert endpoint_scheme[action]['action'] == 'get'
            return self.get('/' + endpoint + '/' + id.split('/')[-1])
        elif action == 'create':
            assert endpoint_scheme[action]['action'] == 'post'
            return self.post('/' + endpoint, data=data, files=files)
        elif action == 'delete':
            assert endpoint_scheme[action]['action'] == 'delete'
            return self.delete('/' + endpoint + '/' + id.split('/')[-1])
        elif action == 'partial_update':
            assert endpoint_scheme[action]['action'] == 'patch'
            return self.patch('/' + endpoint + '/' + id.split('/')[-1], data=data, files=files)
        elif action == 'update':
            assert endpoint_scheme[action]['action'] == 'put'
            return self.put('/' + endpoint + '/' + id.split('/')[-1], data=data, files=files)

    # JSON field interface convenience methods
    def _check_inputs(self, endpoint: str) -> None:
        # make sure the queried endpoint exists, if not throw an informative error
        if endpoint not in self._rest_schemes.keys():
            av = [k for k in self._rest_schemes.keys() if not k.startswith('_') and k]
            raise ValueError('REST endpoint "' + endpoint + '" does not exist. Available ' +
                             'endpoints are \n       ' + '\n       '.join(av))
        return

    def json_field_write(
        self, endpoint: str = None, uuid: str = None, field_name: str = None,
        data: dict = None
    ) -> dict:
        """json_field_write

        Writes data to the json field of an object.
        NOTE: Destructive write! WILL NOT CHECK IF DATA EXISTS; the field's current
        contents are overwritten.

        :param endpoint: valid Alyx endpoint, defaults to None
        :type endpoint: str, optional
        :param uuid: uuid or lookup name for the endpoint
        :type uuid: str, optional
        :param field_name: valid json field name, defaults to None
        :type field_name: str, optional
        :param data: data to write to the json field, defaults to None
        :type data: dict, optional
        :return: the written data dict
        :rtype: dict
        """
        self._check_inputs(endpoint)
        # prepare the data to patch
        patch_dict = {field_name: data}
        # upload the new json field contents to the object
        ret = self.rest(endpoint, 'partial_update', id=uuid, data=patch_dict)
        return ret[field_name]

    def json_field_update(
        self, endpoint: str = None, uuid: str = None, field_name: str = 'json',
        data: dict = None
    ) -> dict:
        """json_field_update

        Non-destructive update of the json field of an endpoint object.
        Updates the field_name of the object with pk = uuid of the given endpoint.
        If data has keys with the same names as existing keys, the old values are
        squashed (uses the dict.update() method).

        Example:
        one.alyx.json_field_update("sessions", "eid_str", "extended_qc", {"key": value})

        :param endpoint: endpoint to hit
        :type endpoint: str
        :param uuid: uuid or lookup name of the object
        :type uuid: str
        :param field_name: name of the json field
        :type field_name: str
        :param data: dictionary with fields to be updated
        :type data: dict
        :return: the new, patched json field contents
        :rtype: dict
        """
        self._check_inputs(endpoint)
        # load the current json field contents
        current = self.rest(endpoint, 'read', id=uuid)[field_name]
        if current is None:
            current = {}
        if not isinstance(current, dict):
            _logger.warning(
                f'Current json field {field_name} does not contain a dict, aborting update'
            )
            return current
        # patch the current dict with the new data
        current.update(data)
        # prepare the data to patch
        patch_dict = {field_name: current}
        # upload the new json field contents to the object
        ret = self.rest(endpoint, 'partial_update', id=uuid, data=patch_dict)
        return ret[field_name]

    def json_field_remove_key(
        self, endpoint: str = None, uuid: str = None, field_name: str = 'json',
        key: str = None
    ) -> dict:
        """json_field_remove_key

        Removes the given key from the json field dict and re-uploads it to Alyx.
        Needs the endpoint, the uuid and the json field name.

        :param endpoint: endpoint to hit, defaults to None
        :type endpoint: str, optional
        :param uuid: uuid or lookup name for the endpoint
        :type uuid: str, optional
        :param field_name: json field name of the object, defaults to 'json'
        :type field_name: str, optional
        :param key: key name of the dictionary inside the object, defaults to None
        :type key: str, optional
        :return: the new contents of the json field
        :rtype: dict
        """
        self._check_inputs(endpoint)
        current = self.rest(endpoint, 'read', id=uuid)[field_name]
        # if there are no contents, the key cannot be removed; return
        if current is None:
            return current
        # if the contents are not a dict, the key cannot be removed; return the contents
        if isinstance(current, str):
            _logger.warning(f'Cannot remove key {key}: content of json field is of type str')
            return current
        # if the key is not present in the json field contents, return the contents
        if current.get(key, None) is None:
            _logger.warning(
                f'{key}: key not found in endpoint {endpoint} field {field_name}'
            )
            return current
        _logger.info(f"Removing key from dict: '{key}'")
        current.pop(key)
        # re-write the contents without the removed key
        written = self.json_field_write(
            endpoint=endpoint, uuid=uuid, field_name=field_name, data=current
        )
        return written

    def json_field_delete(
        self, endpoint: str = None, uuid: str = None, field_name: str = None
    ) -> None:
        """Sets the given json field of the object to None and returns the new value."""
        self._check_inputs(endpoint)
        _ = self.rest(endpoint, 'partial_update', id=uuid, data={field_name: None})
        return _[field_name]
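
    # A hedged sketch of the json field convenience methods chained together; the
    # endpoint, uuid and keys are placeholders and must exist on your Alyx instance.
    def _example_json_field_roundtrip(self, uuid='some-session-uuid'):
        self.json_field_write('sessions', uuid, 'json', {'a': 1})      # destructive write
        self.json_field_update('sessions', uuid, 'json', {'b': 2})     # -> {'a': 1, 'b': 2}
        self.json_field_remove_key('sessions', uuid, 'json', key='a')  # -> {'b': 2}
        return self.json_field_delete('sessions', uuid, 'json')       # -> None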