"""Lab server pipeline construction and task runner.
This is the module called by the job services on the lab servers. See
iblscripts/deploy/serverpc/crons for the service scripts that employ this module.
"""
import logging
import time
from datetime import datetime
from pathlib import Path
import re
import subprocess
import sys
import traceback
import importlib
import importlib.metadata
from one.api import ONE
from one.webclient import AlyxClient
from one.remote.globus import get_lab_from_endpoint_id, get_local_endpoint_id
from ibllib import __version__ as ibllib_version
from ibllib.io.extractors.base import get_pipeline, get_session_extractor_type
from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing
from ibllib.time import date2isostr
from ibllib.oneibl.registration import IBLRegistrationClient
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.io.session_params import read_params
from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session
_logger = logging.getLogger(__name__)
LARGE_TASKS = [
'EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess'
]
def _get_pipeline_class(session_path, one):
pipeline = get_pipeline(session_path)
if pipeline == 'training':
PipelineClass = training_preprocessing.TrainingExtractionPipeline
elif pipeline == 'ephys':
PipelineClass = ephys_preprocessing.EphysExtractionPipeline
else:
# try and look if there is a custom extractor in the personal projects extraction class
import projects.base
task_type = get_session_extractor_type(session_path)
PipelineClass = projects.base.get_pipeline(task_type)
_logger.info(f"Using {PipelineClass} pipeline for {session_path}")
return PipelineClass(session_path=session_path, one=one)
def _run_command(cmd):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
info, error = process.communicate()
if process.returncode != 0:
return None
else:
return info.decode('utf-8').strip()
def _get_volume_usage(vol, label=''):
cmd = f'df {vol}'
res = _run_command(cmd)
# size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
size_list = re.split(' +', res.split('\n')[-1])
fac = 1024 ** 2
d = {'total': int(size_list[1]) / fac,
'used': int(size_list[2]) / fac,
'available': int(size_list[3]) / fac,
'volume': size_list[5]}
return {f"{label}_{k}": d[k] for k in d}
[docs]
def report_health(one):
"""
Get a few indicators and label the json field of the corresponding lab with them.
"""
status = {'python_version': sys.version,
'ibllib_version': ibllib_version,
'phylib_version': importlib.metadata.version('phylib'),
'local_time': date2isostr(datetime.now())}
status.update(_get_volume_usage('/mnt/s0/Data', 'raid'))
status.update(_get_volume_usage('/', 'system'))
data_repos = one.alyx.rest('data-repository', 'list', globus_endpoint_id=get_local_endpoint_id())
for dr in data_repos:
one.alyx.json_field_update(endpoint='data-repository', uuid=dr['name'], field_name='json', data=status)
[docs]
def job_creator(root_path, one=None, dry=False, rerun=False):
"""
Create new sessions and pipelines.
Server function that will look for 'raw_session.flag' files and for each:
1) create the session on Alyx
2) create the tasks to be run on Alyx
For legacy sessions the raw data are registered separately, instead of within a pipeline task.
Parameters
----------
root_path : str, pathlib.Path
Main path containing sessions or a session path.
one : one.api.OneAlyx
An ONE instance for registering the session(s).
dry : bool
If true, simply log the session_path(s) found, without registering anything.
rerun : bool
If true and session pipeline tasks already exist, set them all to waiting.
Returns
-------
list of ibllib.pipes.tasks.Pipeline
The pipelines created.
list of dicts
A list of any datasets registered (only for legacy sessions)
"""
_logger.info('Start looking for new sessions...')
if not one:
one = ONE(cache_rest=None)
rc = IBLRegistrationClient(one=one)
flag_files = list(Path(root_path).glob('**/raw_session.flag'))
pipes = []
all_datasets = []
for flag_file in flag_files:
session_path = flag_file.parent
_logger.info(f'creating session for {session_path}')
if dry:
continue
try:
# if the subject doesn't exist in the database, skip
rc.register_session(session_path, file_list=False)
# NB: all sessions now extracted using dynamic pipeline
if read_params(session_path) is None:
# Create legacy experiment description file
acquisition_description_legacy_session(session_path, save=True)
pipe = make_pipeline(session_path, one=one)
if rerun:
rerun__status__in = '__all__'
else:
rerun__status__in = ['Waiting']
pipe.create_alyx_tasks(rerun__status__in=rerun__status__in)
flag_file.unlink()
if pipe is not None:
pipes.append(pipe)
except Exception:
_logger.error('Failed to register session %s:\n%s', session_path.relative_to(root_path), traceback.format_exc())
continue
return pipes, all_datasets
[docs]
def task_queue(mode='all', lab=None, alyx=None, env=(None,)):
"""
Query waiting jobs from the specified Lab
Parameters
----------
mode : {'all', 'small', 'large'}
Whether to return all waiting tasks, or only small or large (specified in LARGE_TASKS) jobs.
lab : str
Lab name as per Alyx, otherwise try to infer from local Globus install.
alyx : one.webclient.AlyxClient
An Alyx instance.
env : list
One or more environments to filter by. See :prop:`ibllib.pipes.tasks.Task.env`.
Returns
-------
list of dict
A list of Alyx tasks associated with `lab` that have a 'Waiting' status.
"""
def predicate(task):
classe = tasks.str2class(task['executable'])
return (mode == 'all' or classe.job_size == mode) and classe.env in env
alyx = alyx or AlyxClient(cache_rest=None)
if lab is None:
_logger.debug('Trying to infer lab from globus installation')
lab = get_lab_from_endpoint_id(alyx=alyx)
if lab is None:
_logger.error('No lab provided or found')
return # if the lab is none, this will return empty tasks each time
data_repo = get_local_data_repository(alyx)
# Filter for tasks
waiting_tasks = alyx.rest('tasks', 'list', status='Waiting',
django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True)
# Filter tasks by size
filtered_tasks = filter(predicate, waiting_tasks)
# Order tasks by priority
sorted_tasks = sorted(filtered_tasks, key=lambda d: d['priority'], reverse=True)
return sorted_tasks
[docs]
def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
"""
Function to run a list of tasks (task dictionary from Alyx query) on a local server
Parameters
----------
subjects_path : str, pathlib.Path
The location of the subject session folders, e.g. '/mnt/s0/Data/Subjects'.
tasks_dict : list of dict
A list of tasks to run. Typically the output of `task_queue`.
one : one.api.OneAlyx
An instance of ONE.
dry : bool, default=False
If true, simply prints the full session paths and task names without running the tasks.
count : int, default=5
The maximum number of tasks to run from the tasks_dict list.
time_out : float, optional
The time in seconds to run tasks before exiting. If set this will run tasks until the
timeout has elapsed. NB: Only checks between tasks and will not interrupt a running task.
**kwargs
See ibllib.pipes.tasks.run_alyx_task.
Returns
-------
list of pathlib.Path
A list of datasets registered to Alyx.
"""
if one is None:
one = ONE(cache_rest=None)
tstart = time.time()
c = 0
last_session = None
all_datasets = []
for tdict in tasks_dict:
# if the count is reached or if the time_out has been elapsed, break the loop and return
if c >= count or (time_out and time.time() - tstart > time_out):
break
# reconstruct the session local path. As many jobs belong to the same session
# cache the result
if last_session != tdict['session']:
ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
session_path = Path(subjects_path).joinpath(
Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
last_session = tdict['session']
if dry:
print(session_path, tdict['name'])
else:
task, dsets = tasks.run_alyx_task(tdict=tdict, session_path=session_path, one=one, **kwargs)
if dsets:
all_datasets.extend(dsets)
c += 1
return all_datasets