#!/usr/bin/env python
# -*- coding:utf-8 -*-
from pathlib import Path
import logging
import numpy as np
import scipy.signal
import scipy.ndimage
from scipy.io import wavfile
from ibldsp.utils import WindowGenerator
from ibldsp import fourier
import ibllib.io.raw_data_loaders as ioraw
from ibllib.io.extractors.training_trials import GoCueTimes
logger_ = logging.getLogger(__name__)
NS_WIN = 2 ** 18 # 2 ** np.ceil(np.log2(1 * fs))
OVERLAP = NS_WIN / 2
NS_WELCH = 512
FTONE = 5000
UNIT = 'dBFS' # dBFS or dbSPL
READY_TONE_SPL = 85
[docs]
def detect_ready_tone(w, fs, ftone=FTONE, threshold=0.8):
"""
Detects a transient sinusoid signal in a time-series
:param w: audio time seried
:param fs: sampling frequency (Hz)
:param ftone: frequency of the tone to detect
:param threshold: ratio of the Hilbert to the signal, between 0 and 1 (set to 0.8)
:return:
"""
# get envelope of DC free signal and envelope of BP signal around freq of interest
h = np.abs(scipy.signal.hilbert(w - np.median(w)))
fh = np.abs(scipy.signal.hilbert(fourier.bp(w, si=1 / fs, b=ftone * np.array([0.9, 0.95, 1.15, 1.1]))))
dtect = scipy.ndimage.uniform_filter1d(fh / (h + 1e-3), int(fs * 0.1)) > threshold
return np.where(np.diff(dtect.astype(int)) == 1)[0]
# tone = np.sin(2 * np.pi * FTONE * np.arange(0, fs * 0.1) / fs)
# tone = tone / np.sum(tone ** 2)
# xc = np.abs(signal.hilbert(signal.correlate(w - np.mean(w), tone)))
def _get_conversion_factor(unit=UNIT, ready_tone_spl=READY_TONE_SPL):
# 3 approaches here (not exclusive):
# a- get the mic sensitivity, the preamp gain and DAC parameters and do the math
# b- treat the whole thing as a black box and do a calibration run (cf. people at Renard's lab)
# c- use calibrated ready tone
# The reference of acoustic pressure is 0dBSPL @ 1kHz which is threshold of hearing (20 μPa).
# Usual calibration is 1 Pa (94 dBSPL) at 1 kHz
# c) here we know that the ready tone is 55dB SPL at 5kHz, assuming a flat spectrum between
# 1 and 5 kHz, and observing the peak value on the 5k at the microphone.
if unit == 'dBFS':
return 1.0
distance_to_the_mic = .155
peak_value_observed = 60
rms_value_observed = np.sqrt(2) / 2 * peak_value_observed
fac = 10 ** ((ready_tone_spl - 20 * np.log10(rms_value_observed)) / 20) * distance_to_the_mic
return fac
[docs]
def welchogram(fs, wav, nswin=NS_WIN, overlap=OVERLAP, nperseg=NS_WELCH, detect_kwargs=None):
"""
Computes a spectrogram on a very large audio file.
:param fs: sampling frequency (Hz)
:param wav: wav signal (vector or memmap)
:param nswin: n samples of the sliding window
:param overlap: n samples of the overlap between windows
:param nperseg: n samples for the computation of the spectrogram
:param detect_kwargs: specified paramaters for detection
:return: tscale, fscale, downsampled_spectrogram
"""
ns = wav.shape[0]
window_generator = WindowGenerator(ns=ns, nswin=nswin, overlap=overlap)
nwin = window_generator.nwin
fscale = fourier.fscale(nperseg, 1 / fs, one_sided=True)
W = np.zeros((nwin, len(fscale)))
tscale = window_generator.tscale(fs=fs)
detect = []
for first, last in window_generator.firstlast:
# load the current window into memory
w = np.float64(wav[first:last]) * _get_conversion_factor()
# detection of ready tones
detect_kwargs = detect_kwargs or {}
a = [d + first for d in detect_ready_tone(w, fs, **detect_kwargs)]
if len(a):
detect += a
# the last window may not allow a pwelch
if (last - first) < nperseg:
continue
# compute PSD estimate for the current window
iw = window_generator.iw
_, W[iw, :] = scipy.signal.welch(w, fs=fs, window='hann', nperseg=nperseg, axis=-1,
detrend='constant', return_onesided=True, scaling='density')
# the onset detection may have duplicates with sliding window, average them and remove
detect = np.sort(np.array(detect)) / fs
ind = np.where(np.diff(detect) < 0.1)[0]
detect[ind] = (detect[ind] + detect[ind + 1]) / 2
detect = np.delete(detect, ind + 1)
return tscale, fscale, W, detect
def _fix_wav_file(wav_file):
import platform
import subprocess
status = -1
if platform.system() != 'Linux':
return status
wav_file_tmp = wav_file.with_suffix('.wav_')
wav_file.rename(wav_file_tmp)
command2run = f'sox --ignore-length {wav_file_tmp} {wav_file}'
process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
process.communicate()
if process.returncode == 0:
wav_file_tmp.unlink()
else:
wav_file_tmp.rename(wav_file)
return process.returncode