"""Functions for analyzing video frame data"""
import numpy as np
import cv2
[docs]
def frame_diff(frame1, frame2):
"""
Outputs pythagorean distance between two frames.
:param frame1: A numpy array of pixels with a shape of either (m, n, 3) or (m, n)
:param frame2: A numpy array of pixels with a shape of either (m, n, 3) or (m, n)
:return: An array with a shape equal to the input frames
"""
if frame1.shape != frame2.shape:
raise ValueError('Frames must have the same shape')
diff32 = np.float32(frame1) - np.float32(frame2)
if frame1.ndim == 3:
norm32 = np.float32(
np.sqrt(diff32[:, :, 0] ** 2 + diff32[:, :, 1] ** 2 + diff32[:, :, 2] ** 2) /
np.sqrt(255 ** 2 * 3)
)
else:
norm32 = np.float32(np.sqrt(diff32 ** 2 * 3) / np.sqrt(255 ** 2 * 3))
return np.uint8(np.round(norm32 * 255))
[docs]
def frame_diffs(frames, diff=1):
"""
Return the difference between frames.
May also take difference between more than 1 frames. Values are normalized between 0-255.
:param frames: Array or list of frames, where each frame is either (y, x) or (y, x, 3).
:param diff: Take difference between frames N and frames N + diff.
:return: uint8 array with shape (n-diff, y, x).
"""
frames = np.array(frames, dtype=np.float32)
if frames.shape[0] < diff:
raise ValueError('Difference must be less than number of frames')
diff32 = frames[diff:] - frames[:-diff]
# Normalize
if frames.ndim == 4:
norm32 = np.sqrt((diff32 ** 2).sum(axis=3)) / np.sqrt(255 ** 2 * 3).astype(np.float32)
else:
norm32 = np.sqrt(diff32 ** 2 * 3) / np.sqrt(255 ** 2 * 3).astype(np.float32)
return np.uint8(norm32 * 255)
[docs]
def motion_energy(frames, diff=2, kernel=None, normalize=True):
"""
Returns a min-max normalized vector of motion energy between frames.
:param frames: A list of ndarray of frames.
:param diff: Take difference between frames N and frames N + diff.
:param kernel: An optional Gaussian smoothing to apply with a given kernel size.
:param normalize: If True, motion energy is min-max normalized
:return df_: A vector of length n frames - diff, normalized between 0 and 1.
:return stDev: The standard deviation between the frames (not normalized).
Example 1 - Calculate normalized difference between consecutive frames
df, std = motion_energy(frames, diff=1)
Example 2 - Calculate smoothed difference between every 2nd frame
df, _ = motion_energy(frames, kernel=(9, 9))
"""
df = frame_diffs(frames, diff)
# Smooth with a Gaussian blur TODO Use median blur instead
if kernel is not None:
df = cv2.GaussianBlur(df, (9, 9), 0)
stDev = np.array([cv2.meanStdDev(x)[1] for x in df]).squeeze()
# Feature scaling
df_ = df.sum(axis=(1, 2))
if normalize:
df_ = (df_ - df_.min()) / (df_.max() - df_.min())
return df_, stDev