import logging
from abc import abstractmethod
from pathlib import Path
from itertools import chain
import numpy as np
from one.api import ONE
from one.alf import spec
"""dict: custom sign off categories"""
SIGN_OFF_CATEGORIES = {'neuropixel': ['raw', 'spike_sorting', 'alignment']}
[docs]
class QC:
"""A base class for data quality control."""
def __init__(self, endpoint_id, one=None, log=None, endpoint='sessions'):
"""
A base class for data quality control.
:param endpoint_id: Eid for endpoint. If using sessions can also be a session path
:param log: A logging.Logger instance, if None the 'ibllib' logger is used
:param one: An ONE instance for fetching and setting the QC on Alyx
:param endpoint: The endpoint name to apply qc to. Default is 'sessions'
"""
self.one = one or ONE()
self.log = log or logging.getLogger(__name__)
if endpoint == 'sessions':
self.endpoint = endpoint
self._set_eid_or_path(endpoint_id)
self.json = False
else:
self.endpoint = endpoint
self._confirm_endpoint_id(endpoint_id)
# Ensure outcome attribute matches Alyx record
updatable = self.eid and self.one and not self.one.offline
self._outcome = self.update('NOT_SET', namespace='') if updatable else spec.QC.NOT_SET
self.log.debug(f'Current QC status is {self.outcome}')
[docs]
@abstractmethod
def run(self):
"""Run the QC tests and return the outcome.
:return: One of "CRITICAL", "FAIL", "WARNING" or "PASS"
"""
pass
[docs]
@abstractmethod
def load_data(self):
"""Load the data required to compute the QC.
Subclasses may implement this for loading raw data.
"""
pass
@property
def outcome(self):
"""one.alf.spec.QC: The overall session outcome."""
return self._outcome
@outcome.setter
def outcome(self, value):
value = spec.QC.validate(value) # ensure valid enum
if self._outcome < value:
self._outcome = value
[docs]
@staticmethod
def overall_outcome(outcomes: iter, agg=max) -> spec.QC:
"""
Given an iterable of QC outcomes, returns the overall (i.e. worst) outcome.
Example:
QC.overall_outcome(['PASS', 'NOT_SET', None, 'FAIL']) # Returns 'FAIL'
Parameters
----------
outcomes : iterable of one.alf.spec.QC, str or int
An iterable of QC outcomes.
agg : function
Outcome code aggregate function, default is max (i.e. worst).
Returns
-------
one.alf.spec.QC
The overall outcome.
"""
outcomes = filter(lambda x: x not in (None, np.nan), outcomes)
return agg(map(spec.QC.validate, outcomes))
def _set_eid_or_path(self, session_path_or_eid):
"""Parse a given eID or session path.
If a session UUID is given, resolves and stores the local path and vice versa
:param session_path_or_eid: A session eid or path
:return:
"""
self.eid = None
if spec.is_uuid_string(str(session_path_or_eid)):
self.eid = session_path_or_eid
# Try to set session_path if data is found locally
self.session_path = self.one.eid2path(self.eid)
elif spec.is_session_path(session_path_or_eid):
self.session_path = Path(session_path_or_eid)
if self.one is not None:
self.eid = self.one.path2eid(self.session_path)
if not self.eid:
self.log.warning('Failed to determine eID from session path')
else:
self.log.error('Cannot run QC: an experiment uuid or session path is required')
raise ValueError("'session' must be a valid session path or uuid")
def _confirm_endpoint_id(self, endpoint_id):
# Have as read for now since 'list' isn't working
target_obj = self.one.alyx.get(f'/{self.endpoint}/{endpoint_id}', clobber=True) or None
default_data = {}
if target_obj:
self.json = 'qc' not in target_obj
self.eid = endpoint_id
if self.json:
default_data['qc'] = 'NOT_SET'
if 'extended_qc' not in target_obj:
default_data['extended_qc'] = {}
if not default_data:
return # No need to set up JSON for QC
json_field = target_obj.get('json')
if not json_field or (self.json and not json_field.get('qc', None)):
self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
field_name='json', data=default_data)
else:
self.log.error('Cannot run QC: endpoint id is not recognised')
raise ValueError("'endpoint_id' must be a valid uuid")
[docs]
def update(self, outcome=None, namespace='experimenter', override=False):
"""Update the qc field in Alyx.
Updates the 'qc' field in Alyx if the new QC outcome is worse than the current value.
Parameters
----------
outcome : str, int, one.alf.spec.QC
A QC outcome; one of "CRITICAL", "FAIL", "WARNING", "PASS" or "NOT_SET".
namespace : str
The extended QC key specifying the type of QC associated with the outcome.
override : bool
If True the QC field is updated even if new value is better than previous.
Returns
-------
one.alf.spec.QC
The current QC outcome on Alyx.
Example
-------
>>> qc = QC('path/to/session')
>>> qc.update('PASS') # Update current QC field to 'PASS' if not set
"""
assert self.one, 'instance of one should be provided'
if self.one.offline:
self.log.warning('Running on OneOffline instance, unable to update remote QC')
return
outcome = spec.QC.validate(self.outcome if outcome is None else outcome)
assert self.eid, 'Unable to update Alyx; eID not set'
if namespace: # Record in extended qc
self.update_extended_qc({namespace: outcome.name})
details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
current_status = (details['json'] if self.json else details)['qc']
current_status = spec.QC.validate(current_status)
if current_status < outcome or override:
r = self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
field_name='json', data={'qc': outcome.name}) \
if self.json else self.one.alyx.rest(self.endpoint, 'partial_update', id=self.eid,
data={'qc': outcome.name})
current_status = spec.QC.validate(r['qc'])
assert current_status == outcome, 'Failed to update session QC'
self.log.info(f'QC field successfully updated to {outcome.name} for {self.endpoint[:-1]} '
f'{self.eid}')
self._outcome = current_status
return self.outcome
[docs]
def update_extended_qc(self, data):
"""Update the extended_qc field in Alyx.
Subclasses should chain a call to this.
:param data: a dict of qc tests and their outcomes, typically a value between 0. and 1.
:return: the updated extended_qc field
"""
assert self.eid, 'Unable to update Alyx; eID not set'
assert self.one, 'instance of one should be provided'
if self.one.offline:
self.log.warning('Running on OneOffline instance, unable to update remote QC')
return
# Ensure None instead of NaNs
for k, v in data.items():
if v is not None and not isinstance(v, str):
if isinstance(v, tuple):
data[k] = tuple(None if not isinstance(i, str) and np.isnan(i) else i for i in v)
else:
data[k] = None if np.isnan(v).all() else v
details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
if 'extended_qc' not in details:
extended_qc = details['json']['extended_qc'] or {}
extended_qc.update(data)
extended_qc_dict = {'extended_qc': extended_qc}
out = self.one.alyx.json_field_update(
endpoint=self.endpoint, uuid=self.eid, field_name='json', data=extended_qc_dict)
else:
extended_qc = details['extended_qc'] or {}
extended_qc.update(data)
out = self.one.alyx.json_field_update(
endpoint=self.endpoint, uuid=self.eid, field_name='extended_qc', data=extended_qc)
self.log.info(f'Extended QC field successfully updated for {self.endpoint[:-1]} '
f'{self.eid}')
return out
[docs]
def compute_outcome_from_extended_qc(self) -> str:
"""Return the session outcome computed from aggregating the extended QC."""
details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
extended_qc = details['json']['extended_qc'] if self.json else details['extended_qc']
return self.overall_outcome(v for k, v in extended_qc.items() or {} if k[0] != '_')
[docs]
def sign_off_dict(exp_dec, sign_off_categories=None):
"""
Create sign off dictionary.
Creates a dict containing 'sign off' keys for each device and task protocol in the provided
experiment description.
Parameters
----------
exp_dec : dict
A loaded experiment description file.
sign_off_categories : dict of list
A dictionary of custom JSON keys for a given device in the acquisition description file.
Returns
-------
dict of dict
The sign off dictionary with the main key 'sign_off_checklist' containing keys for each
device and task protocol.
"""
# Note this assumes devices each contain a dict of dicts
# e.g. {'devices': {'DAQ_1': {'device_1': {}, 'device_2': {}},}
sign_off_categories = sign_off_categories or SIGN_OFF_CATEGORIES
sign_off_keys = set()
for k, v in exp_dec.get('devices', {}).items():
assert isinstance(v, dict) and v
if len(v.keys()) == 1 and next(iter(v.keys())) == k:
if k in sign_off_categories:
for subkey in sign_off_categories[k]:
sign_off_keys.add(f'{k}_{subkey}')
else:
sign_off_keys.add(k)
else:
for kk in v.keys():
if k in sign_off_categories:
for subkey in sign_off_categories[k]:
sign_off_keys.add(f'{k}_{subkey}_{kk}')
else:
sign_off_keys.add(f'{k}_{kk}')
# Add keys for each protocol
for i, v in enumerate(chain(*map(dict.keys, exp_dec.get('tasks', [])))):
sign_off_keys.add(f'{v}_{i:02}')
return {'sign_off_checklist': dict.fromkeys(map(lambda x: f'_{x}', sign_off_keys))}