"""Video quality control.
This module runs a list of quality control metrics on the camera and extracted video data.
Examples
--------
Run right camera QC, downloading all but video file
>>> qc = CameraQC(eid, 'right', download_data=True, stream=True)
>>> qc.run()
Run left camera QC with session path, update QC field in Alyx
>>> qc = CameraQC(session_path, 'left')
>>> outcome, extended = qc.run(update=True) # Returns outcome of videoQC only
>>> print(f'video QC = {outcome}; overall session QC = {qc.outcome}') # NB difference outcomes
Run only video QC (no timestamp/alignment checks) on 20 frames for the body camera
>>> qc = CameraQC(eid, 'body', n_samples=20)
>>> qc.load_video_data() # Quicker than loading all data
>>> qc.run()
Run specific video QC check and display the plots
>>> qc = CameraQC(eid, 'left')
>>> qc.load_data(download_data=True)
>>> qc.check_position(display=True) # NB: Not all checks make plots
Run the QC for all cameras
>>> qcs = run_all_qc(eid)
>>> qcs['left'].metrics # Dict of checks and outcomes for left camera
"""
import logging
from inspect import getmembers, isfunction
from pathlib import Path
from itertools import chain
import copy
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.patches import Rectangle
from labcams import parse_cam_log
import one.alf.io as alfio
from one.util import filter_datasets
from one.alf.spec import is_session_path
from one.alf.exceptions import ALFObjectNotFound
from iblutil.util import Bunch
from iblutil.numerical import within_ranges
from ibllib.io.extractors.camera import extract_camera_sync, extract_all
from ibllib.io.extractors import ephys_fpga, training_wheel, mesoscope
from ibllib.io.extractors.video_motion import MotionAlignment
from ibllib.io.extractors.base import get_session_extractor_type
from ibllib.io import raw_data_loaders as raw
from ibllib.io.raw_daq_loaders import load_timeline_sync_and_chmap
from ibllib.io.session_params import read_params, get_sync, get_sync_namespace
import brainbox.behavior.wheel as wh
from ibllib.io.video import get_video_meta, get_video_frames_preload, assert_valid_label
from . import base
_log = logging.getLogger(__name__)
[docs]
class CameraQC(base.QC):
"""A class for computing camera QC metrics"""
dstypes = [
'_ibl_experiment.description',
'_iblrig_Camera.frameData', # Replaces the next 3 datasets
'_iblrig_Camera.frame_counter',
'_iblrig_Camera.GPIO',
'_iblrig_Camera.timestamps',
'_iblrig_taskData.raw',
'_iblrig_taskSettings.raw',
'_iblrig_Camera.raw',
'camera.times',
'wheel.position',
'wheel.timestamps'
]
dstypes_fpga = [
'_spikeglx_sync.channels',
'_spikeglx_sync.polarities',
'_spikeglx_sync.times',
'ephysData.raw.meta'
]
"""Recall that for the training rig there is only one side camera at 30 Hz and 1280 x 1024 px.
For the recording rig there are two label cameras (left: 60 Hz, 1280 x 1024 px;
right: 150 Hz, 640 x 512 px) and one body camera (30 Hz, 640 x 512 px). """
video_meta = {
'training': {
'left': {
'fps': 30,
'width': 1280,
'height': 1024
}
},
'ephys': {
'left': {
'fps': 60,
'width': 1280,
'height': 1024
},
'right': {
'fps': 150,
'width': 640,
'height': 512
},
'body': {
'fps': 30,
'width': 640,
'height': 512
},
}
}
def __init__(self, session_path_or_eid, camera, **kwargs):
"""
:param session_path_or_eid: A session id or path
:param camera: The camera to run QC on, if None QC is run for all three cameras
:param n_samples: The number of frames to sample for the position and brightness QC
:param stream: If true and local video files not available, the data are streamed from
the remote source.
:param log: A logging.Logger instance, if None the 'ibllib' logger is used
:param one: An ONE instance for fetching and setting the QC on Alyx
"""
# When an eid is provided, we will download the required data by default (if necessary)
download_data = not is_session_path(session_path_or_eid)
self.download_data = kwargs.pop('download_data', download_data)
self.stream = kwargs.pop('stream', None)
self.n_samples = kwargs.pop('n_samples', 100)
self.sync_collection = kwargs.pop('sync_collection', None)
self.sync = kwargs.pop('sync_type', None)
super().__init__(session_path_or_eid, **kwargs)
# Data
self.label = assert_valid_label(camera)
filename = f'_iblrig_{self.label}Camera.raw*.mp4'
raw_video_path = self.session_path.joinpath('raw_video_data')
self.video_path = next(raw_video_path.glob(filename), None)
# If local video doesn't exist, change video path to URL
if not self.video_path and self.stream is not False and self.one is not None:
try:
self.stream = True
self.video_path = self.one.path2url(raw_video_path / filename.replace('*', ''))
except (StopIteration, ALFObjectNotFound):
_log.error('No remote or local video file found')
self.video_path = None
logging.disable(logging.NOTSET)
keys = ('count', 'pin_state', 'audio', 'fpga_times', 'wheel', 'video',
'frame_samples', 'timestamps', 'camera_times', 'bonsai_times')
self.data = Bunch.fromkeys(keys)
self.frame_samples_idx = None
# QC outcomes map
self.metrics = None
self.outcome = 'NOT_SET'
# Specify any checks to remove
self.checks_to_remove = []
self._type = None
@property
def type(self):
"""
Returns the camera type based on the protocol.
:return: Returns either None, 'ephys' or 'training'
"""
if not self._type:
return
else:
return 'ephys' if 'ephys' in self._type else 'training'
[docs]
def load_data(self, download_data: bool = None, extract_times: bool = False, load_video: bool = True) -> None:
"""Extract the data from raw data files
Extracts all the required task data from the raw data files.
Data keys:
- count (int array): the sequential frame number (n, n+1, n+2...)
- pin_state (): the camera GPIO pin; records the audio TTLs; should be one per frame
- audio (float array): timestamps of audio TTL fronts
- fpga_times (float array): timestamps of camera TTLs recorded by FPGA
- timestamps (float array): extracted video timestamps (the camera.times ALF)
- bonsai_times (datetime array): system timestamps of video PC; should be one per frame
- camera_times (float array): camera frame timestamps extracted from frame headers
- wheel (Bunch): rotary encoder timestamps, position and period used for wheel motion
- video (Bunch): video meta data, including dimensions and FPS
- frame_samples (h x w x n array): array of evenly sampled frames (1 colour channel)
:param download_data: if True, any missing raw data is downloaded via ONE.
Missing data will raise an AssertionError
:param extract_times: if True, the camera.times are re-extracted from the raw data
:param load_video: if True, calls the load_video_data method
"""
assert self.session_path, 'no session path set'
if download_data is not None:
self.download_data = download_data
if self.download_data and self.eid and self.one and not self.one.offline:
self.ensure_required_data()
_log.info('Gathering data for QC')
# Get frame count and pin state
self.data['count'], self.data['pin_state'] = \
raw.load_embedded_frame_data(self.session_path, self.label, raw=True)
# If there is an experiment description and there are video parameters
sess_params = read_params(self.session_path) or {}
task_collection = get_task_collection(sess_params)
ns = get_sync_namespace(sess_params)
self._set_sync(sess_params)
if not self.sync:
if not self.type:
self._type = get_session_extractor_type(self.session_path, task_collection)
self.sync = 'nidq' if 'ephys' in self.type else 'bpod'
self._update_meta_from_session_params(sess_params)
# Load the audio and raw FPGA times
if self.sync != 'bpod' and self.sync is not None:
self.sync_collection = self.sync_collection or 'raw_ephys_data'
ns = ns or 'spikeglx'
if ns == 'spikeglx':
sync, chmap = ephys_fpga.get_sync_and_chn_map(self.session_path, self.sync_collection)
elif ns == 'timeline':
sync, chmap = load_timeline_sync_and_chmap(self.session_path / self.sync_collection)
else:
raise NotImplementedError(f'Unknown namespace "{ns}"')
audio_ttls = ephys_fpga.get_sync_fronts(sync, chmap['audio'])
self.data['audio'] = audio_ttls['times'] # Get rises
# Load raw FPGA times
cam_ts = extract_camera_sync(sync, chmap)
self.data['fpga_times'] = cam_ts[self.label]
else:
self.sync_collection = self.sync_collection or task_collection
bpod_data = raw.load_data(self.session_path, task_collection)
_, audio_ttls = raw.load_bpod_fronts(
self.session_path, data=bpod_data, task_collection=task_collection)
self.data['audio'] = audio_ttls['times']
# Load extracted frame times
alf_path = self.session_path / 'alf'
try:
assert not extract_times
self.data['timestamps'] = alfio.load_object(
alf_path, f'{self.label}Camera', short_keys=True)['times']
except AssertionError: # Re-extract
kwargs = dict(video_path=self.video_path, labels=self.label)
if self.sync != 'bpod' and self.sync is not None:
kwargs = {**kwargs, 'sync': sync, 'chmap': chmap} # noqa
outputs, _ = extract_all(self.session_path, self.sync, save=False,
sync_collection=self.sync_collection, **kwargs)
self.data['timestamps'] = outputs[f'{self.label}_camera_timestamps']
except ALFObjectNotFound:
_log.warning('no camera.times ALF found for session')
# Get audio and wheel data
wheel_keys = ('timestamps', 'position')
try:
# glob in case wheel data are in sub-collections
alf_path = next(alf_path.rglob('*wheel.timestamps*')).parent
self.data['wheel'] = alfio.load_object(alf_path, 'wheel', short_keys=True)
except (StopIteration, ALFObjectNotFound):
# Extract from raw data
if self.sync != 'bpod' and self.sync is not None:
if ns == 'spikeglx':
wheel_data = ephys_fpga.extract_wheel_sync(sync, chmap)
elif ns == 'timeline':
extractor = mesoscope.TimelineTrials(self.session_path, sync_collection=self.sync_collection)
wheel_data = extractor.extract_wheel_sync()
else:
raise NotImplementedError(f'Unknown namespace "{ns}"')
else:
wheel_data = training_wheel.get_wheel_position(
self.session_path, task_collection=task_collection)
self.data['wheel'] = Bunch(zip(wheel_keys, wheel_data))
# Find short period of wheel motion for motion correlation.
if data_for_keys(wheel_keys, self.data['wheel']) and self.data['timestamps'] is not None:
self.data['wheel'].period = self.get_active_wheel_period(self.data['wheel'])
# Load Bonsai frame timestamps
try:
ssv_times = raw.load_camera_ssv_times(self.session_path, self.label)
self.data['bonsai_times'], self.data['camera_times'] = ssv_times
except AssertionError:
_log.warning('No Bonsai video timestamps file found')
# Gather information from video file
if load_video:
_log.info('Inspecting video file...')
self.load_video_data()
[docs]
def load_video_data(self):
# Get basic properties of video
try:
self.data['video'] = get_video_meta(self.video_path, one=self.one)
# Sample some frames from the video file
indices = np.linspace(100, self.data['video'].length - 100, self.n_samples).astype(int)
self.frame_samples_idx = indices
self.data['frame_samples'] = get_video_frames_preload(self.video_path, indices,
mask=np.s_[:, :, 0])
except AssertionError:
_log.error('Failed to read video file; setting outcome to CRITICAL')
self._outcome = 'CRITICAL'
[docs]
@staticmethod
def get_active_wheel_period(wheel, duration_range=(3., 20.), display=False):
"""
Attempts to find a period of movement where the wheel accelerates and decelerates for
the wheel motion alignment QC.
:param wheel: A Bunch of wheel timestamps and position data
:param duration_range: The candidates must be within min/max duration range
:param display: If true, plot the selected wheel movement
:return: 2-element array comprising the start and end times of the active period
"""
pos, ts = wh.interpolate_position(wheel.timestamps, wheel.position)
v, acc = wh.velocity_filtered(pos, 1000)
on, off, *_ = wh.movements(ts, acc, pos_thresh=.1, make_plots=False)
edges = np.c_[on, off]
indices, _ = np.where(np.logical_and(
np.diff(edges) > duration_range[0], np.diff(edges) < duration_range[1]))
if len(indices) == 0:
_log.warning('No period of wheel movement found for motion alignment.')
return None
# Pick movement somewhere in the middle
i = indices[int(indices.size / 2)]
if display:
_, (ax0, ax1) = plt.subplots(2, 1, sharex='all')
mask = np.logical_and(ts > edges[i][0], ts < edges[i][1])
ax0.plot(ts[mask], pos[mask])
ax1.plot(ts[mask], acc[mask])
return edges[i]
[docs]
def ensure_required_data(self):
"""
Ensures the datasets required for QC are local. If the download_data attribute is True,
any missing data are downloaded. If all the data are not present locally at the end of
it an exception is raised. If the stream attribute is True, the video file is not
required to be local, however it must be remotely accessible.
NB: Requires a valid instance of ONE and a valid session eid.
:return:
"""
assert self.one is not None, 'ONE required to download data'
sess_params = {}
if self.download_data:
dset = self.one.list_datasets(self.session_path, '*experiment.description*', details=True)
if self.one._check_filesystem(dset):
sess_params = read_params(self.session_path) or {}
else:
sess_params = read_params(self.session_path) or {}
self._set_sync(sess_params)
# Get extractor type
is_ephys = 'ephys' in (self.type or self.one.get_details(self.eid)['task_protocol'])
self.sync = self.sync or ('nidq' if is_ephys else 'bpod')
is_fpga = 'bpod' not in self.sync
# dataset collections outside this list are ignored (e.g. probe00, raw_passive_data)
collections = (
'alf', '', get_task_collection(sess_params), get_video_collection(sess_params, self.label)
)
dtypes = self.dstypes + self.dstypes_fpga if is_fpga else self.dstypes
assert_unique = True
# Check we have raw ephys data for session
if is_ephys:
if len(self.one.list_datasets(self.eid, collection='raw_ephys_data')) == 0:
# Assert 3A probe model; if so download all probe data
det = self.one.get_details(self.eid, full=True)
probe_model = next(x['model'] for x in det['probe_insertion'])
assert probe_model == '3A', 'raw ephys data missing'
collections += (self.sync_collection or 'raw_ephys_data',)
if sess_params:
probes = sess_params.get('devices', {}).get('neuropixel', {})
probes = set(x.get('collection') for x in chain(*map(dict.values, probes)))
collections += tuple(probes)
else:
collections += ('raw_ephys_data/probe00', 'raw_ephys_data/probe01')
assert_unique = False
else:
# 3B probes have data in root collection
collections += ('raw_ephys_data',)
for dstype in dtypes:
datasets = self.one.type2datasets(self.eid, dstype, details=True)
if 'camera' in dstype.lower(): # Download individual camera file
datasets = filter_datasets(datasets, filename=f'.*{self.label}.*')
else: # Ignore probe datasets, etc.
_datasets = filter_datasets(datasets, collection=collections, assert_unique=assert_unique)
if '' in collections: # Must be handled as a separate query
datasets = filter_datasets(datasets, collection='', assert_unique=assert_unique)
datasets = pd.concat([datasets, _datasets]).sort_index()
else:
datasets = _datasets
if any(x.endswith('.mp4') for x in datasets.rel_path) and self.stream:
names = [x.split('/')[-1] for x in self.one.list_datasets(self.eid, details=False)]
assert f'_iblrig_{self.label}Camera.raw.mp4' in names, 'No remote video file found'
continue
optional = ('camera.times', '_iblrig_Camera.raw', 'wheel.position', 'wheel.timestamps',
'_iblrig_Camera.timestamps', '_iblrig_Camera.frame_counter', '_iblrig_Camera.GPIO',
'_iblrig_Camera.frameData', '_ibl_experiment.description')
present = (
self.one._check_filesystem(datasets)
if self.download_data
else (next(self.session_path.rglob(d), None) for d in datasets['rel_path'])
)
required = (dstype not in optional)
all_present = not datasets.empty and all(present)
assert all_present or not required, f'Dataset {dstype} not found'
if not self.type and self.sync != 'nidq':
self._type = get_session_extractor_type(self.session_path)
def _set_sync(self, session_params=False):
"""Set the sync and sync_collection attributes if not already set.
Also set the type attribute based on the sync. NB The type attribute is for legacy sessions.
Parameters
----------
session_params : dict, bool
The loaded experiment description file. If False, attempts to load it from the session_path.
"""
if session_params is False:
if not self.session_path:
raise ValueError('No session path set')
session_params = read_params(self.session_path)
sync, sync_dict = get_sync(session_params)
self.sync = self.sync or sync
self.sync_collection = self.sync_collection or sync_dict.get('collection')
if self.sync:
self._type = 'ephys' if self.sync == 'nidq' else 'training'
def _update_meta_from_session_params(self, sess_params):
"""
Update the default expected video properties with those defined in the experiment
description file (if any). This updates the `video_meta` property with the fps, width and
height for the type and camera label.
Parameters
----------
sess_params : dict
The loaded experiment.description file.
"""
try:
assert sess_params
video_pars = sess_params.get('devices', {}).get('cameras', {}).get(self.label)
except (AssertionError, KeyError):
return
PROPERTIES = ('width', 'height', 'fps')
video_meta = copy.deepcopy(self.video_meta) # must re-assign as it's a class attribute
if self.type not in video_meta:
video_meta[self.type] = {}
if self.label not in video_meta[self.type]:
video_meta[self.type][self.label] = {}
video_meta[self.type][self.label].update(
**{k: v for k, v in video_pars.items() if k in PROPERTIES}
)
self.video_meta = video_meta
[docs]
def run(self, update: bool = False, **kwargs) -> (str, dict):
"""
Run video QC checks and return outcome
:param update: if True, updates the session QC fields on Alyx
:param download_data: if True, downloads any missing data if required
:param extract_times: if True, re-extracts the camera timestamps from the raw data
:returns: overall outcome as a str, a dict of checks and their outcomes
"""
_log.info(f'Computing QC outcome for {self.label} camera, session {self.eid}')
namespace = f'video{self.label.capitalize()}'
if all(x is None for x in self.data.values()):
self.load_data(**kwargs)
if self.data['frame_samples'] is None or self.data['timestamps'] is None:
return 'NOT_SET', {}
if self.data['timestamps'].shape[0] == 0:
_log.error(f'No timestamps for {self.label} camera; setting outcome to CRITICAL')
return 'CRITICAL', {}
def is_metric(x):
return isfunction(x) and x.__name__.startswith('check_')
# import importlib
# classe = getattr(importlib.import_module(self.__module__), self.__name__)
# print(classe)
checks = getmembers(self.__class__, is_metric)
checks = self.remove_check(checks)
self.metrics = {f'_{namespace}_' + k[6:]: fn(self) for k, fn in checks}
values = [x if isinstance(x, str) else x[0] for x in self.metrics.values()]
code = max(base.CRITERIA[x] for x in values)
outcome = next(k for k, v in base.CRITERIA.items() if v == code)
if update:
extended = {
k: 'NOT_SET' if v is None else v
for k, v in self.metrics.items()
}
self.update_extended_qc(extended)
self.update(outcome, namespace)
return outcome, self.metrics
[docs]
def remove_check(self, checks):
if len(self.checks_to_remove) == 0:
return checks
else:
for check in self.checks_to_remove:
check_names = [ch[0] for ch in checks]
idx = check_names.index(check)
checks.pop(idx)
return checks
[docs]
def check_brightness(self, bounds=(40, 200), max_std=20, roi=True, display=False):
"""Check that the video brightness is within a given range
The mean brightness of each frame must be with the bounds provided, and the standard
deviation across samples frames should be less then the given value. Assumes that the
frame samples are 2D (no colour channels).
:param bounds: For each frame, check that: bounds[0] < M < bounds[1],
where M = mean(frame). If less than 75% of sample frames outside of these bounds, the
outcome is WARNING. If <75% of frames within twice the bounds, the outcome is FAIL.
:param max_std: The standard deviation of the frame luminance means must be less than this
:param roi: If True, check brightness on ROI of frame
:param display: When True the mean frame luminance is plotted against sample frames.
The sample frames with the lowest and highest mean luminance are shown.
"""
if self.data['frame_samples'] is None:
return 'NOT_SET'
if roi is True:
_, h, w = self.data['frame_samples'].shape
if self.label == 'body': # Latter half
roi = (slice(None), slice(None), slice(int(w / 2), None, None))
elif self.label == 'left': # Top left quadrant (~2/3, 1/2 height)
roi = (slice(None), slice(None, int(h / 2), None), slice(None, int(w * .66), None))
else: # Top right quadrant (~2/3 width, 1/2 height)
roi = (slice(None), slice(None, int(h / 2), None), slice(int(w * .66), None, None))
else:
roi = (slice(None), slice(None), slice(None))
brightness = self.data['frame_samples'][roi].mean(axis=(1, 2))
# dims = self.data['frame_samples'].shape
# brightness /= np.array((*dims[1:], 255)).prod() # Normalize
if display:
f = plt.figure()
gs = f.add_gridspec(2, 3)
indices = self.frame_samples_idx
# Plot mean frame luminance
ax = f.add_subplot(gs[:2, :2])
plt.plot(indices, brightness, label='brightness')
ax.set(
xlabel='frame #',
ylabel='brightness (mean pixel)',
title='Brightness')
ax.hlines(bounds, 0, indices[-1],
colors='tab:orange', linestyles=':', label='warning bounds')
ax.hlines((bounds[0] / 2, bounds[1] * 2), 0, indices[-1],
colors='r', linestyles=':', label='failure bounds')
ax.legend()
# Plot min-max frames
for i, idx in enumerate((np.argmax(brightness), np.argmin(brightness))):
a = f.add_subplot(gs[i, 2])
ax.annotate('*', (indices[idx], brightness[idx]), # this is the point to label
textcoords='offset points', xytext=(0, 1), ha='center')
frame = self.data['frame_samples'][idx][roi[1:]]
title = ('min' if i else 'max') + ' mean luminance = %.2f' % brightness[idx]
self.imshow(frame, ax=a, title=title)
PCT_PASS = .75 # Proportion of sample frames that must pass
# Warning if brightness not within range (3/4 of frames must be between bounds)
warn_range = np.logical_and(brightness > bounds[0], brightness < bounds[1])
warn_range = 'PASS' if sum(warn_range) / self.n_samples > PCT_PASS else 'WARNING'
# Fail if brightness not within twice the range or std less than threshold
fail_range = np.logical_and(brightness > bounds[0] / 2, brightness < bounds[1] * 2)
within_range = sum(fail_range) / self.n_samples > PCT_PASS
fail_range = 'PASS' if within_range and np.std(brightness) < max_std else 'FAIL'
return self.overall_outcome([warn_range, fail_range])
[docs]
def check_framerate(self, threshold=1.):
"""Check camera times match specified frame rate for camera
:param threshold: The maximum absolute difference between timestamp sample rate and video
frame rate. NB: Does not take into account dropped frames.
"""
if any(x is None for x in (self.data['timestamps'], self.video_meta)):
return 'NOT_SET'
fps = self.video_meta[self.type][self.label]['fps']
Fs = 1 / np.median(np.diff(self.data['timestamps'])) # Approx. frequency of camera
return 'PASS' if abs(Fs - fps) < threshold else 'FAIL', float(round(Fs, 3))
[docs]
def check_pin_state(self, display=False):
"""Check the pin state reflects Bpod TTLs"""
if not data_for_keys(('video', 'pin_state', 'audio'), self.data):
return 'NOT_SET'
size_diff = int(self.data['pin_state'].shape[0] - self.data['video']['length'])
# NB: The pin state can be high for 2 consecutive frames
low2high = np.insert(np.diff(self.data['pin_state'][:, -1].astype(int)) == 1, 0, False)
# NB: Time between two consecutive TTLs can be sub-frame, so this will fail
ndiff_low2high = int(self.data['audio'][::2].size - sum(low2high))
# state_ttl_matches = ndiff_low2high == 0
# Check within ms of audio times
if display:
plt.Figure()
plt.plot(self.data['timestamps'][low2high], np.zeros(sum(low2high)), 'o',
label='GPIO Low -> High')
plt.plot(self.data['audio'], np.zeros(self.data['audio'].size), 'rx',
label='Audio TTL High')
plt.xlabel('FPGA frame times / s')
plt.gca().set(yticklabels=[])
plt.gca().tick_params(left=False)
plt.legend()
outcome = self.overall_outcome(
('PASS' if size_diff == 0 else 'WARNING' if np.abs(size_diff) < 5 else 'FAIL',
'PASS' if np.abs(ndiff_low2high) < 5 else 'WARNING')
)
return outcome, ndiff_low2high, size_diff
[docs]
def check_dropped_frames(self, threshold=.1):
"""Check how many frames were reported missing
:param threshold: The maximum allowable percentage of dropped frames
"""
if not data_for_keys(('video', 'count'), self.data):
return 'NOT_SET'
size_diff = int(self.data['count'].size - self.data['video']['length'])
strict_increase = np.all(np.diff(self.data['count']) > 0)
if not strict_increase:
n_effected = np.sum(np.invert(strict_increase))
_log.info(f'frame count not strictly increasing: '
f'{n_effected} frames effected ({n_effected / strict_increase.size:.2%})')
return 'CRITICAL'
dropped = np.diff(self.data['count']).astype(int) - 1
pct_dropped = (sum(dropped) / len(dropped) * 100)
# Calculate overall outcome for this check
outcome = self.overall_outcome(
('PASS' if size_diff == 0 else 'WARNING' if np.abs(size_diff) < 5 else 'FAIL',
'PASS' if pct_dropped < threshold else 'FAIL')
)
return outcome, int(sum(dropped)), size_diff
[docs]
def check_timestamps(self):
"""Check that the camera.times array is reasonable"""
if not data_for_keys(('timestamps', 'video'), self.data):
return 'NOT_SET'
# Check number of timestamps matches video
length_matches = self.data['timestamps'].size == self.data['video'].length
# Check times are strictly increasing
increasing = all(np.diff(self.data['timestamps']) > 0)
# Check times do not contain nans
nanless = not np.isnan(self.data['timestamps']).any()
return 'PASS' if increasing and length_matches and nanless else 'FAIL'
[docs]
def check_camera_times(self):
"""Check that the number of raw camera timestamps matches the number of video frames"""
if not data_for_keys(('bonsai_times', 'video'), self.data):
return 'NOT_SET'
length_match = len(self.data['camera_times']) == self.data['video'].length
outcome = 'PASS' if length_match else 'WARNING'
# 1 / np.median(np.diff(self.data.camera_times))
return outcome, len(self.data['camera_times']) - self.data['video'].length
[docs]
def check_resolution(self):
"""Check that the timestamps and video file resolution match what we expect"""
if self.data['video'] is None:
return 'NOT_SET'
actual = self.data['video']
expected = self.video_meta[self.type][self.label]
match = actual['width'] == expected['width'] and actual['height'] == expected['height']
return 'PASS' if match else 'FAIL'
[docs]
def check_wheel_alignment(self, tolerance=(1, 2), display=False):
"""Check wheel motion in video correlates with the rotary encoder signal
Check is skipped for body camera videos as the wheel is often obstructed
Parameters
----------
tolerance : int, (int, int)
Maximum absolute offset in frames. If two values, the maximum value is taken as the
warning threshold.
display : bool
If true, the wheel motion energy is plotted against the rotary encoder.
Returns
-------
str
The outcome string, one of {'NOT_SET', 'FAIL', 'WARNING', 'PASS'}.
int
Frame offset, i.e. by how many frames the video was shifted to match the rotary encoder
signal. Negative values mean the video was shifted backwards with respect to the wheel
timestamps.
Notes
-----
- A negative frame offset typically means that there were frame TTLs at the beginning that
do not correspond to any video frames (sometimes the first few frames aren't saved to
disk). Since 2021-09-15 the extractor should compensate for this.
"""
wheel_present = data_for_keys(('position', 'timestamps', 'period'), self.data['wheel'])
if not wheel_present or self.label == 'body':
return 'NOT_SET'
# Check the selected wheel movement period occurred within camera timestamp time
camera_times = self.data['timestamps']
in_range = within_ranges(camera_times, self.data['wheel']['period'].reshape(-1, 2))
if not in_range.any():
# Check if any camera timestamps overlap with the wheel times
if np.any(np.logical_and(
camera_times > self.data['wheel']['timestamps'][0],
camera_times < self.data['wheel']['timestamps'][-1])
):
_log.warning('Unable to check wheel alignment: '
'chosen movement is not during video')
return 'NOT_SET'
else:
# No overlap, return fail
return 'FAIL'
aln = MotionAlignment(self.eid, self.one, self.log, session_path=self.session_path)
aln.data = self.data.copy()
aln.data['camera_times'] = {self.label: camera_times}
aln.video_paths = {self.label: self.video_path}
offset, *_ = aln.align_motion(period=self.data['wheel'].period,
display=display, side=self.label)
if offset is None:
return 'NOT_SET'
if display:
aln.plot_alignment()
# Determine the outcome. If there are two values for the tolerance, one is taken to be
# a warning threshold, the other a failure threshold.
out_map = {0: 'WARNING', 1: 'WARNING', 2: 'PASS'} # 0: FAIL -> WARNING Aug 2022
passed = np.abs(offset) <= np.sort(np.array(tolerance))
return out_map[sum(passed)], int(offset)
[docs]
def check_position(self, hist_thresh=(75, 80), pos_thresh=(10, 15),
metric=cv2.TM_CCOEFF_NORMED,
display=False, test=False, roi=None, pct_thresh=True):
"""Check camera is positioned correctly
For the template matching zero-normalized cross-correlation (default) should be more
robust to exposure (which we're not checking here). The L2 norm (TM_SQDIFF) should
also work.
If display is True, the template ROI (pick hashed) is plotted over a video frame,
along with the threshold regions (green solid). The histogram correlations are plotted
and the full histogram is plotted for one of the sample frames and the reference frame.
:param hist_thresh: The minimum histogram cross-correlation threshold to pass (0-1).
:param pos_thresh: The maximum number of pixels off that the template matcher may be off
by. If two values are provided, the lower threshold is treated as a warning boundary.
:param metric: The metric to use for template matching.
:param display: If true, the results are plotted
:param test: If true a reference frame instead of the frames in frame_samples.
:param roi: A tuple of indices for the face template in the for ((y1, y2), (x1, x2))
:param pct_thresh: If true, the thresholds are treated as percentages
"""
if not test and self.data['frame_samples'] is None:
return 'NOT_SET'
refs = self.load_reference_frames(self.label)
# ensure iterable
pos_thresh = np.sort(np.array(pos_thresh))
hist_thresh = np.sort(np.array(hist_thresh))
# Method 1: compareHist
#### Mean hist comparison
# ref_h = [cv2.calcHist([x], [0], None, [256], [0, 256]) for x in refs]
# ref_h = np.array(ref_h).mean(axis=0)
# frames = refs if test else self.data['frame_samples']
# hists = [cv2.calcHist([x], [0], None, [256], [0, 256]) for x in frames]
# test_h = np.array(hists).mean(axis=0)
# corr = cv2.compareHist(test_h, ref_h, cv2.HISTCMP_CORREL)
# if pct_thresh:
# corr *= 100
# hist_passed = corr > hist_thresh
####
ref_h = cv2.calcHist([refs[0]], [0], None, [256], [0, 256])
frames = refs if test else self.data['frame_samples']
hists = [cv2.calcHist([x], [0], None, [256], [0, 256]) for x in frames]
corr = np.array([cv2.compareHist(test_h, ref_h, cv2.HISTCMP_CORREL) for test_h in hists])
if pct_thresh:
corr *= 100
hist_passed = [np.all(corr > x) for x in hist_thresh]
# Method 2:
top_left, roi, template = self.find_face(roi=roi, test=test, metric=metric, refs=refs)
(y1, y2), (x1, x2) = roi
err = (x1, y1) - np.median(np.array(top_left), axis=0)
h, w = frames[0].shape[:2]
if pct_thresh: # Threshold as percent
# t_x, t_y = pct_thresh
err_pct = [(abs(x) / y) * 100 for x, y in zip(err, (h, w))]
face_passed = [all(err_pct < x) for x in pos_thresh]
else:
face_passed = [np.all(np.abs(err) < x) for x in pos_thresh]
if display:
plt.figure()
# Plot frame with template overlay
img = frames[0]
ax0 = plt.subplot(221)
ax0.imshow(img, cmap='gray', vmin=0, vmax=255)
bounds = (x1 - err[0], x2 - err[0], y2 - err[1], y1 - err[1])
ax0.imshow(template, cmap='gray', alpha=0.5, extent=bounds)
if pct_thresh:
for c, thresh in zip(('green', 'yellow'), pos_thresh):
t_y = (h / 100) * thresh
t_x = (w / 100) * thresh
xy = (x1 - t_x, y1 - t_y)
ax0.add_patch(Rectangle(xy, x2 - x1 + (t_x * 2), y2 - y1 + (t_y * 2),
fill=True, facecolor=c, lw=0, alpha=0.05))
else:
for c, thresh in zip(('green', 'yellow'), pos_thresh):
xy = (x1 - thresh, y1 - thresh)
ax0.add_patch(Rectangle(xy, x2 - x1 + (thresh * 2), y2 - y1 + (thresh * 2),
fill=True, facecolor=c, lw=0, alpha=0.05))
xy = (x1 - err[0], y1 - err[1])
ax0.add_patch(Rectangle(xy, x2 - x1, y2 - y1,
edgecolor='pink', fill=False, hatch='//', lw=1))
ax0.set(xlim=(0, img.shape[1]), ylim=(img.shape[0], 0))
ax0.set_axis_off()
# Plot the image histograms
ax1 = plt.subplot(212)
ax1.plot(ref_h[5:-1], label='reference frame')
ax1.plot(np.array(hists).mean(axis=0)[5:-1], label='mean frame')
ax1.set_xlim([0, 256])
plt.legend()
# Plot the correlations for each sample frame
ax2 = plt.subplot(222)
ax2.plot(corr, label='hist correlation')
ax2.axhline(hist_thresh[0], 0, self.n_samples,
linestyle=':', color='r', label='fail threshold')
ax2.axhline(hist_thresh[1], 0, self.n_samples,
linestyle=':', color='g', label='pass threshold')
ax2.set(xlabel='Sample Frame #', ylabel='Hist correlation')
plt.legend()
plt.suptitle('Check position')
plt.show()
pass_map = {i: s for i, s in enumerate(('FAIL', 'WARNING', 'PASS'))}
face_aligned = pass_map[sum(face_passed)]
hist_correlates = pass_map[sum(hist_passed)]
return self.overall_outcome([face_aligned, hist_correlates], agg=min)
[docs]
def check_focus(self, n=20, threshold=(100, 6),
roi=False, display=False, test=False, equalize=True):
"""Check video is in focus
Two methods are used here: Looking at the high frequencies with a DFT and
applying a Laplacian HPF and looking at the variance.
Note:
- Both methods are sensitive to noise (Laplacian is 2nd order filter).
- The thresholds for the fft may need to be different for the left/right vs body as
the distribution of frequencies in the image is different (e.g. the holder
comprises mostly very high frequencies).
- The image may be overall in focus but the places we care about can still be out of
focus (namely the face). For this we'll take an ROI around the face.
- Focus check thrown off by brightness. This may be fixed by equalizing the histogram
(set equalize=True)
:param n: number of frames from frame_samples data to use in check.
:param threshold: the lower boundary for Laplacian variance and mean FFT filtered
brightness, respectively
:param roi: if False, the roi is determined via template matching for the face or body.
If None, some set ROIs for face and paws are used. A list of slices may also be passed.
:param display: if true, the results are displayed
:param test: if true, a set of artificially blurred reference frames are used as the
input. This can be used to selecting reasonable thresholds.
:param equalize: if true, the histograms of the frames are equalized, resulting in an
increased the global contrast and linear CDF. This makes check robust to low light
conditions.
"""
no_frames = self.data['frame_samples'] is None or len(self.data['frame_samples']) == 0
if not test and no_frames:
return 'NOT_SET'
if roi is False:
top_left, roi, _ = self.find_face(test=test) # (y1, y2), (x1, x2)
h, w = map(lambda x: np.diff(x).item(), roi)
x, y = np.median(np.array(top_left), axis=0).round().astype(int)
roi = (np.s_[y: y + h, x: x + w],)
else:
ROI = {
'left': (np.s_[:400, :561], np.s_[500:, 100:800]), # (face, wheel)
'right': (np.s_[:196, 397:], np.s_[221:, 255:]),
'body': (np.s_[143:274, 84:433],) # body holder
}
roi = roi or ROI[self.label]
if test:
"""In test mode load a reference frame and run it through a normalized box filter with
increasing kernel size.
"""
idx = (0,)
ref = self.load_reference_frames(self.label)[idx]
kernal_sz = np.unique(np.linspace(0, 15, n, dtype=int))
n = kernal_sz.size # Size excluding repeated kernels
img = np.empty((n, *ref.shape), dtype=np.uint8)
for i, k in enumerate(kernal_sz):
img[i] = ref.copy() if k == 0 else cv2.blur(ref, (k, k))
if equalize:
[cv2.equalizeHist(x, x) for x in img]
if display:
# Plot blurred images
f, axes = plt.subplots(1, len(kernal_sz))
for ax, ig, k in zip(axes, img, kernal_sz):
self.imshow(ig, ax=ax, title='Kernal ({0}, {0})'.format(k or 'None'))
f.suptitle('Reference frame with box filter')
else:
# Sub-sample the frame samples
idx = np.unique(np.linspace(0, len(self.data['frame_samples']) - 1, n, dtype=int))
img = self.data['frame_samples'][idx]
if equalize:
[cv2.equalizeHist(x, x) for x in img]
# A measure of the sharpness effectively taking the second derivative of the image
lpc_var = np.empty((min(n, len(img)), len(roi)))
for i, frame in enumerate(img[::-1]):
lpc = cv2.Laplacian(frame, cv2.CV_16S, ksize=1)
lpc_var[i] = [lpc[mask].var() for mask in roi]
if display:
# Plot the first sample image
f = plt.figure()
gs = f.add_gridspec(len(roi) + 1, 3)
f.add_subplot(gs[0:len(roi), 0])
frame = img[0]
self.imshow(frame, title=f'Frame #{self.frame_samples_idx[idx[0]]}')
# Plot the ROIs with and without filter
lpc = cv2.Laplacian(frame, cv2.CV_16S, ksize=1)
abs_lpc = cv2.convertScaleAbs(lpc)
for i, r in enumerate(roi):
f.add_subplot(gs[i, 1])
self.imshow(frame[r], title=f'ROI #{i + 1}')
f.add_subplot(gs[i, 2])
self.imshow(abs_lpc[r], title=f'ROI #{i + 1} - Lapacian filter')
f.suptitle('Laplacian blur detection')
# Plot variance over frames
ax = f.add_subplot(gs[len(roi), :])
ln = plt.plot(lpc_var)
[l.set_label(f'ROI #{i + 1}') for i, l in enumerate(ln)]
ax.axhline(threshold[0], 0, n, linestyle=':', color='r', label='lower threshold')
ax.set(xlabel='Frame sample', ylabel='Variance of the Laplacian')
plt.tight_layout()
plt.legend()
# Second test is to highpass with dft
h, w = img.shape[1:]
cX, cY = w // 2, h // 2
sz = 60 # Seems to be the magic number for high pass
mask = np.ones((h, w, 2), bool)
mask[cY - sz:cY + sz, cX - sz:cX + sz] = False
filt_mean = np.empty(len(img))
for i, frame in enumerate(img[::-1]):
dft = cv2.dft(np.float32(frame), flags=cv2.DFT_COMPLEX_OUTPUT)
f_shift = np.fft.fftshift(dft) * mask # Shift & remove low frequencies
f_ishift = np.fft.ifftshift(f_shift) # Shift back
filt_frame = cv2.idft(f_ishift) # Reconstruct
filt_frame = cv2.magnitude(filt_frame[..., 0], filt_frame[..., 1])
# Re-normalize to 8-bits to make threshold simpler
img_back = cv2.normalize(filt_frame, None, alpha=0, beta=256,
norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U)
filt_mean[i] = np.mean(img_back)
if i == len(img) - 1 and display:
# Plot Fourier transforms
f = plt.figure()
gs = f.add_gridspec(2, 3)
self.imshow(img[0], ax=f.add_subplot(gs[0, 0]), title='Original frame')
dft_shift = np.fft.fftshift(dft)
magnitude = 20 * np.log(cv2.magnitude(dft_shift[..., 0], dft_shift[..., 1]))
self.imshow(magnitude, ax=f.add_subplot(gs[0, 1]), title='Magnitude spectrum')
self.imshow(img_back, ax=f.add_subplot(gs[0, 2]), title='Filtered frame')
ax = f.add_subplot(gs[1, :])
ax.plot(filt_mean)
ax.axhline(threshold[1], 0, n, linestyle=':', color='r', label='lower threshold')
ax.set(xlabel='Frame sample', ylabel='Mean of filtered frame')
f.suptitle('Discrete Fourier Transform')
plt.show()
passes = np.all(lpc_var > threshold[0]) or np.all(filt_mean > threshold[1])
return 'PASS' if passes else 'FAIL'
[docs]
def find_face(self, roi=None, test=False, metric=cv2.TM_CCOEFF_NORMED, refs=None):
"""Use template matching to find face location in frame
For the template matching zero-normalized cross-correlation (default) should be more
robust to exposure (which we're not checking here). The L2 norm (TM_SQDIFF) should
also work. That said, normalizing the histograms works best.
:param roi: A tuple of indices for the face template in the for ((y1, y2), (x1, x2))
:param test: If True the template is matched against frames that come from the same session
:param metric: The metric to use for template matching
:param refs: An array of frames to match the template to
:returns: (y1, y2), (x1, x2)
"""
ROI = {
'left': ((45, 346), (138, 501)),
'right': ((14, 174), (430, 618)),
'body': ((141, 272), (90, 339))
}
roi = roi or ROI[self.label]
refs = self.load_reference_frames(self.label) if refs is None else refs
frames = refs if test else self.data['frame_samples']
template = refs[0][tuple(slice(*r) for r in roi)]
top_left = [] # [(x1, y1), ...]
for frame in frames:
res = cv2.matchTemplate(frame, template, metric)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res)
# If the method is TM_SQDIFF or TM_SQDIFF_NORMED, take minimum
top_left.append(min_loc if metric < 2 else max_loc)
# bottom_right = (top_left[0] + w, top_left[1] + h)
return top_left, roi, template
[docs]
@staticmethod
def load_reference_frames(side):
"""Load some reference frames for a given video
The reference frames are from sessions where the camera was well positioned. The
frames are in qc/reference, one file per camera, only one channel per frame. The
session eids can be found in qc/reference/frame_src.json
:param side: Video label, e.g. 'left'
:return: numpy array of frames with the shape (n, h, w)
"""
file = next(Path(__file__).parent.joinpath('reference').glob(f'frames_{side}.npy'))
refs = np.load(file)
return refs
[docs]
@staticmethod
def imshow(frame, ax=None, title=None, **kwargs):
"""plt.imshow with some convenient defaults for greyscale frames"""
h = ax or plt.gca()
defaults = {
'cmap': kwargs.pop('cmap', 'gray'),
'vmin': kwargs.pop('vmin', 0),
'vmax': kwargs.pop('vmax', 255)
}
h.imshow(frame, **defaults, **kwargs)
h.set(title=title)
h.set_axis_off()
return ax
[docs]
class CameraQCCamlog(CameraQC):
"""A class for computing camera QC metrics from camlog data. For this QC we expect the check_pin_state to be NOT_SET as we are
not using the GPIO for timestamp alignment"""
dstypes = [
'_iblrig_taskData.raw',
'_iblrig_taskSettings.raw',
'_iblrig_Camera.raw',
'camera.times',
'wheel.position',
'wheel.timestamps'
]
dstypes_fpga = [
'_spikeglx_sync.channels',
'_spikeglx_sync.polarities',
'_spikeglx_sync.times',
'DAQData.raw.meta',
'DAQData.wiring'
]
def __init__(self, session_path_or_eid, camera, sync_collection='raw_sync_data', sync_type='nidq', **kwargs):
super().__init__(session_path_or_eid, camera, sync_collection=sync_collection, sync_type=sync_type, **kwargs)
self._type = 'ephys'
self.checks_to_remove = ['check_pin_state']
[docs]
def load_data(self, download_data: bool = None,
extract_times: bool = False, load_video: bool = True, **kwargs) -> None:
"""Extract the data from raw data files
Extracts all the required task data from the raw data files.
Data keys:
- count (int array): the sequential frame number (n, n+1, n+2...)
- pin_state (): the camera GPIO pin; records the audio TTLs; should be one per frame
- audio (float array): timestamps of audio TTL fronts
- fpga_times (float array): timestamps of camera TTLs recorded by FPGA
- timestamps (float array): extracted video timestamps (the camera.times ALF)
- bonsai_times (datetime array): system timestamps of video PC; should be one per frame
- camera_times (float array): camera frame timestamps extracted from frame headers
- wheel (Bunch): rotary encoder timestamps, position and period used for wheel motion
- video (Bunch): video meta data, including dimensions and FPS
- frame_samples (h x w x n array): array of evenly sampled frames (1 colour channel)
:param download_data: if True, any missing raw data is downloaded via ONE.
Missing data will raise an AssertionError
:param extract_times: if True, the camera.times are re-extracted from the raw data
:param load_video: if True, calls the load_video_data method
"""
assert self.session_path, 'no session path set'
if download_data is not None:
self.download_data = download_data
if self.download_data and self.eid and self.one and not self.one.offline:
self.ensure_required_data()
_log.info('Gathering data for QC')
# If there is an experiment description and there are video parameters
sess_params = read_params(self.session_path) or {}
video_collection = get_video_collection(sess_params, self.label)
task_collection = get_task_collection(sess_params)
self._set_sync(sess_params)
self._update_meta_from_session_params(sess_params)
# Get frame count
log, _ = parse_cam_log(self.session_path.joinpath(video_collection, f'_iblrig_{self.label}Camera.raw.camlog'))
self.data['count'] = log.frame_id.values
# Load the audio and raw FPGA times
if self.sync != 'bpod' and self.sync is not None:
sync, chmap = ephys_fpga.get_sync_and_chn_map(self.session_path, self.sync_collection)
audio_ttls = ephys_fpga.get_sync_fronts(sync, chmap['audio'])
self.data['audio'] = audio_ttls['times'] # Get rises
# Load raw FPGA times
cam_ts = extract_camera_sync(sync, chmap)
self.data['fpga_times'] = cam_ts[self.label]
else:
bpod_data = raw.load_data(self.session_path, task_collection=task_collection)
_, audio_ttls = raw.load_bpod_fronts(self.session_path, data=bpod_data, task_collection=task_collection)
self.data['audio'] = audio_ttls['times']
# Load extracted frame times
alf_path = self.session_path / 'alf'
try:
assert not extract_times
self.data['timestamps'] = alfio.load_object(
alf_path, f'{self.label}Camera', short_keys=True)['times']
except AssertionError: # Re-extract
kwargs = dict(video_path=self.video_path, labels=self.label)
if self.sync == 'bpod':
kwargs = {**kwargs, 'task_collection': task_collection}
else:
kwargs = {**kwargs, 'sync': sync, 'chmap': chmap} # noqa
outputs, _ = extract_all(self.session_path, self.sync, save=False, camlog=True, **kwargs)
self.data['timestamps'] = outputs[f'{self.label}_camera_timestamps']
except ALFObjectNotFound:
_log.warning('no camera.times ALF found for session')
# Get audio and wheel data
wheel_keys = ('timestamps', 'position')
try:
# glob in case wheel data are in sub-collections
alf_path = next(alf_path.rglob('*wheel.timestamps*')).parent
self.data['wheel'] = alfio.load_object(alf_path, 'wheel', short_keys=True)
except ALFObjectNotFound:
# Extract from raw data
if self.sync != 'bpod':
wheel_data = ephys_fpga.extract_wheel_sync(sync, chmap)
else:
wheel_data = training_wheel.get_wheel_position(self.session_path, task_collection=task_collection)
self.data['wheel'] = Bunch(zip(wheel_keys, wheel_data))
# Find short period of wheel motion for motion correlation.
if data_for_keys(wheel_keys, self.data['wheel']) and self.data['timestamps'] is not None:
self.data['wheel'].period = self.get_active_wheel_period(self.data['wheel'])
# load in camera times
self.data['camera_times'] = log.timestamp.values
# Gather information from video file
if load_video:
_log.info('Inspecting video file...')
self.load_video_data()
[docs]
def ensure_required_data(self):
"""
Ensures the datasets required for QC are local. If the download_data attribute is True,
any missing data are downloaded. If all the data are not present locally at the end of
it an exception is raised. If the stream attribute is True, the video file is not
required to be local, however it must be remotely accessible.
NB: Requires a valid instance of ONE and a valid session eid.
:return:
"""
assert self.one is not None, 'ONE required to download data'
sess_params = {}
if self.download_data:
dset = self.one.list_datasets(self.session_path, '*experiment.description*', details=True)
if self.one._check_filesystem(dset):
sess_params = read_params(self.session_path) or {}
else:
sess_params = read_params(self.session_path) or {}
self._set_sync(sess_params)
# dataset collections outside this list are ignored (e.g. probe00, raw_passive_data)
collections = (
'alf', self.sync_collection, get_task_collection(sess_params),
get_video_collection(sess_params, self.label))
# Get extractor type
dtypes = self.dstypes + self.dstypes_fpga
assert_unique = True
for dstype in dtypes:
datasets = self.one.type2datasets(self.eid, dstype, details=True)
if 'camera' in dstype.lower(): # Download individual camera file
datasets = filter_datasets(datasets, filename=f'.*{self.label}.*')
else: # Ignore probe datasets, etc.
datasets = filter_datasets(datasets, collection=collections,
assert_unique=assert_unique)
if any(x.endswith('.mp4') for x in datasets.rel_path) and self.stream:
names = [x.split('/')[-1] for x in self.one.list_datasets(self.eid, details=False)]
assert f'_iblrig_{self.label}Camera.raw.mp4' in names, 'No remote video file found'
continue
optional = ('camera.times', '_iblrig_Camera.raw', 'wheel.position', 'wheel.timestamps')
present = (
self.one._check_filesystem(datasets)
if self.download_data
else (next(self.session_path.rglob(d), None) for d in datasets['rel_path'])
)
required = (dstype not in optional)
all_present = not datasets.empty and all(present)
assert all_present or not required, f'Dataset {dstype} not found'
[docs]
def check_camera_times(self):
"""Check that the number of raw camera timestamps matches the number of video frames"""
if not data_for_keys(('camera_times', 'video'), self.data):
return 'NOT_SET'
length_match = len(self.data['camera_times']) == self.data['video'].length
outcome = 'PASS' if length_match else 'WARNING'
# 1 / np.median(np.diff(self.data.camera_times))
return outcome, len(self.data['camera_times']) - self.data['video'].length
[docs]
def data_for_keys(keys, data):
"""Check keys exist in 'data' dict and contain values other than None"""
return data is not None and all(k in data and data.get(k, None) is not None for k in keys)
[docs]
def get_task_collection(sess_params):
"""
Returns the first task collection from the experiment description whose task name does not
contain 'passive', otherwise returns 'raw_behavior_data'.
Parameters
----------
sess_params : dict
The loaded experiment description file.
Returns
-------
str:
The collection presumed to contain wheel data.
"""
sess_params = sess_params or {}
tasks = (chain(*map(dict.items, sess_params.get('tasks', []))))
return next((v['collection'] for k, v in tasks if 'passive' not in k), 'raw_behavior_data')
[docs]
def get_video_collection(sess_params, label):
"""
Returns the collection containing the raw video data for a given camera.
Parameters
----------
sess_params : dict
The loaded experiment description file.
label : str
The camera label.
Returns
-------
str:
The collection presumed to contain the video data.
"""
DEFAULT = 'raw_video_data'
value = sess_params or {}
for key in ('devices', 'cameras', label, 'collection'):
value = value.get(key)
if not value:
return DEFAULT
return value
[docs]
def run_all_qc(session, cameras=('left', 'right', 'body'), **kwargs):
"""Run QC for all cameras
Run the camera QC for left, right and body cameras.
:param session: A session path or eid.
:param update: If True, QC fields are updated on Alyx.
:param cameras: A list of camera names to perform QC on.
:param stream: If true and local video files not available, the data are streamed from
the remote source.
:return: dict of CameraCQ objects
"""
qc = {}
camlog = kwargs.pop('camlog', False)
CamQC = CameraQCCamlog if camlog else CameraQC
run_args = {k: kwargs.pop(k) for k in ('download_data', 'extract_times', 'update')
if k in kwargs.keys()}
for camera in cameras:
qc[camera] = CamQC(session, camera, **kwargs)
qc[camera].run(**run_args)
return qc