Source code for

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from pathlib import Path
import logging

import numpy as np
import scipy.signal
import scipy.ndimage
from import wavfile

from ibldsp.utils import WindowGenerator
from ibldsp import fourier
import as ioraw
from import GoCueTimes

logger_ = logging.getLogger(__name__)

NS_WIN = 2 ** 18  # 2 ** np.ceil(np.log2(1 * fs))
NS_WELCH = 512
FTONE = 5000
UNIT = 'dBFS'  # dBFS or dbSPL

[docs] def detect_ready_tone(w, fs, ftone=FTONE, threshold=0.8): """ Detects a transient sinusoid signal in a time-serie :param w: audio time seried :param fs: sampling frequency (Hz) :param ftone: frequency of the tone to detect :param threshold: ratio of the Hilbert to the signal, between 0 and 1 (set to 0.8) :return: """ # get envelope of DC free signal and envelope of BP signal around freq of interest h = np.abs(scipy.signal.hilbert(w - np.median(w))) fh = np.abs(scipy.signal.hilbert(fourier.bp(w, si=1 / fs, b=ftone * np.array([0.9, 0.95, 1.15, 1.1])))) dtect = scipy.ndimage.uniform_filter1d(fh / (h + 1e-3), int(fs * 0.1)) > threshold return np.where(np.diff(dtect.astype(int)) == 1)[0]
# tone = np.sin(2 * np.pi * FTONE * np.arange(0, fs * 0.1) / fs) # tone = tone / np.sum(tone ** 2) # xc = np.abs(signal.hilbert(signal.correlate(w - np.mean(w), tone))) def _get_conversion_factor(unit=UNIT, ready_tone_spl=READY_TONE_SPL): # 3 approaches here (not exclusive): # a- get the mic sensitivity, the preamp gain and DAC parameters and do the math # b- treat the whole thing as a black box and do a calibration run (cf. people at Renard's lab) # c- use calibrated ready tone # The reference of acoustic pressure is 0dBSPL @ 1kHz which is threshold of hearing (20 μPa). # Usual calibration is 1 Pa (94 dBSPL) at 1 kHz # c) here we know that the ready tone is 55dB SPL at 5kHz, assuming a flat spectrum between # 1 and 5 kHz, and observing the peak value on the 5k at the microphone. if unit == 'dBFS': return 1.0 distance_to_the_mic = .155 peak_value_observed = 60 rms_value_observed = np.sqrt(2) / 2 * peak_value_observed fac = 10 ** ((ready_tone_spl - 20 * np.log10(rms_value_observed)) / 20) * distance_to_the_mic return fac
[docs] def welchogram(fs, wav, nswin=NS_WIN, overlap=OVERLAP, nperseg=NS_WELCH, detect_kwargs=None): """ Computes a spectrogram on a very large audio file. :param fs: sampling frequency (Hz) :param wav: wav signal (vector or memmap) :param nswin: n samples of the sliding window :param overlap: n samples of the overlap between windows :param nperseg: n samples for the computation of the spectrogram :param detect_kwargs: specified paramaters for detection :return: tscale, fscale, downsampled_spectrogram """ ns = wav.shape[0] window_generator = WindowGenerator(ns=ns, nswin=nswin, overlap=overlap) nwin = window_generator.nwin fscale = fourier.fscale(nperseg, 1 / fs, one_sided=True) W = np.zeros((nwin, len(fscale))) tscale = window_generator.tscale(fs=fs) detect = [] for first, last in window_generator.firstlast: # load the current window into memory w = np.float64(wav[first:last]) * _get_conversion_factor() # detection of ready tones detect_kwargs = detect_kwargs or {} a = [d + first for d in detect_ready_tone(w, fs, **detect_kwargs)] if len(a): detect += a # the last window may not allow a pwelch if (last - first) < nperseg: continue # compute PSD estimate for the current window iw = window_generator.iw _, W[iw, :] = scipy.signal.welch(w, fs=fs, window='hann', nperseg=nperseg, axis=-1, detrend='constant', return_onesided=True, scaling='density') # the onset detection may have duplicates with sliding window, average them and remove detect = np.sort(np.array(detect)) / fs ind = np.where(np.diff(detect) < 0.1)[0] detect[ind] = (detect[ind] + detect[ind + 1]) / 2 detect = np.delete(detect, ind + 1) return tscale, fscale, W, detect
[docs] def extract_sound(ses_path, task_collection='raw_behavior_data', device_collection='raw_behavior_data', save=True, force=False, delete=False): """ Simple audio features extraction for ambient sound characterization. From a wav file, generates several ALF files to be registered on Alyx :param ses_path: ALF full session path: (/mysubject001/YYYY-MM-DD/001) :param delete: if True, removes the wav file after processing :return: list of output files """ ses_path = Path(ses_path) wav_file = ses_path.joinpath(device_collection, '_iblrig_micData.raw.wav') out_folder = ses_path.joinpath(device_collection) files_out = {'power': out_folder.joinpath('_iblmic_audioSpectrogram.power.npy'), 'frequencies': out_folder.joinpath('_iblmic_audioSpectrogram.frequencies.npy'), 'onset_times': out_folder.joinpath('_iblmic_audioOnsetGoCue.times_mic.npy'), 'times_microphone': out_folder.joinpath('_iblmic_audioSpectrogram.times_mic.npy'), } if not wav_file.exists(): logger_.warning(f"Wav file doesn't exist: {wav_file}") return [files_out[k] for k in files_out if files_out[k].exists()] # crunch the wav file fs, wav =, mmap=False) if len(wav) == 0: status = _fix_wav_file(wav_file) if status != 0: logger_.error(f"WAV Header empty. Sox couldn't fix it, Abort. {wav_file}") return else: fs, wav =, mmap=False) tscale, fscale, W, detect = welchogram(fs, wav) # save files if save: out_folder.mkdir(exist_ok=True)['power'], arr=W.astype(np.single))['frequencies'], arr=fscale[None, :].astype(np.single))['onset_times'], arr=detect)['times_microphone'], arr=tscale[:, None].astype(np.single)) # for the time scale, attempt to synchronize using onset sound detection and task data data = ioraw.load_data(ses_path, task_collection=task_collection) if data is None: # if no session data, we're done if delete: wav_file.unlink() return tgocue, _ = GoCueTimes(ses_path).extract(task_collection=task_collection, save=False, bpod_trials=data) ilast = min(len(tgocue), len(detect)) dt = tgocue[:ilast] - detect[: ilast] # only save if dt is consistent for the whole session if np.std(dt) < 0.2 and save: files_out['times'] = out_folder / '_iblmic_audioSpectrogram.times.npy' tscale += np.median(dt)['times'], arr=tscale[:, None].astype(np.single)) if delete: wav_file.unlink() return [files_out[k] for k in files_out]
def _fix_wav_file(wav_file): import platform import subprocess status = -1 if platform.system() != 'Linux': return status wav_file_tmp = wav_file.with_suffix('.wav_') wav_file.rename(wav_file_tmp) command2run = f'sox --ignore-length {wav_file_tmp} {wav_file}' process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process.communicate() if process.returncode == 0: wav_file_tmp.unlink() else: wav_file_tmp.rename(wav_file) return process.returncode