"""Construct Parquet database from local file system.
NB: If using a remote Alyx instance it is advisable to generate the cache via the Alyx one_cache
management command, otherwise the resulting cache UUIDs will not match those on the database.
Examples
--------
>>> from one.api import One
>>> cache_dir = 'path/to/data'
>>> make_parquet_db(cache_dir)
>>> one = One(cache_dir=cache_dir)
"""
# -------------------------------------------------------------------------------------------------
# Imports
# -------------------------------------------------------------------------------------------------
import datetime
import uuid
from functools import partial
from pathlib import Path
import warnings
import logging
import pandas as pd
from iblutil.io import parquet
from iblutil.io.hashfile import md5
from one.alf.io import iter_sessions, iter_datasets
from one.alf.path import session_path_parts, get_alf_path
from one.converters import session_record2path
from one.util import QC_TYPE, patch_cache
__all__ = ['make_parquet_db', 'remove_missing_datasets', 'DATASETS_COLUMNS', 'SESSIONS_COLUMNS']
_logger = logging.getLogger(__name__)
# -------------------------------------------------------------------------------------------------
# Global variables
# -------------------------------------------------------------------------------------------------
# Column names of the sessions cache table, in output order.
SESSIONS_COLUMNS = (
    'id',  # str; '(lab/)subject/date/number', or a UUID string once the IDs have been hashed
    'lab',  # str
    'subject',  # str
    'date',  # datetime.date
    'number',  # int
    'task_protocol',  # str
    'projects',  # str
)
# Column names of the datasets cache table, in output order.
DATASETS_COLUMNS = (
    'id',  # str; session path joined with rel_path, or a UUID string once the IDs have been hashed
    'eid',  # str; the ID of the session the dataset belongs to
    'rel_path',  # relative to the session path, includes the filename
    'file_size',  # file size in bytes
    'hash',  # MD5 of the file when hashing is requested, otherwise None
    'exists',  # bool
    'qc',  # one.util.QC_TYPE
)
# -------------------------------------------------------------------------------------------------
# Parsing util functions
# -------------------------------------------------------------------------------------------------
def _ses_str_id(session_path):
    """
    Build a string session ID of the form '(lab/)subject/date/number' from a session path.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path; it is parsed with `session_path_parts(..., assert_valid=True)`.

    Returns
    -------
    str
        The non-empty session path parts joined with forward slashes.
    """
    parts = session_path_parts(session_path, assert_valid=True)
    # Parts that are empty (e.g. a missing lab) are left out of the ID
    present = [part for part in parts if part]
    return Path(*present).as_posix()
def _get_session_info(rel_ses_path):
    """
    Parse a relative session path into a sessions table record.

    Parameters
    ----------
    rel_ses_path : str, pathlib.Path
        A relative session path; it is parsed with `session_path_parts(..., assert_valid=True)`.

    Returns
    -------
    dict
        The parsed session path parts plus an 'id' string, with 'date' converted to a
        datetime.date, 'number' converted to an int, and empty 'task_protocol' and 'projects'
        fields.
    """
    info = session_path_parts(rel_ses_path, as_dict=True, assert_valid=True)
    info.update(
        id=_ses_str_id(rel_ses_path),
        date=pd.to_datetime(info['date']).date(),
        number=int(info['number']),
        # These cannot be determined from the path alone, so are left blank
        task_protocol='',
        projects='',
    )
    return info
def _get_dataset_info(full_ses_path, rel_dset_path, ses_eid=None, compute_hash=False):
    """
    Build a datasets table record for a single dataset file on disk.

    Parameters
    ----------
    full_ses_path : str, pathlib.Path
        The full path of the session that the dataset belongs to.
    rel_dset_path : str, pathlib.Path
        The dataset path relative to the session path.
    ses_eid : str, optional
        The session ID. If not given, a string ID is derived from the session path.
    compute_hash : bool
        If True, the MD5 of the file is computed and stored in the 'hash' field.

    Returns
    -------
    dict
        A record with the keys 'id', 'eid', 'rel_path', 'file_size', 'hash', 'exists' and 'qc'.
    """
    rel_ses_path = get_alf_path(full_ses_path)
    full_dset_path = Path(full_ses_path, rel_dset_path).as_posix()
    n_bytes = Path(full_dset_path).stat().st_size
    if not ses_eid:
        ses_eid = _ses_str_id(rel_ses_path)
    record = dict(
        id=Path(rel_ses_path, rel_dset_path).as_posix(),
        eid=str(ses_eid),
        rel_path=Path(rel_dset_path).as_posix(),
        file_size=n_bytes,
        hash=md5(full_dset_path) if compute_hash else None,
        exists=True,  # the file was just stat-ed, so it is present on disk
        qc='NOT_SET',
    )
    return record
def _rel_path_to_uuid(df, id_key='rel_path', base_id=None, keep_old=False):
    """
    Replace a column of strings with UUID strings derived from them, modifying `df` in place.

    Parameters
    ----------
    df : pandas.DataFrame
        The table to modify.
    id_key : str
        The name of the column whose values are replaced.
    base_id : uuid.UUID, optional
        The namespace UUID to hash with. Defaults to a new `uuid.uuid1()`.
    keep_old : bool
        If True, the original values are kept in a column named `id_key` plus a trailing
        underscore.

    Returns
    -------
    pandas.DataFrame
        The same DataFrame object that was passed in.

    Raises
    ------
    AssertionError
        The resulting column contains duplicate values.
    """
    namespace = base_id if base_id else uuid.uuid1()  # base hash based on system by default

    def _as_uuid_str(value):
        # MD5-based (version 3) UUID from the namespace and the string value
        return str(uuid.uuid3(namespace, value))

    if keep_old:
        df[f'{id_key}_'] = df[id_key].copy()
    df[id_key] = df[id_key].apply(_as_uuid_str)
    assert df[id_key].is_unique  # fails if the input column contains repeated values
    return df
def _ids_to_uuid(df_ses, df_dsets):
    """
    Replace the string IDs in both cache tables with UUID strings and index the tables by them.

    Parameters
    ----------
    df_ses : pandas.DataFrame
        The sessions table, with a string 'id' column.
    df_dsets : pandas.DataFrame
        The datasets table, with string 'id' and 'eid' columns.

    Returns
    -------
    pandas.DataFrame
        The sessions table, indexed by the new 'id' and sorted by index.
    pandas.DataFrame
        The datasets table, indexed by the new ('eid', 'id') and sorted by index.

    Raises
    ------
    AssertionError
        The session IDs referenced by the datasets table do not match the sessions table.
    """
    namespace = uuid.uuid1()  # a single namespace shared by both tables
    df_dsets = _rel_path_to_uuid(df_dsets, id_key='id', base_id=namespace)
    df_ses = _rel_path_to_uuid(df_ses, id_key='id', base_id=namespace, keep_old=True)

    # Map of old session ID -> new session UUID
    old2new = df_ses.set_index('id_')['id']

    # Keep the old eids in 'eid_' and copy the matching new session UUIDs into 'eid'
    df_dsets['eid_'] = df_dsets['eid'].copy()
    df_dsets['eid'] = old2new.loc[df_dsets['eid']].values

    # Check that the session IDs in both frames match
    dset_eids = df_dsets.set_index('eid_')['eid'].drop_duplicates()
    assert dset_eids.equals(old2new), 'session int ID mismatch between frames'

    # Set index
    df_ses = df_ses.set_index('id').drop('id_', axis=1).sort_index()
    df_dsets = df_dsets.set_index(['eid', 'id']).drop('eid_', axis=1).sort_index()
    return df_ses, df_dsets
# -------------------------------------------------------------------------------------------------
# Main functions
# -------------------------------------------------------------------------------------------------
def _metadata(origin):
    """
    Build the metadata dictionary that is saved alongside the Parquet tables.

    Parameters
    ----------
    origin : str, pathlib.Path
        Path to full directory, or computer name / db name.

    Returns
    -------
    dict
        A dict with the keys 'date_created' (the current local time to the minute, in the form
        'YYYY-MM-DD HH:MM') and 'origin' (the origin as a string).
    """
    created = datetime.datetime.now()
    timestamp = created.isoformat(sep=' ', timespec='minutes')
    return dict(date_created=timestamp, origin=str(origin))
def _make_sessions_df(root_dir) -> pd.DataFrame:
    """
    Find all sessions within a root directory and return them as a sessions table.

    Parameters
    ----------
    root_dir : str, pathlib.Path
        The folder to look for sessions.

    Returns
    -------
    pandas.DataFrame
        A pandas DataFrame of session info, with the columns given by SESSIONS_COLUMNS.
    """
    allowed = set(SESSIONS_COLUMNS)
    records = []
    for session_path in iter_sessions(root_dir):
        # Parse the ALF part of the full path into a dict of session info
        record = _get_session_info(get_alf_path(session_path))
        assert allowed.issuperset(record)
        records.append(record)
    return pd.DataFrame(records, columns=SESSIONS_COLUMNS)
def _make_datasets_df(root_dir, hash_files=False) -> pd.DataFrame:
    """
    Given a root directory, recursively finds all datasets and returns a datasets DataFrame.

    Parameters
    ----------
    root_dir : str, pathlib.Path
        The folder to look for sessions.
    hash_files : bool
        If True, an MD5 is computed for each file and stored in the 'hash' column.

    Returns
    -------
    pandas.DataFrame
        A pandas DataFrame of dataset info, with the columns given by DATASETS_COLUMNS and the
        'qc' column cast to QC_TYPE.
    """
    # Collect every record first and build the frame once: concatenating onto a growing frame
    # inside the loop copies all previous rows on each iteration, and concatenating with empty
    # frames is deprecated in recent pandas versions.
    rows = []
    for session_path in iter_sessions(root_dir):
        for rel_dset_path in iter_datasets(session_path):
            file_info = _get_dataset_info(session_path, rel_dset_path, compute_hash=hash_files)
            assert set(file_info.keys()) <= set(DATASETS_COLUMNS)
            rows.append(file_info)
    return pd.DataFrame(rows, columns=DATASETS_COLUMNS).astype({'qc': QC_TYPE})
def make_parquet_db(root_dir, out_dir=None, hash_ids=True, hash_files=False, lab=None):
    """
    Given a data directory, index the ALF datasets and save the generated cache tables.

    Parameters
    ----------
    root_dir : str, pathlib.Path
        The file directory to index.
    out_dir : str, pathlib.Path
        Optional output directory to save cache tables. If None, the files are saved into the
        root directory.
    hash_ids : bool
        If True, experiment and dataset IDs will be UUIDs generated from the system and relative
        paths (required for use with ONE API).
    hash_files : bool
        If True, an MD5 hash is computed for each dataset and stored in the datasets table.
        This will substantially increase cache generation time.
    lab : str
        An optional lab name to associate with the data. If the folder structure
        contains 'lab/Subjects', the lab name will be taken from the folder name.

    Returns
    -------
    pathlib.Path
        The full path of the saved sessions parquet table.
    pathlib.Path
        The full path of the saved datasets parquet table.

    Raises
    ------
    AssertionError
        The provided lab name conflicts with a lab name found in the folder structure, or the
        output directory is not an existing directory.

    Warns
    -----
    RuntimeWarning
        No sessions or no datasets were found in the root directory.
    """
    root_dir = Path(root_dir).resolve()

    # Make the dataframes.
    df_ses = _make_sessions_df(root_dir)
    df_dsets = _make_datasets_df(root_dir, hash_files=hash_files)

    # Replace the path-based string IDs with UUID strings
    if hash_ids and len(df_ses) > 0:
        df_ses, df_dsets = _ids_to_uuid(df_ses, df_dsets)

    if lab:  # Fill in lab name field
        # Lab names parsed from the folder structure must agree with the one provided
        # (compare against the `lab` argument, not the literal string 'lab')
        assert not df_ses['lab'].any() or (df_ses['lab'] == lab).all(), 'lab name conflict'
        df_ses['lab'] = lab

    # Check any files were found
    if df_ses.empty or df_dsets.empty:
        warnings.warn(f'No {"sessions" if df_ses.empty else "datasets"} found', RuntimeWarning)

    # Output directory.
    out_dir = Path(out_dir or root_dir)
    assert out_dir.is_dir(), f'output directory "{out_dir}" is not an existing directory'

    # Parquet files to save.
    fn_ses = out_dir / 'sessions.pqt'
    fn_dsets = out_dir / 'datasets.pqt'

    # Parquet metadata.
    metadata = _metadata(root_dir)

    # Save the Parquet files.
    parquet.save(fn_ses, df_ses, metadata)
    parquet.save(fn_dsets, df_dsets, metadata)

    return fn_ses, fn_dsets
def remove_missing_datasets(cache_dir, tables=None, remove_empty_sessions=True, dry=True):
    """
    Remove dataset files and session folders that are not in the provided cache.

    NB: This *does not* remove entries from the cache tables that are missing on disk.
    Non-ALF files are not removed. Empty sessions that exist in the sessions table are not removed.

    Parameters
    ----------
    cache_dir : str, pathlib.Path
        The directory containing the session folders (and the cache tables, if `tables` is None).
    tables : dict[str, pandas.DataFrame], optional
        A dict with keys ('sessions', 'datasets'), containing the cache tables as DataFrames.
        If None, the tables are loaded from 'sessions.pqt' and 'datasets.pqt' in `cache_dir`.
        Tables with a default range index are re-indexed in place by their ID columns.
    remove_empty_sessions : bool
        Attempt to remove session folders that are empty and not in the sessions table.
    dry : bool
        If true, do not remove anything.

    Returns
    -------
    list
        A sorted list of paths to be removed.
    """
    cache_dir = Path(cache_dir)
    if tables is None:
        tables = {}
        for name in ('datasets', 'sessions'):
            table, m = parquet.load(cache_dir / f'{name}.pqt')
            tables[name] = patch_cache(table, m.get('min_api_version'), name)

    INDEX_KEY = '.?id'
    for name in tables:
        # Set the appropriate index if none already set
        if isinstance(tables[name].index, pd.RangeIndex):
            idx_columns = sorted(tables[name].filter(regex=INDEX_KEY).columns)
            tables[name].set_index(idx_columns, inplace=True)

    to_delete = set()
    gen_path = partial(session_record2path, root_dir=cache_dir)
    # map of session path to eid
    sessions = {gen_path(rec): eid for eid, rec in tables['sessions'].iterrows()}
    for session_path in iter_sessions(cache_dir):
        try:
            datasets = tables['datasets'].loc[sessions[session_path]]
        except KeyError:
            # Session not in the cache tables: treat it as having no registered datasets
            datasets = tables['datasets'].iloc[0:0, :]
        # NB: `x in series` tests the Series *index*, not its values, so the relative paths
        # must be compared against the column values explicitly
        known_rel_paths = set(datasets['rel_path'])
        for dataset in iter_datasets(session_path):
            if dataset.as_posix() not in known_rel_paths:
                to_delete.add(session_path.joinpath(dataset))
        if session_path not in sessions and remove_empty_sessions:
            to_delete.add(session_path)

    removed = sorted(to_delete)
    if dry:
        print('The following session and datasets would be removed:', end='\n\t')
        print('\n\t'.join(sorted(map(str, to_delete))))
        return removed

    # Classify the paths before anything is deleted, then remove the files first so that the
    # session folders are already emptied by the time they are checked
    files, folders = [], []
    for path in removed:
        (files if path.is_file() else folders).append(path)

    # Delete datasets
    for path in files:
        _logger.debug('Removing %s', path)
        path.unlink()

    for path in folders:
        # Recursively remove empty folders, walking up towards (but not into) the cache dir
        while path.parent != cache_dir and not next(path.rglob('*'), False):
            _logger.debug('Removing %s', path)
            path.rmdir()
            path = path.parent

    return removed