Source code for ibllib.io.extractors.training_audio

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from pathlib import Path
import logging

import numpy as np
import scipy.signal
import scipy.ndimage
from scipy.io import wavfile


from ibldsp.utils import WindowGenerator
from ibldsp import fourier
import ibllib.io.raw_data_loaders as ioraw
from ibllib.io.extractors.training_trials import GoCueTimes


logger_ = logging.getLogger(__name__)

NS_WIN = 2 ** 18  # 2 ** np.ceil(np.log2(1 * fs))
OVERLAP = NS_WIN / 2
NS_WELCH = 512
FTONE = 5000
UNIT = 'dBFS'  # dBFS or dbSPL
READY_TONE_SPL = 85


[docs] def detect_ready_tone(w, fs, ftone=FTONE, threshold=0.8): """ Detects a transient sinusoid signal in a time-serie :param w: audio time seried :param fs: sampling frequency (Hz) :param ftone: frequency of the tone to detect :param threshold: ratio of the Hilbert to the signal, between 0 and 1 (set to 0.8) :return: """ # get envelope of DC free signal and envelope of BP signal around freq of interest h = np.abs(scipy.signal.hilbert(w - np.median(w))) fh = np.abs(scipy.signal.hilbert(fourier.bp(w, si=1 / fs, b=ftone * np.array([0.9, 0.95, 1.15, 1.1])))) dtect = scipy.ndimage.uniform_filter1d(fh / (h + 1e-3), int(fs * 0.1)) > threshold return np.where(np.diff(dtect.astype(int)) == 1)[0]
# tone = np.sin(2 * np.pi * FTONE * np.arange(0, fs * 0.1) / fs) # tone = tone / np.sum(tone ** 2) # xc = np.abs(signal.hilbert(signal.correlate(w - np.mean(w), tone))) def _get_conversion_factor(unit=UNIT, ready_tone_spl=READY_TONE_SPL): # 3 approaches here (not exclusive): # a- get the mic sensitivity, the preamp gain and DAC parameters and do the math # b- treat the whole thing as a black box and do a calibration run (cf. people at Renard's lab) # c- use calibrated ready tone # The reference of acoustic pressure is 0dBSPL @ 1kHz which is threshold of hearing (20 μPa). # Usual calibration is 1 Pa (94 dBSPL) at 1 kHz # c) here we know that the ready tone is 55dB SPL at 5kHz, assuming a flat spectrum between # 1 and 5 kHz, and observing the peak value on the 5k at the microphone. if unit == 'dBFS': return 1.0 distance_to_the_mic = .155 peak_value_observed = 60 rms_value_observed = np.sqrt(2) / 2 * peak_value_observed fac = 10 ** ((ready_tone_spl - 20 * np.log10(rms_value_observed)) / 20) * distance_to_the_mic return fac
[docs] def welchogram(fs, wav, nswin=NS_WIN, overlap=OVERLAP, nperseg=NS_WELCH, detect_kwargs=None): """ Computes a spectrogram on a very large audio file. :param fs: sampling frequency (Hz) :param wav: wav signal (vector or memmap) :param nswin: n samples of the sliding window :param overlap: n samples of the overlap between windows :param nperseg: n samples for the computation of the spectrogram :param detect_kwargs: specified paramaters for detection :return: tscale, fscale, downsampled_spectrogram """ ns = wav.shape[0] window_generator = WindowGenerator(ns=ns, nswin=nswin, overlap=overlap) nwin = window_generator.nwin fscale = fourier.fscale(nperseg, 1 / fs, one_sided=True) W = np.zeros((nwin, len(fscale))) tscale = window_generator.tscale(fs=fs) detect = [] for first, last in window_generator.firstlast: # load the current window into memory w = np.float64(wav[first:last]) * _get_conversion_factor() # detection of ready tones detect_kwargs = detect_kwargs or {} a = [d + first for d in detect_ready_tone(w, fs, **detect_kwargs)] if len(a): detect += a # the last window may not allow a pwelch if (last - first) < nperseg: continue # compute PSD estimate for the current window iw = window_generator.iw _, W[iw, :] = scipy.signal.welch(w, fs=fs, window='hann', nperseg=nperseg, axis=-1, detrend='constant', return_onesided=True, scaling='density') # the onset detection may have duplicates with sliding window, average them and remove detect = np.sort(np.array(detect)) / fs ind = np.where(np.diff(detect) < 0.1)[0] detect[ind] = (detect[ind] + detect[ind + 1]) / 2 detect = np.delete(detect, ind + 1) return tscale, fscale, W, detect
[docs] def extract_sound(ses_path, task_collection='raw_behavior_data', device_collection='raw_behavior_data', save=True, force=False, delete=False): """ Simple audio features extraction for ambient sound characterization. From a wav file, generates several ALF files to be registered on Alyx :param ses_path: ALF full session path: (/mysubject001/YYYY-MM-DD/001) :param delete: if True, removes the wav file after processing :return: list of output files """ ses_path = Path(ses_path) wav_file = ses_path.joinpath(device_collection, '_iblrig_micData.raw.wav') out_folder = ses_path.joinpath(device_collection) files_out = {'power': out_folder.joinpath('_iblmic_audioSpectrogram.power.npy'), 'frequencies': out_folder.joinpath('_iblmic_audioSpectrogram.frequencies.npy'), 'onset_times': out_folder.joinpath('_iblmic_audioOnsetGoCue.times_mic.npy'), 'times_microphone': out_folder.joinpath('_iblmic_audioSpectrogram.times_mic.npy'), } if not wav_file.exists(): logger_.warning(f"Wav file doesn't exist: {wav_file}") return [files_out[k] for k in files_out if files_out[k].exists()] # crunch the wav file fs, wav = wavfile.read(wav_file, mmap=False) if len(wav) == 0: status = _fix_wav_file(wav_file) if status != 0: logger_.error(f"WAV Header empty. Sox couldn't fix it, Abort. {wav_file}") return else: fs, wav = wavfile.read(wav_file, mmap=False) tscale, fscale, W, detect = welchogram(fs, wav) # save files if save: out_folder.mkdir(exist_ok=True) np.save(file=files_out['power'], arr=W.astype(np.single)) np.save(file=files_out['frequencies'], arr=fscale[None, :].astype(np.single)) np.save(file=files_out['onset_times'], arr=detect) np.save(file=files_out['times_microphone'], arr=tscale[:, None].astype(np.single)) # for the time scale, attempt to synchronize using onset sound detection and task data data = ioraw.load_data(ses_path, task_collection=task_collection) if data is None: # if no session data, we're done if delete: wav_file.unlink() return tgocue, _ = GoCueTimes(ses_path).extract(task_collection=task_collection, save=False, bpod_trials=data) ilast = min(len(tgocue), len(detect)) dt = tgocue[:ilast] - detect[: ilast] # only save if dt is consistent for the whole session if np.std(dt) < 0.2 and save: files_out['times'] = out_folder / '_iblmic_audioSpectrogram.times.npy' tscale += np.median(dt) np.save(file=files_out['times'], arr=tscale[:, None].astype(np.single)) if delete: wav_file.unlink() return [files_out[k] for k in files_out]
def _fix_wav_file(wav_file): import platform import subprocess status = -1 if platform.system() != 'Linux': return status wav_file_tmp = wav_file.with_suffix('.wav_') wav_file.rename(wav_file_tmp) command2run = f'sox --ignore-length {wav_file_tmp} {wav_file}' process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process.communicate() if process.returncode == 0: wav_file_tmp.unlink() else: wav_file_tmp.rename(wav_file) return process.returncode