import json
import logging
from pathlib import Path
import re
import shutil
import numpy as np
import mtscomp
from iblutil.io import hashfile
from iblutil.util import Bunch
from one.alf.io import remove_uuid_file
from one.api import ONE
from ibllib.ephys import neuropixel as neuropixel
SAMPLE_SIZE = 2 # int16
DEFAULT_BATCH_SIZE = 1e6
_logger = logging.getLogger('ibllib')
# provided as convenience if no meta-data is provided, always better to read from meta
S2V_AP = 2.34375e-06
S2V_LFP = 4.6875e-06
[docs]class Reader:
"""
Class for SpikeGLX reading purposes
Some format description was found looking at the Matlab SDK here
https://github.com/billkarsh/SpikeGLX/blob/master/MATLAB-SDK/DemoReadSGLXData.m
To open a spikeglx file that has an associated meta-data file:
sr = spikeglx.Reader(bin_file_path)
To open a flat binary file:
sr = spikeglx.Reader(bin_file_path, nc=385, ns=nsamples, fs=30000)
one can provide more options to the reader:
sr = spikeglx.Reader(..., dtype='int16, s2mv=2.34375e-06)
usual sample 2 mv conversion factors:
s2mv = 2.34375e-06 (NP1 ap banc) : default value used
s2mv = 4.6875e-06 (NP1 lfp band)
Note: To release system resources the close method must be called
"""
def __init__(self, sglx_file, open=True, nc=None, ns=None, fs=None, dtype='int16', s2v=None,
nsync=None):
"""
An interface for reading data from a SpikeGLX file
:param sglx_file: Path to a SpikeGLX file (compressed or otherwise)
:param open: when True the file is opened
"""
self.file_bin = Path(sglx_file)
self.nbytes = self.file_bin.stat().st_size
self.dtype = np.dtype(dtype)
file_meta_data = Path(sglx_file).with_suffix('.meta')
if not file_meta_data.exists():
err_str = "Instantiating an Reader without meta data requires providing nc, fs and nc parameters"
assert (nc is not None and fs is not None and nc is not None), err_str
self.file_meta_data = None
self.meta = None
self._nc, self._fs, self._ns = (nc, fs, ns)
# handles default parameters: if int16 we assume it's a raw recording with 1 sync and sample2mv
# if its' float32 or something else, we assume the sync channel has been removed and the scaling applied
if nsync is None:
nsync = 1 if self.dtype == np.dtype('int16') else 0
self._nsync = nsync
if s2v is None:
s2v = S2V_AP if self.dtype == np.dtype('int16') else 1.0
self.channel_conversion_sample2v = {'samples': np.ones(nc) * s2v}
self.channel_conversion_sample2v['samples'][-nsync:] = 1
else:
# normal case we continue reading and interpreting the metadata file
self.file_meta_data = file_meta_data
self.meta = read_meta_data(file_meta_data)
self.channel_conversion_sample2v = _conversion_sample2v_from_meta(self.meta)
self._raw = None
if open:
self.open()
[docs] def open(self):
# if we are not looking at a compressed file, use a memmap, otherwise instantiate mtscomp
sglx_file = str(self.file_bin)
if self.is_mtscomp:
self._raw = mtscomp.Reader()
self._raw.open(self.file_bin, self.file_bin.with_suffix('.ch'))
if self._raw.shape != (self.ns, self.nc):
ftsec = self._raw.shape[0] / self.fs
self.meta['fileTimeSecs'] = ftsec
if ftsec > 1: # avoid the checks for streaming data
_logger.warning(f"{sglx_file} : meta data and compressed chunks dont checkout\n"
f"File duration: expected {self.meta['fileTimeSecs']},"
f" actual {ftsec}\n"
f"Will attempt to fudge the meta-data information.")
else:
if self.nc * self.ns * self.dtype.itemsize != self.nbytes:
ftsec = self.file_bin.stat().st_size / self.dtype.itemsize / self.nc / self.fs
_logger.warning(f"{sglx_file} : meta data and filesize do not checkout\n"
f"File size: expected {self.meta['fileSizeBytes']},"
f" actual {self.file_bin.stat().st_size}\n"
f"File duration: expected {self.meta['fileTimeSecs']},"
f" actual {ftsec}\n"
f"Will attempt to fudge the meta-data information.")
self.meta['fileTimeSecs'] = ftsec
self._raw = np.memmap(sglx_file, dtype=self.dtype, mode='r', shape=(self.ns, self.nc))
[docs] def close(self):
if self.is_open:
getattr(self._raw, '_mmap', self._raw).close()
def __enter__(self):
if not self.is_open:
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def __getitem__(self, item):
if isinstance(item, int) or isinstance(item, slice):
return self.read(nsel=item, sync=False)
elif len(item) == 2:
return self.read(nsel=item[0], csel=item[1], sync=False)
@property
def sample2volts(self):
return self.channel_conversion_sample2v[self.type]
@property
def geometry(self):
"""
Gets the geometry, ie. the full trace header for the recording
:return: dictionary with keys 'row', 'col', 'ind', 'shank', 'adc', 'x', 'y', 'sample_shift'
"""
return _geometry_from_meta(self.meta)
@property
def shape(self):
return self.ns, self.nc
@property
def is_open(self):
return self._raw is not None
@property
def is_mtscomp(self):
return 'cbin' in self.file_bin.suffix
@property
def version(self):
"""Gets the version string: '3A', '3B2', '3B1', 'NP2.1', 'NP2.4'"""
return None if self.meta is None else _get_neuropixel_version_from_meta(self.meta)
@property
def major_version(self):
"""Gets the the major version int: 1 or 2"""
return None if self.meta is None else _get_neuropixel_major_version_from_meta(self.meta)
@property
def rl(self):
return self.ns / self.fs
@property
def type(self):
""":return: ap, lf or nidq. Useful to index dictionaries """
if not self.meta:
return 'samples'
return _get_type_from_meta(self.meta)
@property
def fs(self):
""" :return: sampling frequency (Hz) """
return self._fs if self.meta is None else _get_fs_from_meta(self.meta)
@property
def nc(self):
""" :return: number of channels """
return self._nc if self.meta is None else _get_nchannels_from_meta(self.meta)
@property
def nsync(self):
""":return: number of sync channels"""
return self._nsync if self.meta is None else len(_get_sync_trace_indices_from_meta(self.meta))
@property
def ns(self):
""" :return: number of samples """
if self.meta is None:
return self._ns
return int(np.round(self.meta.get('fileTimeSecs') * self.fs))
[docs] def read(self, nsel=slice(0, 10000), csel=slice(None), sync=True):
"""
Read from slices or indexes
:param slice_n: slice or sample indices
:param slice_c: slice or channel indices
:return: float32 array
"""
if not self.is_open:
raise IOError('Reader not open; call `open` before `read`')
darray = self._raw[nsel, csel].astype(np.float32, copy=True)
darray *= self.channel_conversion_sample2v[self.type][csel]
if sync:
return darray, self.read_sync(nsel)
else:
return darray
[docs] def read_samples(self, first_sample=0, last_sample=10000, channels=None):
"""
reads all channels from first_sample to last_sample, following numpy slicing convention
sglx.read_samples(first=0, last=100) would be equivalent to slicing the array D
D[:,0:100] where the last axis represent time and the first channels.
:param first_sample: first sample to be read, python slice-wise
:param last_sample: last sample to be read, python slice-wise
:param channels: slice or numpy array of indices
:return: numpy array of int16
"""
if channels is None:
channels = slice(None)
return self.read(slice(first_sample, last_sample), channels)
[docs] def read_sync_digital(self, _slice=slice(0, 10000)):
"""
Reads only the digital sync trace at specified samples using slicing syntax
>>> sync_samples = sr.read_sync_digital(slice(0,10000))
"""
if not self.is_open:
raise IOError('Reader not open; call `open` before `read`')
if not self.meta:
_logger.warning('Sync trace not labeled in metadata. Assuming last trace')
return split_sync(self._raw[_slice, _get_sync_trace_indices_from_meta(self.meta)])
[docs] def read_sync_analog(self, _slice=slice(0, 10000)):
"""
Reads only the analog sync traces at specified samples using slicing syntax
>>> sync_samples = sr.read_sync_analog(slice(0,10000))
"""
if not self.meta:
return
csel = _get_analog_sync_trace_indices_from_meta(self.meta)
if not csel:
return
else:
return self.read(nsel=_slice, csel=csel, sync=False)
[docs] def read_sync(self, _slice=slice(0, 10000), threshold=1.2, floor_percentile=10):
"""
Reads all sync trace. Convert analog to digital with selected threshold and append to array
:param _slice: samples slice
:param threshold: (V) threshold for front detection, defaults to 1.2 V
:param floor_percentile: 10% removes the percentile value of the analog trace before
thresholding. This is to avoid DC offset drift
:return: int8 array
"""
digital = self.read_sync_digital(_slice)
analog = self.read_sync_analog(_slice)
if analog is not None and floor_percentile:
analog -= np.percentile(analog, 10, axis=0)
if analog is None:
return digital
analog[np.where(analog < threshold)] = 0
analog[np.where(analog >= threshold)] = 1
return np.concatenate((digital, np.int8(analog)), axis=1)
[docs] def compress_file(self, keep_original=True, **kwargs):
"""
Compresses
:param keep_original: defaults True. If False, the original uncompressed file is deleted
and the current spikeglx.Reader object is modified in place
:param kwargs:
:return: pathlib.Path of the compressed *.cbin file
"""
file_tmp = self.file_bin.with_suffix('.cbin_tmp')
assert not self.is_mtscomp
mtscomp.compress(self.file_bin,
out=file_tmp,
outmeta=self.file_bin.with_suffix('.ch'),
sample_rate=self.fs,
n_channels=self.nc,
dtype=np.int16,
**kwargs)
file_out = file_tmp.with_suffix('.cbin')
file_tmp.rename(file_out)
if not keep_original:
self.file_bin.unlink()
self.file_bin = file_out
return file_out
[docs] def decompress_file(self, keep_original=True, **kwargs):
"""
Decompresses a mtscomp file
:param keep_original: defaults True. If False, the original compressed file (input)
is deleted and the current spikeglx.Reader object is modified in place
NB: This is not equivalent to overwrite (which replaces the output file)
:return: pathlib.Path of the decompressed *.bin file
"""
if 'out' not in kwargs:
kwargs['out'] = self.file_bin.with_suffix('.bin')
assert self.is_mtscomp
r = mtscomp.decompress(self.file_bin, self.file_bin.with_suffix('.ch'), **kwargs)
r.close()
if not keep_original:
self.close()
self.file_bin.unlink()
self.file_bin.with_suffix('.ch').unlink()
self.file_bin = kwargs['out']
return kwargs['out']
[docs] def verify_hash(self):
"""
Computes SHA-1 hash and returns True if it matches metadata, False otherwise
:return: boolean
"""
if self.is_mtscomp:
with open(self.file_bin.with_suffix('.ch')) as fid:
mtscomp_params = json.load(fid)
sm = mtscomp_params.get('sha1_compressed', None)
if sm is None:
_logger.warning("SHA1 hash is not implemented for compressed ephys. To check "
"the spikeglx acquisition hash, uncompress the file first !")
return True
sm = sm.upper()
else:
sm = self.meta.fileSHA1
sc = hashfile.sha1(self.file_bin).upper()
if sm == sc:
log_func = _logger.info
else:
log_func = _logger.error
log_func(f"SHA1 metadata: {sm}")
log_func(f"SHA1 computed: {sc}")
return sm == sc
[docs]def read(sglx_file, first_sample=0, last_sample=10000):
"""
Function to read from a spikeglx binary file without instantiating the class.
Gets the meta-data as well.
>>> ibllib.io.spikeglx.read('/path/to/file.bin', first_sample=0, last_sample=1000)
:param sglx_file: full path the the binary file to read
:param first_sample: first sample to be read, python slice-wise
:param last_sample: last sample to be read, python slice-wise
:return: Data array, sync trace, meta-data
"""
with Reader(sglx_file) as sglxr:
D, sync = sglxr.read_samples(first_sample=first_sample, last_sample=last_sample)
return D, sync, sglxr.meta
def _get_savedChans_subset(chns):
"""
Get the subset of the original channels that are saved per shank
:param chns:
:return:
"""
chn_grps = np.r_[0, np.where(np.diff(chns) != 1)[0] + 1, len(chns)]
chn_subset = [f'{chns[chn_grps[i]]}:{chns[chn_grps[i + 1] - 1]}'
if chn_grps[i] < len(chns) - 1 else f'{chns[chn_grps[i]]}'
for i in range(len(chn_grps) - 1)]
return ','.join([sub for sub in chn_subset])
def _get_serial_number_from_meta(md):
"""
Get neuropixel serial number from the metadata dictionary
"""
# imProbeSN for 3A, imDatPrb_sn for 3B2, None for nidq 3B2
serial = md.get('imProbeSN') or md.get('imDatPrb_sn')
if serial:
return int(serial)
def _get_neuropixel_major_version_from_meta(md):
MAJOR_VERSION = {'3A': 1, '3B2': 1, '3B1': 1, 'NP2.1': 2, 'NP2.4': 2.4}
version = _get_neuropixel_version_from_meta(md)
if version is not None:
return MAJOR_VERSION[version]
def _get_neuropixel_version_from_meta(md):
"""
Get neuropixel version tag (3A, 3B1, 3B2) from the metadata dictionary
"""
if 'typeEnabled' in md.keys():
return '3A'
prb_type = md.get('imDatPrb_type')
# Neuropixel 1.0 either 3B1 or 3B2 (ask Olivier about 3B1)
if prb_type == 0:
if 'imDatPrb_port' in md.keys() and 'imDatPrb_slot' in md.keys():
return '3B2'
else:
return '3B1'
# Neuropixel 2.0 single shank
if prb_type == 21:
return 'NP2.1'
# Neuropixel 2.0 four shank
if prb_type == 24:
return 'NP2.4'
def _get_sync_trace_indices_from_meta(md):
"""
Returns a list containing indices of the sync traces in the original array
"""
typ = _get_type_from_meta(md)
ntr = int(_get_nchannels_from_meta(md))
if typ == 'nidq':
nsync = int(md.get('snsMnMaXaDw')[-1])
elif typ in ['lf', 'ap']:
nsync = int(md.get('snsApLfSy')[2])
return list(range(ntr - nsync, ntr))
def _get_analog_sync_trace_indices_from_meta(md):
"""
Returns a list containing indices of the sync traces in the original array
"""
typ = _get_type_from_meta(md)
if typ != 'nidq':
return []
tr = md.get('snsMnMaXaDw')
nsa = int(tr[-2])
return list(range(int(sum(tr[0:2])), int(sum(tr[0:2])) + nsa))
def _get_nchannels_from_meta(md):
return int(md.get('nSavedChans'))
def _get_fs_from_meta(md):
if md.get('typeThis') == 'imec':
return md.get('imSampRate')
else:
return md.get('niSampRate')
def _get_type_from_meta(md):
"""
Get neuropixel data type (ap, lf or nidq) from metadata
"""
snsApLfSy = md.get('snsApLfSy', [-1, -1, -1])
if snsApLfSy[0] == 0 and snsApLfSy[1] != 0:
return 'lf'
elif snsApLfSy[0] != 0 and snsApLfSy[1] == 0:
return 'ap'
elif snsApLfSy == [-1, -1, -1] and md.get('typeThis', None) == 'nidq':
return 'nidq'
def _geometry_from_meta(meta_data):
"""
Gets the geometry, ie. the full trace header for the recording
:param meta_data: meta_data dictionary as read by ibllib.io.spikeglx.read_meta_data
:return: dictionary with keys 'row', 'col', 'ind', 'shank', 'adc', 'x', 'y', 'sample_shift'
"""
cm = _map_channels_from_meta(meta_data)
major_version = _get_neuropixel_major_version_from_meta(meta_data)
if cm is None:
_logger.warning("Meta data doesn't have geometry (snsShankMap field), returning defaults")
th = neuropixel.trace_header(version=major_version)
th['flag'] = th['x'] * 0 + 1.
return th
th = cm.copy()
if major_version == 1:
# the spike sorting channel maps have a flipped version of the channel map
th['col'] = - cm['col'] * 2 + 2 + np.mod(cm['row'], 2)
th.update(neuropixel.rc2xy(th['row'], th['col'], version=major_version))
th['sample_shift'], th['adc'] = neuropixel.adc_shifts(version=major_version)
th['ind'] = np.arange(cm['col'].size)
return th
def _map_channels_from_meta(meta_data):
"""
Interpret the meta data string to extract an array of channel positions along the shank
:param meta_data: dictionary output from spikeglx.read_meta_data
:return: dictionary of arrays 'shank', 'col', 'row', 'flag', one value per active site
"""
if 'snsShankMap' in meta_data.keys():
chmap = re.findall(r'([0-9]*:[0-9]*:[0-9]*:[0-9]*)', meta_data['snsShankMap'])
# for digital nidq types, the key exists but does not contain any information
if not chmap:
return {'shank': None, 'col': None, 'row': None, 'flag': None}
# shank#, col#, row#, drawflag
# (nb: drawflag is one should be drawn and considered spatial average)
chmap = np.array([np.float32(cm.split(':')) for cm in chmap])
return {k: chmap[:, v] for (k, v) in {'shank': 0, 'col': 1, 'row': 2, 'flag': 3}.items()}
def _conversion_sample2v_from_meta(meta_data):
"""
Interpret the meta data to extract an array of conversion factors for each channel
so the output data is in Volts
Conversion factor is: int2volt / channelGain
For Lf/Ap interpret the gain string from metadata
For Nidq, repmat the gains from the trace counts in `snsMnMaXaDw`
:param meta_data: dictionary output from spikeglx.read_meta_data
:return: numpy array with one gain value per channel
"""
def int2volts(md):
""" :return: Conversion scalar to Volts. Needs to be combined with channel gains """
if md.get('typeThis', None) == 'imec':
if 'imMaxInt' in md:
return md.get('imAiRangeMax') / int(md['imMaxInt'])
else:
return md.get('imAiRangeMax') / 512
else:
return md.get('niAiRangeMax') / 32768
int2volt = int2volts(meta_data)
version = _get_neuropixel_version_from_meta(meta_data)
# interprets the gain value from the metadata header:
if 'imroTbl' in meta_data.keys(): # binary from the probes: ap or lf
sy_gain = np.ones(int(meta_data['snsApLfSy'][-1]), dtype=np.float32)
# imroTbl has 384 entries regardless of no of channels saved, so need to index by n_ch
# TODO need to look at snsSaveChanMap and index channels to get correct gain
n_chn = _get_nchannels_from_meta(meta_data) - len(_get_sync_trace_indices_from_meta(meta_data))
if 'NP2' in version:
# NP 2.0; APGain = 80 for all AP
# return 0 for LFgain (no LF channels)
out = {'lf': np.hstack((int2volt / 80 * np.ones(n_chn).astype(np.float32), sy_gain)),
'ap': np.hstack((int2volt / 80 * np.ones(n_chn).astype(np.float32), sy_gain))}
else:
# the sync traces are not included in the gain values, so are included for
# broadcast ops
gain = re.findall(r'([0-9]* [0-9]* [0-9]* [0-9]* [0-9]*)',
meta_data['imroTbl'])[:n_chn]
out = {'lf': np.hstack((np.array([1 / np.float32(g.split(' ')[-1]) for g in gain]) *
int2volt, sy_gain)),
'ap': np.hstack((np.array([1 / np.float32(g.split(' ')[-2]) for g in gain]) *
int2volt, sy_gain))}
# nidaq gain can be read in the same way regardless of NP1.0 or NP2.0
elif 'niMNGain' in meta_data.keys(): # binary from nidq
gain = np.r_[
np.ones(int(meta_data['snsMnMaXaDw'][0], )) / meta_data['niMNGain'] * int2volt,
np.ones(int(meta_data['snsMnMaXaDw'][1], )) / meta_data['niMAGain'] * int2volt,
np.ones(int(meta_data['snsMnMaXaDw'][2], )) * int2volt, # no gain for analog sync
np.ones(int(np.sum(meta_data['snsMnMaXaDw'][3]), ))] # no unit for digital sync
out = {'nidq': gain}
return out
[docs]def split_sync(sync_tr):
"""
The synchronization channels are stored as single bits, this will split the int16 original
channel into 16 single bits channels
:param sync_tr: numpy vector: samples of synchronisation trace
:return: int8 numpy array of 16 channels, 1 column per sync trace
"""
sync_tr = np.int16(np.copy(sync_tr))
out = np.unpackbits(sync_tr.view(np.uint8)).reshape(sync_tr.size, 16)
out = np.flip(np.roll(out, 8, axis=1), axis=1)
return np.int8(out)
[docs]def get_neuropixel_version_from_folder(session_path):
ephys_files = glob_ephys_files(session_path, ext='meta')
return get_neuropixel_version_from_files(ephys_files)
[docs]def get_neuropixel_version_from_files(ephys_files):
if any([ef.get('nidq') for ef in ephys_files]):
return '3B'
else:
return '3A'
[docs]def get_probes_from_folder(session_path):
# should glob the ephys files and get out the labels
# This assumes the meta files exist on the server (this is the case for now but should it be?)
ephys_files = glob_ephys_files(session_path, ext='meta')
probes = []
for files in ephys_files:
if files['label']:
probes.append(files['label'])
return probes
[docs]def glob_ephys_files(session_path, suffix='.meta', ext='bin', recursive=True, bin_exists=True):
"""
From an arbitrary folder (usually session folder) gets the ap and lf files and labels
Associated to the subfolders where they are
the expected folder tree is:
├── 3A
│ ├── imec0
│ ├── sync_testing_g0_t0.imec0.ap.bin
│ │ └── sync_testing_g0_t0.imec0.lf.bin
│ └── imec1
│ ├── sync_testing_g0_t0.imec1.ap.bin
│ └── sync_testing_g0_t0.imec1.lf.bin
└── 3B
├── sync_testing_g0_t0.nidq.bin
├── imec0
│ ├── sync_testing_g0_t0.imec0.ap.bin
│ └── sync_testing_g0_t0.imec0.lf.bin
└── imec1
├── sync_testing_g0_t0.imec1.ap.bin
└── sync_testing_g0_t0.imec1.lf.bin
:param bin_exists:
:param suffix:
:param ext: file extension to look for, default 'bin' but could also be 'meta' or 'ch'
:param recursive:
:param session_path: folder, string or pathlib.Path
:returns: a list of dictionaries with keys 'ap': apfile, 'lf': lffile and 'label'
"""
def get_label(raw_ephys_apfile):
if raw_ephys_apfile.parts[-2] != 'raw_ephys_data':
return raw_ephys_apfile.parts[-2]
else:
return ''
recurse = '**/' if recursive else ''
ephys_files = []
for raw_ephys_file in Path(session_path).glob(f'{recurse}*.ap*{suffix}'):
raw_ephys_apfile = next(raw_ephys_file.parent.glob(raw_ephys_file.stem + f'.*{ext}'), None)
if not raw_ephys_apfile and bin_exists:
continue
elif not raw_ephys_apfile and ext != 'bin':
continue
elif not bin_exists and ext == 'bin':
raw_ephys_apfile = raw_ephys_file.with_suffix('.bin')
# first get the ap file
ephys_files.extend([Bunch({'label': None, 'ap': None, 'lf': None, 'path': None})])
ephys_files[-1].ap = raw_ephys_apfile
# then get the corresponding lf file if it exists
lf_file = raw_ephys_apfile.parent / raw_ephys_apfile.name.replace('.ap.', '.lf.')
ephys_files[-1].lf = next(lf_file.parent.glob(lf_file.stem + f'.*{ext}'), None)
# finally, the label is the current directory except if it is bare in raw_ephys_data
ephys_files[-1].label = get_label(raw_ephys_apfile)
ephys_files[-1].path = raw_ephys_apfile.parent
# for 3b probes, need also to get the nidq dataset type
for raw_ephys_file in Path(session_path).rglob(f'{recurse}*.nidq*{suffix}'):
raw_ephys_nidqfile = next(raw_ephys_file.parent.glob(raw_ephys_file.stem + f'.*{ext}'),
None)
if not bin_exists and ext == 'bin':
raw_ephys_nidqfile = raw_ephys_file.with_suffix('.bin')
ephys_files.extend([Bunch({'label': get_label(raw_ephys_file),
'nidq': raw_ephys_nidqfile,
'path': raw_ephys_file.parent})])
return ephys_files
def _mock_spikeglx_file(mock_bin_file, meta_file, ns, nc, sync_depth,
random=False, int2volts=0.6 / 32768, corrupt=False):
"""
For testing purposes, create a binary file with sync pulses to test reading and extraction
"""
meta_file = Path(meta_file)
mock_path_bin = Path(mock_bin_file)
mock_path_meta = mock_path_bin.with_suffix('.meta')
md = read_meta_data(meta_file)
assert meta_file != mock_path_meta
fs = _get_fs_from_meta(md)
fid_source = open(meta_file)
fid_target = open(mock_path_meta, 'w+')
line = fid_source.readline()
while line:
line = fid_source.readline()
if line.startswith('fileSizeBytes'):
line = f'fileSizeBytes={ns * nc * 2}\n'
if line.startswith('fileTimeSecs'):
if corrupt:
line = f'fileTimeSecs={ns / fs + 1.8324}\n'
else:
line = f'fileTimeSecs={ns / fs}\n'
fid_target.write(line)
fid_source.close()
fid_target.close()
if random:
D = np.random.randint(-32767, 32767, size=(ns, nc), dtype=np.int16)
else: # each channel as an int of chn + 1
D = np.tile(np.int16((np.arange(nc) + 1) / int2volts), (ns, 1))
D[0:16, :] = 0
# the last channel is the sync that we fill with
sync = np.int16(2 ** np.float32(np.arange(-1, sync_depth)))
D[:, -1] = 0
D[:sync.size, -1] = sync
with open(mock_path_bin, 'w+') as fid:
D.tofile(fid)
return {'bin_file': mock_path_bin, 'ns': ns, 'nc': nc, 'sync_depth': sync_depth, 'D': D}
[docs]def get_hardware_config(config_file):
"""
Reads the neuropixel_wirings.json file containing sync mapping and parameters
:param config_file: folder or json file
:return: dictionary or None
"""
config_file = Path(config_file)
if config_file.is_dir():
config_file = list(config_file.glob('*.wiring*.json'))
if config_file:
config_file = config_file[0]
if not config_file or not config_file.exists():
return
with open(config_file) as fid:
par = json.loads(fid.read())
return par
def _sync_map_from_hardware_config(hardware_config):
"""
:param hardware_config: dictonary from json read of neuropixel_wirings.json
:return: dictionary where key names refer to object and values to sync channel index
"""
pin_out = neuropixel.SYNC_PIN_OUT[hardware_config['SYSTEM']]
sync_map = {hardware_config['SYNC_WIRING_DIGITAL'][pin]: pin_out[pin]
for pin in hardware_config['SYNC_WIRING_DIGITAL']
if pin_out[pin] is not None}
analog = hardware_config.get('SYNC_WIRING_ANALOG')
if analog:
sync_map.update({analog[pin]: int(pin[2:]) + 16 for pin in analog})
return sync_map
[docs]def get_sync_map(folder_ephys):
hc = get_hardware_config(folder_ephys)
if not hc:
_logger.warning(f"No channel map for {str(folder_ephys)}")
return None
else:
return _sync_map_from_hardware_config(hc)
[docs]def download_raw_partial(url_cbin, url_ch, first_chunk=0, last_chunk=0, one=None, cache_dir=None):
"""
TODO Document
:param url_cbin:
:param url_ch:
:param first_chunk:
:param last_chunk:
:return:
"""
assert str(url_cbin).endswith('.cbin')
assert str(url_ch).endswith('.ch')
one = one or ONE()
webclient = one.alyx
cache_dir = cache_dir or webclient.cache_dir
relpath = Path(url_cbin.replace(webclient._par.HTTP_DATA_SERVER, '.')).parents[0]
# write the temp file into a subdirectory
tdir_chunk = f"chunk_{str(first_chunk).zfill(6)}_to_{str(last_chunk).zfill(6)}"
target_dir = Path(cache_dir, relpath, tdir_chunk)
Path(target_dir).mkdir(parents=True, exist_ok=True)
# First, download the .ch file if necessary
if isinstance(url_ch, Path):
ch_file = url_ch
else:
ch_file = Path(webclient.download_file(
url_ch, target_dir=target_dir, clobber=True, return_md5=False))
ch_file = remove_uuid_file(ch_file)
ch_file_stream = target_dir.joinpath(ch_file.name).with_suffix('.stream.ch')
# Load the .ch file.
with open(ch_file, 'r') as f:
cmeta = json.load(f)
# Get the first sample index, and the number of samples to download.
i0 = cmeta['chunk_bounds'][first_chunk]
ns_stream = cmeta['chunk_bounds'][last_chunk + 1] - i0
total_samples = cmeta['chunk_bounds'][-1]
# handles the meta file
meta_local_path = ch_file_stream.with_suffix('.meta')
if not meta_local_path.exists():
shutil.copy(ch_file.with_suffix('.meta'), meta_local_path)
# if the cached version happens to be the same as the one on disk, just load it
if ch_file_stream.exists():
with open(ch_file_stream, 'r') as f:
cmeta_stream = json.load(f)
if (cmeta_stream.get('chopped_first_sample', None) == i0 and
cmeta_stream.get('chopped_total_samples', None) == total_samples):
return Reader(ch_file_stream.with_suffix('.cbin'))
else:
shutil.copy(ch_file, ch_file_stream)
assert ch_file_stream.exists()
# prepare the metadata file
cmeta['chunk_bounds'] = cmeta['chunk_bounds'][first_chunk:last_chunk + 2]
cmeta['chunk_bounds'] = [_ - i0 for _ in cmeta['chunk_bounds']]
assert len(cmeta['chunk_bounds']) >= 2
assert cmeta['chunk_bounds'][0] == 0
first_byte = cmeta['chunk_offsets'][first_chunk]
cmeta['chunk_offsets'] = cmeta['chunk_offsets'][first_chunk:last_chunk + 2]
cmeta['chunk_offsets'] = [_ - first_byte for _ in cmeta['chunk_offsets']]
assert len(cmeta['chunk_offsets']) >= 2
assert cmeta['chunk_offsets'][0] == 0
n_bytes = cmeta['chunk_offsets'][-1]
assert n_bytes > 0
# Save the chopped chunk bounds and offsets.
cmeta['sha1_compressed'] = None
cmeta['sha1_uncompressed'] = None
cmeta['chopped'] = True
cmeta['chopped_first_sample'] = i0
cmeta['chopped_samples'] = ns_stream
cmeta['chopped_total_samples'] = total_samples
with open(ch_file_stream, 'w') as f:
json.dump(cmeta, f, indent=2, sort_keys=True)
# Download the requested chunks
cbin_local_path = webclient.download_file(
url_cbin, chunks=(first_byte, n_bytes),
target_dir=target_dir, clobber=True, return_md5=False)
cbin_local_path = remove_uuid_file(cbin_local_path)
cbin_local_path_renamed = cbin_local_path.with_suffix('.stream.cbin')
cbin_local_path.replace(cbin_local_path_renamed)
assert cbin_local_path_renamed.exists()
reader = Reader(cbin_local_path_renamed)
return reader