Source code for iblutil.io.params

from pathlib import Path, PurePath
from datetime import datetime
import collections
import sys
import os
import json
import subprocess
import logging
import time
import socket
import asyncio
from math import inf


def as_dict(par):
    """
    Convert a parameter object into a plain dictionary.

    :param par: a dict, a named tuple (anything exposing ``_asdict``), or a falsy value
    :return: `par` itself when it is falsy or already a dict, otherwise a new dict
     built from ``par._asdict()``
    """
    # Only truthy, non-dict inputs need converting; everything else is passed straight through
    if par and not isinstance(par, dict):
        return dict(par._asdict())
    return par
def from_dict(par_dict):
    """
    Build an immutable parameter object from a dictionary.

    :param par_dict: dictionary mapping parameter names to values
    :return: None when `par_dict` is falsy, otherwise an ``IBLParams`` named tuple whose
     fields are the dictionary keys
    """
    if not par_dict:
        return None

    base = collections.namedtuple('Params', par_dict.keys())

    class IBLParams(base):
        # no per-instance __dict__: instances stay plain tuples
        __slots__ = ()

        def set(self, field, value):
            # tuples are immutable, so return a new params object with the field set
            updated = {**as_dict(self), field: value}
            return from_dict(updated)

        def as_dict(self):
            # delegates to the module-level as_dict function
            return as_dict(self)

    return IBLParams(**par_dict)
def getfile(str_params):
    """
    Return the full path of a parameter file following the system convention.

    On Windows (and cygwin) the file lives under the APPDATA folder, elsewhere under the
    user's home directory. Every component of `str_params` is prefixed with a dot.

    :param str_params: string that identifies the parameter file
    :return: string of the full path
    """
    dotted = []
    for part in Path(str_params).parts:
        # keep an already existing leading dot as is
        dotted.append(part if part.startswith('.') else '.' + part)
    if sys.platform in ('win32', 'cygwin'):
        return str(PurePath(os.environ['APPDATA'], *dotted))
    return str(Path.home().joinpath(*dotted))
def set_hidden(path, hide: bool) -> Path:
    """
    Hide or unhide a file or folder.

    On Windows (``attrib``) and macOS (``chflags``) a filesystem flag is toggled and the
    path keeps its name. On every other platform the file or folder is renamed so that
    its name starts (or no longer starts) with a dot.

    Parameters
    ----------
    path : str, pathlib.Path
        The path of the file or folder to (un)hide. It must exist.
    hide : bool
        If True the path is set to hidden, otherwise it is unhidden.

    Returns
    -------
    pathlib.Path
        The path of the file or folder, which may have been renamed.
    """
    path = Path(path)
    assert path.exists()

    if sys.platform in ('win32', 'cygwin'):
        switch = '+H' if hide else '-H'
        subprocess.run(['attrib', switch, str(path)]).check_returncode()
        return path

    if sys.platform == 'darwin':
        switch = 'hidden' if hide else 'nohidden'
        subprocess.run(['chflags', switch, str(path)]).check_returncode()
        return path

    # Other platforms: visibility is encoded in the name itself
    dotted = path.name.startswith('.')
    if hide and not dotted:
        return path.rename(path.parent / ('.' + path.name))
    if dotted and not hide:
        return path.rename(path.parent / path.name[1:])
    return path
def read(str_params, default=None):
    """
    Read and parse a JSON parameter file into a named tuple.

    If the parameter file doesn't exist and no defaults are provided, a FileNotFoundError
    is raised. Otherwise the defaults are merged with the file content (file values take
    precedence) and, when the defaults contain parameters missing from the file, or the
    file does not exist, the merged parameters are written back to the file.

    The `default` object passed by the caller is never modified.

    Examples:
        # Load parameters, raise error if file not found
        par = read('globus/admin')
        # Load with defaults
        par = read('globus/admin', {'local_endpoint': None, 'remote_endpoint': None})
        # Return empty dict if file not found (i.e. touch new param file)
        par = read('new_pars', {})

    :param str_params: path to text json file
    :param default: default values for missing parameters (dict or named tuple)
    :return: named tuple containing parameters (None when there are no parameters at all)
    :raises FileNotFoundError: the file does not exist and `default` is None
    """
    pfile = getfile(str_params)
    # Work on a copy: as_dict returns the very same object when given a dict, and
    # updating it in place would silently alter the caller's defaults
    par_dict = dict(as_dict(default) or {})
    # Check for the file only once so the write decision below can never reference
    # file content that was not loaded
    file_found = Path(pfile).exists()
    file_pars = {}
    if file_found:
        with open(pfile) as fil:
            file_pars = json.load(fil)
        par_dict.update(file_pars)
    elif default is None:  # No defaults provided
        raise FileNotFoundError(f'Parameter file {pfile} not found')
    if not file_found or par_dict.keys() > file_pars.keys():
        # write the new parameter file with the extra param
        write(str_params, par_dict)
    return from_dict(par_dict)
def write(str_params, par):
    """
    Write a parameter file in JSON format.

    Any missing parent folders of the parameter file are created. Values that are
    `pathlib.Path` instances are saved as strings; when `par` is a dict this
    conversion is applied to the dict in place.

    :param str_params: path to text json file
    :param par: dictionary (or named tuple) containing parameters values
    :return: None
    """
    pfile = Path(getfile(str_params))
    # parents=True supports nested parameter locations, exist_ok=True avoids failing
    # when the folder appears between a check and its creation
    pfile.parent.mkdir(parents=True, exist_ok=True)
    dpar = as_dict(par)
    for key, value in dpar.items():
        if isinstance(value, Path):
            dpar[key] = str(value)
    with open(pfile, 'w') as fil:
        # dump the converted dict: calling as_dict(par) again would rebuild a fresh
        # dict for named tuples and bring back the non-serializable Path values
        json.dump(dpar, fil, sort_keys=False, indent=4)
class FileLock:
    """
    Context manager guarding a file with a sidecar lock file.

    The lock file shares the name of the guarded file, with the suffix replaced by
    ``.lock``. On entering, an existing lock file is waited on; if it is still present
    after the timeout it is either deleted or an error is raised, depending on
    `timeout_action`. A new lock file holding some JSON debugging metadata is then
    written, and removed again on exit. Both ``with`` and ``async with`` are supported.

    :param filename: path of the file to guard
    :param log: logger to use, defaults to this module's logger
    :param timeout: seconds to wait for an existing lock before acting on it
    :param timeout_action: 'delete' to remove a lock that outlived the timeout,
     'raise' to raise a timeout error instead
    :raises ValueError: `timeout_action` is neither 'delete' nor 'raise'
    """

    def __init__(self, filename, log=None, timeout=10, timeout_action='delete'):
        self.filename = Path(filename)
        self._logger = log or logging.getLogger(__name__)
        self.timeout = timeout
        self.timeout_action = timeout_action
        if self.timeout_action not in ('delete', 'raise'):
            raise ValueError(f'Invalid timeout action: {self.timeout_action}')
        self._poll_freq = 0.2  # seconds between two checks of the lock file

    @property
    def lockfile(self):
        """Path of the lock file: the guarded file name with a ``.lock`` suffix."""
        return self.filename.with_suffix('.lock')

    async def _lock_check_async(self):
        """Poll until the lock file no longer exists."""
        while self.lockfile.exists():
            assert self._poll_freq > 0
            await asyncio.sleep(self._poll_freq)

    def _log_lock_contents(self):
        """Log the content of the existing lock file at debug level."""
        try:
            with open(self.lockfile, 'r') as fp:
                contents = json.load(fp) if self.lockfile.stat().st_size else '<empty>'
        except ValueError:
            # This read is for debugging only: a corrupt or half-written lock file
            # must not prevent the stale lock from being handled
            contents = '<unreadable>'
        self._logger.debug('file lock contents: %s', contents)

    def _delete_stale_lock(self):
        """Remove a lock file that outlived the timeout."""
        self._logger.info('stale file lock found, deleting %s', self.lockfile)
        self.lockfile.unlink()

    def _write_lock(self):
        """Create the lock file, with some metadata to ease debugging if one gets stuck."""
        metadata = dict(datetime=datetime.utcnow().isoformat(), hostname=socket.gethostname())
        with open(self.lockfile, 'w') as fp:
            json.dump(metadata, fp)

    def __enter__(self):
        # if a lock file exists retries n times to see if it exists
        attempts = 0
        n_attempts = 5 if self.timeout else inf
        timeout = (self.timeout / n_attempts) if self.timeout else self._poll_freq
        while self.lockfile.exists() and attempts < n_attempts:
            self._logger.info('file lock found, waiting %.2f seconds %s', timeout, self.lockfile)
            time.sleep(timeout)
            attempts += 1

        # if the file still exists after 5 attempts, treat it as a job that went wrong
        if self.lockfile.exists():
            self._log_lock_contents()
            if self.timeout_action != 'delete':
                raise TimeoutError(f'{self.lockfile} file lock timed out')
            self._delete_stale_lock()

        self._write_lock()
        return self

    async def __aenter__(self):
        # if a lock file exists wait until timeout before removing
        try:
            # py3.11 use with asyncio.timeout
            await asyncio.wait_for(self._lock_check_async(), timeout=self.timeout)
        except asyncio.TimeoutError as e:
            self._log_lock_contents()
            if self.timeout_action == 'raise':
                raise e
            self._delete_stale_lock()

        self._write_lock()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.lockfile.unlink()

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        self.lockfile.unlink()