"""I/O functions for ALyx Files
Provides support for time-series reading and interpolation as per the specifications
For a full overview of the scope of the format, see:
https://ibllib.readthedocs.io/en/develop/04_reference.html#alf # FIXME Old link
"""
import json
import copy
import logging
import os
import re
from fnmatch import fnmatch
from pathlib import Path
from typing import Union
import numpy as np
import pandas as pd
from iblutil.util import Bunch
from iblutil.io import parquet
from iblutil.io import jsonable
from .exceptions import ALFObjectNotFound
from . import files, spec
from .spec import FILE_SPEC
_logger = logging.getLogger(__name__)
class AlfBunch(Bunch):
    @property
    def check_dimensions(self):
        """int: 0 for consistent dimensions, 1 for inconsistent dimensions"""
        return check_dimensions(self)
    def append(self, b, inplace=False):
"""
Appends one bunch to another, key by key
Parameters
----------
b : Bunch, dict
A Bunch of data to append
inplace : bool
If true, the data are appended in place, otherwise a copy is returned
Returns
-------
        An AlfBunch with the combined data
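        Examples
        --------
        # A minimal sketch with made-up arrays; key names are illustrative
        a = AlfBunch({'times': np.array([1., 2.]), 'amps': np.array([5., 6.])})
        b = {'times': np.array([3.]), 'amps': np.array([7.])}
        ab = a.append(b)  # ab['times'] is array([1., 2., 3.])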
"""
# default is to return a copy
if inplace:
a = self
else:
a = AlfBunch(copy.deepcopy(self))
# handles empty bunches for convenience if looping
if b == {}:
return a
if a == {}:
return b
# right now supports only strictly matching keys. Will implement other cases as needed
if set(a.keys()) != set(b.keys()):
raise NotImplementedError("Append bunches only works with strictly matching keys"
"For more complex merges, convert to pandas dataframe.")
# do the merge; only concatenate lists and np arrays right now
for k in a:
if isinstance(a[k], np.ndarray):
a[k] = np.concatenate((a[k], b[k]), axis=0)
elif isinstance(a[k], list):
a[k].extend(b[k])
else:
_logger.warning(f"bunch key '{k}' is a {a[k].__class__}. I don't know how to"
f" handle that. Use pandas for advanced features")
check_dimensions(a)
return a
    def to_df(self):
        """Return the data as a pandas DataFrame; see `dataframe`"""
        return dataframe(self)
def dataframe(adict):
    """
    Converts a Bunch conforming to size conventions into a pandas DataFrame
For 2-D arrays, stops at 10 columns per attribute
Parameters
----------
adict : dict, Bunch
A dict-like object of data to convert to DataFrame
Returns
-------
A pandas DataFrame of data
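    Examples
    --------
    # A minimal sketch with made-up arrays; 2-D arrays are split into one column per index
    data = AlfBunch({'times': np.zeros(10), 'positions': np.zeros((10, 2))})
    df = dataframe(data)  # columns: 'times', 'positions_0', 'positions_1'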
"""
if check_dimensions(adict) != 0:
raise ValueError("Can only convert to Dataframe objects with consistent size")
# easy case where there are only vectors
if all([len(adict[k].shape) == 1 for k in adict]):
return pd.DataFrame(adict)
    # pandas has trouble with 2D data; truncate to a limit of 10 columns per dataset
df = pd.DataFrame()
for k in adict.keys():
if adict[k].ndim == 1:
df[k] = adict[k]
elif adict[k].ndim == 2 and adict[k].shape[1] == 1:
df[k] = adict[k][:, 0]
elif adict[k].ndim == 2:
for i in np.arange(adict[k].shape[1]):
df[f"{k}_{i}"] = adict[k][:, i]
if i == 9:
break
else:
_logger.warning(f"{k} attribute is 3D or more and won't convert to dataframe")
continue
return df
def _find_metadata(file_alf) -> Path:
"""
    File path of an existing meta-data file for a given ALF file
    Parameters
    ----------
    file_alf : str, pathlib.Path
        A path to an existing ALF file
    Returns
    -------
    Path of the meta-data file if one exists, otherwise None
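    Examples
    --------
    # Illustrative names: for '_ibl_trials.intervals.npy' this returns a sibling file
    # such as '_ibl_trials.intervals.metadata.json', if one exists
    meta_file = _find_metadata('/path/to/_ibl_trials.intervals.npy')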
"""
file_alf = Path(file_alf)
    obj, attr = file_alf.name.split('.')[:2]  # (namespaced) object and attribute
    meta_data_file = list(file_alf.parent.glob(f'{obj}.{attr}*.metadata*.json'))
if meta_data_file:
return meta_data_file[0]
def read_ts(filename):
"""
Load time-series from ALF format
Parameters
----------
filename : str, pathlib.Path
An ALF path whose values to load
Returns
-------
An array of timestamps and an array of values in filename
Examples
--------
    t, d = read_ts(filename)
"""
if not isinstance(filename, Path):
filename = Path(filename)
# alf format is object.attribute.extension, for example '_ibl_wheel.position.npy'
_, obj, attr, *_, ext = files.filename_parts(filename.parts[-1])
try:
# looking for matching object with attribute timestamps: '_ibl_wheel.timestamps.npy'
(time_file,), _ = filter_by(filename.parent, object=obj, attribute='times*', extension=ext)
assert time_file
except (ValueError, AssertionError):
name = spec.to_alf(obj, attr, ext)
        _logger.error(name + ' not found! No time-scale for ' + str(filename))
        raise FileNotFoundError(name + ' not found! No time-scale for ' + str(filename))
ts = np.load(filename.parent / time_file)
val = np.load(filename)
# Ensure timestamps
return ts2vec(ts, val.shape[0]), _ensure_flat(val)
def _ensure_flat(arr):
"""
Given a single column array, returns a flat vector. Other shapes are returned unchanged.
Parameters
----------
    arr : numpy.ndarray
        An array, possibly with shape (n, 1)
Returns
-------
A vector with shape (n,)
"""
return arr.flatten() if arr.ndim == 2 and arr.shape[1] == 1 else arr
def ts2vec(ts: np.ndarray, n_samples: int) -> np.ndarray:
"""
    Interpolate a continuous timeseries of the shape (n, 2), typically (2, 2)
Parameters
----------
    ts : numpy.ndarray
        An (n, 2) numpy array of (sample index, timestamp) pairs
n_samples : int
Number of samples; i.e. the size of the resulting vector
Returns
-------
A vector of interpolated timestamps
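    Examples
    --------
    # A minimal sketch: sample 0 occurs at 0.0 s and sample 99 at 9.9 s
    ts = np.array([[0, 0.], [99, 9.9]])
    stamps = ts2vec(ts, 100)  # 100 evenly spaced timestamps from 0.0 to 9.9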
"""
if len(ts.shape) == 1:
return ts
elif ts.ndim == 2 and ts.shape[1] == 1:
return ts.flatten() # Deal with MATLAB single column array
if ts.ndim > 2 or ts.shape[1] != 2:
        raise ValueError('Array shape should be (n, 2)')
# Linearly interpolate the times
x = np.arange(n_samples)
return np.interp(x, ts[:, 0], ts[:, 1])
def check_dimensions(dico):
"""
Test for consistency of dimensions as per ALF specs in a dictionary.
    ALF broadcasting rules: only accepts consistent dimensions for a given axis.
    A dimension is consistent with another if it's empty, 1, or equal to the other arrays'.
    Dims [a, 1], [1, b] and [a, b] are all consistent; [c, 1] is not (assuming c != a, b)
Parameters
----------
dico : ALFBunch, dict
Dictionary containing data
Returns
-------
Status 0 for consistent dimensions, 1 for inconsistent dimensions
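    Examples
    --------
    # A minimal sketch with made-up arrays: first dimensions must agree (or be 1)
    check_dimensions({'a': np.zeros((10, 3)), 'b': np.zeros(10)})  # returns 0
    check_dimensions({'a': np.zeros((10, 3)), 'b': np.zeros(7)})  # returns 1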
"""
shapes = [dico[lab].shape for lab in dico if isinstance(dico[lab], np.ndarray) and
lab.split('.')[0] != 'timestamps']
first_shapes = [sh[0] for sh in shapes]
# Continuous timeseries are permitted to be a (2, 2)
timeseries = [k for k, v in dico.items() if 'timestamps' in k and isinstance(v, np.ndarray)]
if any(timeseries):
for key in timeseries:
if dico[key].ndim == 1 or (dico[key].ndim == 2 and dico[key].shape[1] == 1):
# Should be vector with same length as other attributes
first_shapes.append(dico[key].shape[0])
elif dico[key].ndim > 1 and dico[key].shape != (2, 2):
return 1 # ts not a (2, 2) arr or a vector
ok = len(first_shapes) == 0 or set(first_shapes).issubset({max(first_shapes), 1})
return int(ok is False)
def load_file_content(fil):
"""
    Returns the content of a file. Designed for very generic file formats:
    so far supported contents are `json`, `npy`, `csv`, `tsv`, `ssv`, `jsonable`, `pqt`
Parameters
----------
fil : str, pathlib.Path
File to read
Returns
-------
    Array, dict or pandas DataFrame depending on the format; unsupported formats return the path
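    Examples
    --------
    # Illustrative path; the loader is selected from the file extension
    times = load_file_content('/path/to/spikes.times.npy')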
"""
if not fil:
return
fil = Path(fil)
if fil.stat().st_size == 0:
return
if fil.suffix == '.csv':
return pd.read_csv(fil)
if fil.suffix == '.json':
try:
with open(fil) as _fil:
return json.loads(_fil.read())
except Exception as e:
_logger.error(e)
return None
if fil.suffix == '.jsonable':
return jsonable.read(fil)
if fil.suffix == '.npy':
return _ensure_flat(np.load(file=fil, allow_pickle=True))
if fil.suffix == '.pqt':
return parquet.load(fil)[0]
if fil.suffix == '.ssv':
return pd.read_csv(fil, delimiter=' ')
if fil.suffix == '.tsv':
return pd.read_csv(fil, delimiter='\t')
return Path(fil)
def _ls(alfpath, object=None, **kwargs):
"""
Given a path, an object and a filter, returns all files and associated attributes
Parameters
----------
alfpath : str, pathlib.Path
The folder to list
object : str, list
An ALF object name to filter by
wildcards : bool
If true uses unix shell style pattern matching, otherwise uses regular expressions
kwargs : dict
Other ALF parts to filter, including namespace, attribute, etc.
Returns
-------
    A list of ALF paths and a list of their parsed file parts
"""
alfpath = Path(alfpath)
if not alfpath.exists():
files_alf = attributes = None
elif alfpath.is_dir():
if object is None:
# List all ALF files
files_alf, attributes = filter_by(alfpath, **kwargs)
else:
files_alf, attributes = filter_by(alfpath, object=object, **kwargs)
else:
object = files.filename_parts(alfpath.name)[1]
alfpath = alfpath.parent
files_alf, attributes = filter_by(alfpath, object=object, **kwargs)
# raise error if no files found
if not files_alf:
err_str = f'object "{object}" ' if object else 'ALF files'
raise ALFObjectNotFound(f'No {err_str} found in {alfpath}')
return [alfpath.joinpath(f) for f in files_alf], attributes
def exists(alfpath, object, attributes=None, **kwargs):
"""
Test if ALF object and optionally specific attributes exist in the given path
Parameters
----------
alfpath : str, pathlib.Path
The folder to look into
object : str
ALF object name
attributes : str, list
Wanted attributes
wildcards : bool
If true uses unix shell style pattern matching, otherwise uses regular expressions
kwargs : dict
Other ALF parts to filter by
Returns
-------
    True if the object and all requested attributes are found, otherwise False
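    Examples
    --------
    # Illustrative usage: check for a 'trials' object with an 'intervals' attribute
    ok = exists('/path/to/alf', 'trials', attributes='intervals')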
"""
# if the object is not found, return False
try:
_, attributes_found = _ls(alfpath, object, **kwargs)
except (FileNotFoundError, ALFObjectNotFound):
return False
# if object found and no attribute provided, True
if not attributes:
return True
# if attributes provided, test if all are found
if isinstance(attributes, str):
attributes = [attributes]
attributes_found = set(part[2] for part in attributes_found)
return set(attributes).issubset(attributes_found)
def load_object(alfpath, object=None, short_keys=False, **kwargs):
"""
    Reads all files (i.e. attributes) sharing the same object.
    For example, if the file provided to the function is `spikes.times`, the function will
    load `spikes.times`, `spikes.clusters`, `spikes.depths`, `spikes.amps` in a dictionary
    whose keys will be `times`, `clusters`, `depths`, `amps`
# TODO Change URL
Full Reference here: https://docs.internationalbrainlab.org/en/latest/04_reference.html#alf
Simplified example: _namespace_object.attribute_timescale.part1.part2.extension
Parameters
----------
alfpath : str, pathlib.Path, list
Any ALF path pertaining to the object OR directory containing ALFs OR list of paths
object : str, list, None
        The ALF object(s) to filter by. If a directory is provided and object is None, all
        valid ALF files are returned
short_keys : bool
By default, the output dictionary keys will be compounds of attributes, timescale and
any eventual parts separated by a dot. Use True to shorten the keys to the attribute
and timescale
wildcards : bool
If true uses unix shell style pattern matching, otherwise uses regular expressions
kwargs : dict
Other ALF parts to filter by
Returns
-------
    An AlfBunch (dict-like) of all attributes pertaining to the object
Examples
--------
    # Load `spikes` object
    spikes = load_object('/path/to/my/alffolder/', 'spikes')
    # Load `trials` object under the `ibl` namespace
    trials = load_object(session_path, 'trials', namespace='ibl')
"""
if isinstance(alfpath, (Path, str)):
if Path(alfpath).is_dir() and object is None:
raise ValueError('If a directory is provided, the object name should be provided too')
files_alf, parts = _ls(alfpath, object, **kwargs)
else: # A list of paths allows us to load an object from different revisions
files_alf = alfpath
parts = [files.filename_parts(x.name) for x in files_alf]
        assert len(set(p[1] for p in parts)) == 1, 'object name is ambiguous in input file list'
# Take attribute and timescale from parts list
attributes = [p[2] if not p[3] else '_'.join(p[2:4]) for p in parts]
if not short_keys: # Include extra parts in the keys
attributes = [attr + ('.' + p[4] if p[4] else '') for attr, p in zip(attributes, parts)]
# TODO List duplicates; raise ALFError
assert len(set(attributes)) == len(attributes), (
f'multiple object {object} with the same attribute in {alfpath}, restrict parts/namespace')
out = AlfBunch({})
# load content for each file
for fil, att in zip(files_alf, attributes):
# if there is a corresponding metadata file, read it:
meta_data_file = _find_metadata(fil)
# if this is the actual meta-data file, skip and it will be read later
if meta_data_file == fil:
continue
out[att] = load_file_content(fil)
if meta_data_file:
meta = load_file_content(meta_data_file)
# the columns keyword splits array along the last dimension
if 'columns' in meta.keys():
                out.update({v: out[att][:, k] for k, v in enumerate(meta['columns'])})
out.pop(att)
meta.pop('columns')
# if there is other stuff in the dictionary, save it, otherwise disregard
if meta:
out[att + 'metadata'] = meta
status = check_dimensions(out)
timeseries = [k for k in out.keys() if 'timestamps' in k]
if any(timeseries) and len(out.keys()) > len(timeseries) and status == 0:
# Get length of one of the other arrays
n_samples = next(v for k, v in out.items() if 'timestamps' not in k).shape[0]
for key in timeseries:
# Expand timeseries if necessary
out[key] = ts2vec(out[key], n_samples)
if status != 0:
print_sizes = '\n'.join([f'{v.shape}, {k}' for k, v in out.items()])
_logger.warning(f"Inconsistent dimensions for object: {object} \n{print_sizes}")
return out
def save_object_npy(alfpath, dico, object, parts=None, namespace=None, timescale=None):
"""
    Saves a dictionary in ALF format using object as object name and dictionary keys as
    attribute names. Dimensions have to be consistent.
Reference here: https://github.com/cortex-lab/ALF TODO Fix link
Simplified example: _namespace_object.attribute.part1.part2.extension
Parameters
----------
alfpath : str, pathlib.Path
Path of the folder to save data to
dico : dict
Dictionary to save to npy; keys correspond to ALF attributes
object : str
Name of the object to save
parts : str, list, None
Extra parts to the ALF name
namespace : str, None
The optional namespace of the object
timescale : str, None
The optional timescale of the object
Returns
-------
List of written files
Examples
--------
save_object_npy('/path/to/my/alffolder/', spikes, 'spikes')
"""
alfpath = Path(alfpath)
status = check_dimensions(dico)
if status != 0:
raise ValueError('Dimensions are not consistent to save all arrays in ALF format: ' +
str([(k, v.shape) for k, v in dico.items()]))
out_files = []
for k, v in dico.items():
out_file = alfpath / spec.to_alf(object, k, 'npy',
extra=parts, namespace=namespace, timescale=timescale)
np.save(out_file, v)
out_files.append(out_file)
return out_files
def remove_uuid_file(file_path, dry=False):
"""
Renames a file without the UUID and returns the new pathlib.Path object
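    Examples
    --------
    # Illustrative name; returns Path('spikes.times.npy'), renaming on disk only if
    # the file exists
    new_file = remove_uuid_file('spikes.times.f8bf6a25-2a3e-4a45-9a2c-1b5c6c8e0a3d.npy')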
"""
if isinstance(file_path, str):
file_path = Path(file_path)
name_parts = file_path.name.split('.')
if not spec.is_uuid_string(name_parts[-2]):
return file_path
name_parts.pop(-2)
new_path = file_path.parent.joinpath('.'.join(name_parts))
if not dry and file_path.exists():
file_path.replace(new_path)
return new_path
def remove_uuid_recursive(folder, dry=False):
"""
    Within a folder, recursively rename all files to remove the UUID from their names
"""
for fn in Path(folder).rglob('*.*'):
print(remove_uuid_file(fn, dry=dry))
def next_num_folder(session_date_folder: Union[str, Path]) -> str:
"""Return the next number for a session given a session_date_folder"""
session_date_folder = Path(session_date_folder)
if not session_date_folder.exists():
return '001'
session_nums = [
int(x.name) for x in session_date_folder.iterdir()
if x.is_dir() and not x.name.startswith('.') and x.name.isdigit()
]
out = f'{max(session_nums or [0]) + 1:03d}'
assert len(out) == 3, 'ALF spec does not support session numbers > 999'
return out
def remove_empty_folders(folder: Union[str, Path]) -> None:
    """Iteratively remove any empty child folders"""
all_folders = sorted(x for x in Path(folder).rglob('*') if x.is_dir())
for f in reversed(all_folders): # Reversed sorted ensures we remove deepest first
try:
f.rmdir()
except Exception:
continue
def filter_by(alf_path, wildcards=True, **kwargs):
"""
Given a path and optional filters, returns all ALF files and their associated parts. The
filters constitute a logical AND. For all but `extra`, if a list is provided, one or more
elements must match (a logical OR).
Parameters
----------
alf_path : str, pathlib.Path
A path to a folder containing ALF datasets
wildcards : bool
If true, kwargs are matched as unix-style patterns, otherwise as regular expressions
object : str, list
Filter by a given object (e.g. 'spikes')
attribute : str, list
Filter by a given attribute (e.g. 'intervals')
extension : str, list
Filter by extension (e.g. 'npy')
namespace : str, list
Filter by a given namespace (e.g. 'ibl') or None for files without one
timescale : str, list
Filter by a given timescale (e.g. 'bpod') or None for files without one
extra : str, list
Filter by extra parameters (e.g. 'raw') or None for files without extra parts
NB: Wild cards not permitted here.
Returns
-------
    alf_files : list of str
        A list of ALF file names matching the filters
    attributes : list of tuple
        A list of parsed file parts for each matching file
Examples
--------
# Filter files with universal timescale
filter_by(alf_path, timescale=None)
# Filter files by a given ALF object
filter_by(alf_path, object='wheel')
# Filter using wildcard, e.g. 'wheel' and 'wheelMoves' ALF objects
filter_by(alf_path, object='wh*')
# Filter all intervals that are in bpod time
filter_by(alf_path, attribute='intervals', timescale='bpod')
# Filter all files containing either 'intervals' OR 'timestamps' attributes
filter_by(alf_path, attribute=['intervals', 'timestamps'])
# Filter all files using a regular expression
filter_by(alf_path, object='^wheel.*', wildcards=False)
filter_by(alf_path, object=['^wheel$', '.*Moves'], wildcards=False)
"""
alf_files = [f for f in os.listdir(alf_path) if spec.is_valid(f)]
attributes = [files.filename_parts(f, as_dict=True) for f in alf_files]
if kwargs:
# Validate keyword arguments against regex group names
invalid = kwargs.keys() - spec.regex(FILE_SPEC).groupindex.keys()
if invalid:
raise TypeError("%s() got an unexpected keyword argument '%s'"
% (__name__, set(invalid).pop()))
        # Ensure 'extra' input is a list; if str, split on dot
if 'extra' in kwargs and isinstance(kwargs['extra'], str):
kwargs['extra'] = kwargs['extra'].split('.')
def _match(part, pattern, split=None):
if pattern is None or part is None:
# If either is None, both should be None to match
return pattern is part
elif split:
# Check all provided extra fields match those in ALF
return all(elem in part.split(split) for elem in pattern if elem)
elif not isinstance(pattern, str):
if wildcards:
return any(_match(part, x, split) for x in pattern)
else:
return re.match('|'.join(pattern), part) is not None
else:
# Check given attribute matches, allowing wildcards
return fnmatch(part, pattern) if wildcards else re.match(pattern, part) is not None
# Iterate over ALF files
for file, attr in zip(alf_files.copy(), attributes.copy()):
for k, v in kwargs.items(): # Iterate over attributes
match = _match(attr[k], v, '.' if k == 'extra' else None)
if not match: # Remove file from list and move on to next file
alf_files.remove(file)
attributes.remove(attr)
break
return alf_files, [tuple(attr.values()) for attr in attributes]