Source code for docarray.document.mixins.video
from typing import Union, BinaryIO, TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from ...typing import T
class VideoDataMixin:
    """Provide helper functions for :class:`Document` to support video data."""

    def load_uri_to_video_tensor(self: 'T', only_keyframes: bool = False) -> 'T':
        """Convert a :attr:`.uri` to a video ndarray :attr:`.tensor`.

        Video stream 0 of the file at :attr:`.uri` is decoded frame by frame,
        the frames are stacked along a new leading axis and axes 1 and 2 are
        then swapped, which is the layout :meth:`save_video_tensor_to_file`
        reads back (axis 1 as width, axis 2 as height).

        :param only_keyframes: only keep the keyframes in the video
        :return: Document itself after processed
        :raises ValueError: if no frame could be decoded from :attr:`.uri`
        """
        # Local import: this module stays importable without PyAV installed.
        import av

        with av.open(self.uri) as container:
            if only_keyframes:
                stream = container.streams.video[0]
                # Tell the decoder to skip every frame that is not a keyframe.
                stream.codec_context.skip_frame = 'NONKEY'

            frames = [
                np.asarray(frame.to_image()) for frame in container.decode(video=0)
            ]

        if not frames:
            # ``np.stack`` rejects an empty list with a message that does not
            # mention the video at all; fail with the same exception type but
            # say what actually went wrong.
            raise ValueError(f'no video frame could be decoded from {self.uri!r}')

        self.tensor = np.moveaxis(np.stack(frames), 1, 2)
        return self

    def save_video_tensor_to_file(
        self: 'T', file: Union[str, BinaryIO], frame_rate: int = 30, codec: str = 'h264'
    ) -> 'T':
        """Save :attr:`.tensor` as a video mp4/h264 file.

        :attr:`.tensor` must be a 4-dimensional ``uint8`` array whose last
        dimension has size 3. Axis 0 is iterated as the sequence of frames,
        axis 1 is used as the frame width and axis 2 as the frame height.

        :param file: The file to open, which can be either a string or a file-like object.
        :param frame_rate: frames per second
        :param codec: the name of a decoder/encoder
        :return: itself after processed
        :raises ValueError: if :attr:`.tensor` is not ``uint8``, is not
            4-dimensional, or its last dimension is not 3
        """
        tensor = self.tensor
        if tensor.ndim != 4 or tensor.shape[-1] != 3 or tensor.dtype != np.uint8:
            raise ValueError(
                f'expects `.tensor` with dtype=uint8 and ndim=4 and the last dimension is 3, '
                f'but receiving {tensor.shape} in {tensor.dtype}'
            )

        # Swap axes 1 and 2 so every frame is handed to the encoder with the
        # height axis first. No clipping is needed: the dtype was just checked
        # to be uint8, so all values are already within [0, 255] and clipping
        # would only create a full copy of the video.
        video_tensor = np.moveaxis(tensor, 1, 2)

        # Local import: this module stays importable without PyAV installed.
        import av

        with av.open(file, mode='w') as container:
            stream = container.add_stream(codec, rate=frame_rate)
            stream.width = tensor.shape[1]
            stream.height = tensor.shape[2]
            stream.pix_fmt = 'yuv420p'

            for rgb_frame in video_tensor:
                frame = av.VideoFrame.from_ndarray(rgb_frame, format='rgb24')
                for packet in stream.encode(frame):
                    container.mux(packet)

            # Calling ``encode()`` without a frame flushes the packets that
            # are still buffered inside the encoder.
            for packet in stream.encode():
                container.mux(packet)
        return self