Source code for iblutil.io.params

from pathlib import Path, PurePath
from datetime import datetime
import collections
import sys
import os
import json
import subprocess
import logging
import time
import socket
import asyncio
from math import inf


def as_dict(par):
    """
    Convert a parameter object to a plain dictionary.

    :param par: a dict, an object exposing `_asdict()` (e.g. a named tuple), or a falsy value
    :return: `par` itself when it is falsy or already a dict, otherwise a new dict built from
     `par._asdict()`
    """
    if par and not isinstance(par, dict):
        return dict(par._asdict())
    return par
def from_dict(par_dict):
    """
    Build a named-tuple based parameter object from a dictionary.

    :param par_dict: mapping of parameter names to values
    :return: None when `par_dict` is empty or None, otherwise an `IBLParams` named tuple whose
     fields are the keys of `par_dict`
    """
    if not par_dict:
        return None

    base = collections.namedtuple('Params', par_dict.keys())

    class IBLParams(base):
        __slots__ = ()

        def set(self, field, value):
            """Return a new parameter object in which `field` holds `value`."""
            return from_dict({**as_dict(self), field: value})

        def as_dict(self):
            """Return the parameters as a plain dictionary."""
            # resolves to the module-level as_dict function, not this method
            return as_dict(self)

    return IBLParams(**par_dict)
def getfile(str_params):
    """
    Return the full path of a parameter file following the system convention.

    Every component of `str_params` is prefixed with a dot unless it already starts with one.
    On Windows (and cygwin) the path is rooted in the APPDATA folder, on other systems in the
    user's home folder.

    :param str_params: string that identifies the parameter file, e.g. 'globus/admin'
    :return: string of the full path
    """
    # keep components that are already hidden as they are, prepend a dot to the others
    dotted = [part if part.startswith('.') else '.' + part for part in Path(str_params).parts]
    if sys.platform in ('win32', 'cygwin'):
        return str(PurePath(os.environ['APPDATA'], *dotted))
    return str(Path.home().joinpath(*dotted))
def set_hidden(path, hide: bool) -> Path:
    """
    Hide or unhide a file or folder.

    On Windows (and cygwin) the hidden attribute is toggled with `attrib`, on macOS the hidden
    flag is toggled with `chflags`. On any other system the file or folder is renamed so that
    its name starts (or no longer starts) with a dot.

    Parameters
    ----------
    path : str, pathlib.Path
        The path of the file or folder to (un)hide. It must exist.
    hide : bool
        If True the path is set to hidden, otherwise it is unhidden.

    Returns
    -------
    pathlib.Path
        The path of the file or folder, which may have been renamed.
    """
    path = Path(path)
    assert path.exists()

    if sys.platform in ('win32', 'cygwin'):
        attrib_flag = '+H' if hide else '-H'
        subprocess.run(['attrib', attrib_flag, str(path)]).check_returncode()
        return path

    if sys.platform == 'darwin':
        chflags_flag = 'hidden' if hide else 'nohidden'
        subprocess.run(['chflags', chflags_flag, str(path)]).check_returncode()
        return path

    # other systems: hidden files are those whose name starts with a dot
    dotted = path.name.startswith('.')
    if hide and not dotted:
        return path.rename(path.parent / ('.' + path.name))
    if dotted and not hide:
        return path.rename(path.parent / path.name[1:])
    return path
def read(str_params, default=None):
    """
    Read and parse a JSON parameter file.

    If the parameter file doesn't exist and no defaults are provided, a FileNotFoundError is
    raised. Otherwise the defaults are merged with the file content (file values take
    precedence) and, when the file is missing or lacks some of the default keys, the merged
    parameters are written back to the file.

    The `default` argument is never modified.

    Examples:
        # Load parameters, raise error if file not found
        par = read('globus/admin')

        # Load with defaults
        par = read('globus/admin', {'local_endpoint': None, 'remote_endpoint': None})

        # Create an empty parameter file if not found (returns None as there are no parameters)
        par = read('new_pars', {})

    :param str_params: string that identifies the parameter file
    :param default: default values for missing parameters (dict or named tuple)
    :return: named tuple containing parameters, or None if there are no parameters at all
    :raises FileNotFoundError: the parameter file doesn't exist and `default` is None
    """
    pfile = getfile(str_params)
    # copy the defaults: as_dict returns dict inputs as is, and updating that object in place
    # would silently modify the caller's argument
    par_dict = dict(as_dict(default) or {})
    # check only once so that `file_pars` is guaranteed to be bound whenever it is used below
    file_exists = Path(pfile).exists()
    if file_exists:
        with open(pfile) as fil:
            file_pars = json.load(fil)
        par_dict.update(file_pars)
    elif default is None:  # No defaults provided
        raise FileNotFoundError(f'Parameter file {pfile} not found')
    if not file_exists or par_dict.keys() > file_pars.keys():
        # write the new parameter file with the extra param
        write(str_params, par_dict)
    return from_dict(par_dict)
def write(str_params, par):
    """
    Write a parameter file in JSON format.

    Any missing parent folders of the parameter file are created. Values that are
    `pathlib.Path` instances are saved as strings; when `par` is a dict this conversion is
    applied to the dict in place.

    :param str_params: string that identifies the parameter file
    :param par: dictionary or named tuple containing parameters values
    :return: None
    """
    pfile = Path(getfile(str_params))
    # nested identifiers such as 'a/b/c' may require several levels of folders
    pfile.parent.mkdir(parents=True, exist_ok=True)
    dpar = as_dict(par)
    for key in dpar:
        if isinstance(dpar[key], Path):
            dpar[key] = str(dpar[key])
    with open(pfile, 'w') as fil:
        # dump the converted dict: calling as_dict(par) again would, for a named tuple, create a
        # fresh dict that still contains the non-serializable Path objects
        json.dump(dpar, fil, sort_keys=False, indent=4)
class FileLock:
    def __init__(self, filename, log=None, timeout=10, timeout_action='delete'):
        """
        A context manager to ensure a file is not written to.

        This context manager checks whether a lock file already exists, indicating that the
        filename is currently being written to by another process, and waits until it is free
        before entering. If the lock file is not removed within the timeout period, it is either
        forcibly removed (assumes other process hanging or killed), or raises an exception.

        Before entering, a new lock file is created, containing the hostname, datetime and pid,
        then subsequently removed upon exit.

        Parameters
        ----------
        filename : pathlib.Path, str
            A filepath to 'lock'.
        log : logging.Logger, str
            A logger instance to use, or the name of the logger to fetch. Defaults to the
            logger of this module.
        timeout : float
            How long to wait before either raising an exception or deleting the previous lock
            file. When entering synchronously, a falsy value means waiting indefinitely for
            the lock file to be removed.
        timeout_action : {'delete', 'raise'} str
            Action to take if previous lock file remains throughout timeout period. Either
            delete the old lock file or raise an exception.

        Raises
        ------
        ValueError
            `timeout_action` is neither 'delete' nor 'raise'.

        Examples
        --------
        Ensure a file is not being written to by another process before writing

        >>> with FileLock(filename, timeout_action='delete'):
        ...     with open(filename, 'w') as fp:
        ...         fp.write(r'{"foo": "bar"}')

        Asynchronous implementation example with raise behaviour

        >>> try:
        ...     async with FileLock(filename, timeout_action='raise'):
        ...         with open(filename, 'w') as fp:
        ...             fp.write(r'{"foo": "bar"}')
        ... except asyncio.TimeoutError:
        ...     print(f'failed to write to {filename}')
        """
        self.filename = Path(filename)
        self._logger = log or __name__
        if not isinstance(log, logging.Logger):
            self._logger = logging.getLogger(self._logger)

        self.timeout = timeout
        self.timeout_action = timeout_action
        if self.timeout_action not in ('delete', 'raise'):
            raise ValueError(f'Invalid timeout action: {self.timeout_action}')
        # how long to sleep between lock file checks (async mode, and sync mode without timeout)
        self._async_poll_freq = 0.2

    @property
    def lockfile(self):
        """pathlib.Path: the lock filepath."""
        return self.filename.with_suffix('.lock')

    async def _lock_check_async(self):
        """Poll until the lock file no longer exists."""
        while self.lockfile.exists():
            assert self._async_poll_freq > 0
            await asyncio.sleep(self._async_poll_freq)

    def _log_lock_contents(self):
        """Log the contents of the existing lock file at debug level."""
        with open(self.lockfile, 'r') as fp:
            _contents = json.load(fp) if self.lockfile.stat().st_size else '<empty>'
        self._logger.debug('file lock contents: %s', _contents)

    def _remove_stale_lock(self):
        """Delete a lock file that outlived the timeout period."""
        self._logger.info('stale file lock found, deleting %s', self.lockfile)
        self.lockfile.unlink()

    def _write_lock(self):
        """Create the lock file, with some metadata to ease debugging if one gets stuck."""
        info = dict(
            datetime=datetime.utcnow().isoformat(),
            hostname=socket.gethostname(),  # call it: the bare function would be saved as its repr
            pid=os.getpid(),
        )
        with open(self.lockfile, 'w') as fp:
            json.dump(info, fp)

    def __enter__(self):
        """
        Wait for any existing lock file to be removed, then create a new one.

        Raises
        ------
        TimeoutError
            The lock file still exists after the timeout and `timeout_action` is 'raise'.
        """
        # if a lock file exists retries n times to see if it exists
        attempts = 0
        n_attempts = 5 if self.timeout else inf
        # without a timeout, poll at a fixed frequency until the lock file is removed
        timeout = (self.timeout / n_attempts) if self.timeout else self._async_poll_freq
        while self.lockfile.exists() and attempts < n_attempts:
            self._logger.info('file lock found, waiting %.2f seconds %s', timeout, self.lockfile)
            time.sleep(timeout)
            attempts += 1

        # if the file still exists after 5 attempts, remove it as it's a job that went wrong
        if self.lockfile.exists():
            self._log_lock_contents()
            if self.timeout_action == 'delete':
                self._remove_stale_lock()
            else:
                raise TimeoutError(f'{self.lockfile} file lock timed out')

        self._write_lock()
        return self

    async def __aenter__(self):
        """
        Asynchronously wait for any existing lock file to be removed, then create a new one.

        Raises
        ------
        asyncio.TimeoutError
            The lock file still exists after the timeout and `timeout_action` is 'raise'.
        """
        # if a lock file exists wait until timeout before removing
        try:
            await asyncio.wait_for(self._lock_check_async(), timeout=self.timeout)  # py3.11 use with asyncio.timeout
        except asyncio.TimeoutError:
            self._log_lock_contents()
            if self.timeout_action == 'raise':
                raise
            self._remove_stale_lock()

        self._write_lock()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Remove the lock file."""
        self.lockfile.unlink()

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        """Remove the lock file."""
        self.lockfile.unlink()