Source code for ibllib.pipes.video_tasks

import logging
import subprocess
import time
import traceback
from pathlib import Path
from functools import partial

import cv2
import pandas as pd
import numpy as np

from ibllib.qc.dlc import DlcQC
from ibllib.io import ffmpeg, raw_daq_loaders
from ibllib.pipes import base_tasks
from ibllib.io.video import get_video_meta
from ibllib.io.extractors import camera
from ibllib.io.extractors.base import run_extractor_classes
from ibllib.io.extractors.ephys_fpga import get_sync_and_chn_map
from ibllib.qc.camera import run_all_qc as run_camera_qc, CameraQC
from ibllib.misc import check_nvidia_driver
from ibllib.io.video import label_from_path, assert_valid_label
from ibllib.plots.snapshot import ReportSnapshot
from ibllib.plots.figures import dlc_qc_plot
from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter

_logger = logging.getLogger('ibllib')


[docs] class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask): """ Task to register raw video data. Builds up list of files to register from list of cameras given in session params file """ priority = 100 job_size = 'small' @property def signature(self): signature = { 'input_files': [], 'output_files': [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + [('_iblrig_videoCodeFiles.raw*', self.device_collection, False)] } return signature
[docs] def assert_expected_outputs(self, raise_error=True): """ frameData replaces the timestamps file. Therefore if frameData is present, timestamps is optional and vice versa. """ assert self.status == 0 _logger.info('Checking output files') everything_is_fine, files = self.assert_expected(self.output_files) required = any('Camera.frameData' in x or 'Camera.timestamps' in x for x in map(str, files)) if not (everything_is_fine and required): for out in self.outputs: _logger.error(f'{out}') if raise_error: raise FileNotFoundError('Missing outputs after task completion') return everything_is_fine, files
[docs] class VideoCompress(base_tasks.VideoTask): """ Task to compress raw video data from .avi to .mp4 format. """ priority = 90 job_size = 'large' @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.*', self.device_collection, True) for cam in self.cameras], 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] } return signature def _run(self): # TODO different compression parameters based on whether it is training or not based on number of cameras? # avi to mp4 compression if self.sync == 'bpod': command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 ' '-nostats -codec:a copy {file_out}') else: command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 ' '-loglevel 0 -codec:a copy {file_out}') output_files = ffmpeg.iblrig_video_compression(self.session_path, command) if len(output_files) == 0: _logger.info('No compressed videos found') return return output_files
[docs] class VideoConvert(base_tasks.VideoTask): """ Task that converts compressed avi to mp4 format and renames video and camlog files. Specific to UCLA widefield implementation """ priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] + [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras], 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras] } return signature def _run(self): output_files = [] for cam in self.cameras: # rename and register the camlog files camlog_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.camlog')) new_camlog_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.camlog') camlog_file.replace(new_camlog_file) output_files.append(new_camlog_file) # convert the avi files to mp4 avi_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.avi')) mp4_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') command2run = f'ffmpeg -i {str(avi_file)} -c:v copy -c:a copy -y {str(mp4_file)}' process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) info, error = process.communicate() if process.returncode == 0: # check the video meta matched and remove the original file meta_avi = get_video_meta(avi_file) _ = meta_avi.pop('size') meta_mp4 = get_video_meta(mp4_file) match = True for key in meta_avi.keys(): if meta_avi[key] != meta_mp4[key]: match = False # if all checks out we can remove the original avi if match: avi_file.unlink() output_files.append(mp4_file) else: _logger.error(f'avi and mp4 meta data do not match for {avi_file}') else: _logger.error(f'conversion to mp4 failed for {avi_file}: {error}') return output_files
[docs] class VideoSyncQcCamlog(base_tasks.VideoTask): """ Task to sync camera timestamps to main DAQ timestamps when camlog files are used. Specific to UCLA widefield implementation """ priority = 40 job_size = 'small' @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, False) for cam in self.cameras] + [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), ('*.wiring.json', self.sync_collection, True), ('*wheel.position.npy', 'alf', False), ('*wheel.timestamps.npy', 'alf', False)], 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] } return signature
[docs] def extract_camera(self, save=True): extractor = [partial(camera.CameraTimestampsCamlog, label) for label in self.cameras or []] kwargs = {'sync_type': self.sync, 'sync_collection': self.sync_collection, 'save': save} kwargs['sync'], kwargs['chmap'] = get_sync_and_chn_map(self.session_path, self.sync_collection) return run_extractor_classes(extractor, session_path=self.session_path, **kwargs)
[docs] def run_qc(self, camera_data=None, update=True): if camera_data is None: camera_data, _ = self.extract_camera(save=False) qc = run_camera_qc( self.session_path, self.cameras, one=self.one, camlog=True, sync_collection=self.sync_collection, sync_type=self.sync, update=update) return qc
def _run(self, update=True, **kwargs): # Video timestamps extraction data, output_files = self.extract_camera(save=True) # Video QC self.run_qc(data, update=update) return output_files
[docs] class VideoSyncQcBpod(base_tasks.VideoTask): """ Task to sync camera timestamps to main DAQ timestamps N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server """ priority = 40 job_size = 'small' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Task collection (this needs to be specified in the task kwargs) self.collection = self.get_task_collection(kwargs.get('collection', None)) # Task type (protocol) self.protocol = self.get_protocol(kwargs.get('protocol', None), task_collection=self.collection) self.extractor = None @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + [('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('*wheel.position.npy', 'alf', False), ('*wheel.timestamps.npy', 'alf', False)], 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] } return signature
[docs] def extract_camera(self, save=True): mp4_files = filter(lambda x: label_from_path(x) in self.cameras or [], self.session_path.joinpath(self.device_collection).rglob('*.mp4')) if self.cameras != ['left']: raise NotImplementedError('Bpod Camera extraction currently only supports a left camera') self.extractor = camera.CameraTimestampsBpod(self.session_path) return self.extractor.extract(video_path=next(mp4_files), save=save, task_collection=self.collection)
[docs] def run_qc(self, camera_data=None, update=True): if self.cameras != ['left']: raise NotImplementedError('Bpod camera currently only supports a left camera') if camera_data is None: camera_data, _ = self.extract_camera(save=False) qc = CameraQC( self.session_path, 'left', sync_type='bpod', sync_collection=self.collection, one=self.one) qc.run(update=update) return qc
def _run(self, update=True, **kwargs): # Video timestamps extraction data, output_files = self.extract_camera(save=True) # Video QC self.run_qc(data, update=update) return output_files
[docs] class VideoSyncQcNidq(base_tasks.VideoTask): """ Task to sync camera timestamps to main DAQ timestamps N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server """ priority = 40 job_size = 'small' @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), (f'_{self.sync_namespace}_*.wiring.json', self.sync_collection, False), (f'_{self.sync_namespace}_*.meta', self.sync_collection, True), ('*wheel.position.npy', 'alf', False), ('*wheel.timestamps.npy', 'alf', False), ('*experiment.description*', '', False)], 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] } return signature
[docs] def extract_camera(self, save=True): extractor = [partial(camera.CameraTimestampsFPGA, label) for label in self.cameras or []] kwargs = {'sync_type': self.sync, 'sync_collection': self.sync_collection, 'save': save} if self.sync_namespace == 'timeline': # Load sync from timeline file alf_path = self.session_path / self.sync_collection kwargs['sync'], kwargs['chmap'] = raw_daq_loaders.load_timeline_sync_and_chmap(alf_path) else: kwargs['sync'], kwargs['chmap'] = get_sync_and_chn_map(self.session_path, self.sync_collection) return run_extractor_classes(extractor, session_path=self.session_path, **kwargs)
[docs] def run_qc(self, camera_data=None, update=True): if camera_data is None: camera_data, _ = self.extract_camera(save=False) qc = run_camera_qc( self.session_path, self.cameras, one=self.one, sync_collection=self.sync_collection, sync_type=self.sync, update=update) return qc
def _run(self, update=True, **kwargs): # Video timestamps extraction data, output_files = self.extract_camera(save=True) # Video QC self.run_qc(data, update=update) return output_files
[docs] class DLC(base_tasks.VideoTask): """ This task relies on a correctly installed dlc environment as per https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit# If your environment is set up otherwise, make sure that you set the respective attributes: t = EphysDLC(session_path) t.dlcenv = Path('/path/to/your/dlcenv/bin/activate') t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc') """ gpu = 1 cpu = 4 io_charge = 100 level = 2 force = True job_size = 'large' dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate') scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc') @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras], 'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] + [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] + [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras] } return signature def _check_dlcenv(self): """Check that scripts are present, dlcenv can be activated and get iblvideo version""" assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \ f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}' assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \ f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}' assert self.dlcenv.exists(), f'DLC environment does not exist in assumed location {self.dlcenv}' command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'" process = subprocess.Popen( command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, executable='/bin/bash' ) info, error = process.communicate() if process.returncode != 0: raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}") version = info.decode('utf-8').strip().split('\n')[-1] return version @staticmethod def _video_intact(file_mp4): """Checks that the downloaded video can be opened and is not empty""" cap = cv2.VideoCapture(str(file_mp4)) frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) intact = True if frame_count > 0 else False cap.release() return intact def _run(self, cams=None, overwrite=False): # Check that the cams are valid for DLC, remove the ones that aren't candidate_cams = cams or self.cameras cams = [] for cam in candidate_cams: try: cams.append(assert_valid_label(cam)) except ValueError: _logger.warning(f'{cam} is not a valid video label, this video will be skipped') # Set up self.session_id = self.one.path2eid(self.session_path) actual_outputs = [] # Loop through cams for cam in cams: # Catch exceptions so that following cameras can still run try: # If all results exist and overwrite is False, skip computation expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True) if overwrite is False and expected_outputs_present is True: actual_outputs.extend(expected_outputs) return actual_outputs else: file_mp4 = next(self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4')) if not file_mp4.exists(): # In this case we set the status to Incomplete. _logger.error(f'No raw video file available for {cam}, skipping.') self.status = -3 continue if not self._video_intact(file_mp4): _logger.error(f'Corrupt raw video file {file_mp4}') self.status = -1 continue # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable self.version = self._check_dlcenv() _logger.info(f'iblvideo version {self.version}') check_nvidia_driver() _logger.info(f'Running DLC on {cam}Camera.') command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}" _logger.info(command2run) process = subprocess.Popen( command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, executable='/bin/bash', ) info, error = process.communicate() # info_str = info.decode("utf-8").strip() # _logger.info(info_str) if process.returncode != 0: error_str = error.decode('utf-8').strip() _logger.error(f'DLC failed for {cam}Camera.\n\n' f'++++++++ Output of subprocess for debugging ++++++++\n\n' f'{error_str}\n' f'++++++++++++++++++++++++++++++++++++++++++++\n') self.status = -1 # We dont' run motion energy, or add any files if dlc failed to run continue dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt')) actual_outputs.append(dlc_result) _logger.info(f'Computing motion energy for {cam}Camera') command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}" _logger.info(command2run) process = subprocess.Popen( command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, executable='/bin/bash', ) info, error = process.communicate() # info_str = info.decode('utf-8').strip() # _logger.info(info_str) if process.returncode != 0: error_str = error.decode('utf-8').strip() _logger.error(f'Motion energy failed for {cam}Camera.\n\n' f'++++++++ Output of subprocess for debugging ++++++++\n\n' f'{error_str}\n' f'++++++++++++++++++++++++++++++++++++++++++++\n') self.status = -1 continue actual_outputs.append(next(self.session_path.joinpath('alf').glob( f'{cam}Camera.ROIMotionEnergy*.npy'))) actual_outputs.append(next(self.session_path.joinpath('alf').glob( f'{cam}ROIMotionEnergy.position*.npy'))) except Exception: _logger.error(traceback.format_exc()) self.status = -1 continue # If status is Incomplete, check that there is at least one output. # # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip if self.status == -3 and len(actual_outputs) == 0: actual_outputs = None self.status = -1 return actual_outputs
[docs] class EphysPostDLC(base_tasks.VideoTask): """ The post_dlc task takes dlc traces as input and computes useful quantities, as well as qc. """ io_charge = 90 level = 3 force = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.trials_collection = kwargs.get('trials_collection', 'alf') @property def signature(self): return { 'input_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] + [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] + # the following are required for the DLC plot only # they are not strictly required, some plots just might be skipped # In particular the raw videos don't need to be downloaded as they can be streamed [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + [(f'{cam}ROIMotionEnergy.position.npy', 'alf', False) for cam in self.cameras] + [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', False) for cam in self.cameras] + # The trials table is used in the DLC QC, however this is not an essential dataset [('_ibl_trials.table.pqt', self.trials_collection, False), ('_ibl_wheel.position.npy', self.trials_collection, False), ('_ibl_wheel.timestamps.npy', self.trials_collection, False)], 'output_files': [(f'_ibl_{cam}Camera.features.pqt', 'alf', True) for cam in self.cameras] + [('licks.times.npy', 'alf', True)] } def _run(self, overwrite=True, run_qc=True, plot_qc=True): """ Run the PostDLC task. Returns a list of file locations for the output files in signature. The created plot (dlc_qc_plot.png) is not returned, but saved in session_path/snapshots and uploaded to Alyx as a note. :param overwrite: bool, whether to recompute existing output files (default is False). Note that the dlc_qc_plot will be (re-)computed even if overwrite = False :param run_qc: bool, whether to run the DLC QC (default is True) :param plot_qc: book, whether to create the dlc_qc_plot (default is True) """ # Check if output files exist locally exist, output_files = self.assert_expected(self.output_files, silent=True) if exist and not overwrite: _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.') else: if exist and overwrite: _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.') # Find all available DLC files dlc_files = list(Path(self.session_path).joinpath('alf').rglob('_ibl_*Camera.dlc.*')) for dlc_file in dlc_files: _logger.debug(dlc_file) output_files = [] combined_licks = [] for dlc_file in dlc_files: # Catch unforeseen exceptions and move on to next cam try: cam = label_from_path(dlc_file) # load dlc trace and camera times dlc = pd.read_parquet(dlc_file) dlc_thresh = likelihood_threshold(dlc, 0.9) # try to load respective camera times try: dlc_t = np.load(next(Path(self.session_path).joinpath('alf').rglob(f'_ibl_{cam}Camera.times.*npy'))) times = True if dlc_t.shape[0] == 0: _logger.error(f'camera.times empty for {cam} camera. ' f'Computations using camera.times will be skipped') self.status = -1 times = False elif dlc_t.shape[0] < len(dlc_thresh): _logger.error(f'Camera times shorter than DLC traces for {cam} camera. ' f'Computations using camera.times will be skipped') self.status = -1 times = 'short' except StopIteration: self.status = -1 times = False _logger.error(f'No camera.times for {cam} camera. ' f'Computations using camera.times will be skipped') # These features are only computed from left and right cam if cam in ('left', 'right'): features = pd.DataFrame() # If camera times are available, get the lick time stamps for combined array if times is True: _logger.info(f'Computing lick times for {cam} camera.') combined_licks.append(get_licks(dlc_thresh, dlc_t)) elif times is False: _logger.warning(f'Skipping lick times for {cam} camera as no camera.times available') elif times == 'short': _logger.warning(f'Skipping lick times for {cam} camera as camera.times are too short') # Compute pupil diameter, raw and smoothed _logger.info(f'Computing raw pupil diameter for {cam} camera.') features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh) try: _logger.info(f'Computing smooth pupil diameter for {cam} camera.') features['pupilDiameter_smooth'] = get_smooth_pupil_diameter(features['pupilDiameter_raw'], cam) except Exception: _logger.error(f'Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.') _logger.error(traceback.format_exc()) features['pupilDiameter_smooth'] = np.nan # Save to parquet features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt') features.to_parquet(features_file) output_files.append(features_file) # For all cams, compute DLC QC if times available if run_qc is True and times in [True, 'short']: # Setting download_data to False because at this point the data should be there qc = DlcQC(self.session_path, side=cam, one=self.one, download_data=False) qc.run(update=True) else: if times is False: _logger.warning(f'Skipping QC for {cam} camera as no camera.times available') if not run_qc: _logger.warning(f'Skipping QC for {cam} camera as run_qc=False') except Exception: _logger.error(traceback.format_exc()) self.status = -1 continue # Combined lick times if len(combined_licks) > 0: lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy') np.save(lick_times_file, sorted(np.concatenate(combined_licks))) output_files.append(lick_times_file) else: _logger.warning('No lick times computed for this session.') if plot_qc: _logger.info('Creating DLC QC plot') try: session_id = self.one.path2eid(self.session_path) fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png') if not fig_path.parent.exists(): fig_path.parent.mkdir(parents=True, exist_ok=True) fig = dlc_qc_plot(self.session_path, one=self.one, cameras=self.cameras, device_collection=self.device_collection, trials_collection=self.trials_collection) fig.savefig(fig_path) fig.clf() snp = ReportSnapshot(self.session_path, session_id, one=self.one) snp.outputs = [fig_path] snp.register_images(widths=['orig'], function=str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__)) except Exception: _logger.error('Could not create and/or upload DLC QC Plot') _logger.error(traceback.format_exc()) self.status = -1 return output_files
[docs] class LightningPose(base_tasks.VideoTask): # TODO: make one task per cam? gpu = 1 io_charge = 100 level = 2 force = True job_size = 'large' env = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'litpose', 'bin', 'activate') scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'litpose') @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras], 'output_files': [(f'_ibl_{cam}Camera.lightningPose.pqt', 'alf', True) for cam in self.cameras] } return signature @staticmethod def _video_intact(file_mp4): """Checks that the downloaded video can be opened and is not empty""" cap = cv2.VideoCapture(str(file_mp4)) frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) intact = True if frame_count > 0 else False cap.release() return intact def _check_env(self): """Check that scripts are present, env can be activated and get iblvideo version""" assert len(list(self.scripts.rglob('run_litpose.*'))) == 2, \ f'Scripts run_litpose.sh and run_litpose.py do not exist in {self.scripts}' assert self.env.exists(), f"environment does not exist in assumed location {self.env}" command2run = f"source {self.env}; python -c 'import iblvideo; print(iblvideo.__version__)'" process = subprocess.Popen( command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, executable="/bin/bash" ) info, error = process.communicate() if process.returncode != 0: raise AssertionError(f"environment check failed\n{error.decode('utf-8')}") version = info.decode("utf-8").strip().split('\n')[-1] return version def _run(self, overwrite=True, **kwargs): # Gather video files self.session_path = Path(self.session_path) mp4_files = [ self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') for cam in self.cameras if self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4').exists() ] labels = [label_from_path(x) for x in mp4_files] _logger.info(f'Running on {labels} videos') # Check the environment self.version = self._check_env() _logger.info(f'iblvideo version {self.version}') # If all results exist and overwrite is False, skip computation expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True) if overwrite is False and expected_outputs_present is True: actual_outputs = expected_outputs return actual_outputs # Else, loop over videos actual_outputs = [] for label, mp4_file in zip(labels, mp4_files): # Catch exceptions so that the other cams can still run but set status to Errored try: # Check that the GPU is (still) accessible check_nvidia_driver() # Check that the video can be loaded if not self._video_intact(mp4_file): _logger.error(f"Corrupt raw video file {mp4_file}") self.status = -1 continue t0 = time.time() _logger.info(f'Running Lightning Pose on {label}Camera.') command2run = f"{self.scripts.joinpath('run_litpose.sh')} {str(self.env)} {mp4_file} {overwrite}" _logger.info(command2run) process = subprocess.Popen( command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, executable="/bin/bash", ) info, error = process.communicate() if process.returncode != 0: error_str = error.decode("utf-8").strip() _logger.error(f'Lightning pose failed for {label}Camera.\n\n' f'++++++++ Output of subprocess for debugging ++++++++\n\n' f'{error_str}\n' f'++++++++++++++++++++++++++++++++++++++++++++\n') self.status = -1 continue else: _logger.info(f'{label} camera took {(time.time() - t0)} seconds') result = next(self.session_path.joinpath('alf').glob(f'_ibl_{label}Camera.lightningPose*.pqt')) actual_outputs.append(result) except BaseException: _logger.error(traceback.format_exc()) self.status = -1 continue return actual_outputs