# Source code for ibllib.qc.base

import logging
from abc import abstractmethod
from pathlib import Path
from itertools import chain

import numpy as np
from one.api import ONE
from one.alf import spec

"""dict: custom sign off categories"""
# Maps a device name to the sub-category suffixes that each get their own sign-off key
# (consumed by `sign_off_dict` as the default for its `sign_off_categories` argument).
SIGN_OFF_CATEGORIES = {
    'neuropixel': ['raw', 'spike_sorting', 'alignment'],
}


class QC:
    """A base class for data quality control.

    The class keeps a local copy of the QC outcome (``outcome``) and provides helpers for
    reading and writing the 'qc' and 'extended_qc' fields of an Alyx record.  Subclasses are
    expected to implement ``run`` and, optionally, ``load_data``.
    """

    def __init__(self, endpoint_id, one=None, log=None, endpoint='sessions'):
        """
        Initialise the QC object and synchronise its outcome with the Alyx record.

        Parameters
        ----------
        endpoint_id : str, pathlib.Path
            Eid for endpoint. If using sessions can also be a session path.
        one : one.api.OneAlyx
            An ONE instance for fetching and setting the QC on Alyx. If not given a new
            instance is created with ``ONE()``.
        log : logging.Logger
            A logger instance; if None, the logger of this module is used.
        endpoint : str
            The endpoint name to apply qc to. Default is 'sessions'.

        Raises
        ------
        ValueError
            The endpoint ID could not be resolved to a session or endpoint record.
        """
        self.one = one or ONE()
        self.log = log or logging.getLogger(__name__)
        self.endpoint = endpoint
        if endpoint == 'sessions':
            self._set_eid_or_path(endpoint_id)
            # Session records have dedicated 'qc' and 'extended_qc' fields
            self.json = False
        else:
            # Sets self.eid and self.json depending on the fields of the remote record
            self._confirm_endpoint_id(endpoint_id)

        # Ensure outcome attribute matches Alyx record
        updatable = self.eid and self.one and not self.one.offline
        if updatable:
            # NOT_SET is the lowest outcome, so this only fetches the current remote value
            self._outcome = self.update('NOT_SET', namespace='')
        else:
            self._outcome = spec.QC.NOT_SET
        self.log.debug(f'Current QC status is {self.outcome}')

    @abstractmethod
    def run(self):
        """Run the QC tests and return the outcome.

        :return: One of "CRITICAL", "FAIL", "WARNING" or "PASS"
        """
        pass

    @abstractmethod
    def load_data(self):
        """Load the data required to compute the QC.

        Subclasses may implement this for loading raw data.
        """
        pass

    @property
    def outcome(self):
        """one.alf.spec.QC: The overall session outcome."""
        return self._outcome

    @outcome.setter
    def outcome(self, value):
        """Set the outcome, but only if the new value is worse than the current one."""
        value = spec.QC.validate(value)  # ensure valid enum
        if self._outcome < value:
            self._outcome = value

    @staticmethod
    def overall_outcome(outcomes: iter, agg=max) -> 'spec.QC':
        """
        Given an iterable of QC outcomes, returns the overall (i.e. worst) outcome.

        None and NaN values are ignored.

        Example:
          QC.overall_outcome(['PASS', 'NOT_SET', None, 'FAIL'])  # Returns 'FAIL'

        Parameters
        ----------
        outcomes : iterable of one.alf.spec.QC, str or int
            An iterable of QC outcomes.
        agg : function
            Outcome code aggregate function, default is max (i.e. worst).

        Returns
        -------
        one.alf.spec.QC
            The overall outcome.

        Raises
        ------
        ValueError
            With the default aggregate function, raised when no outcome remains after
            discarding the None and NaN values.
        """
        def _is_set(value):
            """Return False for None and for any floating point NaN."""
            if value is None:
                return False
            # The `np.NaN` alias no longer exists in NumPy 2.0, and a tuple membership test
            # only matches the `np.nan` singleton, hence the explicit isnan check.
            return not (isinstance(value, (float, np.floating)) and np.isnan(value))

        return agg(map(spec.QC.validate, filter(_is_set, outcomes)))

    @staticmethod
    def _nan_to_none(data):
        """Return a copy of an extended QC dict in which NaN values are replaced by None.

        Values that are tuples are processed element-wise; any other value is replaced by
        None only if all of its elements are NaN.  Strings, None and values that cannot be
        tested for NaN (e.g. dicts) are returned unchanged.  The input is not modified.

        Parameters
        ----------
        data : dict
            A map of QC test names to their values.

        Returns
        -------
        dict
            A new dict with the same keys as `data`.
        """
        def _all_nan(value):
            if value is None or isinstance(value, str):
                return False
            try:
                return bool(np.isnan(value).all())
            except (TypeError, ValueError):
                return False  # not a numeric type, leave the value as it is

        sanitised = {}
        for key, value in data.items():
            if isinstance(value, tuple):
                sanitised[key] = tuple(None if _all_nan(item) else item for item in value)
            else:
                sanitised[key] = None if _all_nan(value) else value
        return sanitised

    def _set_eid_or_path(self, session_path_or_eid):
        """Parse a given eID or session path.

        If a session UUID is given, resolves and stores the local path and vice versa.

        :param session_path_or_eid: A session eid or path
        :raises ValueError: the input is neither a UUID string nor a valid session path
        """
        self.eid = None
        if spec.is_uuid_string(str(session_path_or_eid)):
            self.eid = session_path_or_eid
            # Try to set session_path if data is found locally
            self.session_path = self.one.eid2path(self.eid)
            return
        if spec.is_session_path(session_path_or_eid):
            self.session_path = Path(session_path_or_eid)
            if self.one is not None:
                self.eid = self.one.path2eid(self.session_path)
                if not self.eid:
                    self.log.warning('Failed to determine eID from session path')
            return
        self.log.error('Cannot run QC: an experiment uuid or session path is required')
        raise ValueError("'session' must be a valid session path or uuid")

    def _confirm_endpoint_id(self, endpoint_id):
        """Check that the endpoint record exists and prepare its JSON field for QC.

        Sets `self.eid` and `self.json`; the latter is True when the record has no 'qc' field
        of its own, in which case the QC is stored in the record's 'json' field.  Missing
        'qc' / 'extended_qc' defaults are written to the 'json' field when required.

        :param endpoint_id: The UUID of the record at `self.endpoint`
        :raises ValueError: no record was returned for the given endpoint ID
        """
        # Have as read for now since 'list' isn't working
        target_obj = self.one.alyx.get(f'/{self.endpoint}/{endpoint_id}', clobber=True) or None
        if not target_obj:
            self.log.error('Cannot run QC: endpoint id is not recognised')
            raise ValueError("'endpoint_id' must be a valid uuid")

        self.json = 'qc' not in target_obj
        self.eid = endpoint_id
        default_data = {}
        if self.json:
            default_data['qc'] = 'NOT_SET'
        if 'extended_qc' not in target_obj:
            default_data['extended_qc'] = {}
        if not default_data:
            return  # No need to set up JSON for QC

        json_field = target_obj.get('json')
        if not json_field or (self.json and not json_field.get('qc', None)):
            self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
                                            field_name='json', data=default_data)

    def update(self, outcome=None, namespace='experimenter', override=False):
        """Update the qc field in Alyx.

        Updates the 'qc' field in Alyx if the new QC outcome is worse than the current value.

        Parameters
        ----------
        outcome : str, int, one.alf.spec.QC
            A QC outcome; one of "CRITICAL", "FAIL", "WARNING", "PASS" or "NOT_SET".
            If None, the current value of the `outcome` property is used.
        namespace : str
            The extended QC key specifying the type of QC associated with the outcome.
            If empty, the extended QC is not updated.
        override : bool
            If True the QC field is updated even if new value is better than previous.

        Returns
        -------
        one.alf.spec.QC
            The current QC outcome on Alyx, or None when ONE is in offline mode.

        Example
        -------
        >>> qc = QC('path/to/session')
        >>> qc.update('PASS')  # Update current QC field to 'PASS' if not set
        """
        assert self.one, 'instance of one should be provided'
        if self.one.offline:
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
            return
        outcome = spec.QC.validate(self.outcome if outcome is None else outcome)
        assert self.eid, 'Unable to update Alyx; eID not set'
        if namespace:  # Record in extended qc
            self.update_extended_qc({namespace: outcome.name})

        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        remote_record = details['json'] if self.json else details
        current_status = spec.QC.validate(remote_record['qc'])

        if current_status < outcome or override:
            new_qc = {'qc': outcome.name}
            if self.json:
                r = self.one.alyx.json_field_update(
                    endpoint=self.endpoint, uuid=self.eid, field_name='json', data=new_qc)
            else:
                r = self.one.alyx.rest(
                    self.endpoint, 'partial_update', id=self.eid, data=new_qc)
            current_status = spec.QC.validate(r['qc'])
            assert current_status == outcome, 'Failed to update session QC'
            self.log.info(f'QC field successfully updated to {outcome.name} for {self.endpoint[:-1]} '
                          f'{self.eid}')
        # Always mirror the remote value locally, whether or not it was changed
        self._outcome = current_status
        return self.outcome

    def update_extended_qc(self, data):
        """Update the extended_qc field in Alyx.

        Subclasses should chain a call to this.  NaN values are sent to Alyx as None; the
        input dict itself is left unmodified.

        :param data: a dict of qc tests and their outcomes, typically a value between 0. and 1.
        :return: the updated extended_qc field, or None when ONE is in offline mode
        """
        assert self.eid, 'Unable to update Alyx; eID not set'
        assert self.one, 'instance of one should be provided'
        if self.one.offline:
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
            return

        # Ensure None instead of NaNs
        data = self._nan_to_none(data)

        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        if 'extended_qc' in details:
            extended_qc = details['extended_qc'] or {}
            extended_qc.update(data)
            out = self.one.alyx.json_field_update(
                endpoint=self.endpoint, uuid=self.eid, field_name='extended_qc', data=extended_qc)
        else:
            # No dedicated field on this endpoint: the extended QC lives in the JSON field
            extended_qc = details['json']['extended_qc'] or {}
            extended_qc.update(data)
            extended_qc_dict = {'extended_qc': extended_qc}
            out = self.one.alyx.json_field_update(
                endpoint=self.endpoint, uuid=self.eid, field_name='json', data=extended_qc_dict)

        self.log.info(f'Extended QC field successfully updated for {self.endpoint[:-1]} '
                      f'{self.eid}')
        return out

    def compute_outcome_from_extended_qc(self) -> 'spec.QC':
        """Return the session outcome computed from aggregating the extended QC.

        Extended QC keys starting with an underscore are excluded from the aggregate.

        Returns
        -------
        one.alf.spec.QC
            The overall outcome; NOT_SET if the extended QC is empty or contains only
            underscore-prefixed keys.
        """
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        extended_qc = details['json']['extended_qc'] if self.json else details['extended_qc']
        # The remote field may be null, in which case there is nothing to aggregate
        outcomes = [v for k, v in (extended_qc or {}).items() if not k.startswith('_')]
        if not outcomes:
            return spec.QC.NOT_SET
        return self.overall_outcome(outcomes)
def sign_off_dict(exp_dec, sign_off_categories=None):
    """
    Create sign off dictionary.

    Creates a dict containing 'sign off' keys for each device and task protocol in the provided
    experiment description.

    Parameters
    ----------
    exp_dec : dict
        A loaded experiment description file.
    sign_off_categories : dict of list
        A dictionary of custom JSON keys for a given device in the acquisition description file.
        If None, `SIGN_OFF_CATEGORIES` is used; an explicitly passed empty dict disables the
        custom categories.

    Returns
    -------
    dict of dict
        The sign off dictionary with the main key 'sign_off_checklist' containing keys for each
        device and task protocol.  Every key is prefixed with an underscore, maps to None, and
        the keys are inserted in sorted order.

    Raises
    ------
    AssertionError
        A value in the 'devices' map is not a non-empty dict.
    """
    # Only fall back on the defaults when nothing was passed; `or` would also discard an
    # explicitly empty mapping.
    if sign_off_categories is None:
        sign_off_categories = SIGN_OFF_CATEGORIES

    # Note this assumes devices each contain a dict of dicts
    # e.g. {'devices': {'DAQ_1': {'device_1': {}, 'device_2': {}},}
    sign_off_keys = set()
    for device, sub_devices in (exp_dec.get('devices') or {}).items():
        assert isinstance(sub_devices, dict) and sub_devices, \
            f'expected a non-empty dict for device "{device}"'
        # One key stem per custom category, otherwise the device name itself
        if device in sign_off_categories:
            stems = [f'{device}_{category}' for category in sign_off_categories[device]]
        else:
            stems = [device]
        if len(sub_devices) == 1 and next(iter(sub_devices)) == device:
            # A single sub-device named after its parent: no suffix required
            sign_off_keys.update(stems)
        else:
            sign_off_keys.update(f'{stem}_{name}' for stem in stems for name in sub_devices)

    # Add keys for each protocol, numbered by their order in the task list
    protocols = chain.from_iterable(exp_dec.get('tasks') or [])
    sign_off_keys.update(f'{protocol}_{i:02}' for i, protocol in enumerate(protocols))

    # Sorted so that the key order does not depend on set iteration order
    return {'sign_off_checklist': dict.fromkeys(f'_{key}' for key in sorted(sign_off_keys))}