"""Session creation and datasets registration.
The RegistrationClient provides an high-level API for creating experimentation sessions on Alyx
and registering associated datasets.
Summary of methods
create_new_session - Create a new local session folder and optionally create session record on Alyx
create_sessions - Create sessions and register files for folder containing a given flag file
register_session - Create a session on Alyx from local path and register any ALF datasets present
register_files - Register a list of files to their respective sessions on Alyx
import pathlib
from uuid import UUID
from pathlib import Path, PurePosixPath
import datetime
import logging
import itertools
from collections import defaultdict
from fnmatch import fnmatch
import shutil
import requests.exceptions
from iblutil.io import hashfile
from iblutil.util import Bunch, ensure_list
import one.alf.io as alfio
from one.alf.path import ALFPath, session_path_parts, ensure_alf_path, folder_parts
from one.alf.spec import is_valid
import one.alf.exceptions as alferr
from one.api import ONE
from one.webclient import no_cache
_logger = logging.getLogger(__name__)
def get_dataset_type(filename, dtypes):
    """Get the dataset type from a given filename.

    A dataset type is matched one of two ways:

    1. the filename matches the dataset type filename_pattern;
    2. if filename_pattern is empty, the filename object.attribute matches the dataset type name.

    Parameters
    ----------
    filename : str, pathlib.Path
        The filename or filepath.
    dtypes : iterable
        An iterable of dataset type objects with the attributes ('name', 'filename_pattern').

    Returns
    -------
    The matching dataset type object for filename.

    Raises
    ------
    ValueError
        filename doesn't match any of the dataset types
        filename matches multiple dataset types

    """
    filename = ensure_alf_path(filename)
    # Both values are independent of the dataset type, so compute them once up front
    name_folded = filename.name.casefold()
    if is_valid(filename.name):
        obj_attr = '.'.join(filename.dataset_name_parts[1:3])
    else:  # will match name against filename sans extension
        obj_attr = filename.stem

    dataset_types = []
    for dt in dtypes:
        if not dt.filename_pattern.strip():
            # If the filename pattern is null, check whether the filename object.attribute
            # matches the dataset type name.
            if dt.name == obj_attr:
                dataset_types.append(dt)
        # Check whether pattern matches filename (case-insensitive)
        elif fnmatch(name_folded, dt.filename_pattern.casefold()):
            dataset_types.append(dt)

    if not dataset_types:
        raise ValueError(f'No dataset type found for filename "{filename.name}"')
    if len(dataset_types) >= 2:
        raise ValueError('Multiple matching dataset types found for filename '
                         f'"{filename.name}": \n{", ".join(map(str, dataset_types))}')
    return dataset_types[0]
class RegistrationClient:
    """Methods to create sessions and register data.

    On construction the client fetches the dataset types and data formats from Alyx; these are
    used to decide which local files are valid for registration.
    """

    def __init__(self, one=None):
        """Set up the client and fetch dataset types and file formats from Alyx.

        Parameters
        ----------
        one : one.api.OneAlyx
            An optional ONE instance. If not provided a new instance is created with
            `cache_rest=None`. A warning is logged if the provided instance's Alyx client has
            its cache mode set to 'GET'.

        """
        self.one = one
        if not one:
            self.one = ONE(cache_rest=None)
        elif one.alyx.cache_mode == 'GET':
            _logger.warning('AlyxClient REST cache active; '
                            'this may cause issues with registration.')
        # Dataset type records, used to match file names to dataset types
        self.dtypes = list(map(Bunch, self.one.alyx.rest('dataset-types', 'list')))
        # The non-empty filename patterns of the above dataset types
        self.registration_patterns = [
            dt['filename_pattern'] for dt in self.dtypes if dt['filename_pattern']]
        # File extensions known to Alyx; files with other extensions are not registered
        self.file_extensions = [df['file_extension'] for df in
                                self.one.alyx.rest('data-formats', 'list', no_cache=True)]

    def create_sessions(self, root_data_folder, glob_pattern='**/create_me.flag',
                        register_files=False, dry=False):
        """Create sessions looking recursively for flag files.

        Parameters
        ----------
        root_data_folder : str, pathlib.Path
            Folder to look for sessions.
        glob_pattern : str
            Register valid sessions that contain this pattern.
        register_files : bool
            If true, register all valid datasets within the session folder.
        dry : bool
            If true returns list of sessions without creating them on Alyx.

        Returns
        -------
        list of pathlib.Paths
            Newly created session paths.
        list of dicts
            Alyx session records. In dry mode each element is None.

        """
        flag_files = list(Path(root_data_folder).glob(glob_pattern))
        records = []
        for flag_file in flag_files:
            session_path = ALFPath(flag_file.parent)
            if dry:
                _logger.info('create %s', session_path)
                records.append(None)  # placeholder keeps records aligned with session paths
                continue
            _logger.info('creating session for %s', session_path)
            # providing a false flag stops the registration after session creation
            session_info, _ = self.register_session(session_path, file_list=register_files)
            records.append(session_info)
        return [ALFPath(ff.parent) for ff in flag_files], records

    def create_new_session(self, subject, session_root=None, date=None, register=True, **kwargs):
        """Create a new local session folder and optionally create session record on Alyx.

        Parameters
        ----------
        subject : str
            The subject name. Must exist on Alyx.
        session_root : str, pathlib.Path
            The root folder in which to create the subject/date/number folder. Defaults to ONE
            cache directory.
        date : datetime.datetime, datetime.date, str
            An optional date for the session. If None the current time is used.
        register : bool
            If true, create session record on Alyx database.
        kwargs
            Optional arguments for RegistrationClient.register_session.

        Returns
        -------
        pathlib.Path
            New local session path.
        uuid.UUID
            The experiment UUID if register is True, otherwise None.

        Examples
        --------
        Create a local session only

        >>> session_path, _ = RegistrationClient().create_new_session('Ian', register=False)

        Register a session on Alyx in a specific location

        >>> session_path, eid = RegistrationClient().create_new_session('Sy', '/data/lab/Subjects')

        Create a session for a given date

        >>> session_path, eid = RegistrationClient().create_new_session('Ian', date='2020-01-01')

        """
        assert not self.one.offline, 'ONE must be in online mode'
        date = self.ensure_ISO8601(date)  # Format, validate
        # Ensure subject exists on Alyx
        self.assert_exists(subject, 'subjects')
        # Only the date part (YYYY-MM-DD) of the ISO string is used for the folder name
        session_root = Path(session_root or self.one.alyx.cache_dir) / subject / date[:10]
        session_path = session_root / alfio.next_num_folder(session_root)
        session_path.mkdir(exist_ok=True, parents=True)  # Ensure folder exists on disk
        eid = None
        if register:
            session_info, _ = self.register_session(session_path, **kwargs)
            # The experiment ID is the final 36 characters of the session URL
            eid = UUID(session_info['url'][-36:])
        return session_path, eid

    def find_files(self, session_path):
        """Returns a generator of file names that match one of the dataset type patterns in Alyx.

        Parameters
        ----------
        session_path : str, pathlib.Path
            The session path to search.

        Yields
        ------
        pathlib.Path
            File paths that match the dataset type patterns in Alyx.

        """
        session_path = ALFPath(session_path)
        extensions = tuple(self.file_extensions)
        for p in session_path.iter_datasets(recursive=True):
            if not p.name.endswith(extensions):
                continue
            try:
                get_dataset_type(p, self.dtypes)
            except ValueError as ex:
                # No single matching dataset type; skip the file
                _logger.debug('%s', ex.args[0])
            else:
                yield p

    def assert_exists(self, member, endpoint):
        """Raise an error if a given member doesn't exist on Alyx database.

        Parameters
        ----------
        member : str, uuid.UUID, list
            The member ID(s) to verify
        endpoint: str
            The endpoint at which to look it up

        Examples
        --------
        >>> client.assert_exists('ALK_036', 'subjects')
        >>> client.assert_exists('user_45', 'users')
        >>> client.assert_exists('local_server', 'repositories')

        Raises
        ------
        one.alf.exceptions.AlyxSubjectNotFound
            Subject does not exist on Alyx
        one.alf.exceptions.ALFError
            Member does not exist on Alyx
        requests.exceptions.HTTPError
            Failed to connect to Alyx database or endpoint not found

        Returns
        -------
        dict, list of dict
            The endpoint data if member exists.

        """
        if not isinstance(member, (str, UUID)):
            # An iterable of members: check each in turn
            return [self.assert_exists(x, endpoint) for x in member]
        try:
            return self.one.alyx.rest(endpoint, 'read', id=str(member), no_cache=True)
        except requests.exceptions.HTTPError as ex:
            if ex.response.status_code != 404:
                raise  # Not a 'not found' error; propagate unchanged
            if endpoint == 'subjects':
                raise alferr.AlyxSubjectNotFound(member) from ex
            raise alferr.ALFError(f'Member "{member}" doesn\'t exist in {endpoint}') from ex

    @staticmethod
    def ensure_ISO8601(date) -> str:
        """Ensure provided date is ISO 8601 compliant.

        Parameters
        ----------
        date : str, None, datetime.date, datetime.datetime
            An optional date to convert to ISO string. If None, the current datetime is used.

        Returns
        -------
        str
            The datetime as an ISO 8601 string

        """
        date = date or datetime.datetime.now()  # If None get current time
        if isinstance(date, str):
            # FIXME support timezone aware strings, e.g. '2023-03-09T17:08:12.4465024+00:00'
            date = datetime.datetime.fromisoformat(date)  # Validate by parsing
        elif isinstance(date, datetime.date) and not isinstance(date, datetime.datetime):
            # A plain date has no time component; use midnight
            date = datetime.datetime.fromordinal(date.toordinal())
        return datetime.datetime.isoformat(date)

    def register_session(self, ses_path, users=None, file_list=True, **kwargs):
        """Register session in Alyx.

        NB: If providing a lab or start_time kwarg, they must match the lab (if there is one)
        and date of the session path.

        Parameters
        ----------
        ses_path : str, pathlib.Path
            The local session path
        users : str, list
            The user(s) to attribute to the session
        file_list : bool, list
            An optional list of file paths to register.  If True, all valid files within the
            session folder are registered.  If False, no files are registered
        location : str
            The optional location within the lab where the experiment takes place
        procedures : str, list
            An optional list of procedures, e.g. 'Behavior training/tasks'
        n_correct_trials : int
            The number of correct trials (optional)
        n_trials : int
            The total number of completed trials (optional)
        json : dict, str
            Optional JSON data
        projects: str, list
            The project(s) to which the experiment belongs (optional)
        type : str
            The experiment type, e.g. 'Experiment', 'Base'
        task_protocol : str
            The task protocol (optional)
        lab : str
            The name of the lab where the session took place.  If None the lab name will be
            taken from the path.  If no lab name is found in the path (i.e. no <lab>/Subjects)
            the default lab on Alyx will be used.
        start_time : str, datetime.datetime
            The precise start time of the session.  The date must match the date in the session
            path.
        end_time : str, datetime.datetime
            The precise end time of the session.

        Returns
        -------
        dict
            An Alyx session record
        list, None
            Alyx file records (or None if file_list is False)

        Raises
        ------
        one.alf.exceptions.AlyxSubjectNotFound
            Subject does not exist on Alyx.
        AssertionError
            Provided start_time does not match date in session path, or a 'subject' or 'number'
            keyword argument was provided (these are taken from the session path).
        ValueError
            The provided lab name does not match the one found in the session path or
            start_time/end_time is not a valid ISO date time.
        requests.HTTPError
            A 400 status code means the submitted data was incorrect (e.g. task_protocol was an
            int instead of a str); A 500 status code means there was a server error.
        ConnectionError
            Failed to connect to Alyx, most likely due to a bad internet connection.

        """
        ses_path = ALFPath(ses_path)
        details = session_path_parts(ses_path.as_posix(), as_dict=True, assert_valid=True)
        # query alyx endpoints for subject, error if not found
        self.assert_exists(details['subject'], 'subjects')

        # look for a session from the same subject, same number on the same day
        with no_cache(self.one.alyx):
            session_id, session = self.one.search(subject=details['subject'],
                                                  date_range=details['date'],
                                                  number=details['number'],
                                                  details=True, query_type='remote')
        users = ensure_list(users or self.one.alyx.user)
        self.assert_exists(users, 'users')

        # the session data to submit, whether creating or updating
        ses_ = {'subject': details['subject'],
                'users': users,
                'type': 'Experiment',
                'number': details['number']}
        if kwargs.get('end_time', False):
            ses_['end_time'] = self.ensure_ISO8601(kwargs.pop('end_time'))
        start_time = self.ensure_ISO8601(kwargs.pop('start_time', details['date']))
        assert start_time[:10] == details['date'], 'start_time doesn\'t match session path'
        if kwargs.get('procedures', False):
            ses_['procedures'] = ensure_list(kwargs.pop('procedures'))
        if kwargs.get('projects', False):
            ses_['projects'] = ensure_list(kwargs.pop('projects'))
        # subject and number come from the session path and must not be overridden
        assert not {'subject', 'number'}.intersection(kwargs), \
            'subject and number are taken from the session path'
        if 'lab' not in kwargs and details['lab']:
            kwargs.update({'lab': details['lab']})
        elif details['lab'] and kwargs.get('lab', details['lab']) != details['lab']:
            names = (kwargs['lab'], details['lab'])
            raise ValueError('lab kwarg "%s" does not match lab name in path ("%s")' % names)
        ses_.update(kwargs)  # remaining kwargs are submitted as session fields

        if not session:  # Create from scratch
            ses_['start_time'] = start_time
            session = self.one.alyx.rest('sessions', 'create', data=ses_)
        else:  # Update existing
            if start_time:
                ses_['start_time'] = self.ensure_ISO8601(start_time)
            session = self.one.alyx.rest('sessions', 'update', id=session_id[0], data=ses_)

        _logger.info(session['url'] + ' ')
        # at this point the session has been created. If create only, exit
        if not file_list:
            return session, None
        recs = self.register_files(self.find_files(ses_path) if file_list is True else file_list)
        if recs:  # Update local session data after registering files
            session['data_dataset_session_related'] = ensure_list(recs)
        return session, recs

    def prepare_files(self, file_list, versions=None):
        """Validate file list for registration and group files by session path.

        Files are dropped (with a debug log message) if they have no valid session path, an
        extension that isn't in `self.file_extensions`, or no single matching dataset type.

        Parameters
        ----------
        file_list : list, str, pathlib.Path
            A filepath (or list thereof) of ALF datasets to register to Alyx.
        versions : str, list of str
            Optional version tags. A single string (or None) is applied to every file; a list
            is cycled over the files.

        Returns
        -------
        dict of lists
            A map of session path to the list of valid files (relative to the session path).
        dict of lists
            A map of session path to the list of versions for each valid file.
        list
            A list of files converted to paths.
        bool
            A boolean indicating if input was a single file.

        """
        F = defaultdict(list)  # empty map whose keys will be session paths
        V = defaultdict(list)  # empty map for versions

        if single_file := isinstance(file_list, (str, pathlib.Path)):
            file_list = [file_list]
        file_list = list(map(ALFPath, file_list))  # Ensure list of path objects

        if versions is None or isinstance(versions, str):
            versions = itertools.repeat(versions)
        else:
            versions = itertools.cycle(versions)

        # Filter valid files and sort by session
        for fn, ver in zip(file_list, versions):
            session_path = fn.session_path()
            if not session_path:
                _logger.debug(f'{fn}: Invalid session path')
                continue
            if fn.suffix not in self.file_extensions:
                _logger.debug(f'{fn}: No matching extension "{fn.suffix}" in database')
                continue
            try:
                get_dataset_type(fn, self.dtypes)
            except ValueError as ex:
                _logger.debug('%s', ex.args[0])
                continue
            F[session_path].append(fn.relative_to(session_path))
            V[session_path].append(ver)

        return F, V, file_list, single_file

    def check_protected_files(self, file_list, created_by=None):
        """Check whether a set of files associated to a session are protected.

        Parameters
        ----------
        file_list : list, str, pathlib.Path
            A filepath (or list thereof) of ALF datasets to register to Alyx.
        created_by : str
            Name of Alyx user (defaults to whoever is logged in to ONE instance).

        Returns
        -------
        list of dicts, dict
            A status for each session whether any of the files specified are protected
            datasets or not. If none of the datasets are protected, a response with status
            200 is returned, if any of the files are protected a response with status
            403 is returned. If a single file is passed in, a single response is returned (or
            None if that file was not valid for registration).

        """
        # Validate files and rearrange into list per session
        F, _, _, single_file = self.prepare_files(file_list)

        # For each unique session, make a separate request
        records = []
        for session_path, files in F.items():
            # this is the generic relative path: subject/yyyy-mm-dd/NNN
            details = session_path_parts(session_path.as_posix(), as_dict=True, assert_valid=True)
            rel_path = PurePosixPath(details['subject'], details['date'], details['number'])
            r_ = {'created_by': created_by or self.one.alyx.user,
                  'path': rel_path.as_posix(),
                  'filenames': [x.as_posix() for x in files]}
            records.append(self.one.alyx.get('/check-protected', data=r_, clobber=True))

        if single_file:
            # records is empty when the single input file was filtered out as invalid
            return records[0] if records else None
        return records

    def register_files(self, file_list,
                       versions=None, default=True, created_by=None, server_only=False,
                       repository=None, exists=True, dry=False, max_md5_size=None, **kwargs):
        """Registers a set of files belonging to a session only on the server.

        Parameters
        ----------
        file_list : list, str, pathlib.Path
            A filepath (or list thereof) of ALF datasets to register to Alyx.
        versions : str, list of str
            Optional version tags.
        default : bool
            Whether to set as default revision (defaults to True).
        created_by : str
            Name of Alyx user (defaults to whoever is logged in to ONE instance).
        server_only : bool
            Will only create file records in the 'online' repositories and skips local repositories
        repository : str
            Name of the repository in Alyx to register to.
        exists : bool
            Whether the files exist on the repository (defaults to True).  May be set to False
            when registering files before copying to the repository.
        dry : bool
            When true returns POST data for registration endpoint without submitting the data.
        max_md5_size : int
            Maximum file in bytes to compute md5 sum (always compute if None).
        kwargs
            Extra arguments directly passed as REST request data to /register-file endpoint.

        Returns
        -------
        list of dicts, dict
            A list of newly created Alyx dataset records or the registration data if dry. If
            a single file is passed in, a single dict is returned. Files that were not valid
            for registration are returned as None.

        Notes
        -----
        - The registered files may be automatically moved to new revision folders if they are
          protected on Alyx, therefore it's important to check the relative paths of the output.
        - Protected datasets are not checked in dry mode.
        - In most circumstances a new revision will be added automatically, however if this fails
          a 403 HTTP status may be returned.

        Raises
        ------
        requests.exceptions.HTTPError
            Submitted data not valid (400 status code)
            Server side database error (500 status code)
            Revision protected (403 status code)

        """
        F, V, file_list, single_file = self.prepare_files(file_list, versions=versions)

        # Position of each input file (first occurrence) so that the output order matches
        # the input order without a linear search per file
        positions = {}
        for i, p in enumerate(file_list):
            positions.setdefault(p, i)

        # For each unique session, make a separate POST request
        records = [None] * (len(F) if dry else len(file_list))  # If dry return data per session
        for i_session, (session_path, files) in enumerate(F.items()):
            # this is the generic relative path: subject/yyyy-mm-dd/NNN
            details = session_path_parts(session_path.as_posix(), as_dict=True, assert_valid=True)
            rel_path = PurePosixPath(details['subject'], details['date'], details['number'])
            file_sizes = [session_path.joinpath(fn).stat().st_size for fn in files]
            # computing the md5 can be very long, so this is an option to skip if the file is
            # bigger than a certain threshold
            md5s = [hashfile.md5(session_path.joinpath(fn))
                    if (max_md5_size is None or sz < max_md5_size) else None
                    for fn, sz in zip(files, file_sizes)]

            _logger.info('Registering ' + str(files))

            r_ = {'created_by': created_by or self.one.alyx.user,
                  'path': rel_path.as_posix(),
                  'filenames': [x.as_posix() for x in files],
                  'hashes': md5s,
                  'filesizes': file_sizes,
                  'name': repository,
                  'exists': exists,
                  'server_only': server_only,
                  'default': default,
                  'versions': V[session_path],
                  'check_protected': True,
                  **kwargs}

            # Add optional field
            if details['lab'] and 'labs' not in kwargs:
                r_['labs'] = details['lab']
            # If dry, store POST data, otherwise store resulting file records
            if dry:
                records[i_session] = r_
                continue

            registered = files  # the paths as registered, for logging
            try:
                response = self.one.alyx.post('/register-file', data=r_)
            except requests.exceptions.HTTPError as err:
                # 403 response when datasets already registered and protected by tags
                err_message = err.response.json()
                if not (err_message.get('status_code') == 403 and
                        err_message.get('error') == 'One or more datasets is protected'):
                    raise  # Some other error occurred; re-raise
                registered = self._move_to_unprotected_revisions(
                    session_path, rel_path, files, err_message['details'])
                assert len(registered) == len(files)
                r_['filenames'] = [p.as_posix() for p in registered]
                r_['filesizes'] = [session_path.joinpath(p).stat().st_size for p in registered]
                r_['check_protected'] = False  # Speed things up by ignoring server-side checks
                response = self.one.alyx.post('/register-file', data=r_)

            # Ensure we keep the order of the output records: the files missing will remain
            # as None type. NB: positions are keyed by the original (pre-move) input paths.
            for f, r in zip(files, response):
                records[positions[session_path / f]] = r

            # Log file names
            _logger.info(f'ALYX REGISTERED DATA {"!DRY!" if dry else ""}: {rel_path}')
            for p in registered:
                _logger.info(f'ALYX REGISTERED DATA: {p}')

        if single_file:
            # records is empty in dry mode when the single input file was not valid
            return records[0] if records else None
        return records

    def _move_to_unprotected_revisions(self, session_path, rel_path, files, protection):
        """Move files of protected datasets into revision folders that are not protected.

        Parameters
        ----------
        session_path : pathlib.Path
            The local session path containing the files.
        rel_path : pathlib.PurePosixPath
            The generic relative session path: subject/yyyy-mm-dd/NNN.
        files : list of pathlib.Path
            The file paths, relative to the session path.
        protection : list of dict
            For each file, a single-item dict mapping the file name to a list of
            {revision: protected} dicts (the 'details' field of the 403 response).

        Returns
        -------
        list of pathlib.Path
            The (possibly new) file paths relative to the session path, one per input file.

        """
        today_revision = datetime.datetime.today().strftime('%Y-%m-%d')
        new_file_list = []
        for fl, res in zip(files, protection):
            (name, prot_info), = res.items()
            # Dataset has not yet been registered
            if not prot_info:
                new_file_list.append(fl)
                continue

            # Check to see if the file path already has a revision in it
            file_revision = folder_parts(rel_path / fl, as_dict=True)['revision']
            # Find existing protected revisions
            existing_revisions = [k for pr in prot_info for k, v in pr.items() if v]
            fl_path = fl.parent

            if file_revision:
                # If the revision explicitly defined by the user doesn't exist or
                # is not protected, register as is
                if file_revision not in existing_revisions:
                    revision_path = fl_path
                else:
                    # Find the next sub-revision that isn't protected
                    new_revision = self._next_revision(file_revision, existing_revisions)
                    revision_path = fl_path.parent.joinpath(f'#{new_revision}#')
            else:
                # The file wasn't in a revision folder but is protected
                assert name == fl_path.joinpath(fl.name).as_posix()

                # Find info about the latest revision
                # N.B on django side prot_info is sorted by latest revisions first
                (latest_revision, protected), = prot_info[0].items()

                # If the latest revision is the original and it is unprotected
                # no need for revision e.g {'clusters.amp.npy': [{'': False}]}
                if latest_revision == '' and not protected:
                    # Use original path
                    revision_path = fl_path

                # If there already is a revision but it is unprotected,
                # move into this revision folder e.g
                # {'clusters.amp.npy':
                #   [{'2022-10-31': False}, {'2022-05-31': True}, {'': True}]}
                elif not protected:
                    # Check that the latest_revision has the date naming convention we expect
                    # i.e. 'YYYY-MM-DD'
                    try:
                        _ = datetime.datetime.strptime(latest_revision[:10], '%Y-%m-%d')
                        revision_path = fl_path.joinpath(f'#{latest_revision}#')
                    # If it doesn't it probably has been made manually so we don't want to
                    # overwrite this and instead use today's date
                    except ValueError:
                        # NB: It's possible that today's date revision is also protected but is
                        # not the most recent revision. In this case it's safer to let fail.
                        revision_path = fl_path.joinpath(f'#{today_revision}#')

                # If protected and the latest protected revision is from today we need to make
                # a sub-revision
                elif protected and today_revision in latest_revision:
                    if latest_revision == today_revision:  # iterate from appending 'a'
                        new_revision = self._next_revision(today_revision, existing_revisions)
                    else:  # assume the revision is date + character, e.g. '2020-01-01c'
                        alpha = latest_revision[-1]  # iterate from this character
                        new_revision = self._next_revision(
                            today_revision, existing_revisions, alpha)
                    revision_path = fl_path.joinpath(f'#{new_revision}#')

                # Otherwise cases move into revision from today
                # e.g {'clusters.amp.npy': [{'': True}]}
                # e.g {'clusters.amp.npy': [{'2022-10-31': True}, {'': True}]}
                else:
                    revision_path = fl_path.joinpath(f'#{today_revision}#')

            # Only move for the cases where a revision folder has been made
            if revision_path != fl_path:
                # The destination folder must exist before the file can be moved into it
                session_path.joinpath(revision_path).mkdir(parents=True, exist_ok=True)
                _logger.info('Moving %s -> %s', fl, revision_path.joinpath(fl.name))
                shutil.move(session_path / fl, session_path / revision_path / fl.name)
            new_file_list.append(revision_path.joinpath(fl.name))
        return new_file_list

    @staticmethod
    def _next_revision(revision: str, reserved: list = None, alpha: str = 'a') -> str:
        """Return the next logical revision that is not already in the provided list.

        Revisions will increment by appending a letter to a date or other identifier.

        Parameters
        ----------
        revision : str
            The revision on which to base the new revision.
        reserved : list of str
            A list of reserved (i.e. already existing) revision strings.
        alpha : str
            The starting character (a string of length one), defaults to 'a'.

        Returns
        -------
        str
            The next logical revision string that's not in the reserved list.

        Raises
        ------
        TypeError
            `alpha` is not a single character.

        Examples
        --------
        >>> RegistrationClient._next_revision('2020-01-01')
        '2020-01-01a'
        >>> RegistrationClient._next_revision('2020-01-01', ['2020-01-01a', '2020-01-01b'])
        '2020-01-01c'

        """
        if len(alpha) != 1:
            raise TypeError(
                f'`alpha` must be a character; received a string of length {len(alpha)}'
            )
        taken = set(reserved or ())  # build once for constant-time membership tests
        i = ord(alpha)
        new_revision = revision + chr(i)
        while new_revision in taken:
            i += 1  # advance to the next character code
            new_revision = revision + chr(i)
        return new_revision

    def register_water_administration(self, subject, volume, **kwargs):
        """Register a water administration to Alyx for a given subject.

        Parameters
        ----------
        subject : str
            A subject nickname that exists on Alyx
        volume : float
            The total volume administrated in ml
        date_time : str, datetime.datetime, datetime.date
            The time of administration.  If None, the current time is used.
        water_type : str
            A water type that exists in Alyx; default is 'Water'
        user : str
            The user who administrated the water.  Currently logged-in user is the default.
        session : str, UUID, pathlib.Path, dict
            An optional experiment ID to associate
        adlib : bool
            If true, indicates that the subject was given water ad libitum

        Returns
        -------
        dict
            A water administration record

        Raises
        ------
        one.alf.exceptions.AlyxSubjectNotFound
            Subject does not exist on Alyx
        one.alf.exceptions.ALFError
            User does not exist on Alyx
        ValueError
            volume is not greater than zero, date_time is not a valid ISO date time or
            session ID is not valid
        requests.exceptions.HTTPError
            Failed to connect to database, or submitted data not valid (500)

        """
        # Ensure subject exists
        self.assert_exists(subject, 'subjects')
        # Ensure user(s) exist; an explicit None falls back to the logged-in user
        user = kwargs.pop('user', None) or self.one.alyx.user
        self.assert_exists(user, 'users')
        # Ensure volume is positive
        if volume <= 0:
            raise ValueError('Water volume must be greater than zero')
        # Post water admin
        wa_ = {
            'subject': subject,
            'date_time': self.ensure_ISO8601(kwargs.pop('date_time', None)),
            'water_administered': float(f'{volume:.4g}'),  # Round to 4 s.f.
            'water_type': kwargs.pop('water_type', 'Water'),
            'user': user,
            'adlib': kwargs.pop('adlib', False),
        }
        # Ensure session is valid; convert to eid
        if kwargs.get('session', False):
            wa_['session'] = str(self.one.to_eid(kwargs.pop('session')) or '')
            if not wa_['session']:
                raise ValueError('Failed to parse session ID')

        return self.one.alyx.rest('water-administrations', 'create', data=wa_)

    def register_weight(self, subject, weight, date_time=None, user=None):
        """Register a subject weight to Alyx.

        Parameters
        ----------
        subject : str
            A subject nickname that exists on Alyx.
        weight : float
            The subject weight in grams.
        date_time : str, datetime.datetime, datetime.date
            The time of weighing.  If None, the current time is used.
        user : str
            The user who performed the weighing.  Currently logged-in user is the default.

        Returns
        -------
        dict
            An Alyx weight record

        Raises
        ------
        one.alf.exceptions.AlyxSubjectNotFound
            Subject does not exist on Alyx
        one.alf.exceptions.ALFError
            User does not exist on Alyx
        ValueError
            date_time is not a valid ISO date time or weight is not greater than zero
        requests.exceptions.HTTPError
            Failed to connect to database, or submitted data not valid (500)

        """
        # Ensure subject exists
        self.assert_exists(subject, 'subjects')
        # Ensure user(s) exist
        user = user or self.one.alyx.user
        self.assert_exists(user, 'users')
        # Ensure weight is positive
        if weight <= 0:
            raise ValueError('Weight must be greater than 0')

        # Post weighing
        wei_ = {'subject': subject,
                'date_time': self.ensure_ISO8601(date_time),
                'weight': float(f'{weight:.4g}'),  # Round to 4 s.f.
                'user': user}
        return self.one.alyx.rest('weighings', 'create', data=wei_)