# Source code for one.alf.cache

"""Construct Parquet database from local file system.

NB: If using a remote Alyx instance it is advisable to generate the cache via the Alyx one_cache
management command, otherwise the resulting cache UUIDs will not match those on the database.

Examples
--------
>>> from one.api import One
>>> from one.alf.cache import make_parquet_db
>>> cache_dir = 'path/to/data'
>>> make_parquet_db(cache_dir)
>>> one = One(cache_dir=cache_dir)
"""

# -------------------------------------------------------------------------------------------------
# Imports
# -------------------------------------------------------------------------------------------------

import datetime
import uuid
from functools import partial
from pathlib import Path
import warnings
import logging

import pandas as pd
import numpy as np
from packaging import version
from iblutil.io import parquet
from iblutil.io.hashfile import md5

from one.alf.spec import QC
from one.alf.io import iter_sessions
from one.alf.path import session_path_parts, get_alf_path

__all__ = ['make_parquet_db', 'patch_cache', 'remove_missing_datasets',
           'remove_cache_table_files', 'EMPTY_DATASETS_FRAME', 'EMPTY_SESSIONS_FRAME', 'QC_TYPE']
_logger = logging.getLogger(__name__)

# -------------------------------------------------------------------------------------------------
# Global variables
# -------------------------------------------------------------------------------------------------

# Ordered categorical whose categories are the QC enumeration member names, taken in the
# enumeration's sort order; being ordered, values of this dtype can be compared.
QC_TYPE = pd.CategoricalDtype(categories=[e.name for e in sorted(QC)], ordered=True)
"""pandas.api.types.CategoricalDtype: The cache table QC column data type."""

# Column name -> dtype for the sessions table. The key order matters: it must match the
# order of the values in the tuple returned by _get_session_info.
SESSIONS_COLUMNS = {
    'id': object,                 # str
    'lab': object,                # str
    'subject': object,            # str
    'date': object,               # datetime.date
    'number': np.uint16,          # int
    'task_protocol': object,      # str
    'projects': object            # str
}
"""dict: A map of sessions table fields and their data types."""

# Column name -> dtype for the datasets table. The key order matters: it must match the
# order of the values in the tuple returned by _get_dataset_info.
DATASETS_COLUMNS = {
    'eid': object,                # str
    'id': object,                 # str
    'rel_path': object,           # relative to the session path, includes the filename
    'file_size': 'UInt64',        # file size in bytes (nullable)
    'hash': object,               # sha1/md5, computed in load function
    'exists': bool,               # bool
    'qc': QC_TYPE                 # one.alf.spec.QC enumeration
}
"""dict: A map of datasets table fields and their data types."""

# Zero-row frames built from the column maps above so that the dtypes are correct; the ID
# column(s) are then moved into the index ('eid' + 'id' for datasets, 'id' for sessions).
EMPTY_DATASETS_FRAME = (pd.DataFrame(columns=DATASETS_COLUMNS)
                        .astype(DATASETS_COLUMNS)
                        .set_index(['eid', 'id']))
"""pandas.DataFrame: An empty datasets dataframe with correct columns and dtypes."""

EMPTY_SESSIONS_FRAME = (pd.DataFrame(columns=SESSIONS_COLUMNS)
                        .astype(SESSIONS_COLUMNS)
                        .set_index('id'))
"""pandas.DataFrame: An empty sessions dataframe with correct columns and dtypes."""


# -------------------------------------------------------------------------------------------------
# Parsing util functions
# -------------------------------------------------------------------------------------------------

def _ses_str_id(session_path):
    """Build a str session ID of the form '(lab/)subject/date/number' from a session path.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path, parsed by `session_path_parts` with `assert_valid=True`.

    Returns
    -------
    str
        The non-empty session path parts joined as a posix path.
    """
    parts = session_path_parts(session_path, assert_valid=True)
    # Parts that could not be parsed (e.g. a missing lab) are falsy and left out of the ID
    present = [part for part in parts if part]
    return Path(*present).as_posix()


def _get_session_info(rel_ses_path):
    """Parse a relative session path into a sessions table record.

    Parameters
    ----------
    rel_ses_path : str, pathlib.Path
        A session path of the form '(lab/Subjects/)subject/date/number'.

    Returns
    -------
    str
        Experiment ID expressed as a relative session posix path.
    str
        The lab name (empty str if the path contains no lab).
    str
        The subject name.
    datetime.date
        The session date.
    int
        The session number.
    str
        The task protocol (always an empty str).
    str
        The associated project (always an empty str).
    """
    lab, subject, date_str, number = session_path_parts(
        rel_ses_path, as_dict=False, assert_valid=True)
    record = (
        _ses_str_id(rel_ses_path),
        lab or '',
        subject,
        pd.to_datetime(date_str).date(),
        int(number),
        '',  # the task protocol cannot be determined from the path
        '',  # neither can the project
    )
    return record


def _get_dataset_info(dset_path, ses_eid=None, compute_hash=False):
    """Build a datasets table record from a dataset file on disk.

    Parameters
    ----------
    dset_path : one.alf.ALFPath
        A full ALF path.
    ses_eid : str, UUID, optional
        A session ID. If not given, a str ID is derived from the dataset's session path.
    compute_hash : bool, optional
        Whether to compute a file hash.

    Returns
    -------
    str, uuid.UUID
        The session ID (pandas.NA if none could be determined).
    str
        The dataset ID expressed as a posix path relative to the session.
    str
        The dataset posix path, relative to the session.
    int
        The dataset file size.
    str
        The file hash, or empty str if `compute_hash` is false.
    bool
        Whether the file exists (always True).
    str
        The QC value for the dataset ('NOT_SET').
    """
    rel_path = get_alf_path(dset_path.relative_to_session())
    if not ses_eid:
        ses_eid = _ses_str_id(dset_path.session_path())
    size = dset_path.stat().st_size
    if compute_hash:
        digest = md5(dset_path)
    else:
        digest = ''
    eid = ses_eid if ses_eid else pd.NA
    # The relative path serves as both the dataset ID and the rel_path field
    return eid, rel_path, rel_path, size, digest, True, 'NOT_SET'


def _rel_path_to_uuid(df, id_key='rel_path', base_id=None, keep_old=False):
    """Replace the values of a str ID column with UUIDs derived from them.

    Each unique value of the column is hashed (uuid3, i.e. MD5) within a namespace UUID, so
    identical values always map to the same UUID for a given namespace.

    Parameters
    ----------
    df : pandas.DataFrame
        The frame to update. NB: it is modified in place.
    id_key : str
        The name of the column to convert.
    base_id : uuid.UUID, optional
        The namespace UUID. If None, one is generated with `uuid.uuid1`.
    keep_old : bool
        If True, the original values are kept in a new column named `id_key` + '_'.

    Returns
    -------
    pandas.DataFrame
        The input frame, with the `id_key` column holding UUIDs.
    """
    namespace = base_id or uuid.uuid1()  # Base hash based on system by default
    if keep_old:
        df[f'{id_key}_'] = df[id_key].copy()

    def _group_uuid(group):
        # The group name is the shared ID value: one UUID per unique value
        return uuid.uuid3(namespace, group.name)

    hashed = df.groupby(id_key)[id_key].transform(_group_uuid)
    df.loc[:, id_key] = hashed
    return df


def _ids_to_uuid(df_ses, df_dsets):
    """Replace the str IDs of the sessions and datasets frames with UUIDs and set the indices.

    A new namespace UUID is generated on every call and used for both frames.

    Parameters
    ----------
    df_ses : pandas.DataFrame
        A sessions frame with an 'id' column of str session IDs.
    df_dsets : pandas.DataFrame
        A datasets frame with an 'id' column of str dataset IDs and an 'eid' column of str
        session IDs; each 'eid' is looked up in the sessions frame's original 'id' values.

    Returns
    -------
    pandas.DataFrame
        The sessions frame, indexed by the new 'id' and sorted by index.
    pandas.DataFrame
        The datasets frame, indexed by the new ('eid', 'id') and sorted by index.

    Raises
    ------
    AssertionError
        If the old -> new session ID mapping taken from the datasets frame is not equal to
        the one in the sessions frame.

    Notes
    -----
    The input frames are modified: `_rel_path_to_uuid` updates the 'id' columns in place and
    temporary 'id_' / 'eid_' columns are added to them here.
    """
    ns = uuid.uuid1()
    # Dataset IDs are hashed from the 'id' value alone, so equal values share a UUID
    df_dsets = _rel_path_to_uuid(df_dsets, id_key='id', base_id=ns)
    # keep_old=True retains the str session IDs in 'id_' for the lookup below
    df_ses = _rel_path_to_uuid(df_ses, id_key='id', base_id=ns, keep_old=True)
    # Copy new eids into datasets frame
    df_dsets['eid_'] = df_dsets['eid'].copy()
    df_dsets['eid'] = (df_ses
                       .set_index('id_')
                       .loc[df_dsets['eid'], 'id']
                       .values)
    # Check that the old -> new session ID mapping is the same in both frames
    # NOTE(review): `equals` compares both values and index, so this looks like it requires
    # every session to have at least one dataset, in the same order as df_ses - confirm
    ses_id_set = df_ses.set_index('id_')['id']
    assert (df_dsets
            .set_index('eid_')['eid']
            .drop_duplicates()
            .equals(ses_id_set)), 'session int ID mismatch between frames'

    # Set index and drop the temporary columns holding the old str IDs
    df_ses = df_ses.set_index('id').drop('id_', axis=1).sort_index()
    df_dsets = df_dsets.set_index(['eid', 'id']).drop('eid_', axis=1).sort_index()

    return df_ses, df_dsets


# -------------------------------------------------------------------------------------------------
# Main functions
# -------------------------------------------------------------------------------------------------

def _metadata(origin):
    """
    Build the metadata dictionary to store with the Parquet files.

    Parameters
    ----------
    origin : str, pathlib.Path
        Path to full directory, or computer name / db name.

    Returns
    -------
    dict
        A dict with the keys 'date_created' (the current local time as a
        'YYYY-MM-DD HH:MM' str) and 'origin' (`origin` as a str).
    """
    created = datetime.datetime.now().isoformat(sep=' ', timespec='minutes')
    return dict(date_created=created, origin=str(origin))


def _make_sessions_df(root_dir) -> pd.DataFrame:
    """
    Find all sessions within a root directory and build a sessions DataFrame from them.

    Parameters
    ----------
    root_dir : str, pathlib.Path
        The folder to look for sessions.

    Returns
    -------
    pandas.DataFrame
        A pandas DataFrame of session info, with the columns and dtypes of
        SESSIONS_COLUMNS. No index is set.
    """
    n_fields = len(SESSIONS_COLUMNS)
    records = []
    for session_path in iter_sessions(root_dir):
        # Reduce the full path to its ALF part, i.e. (lab/Subjects/)subject/date/number,
        # then parse it into a tuple of session info
        record = _get_session_info(get_alf_path(session_path))
        assert len(record) == n_fields
        records.append(record)
    sessions = pd.DataFrame(records, columns=SESSIONS_COLUMNS)
    return sessions.astype(SESSIONS_COLUMNS)


def _make_datasets_df(root_dir, hash_files=False) -> pd.DataFrame:
    """
    Find all datasets within a root directory and build a datasets DataFrame from them.

    Parameters
    ----------
    root_dir : str, pathlib.Path
        The folder to look for sessions.
    hash_files : bool
        If True, an MD5 is computed for each file and stored in the 'hash' column.

    Returns
    -------
    pandas.DataFrame
        A pandas DataFrame of dataset info, with the columns and dtypes of
        DATASETS_COLUMNS. No index is set.
    """
    n_fields = len(DATASETS_COLUMNS)
    records = []
    # Datasets are collected session by session, including those in session subfolders
    for session_path in iter_sessions(root_dir):
        for dataset_path in session_path.iter_datasets(recursive=True):
            record = _get_dataset_info(dataset_path, compute_hash=hash_files)
            assert len(record) == n_fields
            records.append(record)
    datasets = pd.DataFrame(records, columns=DATASETS_COLUMNS)
    return datasets.astype(DATASETS_COLUMNS)


def make_parquet_db(root_dir, out_dir=None, hash_ids=True, hash_files=False, lab=None):
    """
    Given a data directory, index the ALF datasets and save the generated cache tables.

    Parameters
    ----------
    root_dir : str, pathlib.Path
        The file directory to index.
    out_dir : str, pathlib.Path
        Optional output directory to save cache tables.  If None, the files are saved into the
        root directory.
    hash_ids : bool
        If True, experiment and dataset IDs will be UUIDs generated from the system and relative
        paths (required for use with ONE API).
    hash_files : bool
        If True, an MD5 hash is computed for each dataset and stored in the datasets table.
        This will substantially increase cache generation time.
    lab : str
        An optional lab name to associate with the data.  If the folder structure
        contains 'lab/Subjects', the lab name will be taken from the folder name.

    Returns
    -------
    pathlib.Path
        The full path of the saved sessions parquet table.
    pathlib.Path
        The full path of the saved datasets parquet table.

    Raises
    ------
    AssertionError
        If `lab` is given and differs from a lab name found in the folder structure, or if
        the output directory is not an existing directory.

    Warns
    -----
    RuntimeWarning
        If no sessions or no datasets were found in `root_dir`.
    """
    root_dir = Path(root_dir).resolve()

    # Make the data frames.
    df_ses = _make_sessions_df(root_dir)
    df_dsets = _make_datasets_df(root_dir, hash_files=hash_files)

    # Replace the path-based session and dataset IDs with UUIDs and set them as the index
    if hash_ids and len(df_ses) > 0:
        df_ses, df_dsets = _ids_to_uuid(df_ses, df_dsets)
        # For parquet all indices must be str
        df_ses.index = df_ses.index.map(str)
        df_dsets.index = df_dsets.index.map(lambda x: tuple(map(str, x)))

    if lab:  # Fill in lab name field
        # Any lab names parsed from the folder structure must agree with the one provided
        assert not df_ses['lab'].any() or (df_ses['lab'] == lab).all(), 'lab name conflict'
        df_ses['lab'] = lab

    # Check any files were found
    if df_ses.empty or df_dsets.empty:
        warnings.warn(f'No {"sessions" if df_ses.empty else "datasets"} found', RuntimeWarning)

    # Output directory.
    out_dir = Path(out_dir or root_dir)
    # is_dir() is also False when the path does not exist
    assert out_dir.is_dir(), f'{out_dir} is not an existing directory'

    # Parquet files to save.
    fn_ses = out_dir / 'sessions.pqt'
    fn_dsets = out_dir / 'datasets.pqt'

    # Parquet metadata.
    metadata = _metadata(root_dir)

    # Save the Parquet files.
    parquet.save(fn_ses, df_ses, metadata)
    parquet.save(fn_dsets, df_dsets, metadata)

    return fn_ses, fn_dsets
def remove_missing_datasets(cache_dir, tables=None, remove_empty_sessions=True, dry=True):
    """
    Remove dataset files and session folders that are not in the provided cache.

    NB: This *does not* remove entries from the cache tables that are missing on disk.
    Non-ALF files are not removed. Empty sessions that exist in the sessions table are not
    removed.

    Parameters
    ----------
    cache_dir : str, pathlib.Path
        The root directory containing the session folders (and, if `tables` is None, the
        sessions.pqt and datasets.pqt cache tables).
    tables : dict[str, pandas.DataFrame], optional
        A dict with keys ('sessions', 'datasets'), containing the cache tables as DataFrames.
        If None, the tables are loaded from `cache_dir`.  NB: tables without an index have
        their ID column(s) set as the index in place.
    remove_empty_sessions : bool
        Attempt to remove session folders that are empty and not in the sessions table.
    dry : bool
        If true, do not remove anything.

    Returns
    -------
    list
        A sorted list of paths to be removed.
    """
    cache_dir = Path(cache_dir)
    if tables is None:
        tables = {}
        for name in ('datasets', 'sessions'):
            table, m = parquet.load(cache_dir / f'{name}.pqt')
            tables[name] = patch_cache(table, m.get('min_api_version'), name)

    INDEX_KEY = '.?id'
    for name in tables:
        # Set the appropriate index if none already set
        if isinstance(tables[name].index, pd.RangeIndex):
            idx_columns = sorted(tables[name].filter(regex=INDEX_KEY).columns)
            tables[name].set_index(idx_columns, inplace=True)

    to_delete = set()
    from one.converters import session_record2path  # imported here due to circular imports
    gen_path = partial(session_record2path, root_dir=cache_dir)
    # map of session path to eid
    sessions = {gen_path(rec): eid for eid, rec in tables['sessions'].iterrows()}
    for session_path in iter_sessions(cache_dir):
        try:
            datasets = tables['datasets'].loc[sessions[session_path]]
        except KeyError:
            # Session not in the sessions table, or no datasets recorded for it
            datasets = tables['datasets'].iloc[0:0, :]
        # Compare against the rel_path *values*: `x in series` would test the index
        # (i.e. the dataset IDs) instead
        known_rel_paths = set(datasets['rel_path'])
        # NOTE(review): iter_datasets is called without recursive=True here, unlike in
        # _make_datasets_df - confirm that datasets in subfolders are covered
        for dataset in session_path.iter_datasets():
            if dataset.relative_to_session().as_posix() not in known_rel_paths:
                to_delete.add(dataset)
        if session_path not in sessions and remove_empty_sessions:
            to_delete.add(session_path)

    if dry:
        print('The following session and datasets would be removed:', end='\n\t')
        print('\n\t'.join(sorted(map(str, to_delete))))
        return sorted(to_delete)

    # Delete the dataset files first, so that session folders emptied by this are found to be
    # empty when they are checked below (the set has no defined iteration order)
    folders = []
    for path in to_delete:
        if path.is_file():
            _logger.debug('Removing %s', path)
            path.unlink()
        else:
            folders.append(path)

    for path in folders:
        # Recursively remove empty folders, walking up towards (but never removing) the
        # top-level folders in the cache directory. The is_dir check skips folders that
        # were already removed while walking up from another path
        while path.is_dir() and path.parent != cache_dir and not any(path.rglob('*')):
            _logger.debug('Removing %s', path)
            path.rmdir()
            path = path.parent

    return sorted(to_delete)
def remove_cache_table_files(folder, tables=('sessions', 'datasets')):
    """Delete cache tables on disk.

    Parameters
    ----------
    folder : str, pathlib.Path
        The directory path containing cache tables to remove.
    tables : list of str
        A list of table names to remove, e.g. ['sessions', 'datasets'].

    Returns
    -------
    list of pathlib.Path
        A list of the removed files.

    Notes
    -----
    The cache_info.json metadata file is always deleted too, whatever `tables` contains.
    Files that are not found are skipped with a logged warning.
    """
    folder = Path(folder)  # also accept str paths, as the other functions in this module do
    filenames = ('cache_info.json', *(f'{t}.pqt' for t in tables))
    removed = []
    for file in map(folder.joinpath, filenames):
        if file.exists():
            file.unlink()
            removed.append(file)
        else:
            _logger.warning('%s not found', file)
    return removed
def _cache_int2str(table: pd.DataFrame) -> pd.DataFrame:
    """Convert int ids to str ids for cache table.

    Older cache tables stored each UUID as a pair of integer index levels named '<name>_0'
    and '<name>_1'.  Each such pair is replaced by a single str index level called '<name>'.
    Tables without such an index are returned as they are.

    Parameters
    ----------
    table : pd.DataFrame
        A cache table (from One._cache).

    Returns
    -------
    pd.DataFrame
        The input table if it has no integer ID index levels, otherwise a new table indexed
        by the str IDs.
    """
    # Convert integer uuids to str uuids
    # Index level names may be None (unnamed levels), hence the isinstance check
    has_int_ids = any(isinstance(x, str) and x.endswith('_0') for x in table.index.names)
    if table.index.nlevels < 2 or not has_int_ids:
        return table
    table = table.reset_index()
    int_cols = table.filter(regex=r'_\d{1}$').columns.sort_values()
    assert not len(int_cols) % 2, 'expected even number of columns ending in _0 or _1'
    names = sorted(set(c.rsplit('_', 1)[0] for c in int_cols.values))
    # Both sequences are sorted, so the i-th name corresponds to the i-th pair of columns
    for i, name in zip(range(0, len(int_cols), 2), names):
        table[name] = parquet.np2str(table[int_cols[i:i + 2]])
    table = table.drop(int_cols, axis=1).set_index(names)
    return table
def patch_cache(table: pd.DataFrame, min_api_version=None, name=None) -> pd.DataFrame:
    """Reformat older cache tables to comply with this version of ONE.

    Currently this function will

    1. convert integer UUIDs to string UUIDs;
    2. rename the 'project' column to 'projects';
    3. add QC column;
    4. drop session_path column.

    Parameters
    ----------
    table : pd.DataFrame
        A cache table (from One._cache).  The table itself is not modified.
    min_api_version : str
        The minimum API version supported by this cache table.  If None, the table is
        treated as version '0.0.0', i.e. all applicable patches are applied.
    name : {'datasets', 'sessions'} str
        The name of the table.  The QC and session_path patches apply to 'datasets' only.

    Returns
    -------
    pd.DataFrame
        The reformatted cache table.
    """
    min_version = version.parse(min_api_version or '0.0.0')
    table = _cache_int2str(table)
    # Rename project column (returning a new frame rather than renaming in place, so that
    # the table passed in by the caller is left untouched, as in the steps below)
    if min_version < version.Version('1.13.0') and 'project' in table.columns:
        table = table.rename(columns={'project': 'projects'})
    if name == 'datasets' and min_version < version.Version('2.7.0') and 'qc' not in table.columns:
        # Code 0 is the first category of QC_TYPE
        qc = pd.Categorical.from_codes(np.zeros(len(table.index), dtype=int), dtype=QC_TYPE)
        table = table.assign(qc=qc)
    if name == 'datasets' and 'session_path' in table.columns:
        table = table.drop('session_path', axis=1)
    return table