def detect_ready_tone(w, fs, ftone=FTONE, threshold=0.8):
    """
    Detects a transient sinusoid signal in a time-series.

    :param w: audio time series
    :param fs: sampling frequency (Hz)
    :param ftone: frequency of the tone to detect
    :param threshold: ratio of the band-passed envelope to the broadband envelope,
        between 0 and 1 (defaults to 0.8)
    :return: sample indices at which the detection mask switches from off to on
    """
    # envelope of the DC-free signal via the analytic (Hilbert) transform
    env_broadband = np.abs(scipy.signal.hilbert(w - np.median(w)))
    # envelope of the signal band-passed around the frequency of interest
    # NOTE(review): band edges 0.9/0.95/1.15/1.1 * ftone are non-monotonic — confirm intended
    band_edges = ftone * np.array([0.9, 0.95, 1.15, 1.1])
    env_tone = np.abs(scipy.signal.hilbert(fourier.bp(w, si=1 / fs, b=band_edges)))
    # smooth the envelope ratio over a 100 ms boxcar, then threshold
    ratio = scipy.ndimage.uniform_filter1d(env_tone / (env_broadband + 1e-3), int(fs * 0.1))
    dtect = ratio > threshold
    # rising edges of the boolean detection mask
    return np.where(np.diff(dtect.astype(int)) == 1)[0]
# Alternative matched-filter approach kept for reference:
# tone = np.sin(2 * np.pi * FTONE * np.arange(0, fs * 0.1) / fs)
# tone = tone / np.sum(tone ** 2)
# xc = np.abs(signal.hilbert(signal.correlate(w - np.mean(w), tone)))


def _get_conversion_factor(unit=UNIT, ready_tone_spl=READY_TONE_SPL):
    """
    Returns the multiplicative factor converting raw audio samples to the requested unit.

    :param unit: target unit; 'dBFS' returns the identity factor, anything else uses
        the calibrated ready tone
    :param ready_tone_spl: sound pressure level (dB SPL) of the ready tone at the mic
    :return: conversion factor (float)
    """
    # 3 approaches here (not exclusive):
    # a- get the mic sensitivity, the preamp gain and DAC parameters and do the math
    # b- treat the whole thing as a black box and do a calibration run (cf. people at Renard's lab)
    # c- use calibrated ready tone
    # The reference of acoustic pressure is 0dBSPL @ 1kHz which is threshold of hearing (20 μPa).
    # Usual calibration is 1 Pa (94 dBSPL) at 1 kHz
    # c) here we know that the ready tone is 55dB SPL at 5kHz, assuming a flat spectrum between
    # 1 and 5 kHz, and observing the peak value on the 5k at the microphone.
    if unit == 'dBFS':
        # digital full-scale: no conversion needed
        return 1.0
    mic_distance = .155
    observed_peak = 60
    # RMS of a sinusoid is peak / sqrt(2)
    observed_rms = np.sqrt(2) / 2 * observed_peak
    return 10 ** ((ready_tone_spl - 20 * np.log10(observed_rms)) / 20) * mic_distance
def welchogram(fs, wav, nswin=NS_WIN, overlap=OVERLAP, nperseg=NS_WELCH, detect_kwargs=None):
    """
    Computes a spectrogram on a very large audio file.

    :param fs: sampling frequency (Hz)
    :param wav: wav signal (vector or memmap)
    :param nswin: n samples of the sliding window
    :param overlap: n samples of the overlap between windows
    :param nperseg: n samples for the computation of the spectrogram
    :param detect_kwargs: keyword arguments forwarded to detect_ready_tone
    :return: tscale, fscale, downsampled spectrogram, ready-tone onset times (s)
    """
    wingen = WindowGenerator(ns=wav.shape[0], nswin=nswin, overlap=overlap)
    fscale = fourier.fscale(nperseg, 1 / fs, one_sided=True)
    W = np.zeros((wingen.nwin, len(fscale)))
    tscale = wingen.tscale(fs=fs)
    detect = []
    for first, last in wingen.firstlast:
        # load the current window into memory and scale to physical units
        w = np.float64(wav[first:last]) * _get_conversion_factor()
        # detect ready tones; shift from window-relative to absolute sample indices
        detect_kwargs = detect_kwargs or {}
        onsets = [d + first for d in detect_ready_tone(w, fs, **detect_kwargs)]
        if len(onsets):
            detect += onsets
        # the last window may not allow a pwelch
        if (last - first) < nperseg:
            continue
        # compute PSD estimate for the current window
        _, W[wingen.iw, :] = scipy.signal.welch(
            w, fs=fs, window='hann', nperseg=nperseg, axis=-1,
            detrend='constant', return_onesided=True, scaling='density')
    # the onset detection may have duplicates with sliding window, average them and remove
    detect = np.sort(np.array(detect)) / fs
    ind = np.where(np.diff(detect) < 0.1)[0]
    detect[ind] = (detect[ind] + detect[ind + 1]) / 2
    detect = np.delete(detect, ind + 1)
    return tscale, fscale, W, detect
def extract_sound(ses_path, task_collection='raw_behavior_data', device_collection='raw_behavior_data',
                  save=True, force=False, delete=False):
    """
    Simple audio features extraction for ambient sound characterization.
    From a wav file, generates several ALF files to be registered on Alyx

    :param ses_path: ALF full session path: (/mysubject001/YYYY-MM-DD/001)
    :param task_collection: collection holding the task data (for go-cue synchronization)
    :param device_collection: collection holding the microphone wav file and outputs
    :param save: if True, writes the spectrogram / onset npy files to disk
    :param force: unused  # NOTE(review): parameter is never read in this function
    :param delete: if True, removes the wav file after processing
    :return: list of output files
    """
    # NOTE(review): returns a list of Paths on success, a filtered list when the wav is
    # missing, and None on sox failure or when task data is absent — confirm callers
    # tolerate the inconsistent return types.
    ses_path = Path(ses_path)
    wav_file = ses_path.joinpath(device_collection, '_iblrig_micData.raw.wav')
    out_folder = ses_path.joinpath(device_collection)
    files_out = {'power': out_folder.joinpath('_iblmic_audioSpectrogram.power.npy'),
                 'frequencies': out_folder.joinpath('_iblmic_audioSpectrogram.frequencies.npy'),
                 'onset_times': out_folder.joinpath('_iblmic_audioOnsetGoCue.times_mic.npy'),
                 'times_microphone': out_folder.joinpath('_iblmic_audioSpectrogram.times_mic.npy'),
                 }
    if not wav_file.exists():
        logger_.warning(f"Wav file doesn't exist: {wav_file}")
        # return whichever outputs already exist from a previous run
        return [files_out[k] for k in files_out if files_out[k].exists()]
    # crunch the wav file
    fs, wav = wavfile.read(wav_file, mmap=False)
    if len(wav) == 0:
        # empty payload usually means a broken WAV header: attempt a repair with sox
        status = _fix_wav_file(wav_file)
        if status != 0:
            logger_.error(f"WAV Header empty. Sox couldn't fix it, Abort. {wav_file}")
            return
        else:
            fs, wav = wavfile.read(wav_file, mmap=False)
    tscale, fscale, W, detect = welchogram(fs, wav)
    # save files
    if save:
        out_folder.mkdir(exist_ok=True)
        np.save(file=files_out['power'], arr=W.astype(np.single))
        np.save(file=files_out['frequencies'], arr=fscale[None, :].astype(np.single))
        np.save(file=files_out['onset_times'], arr=detect)
        np.save(file=files_out['times_microphone'], arr=tscale[:, None].astype(np.single))
    # for the time scale, attempt to synchronize using onset sound detection and task data
    data = ioraw.load_data(ses_path, task_collection=task_collection)
    if data is None:  # if no session data, we're done
        if delete:
            wav_file.unlink()
        return
    tgocue, _ = GoCueTimes(ses_path).extract(task_collection=task_collection, save=False, bpod_trials=data)
    # pair detected tone onsets with go-cue times (truncate to the shorter of the two)
    ilast = min(len(tgocue), len(detect))
    dt = tgocue[:ilast] - detect[:ilast]
    # only save if dt is consistent for the whole session
    if np.std(dt) < 0.2 and save:
        files_out['times'] = out_folder / '_iblmic_audioSpectrogram.times.npy'
        # shift the microphone time scale by the median offset to align with task time
        tscale += np.median(dt)
        np.save(file=files_out['times'], arr=tscale[:, None].astype(np.single))
    if delete:
        wav_file.unlink()
    return [files_out[k] for k in files_out]