""" Camera extractor functions.
This module handles extraction of camera timestamps for both Bpod and DAQ.
"""
import logging
import cv2
import numpy as np
import matplotlib.pyplot as plt
from iblutil.util import range_str
import ibldsp.utils as dsp
from ibllib.plots import squares, vertical_lines
from ibllib.io.video import assert_valid_label, VideoStreamer
from iblutil.numerical import within_ranges
from ibllib.io.extractors.ephys_fpga import get_sync_fronts
import ibllib.io.raw_data_loaders as raw
import ibllib.io.extractors.video_motion as vmotion
from ibllib.io.extractors.base import (
BaseBpodTrialsExtractor,
BaseExtractor,
)
_logger = logging.getLogger(__name__)
[docs]
def get_video_length(video_path):
"""
Returns video length.
:param video_path: A path to the video
:return:
"""
is_url = isinstance(video_path, str) and video_path.startswith('http')
cap = VideoStreamer(video_path).cap if is_url else cv2.VideoCapture(str(video_path))
assert cap.isOpened(), f'Failed to open video file {video_path}'
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
return length
[docs]
class CameraTimestampsFPGA(BaseExtractor):
"""Extractor for videos using DAQ sync and channel map."""
def __init__(self, label, session_path=None):
super().__init__(session_path)
self.label = assert_valid_label(label)
self.save_names = f'_ibl_{label}Camera.times.npy'
self.var_names = f'{label}_camera_timestamps'
self._log_level = _logger.level
_logger.setLevel(logging.DEBUG)
def __del__(self):
_logger.setLevel(self._log_level)
def _extract(self, sync=None, chmap=None, video_path=None, sync_label='audio',
display=False, extrapolate_missing=True, **kwargs):
"""
The raw timestamps are taken from the DAQ. These are the times of the camera's frame TTLs.
If the pin state file exists, these timestamps are aligned to the video frames using
task TTLs (typically the audio TTLs). Frames missing from the embedded frame count are
removed from the timestamps array.
If the pin state file does not exist, the left and right camera timestamps may be aligned
using the wheel data.
Parameters
----------
sync : dict
Dictionary 'times', 'polarities' of fronts detected on sync trace.
chmap : dict
Dictionary containing channel indices. Default to constant.
video_path : str, pathlib.Path, int
An optional path for fetching the number of frames. If None, the video is loaded from
the session path. If an int is provided this is taken to be the total number of frames.
sync_label : str
The sync label of the channel that's wired to the GPIO for synchronising the times.
display : bool
If true, the TTL and GPIO fronts are plotted.
extrapolate_missing : bool
If true, any missing timestamps at the beginning and end of the session are
extrapolated based on the median frame rate, otherwise they will be NaNs.
**kwargs
Extra keyword arguments (unused).
Returns
-------
numpy.array
The extracted camera timestamps.
"""
fpga_times = extract_camera_sync(sync=sync, chmap=chmap)
count, (*_, gpio) = raw.load_embedded_frame_data(self.session_path, self.label)
raw_ts = fpga_times[self.label]
if video_path is None:
filename = f'_iblrig_{self.label}Camera.raw.mp4'
video_path = self.session_path.joinpath('raw_video_data', filename)
# Permit the video path to be the length for development and debugging purposes
length = (video_path if isinstance(video_path, int) else get_video_length(video_path))
_logger.debug(f'Number of video frames = {length}')
if gpio is not None and gpio['indices'].size > 1 and sync_label is not None:
_logger.info(f'Aligning to {sync_label} TTLs')
# Extract sync TTLs
ttl = get_sync_fronts(sync, chmap[sync_label])
_, ts = raw.load_camera_ssv_times(self.session_path, self.label)
try:
"""
NB: Some of the sync TTLs occur very close together, and are therefore not
reflected in the pin state. This function removes those. Also converts frame
times to DAQ time.
"""
gpio, ttl, ts = groom_pin_state(gpio, ttl, ts, display=display)
"""
The length of the count and pin state are regularly longer than the length of
the video file. Here we assert that the video is either shorter or the same
length as the arrays, and we make an assumption that the missing frames are
right at the end of the video. We therefore simply shorten the arrays to match
the length of the video.
"""
if count.size > length:
count = count[:length]
else:
assert length == count.size, 'fewer counts than frames'
assert raw_ts.shape[0] > 0, 'no timestamps found in channel indicated for ' \
f'{self.label} camera'
return align_with_gpio(raw_ts, ttl, gpio, count,
display=display,
extrapolate_missing=extrapolate_missing)
except AssertionError as ex:
_logger.critical('Failed to extract using %s: %s', sync_label, ex)
# If you reach here extracting using sync TTLs was not possible, we attempt to align using wheel motion energy
_logger.warning('Attempting to align using wheel')
try:
if self.label not in ['left', 'right']:
# Can only use wheel alignment for left and right cameras
raise ValueError(f'Wheel alignment not supported for {self.label} camera')
motion_class = vmotion.MotionAlignmentFullSession(self.session_path, self.label, sync='nidq', upload=True)
new_times = motion_class.process()
if not motion_class.qc_outcome:
raise ValueError(f'Wheel alignment for {self.label} camera failed to pass qc: {motion_class.qc}')
else:
_logger.warning(f'Wheel alignment for {self.label} camera successful, qc: {motion_class.qc}')
return new_times
except Exception as err:
_logger.critical(f'Failed to align with wheel for {self.label} camera: {err}')
if length < raw_ts.size:
df = raw_ts.size - length
_logger.info(f'Discarding first {df} pulses')
raw_ts = raw_ts[df:]
return raw_ts
[docs]
class CameraTimestampsCamlog(BaseExtractor):
def __init__(self, label, session_path=None):
super().__init__(session_path)
self.label = assert_valid_label(label)
self.save_names = f'_ibl_{label}Camera.times.npy'
self.var_names = f'{label}_camera_timestamps'
self._log_level = _logger.level
_logger.setLevel(logging.DEBUG)
def __del__(self):
_logger.setLevel(self._log_level)
def _extract(self, sync=None, chmap=None, video_path=None,
display=False, extrapolate_missing=True, **kwargs):
fpga_times = extract_camera_sync(sync=sync, chmap=chmap)
video_frames = get_video_length(self.session_path.joinpath('raw_video_data', f'_iblrig_{self.label}Camera.raw.mp4'))
raw_ts = fpga_times[self.label]
# For left camera sometimes we have one extra pulse than video frame
if (raw_ts.size - video_frames) == 1:
_logger.warning(f'One extra sync pulse detected for {self.label} camera')
raw_ts = raw_ts[:-1]
elif (raw_ts.size - video_frames) == -1:
_logger.warning(f'One extra video frame detected for {self.label} camera')
med_time = np.median(np.diff(raw_ts))
raw_ts = np.r_[raw_ts, np.array([raw_ts[-1] + med_time])]
assert video_frames == raw_ts.size, f'dimension mismatch between video frames and TTL pulses for {self.label} camera' \
f' by {np.abs(video_frames - raw_ts.size)} frames'
return raw_ts
[docs]
class CameraTimestampsBpod(BaseBpodTrialsExtractor):
"""
Get the camera timestamps from the Bpod
The camera events are logged only during the events not in between, so the times need
to be interpolated
"""
save_names = '_ibl_leftCamera.times.npy'
var_names = 'left_camera_timestamps'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._log_level = _logger.level
_logger.setLevel(logging.DEBUG)
def __del__(self):
_logger.setLevel(self._log_level)
def _extract(self, video_path=None, display=False, extrapolate_missing=True, **kwargs):
"""
The raw timestamps are taken from the Bpod. These are the times of the camera's frame TTLs.
If the pin state file exists, these timestamps are aligned to the video frames using the
sync TTLs. Frames missing from the embedded frame count are removed from the timestamps
array.
If the pin state file does not exist, the left camera timestamps may be aligned using the
wheel data.
:param video_path: an optional path for fetching the number of frames. If None,
the video is loaded from the session path. If an int is provided this is taken to be
the total number of frames.
:param display: if True, the TTL and GPIO fronts are plotted.
:param extrapolate_missing: if True, any missing timestamps at the beginning and end of
the session are extrapolated based on the median frame rate, otherwise they will be NaNs.
:return: a numpy array of camera timestamps
"""
raw_ts = self._times_from_bpod()
count, (*_, gpio) = raw.load_embedded_frame_data(self.session_path, 'left')
if video_path is None:
filename = '_iblrig_leftCamera.raw.mp4'
video_path = self.session_path.joinpath('raw_video_data', filename)
# Permit the video path to be the length for development and debugging purposes
length = video_path if isinstance(video_path, int) else get_video_length(video_path)
_logger.debug(f'Number of video frames = {length}')
# Check if the GPIO is usable for extraction. GPIO is None if the file does not exist,
# is empty, or contains only one value (i.e. doesn't change)
if gpio is not None and gpio['indices'].size > 1:
_logger.info('Aligning to sync TTLs')
# Extract audio TTLs
_, audio = raw.load_bpod_fronts(self.session_path, data=self.bpod_trials,
task_collection=self.task_collection)
_, ts = raw.load_camera_ssv_times(self.session_path, 'left')
"""
There are many sync TTLs that are for some reason missed by the GPIO. Conversely
the last GPIO doesn't often correspond to any audio TTL. These will be removed.
The drift appears to be less severe than the DAQ, so when assigning TTLs we'll take
the nearest TTL within 500ms. The go cue TTLs comprise two short pulses ~3ms apart.
We will fuse any TTLs less than 5ms apart to make assignment more accurate.
"""
try:
gpio, audio, ts = groom_pin_state(gpio, audio, ts, take='nearest',
tolerance=.5, min_diff=5e-3, display=display)
if count.size > length:
count = count[:length]
else:
assert length == count.size, 'fewer counts than frames'
return align_with_gpio(raw_ts, audio, gpio, count,
extrapolate_missing, display=display)
except AssertionError as ex:
_logger.critical('Failed to extract using audio: %s', ex)
# If you reach here extracting using audio TTLs was not possible
_logger.warning('Alignment by wheel data not yet implemented')
# Extrapolate at median frame rate
n_missing = length - raw_ts.size
if n_missing > 0:
_logger.warning(f'{n_missing} fewer Bpod timestamps than frames; '
f'{"extrapolating" if extrapolate_missing else "appending nans"}')
frate = np.median(np.diff(raw_ts))
to_app = ((np.arange(n_missing, ) + 1) / frate + raw_ts[-1]
if extrapolate_missing
else np.full(n_missing, np.nan))
raw_ts = np.r_[raw_ts, to_app] # Append the missing times
elif n_missing < 0:
_logger.warning(f'{abs(n_missing)} fewer frames than Bpod timestamps')
_logger.info(f'Discarding first {abs(n_missing)} pulses')
raw_ts = raw_ts[abs(n_missing):]
return raw_ts
def _times_from_bpod(self):
ntrials = len(self.bpod_trials)
cam_times = []
n_frames = 0
n_out_of_sync = 0
missed_trials = []
for ind in range(ntrials):
# get upgoing and downgoing fronts
events = self.bpod_trials[ind]['behavior_data']['Events timestamps']
pin = np.array(events.get('Port1In') or [np.nan])
pout = np.array(events.get('Port1Out') or [np.nan])
# some trials at startup may not have the camera working, discard
if np.isnan(pin).all():
missed_trials.append(ind)
continue
# if the trial starts in the middle of a square, discard the first downgoing front
if pout[0] < pin[0]:
pout = pout[1:]
# same if the last sample is during an upgoing front,
# always put size as it happens last
pin = pin[:pout.size]
frate = np.median(np.diff(pin))
if ind > 0:
"""
assert that the pulses have the same length and that we don't miss frames during
the trial, the refresh rate of bpod is 100us
"""
test1 = np.all(np.abs(1 - (pin - pout) / np.median(pin - pout)) < 0.1)
test2 = np.all(np.abs(np.diff(pin) - frate) <= 0.00011)
if not all([test1, test2]):
n_out_of_sync += 1
# grow a list of cam times for ech trial
cam_times.append(pin)
n_frames += pin.size
if missed_trials:
_logger.debug('trial(s) %s missing TTL events', range_str(missed_trials))
if n_out_of_sync > 0:
_logger.warning(f"{n_out_of_sync} trials with bpod camera frame times not within"
f" 10% of the expected sampling rate")
t_first_frame = np.array([c[0] for c in cam_times])
t_last_frame = np.array([c[-1] for c in cam_times])
frate = 1 / np.nanmedian(np.array([np.median(np.diff(c)) for c in cam_times]))
intertrial_duration = t_first_frame[1:] - t_last_frame[:-1]
intertrial_missed_frames = np.int32(np.round(intertrial_duration * frate)) - 1
# initialize the full times array
frame_times = np.zeros(n_frames + int(np.sum(intertrial_missed_frames)))
ii = 0
for trial, cam_time in enumerate(cam_times):
if cam_time is not None:
# populate first the recovered times within the trials
frame_times[ii: ii + cam_time.size] = cam_time
ii += cam_time.size
if trial == (len(cam_times) - 1):
break
# then extrapolate in-between
nmiss = intertrial_missed_frames[trial]
frame_times[ii: ii + nmiss] = (cam_time[-1] + intertrial_duration[trial] /
(nmiss + 1) * (np.arange(nmiss) + 1))
ii += nmiss
assert all(np.diff(frame_times) > 0) # negative diffs implies a big problem
return frame_times
[docs]
def align_with_gpio(timestamps, ttl, pin_state, count, extrapolate_missing=True, display=False):
"""
Groom the raw DAQ or Bpod camera timestamps using the frame embedded GPIO and frame counter.
Parameters
----------
timestamps : numpy.array
An array of raw DAQ or Bpod camera timestamps.
ttl : dict
A dictionary of DAQ sync TTLs, with keys {'times', 'polarities'}.
pin_state : dict
A dictionary containing GPIO pin state values, with keys {'indices', 'polarities'}.
count : numpy.array
An array of frame numbers.
extrapolate_missing : bool
If true and the number of timestamps is fewer than the number of frame counts, the
remaining timestamps are extrapolated based on the frame rate, otherwise they are NaNs.
display : bool
Plot the resulting timestamps.
Returns
-------
numpy.array
The corrected frame timestamps.
"""
# Some assertions made on the raw data
# assert count.size == pin_state.size, 'frame count and pin state size mismatch'
assert all(np.diff(count) > 0), 'frame count not strictly increasing'
assert all(np.diff(timestamps) > 0), 'DAQ/Bpod camera times not strictly increasing'
same_n_ttl = pin_state['times'].size == ttl['times'].size
assert same_n_ttl, 'more ttl TTLs detected on camera than TTLs sent'
"""Here we will ensure that the DAQ camera times match the number of video frames in
length. We will make the following assumptions:
1. The number of DAQ camera times is equal to or greater than the number of video frames.
2. No TTLs were missed between the camera and DAQ.
3. No pin states were missed by Bonsai.
4 No pixel count data was missed by Bonsai.
In other words the count and pin state arrays accurately reflect the number of frames
sent by the camera and should therefore be the same length, and the length of the frame
counter should match the number of saved video frames.
The missing frame timestamps are removed in three stages:
1. Remove any timestamps that occurred before video frame acquisition in Bonsai.
2. Remove any timestamps where the frame counter reported missing frames, i.e. remove the
dropped frames which occurred throughout the session.
3. Remove the trailing timestamps at the end of the session if the camera was turned off
in the wrong order.
"""
# Align on first pin state change
first_uptick = pin_state['indices'][0]
first_ttl = np.searchsorted(timestamps, ttl['times'][0])
"""Here we find up to which index in the DAQ times we discard by taking the difference
between the index of the first pin state change (when the sync TTL was reported by the
camera) and the index of the first sync TTL in DAQ time. We subtract the difference
between the frame count at the first pin state change and the index to account for any
video frames that were not saved during this period (we will remove those from the
camera DAQ times later).
"""
# Minus any frames that were dropped between the start of frame acquisition and the first TTL
start = first_ttl - first_uptick - (count[first_uptick] - first_uptick)
# Get approximate frame rate for extrapolating timestamps (if required)
frate = round(1 / np.nanmedian(np.diff(timestamps)))
if start < 0:
n_missing = abs(start)
_logger.warning(f'{n_missing} missing DAQ/Bpod timestamp(s) at start; '
f'{"extrapolating" if extrapolate_missing else "prepending nans"}')
to_app = (timestamps[0] - (np.arange(n_missing, 0, -1) + 1) / frate
if extrapolate_missing
else np.full(n_missing, np.nan))
timestamps = np.r_[to_app, timestamps] # Prepend the missing times
start = 0
# Remove the extraneous timestamps from the beginning and end
end = count[-1] + 1 + start
ts = timestamps[start:end]
if (n_missing := count[-1] - ts.size + 1) > 0:
"""
For ephys sessions there may be fewer DAQ times than frame counts if DAQ acquisition is
turned off before the video acquisition workflow. For Bpod this always occurs because Bpod
finishes before the camera workflow. For Bpod the times are already extrapolated for
these late frames."""
_logger.warning(f'{n_missing} fewer DAQ/Bpod timestamps than frame counts; '
f'{"extrapolating" if extrapolate_missing else "appending nans"}')
to_app = ((np.arange(n_missing, ) + 1) / frate + ts[-1]
if extrapolate_missing
else np.full(n_missing, np.nan))
ts = np.r_[ts, to_app] # Append the missing times
assert ts.size >= count.size, 'fewer timestamps than frame counts'
assert ts.size == count[-1] + 1, 'more frames recorded in frame count than timestamps '
# Remove the rest of the dropped frames
ts = ts[count]
assert np.searchsorted(ts, ttl['times'][0]) == first_uptick, \
'time of first sync TTL doesn\'t match after alignment'
if ts.size != count.size:
_logger.error('number of timestamps and frames don\'t match after alignment')
if display:
# Plot to check
fig, axes = plt.subplots(1, 1)
y = within_ranges(np.arange(ts.size), pin_state['indices'].reshape(-1, 2)).astype(float)
y *= 1e-5 # For scale when zoomed in
axes.plot(ts, y, marker='d', color='blue', drawstyle='steps-pre', label='GPIO')
axes.plot(ts, np.zeros_like(ts), 'kx', label='DAQ timestamps')
vertical_lines(ttl['times'], ymin=0, ymax=1e-5,
color='r', linestyle=':', ax=axes, label='sync TTL')
plt.legend()
return ts
[docs]
def attribute_times(arr, events, tol=.1, injective=True, take='first'):
"""
Returns the values of the first array that correspond to those of the second.
Given two arrays of timestamps, the function will return the values of the first array
that most likely correspond to the values of the second. For each of the values in the
second array, the absolute difference is taken and the index of either the first sufficiently
close value, or simply the closest one, is assigned.
If injective is True, once a value has been assigned to an event it can't be assigned to
another. In other words there is a one-to-one mapping between the two arrays.
Parameters
----------
arr : numpy.array
An array of event times to attribute to those in `events`.
events : numpy.array
An array of event times considered a subset of `arr`.
tol : float
The max absolute difference between values in order to be considered a match.
injective : bool
If true, once a value has been assigned it will not be assigned again.
take : {'first', 'nearest', 'after'}
If 'first' the first value within tolerance is assigned; if 'nearest' the
closest value is assigned; if 'after' assign the first event after.
Returns
-------
numpy.array
An array the same length as `events` containing indices of `arr` corresponding to each
event.
"""
if (take := take.lower()) not in ('first', 'nearest', 'after'):
raise ValueError('Parameter `take` must be either "first", "nearest", or "after"')
stack = np.ma.masked_invalid(arr, copy=False)
stack.fill_value = np.inf
# If there are no invalid values, the mask is False so let's ensure it's a bool array
if stack.mask is np.bool_(0):
stack.mask = np.zeros(arr.shape, dtype=bool)
assigned = np.full(events.shape, -1, dtype=int) # Initialize output array
min_tol = 0 if take == 'after' else -tol
for i, x in enumerate(events):
dx = stack.filled() - x
candidates = np.logical_and(min_tol < dx, dx < tol)
if any(candidates): # is any value within tolerance
idx = np.abs(dx).argmin() if take == 'nearest' else np.where(candidates)[0][0]
assigned[i] = idx
stack.mask[idx] = injective # If one-to-one, remove the assigned value
return assigned
[docs]
def groom_pin_state(gpio, ttl, ts, tolerance=2., display=False, take='first', min_diff=0.):
"""
Align the GPIO pin state to the DAQ sync TTLs. Any sync TTLs not reflected in the pin
state are removed from the dict and the times of the detected fronts are converted to DAQ
time. At the end of this the number of GPIO fronts should equal the number of TTLs.
Note:
- This function is ultra safe: we probably don't need assign all the ups and down fronts.
separately and could potentially even align the timestamps without removing the missed fronts
- The input gpio and TTL dicts may be modified by this function.
- For training sessions the frame rate is only 30Hz and the TTLs tend to be broken up by
small gaps. Setting the min_diff to 5ms helps the timestamp assignment accuracy.
Parameters
----------
gpio : dict
A dictionary containing GPIO pin state values, with keys {'indices', 'polarities'}.
ttl : dict
A dictionary of DAQ sync TTLs, with keys {'times', 'polarities'}.
ts : numpy.array
The camera frame times (the camera frame TTLs acquired by the main DAQ).
tolerance : float
Two pulses need to be within this many seconds to be considered related.
display : bool
If true, the resulting timestamps are plotted against the raw audio signal.
take : {'first', 'nearest'}
If 'first' the first value within tolerance is assigned; if 'nearest' the
closest value is assigned.
min_diff : float
Sync TTL fronts less than min_diff seconds apart will be removed.
Returns
-------
dict
Dictionary of GPIO DAQ front indices, polarities and DAQ aligned times.
dict
Sync TTL times and polarities sans the TTLs not detected in the frame data.
numpy.array
Frame times in DAQ time.
See Also
--------
ibllib.io.extractors.ephys_fpga._get_sync_fronts
"""
# Check that the dimensions match
if np.any(gpio['indices'] >= ts.size):
_logger.warning('GPIO events occurring beyond timestamps array length')
keep = gpio['indices'] < ts.size
gpio = {k: gpio[k][keep] for k, v in gpio.items()}
assert ttl and ttl['times'].size > 0, 'no sync TTLs for session'
assert ttl['times'].size == ttl['polarities'].size, 'sync TTL data dimension mismatch'
# make sure that there are no 2 consecutive fall or consecutive rise events
assert np.all(np.abs(np.diff(ttl['polarities'])) == 2), 'consecutive high/low sync TTL events'
# make sure first TTL is high
assert ttl['polarities'][0] == 1
# make sure ttl times in order
assert np.all(np.diff(ttl['times']) > 0)
# make sure raw timestamps increase
assert np.all(np.diff(ts) > 0), 'timestamps must strictly increase'
# make sure there are state changes
assert gpio['indices'].any(), 'no TTLs detected in GPIO'
# # make sure first GPIO state is high
assert gpio['polarities'][0] == 1
"""
Some sync TTLs appear to be so short that they are not recorded by the camera. These can
be as short as a few microseconds. Applying a cutoff based on framerate was unsuccessful.
Assigning each sync TTL to each pin state change is not easy because some onsets occur very
close together (sometimes < 70ms), on the order of the delay between TTL and frame time.
Also, the two clocks have some degree of drift, so the delay between sync TTL and pin state
change may be zero or even negative.
Here we split the events into sync TTL onsets (lo->hi) and TTL offsets (hi->lo). For each
uptick in the GPIO pin state, we take the first TTL onset time that was within 100ms of it.
We ensure that each sync TTL is assigned only once, so a TTL that is closer to frame 3 than
frame 1 may still be assigned to frame 1.
"""
ifronts = gpio['indices'] # The pin state flips
sync_times = ttl['times']
if ifronts.size != ttl['times'].size:
_logger.warning('more sync TTLs than GPIO state changes, assigning timestamps')
to_remove = np.zeros(ifronts.size, dtype=bool) # unassigned GPIO fronts to remove
low2high = ifronts[gpio['polarities'] == 1]
high2low = ifronts[gpio['polarities'] == -1]
assert low2high.size >= high2low.size
# Remove and/or fuse short TTLs
if min_diff > 0:
short, = np.where(np.diff(ttl['times']) < min_diff)
sync_times = np.delete(ttl['times'], np.r_[short, short + 1])
_logger.debug(f'Removed {short.size * 2} fronts TLLs less than '
f'{min_diff * 1e3:.0f}ms apart')
assert sync_times.size > 0, f'all sync TTLs less than {min_diff}s'
# Onsets
ups = ts[low2high] - ts[low2high][0] # times relative to first GPIO high
onsets = sync_times[::2] - sync_times[0] # TTL times relative to first onset
# assign GPIO fronts to ttl onset
assigned = attribute_times(onsets, ups, tol=tolerance, take=take)
unassigned = np.setdiff1d(np.arange(onsets.size), assigned[assigned > -1])
if unassigned.size > 0:
_logger.debug(f'{unassigned.size} sync TTL rises were not detected by the camera')
# Check that all pin state upticks could be attributed to an onset TTL
if np.any(missed := assigned == -1):
_logger.warning(f'{sum(missed)} pin state rises could not be attributed to a sync TTL')
if display:
ax = plt.subplot()
vertical_lines(ups[assigned > -1],
linestyle='-', color='g', ax=ax,
label='assigned GPIO up state')
vertical_lines(ups[missed],
linestyle='-', color='r', ax=ax,
label='unassigned GPIO up state')
vertical_lines(onsets[unassigned],
linestyle=':', color='k', ax=ax,
alpha=0.3, label='sync TTL onset')
vertical_lines(onsets[assigned],
linestyle=':', color='b', ax=ax, label='assigned TTL onset')
plt.legend()
plt.show()
# Remove the missed fronts
to_remove = np.in1d(gpio['indices'], low2high[missed])
assigned = assigned[~missed]
onsets_ = sync_times[::2][assigned]
# Offsets
downs = ts[high2low] - ts[high2low][0]
offsets = sync_times[1::2] - sync_times[1]
assigned = attribute_times(offsets, downs, tol=tolerance, take=take)
unassigned = np.setdiff1d(np.arange(offsets.size), assigned[assigned > -1])
if unassigned.size > 0:
_logger.debug(f'{unassigned.size} sync TTL falls were not detected by the camera')
# Check that all pin state downticks could be attributed to an offset TTL
if np.any(missed := assigned == -1):
_logger.warning(f'{sum(missed)} pin state falls could not be attributed to a sync TTL')
# Remove the missed fronts
to_remove |= np.in1d(gpio['indices'], high2low[missed])
assigned = assigned[~missed]
offsets_ = sync_times[1::2][assigned]
# Sync TTLs groomed
if np.any(to_remove):
# Check for any orphaned fronts (only one pin state edge was assigned)
to_remove = np.pad(to_remove, (0, to_remove.size % 2), 'edge') # Ensure even size
# Perform xor to find GPIOs where only onset or offset is marked for removal
orphaned = to_remove.reshape(-1, 2).sum(axis=1) == 1
if orphaned.any():
"""If there are orphaned GPIO fronts (i.e. only one edge was assigned to a sync
TTL front), remove the orphaned front its assigned sync TTL. In other words
if both edges cannot be assigned to a sync TTL, we ignore the TTL entirely.
This is a sign that the assignment was bad and extraction may fail."""
_logger.warning('Some onsets but not offsets (or vice versa) were not assigned; '
'this may be a sign of faulty wiring or clock drift')
# Find indices of GPIO upticks where only the downtick was marked for removal
orphaned_onsets, = np.where(~to_remove.reshape(-1, 2)[:, 0] & orphaned)
# The onsets_ array already has the other TTLs removed (same size as to_remove ==
# False) so subtract the number of removed elements from index.
for i, v in enumerate(orphaned_onsets):
orphaned_onsets[i] -= to_remove.reshape(-1, 2)[:v, 0].sum()
# Same for offsets...
orphaned_offsets, = np.where(~to_remove.reshape(-1, 2)[:, 1] & orphaned)
for i, v in enumerate(orphaned_offsets):
orphaned_offsets[i] -= to_remove.reshape(-1, 2)[:v, 1].sum()
# Remove orphaned ttl onsets and offsets
onsets_ = np.delete(onsets_, orphaned_onsets[orphaned_onsets < onsets_.size])
offsets_ = np.delete(offsets_, orphaned_offsets[orphaned_offsets < offsets_.size])
_logger.debug(f'{orphaned.sum()} orphaned TTLs removed')
to_remove.reshape(-1, 2)[orphaned] = True
# Remove those unassigned GPIOs
gpio = {k: v[~to_remove[:v.size]] for k, v in gpio.items()}
ifronts = gpio['indices']
# Assert that we've removed discrete TTLs
# A failure means e.g. an up-going front of one TTL was missed but not the down-going one.
assert np.all(np.abs(np.diff(gpio['polarities'])) == 2)
assert gpio['polarities'][0] == 1
ttl_ = {'times': np.empty(ifronts.size), 'polarities': gpio['polarities']}
ttl_['times'][::2] = onsets_
ttl_['times'][1::2] = offsets_
else:
ttl_ = ttl.copy()
# Align the frame times to DAQ
fcn_a2b, drift_ppm = dsp.sync_timestamps(ts[ifronts], ttl_['times'])
_logger.debug(f'frame ttl alignment drift = {drift_ppm:.2f}ppm')
# Add times to GPIO dict
gpio['times'] = fcn_a2b(ts[ifronts])
if display:
# Plot all the onsets and offsets
ax = plt.subplot()
# All sync TTLs
squares(ttl['times'], ttl['polarities'],
ax=ax, label='sync TTLs', linestyle=':', color='k', yrange=[0, 1], alpha=0.3)
# GPIO
x = np.insert(gpio['times'], 0, 0)
y = np.arange(x.size) % 2
squares(x, y, ax=ax, label='GPIO')
y = within_ranges(np.arange(ts.size), ifronts.reshape(-1, 2)) # 0 or 1 for each frame
ax.plot(fcn_a2b(ts), y, 'kx', label='cam times')
# Assigned ttl
squares(ttl_['times'], ttl_['polarities'],
ax=ax, label='assigned sync TTL', linestyle=':', color='g', yrange=[0, 1])
ax.legend()
plt.xlabel('DAQ time (s)')
ax.set_yticks([0, 1])
ax.set_title('GPIO - sync TTL alignment')
plt.show()
return gpio, ttl_, fcn_a2b(ts)