"""Lab server pipeline construction and task runner.This is the module called by the job services on the lab servers. Seeiblscripts/deploy/serverpc/crons for the service scripts that employ this module."""importloggingimporttimefromdatetimeimportdatetimefrompathlibimportPathimportreimportsubprocessimportsysimporttracebackimportimportlibimportimportlib.metadatafromone.apiimportONEfromone.webclientimportAlyxClientfromone.remote.globusimportget_lab_from_endpoint_id,get_local_endpoint_idfromone.alf.specimportis_session_pathfromone.alf.pathimportsession_path_partsfromibllibimport__version__asibllib_versionfromibllib.pipesimporttasksfromibllib.timeimportdate2isostrfromibllib.oneibl.registrationimportIBLRegistrationClientfromibllib.oneibl.data_handlersimportget_local_data_repositoryfromibllib.io.session_paramsimportread_paramsfromibllib.pipes.dynamic_pipelineimportmake_pipeline,acquisition_description_legacy_session_logger=logging.getLogger(__name__)LARGE_TASKS=['EphysVideoCompress','TrainingVideoCompress','SpikeSorting','EphysDLC','MesoscopePreprocess']def_run_command(cmd):process=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)info,error=process.communicate()ifprocess.returncode!=0:returnNoneelse:returninfo.decode('utf-8').strip()def_get_volume_usage(vol,label=''):cmd=f'df {vol}'res=_run_command(cmd)# size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']size_list=re.split(' +',res.split('\n')[-1])fac=1024**2d={'total':int(size_list[1])/fac,'used':int(size_list[2])/fac,'available':int(size_list[3])/fac,'volume':size_list[5]}return{f"{label}_{k}":d[k]forkind}


def report_health(alyx):
    """
    Get a few indicators and label the JSON field of the corresponding data repositories with them.
    """
    status = {'python_version': sys.version,
              'ibllib_version': ibllib_version,
              'phylib_version': importlib.metadata.version('phylib'),
              'local_time': date2isostr(datetime.now())}

    status.update(_get_volume_usage('/mnt/s0/Data', 'raid'))
    status.update(_get_volume_usage('/', 'system'))

    data_repos = alyx.rest('data-repository', 'list', globus_endpoint_id=get_local_endpoint_id())

    for dr in data_repos:
        alyx.json_field_update(endpoint='data-repository', uuid=dr['name'], field_name='json', data=status)
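

# Usage sketch, assuming an authenticated Alyx connection and that this
# machine's local Globus endpoint ID is registered against at least one data
# repository on Alyx.
def _example_report_health():
    alyx = AlyxClient(cache_rest=None)  # credentials taken from the local ONE setup
    report_health(alyx)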


def job_creator(root_path, one=None, dry=False, rerun=False):
    """
    Create new sessions and pipelines.

    Server function that will look for 'raw_session.flag' files and for each:
     1) create the session on Alyx
     2) create the tasks to be run on Alyx

    For legacy sessions the raw data are registered separately, instead of within a pipeline
    task.

    Parameters
    ----------
    root_path : str, pathlib.Path
        Main path containing sessions or a session path.
    one : one.api.OneAlyx
        An ONE instance for registering the session(s).
    dry : bool
        If true, simply log the session_path(s) found, without registering anything.
    rerun : bool
        If true and session pipeline tasks already exist, set them all to waiting.

    Returns
    -------
    list of ibllib.pipes.tasks.Pipeline
        The pipelines created.
    list of dicts
        A list of any datasets registered (only for legacy sessions)
    """
    _logger.info('Start looking for new sessions...')
    if not one:
        one = ONE(cache_rest=None)
    rc = IBLRegistrationClient(one=one)
    flag_files = Path(root_path).glob('*/????-??-??/*/raw_session.flag')
    flag_files = filter(lambda x: is_session_path(x.parent), flag_files)
    pipes = []
    all_datasets = []
    for flag_file in flag_files:
        session_path = flag_file.parent
        if session_path_parts(session_path)[1] in ('test', 'test_subject'):
            _logger.debug('skipping test session %s', session_path)
            continue
        _logger.info(f'creating session for {session_path}')
        if dry:
            continue
        try:
            # if the subject doesn't exist in the database, skip
            rc.register_session(session_path, file_list=False)

            # NB: all sessions now extracted using dynamic pipeline
            if read_params(session_path) is None:
                # Create legacy experiment description file
                acquisition_description_legacy_session(session_path, save=True)
            pipe = make_pipeline(session_path, one=one)
            if rerun:
                rerun__status__in = '__all__'
            else:
                rerun__status__in = ['Waiting']
            pipe.create_alyx_tasks(rerun__status__in=rerun__status__in)
            flag_file.unlink()
            if pipe is not None:
                pipes.append(pipe)
        except Exception:
            _logger.error('Failed to register session %s:\n%s',
                          session_path.relative_to(root_path), traceback.format_exc())
            continue

    return pipes, all_datasets
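

# Usage sketch mirroring the nightly session-creation service. The subjects
# path is the conventional lab-server location, not a requirement of
# job_creator. Note that even a dry run instantiates ONE, so an Alyx
# connection is needed.
def _example_job_creator():
    # dry=True only logs the session paths found, without registering anything
    pipes, datasets = job_creator('/mnt/s0/Data/Subjects', dry=True)
    return pipes, datasets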


def task_queue(mode='all', lab=None, alyx=None, env=(None,)):
    """
    Query waiting jobs from the specified Lab.

    Parameters
    ----------
    mode : {'all', 'small', 'large'}
        Whether to return all waiting tasks, or only small or large (specified in LARGE_TASKS)
        jobs.
    lab : str
        Lab name as per Alyx, otherwise try to infer from local Globus install.
    alyx : one.webclient.AlyxClient
        An Alyx instance.
    env : list
        One or more environments to filter by. See :prop:`ibllib.pipes.tasks.Task.env`.

    Returns
    -------
    list of dict
        A list of Alyx tasks associated with `lab` that have a 'Waiting' status.
    """
    def predicate(task):
        classe = tasks.str2class(task['executable'])
        return (mode == 'all' or classe.job_size == mode) and classe.env in env

    alyx = alyx or AlyxClient(cache_rest=None)
    if lab is None:
        _logger.debug('Trying to infer lab from globus installation')
        lab = get_lab_from_endpoint_id(alyx=alyx)
    if lab is None:
        _logger.error('No lab provided or found')
        return  # if the lab is none, this will return empty tasks each time
    data_repo = get_local_data_repository(alyx)
    # Filter for tasks
    waiting_tasks = alyx.rest('tasks', 'list', status='Waiting',
                              django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}',
                              no_cache=True)
    # Filter tasks by size
    filtered_tasks = filter(predicate, waiting_tasks)
    # Order tasks by priority
    sorted_tasks = sorted(filtered_tasks, key=lambda d: d['priority'], reverse=True)
    return sorted_tasks
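

# Usage sketch: list waiting small jobs for a given lab. 'cortexlab' is a
# hypothetical example lab name; when `lab` is omitted it is inferred from the
# local Globus installation.
def _example_task_queue():
    alyx = AlyxClient(cache_rest=None)
    return task_queue(mode='small', lab='cortexlab', alyx=alyx)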


def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
    """
    Function to run a list of tasks (task dictionary from Alyx query) on a local server.

    Parameters
    ----------
    subjects_path : str, pathlib.Path
        The location of the subject session folders, e.g. '/mnt/s0/Data/Subjects'.
    tasks_dict : list of dict
        A list of tasks to run. Typically the output of `task_queue`.
    one : one.api.OneAlyx
        An instance of ONE.
    dry : bool, default=False
        If true, simply prints the full session paths and task names without running the tasks.
    count : int, default=5
        The maximum number of tasks to run from the tasks_dict list.
    time_out : float, optional
        The time in seconds to run tasks before exiting. If set this will run tasks until the
        timeout has elapsed. NB: Only checks between tasks and will not interrupt a running task.
    kwargs
        See ibllib.pipes.tasks.run_alyx_task.

    Returns
    -------
    list of pathlib.Path
        A list of datasets registered to Alyx.
    """
    if one is None:
        one = ONE(cache_rest=None)
    tstart = time.time()
    c = 0
    last_session = None
    all_datasets = []
    for tdict in tasks_dict:
        # if the count is reached or if the time_out has been elapsed, break the loop and return
        if c >= count or (time_out and time.time() - tstart > time_out):
            break
        # reconstruct the session local path. As many jobs belong to the same session,
        # cache the result
        if last_session != tdict['session']:
            ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
            session_path = Path(subjects_path).joinpath(
                Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
            last_session = tdict['session']
        if dry:
            print(session_path, tdict['name'])
        else:
            task, dsets = tasks.run_alyx_task(tdict=tdict, session_path=session_path, one=one, **kwargs)
            if dsets:
                all_datasets.extend(dsets)
            c += 1
    return all_datasets
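

# Putting the pieces together: a sketch of one service cycle as the cron
# scripts might run it, i.e. query the waiting tasks then run at most `count`
# of them. The subjects path is the conventional lab-server location.
def _example_tasks_runner():
    one = ONE(cache_rest=None)
    waiting = task_queue(mode='all', alyx=one.alyx)
    if waiting:  # task_queue returns None when no lab can be found or inferred
        return tasks_runner('/mnt/s0/Data/Subjects', waiting, one=one, count=5)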