Source code for brainbox.io.io

import time
import numpy as np
# (Previously required `os.path` to get file info before memmapping)
# import os.path as op
from ibllib.io import spikeglx


[docs]def extract_waveforms(ephys_file, ts, ch, t=2.0, sr=30000, n_ch_probe=385, dtype='int16', offset=0, car=True): ''' Extracts spike waveforms from binary ephys data file, after (optionally) common-average-referencing (CAR) spatial noise. Parameters ---------- ephys_file : string The file path to the binary ephys data. ts : ndarray_like The timestamps (in s) of the spikes. ch : ndarray_like The channels on which to extract the waveforms. t : numeric (optional) The time (in ms) of each returned waveform. sr : int (optional) The sampling rate (in hz) that the ephys data was acquired at. n_ch_probe : int (optional) The number of channels of the recording. dtype: str (optional) The datatype represented by the bytes in `ephys_file`. offset: int (optional) The offset (in bytes) from the start of `ephys_file`. car: bool (optional) A flag to perform CAR before extracting waveforms. Returns ------- waveforms : ndarray An array of shape (#spikes, #samples, #channels) containing the waveforms. Examples -------- 1) Extract all the waveforms for unit1 with and without CAR. >>> import numpy as np >>> import brainbox as bb >>> import alf.io as aio >>> import ibllib.ephys.spikes as e_spks (*Note, if there is no 'alf' directory, make 'alf' directory from 'ks2' output directory): >>> e_spks.ks2_to_alf(path_to_ks_out, path_to_alf_out) # Get a clusters bunch and a units bunch from a spikes bunch from an alf directory. >>> clstrs_b = aio.load_object(path_to_alf_out, 'clusters') >>> spks_b = aio.load_object(path_to_alf_out, 'spikes') >>> units_b = bb.processing.get_units_bunch(spks, ['times']) # Get the timestamps and 20 channels around the max amp channel for unit1, and extract the # two sets of waveforms. >>> ts = units_b['times']['1'] >>> max_ch = max_ch = clstrs_b['channels'][1] >>> if max_ch < 10: # take only channels greater than `max_ch`. >>> ch = np.arange(max_ch, max_ch + 20) >>> elif (max_ch + 10) > 385: # take only channels less than `max_ch`. >>> ch = np.arange(max_ch - 20, max_ch) >>> else: # take `n_c_ch` around `max_ch`. >>> ch = np.arange(max_ch - 10, max_ch + 10) >>> wf = bb.io.extract_waveforms(path_to_ephys_file, ts, ch, car=False) >>> wf_car = bb.io.extract_waveforms(path_to_ephys_file, ts, ch, car=True) ''' # (Previously memmaped the file manually, but now use `spikeglx.Reader`) # item_bytes = np.dtype(dtype).itemsize # n_samples = (op.getsize(ephys_file) - offset) // (item_bytes * n_ch_probe) # file_m = np.memmap(ephys_file, shape=(n_samples, n_ch_probe), dtype=dtype, mode='r') # Get memmapped array of `ephys_file` s_reader = spikeglx.Reader(ephys_file) file_m = s_reader.data # the memmapped array n_wf_samples = np.int(sr / 1000 * (t / 2)) # number of samples to return on each side of a ts ts_samples = np.array(ts * sr).astype(int) # the samples corresponding to `ts` t_sample_first = ts_samples[0] - n_wf_samples # Exception handling for impossible channels ch = np.asarray(ch) ch = ch.reshape((ch.size, 1)) if ch.size == 1 else ch if np.any(ch < 0) or np.any(ch > n_ch_probe): raise Exception('At least one specified channel number is impossible. The minimum channel' ' number was {}, and the maximum channel number was {}. Check specified' ' channel numbers and try again.'.format(np.min(ch), np.max(ch))) if car: # compute spatial noise in chunks # see https://github.com/int-brain-lab/iblenv/issues/5 raise NotImplementedError("CAR option is not available") # Initialize `waveforms`, extract waveforms from `file_m`, and CAR. waveforms = np.zeros((len(ts), 2 * n_wf_samples, ch.size)) # Give time estimate for extracting waveforms. t0 = time.perf_counter() for i in range(5): waveforms[i, :, :] = \ file_m[i * n_wf_samples * 2 + t_sample_first: i * n_wf_samples * 2 + t_sample_first + n_wf_samples * 2, ch].reshape( (n_wf_samples * 2, ch.size)) dt = time.perf_counter() - t0 print('Performing waveform extraction. Estimated time is {:.2f} mins. ({})' .format(dt * len(ts) / 60 / 5, time.ctime())) for spk, _ in enumerate(ts): # extract waveforms spk_ts_sample = ts_samples[spk] spk_samples = np.arange(spk_ts_sample - n_wf_samples, spk_ts_sample + n_wf_samples) # have to reshape to add an axis to broadcast `file_m` into `waveforms` waveforms[spk, :, :] = \ file_m[spk_samples[0]:spk_samples[-1] + 1, ch].reshape((spk_samples.size, ch.size)) print('Done. ({})'.format(time.ctime())) return waveforms