Source code for ibllib.pipes.misc

"""Miscellaneous pipeline utility functions."""
import ctypes
import os
import re
import shutil
import logging
from functools import wraps
from pathlib import Path
from typing import Union, List, Callable, Any

import spikeglx
from one.alf.spec import is_uuid_string
from one.api import ONE

from ibllib.io.misc import delete_empty_folders

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Lookup table from a device name to the flag name associated with it.
# NOTE(review): not referenced in the visible part of this module; how the flag
# names are consumed should be confirmed against the callers.
DEVICE_FLAG_MAP = {'neuropixel': 'ephys',
                   'cameras': 'video',
                   'widefield': 'widefield',
                   'sync': 'sync'}


def probe_labels_from_session_path(session_path: Union[str, Path]) -> List[str]:
    """
    Find the ephys probe labels of a session from its spikeglx metadata files.

    Only folders located directly under `raw_ephys_data` are considered, i.e.
    raw_ephys_data/probe00/copy_of_probe00 is ignored.
    When the metadata reports several shanks (e.g. a NP2.4 probe), one label per shank
    is returned: the folder name followed by a letter ('a' for the first shank, 'b' for
    the second, and so on).

    :param session_path: session folder containing the `raw_ephys_data` folder
    :return: sorted list of probe label strings
    """
    ephys_root = Path(session_path).joinpath('raw_ephys_data')
    shank_suffixes = 'abcdefghij'
    labels = []
    for meta_path in ephys_root.rglob('*.ap.meta'):
        # keep only the meta files sitting exactly one folder below raw_ephys_data
        if meta_path.parents[1] == ephys_root:
            n_shanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_path))
            folder_name = meta_path.parts[-2]
            if n_shanks > 1:
                labels.extend(folder_name + shank_suffixes[k] for k in range(n_shanks))
            else:
                labels.append(folder_name)
    return sorted(labels)
def create_alyx_probe_insertions(
    session_path: str,
    force: bool = False,
    one: object = None,
    model: str = None,
    labels: list = None,
):
    """
    Create, update or fetch the Alyx probe insertion records of a session.

    For each probe label, Alyx is queried for an insertion with that name in the session.
    A missing insertion is created; an existing one is updated when `force` is True and
    returned unchanged otherwise.

    :param session_path: session folder path, or the session UUID string
    :param force: if True, existing insertions are overwritten with the new fields
    :param one: ONE instance; when None a local-mode instance is instantiated
    :param model: probe model; when None it is read from the session folder with spikeglx
     (a detected "3B" is registered as "3B2")
    :param labels: probe labels; when empty or None they are found with
     probe_labels_from_session_path
    :return: list of insertion records, one per label, in label order
    """
    if one is None:
        one = ONE(cache_rest=None, mode='local')
    if is_uuid_string(session_path):
        eid = session_path
    else:
        eid = one.path2eid(session_path)
    if eid is None:
        # only a warning is emitted here, the function carries on regardless
        log.warning("Session not found on Alyx: please create session before creating insertions")
    if model is None:
        detected_model = spikeglx.get_neuropixel_version_from_folder(session_path)
        pmodel = "3B2" if detected_model == "3B" else detected_model
    else:
        pmodel = model
    labels = labels or probe_labels_from_session_path(session_path)
    # qc fields stored in the json field; the same dictionary is shared by all insertions
    qc_dict = {"qc": "NOT_SET", "extended_qc": {}}
    insertions = []
    for plabel in labels:
        payload = {"session": eid, "name": plabel, "model": pmodel, "json": qc_dict}
        # look for the corresponding insertion in Alyx
        found = one.alyx.get(f'/insertions?&session={eid}&name={plabel}', clobber=True)
        if len(found) == 0:
            record = one.alyx.rest("insertions", "create", data=payload)
        else:
            existing = found[0]
            iid = existing["id"]
            if force:
                record = one.alyx.rest("insertions", "update", id=iid, data=payload)
            else:
                record = existing
        insertions.append(record)
    return insertions
def rename_ephys_files(session_folder: str) -> None:
    """
    Rename all the ephys files of a session to Alyx compatible filenames.

    System agnostic (3A, 3B1, 3B2). The AP band files are processed first, then the LF band
    files, then the NIDAq files; the new names are given by get_new_filename.

    :param session_folder: Session folder path
    :type session_folder: str
    :return: None - Changes names of files on filesystem
    :rtype: None
    """
    root = Path(session_folder)
    nidq_pattern = "*.nidq.*"
    for pattern in ("*.ap.*", "*.lf.*", nidq_pattern):
        for ephys_file in root.rglob(pattern):
            # Wiring files are left alone: they are usually created after the renaming, but
            # this function may run a second time upon failed transfer.
            if pattern == nidq_pattern and 'wiring' in ephys_file.name:
                continue
            renamed = ephys_file.parent / get_new_filename(ephys_file.name)
            shutil.move(str(ephys_file), str(renamed))
def get_new_filename(filename: str) -> str:
    """
    Build the Alyx compatible filename of a spikeglx ephys file.

    System agnostic (3A, 3B1, 3B2). Whatever precedes the gate/trigger tag (`_g<n>_t<n>`) in
    the first dot-separated part of the name is replaced by `_spikeglx_ephysData`; all the
    following dot-separated parts are kept as they are.

    :param filename: Name of an ephys file
    :return: New name for ephys file
    :raises ValueError: if the name has fewer than three dot-separated parts, or if no
     gate/trigger tag is found in the first part
    """
    stem, *extensions = filename.split('.')
    if len(extensions) < 2:
        raise ValueError(fr'unrecognized filename "{filename}"')
    gate_trigger = re.match(r'.*(?P<gt>_g\d+_t\d+)', stem)
    if gate_trigger is None:
        raise ValueError(fr'unrecognized filename "{filename}"')
    new_stem = "_spikeglx_ephysData" + gate_trigger.group('gt')
    return '.'.join([new_stem, *extensions])
def move_ephys_files(session_folder: str) -> None:
    """
    Move all properly named ephys files to the appropriate locations for transfer.

    System agnostic (3A, 3B1, 3B2). Use rename_ephys_files before this function.
    Imec files are moved to `raw_ephys_data/probeXX` and NIDAq files to `raw_ephys_data`;
    the empty folders left under `raw_ephys_data` are then deleted.

    :param session_folder: Session folder path
    :type session_folder: str
    :return: None - Moves files on filesystem
    :rtype: None
    :raises AssertionError: if an imec file can be assigned neither a probe number from its
     name (3B) nor a `probeXX` label from its path (3A)
    """
    session_path = Path(session_folder)
    raw_ephys_data_path = session_path / "raw_ephys_data"

    # Snapshot the matches before moving anything: the destination lies inside the tree
    # being searched, so a lazy rglob could otherwise walk a tree that is being modified.
    imec_files = list(session_path.rglob("*.imec*"))
    for imf in imec_files:
        # For 3B system probe0x == imecx. Gate and trigger indices may span several digits,
        # as in get_new_filename, and the dot before 'imec' is matched literally.
        probe_number = re.match(r'_spikeglx_ephysData_g\d+_t\d+\.imec(\d+)', imf.name)
        if probe_number:
            probe_label = f'probe{probe_number.group(1).zfill(2)}'
        else:
            # For 3A system imec files must be in a 'probexx' folder
            probe_label = re.search(r'probe\d+', str(imf))
            if not probe_label:
                # raised explicitly so the check survives `python -O`; the exception type
                # is the one a failed assert would raise
                raise AssertionError(f'Cannot assign probe number to file {imf}')
            probe_label = probe_label.group()
        probe_folder = raw_ephys_data_path.joinpath(probe_label)
        # parents=True: raw_ephys_data itself may not exist yet
        probe_folder.mkdir(parents=True, exist_ok=True)
        shutil.move(str(imf), str(probe_folder / imf.name))

    # NIDAq files (3B system only)
    nidq_files = list(session_path.rglob("*.nidq.*"))
    for nidqf in nidq_files:
        # created lazily so that no folder appears when there is nothing to move
        raw_ephys_data_path.mkdir(parents=True, exist_ok=True)
        shutil.move(str(nidqf), str(raw_ephys_data_path / nidqf.name))

    # Delete all empty folders recursively
    delete_empty_folders(raw_ephys_data_path, dry=False, recursive=True)
def get_iblscripts_folder():
    """
    Return the folder located two levels above the current working directory.

    :return: the folder path, as a string
    """
    working_dir = Path.cwd()
    return str(working_dir.parent.parent)
class WindowsInhibitor:
    """Prevent OS sleep/hibernate in windows.

    Code from:
    https://github.com/h3llrais3r/Deluge-PreventSuspendPlus/blob/master/preventsuspendplus/core.py
    API documentation:
    https://msdn.microsoft.com/en-us/library/windows/desktop/aa373208(v=vs.85).aspx
    """
    # Flags passed to kernel32.SetThreadExecutionState
    ES_CONTINUOUS = 0x80000000
    ES_SYSTEM_REQUIRED = 0x00000001

    @staticmethod
    def _set_thread_execution_state(state: int) -> None:
        """Call kernel32.SetThreadExecutionState and log an error when it returns 0."""
        if ctypes.windll.kernel32.SetThreadExecutionState(state) == 0:
            log.error("Failed to set thread execution state.")

    @staticmethod
    def inhibit(quiet: bool = False):
        """Set the ES_CONTINUOUS and ES_SYSTEM_REQUIRED flags.

        :param quiet: if True the message is logged at debug level instead of printed
        """
        message = "Preventing Windows from going to sleep"
        if quiet:
            log.debug(message)
        else:
            print(message)
        flags = WindowsInhibitor.ES_CONTINUOUS | WindowsInhibitor.ES_SYSTEM_REQUIRED
        WindowsInhibitor._set_thread_execution_state(flags)

    @staticmethod
    def uninhibit(quiet: bool = False):
        """Set the ES_CONTINUOUS flag alone.

        :param quiet: if True the message is logged at debug level instead of printed
        """
        message = "Allowing Windows to go to sleep"
        if quiet:
            log.debug(message)
        else:
            print(message)
        WindowsInhibitor._set_thread_execution_state(WindowsInhibitor.ES_CONTINUOUS)
def sleepless(func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Decorator to ensure that the system doesn't enter sleep or idle mode during a long-running task.

    This decorator wraps a function and sets the thread execution state to prevent the system from
    entering sleep or idle mode while the decorated function is running. The thread execution
    state is reset when the function finishes, whether it returns or raises. On systems other
    than Windows the function is simply called.

    Parameters
    ----------
    func : callable
        The function to decorate.

    Returns
    -------
    callable
        The decorated function.
    """

    @wraps(func)
    def inner(*args, **kwargs) -> Any:
        if os.name != 'nt':
            return func(*args, **kwargs)
        WindowsInhibitor.inhibit(quiet=True)
        try:
            return func(*args, **kwargs)
        finally:
            # always allow sleep again, even when func raises
            WindowsInhibitor.uninhibit(quiet=True)
    return inner