Source code for oneibl.registration

from pathlib import Path
import json
import datetime
import logging

import ibllib.io.extractors.base
from dateutil import parser as dateparser
import re

import alf.io
from oneibl.one import ONE
from ibllib.misc import version
import ibllib.time
import ibllib.io.raw_data_loaders as raw
from ibllib.io import flags, hashfile
import ibllib.exceptions

_logger = logging.getLogger('ibllib.alf')
EXCLUDED_EXTENSIONS = ['.flag', '.error', '.avi']
REGISTRATION_GLOB_PATTERNS = ['alf/**/*.*',
                              'raw_behavior_data/**/_iblrig_*.*',
                              'raw_passive_data/**/_iblrig_*.*',
                              'raw_behavior_data/**/_iblmic_*.*',
                              'raw_video_data/**/_iblrig_*.*',
                              'raw_video_data/**/_ibl_*.*',
                              'raw_ephys_data/**/_iblrig_*.*',
                              'raw_ephys_data/**/_spikeglx_*.*',
                              'raw_ephys_data/**/_iblqc_*.*',
                              'spikesorters/**/_kilosort_*.*'
                              ]


def _check_filename_for_registration(full_file, patterns):
    for pat in patterns:
        reg = pat.replace('.', r'\.').replace('_', r'\_').replace('*', r'.*')
        if Path(full_file).suffix in EXCLUDED_EXTENSIONS:
            return False
        elif re.match(reg, Path(full_file).name, re.IGNORECASE):
            return True
    return False


[docs]def register_dataset(file_list, one=None, created_by=None, repository=None, server_only=False, versions=None, revisions=None, default=True, dry=False, max_md5_size=None): """ Registers a set of files belonging to a session only on the server :param file_list: (list of pathlib.Path or pathlib.Path) :param one: optional (oneibl.ONE), current one object, will create an instance if not provided :param created_by: (string) name of user in Alyx (defaults to 'root') :param repository: optional: (string) name of the repository in Alyx :param server_only: optional: (bool) if True only creates on the Flatiron (defaults to False) :param versions: optional (list of strings): versions tags (defaults to ibllib version) :param revisions: optional (list of strings): revision name (defaults to no revision) :param default: optional (bool) whether to set as default dataset (defaults to True) :param dry: (bool) False by default :param verbose: (bool) logs :param max_md5_size: (int) maximum file in bytes to compute md5 sum (always compute if Npne) defaults to None :return: """ if created_by is None: created_by = one._par.ALYX_LOGIN if file_list is None or file_list == '' or file_list == []: return elif not isinstance(file_list, list): file_list = [Path(file_list)] assert len(set([alf.io.get_session_path(f) for f in file_list])) == 1 assert all([Path(f).exists() for f in file_list]) if versions is None: versions = version.ibllib() if isinstance(versions, str): versions = [versions for _ in file_list] assert isinstance(versions, list) and len(versions) == len(file_list) if revisions is None: revisions = [None for _ in file_list] else: if isinstance(revisions, str): revisions = [revisions for _ in file_list] assert isinstance(revisions, list) and len(revisions) == len(file_list) # computing the md5 can be very long, so this is an option to skip if the file is bigger # than a certain threshold if max_md5_size: hashes = [hashfile.md5(p) if p.stat().st_size < max_md5_size else None for p in file_list] else: hashes = [hashfile.md5(p) for p in file_list] session_path = alf.io.get_session_path(file_list[0]) # first register the file r = {'created_by': created_by, 'path': session_path.relative_to((session_path.parents[2])).as_posix(), 'filenames': [p.relative_to(session_path).as_posix() for p in file_list], 'name': repository, 'server_only': server_only, 'hashes': hashes, 'filesizes': [p.stat().st_size for p in file_list], 'versions': versions, 'revisions': revisions, 'default': default} if not dry: if one is None: one = ONE() response = one.alyx.rest('register-file', 'create', data=r) for p in file_list: _logger.info(f"ALYX REGISTERED DATA: {p}") return response
[docs]def register_session_raw_data(session_path, one=None, overwrite=False, dry=False, **kwargs): """ Registers all files corresponding to raw data files to Alyx. It will select files that match Alyx registration patterns. :param session_path: :param one: one instance to work with :param overwrite: (False) if set to True, will patch the datasets. It will take very long. If set to False (default) will skip all already registered data. :param dry: do not register files, returns the list of files to be registered :return: list of file to register :return: Alyx response: dictionary of registered files """ session_path = Path(session_path) eid = one.eid_from_path(session_path, use_cache=False) # needs to make sure we're up to date # query the database for existing datasets on the session and allowed dataset types dsets = one.alyx.rest('datasets', 'list', session=eid) already_registered = [ session_path.joinpath(Path(ds['collection'] or '').joinpath(ds['name'])) for ds in dsets] dtypes = one.alyx.rest('dataset-types', 'list') registration_patterns = [dt['filename_pattern'] for dt in dtypes if dt['filename_pattern']] # glob all the files glob_patterns = [pat for pat in REGISTRATION_GLOB_PATTERNS if pat.startswith('raw')] files_2_register = [] for gp in glob_patterns: f2r = list(session_path.glob(gp)) files_2_register.extend(f2r) # filter 1/2 filter out datasets that do not match any dataset type files_2_register = list(filter(lambda f: _check_filename_for_registration( f, registration_patterns), files_2_register)) # filter 2/2 unless overwrite is True, filter out the datasets that already exists if not overwrite: files_2_register = list(filter(lambda f: f not in already_registered, files_2_register)) response = register_dataset(files_2_register, one=one, versions=None, dry=dry, **kwargs) return files_2_register, response
[docs]class RegistrationClient: """ Object that keeps the ONE instance and provides method to create sessions and register data. """ def __init__(self, one=None): self.one = one if not one: self.one = ONE() self.dtypes = self.one.alyx.rest('dataset-types', 'list') self.registration_patterns = [ dt['filename_pattern'] for dt in self.dtypes if dt['filename_pattern']] self.file_extensions = [df['file_extension'] for df in self.one.alyx.rest('data-formats', 'list')]
[docs] def create_sessions(self, root_data_folder, glob_pattern='**/create_me.flag', dry=False): """ Create sessions looking recursively for flag files :param root_data_folder: folder to look for create_me.flag :param dry: bool. Dry run if True :param glob_pattern: bool. Dry run if True :return: None """ flag_files = Path(root_data_folder).glob(glob_pattern) for flag_file in flag_files: if dry: print(flag_file) continue _logger.info('creating session for ' + str(flag_file.parent)) # providing a false flag stops the registration after session creation self.create_session(flag_file.parent) flag_file.unlink() return [ff.parent for ff in flag_files]
[docs] def create_session(self, session_path): """ create_session(session_path) """ return self.register_session(session_path, file_list=False)
[docs] def register_sync(self, root_data_folder, dry=False): """ Register sessions looking recursively for flag files :param root_data_folder: folder to look for register_me.flag :param dry: bool. Dry run if True :return: """ flag_files = Path(root_data_folder).glob('**/register_me.flag') for flag_file in flag_files: if dry: print(flag_file) continue file_list = flags.read_flag_file(flag_file) _logger.info('registering ' + str(flag_file.parent)) self.register_session(flag_file.parent, file_list=file_list) flags.write_flag_file(flag_file.parent.joinpath('flatiron.flag'), file_list=file_list) flag_file.unlink() if flag_file.parent.joinpath('create_me.flag').exists(): flag_file.parent.joinpath('create_me.flag').unlink() _logger.info('registered' + '\n')
[docs] def register_session(self, ses_path, file_list=True): """ Register session in Alyx :param ses_path: path to the session :param file_list: bool. Set to False will only create the session and skip registration :param repository_name: Optional, repository on which to register the data :return: Status string on error """ if isinstance(ses_path, str): ses_path = Path(ses_path) # read meta data from the rig for the session from the task settings file settings_json_file = list(ses_path.glob( '**/raw_behavior_data/_iblrig_taskSettings.raw*.json')) if not settings_json_file: settings_json_file = list(ses_path.glob('**/_iblrig_taskSettings.raw*.json')) if not settings_json_file: _logger.error(['could not find _iblrig_taskSettings.raw.json. Abort.']) return _logger.warning([f'Settings found in a strange place: {settings_json_file}']) else: settings_json_file = settings_json_file[0] md = _read_settings_json_compatibility_enforced(settings_json_file) # query alyx endpoints for subject, error if not found try: subject = self.one.alyx.rest('subjects?nickname=' + md['SUBJECT_NAME'], 'list')[0] except IndexError: _logger.error(f"Subject: {md['SUBJECT_NAME']} doesn't exist in Alyx. ABORT.") raise ibllib.exceptions.AlyxSubjectNotFound(md['SUBJECT_NAME']) # look for a session from the same subject, same number on the same day session_id, session = self.one.search(subjects=subject['nickname'], date_range=md['SESSION_DATE'], number=md['SESSION_NUMBER'], details=True) try: user = self.one.alyx.rest('users', 'read', id=md["PYBPOD_CREATOR"][0]) except Exception as e: _logger.error(f"User: {md['PYBPOD_CREATOR'][0]} doesn't exist in Alyx. ABORT") raise e username = user['username'] if user else subject['responsible_user'] # load the trials data to get information about session duration and performance ses_data = raw.load_data(ses_path) start_time, end_time = _get_session_times(ses_path, md, ses_data) n_trials, n_correct_trials = _get_session_performance(md, ses_data) # this is the generic relative path: subject/yyyy-mm-dd/NNN gen_rel_path = Path(subject['nickname'], md['SESSION_DATE'], '{0:03d}'.format(int(md['SESSION_NUMBER']))) # if nothing found create a new session in Alyx task_protocol = md['PYBPOD_PROTOCOL'] + md['IBLRIG_VERSION_TAG'] alyx_procedure = _alyx_procedure_from_task(task_protocol) if not session: ses_ = {'subject': subject['nickname'], 'users': [username], 'location': md['PYBPOD_BOARD'], 'procedures': [] if alyx_procedure is None else [alyx_procedure], 'lab': subject['lab'], # 'project': project['name'], 'type': 'Experiment', 'task_protocol': task_protocol, 'number': md['SESSION_NUMBER'], 'start_time': ibllib.time.date2isostr(start_time), 'end_time': ibllib.time.date2isostr(end_time) if end_time else None, 'n_correct_trials': n_correct_trials, 'n_trials': n_trials, 'json': md, } session = self.one.alyx.rest('sessions', 'create', data=ses_) if md['SUBJECT_WEIGHT']: wei_ = {'subject': subject['nickname'], 'date_time': ibllib.time.date2isostr(start_time), 'weight': md['SUBJECT_WEIGHT'], 'user': username } self.one.alyx.rest('weighings', 'create', data=wei_) else: # TODO: if session exists and no json partial_upgrade it session = self.one.alyx.rest('sessions', 'read', id=session_id[0]) _logger.info(session['url'] + ' ') # create associated water administration if not found if not session['wateradmin_session_related'] and ses_data: wa_ = { 'subject': subject['nickname'], 'date_time': ibllib.time.date2isostr(end_time), 'water_administered': ses_data[-1]['water_delivered'] / 1000, 'water_type': md.get('REWARD_TYPE') or 'Water', 'user': username, 'session': session['url'][-36:], 'adlib': False} self.one.alyx.rest('water-administrations', 'create', data=wa_) # at this point the session has been created. If create only, exit if not file_list: return # register all files that match the Alyx patterns, warn user when files are encountered rename_files_compatibility(ses_path, md['IBLRIG_VERSION_TAG']) F = [] # empty list whose keys will be relative paths and content filenames md5s = [] file_sizes = [] for fn in _glob_session(ses_path): if fn.suffix in EXCLUDED_EXTENSIONS: _logger.debug('Excluded: ', str(fn)) continue if not _check_filename_for_registration(fn, self.registration_patterns): _logger.warning('No matching dataset type for: ' + str(fn)) continue if fn.suffix not in self.file_extensions: _logger.warning('No matching dataformat (ie. file extension) for: ' + str(fn)) continue if not _register_bool(fn.name, file_list): _logger.debug('Not in filelist: ' + str(fn)) continue try: assert (str(gen_rel_path) in str(fn)) except AssertionError as e: strerr = 'ALF folder mismatch: data is in wrong subject/date/number folder. \n' strerr += ' Expected ' + str(gen_rel_path) + ' actual was ' + str(fn) _logger.error(strerr) raise e # extract the relative path of the file rel_path = Path(str(fn)[str(fn).find(str(gen_rel_path)):]) F.append(str(rel_path.relative_to(gen_rel_path).as_posix())) file_sizes.append(fn.stat().st_size) md5s.append(hashfile.md5(fn) if fn.stat().st_size < 1024 ** 3 else None) _logger.info('Registering ' + str(fn)) r_ = {'created_by': username, 'path': str(gen_rel_path.as_posix()), 'filenames': F, 'hashes': md5s, 'filesizes': file_sizes, 'versions': [version.ibllib() for _ in F] } self.one.alyx.post('/register-file', data=r_)
def _alyx_procedure_from_task(task_protocol): task_type = ibllib.io.extractors.base.get_task_extractor_type(task_protocol) return _alyx_procedure_from_task_type(task_type) def _alyx_procedure_from_task_type(task_type): lookup = {'biased': 'Behavior training/tasks', 'biased_opto': 'Behavior training/tasks', 'habituation': 'Behavior training/tasks', 'training': 'Behavior training/tasks', 'ephys': 'Ephys recording with acute probe(s)', 'ephys_biased_opto': 'Ephys recording with acute probe(s)', 'ephys_training': 'Ephys recording with acute probe(s)', 'mock_ephys': 'Ephys recording with acute probe(s)', 'sync_ephys': 'Ephys recording with acute probe(s)'} if task_type in lookup: return lookup[task_type] def _register_bool(fn, file_list): if isinstance(file_list, bool): return file_list if isinstance(file_list, str): file_list = [file_list] return any([str(fil) in fn for fil in file_list]) def _read_settings_json_compatibility_enforced(json_file): with open(json_file) as js: md = json.load(js) if 'IBLRIG_VERSION_TAG' not in md.keys(): md['IBLRIG_VERSION_TAG'] = '3.2.3' if not md['IBLRIG_VERSION_TAG']: _logger.warning("You appear to be on an untagged version...") return md # 2018-12-05 Version 3.2.3 fixes (permanent fixes in IBL_RIG from 3.2.4 on) if version.le(md['IBLRIG_VERSION_TAG'], '3.2.3'): if 'LAST_TRIAL_DATA' in md.keys(): md.pop('LAST_TRIAL_DATA') if 'weighings' in md['PYBPOD_SUBJECT_EXTRA'].keys(): md['PYBPOD_SUBJECT_EXTRA'].pop('weighings') if 'water_administration' in md['PYBPOD_SUBJECT_EXTRA'].keys(): md['PYBPOD_SUBJECT_EXTRA'].pop('water_administration') if 'IBLRIG_COMMIT_HASH' not in md.keys(): md['IBLRIG_COMMIT_HASH'] = 'f9d8905647dbafe1f9bdf78f73b286197ae2647b' # parse the date format to Django supported ISO dt = dateparser.parse(md['SESSION_DATETIME']) md['SESSION_DATETIME'] = ibllib.time.date2isostr(dt) # add the weight key if it doesn't already exists if 'SUBJECT_WEIGHT' not in md.keys(): md['SUBJECT_WEIGHT'] = None return md
[docs]def rename_files_compatibility(ses_path, version_tag): if not version_tag: return if version.le(version_tag, '3.2.3'): task_code = ses_path.glob('**/_ibl_trials.iti_duration.npy') for fn in task_code: fn.replace(fn.parent.joinpath('_ibl_trials.itiDuration.npy')) task_code = ses_path.glob('**/_iblrig_taskCodeFiles.raw.zip') for fn in task_code: fn.replace(fn.parent.joinpath('_iblrig_codeFiles.raw.zip'))
def _get_session_times(fn, md, ses_data): """ Get session start and end time from the Bpod data """ start_time = ibllib.time.isostr2date(md['SESSION_DATETIME']) if not ses_data: return start_time, None c = 0 for sd in reversed(ses_data): ses_duration_secs = (sd['behavior_data']['Trial end timestamp'] - sd['behavior_data']['Bpod start timestamp']) if ses_duration_secs < (6 * 3600): break c += 1 if c: _logger.warning((f'Trial end timestamps of last {c} trials above 6 hours ' f'(most likely corrupt): ') + str(fn)) end_time = start_time + datetime.timedelta(seconds=ses_duration_secs) return start_time, end_time def _get_session_performance(md, ses_data): """Get performance about the session from bpod data""" if not ses_data: return None, None n_trials = ses_data[-1]['trial_num'] # checks that the number of actual trials and labeled number of trials check out assert (len(ses_data) == n_trials) # task specific logic if 'habituationChoiceWorld' in md['PYBPOD_PROTOCOL']: n_correct_trials = 0 else: n_correct_trials = ses_data[-1]['ntrials_correct'] return n_trials, n_correct_trials def _glob_session(ses_path): """ Glob for files to be registered on an IBL session :param ses_path: pathlib.Path of the session :return: a list of files to potentially be registered """ fl = [] for gp in REGISTRATION_GLOB_PATTERNS: fl.extend(list(ses_path.glob(gp))) return fl