"""Functions for fetching video frames, meta data and file locations"""
import sys
import re
from datetime import timedelta
from pathlib import Path
import cv2
import numpy as np
from iblutil.util import Bunch
from one.api import ONE
from one import params
# Camera labels accepted by the label validation in this module (see `url_from_eid` and
# `assert_valid_label`).
VIDEO_LABELS = ('left', 'right', 'body', 'belly')
class VideoStreamer:
    """
    Provides a wrapper to stream a video from a password protected HTTP server using opencv.

    After construction the instance exposes ``url`` (the video URL without credentials),
    ``cap`` (the ``cv2.VideoCapture`` opened on the authenticated URL) and ``total_frames``
    (the frame count reported by the capture).
    """

    def __init__(self, url_vid):
        """
        TODO Allow auth as input

        :param url_vid: full url of the video or dataset dictionary as output by alyx rest
         datasets
        """
        # pop the data url from the dataset record if the input is a dictionary
        if isinstance(url_vid, dict):
            url_vid = next(fr['data_url'] for fr in url_vid['file_records'] if fr['data_url'])
        self.url = url_vid
        self._par = params.get(silent=True)
        # `_url` is a property: it must evaluate to the authenticated URL string here
        self.cap = cv2.VideoCapture(self._url)
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

    @property
    def _url(self):
        """
        The video URL with the HTTP data server credentials inserted after the scheme.

        :return: `self.url` with `<login>:<password>@` placed directly after a leading
         http(s):// prefix; URLs without such a prefix are returned unchanged
        """
        username = self._par.HTTP_DATA_SERVER_LOGIN
        password = self._par.HTTP_DATA_SERVER_PWD
        return re.sub(r'(^https?://)', r'\1' + f'{username}:{password}@', self.url)

    def get_frame(self, frame_index):
        """
        Seek to a given frame and read it from the stream.

        :param frame_index: index of the frame to position the capture at before reading
        :return: the output of `cv2.VideoCapture.read` (a success flag and the frame)
        """
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
        return self.cap.read()
def get_video_frame(video_path, frame_number):
    """
    Obtain numpy array corresponding to a particular video frame in video_path.

    The capture opened by this function is always released before returning.

    :param video_path: local path to mp4 file, or a URL string starting with 'http' (in which
     case the video is opened through `VideoStreamer`)
    :param frame_number: video frame to be returned
    :return: numpy array corresponding to frame of interest, as returned by the capture's
     `read` method. NOTE(review): previously documented as (w, h, 3); OpenCV images are
     usually (height, width, 3) - confirm.
    """
    is_url = isinstance(video_path, str) and video_path.startswith('http')
    cap = VideoStreamer(video_path).cap if is_url else cv2.VideoCapture(str(video_path))
    try:
        # 0-based index of the frame to be decoded/captured next.
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        _, frame_image = cap.read()
    finally:
        # Free the underlying file/stream handle even if seeking or decoding raised
        cap.release()
    return frame_image
def get_video_frames_preload(vid, frame_numbers=None, mask=Ellipsis, as_list=False,
                             func=lambda x: x, quiet=False):
    """
    Obtain numpy array corresponding to a particular video frame in video.

    Fetching and returning a list is about 33% faster but may be less memory controlled. NB: Any
    gain in speed will be lost if subsequently converted to array.

    :param vid: URL or local path to mp4 file or cv2.VideoCapture instance. A capture passed in
     by the caller is left open; one opened here is released before returning.
    :param frame_numbers: video frames to be returned. If None, return all frames.
    :param mask: a logical mask or slice to apply to frames
    :param as_list: if true the frames are returned as a list, this is faster but may be less
     memory efficient
    :param func: Function to be applied to each frame. Applied after masking if applicable.
    :param quiet: if true, suppress frame loading progress output.
    :return: numpy array corresponding to frame of interest, or list if as_list is True.
     Default dimensions are (n, w, h, 3) where n = len(frame_numbers)

    Example - Load first 1000 frames, keeping only the first colour channel:
        frames = get_video_frames_preload(vid, range(1000), mask=np.s_[:, :, 0])
    """
    is_cap = not isinstance(vid, (str, Path))
    if is_cap:
        cap = vid
    else:
        is_url = isinstance(vid, str) and vid.startswith('http')
        cap = VideoStreamer(vid).cap if is_url else cv2.VideoCapture(str(vid))

    # Falsy non-array masks (e.g. None) mean "no mask". Arrays are never truth-tested because
    # a multi-element boolean mask has no unambiguous truth value.
    if not isinstance(mask, np.ndarray) and not mask:
        mask = Ellipsis

    try:
        assert cap.isOpened(), 'Failed to open video'

        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_numbers = frame_numbers if frame_numbers is not None else range(frame_count)
        n_frames = len(frame_numbers)

        # Setting the index is extremely slow; determine where frame index must be set
        # The first index is always explicitly set.
        to_set = np.insert(np.diff(frame_numbers), 0, 0) != 1
        if as_list:
            frame_images = [None] * n_frames
        else:
            # Read one frame to find the output shape after masking and `func`; the position
            # is reset on the first loop iteration so this read does not skip a frame
            ret, frame = cap.read()
            frame_images = np.zeros((n_frames, *func(frame[mask]).shape), np.uint8)
        for ii, i in enumerate(frame_numbers):
            if not quiet:
                sys.stdout.write(f'\rloading frame {ii}/{n_frames}')
            if to_set[ii]:
                cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if ret:
                frame_images[ii] = func(frame[mask])
            else:
                print(f'failed to read frame #{i}')
    finally:
        # Only release captures opened by this function; the caller owns any it passed in
        if not is_cap:
            cap.release()
    if not quiet:
        sys.stdout.write('\x1b[2K\r')  # Erase current line in stdout
    return frame_images
def url_from_eid(eid, label=None, one=None):
    """Return the video URL(s) for a given eid.

    Parameters
    ----------
    eid : UUID, str
        The session ID.
    label : str, tuple of str
        The video label (e.g. 'body') or a tuple thereof.  If None, all valid labels are used.
    one : one.api.One
        An instance of ONE.  If None, a new instance is created.

    Returns
    -------
    str, dict of str
        The URL string if the label is a string, otherwise a dict of urls with labels as keys.

    Raises
    ------
    ValueError
        Video label is unrecognized. See `VIDEO_LABELS` for valid labels.
    """
    valid_labels = VIDEO_LABELS
    if not (label is None or np.isin(label, valid_labels).all()):
        raise ValueError('labels must be one of ("%s")' % '", "'.join(valid_labels))
    one = one or ONE()
    session_path = one.eid2path(one.to_eid(eid))
    # Wrap a single label in a tuple so the membership test below compares whole labels
    # instead of searching for a substring within the label string
    labels = (label,) if isinstance(label, str) else (label or valid_labels)

    # Filter the video files
    def match(dataset):
        matched = re.match(r'(?:_iblrig_)([a-z]+)(?=Camera\.raw\.mp4$)', dataset.rsplit('/')[-1])
        return bool(matched) and matched.group(1) in labels

    if one.mode != 'remote':
        # Build the URLs from the dataset list
        datasets = [ds for ds in one.list_datasets(eid, details=False) if match(ds)]
        urls = [one.path2url(session_path / ds) for ds in datasets]
    else:
        # Take the URLs from the full session details
        datasets = one.get_details(eid, full=True)['data_dataset_session_related']
        urls = [ds['data_url'] for ds in datasets
                if ds['dataset_type'] == '_iblrig_Camera.raw' and match(ds['name'])]

    # If one label specified, return the url, otherwise return a dict
    if isinstance(label, str):
        return urls[0]
    urls_dict = {label_from_path(url): url for url in urls}
    # When labels were requested, make sure each one has a key (None if no video was found)
    return {**dict.fromkeys(label), **urls_dict} if label else urls_dict
def label_from_path(video_name):
    """
    Return the video label, e.g. 'left', 'right' or 'body'.

    The label is the run of lowercase letters found between an underscore and the text
    'Camera' in the string form of the input.

    :param video_name: A file path, URL or file name for the video
    :return: The string label or None if the video doesn't match
    """
    result = re.search(r'(?<=_)([a-z]+)(?=Camera)', str(video_name))
    return result.group() if result else None
def assert_valid_label(label):
    """
    Raises a value error if the provided label is not supported.

    :param label: A video label to verify, or an iterable of labels, each of which is verified
    :return: the label in lowercase; for an iterable input, a tuple of the lowercase labels
    :raises ValueError: if the label is neither a string nor an iterable of strings, or is not
     one of `VIDEO_LABELS` (compared case-insensitively)
    """
    if not isinstance(label, str):
        try:
            return tuple(map(assert_valid_label, label))
        except (AttributeError, TypeError) as err:
            # map() raises TypeError when given a non-iterable such as an int or None
            raise ValueError('label must be string or iterable of strings') from err
    if label.lower() not in VIDEO_LABELS:
        raise ValueError(f"camera must be one of ({', '.join(VIDEO_LABELS)})")
    return label.lower()