import logging
import traceback
from pathlib import Path
import subprocess
import re
import shutil
import packaging.version
import numpy as np
import pandas as pd
import spikeglx
import neuropixel
from ibldsp.utils import rms
from ibldsp.waveform_extraction import extract_wfs_cbin
import one.alf.io as alfio
from ibllib.misc import check_nvidia_driver
from ibllib.pipes import base_tasks
from ibllib.pipes.sync_tasks import SyncPulses
from ibllib.ephys import ephysqc
import ibllib.ephys.spikes
from ibllib.qc.alignment_qc import get_aligned_channels
from ibllib.plots.figures import LfpPlots, ApPlots, BadChannelsAp
from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots
from ibllib.io.extractors.ephys_fpga import extract_sync
from ibllib.ephys.spikes import sync_probes
_logger = logging.getLogger("ibllib")
class EphysRegisterRaw(base_tasks.DynamicTask):
"""
    Creates the probe insertions and uploads the probes description file; also compresses the nidq files and uploads them.
"""
priority = 100
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [],
'output_files': [('probes.description.json', 'alf', True)]
}
return signature
def _run(self):
out_files = ibllib.ephys.spikes.probes_description(self.session_path, self.one)
return out_files
class EphysSyncRegisterRaw(base_tasks.DynamicTask):
"""
    Task to rename, compress and register raw DAQ data in .bin format collected using a NIDAQ device
"""
priority = 90
cpu = 2
    io_charge = 30 # this job reads raw ap files
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [('*.*bin', self.sync_collection, True),
('*.meta', self.sync_collection, True),
('*.wiring.json', self.sync_collection, True)],
'output_files': [('*nidq.cbin', self.sync_collection, True),
('*nidq.ch', self.sync_collection, True),
('*nidq.meta', self.sync_collection, True),
('*nidq.wiring.json', self.sync_collection, True)]
}
return signature
def _run(self):
out_files = []
# Detect the wiring file
wiring_file = next(self.session_path.joinpath(self.sync_collection).glob('*.wiring.json'), None)
if wiring_file is not None:
out_files.append(wiring_file)
# Search for .bin files in the sync_collection folder
files = list(self.session_path.joinpath(self.sync_collection).glob('*nidq.*bin'))
bin_file = files[0] if len(files) == 1 else None
        # If we no longer have a .bin / .cbin file, see if we can still find the .ch and .meta files
if bin_file is None:
for ext in ['ch', 'meta']:
files = list(self.session_path.joinpath(self.sync_collection).glob(f'*nidq.{ext}'))
ext_file = files[0] if len(files) == 1 else None
if ext_file is not None:
out_files.append(ext_file)
return out_files if len(out_files) > 0 else None
# If we do find the .bin file, compress files (only if they haven't already been compressed)
sr = spikeglx.Reader(bin_file)
if sr.is_mtscomp:
sr.close()
cbin_file = bin_file
assert cbin_file.suffix == '.cbin'
else:
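            # compress_file() writes a .cbin data file and a .ch index file next to the original .bin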
cbin_file = sr.compress_file()
sr.close()
bin_file.unlink()
meta_file = cbin_file.with_suffix('.meta')
ch_file = cbin_file.with_suffix('.ch')
out_files.append(cbin_file)
out_files.append(ch_file)
out_files.append(meta_file)
return out_files
class EphysCompressNP1(base_tasks.EphysTask):
priority = 90
cpu = 2
    io_charge = 100 # this job reads raw ap files
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
('*lf.meta', f'{self.device_collection}/{self.pname}', True),
('*lf.*bin', f'{self.device_collection}/{self.pname}', True),
('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
('*ap.ch', f'{self.device_collection}/{self.pname}', True),
('*lf.meta', f'{self.device_collection}/{self.pname}', True),
('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
('*lf.ch', f'{self.device_collection}/{self.pname}', True),
('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
}
return signature
def _run(self):
out_files = []
# Detect and upload the wiring file
wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
if wiring_file is not None:
out_files.append(wiring_file)
ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
for ef in ephys_files:
for typ in ["ap", "lf"]:
bin_file = ef.get(typ)
if not bin_file:
continue
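                # only plain '.bin' files (index of 'bin' in the suffix == 1) need compressing; '.cbin', '.ch' and '.meta' files are registered as-is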
if bin_file.suffix.find("bin") == 1:
with spikeglx.Reader(bin_file) as sr:
if sr.is_mtscomp:
out_files.append(bin_file)
else:
_logger.info(f"Compressing binary file {bin_file}")
cbin_file = sr.compress_file()
sr.close()
bin_file.unlink()
out_files.append(cbin_file)
out_files.append(cbin_file.with_suffix('.ch'))
else:
out_files.append(bin_file)
return out_files
class EphysCompressNP21(base_tasks.EphysTask):
priority = 90
cpu = 2
    io_charge = 100 # this job reads raw ap files
job_size = 'large'
@property
def signature(self):
signature = {
'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
('*ap.ch', f'{self.device_collection}/{self.pname}', True),
('*lf.meta', f'{self.device_collection}/{self.pname}', True),
('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
('*lf.ch', f'{self.device_collection}/{self.pname}', True),
('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
}
return signature
def _run(self):
out_files = []
# Detect wiring files
wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
if wiring_file is not None:
out_files.append(wiring_file)
ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        bin_file = ephys_files[0].get('ap', None) if len(ephys_files) > 0 else None
# This is the case where no ap.bin/.cbin file exists
if len(ephys_files) == 0 or not bin_file:
ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
for ef in ephys_files:
for typ in ["ap", "lf"]:
bin_file = ef.get(typ)
if bin_file:
out_files.append(bin_file)
return out_files
        # If the ap.bin / .cbin file does exist, instantiate the NP2Converter
np_conv = neuropixel.NP2Converter(bin_file, compress=True)
np_conv_status = np_conv.process()
np_conv_files = np_conv.get_processed_files_NP21()
np_conv.sr.close()
        # Status = 1: conversion completed successfully
        if np_conv_status == 1:
out_files += np_conv_files
return out_files
# Status = 0 - Already processed
elif np_conv_status == 0:
ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
for ef in ephys_files:
for typ in ["ap", "lf"]:
bin_file = ef.get(typ)
if bin_file and bin_file.suffix != '.bin':
out_files.append(bin_file)
return out_files
else:
return
class EphysCompressNP24(base_tasks.EphysTask):
"""
Compresses NP2.4 data by splitting into N binary files, corresponding to N shanks
:param pname: a probe name string
:param device_collection: the collection containing the probes (usually 'raw_ephys_data')
:param nshanks: number of shanks used (usually 4 but it may be less depending on electrode map), optional
"""
priority = 90
cpu = 2
    io_charge = 100 # this job reads raw ap files
job_size = 'large'
def __init__(self, session_path, *args, pname=None, device_collection='raw_ephys_data', nshanks=None, **kwargs):
assert pname, "pname is a required argument"
if nshanks is None:
meta_file = next(session_path.joinpath(device_collection, pname).glob('*ap.meta'))
nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))
assert nshanks > 1
super(EphysCompressNP24, self).__init__(
session_path, *args, pname=pname, device_collection=device_collection, nshanks=nshanks, **kwargs)
@property
def signature(self):
signature = {
'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
[('*ap.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
[('*ap.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
[('*lf.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
[('*lf.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
[('*lf.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
[('*wiring.json', f'{self.device_collection}/{self.pname}{pext}', False) for pext in self.pextra]
}
return signature
def _run(self, delete_original=True):
        # Do we need the ability to register the files once they have already been processed and the original files deleted?
files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
assert len(files) == 1
bin_file = files[0].get('ap', None)
np_conv = neuropixel.NP2Converter(bin_file, post_check=True, compress=True, delete_original=delete_original)
np_conv_status = np_conv.process()
out_files = np_conv.get_processed_files_NP24()
np_conv.sr.close()
if np_conv_status == 1:
wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
if wiring_file is not None:
# copy wiring file to each sub probe directory and add to output files
for pext in self.pextra:
new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}', wiring_file.name)
shutil.copyfile(wiring_file, new_wiring_file)
out_files.append(new_wiring_file)
return out_files
elif np_conv_status == 0:
for pext in self.pextra:
ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}'))
ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
f'{self.pname}{pext}'), ext="ch")
ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
f'{self.pname}{pext}'), ext="meta")
for ef in ephys_files:
for typ in ["ap", "lf"]:
bin_file = ef.get(typ)
if bin_file and bin_file.suffix != '.bin':
out_files.append(bin_file)
wiring_file = next(self.session_path.joinpath(self.device_collection,
f'{self.pname}{pext}').glob('*wiring.json'), None)
if wiring_file is None:
# See if we have the original wiring file
orig_wiring_file = next(self.session_path.joinpath(self.device_collection,
self.pname).glob('*wiring.json'), None)
if orig_wiring_file is not None:
# copy wiring file to sub probe directory and add to output files
new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}',
orig_wiring_file.name)
shutil.copyfile(orig_wiring_file, new_wiring_file)
out_files.append(new_wiring_file)
else:
out_files.append(wiring_file)
return out_files
else:
return
class EphysSyncPulses(SyncPulses):
priority = 90
cpu = 2
    io_charge = 30 # this job reads raw ap files
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [('*nidq.cbin', self.sync_collection, False),
('*nidq.ch', self.sync_collection, False),
('*nidq.meta', self.sync_collection, False),
('*nidq.wiring.json', self.sync_collection, True)],
'output_files': [('_spikeglx_sync.times.npy', self.sync_collection, True),
('_spikeglx_sync.polarities.npy', self.sync_collection, True),
('_spikeglx_sync.channels.npy', self.sync_collection, True)]
}
return signature
class EphysPulses(base_tasks.EphysTask):
"""
    Extract pulses from raw electrophysiology data into numpy arrays.
    Perform the probe synchronisation with the nidq (3B) or the main probe (3A).
    The job first extracts the sync pulses from all probes, then performs the
    synchronisation with the nidq.
    :param pname: a list of probe names or a single probe name string
:param device_collection: the collection containing the probes (usually 'raw_ephys_data')
:param sync_collection: the collection containing the synchronisation device - nidq (usually 'raw_ephys_data')
"""
priority = 90
cpu = 2
    io_charge = 30 # this job reads raw ap files
job_size = 'small'
def __init__(self, *args, **kwargs):
super(EphysPulses, self).__init__(*args, **kwargs)
assert self.device_collection, "device_collection is a required argument"
assert self.sync_collection, "sync_collection is a required argument"
self.pname = [self.pname] if isinstance(self.pname, str) else self.pname
assert isinstance(self.pname, list), 'pname task argument should be a list or a string'
@property
def signature(self):
signature = {
'input_files':
[('*ap.meta', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
[('*ap.cbin', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
[('*ap.ch', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
[('*ap.wiring.json', f'{self.device_collection}/{pname}', False) for pname in self.pname] +
[('_spikeglx_sync.times.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] +
[('_spikeglx_sync.polarities.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] +
[('_spikeglx_sync.channels.*npy', f'{self.device_collection}/{pname}', False) for pname in self.pname] +
[('_spikeglx_sync.times.*npy', self.sync_collection, True),
('_spikeglx_sync.polarities.*npy', self.sync_collection, True),
('_spikeglx_sync.channels.*npy', self.sync_collection, True),
('*ap.meta', self.sync_collection, True)
],
'output_files': [(f'_spikeglx_sync.times.{pname}.npy', f'{self.device_collection}/{pname}', True)
for pname in self.pname] +
[(f'_spikeglx_sync.polarities.{pname}.npy', f'{self.device_collection}/{pname}', True)
for pname in self.pname] +
[(f'_spikeglx_sync.channels.{pname}.npy', f'{self.device_collection}/{pname}', True)
for pname in self.pname] +
[('*sync.npy', f'{self.device_collection}/{pname}', True) for pname in
self.pname] +
[('*timestamps.npy', f'{self.device_collection}/{pname}', True) for pname in
self.pname]
}
return signature
def _run(self, overwrite=False):
out_files = []
for probe in self.pname:
files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, probe))
assert len(files) == 1 # will error if the session is split
bin_file = files[0].get('ap', None)
if not bin_file:
return []
_, out = extract_sync(self.session_path, ephys_files=files, overwrite=overwrite)
out_files += out
status, sync_files = sync_probes.sync(self.session_path, probe_names=self.pname)
return out_files + sync_files
class RawEphysQC(base_tasks.EphysTask):
cpu = 2
    io_charge = 30 # this job reads raw ap files
priority = 10 # a lot of jobs depend on this one
job_size = 'small'
@property
def signature(self):
signature = {
'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
('*lf.meta', f'{self.device_collection}/{self.pname}', True),
('*lf.ch', f'{self.device_collection}/{self.pname}', False),
('*lf.*bin', f'{self.device_collection}/{self.pname}', False)],
'output_files': [('_iblqc_ephysChannels.apRMS.npy', f'{self.device_collection}/{self.pname}', True),
('_iblqc_ephysChannels.rawSpikeRates.npy', f'{self.device_collection}/{self.pname}', True),
('_iblqc_ephysChannels.labels.npy', f'{self.device_collection}/{self.pname}', True),
('_iblqc_ephysSpectralDensityLF.freqs.npy', f'{self.device_collection}/{self.pname}', True),
('_iblqc_ephysSpectralDensityLF.power.npy', f'{self.device_collection}/{self.pname}', True),
('_iblqc_ephysSpectralDensityAP.freqs.npy', f'{self.device_collection}/{self.pname}', True),
('_iblqc_ephysSpectralDensityAP.power.npy', f'{self.device_collection}/{self.pname}', True),
('_iblqc_ephysTimeRmsLF.rms.npy', f'{self.device_collection}/{self.pname}', True),
('_iblqc_ephysTimeRmsLF.timestamps.npy', f'{self.device_collection}/{self.pname}', True)]
}
return signature
# TODO make sure this works with NP2 probes (at the moment not sure it will due to raiseError mapping)
def _run(self, overwrite=False):
eid = self.one.path2eid(self.session_path)
probe = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname)
# We expect there to only be one probe
if len(probe) != 1:
_logger.warning(f"{self.pname} for {eid} not found") # Should we create it?
self.status = -1
return
pid = probe[0]['id']
qc_files = []
_logger.info(f"\nRunning QC for probe insertion {self.pname}")
try:
eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
qc_files.extend(eqc.run(update=True, overwrite=overwrite))
_logger.info("Creating LFP QC plots")
plot_task = LfpPlots(pid, session_path=self.session_path, one=self.one)
_ = plot_task.run()
self.plot_tasks.append(plot_task)
plot_task = BadChannelsAp(pid, session_path=self.session_path, one=self.one)
_ = plot_task.run()
self.plot_tasks.append(plot_task)
except AssertionError:
_logger.error(traceback.format_exc())
self.status = -1
return qc_files
class CellQCMixin:
"""
This mixin class is used to compute the cell QC metrics and update the json field of the probe insertion
The compute_cell_qc method is static and can be used independently.
"""
@staticmethod
def compute_cell_qc(folder_alf_probe):
"""
Computes the cell QC given an extracted probe alf path
        :param folder_alf_probe: path to the probe alf folder containing the spike sorting output
        :return: list of files written, the per-cluster metrics dataframe and the drift output
"""
# compute the straight qc
_logger.info(f"Computing cluster qc for {folder_alf_probe}")
spikes = alfio.load_object(folder_alf_probe, 'spikes')
clusters = alfio.load_object(folder_alf_probe, 'clusters')
df_units, drift = ephysqc.spike_sorting_metrics(
spikes.times, spikes.clusters, spikes.amps, spikes.depths,
cluster_ids=np.arange(clusters.channels.size))
        # if the ks2 labels file exists, load it and add the column
file_labels = folder_alf_probe.joinpath('cluster_KSLabel.tsv')
if file_labels.exists():
ks2_labels = pd.read_csv(file_labels, sep='\t')
ks2_labels.rename(columns={'KSLabel': 'ks2_label'}, inplace=True)
df_units = pd.concat(
[df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1)
# save as parquet file
df_units.to_parquet(file_metrics := folder_alf_probe.joinpath("clusters.metrics.pqt"))
assert np.all((df_units['bitwise_fail'] == 0) == (df_units['label'] == 1)) # useless but sanity check for OW
cok = df_units['bitwise_fail'] == 0
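        # boolean mask over spikes: True where the spike's cluster passed all bitwise QC checks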
sok = cok[spikes['clusters']].values
spikes['templates'] = spikes['templates'].astype(np.uint16)
spikes['clusters'] = spikes['clusters'].astype(np.uint16)
spikes['depths'] = spikes['depths'].astype(np.float32)
spikes['amps'] = spikes['amps'].astype(np.float32)
file_passing = folder_alf_probe.joinpath('passingSpikes.table.pqt')
df_spikes = pd.DataFrame(spikes)
df_spikes = df_spikes.iloc[sok, :].reset_index(drop=True)
df_spikes.to_parquet(file_passing)
return [file_metrics, file_passing], df_units, drift
def _label_probe_qc(self, folder_probe, df_units, drift):
"""
        Labels the json field of the corresponding Alyx probe insertion with the QC summary
        :param folder_probe: the probe alf folder containing the spike sorting output
        :param df_units: dataframe of per-unit QC metrics as returned by compute_cell_qc
        :param drift: drift estimate as returned by compute_cell_qc
        :return:
"""
eid = self.one.path2eid(self.session_path, query_type='remote')
pdict = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname, no_cache=True)
if len(pdict) != 1:
_logger.warning(f'No probe found for probe name: {self.pname}')
return
isok = df_units['label'] == 1
qcdict = {'n_units': int(df_units.shape[0]),
'n_units_qc_pass': int(np.sum(isok)),
'firing_rate_max': np.max(df_units['firing_rate'][isok]),
'firing_rate_median': np.median(df_units['firing_rate'][isok]),
'amplitude_max_uV': np.max(df_units['amp_max'][isok]) * 1e6,
                  'amplitude_median_uV': np.median(df_units['amp_median'][isok]) * 1e6,
'drift_rms_um': rms(drift['drift_um']),
}
file_wm = folder_probe.joinpath('_kilosort_whitening.matrix.npy')
if file_wm.exists():
wm = np.load(file_wm)
qcdict['whitening_matrix_conditioning'] = np.linalg.cond(wm)
# groom qc dict (this function will eventually go directly into the json field update)
for k in qcdict:
if isinstance(qcdict[k], np.int64):
qcdict[k] = int(qcdict[k])
elif isinstance(qcdict[k], float):
qcdict[k] = np.round(qcdict[k], 2)
self.one.alyx.json_field_update("insertions", pdict[0]["id"], "json", qcdict)
class SpikeSorting(base_tasks.EphysTask, CellQCMixin):
"""
    Spike sorting task using the iblsorter pipeline (Kilosort 2.5 based)
"""
gpu = 1
    io_charge = 100 # this job reads raw ap files
priority = 60
job_size = 'large'
force = True
env = 'iblsorter'
_sortername = 'iblsorter'
SHELL_SCRIPT = Path.home().joinpath(
f"Documents/PYTHON/iblscripts/deploy/serverpc/{_sortername}/sort_recording.sh"
)
SPIKE_SORTER_NAME = 'iblsorter'
SORTER_REPOSITORY = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/ibl-sorter')
@property
def signature(self):
signature = {
'input_files': [
('*ap.meta', f'{self.device_collection}/{self.pname}', True),
('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
('*ap.ch', f'{self.device_collection}/{self.pname}', False),
('*sync.npy', f'{self.device_collection}/{self.pname}', True)
],
'output_files': [
# ./raw_ephys_data/probe00/
('_iblqc_ephysTimeRmsAP.rms.npy', f'{self.device_collection}/{self.pname}/', True),
('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}/', True),
('_iblqc_ephysSaturation.samples.npy', f'{self.device_collection}/{self.pname}/', True),
# ./spike_sorters/iblsorter/probe00
('spike_sorting_iblsorter.log', f'spike_sorters/{self._sortername}/{self.pname}', True),
('_kilosort_raw.output.tar', f'spike_sorters/{self._sortername}/{self.pname}/', True),
# ./alf/probe00/iblsorter
('_kilosort_whitening.matrix.npy', f'alf/{self.pname}/{self._sortername}/', True),
('_phy_spikes_subset.channels.npy', f'alf/{self.pname}/{self._sortername}/', True),
('_phy_spikes_subset.spikes.npy', f'alf/{self.pname}/{self._sortername}/', True),
('_phy_spikes_subset.waveforms.npy', f'alf/{self.pname}/{self._sortername}/', True),
('channels.labels.npy', f'alf/{self.pname}/{self._sortername}/', True),
('channels.localCoordinates.npy', f'alf/{self.pname}/{self._sortername}/', True),
('channels.rawInd.npy', f'alf/{self.pname}/{self._sortername}/', True),
('clusters.amps.npy', f'alf/{self.pname}/{self._sortername}/', True),
('clusters.channels.npy', f'alf/{self.pname}/{self._sortername}/', True),
('clusters.depths.npy', f'alf/{self.pname}/{self._sortername}/', True),
('clusters.metrics.pqt', f'alf/{self.pname}/{self._sortername}/', True),
('clusters.peakToTrough.npy', f'alf/{self.pname}/{self._sortername}/', True),
('clusters.uuids.csv', f'alf/{self.pname}/{self._sortername}/', True),
('clusters.waveforms.npy', f'alf/{self.pname}/{self._sortername}/', True),
('clusters.waveformsChannels.npy', f'alf/{self.pname}/{self._sortername}/', True),
('drift.times.npy', f'alf/{self.pname}/{self._sortername}/', True),
('drift.um.npy', f'alf/{self.pname}/{self._sortername}/', True),
('drift_depths.um.npy', f'alf/{self.pname}/{self._sortername}/', True),
('passingSpikes.table.pqt', f'alf/{self.pname}/{self._sortername}/', True),
('spikes.amps.npy', f'alf/{self.pname}/{self._sortername}/', True),
('spikes.clusters.npy', f'alf/{self.pname}/{self._sortername}/', True),
('spikes.depths.npy', f'alf/{self.pname}/{self._sortername}/', True),
('spikes.samples.npy', f'alf/{self.pname}/{self._sortername}/', True),
('spikes.templates.npy', f'alf/{self.pname}/{self._sortername}/', True),
('spikes.times.npy', f'alf/{self.pname}/{self._sortername}/', True),
('templates.amps.npy', f'alf/{self.pname}/{self._sortername}/', True),
('templates.waveforms.npy', f'alf/{self.pname}/{self._sortername}/', True),
('templates.waveformsChannels.npy', f'alf/{self.pname}/{self._sortername}/', True),
],
}
return signature
@property
def scratch_folder_run(self):
"""
        Constructs a path to a temporary folder for the spike sorting output and scratch files.
        This is usually on a high-performance drive; around 2.5 times the uncompressed raw recording size should be budgeted.
        For a scratch drive at /mnt/h0 we would have the following temp dir:
/mnt/h0/iblsorter_1.8.0_CSHL071_2020-10-04_001_probe01/
"""
# get the scratch drive from the shell script
if self.scratch_folder is None:
with open(self.SHELL_SCRIPT) as fid:
lines = fid.readlines()
line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
m = re.search(r"\=(.*?)(\#|\n)", line)[0]
scratch_drive = Path(m[1:-1].strip())
else:
scratch_drive = self.scratch_folder
assert scratch_drive.exists(), f"Scratch drive {scratch_drive} not found"
# get the version of the sorter
self.version = self._fetch_iblsorter_version(self.SORTER_REPOSITORY)
spikesorter_dir = f"{self.version}_{'_'.join(list(self.session_path.parts[-3:]))}_{self.pname}"
return scratch_drive.joinpath(spikesorter_dir)
@staticmethod
def _sample2v(ap_file):
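        # per-channel sample-to-volts conversion factors are read from the .meta file; return the AP-band factor of the first channel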
md = spikeglx.read_meta_data(ap_file.with_suffix(".meta"))
s2v = spikeglx._conversion_sample2v_from_meta(md)
return s2v["ap"][0]
@staticmethod
def _fetch_iblsorter_version(repo_path):
try:
import iblsorter
return f"iblsorter_{iblsorter.__version__}"
except ImportError:
_logger.info('IBL-sorter not in environment, trying to locate the repository')
        init_file = Path(repo_path).joinpath('iblsorter', '__init__.py')
        version = 'unknown'  # fallback in case the version cannot be parsed from the repository
try:
with open(init_file) as fid:
lines = fid.readlines()
for line in lines:
if line.startswith("__version__ = "):
version = line.split('=')[-1].strip().replace('"', '').replace("'", '')
except Exception:
pass
return f"iblsorter_{version}"
@staticmethod
def _fetch_iblsorter_run_version(log_file):
"""
Parse the following line (2 formats depending on version) from the log files to get the version
'\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n'
'\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n'
"""
with open(log_file) as fid:
line = fid.readline()
version = re.search('version (.*), output', line)
        version = version or re.search('version (.*)', line)  # older logs include ', output' after the version, newer ones do not
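        # strip ANSI terminal escape sequences from the captured version string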
version = re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', version.group(1))
return version
def _run_iblsort(self, ap_file):
"""
        Runs the iblsorter spike sorting for one probe dataset.
        The raw spike sorting output is written to the session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
        (support for old spike sortings located directly in the probe folder, versions <1.5.5, has been discontinued).
        :return: path of the folder containing the raw spike sorting output
"""
sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, self.pname)
self.FORCE_RERUN = False
if not self.FORCE_RERUN:
log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
if log_file.exists():
run_version = self._fetch_iblsorter_run_version(log_file)
if packaging.version.parse(run_version) >= packaging.version.parse('1.7.0'):
_logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log"
f" found in {sorter_dir}, skipping.")
return sorter_dir
else:
self.FORCE_RERUN = True
_logger.info(f"job progress command: tail -f {self.scratch_folder_run} *.log")
self.scratch_folder_run.mkdir(parents=True, exist_ok=True)
check_nvidia_driver()
try:
            # if iblsorter is in the environment, use the installed version within the task
import iblsorter.ibl # noqa
iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=self.scratch_folder_run, delete=False)
except ImportError:
command2run = f"{self.SHELL_SCRIPT} {ap_file} {self.scratch_folder_run}"
_logger.info(command2run)
process = subprocess.Popen(
command2run,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
executable="/bin/bash",
)
info, error = process.communicate()
info_str = info.decode("utf-8").strip()
_logger.info(info_str)
if process.returncode != 0:
error_str = error.decode("utf-8").strip()
# try and get the kilosort log if any
for log_file in self.scratch_folder_run.rglob('*_kilosort.log'):
with open(log_file) as fid:
log = fid.read()
_logger.error(log)
break
raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")
shutil.copytree(self.scratch_folder_run.joinpath('output'), sorter_dir, dirs_exist_ok=True)
return sorter_dir
def _run(self):
"""
Multiple steps. For each probe:
        - runs the spike sorter (skipped if it already ran)
        - synchronizes the spike sorting with the main clock
        - outputs the probe description files
        - computes the cell QC metrics and extracts the waveforms
        :return: list of files to be registered on the database
"""
efiles = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()]
assert len(ap_files) != 0, f"No ap file found for probe {self.session_path.joinpath(self.device_collection, self.pname)}"
assert len(ap_files) == 1, f"Several bin files found for the same probe {ap_files}"
ap_file, label = ap_files[0]
out_files = []
sorter_dir = self._run_iblsort(ap_file) # runs the sorter, skips if it already ran
# convert the data to ALF in the ./alf/probeXX/SPIKE_SORTER_NAME folder
probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME)
shutil.rmtree(probe_out_path, ignore_errors=True)
probe_out_path.mkdir(parents=True, exist_ok=True)
ibllib.ephys.spikes.ks2_to_alf(
sorter_dir,
bin_path=ap_file.parent,
out_path=probe_out_path,
bin_file=ap_file,
ampfactor=self._sample2v(ap_file),
)
logfile = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
if logfile.exists():
shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log"))
# recover the QC files from the spike sorting output and copy them
for file_qc in sorter_dir.glob('_iblqc_*.npy'):
shutil.move(file_qc, file_qc_out := ap_file.parent.joinpath(file_qc.name))
out_files.append(file_qc_out)
# Sync spike sorting with the main behaviour clock: the nidq for 3B+ and the main probe for 3A
out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path)
out_files.extend(out)
# Now compute the unit metrics
files_qc, df_units, drift = self.compute_cell_qc(probe_out_path)
out_files.extend(files_qc)
        # convert the ks2 output into a tar file and register it
        # Create the target directory in case the spike sorting is in old raw_ephys_data folders;
        # for new sessions it should already exist
tar_dir = self.session_path.joinpath('spike_sorters', self.SPIKE_SORTER_NAME, label)
tar_dir.mkdir(parents=True, exist_ok=True)
out = ibllib.ephys.spikes.ks2_to_tar(sorter_dir, tar_dir, force=self.FORCE_RERUN)
out_files.extend(out)
# run waveform extraction
_logger.info("Running waveform extraction")
spikes = alfio.load_object(probe_out_path, 'spikes', attribute=['samples', 'clusters'])
clusters = alfio.load_object(probe_out_path, 'clusters', attribute=['channels'])
channels = alfio.load_object(probe_out_path, 'channels')
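        # extract snippets of 128 samples (42 samples before the trough), at most 256 waveforms per unit, stored as float16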
extract_wfs_cbin(
bin_file=ap_file,
output_dir=probe_out_path,
spike_samples=spikes['samples'],
spike_clusters=spikes['clusters'],
spike_channels=clusters['channels'][spikes['clusters']],
channel_labels=channels['labels'],
max_wf=256,
trough_offset=42,
spike_length_samples=128,
chunksize_samples=int(30_000),
n_jobs=None,
wfs_dtype=np.float16,
preprocess_steps=["phase_shift", "bad_channel_interpolation", "butterworth", "car"],
scratch_dir=self.scratch_folder_run,
)
_logger.info(f"Cleaning up temporary folder {self.scratch_folder_run}")
shutil.rmtree(self.scratch_folder_run, ignore_errors=True)
if self.one:
eid = self.one.path2eid(self.session_path, query_type='remote')
ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote')
if len(ins) != 0:
_logger.info("Populating probe insertion with qc")
self._label_probe_qc(probe_out_path, df_units, drift)
_logger.info("Creating SpikeSorting QC plots")
plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
_ = plot_task.run()
self.plot_tasks.append(plot_task)
plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
_ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path)))
self.plot_tasks.append(plot_task)
resolved = ins[0].get('json', {'temp': 0}).get('extended_qc', {'temp': 0}). \
get('alignment_resolved', False)
if resolved:
chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy'))
out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path)
out_files.extend(out)
return out_files