import logging
from ibllib.pipes import base_tasks
from ibllib.io.extractors.ephys_fpga import extract_sync
from iblutil.util import Bunch
import spikeglx
_logger = logging.getLogger('ibllib')
# [docs]
class SyncRegisterRaw(base_tasks.RegisterRawDataTask):
    """Register the raw data files written by a DAQ device.

    The registered object is ``_*_DAQdata``, where the namespace identifies the DAQ model or
    the acquisition software, e.g. 'mcc', 'ni' or 'ni-usb-6211'.

    The only mandatory dataset is the raw data itself, ``_*_DAQdata.raw*``, e.g.
    '_mc_DAQdata.raw.pqt'.  The following attribute datasets are registered when present:

    - _*_DAQdata.timestamps.npy: for timeline the timestamps array is separate from the samples.
    - _*_DAQdata.meta.json: for timeline all acquisition meta data (e.g. sample rate, channel
      names) are stored in a separate file.
    - _*_DAQdata.wiring.json: for SpikeGLX the channel map is stored in this file.
    - _timeline_softwareEvents.log.htsv: UDP messages and other software events in DAQ time.
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """Describe the files this task registers.

        Returns
        -------
        dict
            A dict with an empty 'input_files' list and an 'output_files' list of
            ``(file pattern, collection, required)`` tuples.  Only the raw data file is
            flagged as required; every other pattern is optional.
        """
        raw_pattern = f'*DAQdata.raw.{self.sync_ext}'
        collection = self.sync_collection
        optional_patterns = (
            '*DAQdata.timestamps.npy',
            '*DAQdata.meta.json',
            '*DAQdata.wiring.json',
            '*softwareEvents.log.htsv',
        )
        output_files = [(raw_pattern, collection, True)]
        output_files.extend((pattern, collection, False) for pattern in optional_patterns)
        return {'input_files': [], 'output_files': output_files}
# [docs]
class SyncMtscomp(base_tasks.DynamicTask):
    """Rename, compress and register raw DAQ data in .bin format collected using NIDAQ.

    Files found in the sync collection are renamed to the ``_{sync_namespace}_DAQdata.*``
    convention (unless they already follow it) and an uncompressed .bin file is compressed
    to .cbin / .ch, after which the original .bin file is deleted.
    """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads the raw binary file
    job_size = 'small'

    @property
    def signature(self):
        """Describe the files this task expects and produces.

        Returns
        -------
        dict
            A dict with 'input_files' and 'output_files' lists of
            ``(file pattern, collection, required)`` tuples, all within the sync collection.
        """
        namespace = self.sync_namespace
        collection = self.sync_collection
        input_patterns = ('*.*bin', '*.meta', '*.wiring.json')
        output_names = (
            f'_{namespace}_DAQdata.raw.cbin',
            f'_{namespace}_DAQdata.raw.ch',
            f'_{namespace}_DAQdata.raw.meta',
            f'_{namespace}_DAQdata.wiring.json',
        )
        return {
            'input_files': [(pattern, collection, True) for pattern in input_patterns],
            'output_files': [(name, collection, True) for name in output_names],
        }

    def _to_daq_name(self, path, attribute):
        """Rename a file to ``_{sync_namespace}_DAQdata.{attribute}{suffix}`` if needed.

        Parameters
        ----------
        path : pathlib.Path
            The file to rename; its final suffix is kept.
        attribute : str
            The dataset attribute, e.g. 'raw' or 'wiring'.

        Returns
        -------
        pathlib.Path
            The path of the file after the (possible) rename.  When the file stem already
            contains ``DAQdata.{attribute}`` the file is left untouched and `path` is returned.

        Raises
        ------
        FileNotFoundError
            If a rename is required but `path` does not exist.
        """
        if f'DAQdata.{attribute}' in path.stem:
            return path
        renamed = path.parent.joinpath(f'_{self.sync_namespace}_DAQdata.{attribute}{path.suffix}')
        path.replace(renamed)
        return renamed

    def _run(self):
        """Rename and compress the DAQ files found in the sync collection.

        Returns
        -------
        list of pathlib.Path or None
            The renamed wiring file (when found) followed by the .cbin, .ch and .meta files.
            When no .bin / .cbin file is present, only the wiring, .ch and .meta files that
            could be found are returned, or None if none were found at all.
        """
        out_files = []
        daq_folder = self.session_path.joinpath(self.sync_collection)

        # Detect the wiring file and rename (if it hasn't already been renamed)
        wiring_file = next(daq_folder.glob('*.wiring.json'), None)
        if wiring_file is not None:
            out_files.append(self._to_daq_name(wiring_file, 'wiring'))

        # Search for .bin / .cbin files in the sync collection folder
        bin_file = next(daq_folder.glob('*.*bin'), None)

        # Without a .bin / .cbin file, see if we can still find the .ch and .meta files
        if bin_file is None:
            for ext in ('ch', 'meta'):
                ext_file = next(daq_folder.glob(f'*.{ext}'), None)
                if ext_file is not None:
                    out_files.append(self._to_daq_name(ext_file, 'raw'))
            return out_files or None

        # Compress the file only if it hasn't already been compressed.  The reader is closed
        # in a finally clause so the file handle is released even when compression fails.
        sr = spikeglx.Reader(bin_file)
        try:
            already_compressed = sr.is_mtscomp
            cbin_file = bin_file if already_compressed else sr.compress_file()
        finally:
            sr.close()
        if already_compressed:
            assert cbin_file.suffix == '.cbin'
        else:
            # the uncompressed original is removed only once the reader has been closed
            bin_file.unlink()

        # The companion files share the stem of the (c)bin file; resolve their current
        # location before anything is renamed.
        meta_file = cbin_file.with_suffix('.meta')
        ch_file = cbin_file.with_suffix('.ch')

        # Rename files (only if they haven't already been renamed)
        new_bin_file = self._to_daq_name(cbin_file, 'raw')
        new_meta_file = self._to_daq_name(meta_file, 'raw')
        new_ch_file = self._to_daq_name(ch_file, 'raw')

        out_files.extend([new_bin_file, new_ch_file, new_meta_file])
        return out_files
# [docs]
class SyncPulses(base_tasks.DynamicTask):
    """Extract sync pulses from a NIDAQ .bin / .cbin file.

    N.B. Sync is only extracted from the sync collection, i.e. this is not equivalent to
    EphysPulses, which extracts sync pulses for each probe.

    TODO generalise to other daq and file formats, generalise to 3A probes
    """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads the raw binary file
    job_size = 'small'

    @property
    def signature(self):
        """Describe the files this task expects and produces.

        Returns
        -------
        dict
            A dict with 'input_files' and 'output_files' lists of
            ``(file pattern, collection, required)`` tuples, all within the sync collection
            and all flagged as required.
        """
        namespace = self.sync_namespace
        collection = self.sync_collection
        input_names = (
            f'_{namespace}_DAQdata.raw.*bin',
            f'_{namespace}_DAQdata.raw.ch',
            f'_{namespace}_DAQdata.raw.meta',
            f'_{namespace}_DAQdata.wiring.json',
        )
        output_names = (
            f'_{namespace}_sync.times.npy',
            f'_{namespace}_sync.polarities.npy',
            f'_{namespace}_sync.channels.npy',
        )
        return {
            'input_files': [(name, collection, True) for name in input_names],
            'output_files': [(name, collection, True) for name in output_names],
        }

    def _run(self, overwrite=False):
        """Extract the sync from the first .bin / .cbin file of the sync collection.

        Parameters
        ----------
        overwrite : bool
            Forwarded as-is to `extract_sync`.

        Returns
        -------
        list
            The second element returned by `extract_sync`, or an empty list when the sync
            collection contains no .bin / .cbin file.
        """
        daq_folder = self.session_path.joinpath(self.sync_collection)
        bin_file = next(daq_folder.glob('*.*bin'), None)
        if bin_file is None:
            return []

        # TODO this is a hack, once we refactor the sync tasks should make generic extract_sync
        #  that doesn't rely on output of glob_ephys_files
        files = [Bunch({'nidq': bin_file, 'label': ''})]
        _, outputs = extract_sync(
            self.session_path,
            ephys_files=files,
            overwrite=overwrite,
            namespace=self.sync_namespace,
        )
        return outputs