Source code for ibllib.io.raw_data_loaders

"""
Raw Data Loader functions for PyBpod rig.

Module contains one loader function per raw datafile.
"""
import re
import json
import logging
import wave
from collections import OrderedDict
from datetime import datetime
from pathlib import Path, PureWindowsPath
from typing import Union

from dateutil import parser as dateparser
from packaging import version
import numpy as np
import pandas as pd

from iblutil.io import jsonable
from ibllib.io.video import assert_valid_label
from ibllib.time import uncycle_pgts, convert_pgts, date2isostr

_logger = logging.getLogger(__name__)


def trial_times_to_times(raw_trial):
    """
    Parse and convert all trial timestamps to "absolute" time.
    Float64 seconds from session start.

    0---BpodStart---TrialStart0---------TrialEnd0-----TrialStart1---TrialEnd1...
    0---ts0---ts1---tsN...

    absTS = tsN + TrialStartN - BpodStart

    Bpod timestamps are in microseconds (µs)
    PyBpod timestamps are in seconds (s)

    :param raw_trial: raw trial data
    :type raw_trial: dict
    :return: trial data with modified timestamps
    :rtype: dict
    """
    ts_bs = raw_trial['behavior_data']['Bpod start timestamp']
    ts_ts = raw_trial['behavior_data']['Trial start timestamp']
    # ts_te = raw_trial['behavior_data']['Trial end timestamp']

    def convert(ts):
        return ts + ts_ts - ts_bs

    converted_events = {}
    for k, v in raw_trial['behavior_data']['Events timestamps'].items():
        converted_events.update({k: [convert(i) for i in v]})
    raw_trial['behavior_data']['Events timestamps'] = converted_events

    converted_states = {}
    for k, v in raw_trial['behavior_data']['States timestamps'].items():
        converted_states.update({k: [[convert(i) for i in x] for x in v]})
    raw_trial['behavior_data']['States timestamps'] = converted_states

    shift = raw_trial['behavior_data']['Bpod start timestamp']
    raw_trial['behavior_data']['Bpod start timestamp'] -= shift
    raw_trial['behavior_data']['Trial start timestamp'] -= shift
    raw_trial['behavior_data']['Trial end timestamp'] -= shift
    assert raw_trial['behavior_data']['Bpod start timestamp'] == 0
    return raw_trial

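# Illustrative sketch (not part of the original module): the conversion above is
# absTS = ts + TrialStart - BpodStart, applied to every event in a trial. In the
# hypothetical trial below, Bpod started at t=2.0 s and the trial at t=10.0 s, so
# an event stamped 0.5 s into the trial becomes 8.5 s in session time.
def _example_trial_times_to_times():
    trial = {'behavior_data': {
        'Bpod start timestamp': 2.0,
        'Trial start timestamp': 10.0,
        'Trial end timestamp': 12.0,
        'Events timestamps': {'BNC1High': [0.5]},
        'States timestamps': {'stim_on': [[0.5, 1.0]]},
    }}
    out = trial_times_to_times(trial)
    assert out['behavior_data']['Events timestamps']['BNC1High'] == [8.5]
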
def load_bpod(session_path, task_collection='raw_behavior_data'):
    """
    Load both settings and data from bpod (.json and .jsonable)

    :param session_path: Absolute path of session folder
    :param task_collection: Collection within session path with behavior data
    :return: dict settings and list of dicts data
    """
    return load_settings(session_path, task_collection), load_data(session_path, task_collection)

def load_data(session_path: Union[str, Path], task_collection='raw_behavior_data', time='absolute'):
    """
    Load PyBpod data files (.jsonable).

    Bpod timestamps are in microseconds (µs)
    PyBpod timestamps are in seconds (s)

    :param session_path: Absolute path of session folder
    :type session_path: str, Path
    :return: A list of len ntrials each trial being a dictionary
    :rtype: list of dicts
    """
    if session_path is None:
        _logger.warning('No data loaded: session_path is None')
        return
    path = Path(session_path).joinpath(task_collection)
    path = next(path.glob('_iblrig_taskData.raw*.jsonable'), None)
    if not path:
        _logger.warning('No data loaded: could not find raw data file')
        return None
    data = jsonable.read(path)
    if time == 'absolute':
        data = [trial_times_to_times(t) for t in data]
    return data

def load_camera_frameData(session_path, camera: str = 'left', raw: bool = False) -> pd.DataFrame:
    """Loads binary frame data from Bonsai camera recording workflow.

    Args:
        session_path (StrPath): Path to session folder
        camera (str, optional): Load frameData for specific camera. Defaults to 'left'.
        raw (bool, optional): Whether to return raw or parsed data. Defaults to False.

    Returns:
        parsed: (raw=False, default) pandas.DataFrame: 4 columns: {
            Timestamp,             # float64 (seconds from session start)
            embeddedTimeStamp,     # float64 (seconds from session start)
            embeddedFrameCounter,  # int64 (frame number from session start)
            embeddedGPIOPinState   # object (state of each of the 4 GPIO pins as a
                                   # list of numpy boolean arrays,
                                   # e.g. np.array([True, False, False, False]))
        }
        raw: pandas.DataFrame: 4 int64 columns: {
            Timestamp,             # UTC ticks from BehaviorPC
                                   # (100s of ns since midnight 1/1/0001)
            embeddedTimeStamp,     # camera timestamp (needs uncycling and conversion)
            embeddedFrameCounter,  # frame counter (int)
            embeddedGPIOPinState   # GPIO pin state, integer representation of 4 pins
        }
    """
    camera = assert_valid_label(camera)
    fpath = Path(session_path).joinpath("raw_video_data")
    fpath = next(fpath.glob(f"_iblrig_{camera}Camera.frameData*.bin"), None)
    assert fpath, f"{fpath}\nFile not Found: Could not find bin file for cam <{camera}>"
    rdata = np.fromfile(fpath, dtype=np.float64)
    assert rdata.size % 4 == 0, "Dimension mismatch: bin file length is not mod 4"
    rows = int(rdata.size / 4)
    data = np.reshape(rdata.astype(np.int64), (rows, 4))
    df_dict = dict.fromkeys(
        ["Timestamp", "embeddedTimeStamp", "embeddedFrameCounter", "embeddedGPIOPinState"]
    )
    df = pd.DataFrame(data, columns=df_dict.keys())
    if raw:
        return df
    df_dict["Timestamp"] = (data[:, 0] - data[0, 0]) / 10_000_000  # in seconds from first frame
    camerats = uncycle_pgts(convert_pgts(data[:, 1]))
    df_dict["embeddedTimeStamp"] = camerats - camerats[0]  # in seconds from first frame
    df_dict["embeddedFrameCounter"] = data[:, 2] - data[0, 2]  # from start
    gpio = (np.right_shift(np.tile(data[:, 3], (4, 1)).T, np.arange(31, 27, -1)) & 0x1) == 1
    df_dict["embeddedGPIOPinState"] = [np.array(x) for x in gpio.tolist()]
    parsed_df = pd.DataFrame.from_dict(df_dict)
    return parsed_df

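# Illustrative sketch (not part of the original module): the raw Timestamp column
# holds .NET-style UTC ticks (100 ns units), hence the division by 1e7 above. A
# hypothetical frame-rate estimate from the parsed camera timestamps, assuming a
# session with a frameData .bin file:
def _example_camera_frame_rate(session_path):
    df = load_camera_frameData(session_path, camera='left')
    dt = np.diff(df['embeddedTimeStamp'].values)  # inter-frame intervals in seconds
    return 1 / np.median(dt)  # frames per second
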
def load_camera_ssv_times(session_path, camera: str):
    """
    Load the bonsai frame and camera timestamps from Camera.timestamps.ssv

    NB: For some sessions the frame times are in the first column, in others the order is reversed.
    NB: If using the new bin file the bonsai_times is a float in seconds since first frame

    :param session_path: Absolute path of session folder
    :param camera: Name of the camera to load, e.g. 'left'
    :return: array of datetimes, array of frame times in seconds
    """
    camera = assert_valid_label(camera)
    video_path = Path(session_path).joinpath('raw_video_data')
    if next(video_path.glob(f'_iblrig_{camera}Camera.frameData*.bin'), None):
        df = load_camera_frameData(session_path, camera=camera)
        return df['Timestamp'].values, df['embeddedTimeStamp'].values

    file = next(video_path.glob(f'_iblrig_{camera.lower()}Camera.timestamps*.ssv'), None)
    if not file:
        file = str(video_path.joinpath(f'_iblrig_{camera.lower()}Camera.timestamps.ssv'))
        raise FileNotFoundError(file + ' not found')
    # NB: Numpy has deprecated support for non-naive timestamps.
    # Converting them is extremely slow: 6000 timestamps takes 0.8615s vs 0.0352s.
    # from datetime import timezone
    # c = {0: lambda x: datetime.fromisoformat(x).astimezone(timezone.utc).replace(tzinfo=None)}

    # Determine the order of the columns by reading one line and testing whether the second value
    # is an integer or not.
    with open(file, 'r') as f:
        line = f.readline()
    type_map = OrderedDict(bonsai='<M8[ns]', camera='<u4')
    try:
        int(line.split(' ')[1])
    except ValueError:
        type_map.move_to_end('bonsai')
    ssv_params = dict(names=type_map.keys(), dtype=','.join(type_map.values()), delimiter=' ')
    ssv_times = np.genfromtxt(file, **ssv_params)  # np.loadtxt is slower for some reason
    bonsai_times = ssv_times['bonsai']
    camera_times = uncycle_pgts(convert_pgts(ssv_times['camera']))
    return bonsai_times, camera_times

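# Illustrative sketch (not part of the original module): the column-order probe
# above tries int() on the second token of the first line. The two example lines
# below are hypothetical, showing the two layouts the NB above describes:
def _example_ssv_column_order():
    def probe(line):
        try:
            int(line.split(' ')[1])
            return ('bonsai', 'camera')   # second token numeric: datetime comes first
        except ValueError:
            return ('camera', 'bonsai')   # second token is the datetime
    assert probe('2020-01-01T12:00:00.0000000+00:00 123456789') == ('bonsai', 'camera')
    assert probe('123456789 2020-01-01T12:00:00.0000000+00:00') == ('camera', 'bonsai')
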
def load_embedded_frame_data(session_path, label: str, raw=False):
    """
    Load the embedded frame count and GPIO for a given session.  If the file doesn't exist,
    or is empty, None values are returned.

    :param session_path: Absolute path of session folder
    :param label: The specific video to load, one of ('left', 'right', 'body')
    :param raw: If True the raw data are returned without preprocessing, otherwise frame count
    is returned starting from 0 and the GPIO is returned as a dict of indices
    :return: The frame count, GPIO
    """
    count = load_camera_frame_count(session_path, label, raw=raw)
    gpio = load_camera_gpio(session_path, label, as_dicts=not raw)
    return count, gpio

def load_camera_frame_count(session_path, label: str, raw=True):
    """
    Load the embedded frame count for a given session.  If the file doesn't exist, or is empty,
    a None value is returned.

    :param session_path: Absolute path of session folder
    :param label: The specific video to load, one of ('left', 'right', 'body')
    :param raw: If True the raw data are returned without preprocessing, otherwise frame count
    is returned starting from 0
    :return: The frame count
    """
    if session_path is None:
        return
    label = assert_valid_label(label)
    video_path = Path(session_path).joinpath('raw_video_data')
    if next(video_path.glob(f'_iblrig_{label}Camera.frameData*.bin'), None):
        df = load_camera_frameData(session_path, camera=label)
        return df['embeddedFrameCounter'].values

    # Load frame count
    glob = video_path.glob(f'_iblrig_{label}Camera.frame_counter*.bin')
    count_file = next(glob, None)
    count = np.fromfile(count_file, dtype=np.float64).astype(int) if count_file else []
    if len(count) == 0:
        return
    if not raw:
        count -= count[0]  # start from zero
    return count

def load_camera_gpio(session_path, label: str, as_dicts=False):
    """
    Load the GPIO for a given session.  If the file doesn't exist, or is empty, a None value
    is returned.

    The raw binary file contains uint32 values (saved as doubles) where the first 4 bits
    represent the state of each of the 4 GPIO pins.  The array is expanded to an n x 4 array
    by shifting each bit to the end and checking whether it is 0 (low state) or 1 (high state).

    :param session_path: Absolute path of session folder
    :param label: The specific video to load, one of ('left', 'right', 'body')
    :param as_dicts: If False the raw data are returned as a boolean array with shape
    (n_frames, n_pins), otherwise the GPIO is returned as a list of dictionaries with keys
    ('indices', 'polarities').
    :return: An nx4 boolean array where columns represent the state of GPIO pins 1-4.
    If as_dicts is True, a list of dicts is returned with keys ('indices', 'polarities'),
    or None if the dictionary is empty.
    """
    if session_path is None:
        return
    raw_path = Path(session_path).joinpath('raw_video_data')
    label = assert_valid_label(label)

    # Load pin state
    if next(raw_path.glob(f'_iblrig_{label}Camera.frameData*.bin'), False):
        df = load_camera_frameData(session_path, camera=label, raw=False)
        gpio = np.array([x for x in df['embeddedGPIOPinState'].values])
        if len(gpio) == 0:
            return [None] * 4 if as_dicts else None
    else:
        GPIO_file = next(raw_path.glob(f'_iblrig_{label}Camera.GPIO*.bin'), None)
        # This deals with missing and empty files the same
        gpio = np.fromfile(GPIO_file, dtype=np.float64).astype(np.uint32) if GPIO_file else []
        # Check values make sense (4 pins = 16 possible values)
        if not np.isin(gpio, np.left_shift(np.arange(2 ** 4, dtype=np.uint32), 32 - 4)).all():
            _logger.warning('Unexpected GPIO values; decoding may fail')
        if len(gpio) == 0:
            return [None] * 4 if as_dicts else None
        # 4 pins represented as uint32
        # For each pin, shift its bit to the end and check the bit is set
        gpio = (np.right_shift(np.tile(gpio, (4, 1)).T, np.arange(31, 27, -1)) & 0x1) == 1

    if as_dicts:
        if not gpio.any():
            _logger.error('No GPIO changes')
            return [None] * 4
        # Find state changes for each pin and construct a dict of indices and polarities for each
        edges = np.vstack((gpio[0, :], np.diff(gpio.astype(int), axis=0)))
        # gpio = [(ind := np.where(edges[:, i])[0], edges[ind, i]) for i in range(4)]
        # gpio = [dict(zip(('indices', 'polarities'), x)) for x in gpio_]  # py3.8
        gpio = [{'indices': np.where(edges[:, i])[0],
                 'polarities': edges[edges[:, i] != 0, i]}
                for i in range(4)]
        # Replace empty dicts with None
        gpio = [None if x['indices'].size == 0 else x for x in gpio]

    return gpio

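# Illustrative sketch (not part of the original module): the GPIO word keeps the
# four pin states in its top four bits (31..28), so a hypothetical sample of
# 0b1010 << 28 decodes to pins 1 and 3 high with the same right-shift used above:
def _example_gpio_decode():
    word = np.uint32(0b1010) << np.uint32(28)  # pins 1 and 3 high
    pins = (np.right_shift(word, np.arange(31, 27, -1)) & 0x1) == 1
    assert pins.tolist() == [True, False, True, False]
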
def _read_settings_json_compatibility_enforced(settings):
    """
    Patch iblrig settings for compatibility across rig versions.

    Parameters
    ----------
    settings : pathlib.Path, dict
        Either a _iblrig_taskSettings.raw.json file path or the loaded settings.

    Returns
    -------
    dict
        The task settings patched for compatibility.
    """
    if isinstance(settings, dict):
        md = settings.copy()
    else:
        with open(settings) as js:
            md = json.load(js)
    if 'IS_MOCK' not in md:
        md['IS_MOCK'] = False
    # Many v < 8 sessions had both version and version tag keys; v > 8 sessions have a version
    # tag.  Some sessions have neither key.  From v8 onwards we will use IBLRIG_VERSION to test
    # the rig version, however some places may still use the version tag.
    if 'IBLRIG_VERSION_TAG' not in md.keys():
        md['IBLRIG_VERSION_TAG'] = md.get('IBLRIG_VERSION', '')
    if 'IBLRIG_VERSION' not in md.keys():
        md['IBLRIG_VERSION'] = md['IBLRIG_VERSION_TAG']
    elif all([md['IBLRIG_VERSION'], md['IBLRIG_VERSION_TAG']]):
        # This may not be an issue; not sure what the intended difference between these keys was
        assert md['IBLRIG_VERSION'] == md['IBLRIG_VERSION_TAG'], 'version and version tag mismatch'
    # Test that the version can be parsed.  If not, log an error and set the version to nothing
    try:
        version.parse(md['IBLRIG_VERSION'] or '0')
    except version.InvalidVersion as ex:
        _logger.error('%s in iblrig settings, this may affect extraction', ex)
        # try a more relaxed version parse
        laxed_parse = re.search(r'^\d+\.\d+\.\d+', md['IBLRIG_VERSION'])
        # Set the tag as the invalid version
        md['IBLRIG_VERSION_TAG'] = md['IBLRIG_VERSION']
        # overwrite version with either the successfully parsed one or an empty string
        md['IBLRIG_VERSION'] = laxed_parse.group() if laxed_parse else ''
    if 'device_sound' not in md:
        # sound device must be defined in version 8 and later
        # FIXME this assertion will cause tests to break
        assert version.parse(md['IBLRIG_VERSION'] or '0') < version.parse('8.0.0')
        # in v7 we must infer the device from the sampling frequency if SD is undefined
        if 'sounddevice' in md.get('SD', ''):
            device = 'xonar'
        else:
            freq_map = {192000: 'xonar', 96000: 'harp', 44100: 'sysdefault'}
            device = freq_map.get(md.get('SOUND_SAMPLE_FREQ'), 'unknown')
        md['device_sound'] = {'OUTPUT': device}
    # 2018-12-05 Version 3.2.3 fixes (permanent fixes in IBL_RIG from 3.2.4 on)
    if md['IBLRIG_VERSION'] == '':
        pass
    elif version.parse(md['IBLRIG_VERSION']) >= version.parse('8.0.0'):
        md['SESSION_NUMBER'] = str(md['SESSION_NUMBER']).zfill(3)
        md['PYBPOD_BOARD'] = md['RIG_NAME']
        md['PYBPOD_CREATOR'] = (md['ALYX_USER'], '')
        md['SESSION_DATE'] = md['SESSION_START_TIME'][:10]
        md['SESSION_DATETIME'] = md['SESSION_START_TIME']
    elif version.parse(md['IBLRIG_VERSION']) <= version.parse('3.2.3'):
        if 'LAST_TRIAL_DATA' in md.keys():
            md.pop('LAST_TRIAL_DATA')
        if 'weighings' in md['PYBPOD_SUBJECT_EXTRA'].keys():
            md['PYBPOD_SUBJECT_EXTRA'].pop('weighings')
        if 'water_administration' in md['PYBPOD_SUBJECT_EXTRA'].keys():
            md['PYBPOD_SUBJECT_EXTRA'].pop('water_administration')
        if 'IBLRIG_COMMIT_HASH' not in md.keys():
            md['IBLRIG_COMMIT_HASH'] = 'f9d8905647dbafe1f9bdf78f73b286197ae2647b'
    # parse the date format to Django supported ISO
    dt = dateparser.parse(md['SESSION_DATETIME'])
    md['SESSION_DATETIME'] = date2isostr(dt)
    # add the weight key if it doesn't already exist
    if 'SUBJECT_WEIGHT' not in md:
        md['SUBJECT_WEIGHT'] = None
    return md

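# Illustrative sketch (not part of the original module): the relaxed parse above
# salvages a leading "major.minor.patch" from a version string that
# packaging.version refuses.  The tag below is hypothetical:
def _example_lax_version_parse():
    bad = '6.6.1-custom'  # hypothetical, non-PEP 440 version tag
    try:
        version.parse(bad)
    except version.InvalidVersion:
        m = re.search(r'^\d+\.\d+\.\d+', bad)
        assert m and m.group() == '6.6.1'
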
def load_settings(session_path: Union[str, Path], task_collection='raw_behavior_data'):
    """
    Load PyBpod Settings files (.json).

    :param session_path: Absolute path of session folder
    :type session_path: str, Path
    :return: Settings dictionary
    :rtype: dict
    """
    if session_path is None:
        _logger.warning("No data loaded: session_path is None")
        return
    path = Path(session_path).joinpath(task_collection)
    path = next(path.glob("_iblrig_taskSettings.raw*.json"), None)
    if not path:
        _logger.warning("No data loaded: could not find raw settings file")
        return None
    settings = _read_settings_json_compatibility_enforced(path)
    return settings

def load_stim_position_screen(session_path, task_collection='raw_behavior_data'):
    path = Path(session_path).joinpath(task_collection)
    path = next(path.glob("_iblrig_stimPositionScreen.raw*.csv"), None)

    data = pd.read_csv(path, sep=',', header=None, on_bad_lines='skip')
    data.columns = ['contrast', 'position', 'bns_ts']
    data['bns_ts'] = pd.to_datetime(data['bns_ts'])
    return data

def load_encoder_events(session_path, task_collection='raw_behavior_data', settings=False):
    """
    Load Rotary Encoder (RE) events raw data file.

    Assumes that a folder called "raw_behavior_data" exists in folder.

    Events number correspond to following bpod states:
    1: correct / hide_stim
    2: stim_on
    3: closed_loop
    4: freeze_error / freeze_correct

    >>> data.columns
    >>> ['re_ts',   # Rotary Encoder Timestamp (ms)    'numpy.int64'
         'sm_ev',   # State Machine Event              'numpy.int64'
         'bns_ts']  # Bonsai Timestamp (int)           'pandas.Timestamp'
        # pd.to_datetime(data.bns_ts) to work in datetimes

    :param session_path: Absolute path of session folder
    :type session_path: str
    :return: dataframe w/ 3 cols and (ntrials * 3) lines
    :rtype: Pandas.DataFrame
    """
    if session_path is None:
        return
    path = Path(session_path).joinpath(task_collection)
    path = next(path.glob("_iblrig_encoderEvents.raw*.ssv"), None)
    if not path:
        return None
    if not settings:
        settings = load_settings(session_path, task_collection=task_collection)
    if settings is None or not settings.get('IBLRIG_VERSION'):
        settings = {'IBLRIG_VERSION': '100.0.0'}
        # auto-detect old files when version is not labeled
        with open(path) as fid:
            line = fid.readline()
        if line.startswith('Event') and 'StateMachine' in line:
            settings = {'IBLRIG_VERSION': '0.0.0'}
    if version.parse(settings['IBLRIG_VERSION']) >= version.parse('5.0.0'):
        return _load_encoder_events_file_ge5(path)
    else:
        return _load_encoder_events_file_lt5(path)

def _load_encoder_ssv_file(file_path, **kwargs):
    file_path = Path(file_path)
    if file_path.stat().st_size == 0:
        _logger.error(f"{file_path.name} is an empty file. ")
        raise ValueError(f"{file_path.name} is an empty file. ABORT EXTRACTION. ")
    return pd.read_csv(file_path, sep=' ', header=None, on_bad_lines='skip', **kwargs)


def _load_encoder_positions_file_lt5(file_path):
    """
    File loader without the session overhead

    :param file_path: path to the raw positions file
    :return: dataframe of encoder positions
    """
    data = _load_encoder_ssv_file(file_path,
                                  names=['_', 're_ts', 're_pos', 'bns_ts', '__'],
                                  usecols=['re_ts', 're_pos', 'bns_ts'])
    return _groom_wheel_data_lt5(data, label='_iblrig_encoderPositions.raw.ssv', path=file_path)


def _load_encoder_positions_file_ge5(file_path):
    """
    File loader without the session overhead

    :param file_path: path to the raw positions file
    :return: dataframe of encoder positions
    """
    data = _load_encoder_ssv_file(file_path,
                                  names=['re_ts', 're_pos', '_'],
                                  usecols=['re_ts', 're_pos'])
    return _groom_wheel_data_ge5(data, label='_iblrig_encoderPositions.raw.ssv', path=file_path)


def _load_encoder_events_file_lt5(file_path):
    """
    File loader without the session overhead

    :param file_path: path to the raw events file
    :return: dataframe of encoder events
    """
    data = _load_encoder_ssv_file(file_path,
                                  names=['_', 're_ts', '__', 'sm_ev', 'bns_ts', '___'],
                                  usecols=['re_ts', 'sm_ev', 'bns_ts'])
    return _groom_wheel_data_lt5(data, label='_iblrig_encoderEvents.raw.ssv', path=file_path)


def _load_encoder_events_file_ge5(file_path):
    """
    File loader without the session overhead

    :param file_path: path to the raw events file
    :return: dataframe of encoder events
    """
    data = _load_encoder_ssv_file(file_path,
                                  names=['re_ts', 'sm_ev', '_'],
                                  usecols=['re_ts', 'sm_ev'])
    return _groom_wheel_data_ge5(data, label='_iblrig_encoderEvents.raw.ssv', path=file_path)

def load_encoder_positions(session_path, task_collection='raw_behavior_data', settings=False):
    """
    Load Rotary Encoder (RE) positions from a raw data file within a session path.

    Assumes that a folder called "raw_behavior_data" exists in folder.
    Positions are RE ticks [-512, 512] == [-180º, 180º]
    0 == trial stim init position
    Positive numbers are rightward movements (mouse) or RE CW

    Variable line number, depends on movements.

    Raw datafile Columns:
        Position, RE timestamp, RE Position, Bonsai Timestamp

    Position is always equal to 'Position' so this column was dropped.

    >>> data.columns
    >>> ['re_ts',   # Rotary Encoder Timestamp (ms)    'numpy.int64'
         're_pos',  # Rotary Encoder position (ticks)  'numpy.int64'
         'bns_ts']  # Bonsai Timestamp                 'pandas.Timestamp'
        # pd.to_datetime(data.bns_ts) to work in datetimes

    :param session_path: Absolute path of session folder
    :type session_path: str
    :return: dataframe w/ 3 cols and N positions
    :rtype: Pandas.DataFrame
    """
    if session_path is None:
        return
    path = Path(session_path).joinpath(task_collection)
    path = next(path.glob("_iblrig_encoderPositions.raw*.ssv"), None)
    if not path:
        _logger.warning("No data loaded: could not find raw encoderPositions file")
        return None
    if not settings:
        settings = load_settings(session_path, task_collection=task_collection)
    if settings is None or not settings.get('IBLRIG_VERSION'):
        settings = {'IBLRIG_VERSION': '100.0.0'}
        # auto-detect old files when version is not labeled
        with open(path) as fid:
            line = fid.readline()
        if line.startswith('Position'):
            settings = {'IBLRIG_VERSION': '0.0.0'}
    if version.parse(settings['IBLRIG_VERSION']) >= version.parse('5.0.0'):
        return _load_encoder_positions_file_ge5(path)
    else:
        return _load_encoder_positions_file_lt5(path)

def load_encoder_trial_info(session_path, task_collection='raw_behavior_data'):
    """
    Load Rotary Encoder trial info from raw data file.

    Assumes that a folder called "raw_behavior_data" exists in folder.

    NOTE: The last trial probably has no data (trial info is sent on trial start
    and data is only saved on trial exit...)
    max(trialnum) should be N+1 if N is the amount of trial data saved.

    Raw datafile Columns:

    >>> data.columns
    >>> ['trial_num',      # Trial Number                     'numpy.int64'
         'stim_pos_init',  # Initial position of visual stim  'numpy.int64'
         'stim_contrast',  # Contrast of visual stimulus      'numpy.float64'
         'stim_freq',      # Frequency of gabor patch         'numpy.float64'
         'stim_angle',     # Angle of Gabor 0 = Vertical      'numpy.float64'
         'stim_gain',      # Wheel gain (mm/º of stim)        'numpy.float64'
         'stim_sigma',     # Size of patch                    'numpy.float64'
         'stim_phase',     # Phase of gabor                   'numpy.float64'
         'bns_ts']         # Bonsai Timestamp                 'pandas.Timestamp'
        # pd.to_datetime(data.bns_ts) to work in datetimes

    :param session_path: Absolute path of session folder
    :type session_path: str
    :return: dataframe w/ 9 cols and ntrials lines
    :rtype: Pandas.DataFrame
    """
    if session_path is None:
        return
    path = Path(session_path).joinpath(task_collection)
    path = next(path.glob("_iblrig_encoderTrialInfo.raw*.ssv"), None)
    if not path:
        return None
    data = pd.read_csv(path, sep=' ', header=None)
    data = data.drop([9], axis=1)
    data.columns = ['trial_num', 'stim_pos_init', 'stim_contrast', 'stim_freq',
                    'stim_angle', 'stim_gain', 'stim_sigma', 'stim_phase', 'bns_ts']
    # return _groom_wheel_data_lt5(data, label='_iblrig_encoderEvents.raw.ssv', path=path)
    return data

def load_ambient_sensor(session_path, task_collection='raw_behavior_data'):
    """
    Load Ambient Sensor data from session.

    Probably could be extracted to DatasetTypes:
    _ibl_trials.temperature_C, _ibl_trials.airPressure_mb,
    _ibl_trials.relativeHumidity
    Returns a list of dicts one dict per trial.
    dict keys are:
    dict_keys(['Temperature_C', 'AirPressure_mb', 'RelativeHumidity'])

    :param session_path: Absolute path of session folder
    :type session_path: str
    :return: list of dicts
    :rtype: list
    """
    if session_path is None:
        return
    path = Path(session_path).joinpath(task_collection)
    path = next(path.glob("_iblrig_ambientSensorData.raw*.jsonable"), None)
    if not path:
        return None
    data = []
    with open(path, 'r') as f:
        for line in f:
            data.append(json.loads(line))
    return data

def load_mic(session_path, task_collection='raw_behavior_data'):
    """
    Load Microphone wav file to np.array of len nSamples

    :param session_path: Absolute path of session folder
    :type session_path: str
    :return: An array of values of the sound waveform
    :rtype: numpy.array
    """
    if session_path is None:
        return
    path = Path(session_path).joinpath(task_collection)
    path = next(path.glob("_iblrig_micData.raw*.wav"), None)
    if not path:
        return None
    fp = wave.open(path)
    nchan = fp.getnchannels()
    N = fp.getnframes()
    dstr = fp.readframes(N * nchan)
    data = np.frombuffer(dstr, np.int16)
    data = np.reshape(data, (-1, nchan))
    return data

def _clean_wheel_dataframe(data, label, path):
    if np.any(data.isna()):
        _logger.warning(label + ' has missing/incomplete records \n %s', path)
    # first step is to re-interpret as numeric objects if not already done
    for col in data.columns:
        if data[col].dtype == object and col not in ['bns_ts']:
            data[col] = pd.to_numeric(data[col], errors='coerce')
    # then drop Nans and duplicates
    data.dropna(inplace=True)
    data.drop_duplicates(keep='first', inplace=True)
    data.reset_index(inplace=True)
    # handle the clock resets when microseconds exceed uint32 max value
    drop_first = False
    data['re_ts'] = data['re_ts'].astype(np.double, copy=False)
    if any(np.diff(data['re_ts']) < 0):
        ind = np.where(np.diff(data['re_ts']) < 0)[0]
        for i in ind:
            # the first sample may be corrupt, in this case throw away
            if i <= 1:
                drop_first = i
                _logger.warning(label + ' rotary encoder positions timestamps'
                                        ' first sample corrupt ' + str(path))
            # if it's an uint32 wraparound, the diff should be close to 2 ** 32
            elif 32 - np.log2(data['re_ts'][i] - data['re_ts'][i + 1]) < 0.2:
                data.loc[i + 1:, 're_ts'] = data.loc[i + 1:, 're_ts'] + 2 ** 32
            # there is also the case where 2 positions are swapped and need to be swapped back
            elif data['re_ts'][i] > data['re_ts'][i + 1] > data['re_ts'][i - 1]:
                _logger.warning(label + ' rotary encoder timestamps swapped at index: ' +
                                str(i) + ' ' + str(path))
                a, b = data.iloc[i].copy(), data.iloc[i + 1].copy()
                data.iloc[i], data.iloc[i + 1] = b, a
            # if none of those 3 cases apply, raise an error
            else:
                _logger.error(label + ' Rotary encoder timestamps are not sorted.' + str(path))
                data.sort_values('re_ts', inplace=True)
                data.reset_index(inplace=True)
    if drop_first is not False:
        data.drop(data.loc[:drop_first].index, inplace=True)
        data = data.reindex()
    return data


def _groom_wheel_data_lt5(data, label='file ', path=''):
    """
    The whole purpose of this function is to account for variability and corruption in
    the wheel position files.  There are many possible errors described below, but
    nothing excludes getting new ones.
    """
    data = _clean_wheel_dataframe(data, label, path)
    data.drop(data.loc[data.bns_ts.apply(len) != 33].index, inplace=True)
    # check if the time scale is in ms
    sess_len_sec = (datetime.strptime(data['bns_ts'].iloc[-1][:25], '%Y-%m-%dT%H:%M:%S.%f') -
                    datetime.strptime(data['bns_ts'].iloc[0][:25], '%Y-%m-%dT%H:%M:%S.%f')).seconds
    if data['re_ts'].iloc[-1] / (sess_len_sec + 1e-6) < 1e5:  # should be 1e6 normally
        _logger.warning('Rotary encoder reset logs events in ms instead of us: ' +
                        'RE firmware needs upgrading and wheel velocity is potentially inaccurate')
        data['re_ts'] = data['re_ts'] * 1000
    return data


def _groom_wheel_data_ge5(data, label='file ', path=''):
    """
    The whole purpose of this function is to account for variability and corruption in
    the wheel position files.  There are many possible errors described below, but
    nothing excludes getting new ones.
    """
    data = _clean_wheel_dataframe(data, label, path)
    # check if the time scale is in ms
    if (data['re_ts'].iloc[-1] - data['re_ts'].iloc[0]) / 1e6 < 20:
        _logger.warning('Rotary encoder reset logs events in ms instead of us: ' +
                        'RE firmware needs upgrading and wheel velocity is potentially inaccurate')
        data['re_ts'] = data['re_ts'] * 1000
    return data

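# Illustrative sketch (not part of the original module): the wraparound test in
# _clean_wheel_dataframe asks whether a backwards jump is close to a full
# 2 ** 32 µs rollover (log2 of the jump within 0.2 of 32).  The timestamps below
# are hypothetical µs stamps straddling a uint32 clock reset:
def _example_wraparound_check():
    before, after = 2 ** 32 - 1000, 500  # hypothetical stamps around a rollover
    diff = before - after                # the backwards step seen as a positive jump
    assert 32 - np.log2(diff) < 0.2      # detected as a clock reset, not a sort error
    assert after + 2 ** 32 > before      # adding 2 ** 32 restores monotonicity
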
def sync_trials_robust(t0, t1, diff_threshold=0.001, drift_threshold_ppm=200, max_shift=5,
                       return_index=False):
    """
    Attempts to find matching timestamps in 2 time-series that have an offset, are drifting,
    and are most likely incomplete: sizes don't have to match, some pulses may be missing
    in any series.
    Only works with irregular time series as it relies on the derivative to match sync.

    :param t0: first time series
    :param t1: second time series
    :param diff_threshold: max difference between derivatives for a match (0.001)
    :param drift_threshold_ppm: maximum drift between the two series, in ppm (200)
    :param max_shift: maximum number of events to look ahead for a match (5)
    :param return_index: if True, also return the matching indices (False)
    :return:
    """
    nsync = min(t0.size, t1.size)
    dt0 = np.diff(t0)
    dt1 = np.diff(t1)
    ind = np.zeros_like(dt0) * np.nan
    i0 = 0
    i1 = 0
    cdt = np.nan  # the current time difference between the two series to compute drift
    while i0 < (nsync - 1):
        # look in the next max_shift events the ones whose derivative match
        isearch = np.arange(i1, min(max_shift + i1, dt1.size))
        dec = np.abs(dt0[i0] - dt1[isearch]) < diff_threshold
        # another constraint is to check the dt for the maximum drift
        if ~np.isnan(cdt):
            drift_ppm = np.abs((cdt - (t0[i0] - t1[isearch])) / dt1[isearch]) * 1e6
            dec = np.logical_and(dec, drift_ppm <= drift_threshold_ppm)
        # if one is found
        if np.any(dec):
            ii1 = np.where(dec)[0][0]
            ind[i0] = i1 + ii1
            i1 += ii1 + 1
            cdt = t0[i0 + 1] - t1[i1 + ii1]
        i0 += 1
    it0 = np.where(~np.isnan(ind))[0]
    it1 = ind[it0].astype(int)
    ind0 = np.unique(np.r_[it0, it0 + 1])
    ind1 = np.unique(np.r_[it1, it1 + 1])
    if return_index:
        return t0[ind0], t1[ind1], ind0, ind1
    else:
        return t0[ind0], t1[ind1]

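# Illustrative sketch (not part of the original module): a hypothetical irregular
# pulse train is recorded twice, the second copy with a fixed offset and a 50 ppm
# drift, both within the default thresholds, so every pulse should be matched.
def _example_sync_trials_robust():
    rng = np.random.default_rng(0)
    t0 = np.cumsum(rng.uniform(0.5, 1.5, 50))  # irregular pulse train
    t1 = t0 * (1 + 50e-6) + 3.14               # same train with drift and offset
    s0, s1 = sync_trials_robust(t0, t1)
    assert s0.size == s1.size == t0.size
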
def load_bpod_fronts(session_path: str, data: list = False, task_collection: str = 'raw_behavior_data') -> list:
    """load_bpod_fronts
    Loads BNC1 and BNC2 bpod channel times and polarities from session_path

    :param session_path: a valid session_path
    :type session_path: str
    :param data: pre-loaded raw data dict, defaults to False
    :type data: list, optional
    :return: List of dicts BNC1 and BNC2 {"times": np.array, "polarities": np.array}
    :rtype: list
    """
    if not data:
        data = load_data(session_path, task_collection)

    BNC1_fronts = np.array([[np.nan, np.nan]])
    BNC2_fronts = np.array([[np.nan, np.nan]])
    for tr in data:
        BNC1_fronts = np.append(
            BNC1_fronts,
            np.array(
                [[x, 1] for x in tr["behavior_data"]["Events timestamps"].get("BNC1High", [np.nan])]
            ),
            axis=0,
        )
        BNC1_fronts = np.append(
            BNC1_fronts,
            np.array(
                [[x, -1] for x in tr["behavior_data"]["Events timestamps"].get("BNC1Low", [np.nan])]
            ),
            axis=0,
        )
        BNC2_fronts = np.append(
            BNC2_fronts,
            np.array(
                [[x, 1] for x in tr["behavior_data"]["Events timestamps"].get("BNC2High", [np.nan])]
            ),
            axis=0,
        )
        BNC2_fronts = np.append(
            BNC2_fronts,
            np.array(
                [[x, -1] for x in tr["behavior_data"]["Events timestamps"].get("BNC2Low", [np.nan])]
            ),
            axis=0,
        )

    BNC1_fronts = BNC1_fronts[1:, :]
    BNC1_fronts = BNC1_fronts[BNC1_fronts[:, 0].argsort()]
    BNC2_fronts = BNC2_fronts[1:, :]
    BNC2_fronts = BNC2_fronts[BNC2_fronts[:, 0].argsort()]

    BNC1 = {"times": BNC1_fronts[:, 0], "polarities": BNC1_fronts[:, 1]}
    BNC2 = {"times": BNC2_fronts[:, 0], "polarities": BNC2_fronts[:, 1]}

    return [BNC1, BNC2]

def get_port_events(trial: dict, name: str = '') -> list:
    """get_port_events
    Return all event timestamps from bpod raw data trial that match 'name'
    --> looks in trial['behavior_data']['Events timestamps']

    :param trial: raw trial dict
    :type trial: dict
    :param name: name of event, defaults to ''
    :type name: str, optional
    :return: Sorted list of event timestamps
    :rtype: list

    TODO: add polarities?
    """
    out: list = []
    events = trial['behavior_data']['Events timestamps']
    for k in events:
        if name in k:
            out.extend(events[k])
    out = sorted(out)

    return out

def load_widefield_mmap(session_path, dtype=np.uint16, shape=(540, 640), n_frames=None, mode='r'):
    """
    TODO Document this function

    Parameters
    ----------
    session_path

    Returns
    -------

    """
    filepath = Path(session_path).joinpath('raw_widefield_data').glob('widefield.raw.*.dat')
    filepath = next(filepath, None)

    if not filepath:
        _logger.warning("No data loaded: could not find raw data file")
        return None

    if type(dtype) is str:
        dtype = np.dtype(dtype)

    if n_frames is None:
        # Get the number of samples from the file size
        n_frames = int(filepath.stat().st_size / (np.prod(shape) * dtype.itemsize))

    return np.memmap(str(filepath), mode=mode, dtype=dtype, shape=(int(n_frames), *shape))

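# Illustrative sketch (not part of the original module): the frame count above is
# just file_size // (rows * cols * bytes_per_pixel).  For the default uint16
# 540 x 640 frames, a hypothetical 691,200,000-byte .dat file holds 1000 frames.
def _example_widefield_frame_count():
    shape, itemsize = (540, 640), np.dtype(np.uint16).itemsize  # 2 bytes per pixel
    file_size = 540 * 640 * 2 * 1000                            # hypothetical file size
    assert int(file_size / (np.prod(shape) * itemsize)) == 1000
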
def patch_settings(session_path, collection='raw_behavior_data',
                   new_collection=None, subject=None, number=None, date=None):
    """Modify various details in a settings file.

    This function makes it easier to change things like the subject name in a settings file, as
    it will modify the subject name in the myriad paths.  NB: This saves the settings into the
    same location it was loaded from.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session path containing the settings file.
    collection : str
        The subfolder containing the settings file.
    new_collection : str
        An optional new subfolder to change in the settings paths.
    subject : str
        An optional new subject name to change in the settings.
    number : str, int
        An optional new number to change in the settings.
    date : str, datetime.date
        An optional date to change in the settings.

    Returns
    -------
    dict
        The modified settings.

    Examples
    --------
    File is in /data/subject/2020-01-01/002/raw_behavior_data.  Patch the file then move to
    new location.

    >>> patch_settings('/data/subject/2020-01-01/002', number='001')
    >>> shutil.move('/data/subject/2020-01-01/002/raw_behavior_data/', '/data/subject/2020-01-01/001/raw_behavior_data/')

    File is moved into new collection within the same session, then patched.

    >>> shutil.move('./subject/2020-01-01/002/raw_task_data_00/', './subject/2020-01-01/002/raw_task_data_01/')
    >>> patch_settings('/data/subject/2020-01-01/002', collection='raw_task_data_01', new_collection='raw_task_data_01')

    Update subject, date and number.

    >>> new_session_path = Path('/data/foobar/2024-02-24/002')
    >>> old_session_path = Path('/data/baz/2024-02-23/001')
    >>> patch_settings(old_session_path, collection='raw_task_data_00',
    ...                subject=new_session_path.parts[-3], date=new_session_path.parts[-2],
    ...                number=new_session_path.parts[-1])
    >>> shutil.move(old_session_path, new_session_path)
    """
    settings = load_settings(session_path, collection)
    if not settings:
        raise IOError('Settings file not found')

    filename = PureWindowsPath(settings.get('SETTINGS_FILE_PATH', '_iblrig_taskSettings.raw.json')).name
    file_path = Path(session_path).joinpath(collection, filename)

    if subject:
        # Patch subject name
        old_subject = settings['SUBJECT_NAME']
        settings['SUBJECT_NAME'] = subject
        for k in settings.keys():
            if isinstance(settings[k], str):
                settings[k] = settings[k].replace(f'\\Subjects\\{old_subject}', f'\\Subjects\\{subject}')
        if 'SESSION_NAME' in settings:
            settings['SESSION_NAME'] = '\\'.join([subject, *settings['SESSION_NAME'].split('\\')[1:]])
        settings.pop('PYBPOD_SUBJECT_EXTRA', None)  # Get rid of Alyx subject info

    if date:
        # Patch session datetime
        date = str(date)
        old_date = settings['SESSION_DATE']
        settings['SESSION_DATE'] = date
        for k in settings.keys():
            if isinstance(settings[k], str):
                settings[k] = settings[k].replace(
                    f'\\{settings["SUBJECT_NAME"]}\\{old_date}',
                    f'\\{settings["SUBJECT_NAME"]}\\{date}'
                )
        settings['SESSION_DATETIME'] = date + settings['SESSION_DATETIME'][10:]
        if 'SESSION_END_TIME' in settings:
            settings['SESSION_END_TIME'] = date + settings['SESSION_END_TIME'][10:]
        if 'SESSION_START_TIME' in settings:
            settings['SESSION_START_TIME'] = date + settings['SESSION_START_TIME'][10:]

    if number:
        # Patch session number
        old_number = settings['SESSION_NUMBER']
        if isinstance(number, int):
            number = f'{number:03}'
        settings['SESSION_NUMBER'] = number
        for k in settings.keys():
            if isinstance(settings[k], str):
                settings[k] = settings[k].replace(
                    f'\\{settings["SESSION_DATE"]}\\{old_number}',
                    f'\\{settings["SESSION_DATE"]}\\{number}'
                )

    if new_collection:
        if 'SESSION_RAW_DATA_FOLDER' not in settings:
            _logger.warning('SESSION_RAW_DATA_FOLDER key not in settings; collection not updated')
        else:
            old_path = settings['SESSION_RAW_DATA_FOLDER']
            new_path = PureWindowsPath(settings['SESSION_RAW_DATA_FOLDER']).with_name(new_collection)
            for k in settings.keys():
                if isinstance(settings[k], str):
                    settings[k] = settings[k].replace(old_path, str(new_path))
    with open(file_path, 'w') as fp:
        json.dump(settings, fp, indent=' ')
    return settings