Source code for ibllib.io.raw_daq_loaders

"""Loader functions for various DAQ data formats."""
from pathlib import Path
import logging
from collections import OrderedDict, defaultdict
import json

import nptdms
import numpy as np
import neurodsp.utils
import one.alf.io as alfio
import one.alf.exceptions as alferr
from one.alf.spec import to_alf

from ibllib.io.extractors.default_channel_maps import all_default_labels

logger = logging.getLogger(__name__)


def load_raw_daq_tdms(path) -> 'nptdms.tdms.TdmsFile':
    """
    Load a raw DAQ TDMS file.

    Parameters
    ----------
    path : str, pathlib.Path
        The location of the .tdms file to load.

    Returns
    -------
    nptdms.tdms.TdmsFile
        The loaded TDMS object.
    """
    from nptdms import TdmsFile
    # If path is a directory, glob for a tdms file
    if (path := Path(path)).is_dir():  # cast to pathlib.Path
        file_path = next(path.glob('*.tdms'), None)
    else:
        file_path = path
    if not file_path or not file_path.exists():
        raise FileNotFoundError
    return TdmsFile.read(file_path)
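
# Illustrative usage sketch (not part of the original module); the path below is hypothetical.
# The function accepts either a .tdms file or a folder containing one:
#
# >>> tdms_file = load_raw_daq_tdms('/data/subject/2023-01-01/001/raw_sync_data')
# >>> [grp.name for grp in tdms_file.groups()]  # e.g. ['Analog', 'Digital']
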
def load_channels_tdms(path, chmap=None):
    """
    Load channel data from a raw DAQ TDMS file.

    Note: This currently cannot deal with arbitrary groups.

    Parameters
    ----------
    path : str, pathlib.Path
        The file or folder path of the raw TDMS data file.
    chmap : dict
        A map of device names to channel codes, e.g. {'photometry': 'AI0', 'bpod': 'AI1'}.
        If None, all available channels are read from the DAQ.

    Returns
    -------
    dict
        A map of channel names to raw data arrays.
    float
        The DAQ sampling rate in Hz.
    """
    def _load_digital_channels(data_file, group='Digital', ch='AuxPort'):
        # the digital channels are encoded on a single uint8 channel where each bit corresponds
        # to an input channel
        ddata = data_file[group][ch].data.astype(np.uint8)
        nc = int(2 ** np.floor(np.log2(np.max(ddata))))
        ddata = np.unpackbits(ddata[:, np.newaxis], axis=1, count=nc, bitorder='little')
        data = {}
        for i in range(ddata.shape[1]):
            data[f'DI{i}'] = ddata[:, i]
        return data

    data_file = load_raw_daq_tdms(path)
    data = {}
    digital_channels = None
    fs = np.nan
    if chmap:
        for name, ch in chmap.items():
            if ch.lower()[0] == 'a':
                data[name] = data_file['Analog'][ch.upper()].data
                fs = data_file['Analog'].properties['ScanRate']
            elif ch.lower()[0] == 'd':
                # do not attempt to load digital channels several times
                digital_channels = digital_channels or _load_digital_channels(data_file)
                data[name] = digital_channels[ch.upper()]
                fs = data_file['Digital'].properties['ScanRate']
            else:
                raise NotImplementedError(f'Extraction of channel "{ch}" not implemented')
    else:
        for group in (x.name for x in data_file.groups()):
            for ch in (x.name for x in data_file[group].channels()):
                if group == 'Digital' and ch == 'AuxPort':
                    data = {**data, **_load_digital_channels(data_file, group, ch)}
                else:
                    data[ch] = data_file[group][ch.upper()].data
            fs = data_file[group].properties['ScanRate']  # from daqami it's unclear whether fs could be set per channel
    return data, fs
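
# Illustrative usage sketch (not part of the original module); the path and channel map are
# hypothetical. Analogue channels are addressed as 'AI<n>' and digital ones as 'DI<n>':
#
# >>> chmap = {'photometry': 'AI0', 'bpod': 'AI1', 'frame2ttl': 'DI0'}
# >>> data, fs = load_channels_tdms('/data/raw_sync_data', chmap=chmap)
# >>> data['bpod'].shape, fs  # one raw trace per device, plus the DAQ scan rate in Hz
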
def load_sync_tdms(path, sync_map, fs=None, threshold=2.5, floor_percentile=10):
    """
    Load sync channels from a raw DAQ TDMS file.

    Parameters
    ----------
    path : str, pathlib.Path
        The file or folder path of the raw TDMS data file.
    sync_map : dict
        A map of channel names and channel IDs.
    fs : float
        Sampling rate in Hz.
    threshold : float
        The threshold to apply to the analogue channels when detecting fronts.
    floor_percentile : float
        The percentile of the analogue trace to subtract before thresholding (default 10%),
        to remove DC offset drift.

    Returns
    -------
    one.alf.io.AlfBunch
        A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses
        and the corresponding channel numbers.
    dict
        A map of channel names and their corresponding indices.
    """
    data_file = load_raw_daq_tdms(path)
    sync = {}
    if any(x.lower()[0] != 'a' for x in sync_map.values()):
        raise NotImplementedError('Non-analogue or arbitrary group channel extraction not supported')
    raw_channels = [ch for ch in data_file['Analog'].channels() if ch.name.lower() in sync_map.values()]
    analogue = np.vstack([ch.data for ch in raw_channels])
    channel_ids = OrderedDict([(ch.name.lower(), ch.properties['ChannelKey']) for ch in raw_channels])
    offset = np.percentile(analogue, floor_percentile, axis=0)
    logger.info(f'estimated analogue channel DC Offset approx. {np.mean(offset):.2f}')
    analogue -= offset
    ttl = analogue > threshold
    ind, sign = neurodsp.utils.fronts(ttl.astype(int))
    try:  # attempt to get the times from the meta data
        times = np.vstack([ch.time_track() for ch in raw_channels])
        times = times[tuple(ind)]
    except KeyError:
        assert fs
        times = ind[1].astype(float) * 1/fs  # noqa

    # Sort by times
    ind_sorted = np.argsort(times)
    sync['times'] = times[ind_sorted]
    # Map index to channel key then reindex by sorted times
    sync['channels'] = np.fromiter(channel_ids.values(), dtype=int)[ind[0]][ind_sorted]
    sync['polarities'] = sign[ind_sorted]

    # Map sync name to channel key
    sync_map = {v.lower(): k for k, v in sync_map.items()}  # turn inside-out
    chmap = {sync_map[k]: v for k, v in channel_ids.items()}
    return sync, chmap
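
# Illustrative usage sketch (not part of the original module); the path and sync map are
# hypothetical. Only analogue channels are supported; the returned dict holds the detected fronts:
#
# >>> sync_map = {'bpod': 'ai0', 'frame2ttl': 'ai1'}
# >>> sync, chmap = load_sync_tdms('/data/raw_sync_data', sync_map, threshold=2.5)
# >>> sync['times'], sync['polarities'], sync['channels']  # sorted by time
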
def correct_counter_discontinuities(raw, overflow=2**32):
    """
    Correct over- and underflow wraparound values for a DAQ counter channel.

    Parameters
    ----------
    raw : numpy.array
        An array of counts.
    overflow : int
        The maximum representable value of the data before it was cast to float64.

    Returns
    -------
    numpy.array
        An array of counts with the over- and underflow discontinuities removed.
    """
    flowmax = overflow - 1
    d = np.diff(raw)
    # correct for counter flow discontinuities
    d[d >= flowmax] = d[d >= flowmax] - flowmax
    d[d <= -flowmax] = d[d <= -flowmax] + flowmax
    return np.cumsum(np.r_[0, d]) + raw[0]  # back to position
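
# Illustrative sketch (not part of the original module): a 32-bit counter that wraps from its
# maximum value back to zero is unwrapped into a monotonic trace by correcting the large jump
# in the sample-to-sample differences.
#
# >>> raw = np.array([4294967294., 4294967295., 0., 1.])  # counter wraps after 2**32 - 1
# >>> correct_counter_discontinuities(raw)  # the discontinuity at the wrap is removed
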
def load_timeline_sync_and_chmap(alf_path, chmap=None, timeline=None, save=True):
    """Load the sync and channel map from disk.

    If the sync files do not exist, they are extracted from the raw DAQ data and saved.

    Parameters
    ----------
    alf_path : str, pathlib.Path
        The folder containing the sync file and raw DAQ data.
    chmap : dict
        An optional channel map, otherwise extracted based on the union of timeline meta data
        and default extractor channel map names.
    timeline : dict
        An optional timeline object, otherwise loaded from alf_path.
    save : bool
        If true, save the sync files if they don't already exist.

    Returns
    -------
    one.alf.io.AlfBunch
        A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses
        and the corresponding channel numbers.
    dict, optional
        A map of channel names and their corresponding indices for sync channels, if chmap is None.
    """
    if not chmap:
        if not timeline:
            meta = alfio.load_object(alf_path, 'DAQdata', namespace='timeline', attribute='meta')['meta']
        else:
            meta = timeline['meta']
        chmap = timeline_meta2chmap(meta, include_channels=all_default_labels())
    try:
        sync = alfio.load_object(alf_path, 'sync')
    except alferr.ALFObjectNotFound:
        if not timeline:
            timeline = alfio.load_object(alf_path, 'DAQdata')
        sync = extract_sync_timeline(timeline, chmap=chmap)
        if save:
            alfio.save_object_npy(alf_path, sync, object='sync', namespace='timeline')
    return sync, chmap
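
# Illustrative usage sketch (not part of the original module); the session path is hypothetical.
# On the first call the sync is extracted from the raw DAQ data and saved alongside it:
#
# >>> alf_path = '/data/subject/2023-01-01/001/raw_sync_data'
# >>> sync, chmap = load_timeline_sync_and_chmap(alf_path, save=True)
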
def extract_sync_timeline(timeline, chmap=None, floor_percentile=10, threshold=None):
    """
    Load sync channels from a timeline object.

    Note: Because the scan frequency is typically faster than the sample rate, the position and
    edge count channels may detect more than one front between samples. Therefore for these, the
    raw data is more accurate than the extracted polarities.

    Parameters
    ----------
    timeline : dict, str, pathlib.Path
        A timeline object or the file or folder path of the _timeline_DAQdata files.
    chmap : dict
        A map of channel names and channel IDs.
    floor_percentile : float
        The percentile of the analogue trace to subtract before thresholding (default 10%),
        to remove DC offset drift.
    threshold : float, dict of str: float
        The threshold to apply to analogue channels. If None, half the maximum of the
        offset-subtracted trace is used.

    Returns
    -------
    one.alf.io.AlfBunch
        A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses
        and the corresponding channel numbers.
    dict, optional
        A map of channel names and their corresponding indices for sync channels, if chmap is None.
    """
    if isinstance(timeline, (str, Path)):
        timeline = alfio.load_object(timeline, 'DAQdata', namespace='timeline')
    assert timeline.keys() >= {'timestamps', 'raw', 'meta'}, 'Timeline object missing attributes'

    # If no channel map was passed, load it from 'wiring' file, or extract from meta file
    return_chmap = chmap is None
    chmap = chmap or timeline.get('wiring') or timeline_meta2chmap(timeline['meta'])

    # Initialize sync object
    sync = alfio.AlfBunch((k, np.array([], dtype=d)) for k, d in
                          (('times', 'f'), ('channels', 'u1'), ('polarities', 'i1')))
    for label, i in chmap.items():
        try:
            info = next(x for x in timeline['meta']['inputs'] if x['name'].lower() == label.lower())
        except StopIteration:
            logger.warning('sync channel "%s" not found', label)
            continue
        raw = timeline['raw'][:, info['arrayColumn'] - 1]  # -1 because MATLAB indexes from 1
        if info['measurement'] == 'Voltage':
            # Get TTLs by applying a threshold to the diff of voltage samples
            offset = np.percentile(raw, floor_percentile, axis=0)
            daqID = info['daqChannelID']
            logger.debug(f'{label} ({daqID}): estimated analogue channel DC Offset approx. {np.mean(offset):.2f}')
            step = threshold.get(label) if isinstance(threshold, dict) else threshold
            if step is None:
                step = np.max(raw - offset) / 2
            iup = neurodsp.utils.rises(raw - offset, step=step, analog=True)
            idown = neurodsp.utils.falls(raw - offset, step=step, analog=True)
            pol = np.r_[np.ones_like(iup), -np.ones_like(idown)].astype('i1')
            ind = np.r_[iup, idown]
            sync.polarities = np.concatenate((sync.polarities, pol))
        elif info['measurement'] == 'EdgeCount':
            # Monotonically increasing values; extract indices where delta == 1
            raw = correct_counter_discontinuities(raw)
            ind, = np.where(np.diff(raw))
            ind += 1
            sync.polarities = np.concatenate((sync.polarities, np.ones_like(ind, dtype='i1')))
        elif info['measurement'] == 'Position':
            # Bidirectional; extract indices where delta != 0
            raw = correct_counter_discontinuities(raw)
            d = np.diff(raw)
            ind, = np.where(~np.isclose(d, 0))
            sync.polarities = np.concatenate((sync.polarities, np.sign(d[ind]).astype('i1')))
            ind += 1
        else:
            raise NotImplementedError(f'{info["measurement"]} sync extraction')
        # Append timestamps of indices and channel index to sync arrays
        sync.times = np.concatenate((sync.times, timeline['timestamps'][ind]))
        sync.channels = np.concatenate((sync.channels, np.full(ind.shape, i, dtype='u1')))

    # Sort arrays by time
    assert sync.check_dimensions == 0
    t_ind = np.argsort(sync.times)
    for k in sync:
        sync[k] = sync[k][t_ind]
    if return_chmap:
        return sync, chmap
    else:
        return sync
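
# Illustrative usage sketch (not part of the original module); the path and channel map are
# hypothetical. Per-channel thresholds may be passed as a dict keyed by channel name:
#
# >>> timeline = alfio.load_object('/data/raw_sync_data', 'DAQdata', namespace='timeline')
# >>> sync = extract_sync_timeline(timeline, chmap={'bpod': 1}, threshold={'bpod': 2.5})
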
def timeline_meta2wiring(path, save=False):
    """
    Given a timeline meta data object, return a dictionary of wiring info.

    Parameters
    ----------
    path : str, pathlib.Path
        The path of the timeline meta file, _timeline_DAQdata.meta.
    save : bool
        If true, save the timeline wiring file in the same location as the meta file,
        _timeline_DAQdata.wiring.json.

    Returns
    -------
    dict
        A dictionary with base keys {'SYSTEM', 'SYNC_WIRING_DIGITAL', 'SYNC_WIRING_ANALOG'}, the
        latter of which contain maps of channel names and their IDs.
    pathlib.Path
        If save=True, returns the path of the wiring file.
    """
    meta = alfio.load_object(path, 'DAQdata', namespace='timeline', attribute='meta').get('meta')
    assert meta, 'No meta data in timeline object'
    wiring = defaultdict(dict, SYSTEM='timeline')
    for input in meta['inputs']:
        key = 'SYNC_WIRING_' + ('ANALOG' if input['measurement'] == 'Voltage' else 'DIGITAL')
        wiring[key][input['daqChannelID']] = input['name']
    if save:
        out_path = Path(path) / to_alf('DAQ data', 'wiring', 'json', namespace='timeline')
        with open(out_path, 'w') as fp:
            json.dump(wiring, fp)
        return dict(wiring), out_path
    return dict(wiring)
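
# Illustrative usage sketch (not part of the original module); the path is hypothetical.
#
# >>> wiring, wiring_path = timeline_meta2wiring('/data/raw_sync_data', save=True)
# >>> wiring['SYNC_WIRING_ANALOG']  # e.g. {'ai0': 'bpod', 'ai1': 'frame2ttl'}
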
def timeline_meta2chmap(meta, exclude_channels=None, include_channels=None):
    """
    Convert a timeline meta object to a sync channel map.

    Parameters
    ----------
    meta : dict
        A loaded timeline metadata file, i.e. _timeline_DAQdata.meta.
    exclude_channels : list
        An optional list of channels to exclude from the channel map.
    include_channels : list
        An optional list of channels to include in the channel map; takes priority over the
        exclude list.

    Returns
    -------
    dict
        A map of channel names and their corresponding indices for sync channels.
    """
    chmap = {}
    for input in meta.get('inputs', []):
        if (include_channels is not None and input['name'] not in include_channels) or \
                (exclude_channels and input['name'] in exclude_channels):
            continue
        chmap[input['name']] = input['arrayColumn']
    return chmap
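
# Illustrative usage sketch (not part of the original module), assuming a loaded meta dict;
# the channel names are hypothetical:
#
# >>> meta = alfio.load_object('/data/raw_sync_data', 'DAQdata', namespace='timeline')['meta']
# >>> chmap = timeline_meta2chmap(meta, include_channels=['bpod', 'frame2ttl'])
# >>> chmap  # e.g. {'bpod': 1, 'frame2ttl': 2} (values are timeline array columns)
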
def timeline_get_channel(timeline, channel_name):
    """
    Given a timeline object, return the vector of values recorded from a given channel name.

    Parameters
    ----------
    timeline : one.alf.io.AlfBunch
        A loaded timeline object.
    channel_name : str
        The name of a channel to extract.

    Returns
    -------
    numpy.array
        The channel data.
    """
    idx = next(ch['arrayColumn'] for ch in timeline['meta']['inputs'] if ch['name'] == channel_name)
    return timeline['raw'][:, idx - 1]  # -1 because MATLAB indices start from 1, not 0
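
# Illustrative usage sketch (not part of the original module); the path and channel name are
# hypothetical:
#
# >>> timeline = alfio.load_object('/data/raw_sync_data', 'DAQdata', namespace='timeline')
# >>> wheel_pos = timeline_get_channel(timeline, 'rotary_encoder')
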