"""Miscellaneous pipeline utility functions."""
import ctypes
import os
import re
import shutil
import logging
from functools import wraps
from pathlib import Path
from typing import Union, List, Callable, Any
import spikeglx
from one.alf.spec import is_uuid_string
from one.api import ONE
from ibllib.io.misc import delete_empty_folders
log = logging.getLogger(__name__)
DEVICE_FLAG_MAP = {'neuropixel': 'ephys',
'cameras': 'video',
'widefield': 'widefield',
'sync': 'sync'}
[docs]
def probe_labels_from_session_path(session_path: Union[str, Path]) -> List[str]:
"""
Finds ephys probes according to the metadata spikeglx files. Only returns first subfolder
name under raw_ephys_data folder, ie. raw_ephys_data/probe00/copy_of_probe00 won't be returned
If there is a NP2.4 probe with several shanks, create several probes
:param session_path:
:return: list of strings
"""
plabels = []
raw_ephys_folder = Path(session_path).joinpath('raw_ephys_data')
for meta_file in raw_ephys_folder.rglob('*.ap.meta'):
if meta_file.parents[1] != raw_ephys_folder:
continue
meta = spikeglx.read_meta_data(meta_file)
nshanks = spikeglx._get_nshanks_from_meta(meta)
if nshanks > 1:
for i in range(nshanks):
plabels.append(meta_file.parts[-2] + 'abcdefghij'[i])
else:
plabels.append(meta_file.parts[-2])
plabels.sort()
return plabels
[docs]
def create_alyx_probe_insertions(
session_path: str,
force: bool = False,
one: object = None,
model: str = None,
labels: list = None,
):
if one is None:
one = ONE(cache_rest=None, mode='local')
eid = session_path if is_uuid_string(session_path) else one.path2eid(session_path)
if eid is None:
log.warning("Session not found on Alyx: please create session before creating insertions")
if model is None:
probe_model = spikeglx.get_neuropixel_version_from_folder(session_path)
pmodel = "3B2" if probe_model == "3B" else probe_model
else:
pmodel = model
labels = labels or probe_labels_from_session_path(session_path)
# create the qc fields in the json field
qc_dict = {}
qc_dict.update({"qc": "NOT_SET"})
qc_dict.update({"extended_qc": {}})
# create the dictionary
insertions = []
for plabel in labels:
insdict = {"session": eid, "name": plabel, "model": pmodel, "json": qc_dict}
# search for the corresponding insertion in Alyx
alyx_insertion = one.alyx.get(f'/insertions?&session={eid}&name={plabel}', clobber=True)
# if it doesn't exist, create it
if len(alyx_insertion) == 0:
alyx_insertion = one.alyx.rest("insertions", "create", data=insdict)
else:
iid = alyx_insertion[0]["id"]
if force:
alyx_insertion = one.alyx.rest("insertions", "update", id=iid, data=insdict)
else:
alyx_insertion = alyx_insertion[0]
insertions.append(alyx_insertion)
return insertions
[docs]
def rename_ephys_files(session_folder: str) -> None:
"""rename_ephys_files is system agnostic (3A, 3B1, 3B2).
Renames all ephys files to Alyx compatible filenames. Uses get_new_filename.
:param session_folder: Session folder path
:type session_folder: str
:return: None - Changes names of files on filesystem
:rtype: None
"""
session_path = Path(session_folder)
ap_files = session_path.rglob("*.ap.*")
lf_files = session_path.rglob("*.lf.*")
nidq_files = session_path.rglob("*.nidq.*")
for apf in ap_files:
new_filename = get_new_filename(apf.name)
shutil.move(str(apf), str(apf.parent / new_filename))
for lff in lf_files:
new_filename = get_new_filename(lff.name)
shutil.move(str(lff), str(lff.parent / new_filename))
for nidqf in nidq_files:
# Ignore wiring files: these are usually created after the file renaming however this
# function may be called a second time upon failed transfer.
if 'wiring' in nidqf.name:
continue
new_filename = get_new_filename(nidqf.name)
shutil.move(str(nidqf), str(nidqf.parent / new_filename))
[docs]
def get_new_filename(filename: str) -> str:
"""get_new_filename is system agnostic (3A, 3B1, 3B2).
Gets an alyx compatible filename from any spikeglx ephys file.
:param filename: Name of an ephys file
:return: New name for ephys file
"""
root = "_spikeglx_ephysData"
parts = filename.split('.')
if len(parts) < 3:
raise ValueError(fr'unrecognized filename "{filename}"')
pattern = r'.*(?P<gt>_g\d+_t\d+)'
if not (match := re.match(pattern, parts[0])):
raise ValueError(fr'unrecognized filename "{filename}"')
return '.'.join([root + match.group(1), *parts[1:]])
[docs]
def move_ephys_files(session_folder: str) -> None:
"""move_ephys_files is system agnostic (3A, 3B1, 3B2).
Moves all properly named ephys files to appropriate locations for transfer.
Use rename_ephys_files function before this one.
:param session_folder: Session folder path
:type session_folder: str
:return: None - Moves files on filesystem
:rtype: None
"""
session_path = Path(session_folder)
raw_ephys_data_path = session_path / "raw_ephys_data"
imec_files = session_path.rglob("*.imec*")
for imf in imec_files:
# For 3B system probe0x == imecx
probe_number = re.match(r'_spikeglx_ephysData_g\d_t\d.imec(\d+).*', imf.name)
if not probe_number:
# For 3A system imec files must be in a 'probexx' folder
probe_label = re.search(r'probe\d+', str(imf))
assert probe_label, f'Cannot assign probe number to file {imf}'
probe_label = probe_label.group()
else:
probe_number, = probe_number.groups()
probe_label = f'probe{probe_number.zfill(2)}'
raw_ephys_data_path.joinpath(probe_label).mkdir(exist_ok=True)
shutil.move(imf, raw_ephys_data_path.joinpath(probe_label, imf.name))
# NIDAq files (3B system only)
nidq_files = session_path.rglob("*.nidq.*")
for nidqf in nidq_files:
shutil.move(str(nidqf), str(raw_ephys_data_path / nidqf.name))
# Delete all empty folders recursively
delete_empty_folders(raw_ephys_data_path, dry=False, recursive=True)
[docs]
def get_iblscripts_folder():
return str(Path().cwd().parent.parent)
[docs]
class WindowsInhibitor:
"""Prevent OS sleep/hibernate in windows; code from:
https://github.com/h3llrais3r/Deluge-PreventSuspendPlus/blob/master/preventsuspendplus/core.py
API documentation:
https://msdn.microsoft.com/en-us/library/windows/desktop/aa373208(v=vs.85).aspx"""
ES_CONTINUOUS = 0x80000000
ES_SYSTEM_REQUIRED = 0x00000001
@staticmethod
def _set_thread_execution_state(state: int) -> None:
result = ctypes.windll.kernel32.SetThreadExecutionState(state)
if result == 0:
log.error("Failed to set thread execution state.")
[docs]
@staticmethod
def inhibit(quiet: bool = False):
if quiet:
log.debug("Preventing Windows from going to sleep")
else:
print("Preventing Windows from going to sleep")
WindowsInhibitor._set_thread_execution_state(WindowsInhibitor.ES_CONTINUOUS | WindowsInhibitor.ES_SYSTEM_REQUIRED)
[docs]
@staticmethod
def uninhibit(quiet: bool = False):
if quiet:
log.debug("Allowing Windows to go to sleep")
else:
print("Allowing Windows to go to sleep")
WindowsInhibitor._set_thread_execution_state(WindowsInhibitor.ES_CONTINUOUS)
[docs]
def sleepless(func: Callable[..., Any]) -> Callable[..., Any]:
"""
Decorator to ensure that the system doesn't enter sleep or idle mode during a long-running task.
This decorator wraps a function and sets the thread execution state to prevent
the system from entering sleep or idle mode while the decorated function is
running.
Parameters
----------
func : callable
The function to decorate.
Returns
-------
callable
The decorated function.
"""
@wraps(func)
def inner(*args, **kwargs) -> Any:
if os.name == 'nt':
WindowsInhibitor().inhibit(quiet=True)
result = func(*args, **kwargs)
if os.name == 'nt':
WindowsInhibitor().uninhibit(quiet=True)
return result
return inner