Source code for ibllib.io.raw_daq_loaders
"""Loader functions for various DAQ data formats."""
from pathlib import Path
import logging
from collections import OrderedDict, defaultdict
import json
import nptdms
import numpy as np
import ibldsp.utils
import one.alf.io as alfio
import one.alf.exceptions as alferr
from one.alf.spec import to_alf
from ibllib.io.extractors.default_channel_maps import all_default_labels
logger = logging.getLogger(__name__)
[docs]
def load_raw_daq_tdms(path) -> 'nptdms.tdms.TdmsFile':
"""
Load a raw DAQ TDMS file.
Parameters
----------
path : str, pathlib.Path
The location of the .tdms file to laod.
Returns
-------
nptdms.tdms.TdmsFile
The loaded TDMS object.
"""
from nptdms import TdmsFile
# If path is a directory, glob for a tdms file
if (path := Path(path)).is_dir(): # cast to pathlib.Path
file_path = next(path.glob('*.tdms'), None)
else:
file_path = path
if not file_path or not file_path.exists():
raise FileNotFoundError
return TdmsFile.read(file_path)
[docs]
def load_channels_tdms(path, chmap=None):
"""
Note: This currently cannot deal with arbitrary groups.
Parameters
----------
path : str, pathlib.Path
The file or folder path of the raw TDMS data file.
chmap: dictionary mapping devices names to channel codes: example {"photometry": 'AI0', 'bpod': 'AI1'}
if None, will read all of available channel from the DAQ
Returns
-------
"""
def _load_digital_channels(data_file, group='Digital', ch='AuxPort'):
# the digital channels are encoded on a single uint8 channel where each bit corresponds to an input channel
ddata = data_file[group][ch].data.astype(np.uint8)
nc = int(2 ** np.floor(np.log2(np.max(ddata))))
ddata = np.unpackbits(ddata[:, np.newaxis], axis=1, count=nc, bitorder='little')
data = {}
for i in range(ddata.shape[1]):
data[f'DI{i}'] = ddata[:, i]
return data
data_file = load_raw_daq_tdms(path)
data = {}
digital_channels = None
fs = np.nan
if chmap:
for name, ch in chmap.items():
if ch.lower()[0] == 'a':
data[name] = data_file['Analog'][ch.upper()].data
fs = data_file['Analog'].properties['ScanRate']
elif ch.lower()[0] == 'd':
# do not attempt to load digital channels several times
digital_channels = digital_channels or _load_digital_channels(data_file)
data[name] = digital_channels[ch.upper()]
fs = data_file['Digital'].properties['ScanRate']
else:
raise NotImplementedError(f'Extraction of channel "{ch}" not implemented')
else:
for group in (x.name for x in data_file.groups()):
for ch in (x.name for x in data_file[group].channels()):
if group == 'Digital' and ch == 'AuxPort':
data = {**data, **_load_digital_channels(data_file, group, ch)}
else:
data[ch] = data_file[group][ch.upper()].data
fs = data_file[group].properties['ScanRate'] # from daqami it's unclear that fs could be set per channel
return data, fs
[docs]
def load_sync_tdms(path, sync_map, fs=None, threshold=2.5, floor_percentile=10):
"""
Load a sync channels from a raw DAQ TDMS file.
Parameters
----------
path : str, pathlib.Path
The file or folder path of the raw TDMS data file.
sync_map : dict
A map of channel names and channel IDs.
fs : float
Sampling rate in Hz.
threshold : float
The threshold for applying to analogue channels
floor_percentile : float
10% removes the percentile value of the analog trace before thresholding. This is to avoid
DC offset drift.
Returns
-------
one.alf.io.AlfBunch
A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses and
the corresponding channel numbers.
dict
A map of channel names and their corresponding indices.
"""
data_file = load_raw_daq_tdms(path)
sync = {}
if any(x.lower()[0] != 'a' for x in sync_map.values()):
raise NotImplementedError('Non-analogue or arbitrary group channel extraction not supported')
raw_channels = [ch for ch in data_file['Analog'].channels() if ch.name.lower() in sync_map.values()]
analogue = np.vstack([ch.data for ch in raw_channels])
channel_ids = OrderedDict([(ch.name.lower(), ch.properties['ChannelKey']) for ch in raw_channels])
offset = np.percentile(analogue, floor_percentile, axis=0)
logger.info(f'estimated analogue channel DC Offset approx. {np.mean(offset):.2f}')
analogue -= offset
ttl = analogue > threshold
ind, sign = ibldsp.utils.fronts(ttl.astype(int))
try: # attempt to get the times from the meta data
times = np.vstack([ch.time_track() for ch in raw_channels])
times = times[tuple(ind)]
except KeyError:
assert fs
times = ind[1].astype(float) * 1/fs # noqa
# Sort by times
ind_sorted = np.argsort(times)
sync['times'] = times[ind_sorted]
# Map index to channel key then reindex by sorted times
sync['channels'] = np.fromiter(channel_ids.values(), dtype=int)[ind[0]][ind_sorted]
sync['polarities'] = sign[ind_sorted]
# Map sync name to channel key
sync_map = {v.lower(): k for k, v in sync_map.items()} # turn inside-out
chmap = {sync_map[k]: v for k, v in channel_ids.items()}
return sync, chmap
[docs]
def correct_counter_discontinuities(raw, overflow=2**32):
"""
Correct over- and underflow wrap around values for DAQ counter channel.
Parameters
----------
raw : numpy.array
An array of counts.
overflow : int
The maximum representable value of the data before it was cast to float64.
Returns
-------
numpy.array
An array of counts with the over- and underflow discontinuities removed.
"""
flowmax = overflow - 1
d = np.diff(raw)
# correct for counter flow discontinuities
d[d >= flowmax] = d[d >= flowmax] - flowmax
d[d <= -flowmax] = d[d <= -flowmax] + flowmax
return np.cumsum(np.r_[0, d]) + raw[0] # back to position
[docs]
def load_timeline_sync_and_chmap(alf_path, chmap=None, timeline=None, save=True):
"""Load the sync and channel map from disk.
If the sync files do not exist, they are extracted from the raw DAQ data and saved.
Parameters
----------
alf_path : str, pathlib.Path
The folder containing the sync file and raw DAQ data.
chmap : dict
An optional channel map, otherwise extracted based on the union of timeline meta data and
default extractor channel map names.
timeline : dict
An optional timeline object, otherwise is loaded from alf_path.
save : bool
If true, save the sync files if they don't already exist.
Returns
-------
one.alf.io.AlfBunch
A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses and
the corresponding channel numbers.
dict, optional
A map of channel names and their corresponding indices for sync channels, if chmap is None.
"""
if not chmap:
if not timeline:
meta = alfio.load_object(alf_path, 'DAQdata', namespace='timeline', attribute='meta')['meta']
else:
meta = timeline['meta']
chmap = timeline_meta2chmap(meta, include_channels=all_default_labels())
try:
sync = alfio.load_object(alf_path, 'sync')
except alferr.ALFObjectNotFound:
if not timeline:
timeline = alfio.load_object(alf_path, 'DAQdata')
sync = extract_sync_timeline(timeline, chmap=chmap)
if save:
alfio.save_object_npy(alf_path, sync, object='sync', namespace='timeline')
return sync, chmap
[docs]
def extract_sync_timeline(timeline, chmap=None, floor_percentile=10, threshold=None):
"""
Load sync channels from a timeline object.
Note: Because the scan frequency is typically faster than the sample rate, the position and
edge count channels may detect more than one front between samples. Therefore for these, the
raw data is more accurate than the extracted polarities.
Parameters
----------
timeline : dict, str, pathlib.Path
A timeline object or the file or folder path of the _timeline_DAQdata files.
chmap : dict
A map of channel names and channel IDs.
floor_percentile : float
10% removes the percentile value of the analog trace before thresholding. This is to avoid
DC offset drift.
threshold : float, dict of str: float
The threshold for applying to analogue channels. If None, take mean after subtracting
floor percentile offset.
Returns
-------
one.alf.io.AlfBunch
A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses and
the corresponding channel numbers.
dict, optional
A map of channel names and their corresponding indices for sync channels, if chmap is None.
"""
if isinstance(timeline, (str, Path)):
timeline = alfio.load_object(timeline, 'DAQdata', namespace='timeline')
assert timeline.keys() >= {'timestamps', 'raw', 'meta'}, 'Timeline object missing attributes'
# If no channel map was passed, load it from 'wiring' file, or extract from meta file
return_chmap = chmap is None
chmap = chmap or timeline.get('wiring') or timeline_meta2chmap(timeline['meta'])
# Initialize sync object
sync = alfio.AlfBunch((k, np.array([], dtype=d)) for k, d in
(('times', 'f'), ('channels', 'u1'), ('polarities', 'i1')))
for label, i in chmap.items():
try:
info = next(x for x in timeline['meta']['inputs'] if x['name'].lower() == label.lower())
except StopIteration:
logger.warning('sync channel "%s" not found', label)
continue
raw = timeline['raw'][:, info['arrayColumn'] - 1] # -1 because MATLAB indexes from 1
if info['measurement'] == 'Voltage':
# Get TLLs by applying a threshold to the diff of voltage samples
offset = np.percentile(raw, floor_percentile, axis=0)
daqID = info['daqChannelID']
logger.debug(f'{label} ({daqID}): estimated analogue channel DC Offset approx. {np.mean(offset):.2f}')
step = threshold.get(label) if isinstance(threshold, dict) else threshold
if step is None:
step = np.max(raw - offset) / 2
iup = ibldsp.utils.rises(raw - offset, step=step, analog=True)
idown = ibldsp.utils.falls(raw - offset, step=step, analog=True)
pol = np.r_[np.ones_like(iup), -np.ones_like(idown)].astype('i1')
ind = np.r_[iup, idown]
sync.polarities = np.concatenate((sync.polarities, pol))
elif info['measurement'] == 'EdgeCount':
# Monotonically increasing values; extract indices where delta == 1
raw = correct_counter_discontinuities(raw)
ind, = np.where(np.diff(raw))
ind += 1
sync.polarities = np.concatenate((sync.polarities, np.ones_like(ind, dtype='i1')))
elif info['measurement'] == 'Position':
# Bidirectional; extract indices where delta != 0
raw = correct_counter_discontinuities(raw)
d = np.diff(raw)
ind, = np.where(~np.isclose(d, 0))
sync.polarities = np.concatenate((sync.polarities, np.sign(d[ind]).astype('i1')))
ind += 1
else:
raise NotImplementedError(f'{info["measurement"]} sync extraction')
# Append timestamps of indices and channel index to sync arrays
sync.times = np.concatenate((sync.times, timeline['timestamps'][ind]))
sync.channels = np.concatenate((sync.channels, np.full(ind.shape, i, dtype='u1')))
# Sort arrays by time
assert sync.check_dimensions == 0
t_ind = np.argsort(sync.times)
for k in sync:
sync[k] = sync[k][t_ind]
if return_chmap:
return sync, chmap
else:
return sync
[docs]
def timeline_meta2wiring(path, save=False):
"""
Given a timeline meta data object, return a dictionary of wiring info.
Parameters
----------
path : str, pathlib.Path
The path of the timeline meta file, _timeline_DAQdata.meta.
save : bool
If true, save the timeline wiring file in the same location as the meta file,
_timeline_DAQData.wiring.json.
Returns
-------
dict
A dictionary with base keys {'SYSTEM', 'SYNC_WIRING_DIGITAL', 'SYNC_WIRING_ANALOG'}, the
latter of which contain maps of channel names and their IDs.
pathlib.Path
If save=True, returns the path of the wiring file.
"""
meta = alfio.load_object(path, 'DAQdata', namespace='timeline', attribute='meta').get('meta')
assert meta, 'No meta data in timeline object'
wiring = defaultdict(dict, SYSTEM='timeline')
for input in meta['inputs']:
key = 'SYNC_WIRING_' + ('ANALOG' if input['measurement'] == 'Voltage' else 'DIGITAL')
wiring[key][input['daqChannelID']] = input['name']
if save:
out_path = Path(path) / to_alf('DAQ data', 'wiring', 'json', namespace='timeline')
with open(out_path, 'w') as fp:
json.dump(wiring, fp)
return dict(wiring), out_path
return dict(wiring)
[docs]
def timeline_meta2chmap(meta, exclude_channels=None, include_channels=None):
"""
Convert a timeline meta object to a sync channel map.
Parameters
----------
meta : dict
A loaded timeline metadata file, i.e. _timeline_DAQdata.meta.
exclude_channels : list
An optional list of channels to exclude from the channel map.
include_channels : list
An optional list of channels to include from the channel map, takes priority over the
exclude list.
Returns
-------
dict
A map of channel names and their corresponding indices for sync channels.
"""
chmap = {}
for input in meta.get('inputs', []):
if (include_channels is not None and input['name'] not in include_channels) or \
(exclude_channels and input['name'] in exclude_channels):
continue
chmap[input['name']] = input['arrayColumn']
return chmap
[docs]
def timeline_get_channel(timeline, channel_name):
"""
Given a timeline object, returns the vector of values recorded from a given channel name.
Parameters
----------
timeline : one.alf.io.AlfBunch
A loaded timeline object.
channel_name : str
The name of a channel to extract.
Returns
-------
numpy.array
The channel data.
"""
idx = next(ch['arrayColumn'] for ch in timeline['meta']['inputs'] if ch['name'] == channel_name)
return timeline['raw'][:, idx - 1] # -1 because MATLAB indices start from 1, not 0