Source code for iblrig.path_helper

import logging
import os
import re
import shutil
import subprocess
from pathlib import Path
from typing import TypeVar

import numpy as np
import yaml
from packaging import version
from pydantic import BaseModel, ValidationError

import iblrig
from ibllib.io import session_params
from ibllib.io.raw_data_loaders import load_settings
from iblrig.constants import HARDWARE_SETTINGS_YAML, RIG_SETTINGS_YAML
from iblrig.pydantic_definitions import HardwareSettings, RigSettings
from iblutil.util import Bunch
from one.alf.spec import is_session_path

log = logging.getLogger(__name__)
T = TypeVar('T', bound=BaseModel)


def iterate_previous_sessions(subject_name: str, task_name: str, n: int = 1, **kwargs) -> list[dict]:
    """
    Iterate over the sessions of a given subject in both the remote and local paths and search for a
    given protocol name. Return the information of the last n matching protocols found, in the form of
    a list of dictionaries.

    Parameters
    ----------
    subject_name : str
        Name of the subject.
    task_name : str
        Name of the protocol to look for in the experiment description.
    n : int, optional
        Maximum number of protocols to return.
    **kwargs
        Optional arguments passed to iblrig.path_helper.get_local_and_remote_paths.
        If not used, the arguments from iblrig/settings/iblrig_settings.yaml will be used.

    Returns
    -------
    list[dict]
        List of dictionaries with keys: session_path, experiment_description, task_settings, file_task_data.
    """
    rig_paths = get_local_and_remote_paths(**kwargs)
    local_subjects_folder = rig_paths['local_subjects_folder']
    remote_subjects_folder = rig_paths['remote_subjects_folder']
    sessions = _iterate_protocols(local_subjects_folder.joinpath(subject_name), task_name=task_name, n=n)
    if remote_subjects_folder is not None:
        remote_sessions = _iterate_protocols(remote_subjects_folder.joinpath(subject_name), task_name=task_name, n=n)
        if remote_sessions is not None:
            sessions.extend(remote_sessions)
    # here we rely on the fact that np.unique sorts the stubs, so we can output sessions with the last one first
    _, ises = np.unique([s['session_stub'] for s in sessions], return_index=True)
    sessions = [sessions[i] for i in np.flipud(ises)]
    return sessions

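# Example usage (a minimal sketch; the subject and task names below are hypothetical):
# >>> previous = iterate_previous_sessions('SWC_043', 'trainingChoiceWorld', n=2)
# >>> previous[0]['session_path']  # the most recent matching session comes first
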
def _iterate_protocols(subject_folder: Path, task_name: str, n: int = 1, min_trials: int = 43) -> list[dict]:
    """
    Return information on the last n sessions with a matching protocol.

    This function iterates over the sessions of a given subject and searches for a given protocol name.

    Parameters
    ----------
    subject_folder : Path
        A subject folder containing dated folders.
    task_name : str
        The task protocol name to look for.
    n : int
        The number of previous protocols to return.
    min_trials : int
        Skips sessions with fewer than this number of trials.

    Returns
    -------
    list[dict]
        List of dictionaries with keys: session_stub, session_path, experiment_description,
        task_settings, file_task_data.
    """

    def proc_num(x):
        """Return protocol number.

        Use 'protocol_number' key if present (unlikely), otherwise use collection name.
        """
        i = (x or {}).get('collection', '00').split('_')
        collection_int = int(i[-1]) if i[-1].isnumeric() else 0
        return x.get('protocol_number', collection_int)

    protocols = []
    if subject_folder is None or Path(subject_folder).exists() is False:
        return protocols
    sessions = subject_folder.glob('????-??-??/*/_ibl_experiment.description*.yaml')  # seq may be X or XXX
    # Make extra sure to only include valid sessions
    sessions = filter(lambda x: is_session_path(x.relative_to(subject_folder.parent).parent), sessions)
    for file_experiment in sorted(sessions, reverse=True):
        session_path = file_experiment.parent
        ad = session_params.read_params(file_experiment)
        # reversed: we look for the last task first in case the protocol ran twice
        tasks = filter(None, map(lambda x: x.get(task_name), ad.get('tasks', [])))
        for adt in sorted(tasks, key=proc_num, reverse=True):
            if not (task_settings := load_settings(session_path, task_collection=adt['collection'])):
                continue
            if task_settings.get('NTRIALS', min_trials + 1) < min_trials:  # ignore sessions with too few trials
                continue
            protocols.append(
                Bunch(
                    {
                        'session_stub': '_'.join(file_experiment.parent.parts[-2:]),  # 2019-01-01_001
                        'session_path': file_experiment.parent,
                        'task_collection': adt['collection'],
                        'experiment_description': ad,
                        'task_settings': task_settings,
                        'file_task_data': session_path.joinpath(adt['collection'], '_iblrig_taskData.raw.jsonable'),
                    }
                )
            )
            if len(protocols) >= n:
                return protocols
    return protocols

def get_local_and_remote_paths(
    local_path: str | Path | None = None,
    remote_path: str | Path | None = None,
    lab: str | None = None,
    iblrig_settings=None,
) -> dict:
    """
    Parse the input arguments of the transfer commands.

    If the arguments are None, read in the settings and return the values from the settings files.
    local_subjects_folder always falls back on the home directory / iblrig_data;
    remote_subjects_folder has no fallback and will be None when all options are exhausted.

    :param local_path: the local data path
    :param remote_path: the remote data path
    :param lab: the lab name
    :param iblrig_settings: if provided, a settings dictionary; otherwise the default settings files are loaded
    :return: dictionary with the following keys (example output):
        {'local_data_folder': PosixPath('C:/iblrigv8_data'),
         'remote_data_folder': PosixPath('Y:/'),
         'local_subjects_folder': PosixPath('C:/iblrigv8_data/mainenlab/Subjects'),
         'remote_subjects_folder': PosixPath('Y:/Subjects')}
    """
    # we only want to attempt to load the settings file if necessary
    if (local_path is None) or (remote_path is None) or (lab is None):
        iblrig_settings = load_pydantic_yaml(RigSettings) if iblrig_settings is None else iblrig_settings

    paths = Bunch({'local_data_folder': local_path, 'remote_data_folder': remote_path})
    if paths.local_data_folder is None:
        paths.local_data_folder = (
            Path(p) if (p := iblrig_settings['iblrig_local_data_path']) else Path.home().joinpath('iblrig_data')
        )
    elif isinstance(paths.local_data_folder, str):
        paths.local_data_folder = Path(paths.local_data_folder)
    if paths.remote_data_folder is None:
        paths.remote_data_folder = Path(p) if (p := iblrig_settings['iblrig_remote_data_path']) else None
    elif isinstance(paths.remote_data_folder, str):
        paths.remote_data_folder = Path(paths.remote_data_folder)

    # Get the local subjects folder. If not defined in the settings, assume local_data_folder + /Subjects
    paths.local_subjects_folder = (iblrig_settings or {}).get('iblrig_local_subjects_path', None)
    lab = lab or (iblrig_settings or {}).get('ALYX_LAB', None)
    if paths.local_subjects_folder is None:
        if paths.local_data_folder.name == 'Subjects':
            paths.local_subjects_folder = paths.local_data_folder
        elif lab:  # append lab/Subjects part
            paths.local_subjects_folder = paths.local_data_folder.joinpath(lab, 'Subjects')
        else:  # NB: case is important here. ALF spec expects lab folder before 'Subjects' (capitalized)
            paths.local_subjects_folder = paths.local_data_folder.joinpath('subjects')
    else:
        paths.local_subjects_folder = Path(paths.local_subjects_folder)

    # Get the remote subjects folder. If not defined in the settings, assume remote_data_folder + /Subjects
    paths.remote_subjects_folder = (iblrig_settings or {}).get('iblrig_remote_subjects_path', None)
    if paths.remote_subjects_folder is None:
        if paths.remote_data_folder:
            if paths.remote_data_folder.name == 'Subjects':
                paths.remote_subjects_folder = paths.remote_data_folder
            else:
                paths.remote_subjects_folder = paths.remote_data_folder.joinpath('Subjects')
    else:
        paths.remote_subjects_folder = Path(paths.remote_subjects_folder)
    return paths

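# Example usage (a minimal sketch; the override values below are hypothetical):
# >>> paths = get_local_and_remote_paths(local_path='D:/iblrig_data', lab='cortexlab')
# >>> paths['local_subjects_folder']   # Path('D:/iblrig_data/cortexlab/Subjects')
# >>> paths['remote_subjects_folder']  # from the settings file; None if no remote path is configured
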
def _load_settings_yaml(filename: Path | str = RIG_SETTINGS_YAML, do_raise: bool = True) -> Bunch:
    filename = Path(filename)
    if not filename.is_absolute():
        filename = Path(iblrig.__file__).parents[1].joinpath('settings', filename)
    if not filename.exists() and not do_raise:
        log.error(f'File not found: {filename}')
        return Bunch()
    with open(filename) as fp:
        rs = yaml.safe_load(fp)
    rs = patch_settings(rs, filename.stem)
    return Bunch(rs)

def load_pydantic_yaml(model: type[T], filename: Path | str | None = None, do_raise: bool = True) -> T:
    """
    Load YAML data from a specified file or a standard IBLRIG settings file,
    validate it using a Pydantic model, and return the validated Pydantic
    model instance.

    Parameters
    ----------
    model : type[T]
        The Pydantic model class to validate the YAML data against.
    filename : Path | str | None, optional
        The path to the YAML file. If None (default), the function deduces
        the appropriate standard IBLRIG settings file based on the model.
    do_raise : bool, optional
        If True (default), raise a ValidationError if validation fails.
        If False, log the validation error and construct a model instance
        with the provided data.

    Returns
    -------
    T
        An instance of the Pydantic model, validated against the YAML data.

    Raises
    ------
    ValidationError
        If validation fails and do_raise is set to True.
        The raised exception contains details about the validation error.
    TypeError
        If the filename is None and the model class is not recognized as
        HardwareSettings or RigSettings.
    """
    if filename is None:
        if model == HardwareSettings:
            filename = HARDWARE_SETTINGS_YAML
        elif model == RigSettings:
            filename = RIG_SETTINGS_YAML
        else:
            raise TypeError(f'Cannot deduce filename for model `{model.__name__}`.')
    if filename not in (HARDWARE_SETTINGS_YAML, RIG_SETTINGS_YAML):
        # TODO: We currently skip validation of pydantic models if an extra
        #       filename is provided that does NOT correspond to the standard
        #       settings files of IBLRIG. This should be re-evaluated.
        do_raise = False
    rs = _load_settings_yaml(filename=filename, do_raise=do_raise)
    try:
        return model.model_validate(rs)
    except ValidationError as e:
        if not do_raise:
            log.exception(e)
            return model.model_construct(**rs)
        else:
            raise e

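# Example usage (a minimal sketch of loading the two standard settings files):
# >>> hardware_settings = load_pydantic_yaml(HardwareSettings)  # reads hardware_settings.yaml
# >>> rig_settings = load_pydantic_yaml(RigSettings, do_raise=False)  # logs validation errors instead of raising
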
def save_pydantic_yaml(data: T, filename: Path | str | None = None) -> None:
    if filename is None:
        if isinstance(data, HardwareSettings):
            filename = HARDWARE_SETTINGS_YAML
        elif isinstance(data, RigSettings):
            filename = RIG_SETTINGS_YAML
        else:
            raise TypeError(f'Cannot deduce filename for model `{type(data).__name__}`.')
    else:
        filename = Path(filename)
    yaml_data = data.model_dump()
    data.model_validate(yaml_data)
    with open(filename, 'w') as f:
        log.debug(f'Dumping {type(data).__name__} to {filename.name}')
        yaml.dump(yaml_data, f, sort_keys=False)

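# Example usage (a minimal sketch; round-trips the standard rig settings to disk):
# >>> settings = load_pydantic_yaml(RigSettings)
# >>> save_pydantic_yaml(settings)  # re-validates, then writes back to the standard iblrig settings YAML
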
def patch_settings(rs: dict, filename: str | Path) -> dict:
    """
    Update loaded settings files to ensure compatibility with the latest version.

    Parameters
    ----------
    rs : dict
        A loaded settings file.
    filename : str | Path
        The filename of the settings file.

    Returns
    -------
    dict
        The updated settings.
    """
    filename = Path(filename)
    settings_version = version.parse(rs.get('VERSION', '0.0.0'))
    if filename.stem.startswith('hardware'):
        if settings_version < version.Version('1.0.0') and 'device_camera' in rs:
            log.info('Patching hardware settings; assuming left camera label')
            rs['device_cameras'] = {'left': rs.pop('device_camera')}
            rs['VERSION'] = '1.0.0'
        if 'device_cameras' in rs and rs['device_cameras'] is not None:
            rs['device_cameras'] = {k: v for k, v in rs['device_cameras'].items() if v}  # remove empty keys
            idx_missing = set(rs['device_cameras']) == {'left'} and 'INDEX' not in rs['device_cameras']['left']
            if settings_version < version.Version('1.1.0') and idx_missing:
                log.info('Patching hardware settings; assuming left camera index and training workflow')
                workflow = rs['device_cameras']['left'].pop('BONSAI_WORKFLOW', None)
                bonsai_workflows = {'setup': 'devices/camera_setup/setup_video.bonsai', 'recording': workflow}
                rs['device_cameras'] = {
                    'training': {'BONSAI_WORKFLOW': bonsai_workflows, 'left': {'INDEX': 1, 'SYNC_LABEL': 'audio'}}
                }
                rs['VERSION'] = '1.1.0'
        if rs.get('device_cameras') is None:
            rs['device_cameras'] = {}
    return rs

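# Example usage (a minimal sketch; the pre-1.0.0 hardware settings dict below is hypothetical):
# >>> rs = {'VERSION': '0.9.0', 'device_camera': {'BONSAI_WORKFLOW': 'record.bonsai'}}
# >>> patch_settings(rs, 'hardware_settings')['device_cameras']
# The single camera is first relabelled 'left' (v1.0.0), then, lacking an INDEX,
# moved under the 'training' workflow structure (v1.1.0).
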
def get_commit_hash(folder: str):
    here = os.getcwd()
    os.chdir(folder)
    out = subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()
    os.chdir(here)
    if not out:
        log.debug('Commit hash is empty string')
    log.debug(f'Found commit hash {out}')
    return out

def iterate_collection(session_path: str, collection_name='raw_task_data') -> str:
    """
    Given a session path, return the next numbered collection name.

    Parameters
    ----------
    session_path : str
        The session path containing zero or more numbered collections.
    collection_name : str
        The collection name without the _NN suffix.

    Returns
    -------
    str
        The next numbered collection name.

    Examples
    --------
    In a folder where there are no raw task data folders:

    >>> iterate_collection('./subject/2020-01-01/001')
    'raw_task_data_00'

    In a folder where there is one raw_imaging_data_00 folder:

    >>> iterate_collection('./subject/2020-01-01/001', collection_name='raw_imaging_data')
    'raw_imaging_data_01'
    """
    if not Path(session_path).exists():
        return f'{collection_name}_00'
    collections = filter(Path.is_dir, Path(session_path).iterdir())
    collection_names = map(lambda x: x.name, collections)
    tasks = sorted(filter(re.compile(f'{collection_name}' + '_[0-9]{2}').match, collection_names))
    if len(tasks) == 0:
        return f'{collection_name}_00'
    return f'{collection_name}_{int(tasks[-1][-2:]) + 1:02}'

def create_bonsai_layout_from_template(workflow_file: Path) -> None:
    """
    Create a Bonsai layout file from a template if it does not already exist.

    If the file with the suffix `.bonsai.layout` does not exist for the given
    workflow file, this function will attempt to create it from a template
    file with the suffix `.bonsai.layout_template`. If the template file does
    not exist either, the function logs that no template layout is available.

    Background: Bonsai stores dialog settings (window position, control
    visibility, etc.) in an XML file with the suffix `.bonsai.layout`. These
    layout files are user-specific and may be overwritten locally by the user
    according to their preferences. To ensure that a default layout is
    available, a template file with the suffix `.bonsai.layout_template` can
    be provided as a starting point.

    Parameters
    ----------
    workflow_file : Path
        The path to the Bonsai workflow for which the layout is to be created.

    Raises
    ------
    FileNotFoundError
        If the provided workflow_file does not exist.
    """
    if not workflow_file.exists():
        raise FileNotFoundError(workflow_file)
    if not (layout_file := workflow_file.with_suffix('.bonsai.layout')).exists():
        template_file = workflow_file.with_suffix('.bonsai.layout_template')
        if template_file.exists():
            log.info(f'Creating default {layout_file.name}')
            shutil.copy(template_file, layout_file)
        else:
            log.debug(f'No template layout for {workflow_file.name}')

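# Example usage (a minimal sketch; the workflow path below is illustrative, borrowed from patch_settings):
# >>> create_bonsai_layout_from_template(Path('devices/camera_setup/setup_video.bonsai'))
# Copies setup_video.bonsai.layout_template to setup_video.bonsai.layout if the latter is missing.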