import shutil
import logging
from pathlib import Path
import time
import json
import string
import random
import numpy as np
from one.alf.path import remove_uuid_string
import spikeglx
_logger = logging.getLogger('ibllib')
class Streamer(spikeglx.Reader):
"""
pid = 'e31b4e39-e350-47a9-aca4-72496d99ff2a'
one = ONE()
sr = Streamer(pid=pid, one=one)
raw_voltage = sr[int(t0 * sr.fs):int((t0 + nsecs) * sr.fs), :]
"""
def __init__(self, pid, one, typ='ap', cache_folder=None, remove_cached=False):
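        """
        :param pid: probe insertion UUID
        :param one: an instantiated ONE connection
        :param typ: type of dataset to stream ('ap' by default)
        :param cache_folder: optional destination folder for the streamed chunks,
            defaults to <ONE cache dir>/cache/<typ>
        :param remove_cached: if True, the cached chunk folder is deleted after each read
        """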
        self.target_dir = None  # directory of the last chunk downloaded or read
self.one = one
self.pid = pid
self.cache_folder = cache_folder or Path(self.one.alyx._par.CACHE_DIR).joinpath('cache', typ)
self.remove_cached = remove_cached
self.eid, self.pname = self.one.pid2eid(pid)
self.file_chunks = self.one.load_dataset(self.eid, f'*.{typ}.ch', collection=f"*{self.pname}")
meta_file = self.one.load_dataset(self.eid, f'*.{typ}.meta', collection=f"*{self.pname}")
cbin_rec = self.one.list_datasets(self.eid, collection=f"*{self.pname}", filename=f'*{typ}.*bin', details=True)
cbin_rec.index = cbin_rec.index.map(lambda x: (self.eid, x))
self.url_cbin = self.one.record2url(cbin_rec)[0]
with open(self.file_chunks, 'r') as f:
self.chunks = json.load(f)
self.chunks['chunk_bounds'] = np.array(self.chunks['chunk_bounds'])
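        # chunk_bounds holds the cumulative sample count at each mtscomp chunk boundary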
super(Streamer, self).__init__(meta_file, ignore_warnings=True)
def read(self, nsel=slice(0, 10000), csel=slice(None), sync=True, volts=True):
"""
overload the read function by downloading the necessary chunks
"""
first_chunk = np.maximum(0, np.searchsorted(self.chunks['chunk_bounds'], nsel.start) - 1)
last_chunk = np.maximum(0, np.searchsorted(self.chunks['chunk_bounds'], nsel.stop) - 1)
n0 = self.chunks['chunk_bounds'][first_chunk]
        _logger.debug(f'Streamer: caching sample {n0} (t={n0 / self.fs})')
self.cache_folder.mkdir(exist_ok=True, parents=True)
sr, file_cbin = self._download_raw_partial(first_chunk=first_chunk, last_chunk=last_chunk)
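        # the requested window is given in absolute samples: offset it by n0, the first
        # sample of the downloaded stream; volts=False returns the raw binary samples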
if not volts:
data = np.copy(sr._raw[nsel.start - n0:nsel.stop - n0, csel])
else:
data = sr[nsel.start - n0: nsel.stop - n0, csel]
sr.close()
if self.remove_cached:
shutil.rmtree(self.target_dir)
return data
def _download_raw_partial(self, first_chunk=0, last_chunk=0):
"""
downloads one or several chunks of a mtscomp file and copy ch files and metadata to return
a spikeglx.Reader object
:param first_chunk:
:param last_chunk:
:return: spikeglx.Reader of the current chunk, Pathlib.Path of the directory where it is stored
:return: cbin local path
"""
assert str(self.url_cbin).endswith('.cbin')
webclient = self.one.alyx
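        # rebuild the dataset's remote path relative to the data server root, so the chunks
        # land under the same relative path inside the local cache folder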
relpath = Path(self.url_cbin.replace(webclient._par.HTTP_DATA_SERVER, '.')).parents[0]
# write the temp file into a subdirectory
tdir_chunk = f"chunk_{str(first_chunk).zfill(6)}_to_{str(last_chunk).zfill(6)}"
        # for parallel processes there is a risk of name collisions when remove_cached is True;
        # if the folder is to be removed, append a unique identifier to avoid duplicate names
if self.remove_cached:
            tdir_chunk += ''.join(random.choice(string.ascii_letters) for _ in range(10))
self.target_dir = Path(self.cache_folder, relpath, tdir_chunk)
Path(self.target_dir).mkdir(parents=True, exist_ok=True)
ch_file_stream = self.target_dir.joinpath(self.file_chunks.name).with_suffix('.stream.ch')
# Get the first sample index, and the number of samples to download.
i0 = self.chunks['chunk_bounds'][first_chunk]
ns_stream = self.chunks['chunk_bounds'][last_chunk + 1] - i0
total_samples = self.chunks['chunk_bounds'][-1]
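        # these values are written to the chopped .ch metadata below and used to decide
        # whether a previously cached copy can be reused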
# handles the meta file
meta_local_path = ch_file_stream.with_suffix('.meta')
if not meta_local_path.exists():
shutil.copy(self.file_chunks.with_suffix('.meta'), meta_local_path)
        # if a chopped copy with the same boundaries has already been streamed to disk, reuse it
if ch_file_stream.exists() and ch_file_stream.with_suffix('.cbin').exists():
with open(ch_file_stream, 'r') as f:
cmeta_stream = json.load(f)
if (cmeta_stream.get('chopped_first_sample', None) == i0 and
cmeta_stream.get('chopped_total_samples', None) == total_samples):
return spikeglx.Reader(ch_file_stream.with_suffix('.cbin'), ignore_warnings=True), ch_file_stream
else:
shutil.copy(self.file_chunks, ch_file_stream)
assert ch_file_stream.exists()
cmeta = self.chunks.copy()
# prepare the metadata file
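        # the chunk bounds (samples) and offsets (bytes) are re-based to the first downloaded
        # chunk so that the chopped file stands on its own as a valid compressed stream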
cmeta['chunk_bounds'] = cmeta['chunk_bounds'][first_chunk:last_chunk + 2]
cmeta['chunk_bounds'] = [int(_ - i0) for _ in cmeta['chunk_bounds']]
assert len(cmeta['chunk_bounds']) >= 2
assert cmeta['chunk_bounds'][0] == 0
first_byte = cmeta['chunk_offsets'][first_chunk]
cmeta['chunk_offsets'] = cmeta['chunk_offsets'][first_chunk:last_chunk + 2]
cmeta['chunk_offsets'] = [_ - first_byte for _ in cmeta['chunk_offsets']]
assert len(cmeta['chunk_offsets']) >= 2
assert cmeta['chunk_offsets'][0] == 0
n_bytes = cmeta['chunk_offsets'][-1]
assert n_bytes > 0
# Save the chopped chunk bounds and offsets.
cmeta['sha1_compressed'] = None
cmeta['sha1_uncompressed'] = None
cmeta['chopped'] = True
cmeta['chopped_first_sample'] = int(i0)
cmeta['chopped_samples'] = int(ns_stream)
cmeta['chopped_total_samples'] = int(total_samples)
with open(ch_file_stream, 'w') as f:
json.dump(cmeta, f, indent=2, sort_keys=True)
# Download the requested chunks
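        # up to 5 retries, pausing one second between attempts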
retries = 0
while True:
try:
cbin_local_path = webclient.download_file(
self.url_cbin, chunks=(first_byte, n_bytes),
target_dir=self.target_dir, clobber=True, return_md5=False)
break
except Exception as e:
retries += 1
if retries > 5:
raise e
_logger.warning(f'Failed to download chunk {first_chunk} to {last_chunk}, retrying')
time.sleep(1)
cbin_local_path_renamed = remove_uuid_string(cbin_local_path).with_suffix('.stream.cbin')
cbin_local_path.replace(cbin_local_path_renamed)
assert cbin_local_path_renamed.exists()
reader = spikeglx.Reader(cbin_local_path_renamed, ignore_warnings=True)
        return reader, cbin_local_path_renamed
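

# Minimal usage sketch: assumes valid ONE credentials are configured; the insertion id is
# the example from the class docstring and the time window is purely illustrative.
if __name__ == '__main__':
    from one.api import ONE

    one = ONE()
    pid = 'e31b4e39-e350-47a9-aca4-72496d99ff2a'  # example probe insertion id
    sr = Streamer(pid=pid, one=one, typ='ap', remove_cached=True)
    t0, nsecs = 100, 1  # illustrative window: 1 second of data starting at t=100 s
    raw_voltage = sr[int(t0 * sr.fs):int((t0 + nsecs) * sr.fs), :]
    print(raw_voltage.shape, sr.fs)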