Source code for sportslabkit.camera.videoreader

import threading
from queue import Queue
from time import sleep

import cv2
import numpy as np


[docs]class VideoReader: """Pythonic wrapper around OpenCV's VideoCapture(). This class provides a convenient way to access and manipulate video files using OpenCV's VideoCapture object. It implements several convenient methods and properties to make it easy to work with video files, including slicing and indexing into the video file, iteration through the video frames, and more. Args: filename (str): The path to the video file. threaded (bool): Whether to run the video reading in a separate thread. queue_size (int): The size of the queue for storing video frames. Properties: frame_width (int): The width of the video frames. frame_height (int): The height of the video frames. frame_channels (int): The number of channels in the video frames. frame_rate (float): The frame rate of the video. frame_shape (tuple): The shape of the video frames (height, width, channels). number_of_frames (int): The total number of frames in the video. fourcc (int): The fourcc code of the video. current_frame_pos (int): The current position of the video frame. Methods: read(frame_number=None): Read the next frame or a specified frame from the video. close(): Close the video file. """ def __init__(self, filename: str, threaded=False, queue_size=10): """Open video in filename.""" self._filename = filename self._vc = cv2.VideoCapture(str(self._filename)) self.threaded = threaded self.stopped = False self.q: Queue = Queue(maxsize=queue_size) if threaded: t = threading.Thread(target=self.read_thread) t.daemon = True t.start() def __del__(self): try: self._vr.release() except AttributeError: # if file does not exist this will be raised since _vr does not exist pass def __len__(self): """Length is number of frames.""" return self.number_of_frames def __getitem__(self, index): # numpy-like slice imaging into arbitrary dims of the video # ugly.hacky but works frames = None if isinstance(index, int): # single frame ret, frames = self.read(index) frames = cv2.cvtColor(frames, cv2.COLOR_BGR2RGB) elif isinstance(index, slice): # slice of frames frames = np.stack([self[ii] for ii in range(*index.indices(len(self)))]) elif isinstance(index, range): # range of frames frames = np.stack([self[ii] for ii in index]) elif isinstance(index, tuple): # unpack tuple of indices if isinstance(index[0], slice): indices = range(*index[0].indices(len(self))) elif isinstance(index[0], (np.integer, int)): indices = int(index[0]) else: indices = None if indices is not None: frames = self[indices] # index into pixels and channels for cnt, idx in enumerate(index[1:]): if isinstance(idx, slice): ix = range(*idx.indices(self.shape[cnt + 1])) elif isinstance(idx, int): ix = range(idx - 1, idx) else: continue if frames.ndim == 4: # ugly indexing from the back (-1,-2 etc) cnt = cnt + 1 frames = np.take(frames, ix, axis=cnt) if self.remove_leading_singleton and frames is not None: if frames.shape[0] == 1: frames = frames[0] return frames def __repr__(self): return f"{self._filename} with {len(self)} frames of size {self.frame_shape} at {self.frame_rate:1.2f} fps" def __iter__(self): return self def __next__(self): ret, frame = self.read() if ret: return frame raise StopIteration def __enter__(self): return self def __exit__(self): """Release video file.""" del self
[docs] def close(self): """Close video file.""" self._vc.release()
[docs] def read_thread(self): while not self.stopped: if not self.q.full(): ret, frame = self._vc.read() if not ret: self.stopped = True self.q.put(frame)
[docs] def read(self, frame_number=None): """Read next frame or frame specified by `frame_number`.""" if not self.stopped and self.threaded: sleep(10**-6) # wait for frame to be read? frame = self.q.get(0.1) return True, frame is_current_frame = frame_number == self.current_frame_pos # no need to seek if we are at the right position # - greatly speeds up reading sunbsequent frames if frame_number is not None and not is_current_frame: self._seek(frame_number) ret, frame = self._vc.read() return ret, frame
def _reset(self): """Re-initialize object.""" self.__init__(self._filename) def _seek(self, frame_number): """Go to frame.""" self._vc.set(cv2.CAP_PROP_POS_FRAMES, frame_number) @property def number_of_frames(self): return int(self._vc.get(cv2.CAP_PROP_FRAME_COUNT)) @property def frame_rate(self): return self._vc.get(cv2.CAP_PROP_FPS) @property def frame_height(self): return int(self._vc.get(cv2.CAP_PROP_FRAME_HEIGHT)) @property def frame_width(self): return int(self._vc.get(cv2.CAP_PROP_FRAME_WIDTH)) @property def frame_channels(self): n_channels = int(self._vc.get(cv2.CAP_PROP_CHANNEL)) if n_channels == 0: # if channel is 0, backend is not supported self._reset() n_channels = self.read(0)[1].shape[-1] return n_channels @property def fourcc(self): return int(self._vc.get(cv2.CAP_PROP_FOURCC)) @property def frame_format(self): return int(self._vc.get(cv2.CAP_PROP_FORMAT)) @property def current_frame_pos(self): return int(self._vc.get(cv2.CAP_PROP_POS_FRAMES)) @property def frame_shape(self): return (self.frame_height, self.frame_width, self.frame_channels) @property def shape(self): return ( self.number_of_frames, self.frame_height, self.frame_width, self.frame_channels, )