Source code for ibllib.ephys.spikes

from pathlib import Path
import logging
import json
import shutil
import tarfile

import numpy as np
from one.alf.path import get_session_path
import spikeglx

from iblutil.util import Bunch
import phylib.io.alf
from ibllib.ephys.sync_probes import apply_sync
import ibllib.ephys.ephysqc as ephysqc
from ibllib.ephys import sync_probes

_logger = logging.getLogger(__name__)


[docs] def probes_description(ses_path, one): """ Aggregate probes information into ALF files Register alyx probe insertions and Micro-manipulator trajectories Input: raw_ephys_data/probeXX/ Output: alf/probes.description.npy """ eid = one.path2eid(ses_path, query_type='remote') ses_path = Path(ses_path) meta_files = spikeglx.glob_ephys_files(ses_path, ext='meta') ap_meta_files = [(ep.ap.parent, ep.label, ep) for ep in meta_files if ep.get('ap')] # If we don't detect any meta files exit function if len(ap_meta_files) == 0: return subdirs, labels, efiles_sorted = zip(*sorted(ap_meta_files)) def _create_insertion(md, label, eid): # create json description description = {'label': label, 'model': md.neuropixelVersion, 'serial': int(md.serial), 'raw_file_name': md.fileName} # create or update probe insertion on alyx alyx_insertion = {'session': eid, 'model': md.neuropixelVersion, 'serial': md.serial, 'name': label} pi = one.alyx.rest('insertions', 'list', session=eid, name=label) if len(pi) == 0: qc_dict = {'qc': 'NOT_SET', 'extended_qc': {}} alyx_insertion.update({'json': qc_dict}) insertion = one.alyx.rest('insertions', 'create', data=alyx_insertion) else: insertion = one.alyx.rest('insertions', 'partial_update', data=alyx_insertion, id=pi[0]['id']) return description, insertion # Ouputs the probes description file probe_description = [] alyx_insertions = [] for label, ef in zip(labels, efiles_sorted): md = spikeglx.read_meta_data(ef.ap.with_suffix('.meta')) if md.neuropixelVersion == 'NP2.4': # NP2.4 meta that hasn't been split if md.get('NP2.4_shank', None) is None: geometry = spikeglx.read_geometry(ef.ap.with_suffix('.meta')) nshanks = np.unique(geometry['shank']) for shank in nshanks: label_ext = f'{label}{chr(97 + int(shank))}' description, insertion = _create_insertion(md, label_ext, eid) probe_description.append(description) alyx_insertions.append(insertion) # NP2.4 meta that has already been split else: description, insertion = _create_insertion(md, label, eid) probe_description.append(description) alyx_insertions.append(insertion) else: description, insertion = _create_insertion(md, label, eid) probe_description.append(description) alyx_insertions.append(insertion) alf_path = ses_path.joinpath('alf') alf_path.mkdir(exist_ok=True, parents=True) probe_description_file = alf_path.joinpath('probes.description.json') with open(probe_description_file, 'w+') as fid: fid.write(json.dumps(probe_description)) return [probe_description_file]
[docs] def sync_spike_sorting(ap_file, out_path): """ Synchronizes the spike.times using the previously computed sync files :param ap_file: raw binary data file for the probe insertion :param out_path: probe output path (usually {session_path}/alf/{probe_label}) """ def _sr(ap_file): # gets sampling rate from data md = spikeglx.read_meta_data(ap_file.with_suffix('.meta')) return spikeglx._get_fs_from_meta(md) out_files = [] label = ap_file.parts[-1] # now the bin file is always in a folder bearing the name of probe sync_file = ap_file.parent.joinpath( ap_file.name.replace('.ap.', '.sync.')).with_suffix('.npy') # try to get probe sync if it doesn't exist if not sync_file.exists(): _, sync_files = sync_probes.sync(get_session_path(ap_file)) out_files.extend(sync_files) # if it still not there, full blown error if not sync_file.exists(): # if there is no sync file it means something went wrong. Outputs the spike sorting # in time according the the probe by following ALF convention on the times objects error_msg = f'No synchronisation file for {label}: {sync_file}. The spike-' \ f'sorting is not synchronized and data not uploaded on Flat-Iron' _logger.error(error_msg) # remove the alf folder if the sync failed shutil.rmtree(out_path) return None, 1 # patch the spikes.times files manually st_file = out_path.joinpath('spikes.times.npy') spike_samples = np.load(out_path.joinpath('spikes.samples.npy')) interp_times = apply_sync(sync_file, spike_samples / _sr(ap_file), forward=True) np.save(st_file, interp_times) # get the list of output files out_files.extend([f for f in out_path.glob("*.*") if f.name.startswith(('channels.', 'drift', 'clusters.', 'spikes.', 'templates.', '_kilosort_', '_phy_spikes_subset', '_ibl_log.info'))]) # the QC files computed during spike sorting stay within the raw ephys data folder out_files.extend(list(ap_file.parent.glob('_iblqc_*AP.*.npy'))) return out_files, 0
[docs] def ks2_to_alf(ks_path, bin_path, out_path, bin_file=None, ampfactor=1, label=None, force=True): """ Convert Kilosort 2 output to ALF dataset for single probe data :param ks_path: :param bin_path: path of raw data :param out_path: :return: """ m = ephysqc.phy_model_from_ks2_path(ks2_path=ks_path, bin_path=bin_path, bin_file=bin_file) ac = phylib.io.alf.EphysAlfCreator(m) ac.convert(out_path, label=label, force=force, ampfactor=float(ampfactor))
[docs] def ks2_to_tar(ks_path, out_path, force=False): """ Compress output from kilosort 2 into tar file in order to register to flatiron and move to spikesorters/ks2_matlab/probexx path. Output file to register :param ks_path: path to kilosort output :param out_path: path to keep the :return path to tar ks output To extract files from the tar file can use this code Example: save_path = Path('folder you want to extract to') with tarfile.open('_kilosort_output.tar', 'r') as tar_dir: tar_dir.extractall(path=save_path) """ ks2_output = ['amplitudes.npy', 'channel_map.npy', 'channel_positions.npy', 'cluster_Amplitude.tsv', 'cluster_ContamPct.tsv', 'cluster_group.tsv', 'cluster_KSLabel.tsv', 'params.py', 'pc_feature_ind.npy', 'pc_features.npy', 'similar_templates.npy', 'spike_clusters.npy', 'spike_sorting_ks2.log', 'spike_templates.npy', 'spike_times.npy', 'template_feature_ind.npy', 'template_features.npy', 'templates.npy', 'templates_ind.npy', 'whitening_mat.npy', 'whitening_mat_inv.npy'] out_file = Path(out_path).joinpath('_kilosort_raw.output.tar') if out_file.exists() and not force: _logger.info(f"Already converted ks2 to tar: for {ks_path}, skipping.") return [out_file] with tarfile.open(out_file, 'w') as tar_dir: for file in Path(ks_path).iterdir(): if file.name in ks2_output: tar_dir.add(file, file.name) return [out_file]
[docs] def detection(data, fs, h, detect_threshold=-4, time_tol=.002, distance_threshold_um=70): """ Detects and de-duplicates negative voltage spikes based on voltage thresholding. The de-duplication step locks in maximum amplitude events. To account for collisions the amplitude is assumed to be decaying from the peak. If this is a multipeak event, each is labeled as a spike. :param data: 2D numpy array nsamples x nchannels :param fs: sampling frequency (Hz) :param h: dictionary with neuropixel geometry header: see. neuropixel.trace_header :param detect_threshold: negative value below which the voltage is considered to be a spike :param time_tol: time in seconds for which samples before and after are assumed to be part of the spike :param distance_threshold_um: distance for which exceeding threshold values are assumed to part of the same spike :return: spikes dictionary of vectors with keys "time", "trace", "amp" and "ispike" """ multipeak = False time_bracket = np.array([-1, 1]) * time_tol inds, indtr = np.where(data < detect_threshold) picks = Bunch(time=inds / fs, trace=indtr, amp=data[inds, indtr], ispike=np.zeros(inds.size)) amp_order = np.argsort(picks.amp) hxy = h['x'] + 1j * h['y'] spike_id = 1 while np.any(picks.ispike == 0): # find the first unassigned spike with the highest amplitude iamp = np.where(picks.ispike[amp_order] == 0)[0][0] imax = amp_order[iamp] # look only within the time range itlims = np.searchsorted(picks.time, picks.time[imax] + time_bracket) itlims = np.arange(itlims[0], itlims[1]) offset = np.abs(hxy[picks.trace[itlims]] - hxy[picks.trace[imax]]) iit = np.where(offset < distance_threshold_um)[0] picks.ispike[itlims[iit]] = -1 picks.ispike[imax] = spike_id # handles collision with a simple amplitude decay model: if amplitude doesn't decay # as a function of offset, then it's a collision and another spike is set if multipeak: # noqa iii = np.lexsort((picks.amp[itlims[iit]], offset[iit])) sorted_amps_db = 20 * np.log10(np.abs(picks.amp[itlims[iit][iii]])) idetect = np.r_[0, np.where(np.diff(sorted_amps_db) > 12)[0] + 1] picks.ispike[itlims[iit[iii[idetect]]]] = np.arange(idetect.size) + spike_id spike_id += idetect.size else: spike_id += 1 detects = Bunch({k: picks[k][picks.ispike > 0] for k in picks}) return detects