Source code for ibllib.pipes.sync_tasks

import logging

from ibllib.pipes import base_tasks
from ibllib.io.extractors.ephys_fpga import extract_sync
from iblutil.util import Bunch

import spikeglx

_logger = logging.getLogger('ibllib')


class SyncRegisterRaw(base_tasks.RegisterRawDataTask):
    """Task to register raw DAQ data.

    Registers DAQ software output for a given device. The object should be _*_DAQdata, where the
    namespace identifies the DAQ model or acquisition software, e.g. 'mcc', 'ni' or 'ni-usb-6211'.
    At minimum there should be a raw data dataset of the form _*_DAQdata.raw*, e.g.
    '_mc_DAQdata.raw.pqt'. The following are optional attribute datasets:

    - _*_DAQdata.timestamps.npy: for timeline the timestamps array is separate from the samples.
    - _*_DAQdata.meta.json: for timeline all acquisition metadata (e.g. sample rate, channel
      names) are stored in a separate file.
    - _*_DAQdata.wiring.json: for SpikeGLX the channel map is stored in this file.
    - _timeline_softwareEvents.log.htsv: UDP messages and other software events in DAQ time.
    """
    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [],
            'output_files': [(f'*DAQdata.raw.{self.sync_ext}', self.sync_collection, True),
                             ('*DAQdata.timestamps.npy', self.sync_collection, False),
                             ('*DAQdata.meta.json', self.sync_collection, False),
                             ('*DAQdata.wiring.json', self.sync_collection, False),
                             ('*softwareEvents.log.htsv', self.sync_collection, False)]
        }
        return signature
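

# A minimal sketch (not part of the original module) of how the output patterns in
# SyncRegisterRaw.signature match the dataset naming convention described in the docstring.
# The filenames below are hypothetical examples; only the stdlib fnmatch is used.
def _example_register_raw_patterns():
    from fnmatch import fnmatch
    examples = [('_mc_DAQdata.raw.pqt', '*DAQdata.raw.pqt'),                        # required raw data
                ('_timeline_DAQdata.timestamps.npy', '*DAQdata.timestamps.npy'),    # separate timestamps
                ('_timeline_DAQdata.meta.json', '*DAQdata.meta.json'),              # acquisition metadata
                ('_spikeglx_DAQdata.wiring.json', '*DAQdata.wiring.json'),          # channel map
                ('_timeline_softwareEvents.log.htsv', '*softwareEvents.log.htsv')]  # software events
    assert all(fnmatch(filename, pattern) for filename, pattern in examples)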


class SyncMtscomp(base_tasks.DynamicTask):
    """
    Task to rename, compress and register raw DAQ data in .bin format collected using a NIDAQ.
    """
    priority = 90
    cpu = 2
    io_charge = 30  # this job reads the raw DAQ .bin file
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*.*bin', self.sync_collection, True),
                            ('*.meta', self.sync_collection, True),
                            ('*.wiring.json', self.sync_collection, True)],
            'output_files': [(f'_{self.sync_namespace}_DAQdata.raw.cbin', self.sync_collection, True),
                             (f'_{self.sync_namespace}_DAQdata.raw.ch', self.sync_collection, True),
                             (f'_{self.sync_namespace}_DAQdata.raw.meta', self.sync_collection, True),
                             (f'_{self.sync_namespace}_DAQdata.wiring.json', self.sync_collection, True)]
        }
        return signature

    def _run(self):

        out_files = []

        # Detect the wiring file and rename it (if it hasn't already been renamed)
        wiring_file = next(self.session_path.joinpath(self.sync_collection).glob('*.wiring.json'), None)
        if wiring_file is not None:
            if 'DAQdata.wiring' not in wiring_file.stem:
                new_wiring_file = wiring_file.parent.joinpath(f'_{self.sync_namespace}_DAQdata.wiring.json')
                wiring_file.replace(new_wiring_file)
            else:
                new_wiring_file = wiring_file
            out_files.append(new_wiring_file)

        # Search for .bin files in the sync_collection folder
        bin_file = next(self.session_path.joinpath(self.sync_collection).glob('*.*bin'), None)

        # If we no longer have a .bin / .cbin file, see if we can still find the .ch and .meta files
        if bin_file is None:
            for ext in ['ch', 'meta']:
                ext_file = next(self.session_path.joinpath(self.sync_collection).glob(f'*.{ext}'), None)
                if ext_file is not None:
                    if 'DAQdata.raw' not in ext_file.stem:
                        new_ext_file = ext_file.parent.joinpath(f'_{self.sync_namespace}_DAQdata.raw{ext_file.suffix}')
                        ext_file.replace(new_ext_file)
                    else:
                        new_ext_file = ext_file
                    out_files.append(new_ext_file)
            return out_files if len(out_files) > 0 else None

        # If we do find the .bin file, compress it (only if it hasn't already been compressed)
        sr = spikeglx.Reader(bin_file)
        if sr.is_mtscomp:
            sr.close()
            cbin_file = bin_file
            assert cbin_file.suffix == '.cbin'
        else:
            cbin_file = sr.compress_file()
            sr.close()
            bin_file.unlink()

        # Rename the files (only if they haven't already been renamed)
        if 'DAQdata.raw' not in cbin_file.stem:
            new_bin_file = cbin_file.parent.joinpath(f'_{self.sync_namespace}_DAQdata.raw{cbin_file.suffix}')
            cbin_file.replace(new_bin_file)

            meta_file = cbin_file.with_suffix('.meta')
            new_meta_file = new_bin_file.with_suffix('.meta')
            meta_file.replace(new_meta_file)

            ch_file = cbin_file.with_suffix('.ch')
            new_ch_file = new_bin_file.with_suffix('.ch')
            ch_file.replace(new_ch_file)
        else:
            new_bin_file = cbin_file
            new_meta_file = cbin_file.with_suffix('.meta')
            new_ch_file = cbin_file.with_suffix('.ch')

        out_files.append(new_bin_file)
        out_files.append(new_ch_file)
        out_files.append(new_meta_file)

        return out_files
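

# A minimal sketch (not part of the original module) of the compress-and-rename core of
# SyncMtscomp._run. The bin_file path and default namespace are hypothetical arguments;
# the spikeglx calls (Reader, is_mtscomp, compress_file) are exactly those used above.
def _example_compress_daq_bin(bin_file, namespace='nidq'):
    sr = spikeglx.Reader(bin_file)
    if sr.is_mtscomp:
        # Already an mtscomp .cbin (with its .ch companion file); nothing to compress
        cbin_file = bin_file
        sr.close()
    else:
        # Compress to .cbin, then remove the uncompressed original
        cbin_file = sr.compress_file()
        sr.close()
        bin_file.unlink()
    # Rename to the canonical _<namespace>_DAQdata.raw.cbin dataset name
    new_bin_file = cbin_file.parent.joinpath(f'_{namespace}_DAQdata.raw{cbin_file.suffix}')
    cbin_file.replace(new_bin_file)
    return new_bin_file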


class SyncPulses(base_tasks.DynamicTask):
    """
    Extract sync pulses from a NIDAQ .bin / .cbin file.

    N.B. Only extracts sync from the sync collection (i.e. this is not equivalent to EphysPulses,
    which extracts sync pulses for each probe).

    # TODO generalise to other DAQs and file formats, generalise to 3A probes
    """
    priority = 90
    cpu = 2
    io_charge = 30  # this job reads the raw DAQ .bin file
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [(f'_{self.sync_namespace}_DAQdata.raw.*bin', self.sync_collection, True),
                            (f'_{self.sync_namespace}_DAQdata.raw.ch', self.sync_collection, True),
                            (f'_{self.sync_namespace}_DAQdata.raw.meta', self.sync_collection, True),
                            (f'_{self.sync_namespace}_DAQdata.wiring.json', self.sync_collection, True)],
            'output_files': [(f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
                             (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
                             (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True)]
        }
        return signature

    def _run(self, overwrite=False):
        bin_file = next(self.session_path.joinpath(self.sync_collection).glob('*.*bin'), None)
        if not bin_file:
            return []

        # TODO this is a hack; once we refactor the sync tasks we should make a generic
        #  extract_sync that doesn't rely on the output of glob_ephys_files
        files = [Bunch({'nidq': bin_file, 'label': ''})]
        _, outputs = extract_sync(self.session_path, ephys_files=files, overwrite=overwrite,
                                  namespace=self.sync_namespace)
        return outputs
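

# A minimal sketch (not part of the original module) showing how the three sync datasets
# written by SyncPulses can be loaded back into a Bunch. The sync_path argument and the
# default 'nidq' namespace are hypothetical; only numpy.load and iblutil's Bunch are used.
def _example_load_sync(sync_path, namespace='nidq'):
    import numpy as np
    return Bunch({key: np.load(sync_path.joinpath(f'_{namespace}_sync.{key}.npy'))
                  for key in ('times', 'polarities', 'channels')})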