Source code for iblutil.util

from itertools import takewhile
from pathlib import Path
import collections
import colorlog
import copy
import logging
import sys

import numpy as np

log = logging.getLogger('__name__')

LOG_FORMAT_STR = u'%(asctime)s %(levelname)-8s %(filename)s:%(lineno)-4d %(message)s'
LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
LOG_COLORS = {
    'DEBUG': 'green',
    'INFO': 'cyan',
    'WARNING': 'bold_yellow',
    'ERROR': 'bold_red',
    'CRITICAL': 'bold_purple'}


[docs] class Bunch(dict): """A subclass of dictionary with an additional dot syntax.""" def __init__(self, *args, **kwargs): super(Bunch, self).__init__(*args, **kwargs) self.__dict__ = self
[docs] def copy(self, deep=False): """Return a new Bunch instance which is a copy of the current Bunch instance. Parameters ---------- deep : bool If True perform a deep copy (see notes). By default a shallow copy is returned. Returns ------- Bunch A new copy of the Bunch. Notes ----- - A shallow copy constructs a new Bunch object and then (to the extent possible) inserts references into it to the objects found in the original. - A deep copy constructs a new Bunch and then, recursively, inserts copies into it of the objects found in the original. """ return copy.deepcopy(self) if deep else Bunch(super(Bunch, self).copy())
[docs] def save(self, npz_file, compress=False): """ Saves a npz file containing the arrays of the bunch. :param npz_file: output file :param compress: bool (False) use compression :return: None """ if compress: np.savez_compressed(npz_file, **self) else: np.savez(npz_file, **self)
[docs] @staticmethod def load(npz_file): """ Loads a npz file containing the arrays of the bunch. :param npz_file: output file :return: Bunch """ if not Path(npz_file).exists(): raise FileNotFoundError(f"{npz_file}") return Bunch(np.load(npz_file))
def _iflatten(x): result = [] for el in x: if isinstance(el, collections.abc.Iterable) and not ( isinstance(el, str) or isinstance(el, dict)): result.extend(_iflatten(el)) else: result.append(el) return result def _gflatten(x): def iselement(e): return not (isinstance(e, collections.abc.Iterable) and not ( isinstance(el, str) or isinstance(el, dict))) for el in x: if iselement(el): yield el else: yield from _gflatten(el)
[docs] def flatten(x, generator=False): """ Flatten a nested Iterable excluding strings and dicts. Converts nested Iterable into flat list. Will not iterate through strings or dicts. :return: Flattened list or generator object. :rtype: list or generator """ return _gflatten(x) if generator else _iflatten(x)
[docs] def range_str(values: iter) -> str: """ Given a list of integers, returns a terse string expressing the unique values. Example: indices = [0, 1, 2, 3, 4, 7, 8, 11, 15, 20] range_str(indices) >> '0-4, 7-8, 11, 15 & 20' :param values: An iterable of ints :return: A string of unique value ranges """ trial_str = '' values = list(set(values)) for i in range(len(values)): if i == 0: trial_str += str(values[i]) elif values[i] - (values[i - 1]) == 1: if i == len(values) - 1 or values[i + 1] - values[i] > 1: trial_str += f'-{values[i]}' else: trial_str += f', {values[i]}' # Replace final comma with an ampersand k = trial_str.rfind(',') if k > -1: trial_str = f'{trial_str[:k]} &{trial_str[k + 1:]}' return trial_str
[docs] def setup_logger(name='ibl', level=logging.NOTSET, file=None, no_color=False): """Set up a log for IBL packages. Uses date time, calling function and distinct colours for levels. Sets the name if not set already and add a stream handler. If the stream handler already exists, does not duplicate. The naming/level allows not to interfere with third-party libraries when setting level. Parameters ---------- name : str Log name, should be set to the root package name for consistent logging throughout the app. level : str, int The logging level (defaults to NOTSET, which inherits the parent log level) file : bool, str, pathlib.Path If True, a file handler is added with the default file location, otherwise a log file path may be passed. no_color : bool If true the colour log is deactivated. May be useful when directing the std out to a file. Returns ------- logging.Logger, logging.RootLogger The configured log. """ log = logging.getLogger() if not name else logging.getLogger(name) log.setLevel(level) fkwargs = {'no_color': True} if no_color else {'log_colors': LOG_COLORS} # check existence of stream handlers before adding another if not any(map(lambda x: x.name == f'{name}_auto', log.handlers)): # need to remove any previous default Stream handler configured on stderr # to not duplicate output for h in log.handlers: if isinstance(h, logging.StreamHandler) and h.stream.name == '<stderr>' and h.level == 0 and h.name is None: log.removeHandler(h) stream_handler = logging.StreamHandler(stream=sys.stdout) stream_handler.setFormatter( colorlog.ColoredFormatter('%(log_color)s' + LOG_FORMAT_STR, LOG_DATE_FORMAT, **fkwargs)) stream_handler.name = f'{name}_auto' log.addHandler(stream_handler) # add the file handler if requested, but check for duplicates if not any(map(lambda x: x.name == f'{name}_file', log.handlers)): if file is True: log_to_file(log=name) elif file: log_to_file(filename=file, log=name) return log
[docs] def log_to_file(log='ibl', filename=None): """ Save log information to a given filename in '.ibl_logs' folder (in home directory). Parameters ---------- log : str, logging.Logger The log (name or object) to add file handler to. filename : str, Pathlib.Path The name of the log file to save to. Returns ------- logging.Logger The log with the file handler attached. """ if isinstance(log, str): log = logging.getLogger(log) if filename is None: filename = Path.home().joinpath('.ibl_logs', log.name) elif not Path(filename).is_absolute(): filename = Path.home().joinpath('.ibl_logs', filename) filename.parent.mkdir(exist_ok=True) file_handler = logging.FileHandler(filename, encoding='utf-8') file_format = logging.Formatter(LOG_FORMAT_STR, LOG_DATE_FORMAT) file_handler.setFormatter(file_format) file_handler.name = f'{log.name}_file' log.addHandler(file_handler) log.info(f'File log initiated {file_handler.name}') return log
[docs] def rrmdir(folder: Path, levels: int = 0): """ Recursively remove a folder and its parents up to a defined level - if they are empty. Parameters ---------- folder : pathlib.Path The path to a folder at which to start the recursion. levels : int Recursion level, i.e. the number of parents to delete, relative to `folder`. Defaults to 0 - which has the same effect as `pathlib.Path.rmdir` except that it won't raise an OSError if the directory is not empty. Returns ------- list of pathlib.Path A list of folders that were recursively removed. Raises ------ FileNotFoundError If `folder` does not exist. PermissionError Insufficient privileges or folder in use by another process. NotADirectoryError The folder provided is most likely a file. """ try: # a sorted list of absolute nested folder paths to_remove = (folder, *folder.parents[:levels]) # py >= 3.9 except TypeError: # py <= 3.8 compatible to_remove = (folder, *[folder.parents[n] for n in range(levels)]) # filter list to those that are empty; if statement always true as rmdir returns None return [f for f in takewhile(lambda f: not any(f.iterdir()), to_remove) if not f.rmdir()]