"""Raw Data Loader functions for PyBpod rig.Module contains one loader function per raw datafile."""importreimportjsonimportloggingimportwavefromcollectionsimportOrderedDictfromdatetimeimportdatetimefrompathlibimportPath,PureWindowsPathfromtypingimportUnionfromdateutilimportparserasdateparserfrompackagingimportversionimportnumpyasnpimportpandasaspdfromiblutil.ioimportjsonablefromibllib.io.videoimportassert_valid_labelfromibllib.timeimportuncycle_pgts,convert_pgts,date2isostr_logger=logging.getLogger(__name__)
[docs]deftrial_times_to_times(raw_trial):""" Parse and convert all trial timestamps to "absolute" time. Float64 seconds from session start. 0---BpodStart---TrialStart0---------TrialEnd0-----TrialStart1---TrialEnd1...0---ts0---ts1--- tsN...absTS = tsN + TrialStartN - BpodStart Bpod timestamps are in microseconds (µs) PyBpod timestamps are is seconds (s) :param raw_trial: raw trial data :type raw_trial: dict :return: trial data with modified timestamps :rtype: dict """ts_bs=raw_trial['behavior_data']['Bpod start timestamp']ts_ts=raw_trial['behavior_data']['Trial start timestamp']# ts_te = raw_trial['behavior_data']['Trial end timestamp']defconvert(ts):returnts+ts_ts-ts_bsconverted_events={}fork,vinraw_trial['behavior_data']['Events timestamps'].items():converted_events.update({k:[convert(i)foriinv]})raw_trial['behavior_data']['Events timestamps']=converted_eventsconverted_states={}fork,vinraw_trial['behavior_data']['States timestamps'].items():converted_states.update({k:[[convert(i)foriinx]forxinv]})raw_trial['behavior_data']['States timestamps']=converted_statesshift=raw_trial['behavior_data']['Bpod start timestamp']raw_trial['behavior_data']['Bpod start timestamp']-=shiftraw_trial['behavior_data']['Trial start timestamp']-=shiftraw_trial['behavior_data']['Trial end timestamp']-=shiftassertraw_trial['behavior_data']['Bpod start timestamp']==0returnraw_trial
[docs]defload_bpod(session_path,task_collection='raw_behavior_data'):""" Load both settings and data from bpod (.json and .jsonable) :param session_path: Absolute path of session folder :param task_collection: Collection within sesison path with behavior data :return: dict settings and list of dicts data """returnload_settings(session_path,task_collection),load_data(session_path,task_collection)
[docs]defload_data(session_path:Union[str,Path],task_collection='raw_behavior_data',time='absolute'):""" Load PyBpod data files (.jsonable). Bpod timestamps are in microseconds (µs) PyBpod timestamps are is seconds (s) :param session_path: Absolute path of session folder :type session_path: str, Path :return: A list of len ntrials each trial being a dictionary :rtype: list of dicts """ifsession_pathisNone:_logger.warning('No data loaded: session_path is None')returnpath=Path(session_path).joinpath(task_collection)path=next(path.glob('_iblrig_taskData.raw*.jsonable'),None)ifnotpath:_logger.warning('No data loaded: could not find raw data file')returnNonedata=jsonable.read(path)iftime=='absolute':data=[trial_times_to_times(t)fortindata]returndata
[docs]defload_camera_frameData(session_path,camera:str='left',raw:bool=False)->pd.DataFrame:"""Loads binary frame data from Bonsai camera recording workflow. Args: session_path (StrPath): Path to session folder camera (str, optional): Load FramsData for specific camera. Defaults to 'left'. raw (bool, optional): Whether to return raw or parsed data. Defaults to False. Returns: parsed: (raw=False, Default) pandas.DataFrame: 4 int64 columns: { Timestamp, # float64 (seconds from session start) embeddedTimeStamp, # float64 (seconds from session start) embeddedFrameCounter, # int64 (Frame number from session start) embeddedGPIOPinState # object (State of each of the 4 GPIO pins as a # list of numpy boolean arrays # e.g. np.array([True, False, False, False]) } raw: pandas.DataFrame: 4 int64 columns: { Timestamp, # UTC ticks from BehaviorPC # (100's of ns since midnight 1/1/0001) embeddedTimeStamp, # Camera timestamp (Needs unclycling and conversion) embeddedFrameCounter, # Frame counter (int) embeddedGPIOPinState # GPIO pin state integer representation of 4 pins } """camera=assert_valid_label(camera)fpath=Path(session_path).joinpath("raw_video_data")fpath=next(fpath.glob(f"_iblrig_{camera}Camera.frameData*.bin"),None)assertfpath,f"{fpath}\nFile not Found: Could not find bin file for cam <{camera}>"rdata=np.fromfile(fpath,dtype=np.float64)assertrdata.size%4==0,"Dimension mismatch: bin file length is not mod 4"rows=int(rdata.size/4)data=np.reshape(rdata.astype(np.int64),(rows,4))df_dict=dict.fromkeys(["Timestamp","embeddedTimeStamp","embeddedFrameCounter","embeddedGPIOPinState"])df=pd.DataFrame(data,columns=df_dict.keys())ifraw:returndfdf_dict["Timestamp"]=(data[:,0]-data[0,0])/10_000_000# in seconds from first framecamerats=uncycle_pgts(convert_pgts(data[:,1]))df_dict["embeddedTimeStamp"]=camerats-camerats[0]# in seconds from first framedf_dict["embeddedFrameCounter"]=data[:,2]-data[0,2]# from startgpio=(np.right_shift(np.tile(data[:,3],(4,1)).T,np.arange(31,27,-1))&0x1)==1df_dict["embeddedGPIOPinState"]=[np.array(x)forxingpio.tolist()]parsed_df=pd.DataFrame.from_dict(df_dict)returnparsed_df
[docs]defload_camera_ssv_times(session_path,camera:str):""" Load the bonsai frame and camera timestamps from Camera.timestamps.ssv NB: For some sessions the frame times are in the first column, in others the order is reversed. NB: If using the new bin file the bonsai_times is a float in seconds since first frame :param session_path: Absolute path of session folder :param camera: Name of the camera to load, e.g. 'left' :return: array of datetimes, array of frame times in seconds """camera=assert_valid_label(camera)video_path=Path(session_path).joinpath('raw_video_data')ifnext(video_path.glob(f'_iblrig_{camera}Camera.frameData*.bin'),None):df=load_camera_frameData(session_path,camera=camera)returndf['Timestamp'].values,df['embeddedTimeStamp'].valuesfile=next(video_path.glob(f'_iblrig_{camera.lower()}Camera.timestamps*.ssv'),None)ifnotfile:file=str(video_path.joinpath(f'_iblrig_{camera.lower()}Camera.timestamps.ssv'))raiseFileNotFoundError(file+' not found')# NB: Numpy has deprecated support for non-naive timestamps.# Converting them is extremely slow: 6000 timestamps takes 0.8615s vs 0.0352s.# from datetime import timezone# c = {0: lambda x: datetime.fromisoformat(x).astimezone(timezone.utc).replace(tzinfo=None)}# Determine the order of the columns by reading one line and testing whether the first value# is an integer or not.withopen(file,'r')asf:line=f.readline()type_map=OrderedDict(bonsai='<M8[ns]',camera='<u4')try:int(line.split(' ')[1])exceptValueError:type_map.move_to_end('bonsai')ssv_params=dict(names=type_map.keys(),dtype=','.join(type_map.values()),delimiter=' ')ssv_times=np.genfromtxt(file,**ssv_params)# np.loadtxt is slower for some reasonbonsai_times=ssv_times['bonsai']camera_times=uncycle_pgts(convert_pgts(ssv_times['camera']))returnbonsai_times,camera_times
[docs]defload_embedded_frame_data(session_path,label:str,raw=False):""" Load the embedded frame count and GPIO for a given session. If the file doesn't exist, or is empty, None values are returned. :param session_path: Absolute path of session folder :param label: The specific video to load, one of ('left', 'right', 'body') :param raw: If True the raw data are returned without preprocessing, otherwise frame count is returned starting from 0 and the GPIO is returned as a dict of indices :return: The frame count, GPIO """count=load_camera_frame_count(session_path,label,raw=raw)gpio=load_camera_gpio(session_path,label,as_dicts=notraw)returncount,gpio
[docs]defload_camera_frame_count(session_path,label:str,raw=True):""" Load the embedded frame count for a given session. If the file doesn't exist, or is empty, a None value is returned. :param session_path: Absolute path of session folder :param label: The specific video to load, one of ('left', 'right', 'body') :param raw: If True the raw data are returned without preprocessing, otherwise frame count is returned starting from 0 :return: The frame count """ifsession_pathisNone:returnlabel=assert_valid_label(label)video_path=Path(session_path).joinpath('raw_video_data')ifnext(video_path.glob(f'_iblrig_{label}Camera.frameData*.bin'),None):df=load_camera_frameData(session_path,camera=label)returndf['embeddedFrameCounter'].values# Load frame countglob=video_path.glob(f'_iblrig_{label}Camera.frame_counter*.bin')count_file=next(glob,None)count=np.fromfile(count_file,dtype=np.float64).astype(int)ifcount_fileelse[]iflen(count)==0:returnifnotraw:count-=count[0]# start from zeroreturncount
[docs]defload_camera_gpio(session_path,label:str,as_dicts=False):""" Load the GPIO for a given session. If the file doesn't exist, or is empty, a None value is returned. The raw binary file contains uint32 values (saved as doubles) where the first 4 bits represent the state of each of the 4 GPIO pins. The array is expanded to an n x 4 array by shifting each bit to the end and checking whether it is 0 (low state) or 1 (high state). :param session_path: Absolute path of session folder :param label: The specific video to load, one of ('left', 'right', 'body') :param as_dicts: If False the raw data are returned boolean array with shape (n_frames, n_pins) otherwise GPIO is returned as a list of dictionaries with keys ('indices', 'polarities'). :return: An nx4 boolean array where columns represent state of GPIO pins 1-4. If as_dicts is True, a list of dicts is returned with keys ('indices', 'polarities'), or None if the dictionary is empty. """ifsession_pathisNone:returnraw_path=Path(session_path).joinpath('raw_video_data')label=assert_valid_label(label)# Load pin stateifnext(raw_path.glob(f'_iblrig_{label}Camera.frameData*.bin'),False):df=load_camera_frameData(session_path,camera=label,raw=False)gpio=np.array([xforxindf['embeddedGPIOPinState'].values])iflen(gpio)==0:return[None]*4ifas_dictselseNoneelse:GPIO_file=next(raw_path.glob(f'_iblrig_{label}Camera.GPIO*.bin'),None)# This deals with missing and empty files the samegpio=np.fromfile(GPIO_file,dtype=np.float64).astype(np.uint32)ifGPIO_fileelse[]# Check values make sense (4 pins = 16 possible values)ifnotnp.isin(gpio,np.left_shift(np.arange(2**4,dtype=np.uint32),32-4)).all():_logger.warning('Unexpected GPIO values; decoding may fail')iflen(gpio)==0:return[None]*4ifas_dictselseNone# 4 pins represented as uint32# For each pin, shift its bit to the end and check the bit is setgpio=(np.right_shift(np.tile(gpio,(4,1)).T,np.arange(31,27,-1))&0x1)==1ifas_dicts:ifnotgpio.any():_logger.error('No GPIO changes')return[None]*4# Find state changes for each pin and construct a dict of indices and polarities for eachedges=np.vstack((gpio[0,:],np.diff(gpio.astype(int),axis=0)))# gpio = [(ind := np.where(edges[:, i])[0], edges[ind, i]) for i in range(4)]# gpio = [dict(zip(('indices', 'polarities'), x)) for x in gpio_] # py3.8gpio=[{'indices':np.where(edges[:,i])[0],'polarities':edges[edges[:,i]!=0,i]}foriinrange(4)]# Replace empty dicts with Nonegpio=[Noneifx['indices'].size==0elsexforxingpio]returngpio
def_read_settings_json_compatibility_enforced(settings):""" Patch iblrig settings for compatibility across rig versions. Parameters ---------- settings : pathlib.Path, dict Either a _iblrig_taskSettings.raw.json file path or the loaded settings. Returns ------- dict The task settings patched for compatibility. """ifisinstance(settings,dict):md=settings.copy()else:withopen(settings)asjs:md=json.load(js)if'IS_MOCK'notinmd:md['IS_MOCK']=False# Many v < 8 sessions had both version and version tag keys. v > 8 have a version tag.# Some sessions have neither key. From v8 onwards we will use IBLRIG_VERSION to test rig# version, however some places may still use the version tag.if'IBLRIG_VERSION_TAG'notinmd.keys():md['IBLRIG_VERSION_TAG']=md.get('IBLRIG_VERSION','')if'IBLRIG_VERSION'notinmd.keys():md['IBLRIG_VERSION']=md['IBLRIG_VERSION_TAG']elifall([md['IBLRIG_VERSION'],md['IBLRIG_VERSION_TAG']]):# This may not be an issue; not sure what the intended difference between these keys wasassertmd['IBLRIG_VERSION']==md['IBLRIG_VERSION_TAG'],'version and version tag mismatch'# Test version can be parsed. If not, log an error and set the version to nothingtry:version.parse(md['IBLRIG_VERSION']or'0')exceptversion.InvalidVersionasex:_logger.error('%s in iblrig settings, this may affect extraction',ex)# try a more relaxed version parselaxed_parse=re.search(r'^\d+\.\d+\.\d+',md['IBLRIG_VERSION'])# Set the tag as the invalid versionmd['IBLRIG_VERSION_TAG']=md['IBLRIG_VERSION']# overwrite version with either successfully parsed one or an empty stringmd['IBLRIG_VERSION']=laxed_parse.group()iflaxed_parseelse''if'device_sound'notinmd:# sound device must be defined in version 8 and later # FIXME this assertion will cause tests to breakassertversion.parse(md['IBLRIG_VERSION']or'0')<version.parse('8.0.0')# in v7 we must infer the device from the sampling frequency if SD is Noneif'sounddevice'inmd.get('SD',''):device='xonar'else:freq_map={192000:'xonar',96000:'harp',44100:'sysdefault'}device=freq_map.get(md.get('SOUND_SAMPLE_FREQ'),'unknown')md['device_sound']={'OUTPUT':device}# 2018-12-05 Version 3.2.3 fixes (permanent fixes in IBL_RIG from 3.2.4 on)ifmd['IBLRIG_VERSION']=='':passelifversion.parse(md['IBLRIG_VERSION'])>=version.parse('8.0.0'):md['SESSION_NUMBER']=str(md['SESSION_NUMBER']).zfill(3)md['PYBPOD_BOARD']=md['RIG_NAME']md['PYBPOD_CREATOR']=(md['ALYX_USER'],'')md['SESSION_DATE']=md['SESSION_START_TIME'][:10]md['SESSION_DATETIME']=md['SESSION_START_TIME']elifversion.parse(md['IBLRIG_VERSION'])<=version.parse('3.2.3'):if'LAST_TRIAL_DATA'inmd.keys():md.pop('LAST_TRIAL_DATA')if'weighings'inmd['PYBPOD_SUBJECT_EXTRA'].keys():md['PYBPOD_SUBJECT_EXTRA'].pop('weighings')if'water_administration'inmd['PYBPOD_SUBJECT_EXTRA'].keys():md['PYBPOD_SUBJECT_EXTRA'].pop('water_administration')if'IBLRIG_COMMIT_HASH'notinmd.keys():md['IBLRIG_COMMIT_HASH']='f9d8905647dbafe1f9bdf78f73b286197ae2647b'# parse the date format to Django supported ISOdt=dateparser.parse(md['SESSION_DATETIME'])md['SESSION_DATETIME']=date2isostr(dt)# add the weight key if it doesn't already existif'SUBJECT_WEIGHT'notinmd:md['SUBJECT_WEIGHT']=Nonereturnmd
[docs]defload_settings(session_path:Union[str,Path],task_collection='raw_behavior_data'):""" Load PyBpod Settings files (.json). [description] :param session_path: Absolute path of session folder :type session_path: str, Path :return: Settings dictionary :rtype: dict """ifsession_pathisNone:_logger.warning("No data loaded: session_path is None")returnpath=Path(session_path).joinpath(task_collection)path=next(path.glob("_iblrig_taskSettings.raw*.json"),None)ifnotpath:_logger.warning("No data loaded: could not find raw settings file")returnNonesettings=_read_settings_json_compatibility_enforced(path)returnsettings
[docs]defload_encoder_events(session_path,task_collection='raw_behavior_data',settings=False):""" Load Rotary Encoder (RE) events raw data file. Assumes that a folder called "raw_behavior_data" exists in folder. Events number correspond to following bpod states: 1: correct / hide_stim 2: stim_on 3: closed_loop 4: freeze_error / freeze_correct >>> data.columns >>> ['re_ts', # Rotary Encoder Timestamp (ms) 'numpy.int64' 'sm_ev', # State Machine Event 'numpy.int64' 'bns_ts'] # Bonsai Timestamp (int) 'pandas.Timestamp' # pd.to_datetime(data.bns_ts) to work in datetimes :param session_path: [description] :type session_path: [type] :return: dataframe w/ 3 cols and (ntrials * 3) lines :rtype: Pandas.DataFrame """ifsession_pathisNone:returnpath=Path(session_path).joinpath(task_collection)path=next(path.glob("_iblrig_encoderEvents.raw*.ssv"),None)ifnotsettings:settings=load_settings(session_path,task_collection=task_collection)ifsettingsisNoneornotsettings.get('IBLRIG_VERSION'):settings={'IBLRIG_VERSION':'100.0.0'}# auto-detect old files when version is not labeledwithopen(path)asfid:line=fid.readline()ifline.startswith('Event')and'StateMachine'inline:settings={'IBLRIG_VERSION':'0.0.0'}ifnotpath:returnNoneifversion.parse(settings['IBLRIG_VERSION'])>=version.parse('5.0.0'):return_load_encoder_events_file_ge5(path)else:return_load_encoder_events_file_lt5(path)
def_load_encoder_ssv_file(file_path,**kwargs):file_path=Path(file_path)iffile_path.stat().st_size==0:_logger.error(f"{file_path.name} is an empty file. ")raiseValueError(f"{file_path.name} is an empty file. ABORT EXTRACTION. ")returnpd.read_csv(file_path,sep=' ',header=None,on_bad_lines='skip',**kwargs)def_load_encoder_positions_file_lt5(file_path):""" File loader without the session overhead :param file_path: :return: dataframe of encoder events """data=_load_encoder_ssv_file(file_path,names=['_','re_ts','re_pos','bns_ts','__'],usecols=['re_ts','re_pos','bns_ts'])return_groom_wheel_data_lt5(data,label='_iblrig_encoderPositions.raw.ssv',path=file_path)def_load_encoder_positions_file_ge5(file_path):""" File loader without the session overhead :param file_path: :return: dataframe of encoder events """data=_load_encoder_ssv_file(file_path,names=['re_ts','re_pos','_'],usecols=['re_ts','re_pos'])return_groom_wheel_data_ge5(data,label='_iblrig_encoderPositions.raw.ssv',path=file_path)def_load_encoder_events_file_lt5(file_path):""" File loader without the session overhead :param file_path: :return: dataframe of encoder events """data=_load_encoder_ssv_file(file_path,names=['_','re_ts','__','sm_ev','bns_ts','___'],usecols=['re_ts','sm_ev','bns_ts'])return_groom_wheel_data_lt5(data,label='_iblrig_encoderEvents.raw.ssv',path=file_path)def_load_encoder_events_file_ge5(file_path):""" File loader without the session overhead :param file_path: :return: dataframe of encoder events """data=_load_encoder_ssv_file(file_path,names=['re_ts','sm_ev','_'],usecols=['re_ts','sm_ev'])return_groom_wheel_data_ge5(data,label='_iblrig_encoderEvents.raw.ssv',path=file_path)
[docs]defload_encoder_positions(session_path,task_collection='raw_behavior_data',settings=False):""" Load Rotary Encoder (RE) positions from raw data file within a session path. Assumes that a folder called "raw_behavior_data" exists in folder. Positions are RE ticks [-512, 512] == [-180º, 180º] 0 == trial stim init position Positive nums are rightwards movements (mouse) or RE CW (mouse) Variable line number, depends on movements. Raw datafile Columns: Position, RE timestamp, RE Position, Bonsai Timestamp Position is always equal to 'Position' so this column was dropped. >>> data.columns >>> ['re_ts', # Rotary Encoder Timestamp (ms) 'numpy.int64' 're_pos', # Rotary Encoder position (ticks) 'numpy.int64' 'bns_ts'] # Bonsai Timestamp 'pandas.Timestamp' # pd.to_datetime(data.bns_ts) to work in datetimes :param session_path: Absolute path of session folder :type session_path: str :return: dataframe w/ 3 cols and N positions :rtype: Pandas.DataFrame """ifsession_pathisNone:returnpath=Path(session_path).joinpath(task_collection)path=next(path.glob("_iblrig_encoderPositions.raw*.ssv"),None)ifnotsettings:settings=load_settings(session_path,task_collection=task_collection)ifsettingsisNoneornotsettings.get('IBLRIG_VERSION'):settings={'IBLRIG_VERSION':'100.0.0'}# auto-detect old files when version is not labeledwithopen(path)asfid:line=fid.readline()ifline.startswith('Position'):settings={'IBLRIG_VERSION':'0.0.0'}ifnotpath:_logger.warning("No data loaded: could not find raw encoderPositions file")returnNoneifversion.parse(settings['IBLRIG_VERSION'])>=version.parse('5.0.0'):return_load_encoder_positions_file_ge5(path)else:return_load_encoder_positions_file_lt5(path)
[docs]defload_encoder_trial_info(session_path,task_collection='raw_behavior_data'):""" Load Rotary Encoder trial info from raw data file. Assumes that a folder calles "raw_behavior_data" exists in folder. NOTE: Last trial probably inexistent data (Trial info is sent on trial start and data is only saved on trial exit...) max(trialnum) should be N+1 if N is the amount of trial data saved. Raw datafile Columns: >>> data.columns >>> ['trial_num', # Trial Number 'numpy.int64' 'stim_pos_init', # Initial position of visual stim 'numpy.int64' 'stim_contrast', # Contrast of visual stimulus 'numpy.float64' 'stim_freq', # Frequency of gabor patch 'numpy.float64' 'stim_angle', # Angle of Gabor 0 = Vertical 'numpy.float64' 'stim_gain', # Wheel gain (mm/º of stim) 'numpy.float64' 'stim_sigma', # Size of patch 'numpy.float64' 'stim_phase', # Phase of gabor 'numpy.float64' 'bns_ts' ] # Bonsai Timestamp 'pandas.Timestamp' # pd.to_datetime(data.bns_ts) to work in datetimes :param session_path: Absoulte path of session folder :type session_path: str :return: dataframe w/ 9 cols and ntrials lines :rtype: Pandas.DataFrame """ifsession_pathisNone:returnpath=Path(session_path).joinpath(task_collection)path=next(path.glob("_iblrig_encoderTrialInfo.raw*.ssv"),None)ifnotpath:returnNonedata=pd.read_csv(path,sep=' ',header=None)data=data.drop([9],axis=1)data.columns=['trial_num','stim_pos_init','stim_contrast','stim_freq','stim_angle','stim_gain','stim_sigma','stim_phase','bns_ts']# return _groom_wheel_data_lt5(data, label='_iblrig_encoderEvents.raw.ssv', path=path)returndata
[docs]defload_ambient_sensor(session_path,task_collection='raw_behavior_data'):""" Load Ambient Sensor data from session. Probably could be extracted to DatasetTypes: _ibl_trials.temperature_C, _ibl_trials.airPressure_mb, _ibl_trials.relativeHumidity Returns a list of dicts one dict per trial. dict keys are: dict_keys(['Temperature_C', 'AirPressure_mb', 'RelativeHumidity']) :param session_path: Absoulte path of session folder :type session_path: str :return: list of dicts :rtype: list """ifsession_pathisNone:returnpath=Path(session_path).joinpath(task_collection)path=next(path.glob("_iblrig_ambientSensorData.raw*.jsonable"),None)ifnotpath:returnNonedata=[]withopen(path,'r')asf:forlineinf:data.append(json.loads(line))returndata
[docs]defload_mic(session_path,task_collection='raw_behavior_data'):""" Load Microphone wav file to np.array of len nSamples :param session_path: Absolute path of session folder :type session_path: str :return: An array of values of the sound waveform :rtype: numpy.array """ifsession_pathisNone:returnpath=Path(session_path).joinpath(task_collection)path=next(path.glob("_iblrig_micData.raw*.wav"),None)ifnotpath:returnNonefp=wave.open(path)nchan=fp.getnchannels()N=fp.getnframes()dstr=fp.readframes(N*nchan)data=np.frombuffer(dstr,np.int16)data=np.reshape(data,(-1,nchan))returndata
def_clean_wheel_dataframe(data,label,path):ifnp.any(data.isna()):_logger.warning(label+' has missing/incomplete records \n%s',path)# first step is to re-interpret as numeric objects if not already doneforcolindata.columns:ifdata[col].dtype==objectandcolnotin['bns_ts']:data[col]=pd.to_numeric(data[col],errors='coerce')# then drop Nans and duplicatesdata.dropna(inplace=True)data.drop_duplicates(keep='first',inplace=True)data.reset_index(inplace=True)# handle the clock resets when microseconds exceed uint32 max valuedrop_first=Falsedata['re_ts']=data['re_ts'].astype(np.double,copy=False)ifany(np.diff(data['re_ts'])<0):ind=np.where(np.diff(data['re_ts'])<0)[0]foriinind:# the first sample may be corrupt, in this case throw awayifi<=1:drop_first=i_logger.warning(label+' rotary encoder positions timestamps'' first sample corrupt '+str(path))# if it's an uint32 wraparound, the diff should be close to 2 ** 32elif32-np.log2(data['re_ts'][i]-data['re_ts'][i+1])<0.2:data.loc[i+1:,'re_ts']=data.loc[i+1:,'re_ts']+2**32# there is also the case where 2 positions are swapped and need to be swapped backelifdata['re_ts'][i]>data['re_ts'][i+1]>data['re_ts'][i-1]:_logger.warning(label+' rotary encoder timestamps swapped at index: '+str(i)+' '+str(path))a,b=data.iloc[i].copy(),data.iloc[i+1].copy()data.iloc[i],data.iloc[i+1]=b,a# if none of those 3 cases apply, raise an errorelse:_logger.error(label+' Rotary encoder timestamps are not sorted.'+str(path))data.sort_values('re_ts',inplace=True)data.reset_index(inplace=True)ifdrop_firstisnotFalse:data.drop(data.loc[:drop_first].index,inplace=True)data=data.reindex()returndatadef_groom_wheel_data_lt5(data,label='file ',path=''):""" The whole purpose of this function is to account for variability and corruption in the wheel position files. There are many possible errors described below, but nothing excludes getting new ones. """data=_clean_wheel_dataframe(data,label,path)data.drop(data.loc[data.bns_ts.apply(len)!=33].index,inplace=True)# check if the time scale is in mssess_len_sec=(datetime.strptime(data['bns_ts'].iloc[-1][:25],'%Y-%m-%dT%H:%M:%S.%f')-datetime.strptime(data['bns_ts'].iloc[0][:25],'%Y-%m-%dT%H:%M:%S.%f')).secondsifdata['re_ts'].iloc[-1]/(sess_len_sec+1e-6)<1e5:# should be 1e6 normally_logger.warning('Rotary encoder reset logs events in ms instead of us: '+'RE firmware needs upgrading and wheel velocity is potentially inaccurate')data['re_ts']=data['re_ts']*1000returndatadef_groom_wheel_data_ge5(data,label='file ',path=''):""" The whole purpose of this function is to account for variability and corruption in the wheel position files. There are many possible errors described below, but nothing excludes getting new ones. """data=_clean_wheel_dataframe(data,label,path)# check if the time scale is in msif(data['re_ts'].iloc[-1]-data['re_ts'].iloc[0])/1e6<20:_logger.warning('Rotary encoder reset logs events in ms instead of us: '+'RE firmware needs upgrading and wheel velocity is potentially inaccurate')data['re_ts']=data['re_ts']*1000returndata
[docs]defsync_trials_robust(t0,t1,diff_threshold=0.001,drift_threshold_ppm=200,max_shift=5,return_index=False):""" Attempts to find matching timestamps in 2 time-series that have an offset, are drifting, and are most likely incomplete: sizes don't have to match, some pulses may be missing in any series. Only works with irregular time series as it relies on the derivative to match sync. :param t0: :param t1: :param diff_threshold: :param drift_threshold_ppm: (150) :param max_shift: (200) :param return_index (False) :return: """nsync=min(t0.size,t1.size)dt0=np.diff(t0)dt1=np.diff(t1)ind=np.zeros_like(dt0)*np.nani0=0i1=0cdt=np.nan# the current time difference between the two series to compute driftwhilei0<(nsync-1):# look in the next max_shift events the ones whose derivative matchisearch=np.arange(i1,min(max_shift+i1,dt1.size))dec=np.abs(dt0[i0]-dt1[isearch])<diff_threshold# another constraint is to check the dt for the maximum driftif~np.isnan(cdt):drift_ppm=np.abs((cdt-(t0[i0]-t1[isearch]))/dt1[isearch])*1e6dec=np.logical_and(dec,drift_ppm<=drift_threshold_ppm)# if one is foundifnp.any(dec):ii1=np.where(dec)[0][0]ind[i0]=i1+ii1i1+=ii1+1cdt=t0[i0+1]-t1[i1+ii1]i0+=1it0=np.where(~np.isnan(ind))[0]it1=ind[it0].astype(int)ind0=np.unique(np.r_[it0,it0+1])ind1=np.unique(np.r_[it1,it1+1])ifreturn_index:returnt0[ind0],t1[ind1],ind0,ind1else:returnt0[ind0],t1[ind1]
[docs]defload_bpod_fronts(session_path:str,data:list=False,task_collection:str='raw_behavior_data')->list:"""load_bpod_fronts Loads BNC1 and BNC2 bpod channels times and polarities from session_path :param session_path: a valid session_path :type session_path: str :param data: pre-loaded raw data dict, defaults to False :type data: list, optional :return: List of dicts BNC1 and BNC2 {"times": np.array, "polarities":np.array} :rtype: list """ifnotdata:data=load_data(session_path,task_collection)BNC1_fronts=np.array([[np.nan,np.nan]])BNC2_fronts=np.array([[np.nan,np.nan]])fortrindata:BNC1_fronts=np.append(BNC1_fronts,np.array([[x,1]forxintr["behavior_data"]["Events timestamps"].get("BNC1High",[np.nan])]),axis=0,)BNC1_fronts=np.append(BNC1_fronts,np.array([[x,-1]forxintr["behavior_data"]["Events timestamps"].get("BNC1Low",[np.nan])]),axis=0,)BNC2_fronts=np.append(BNC2_fronts,np.array([[x,1]forxintr["behavior_data"]["Events timestamps"].get("BNC2High",[np.nan])]),axis=0,)BNC2_fronts=np.append(BNC2_fronts,np.array([[x,-1]forxintr["behavior_data"]["Events timestamps"].get("BNC2Low",[np.nan])]),axis=0,)BNC1_fronts=BNC1_fronts[1:,:]BNC1_fronts=BNC1_fronts[BNC1_fronts[:,0].argsort()]BNC2_fronts=BNC2_fronts[1:,:]BNC2_fronts=BNC2_fronts[BNC2_fronts[:,0].argsort()]BNC1={"times":BNC1_fronts[:,0],"polarities":BNC1_fronts[:,1]}BNC2={"times":BNC2_fronts[:,0],"polarities":BNC2_fronts[:,1]}return[BNC1,BNC2]
[docs]defget_port_events(trial:dict,name:str='')->list:"""get_port_events Return all event timestamps from bpod raw data trial that match 'name' --> looks in trial['behavior_data']['Events timestamps'] :param trial: raw trial dict :type trial: dict :param name: name of event, defaults to '' :type name: str, optional :return: Sorted list of event timestamps :rtype: list TODO: add polarities? """out:list=[]events=trial['behavior_data']['Events timestamps']forkinevents:ifnameink:out.extend(events[k])out=sorted(out)returnout
[docs]defload_widefield_mmap(session_path,dtype=np.uint16,shape=(540,640),n_frames=None,mode='r'):""" TODO Document this function Parameters ---------- session_path Returns ------- """filepath=Path(session_path).joinpath('raw_widefield_data').glob('widefield.raw.*.dat')filepath=next(filepath,None)ifnotfilepath:_logger.warning("No data loaded: could not find raw data file")returnNoneiftype(dtype)isstr:dtype=np.dtype(dtype)ifn_framesisNone:# Get the number of samples from the file sizen_frames=int(filepath.stat().st_size/(np.prod(shape)*dtype.itemsize))returnnp.memmap(str(filepath),mode=mode,dtype=dtype,shape=(int(n_frames),*shape))
[docs]defpatch_settings(session_path,collection='raw_behavior_data',new_collection=None,subject=None,number=None,date=None):"""Modify various details in a settings file. This function makes it easier to change things like subject name in a settings as it will modify the subject name in the myriad paths. NB: This saves the settings into the same location it was loaded from. Parameters ---------- session_path : str, pathlib.Path The session path containing the settings file. collection : str The subfolder containing the settings file. new_collection : str An optional new subfolder to change in the settings paths. subject : str An optional new subject name to change in the settings. number : str, int An optional new number to change in the settings. date : str, datetime.date An optional date to change in the settings. Returns ------- dict The modified settings. Examples -------- File is in /data/subject/2020-01-01/002/raw_behavior_data. Patch the file then move to new location. >>> patch_settings('/data/subject/2020-01-01/002', number='001') >>> shutil.move('/data/subject/2020-01-01/002/raw_behavior_data/', '/data/subject/2020-01-01/001/raw_behavior_data/') File is moved into new collection within the same session, then patched. >>> shutil.move('./subject/2020-01-01/002/raw_task_data_00/', './subject/2020-01-01/002/raw_task_data_01/') >>> patch_settings('/data/subject/2020-01-01/002', collection='raw_task_data_01', new_collection='raw_task_data_01') Update subject, date and number. >>> new_session_path = Path('/data/foobar/2024-02-24/002') >>> old_session_path = Path('/data/baz/2024-02-23/001') >>> patch_settings(old_session_path, collection='raw_task_data_00', ... subject=new_session_path.parts[-3], date=new_session_path.parts[-2], number=new_session_path.parts[-1]) >>> shutil.move(old_session_path, new_session_path) """settings=load_settings(session_path,collection)ifnotsettings:raiseIOError('Settings file not found')filename=PureWindowsPath(settings.get('SETTINGS_FILE_PATH','_iblrig_taskSettings.raw.json')).namefile_path=Path(session_path).joinpath(collection,filename)ifsubject:# Patch subject nameold_subject=settings['SUBJECT_NAME']settings['SUBJECT_NAME']=subjectforkinsettings.keys():ifisinstance(settings[k],str):settings[k]=settings[k].replace(f'\\Subjects\\{old_subject}',f'\\Subjects\\{subject}')if'SESSION_NAME'insettings:settings['SESSION_NAME']='\\'.join([subject,*settings['SESSION_NAME'].split('\\')[1:]])settings.pop('PYBPOD_SUBJECT_EXTRA',None)# Get rid of Alyx subject infoifdate:# Patch session datetimedate=str(date)old_date=settings['SESSION_DATE']settings['SESSION_DATE']=dateforkinsettings.keys():ifisinstance(settings[k],str):settings[k]=settings[k].replace(f'\\{settings["SUBJECT_NAME"]}\\{old_date}',f'\\{settings["SUBJECT_NAME"]}\\{date}')settings['SESSION_DATETIME']=date+settings['SESSION_DATETIME'][10:]if'SESSION_END_TIME'insettings:settings['SESSION_END_TIME']=date+settings['SESSION_END_TIME'][10:]if'SESSION_START_TIME'insettings:settings['SESSION_START_TIME']=date+settings['SESSION_START_TIME'][10:]ifnumber:# Patch session numberold_number=settings['SESSION_NUMBER']ifisinstance(number,int):number=f'{number:03}'settings['SESSION_NUMBER']=numberforkinsettings.keys():ifisinstance(settings[k],str):settings[k]=settings[k].replace(f'\\{settings["SESSION_DATE"]}\\{old_number}',f'\\{settings["SESSION_DATE"]}\\{number}')ifnew_collection:if'SESSION_RAW_DATA_FOLDER'notinsettings:_logger.warning('SESSION_RAW_DATA_FOLDER key not in settings; collection not updated')else:old_path=settings['SESSION_RAW_DATA_FOLDER']new_path=PureWindowsPath(settings['SESSION_RAW_DATA_FOLDER']).with_name(new_collection)forkinsettings.keys():ifisinstance(settings[k],str):settings[k]=settings[k].replace(old_path,str(new_path))withopen(file_path,'w')asfp:json.dump(settings,fp,indent=' ')returnsettings