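To split the video and the data, we can map the video stream to the stderr pipe and the KLV data stream to the stdout pipe.
The same technique is used for separating video and audio in my following answer.
When each video frame has its own KLV data (synchronized by sequential order), accurate synchronization between a video frame and the corresponding data is relatively simple.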
The Day Flight.mpg sample file has far fewer data packets than frames, so accurate synchronization is not possible using the suggested solution (I don't think it is possible using the pipes approach).
We may still apply some coarse synchronization - assume the data and the frame are read in time proximity.
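As a rough illustration of the coarse synchronization idea, the sketch below pairs each data packet with the frame that arrived closest to it in time. It assumes each reader thread records an arrival time (for example with time.monotonic()) for every frame and packet it reads; the function name nearest_frame_index and the sample timestamps are hypothetical and are not part of the code sample in this answer.

```python
import bisect

def nearest_frame_index(frame_times, packet_time):
    """Return the index of the frame whose arrival time is closest to packet_time.

    frame_times must be a non-empty list sorted in ascending order.
    """
    pos = bisect.bisect_left(frame_times, packet_time)
    if pos == 0:
        return 0
    if pos == len(frame_times):
        return len(frame_times) - 1
    before, after = frame_times[pos - 1], frame_times[pos]
    # Prefer the earlier frame when the packet is equally close to both.
    return pos - 1 if (packet_time - before) <= (after - packet_time) else pos

# Hypothetical arrival times in seconds (assumed to be recorded by the reader threads).
frame_times = [0.00, 0.04, 0.08, 0.12, 0.16]
packet_times = [0.01, 0.09, 0.30]

matches = [nearest_frame_index(frame_times, t) for t in packet_times]
print(matches)
```

This only reflects the order in which the pipes were read, so the pairing is approximate at best.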
Suggested method for splitting the video and the data:
                                            -----------
                                       --->| Raw Video | ---> stderr (pipe)
 -----------        -------------     |     -----------
| Input     |      | FFmpeg      |    |
| Video with| ---> | sub-process | ---
| Data      |      |             |    |
 -----------        -------------     |     -----------
                                       --->| KLV data  | ---> stdout (pipe)
                                            -----------
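The video and the data are read in two separate threads:
- Video reader thread - reads raw video frames (in BGR format).
- Data reader thread - reads and parses the KLV data.
According to Wikipedia, the KLV format is not well defined: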
Keys can be 1, 2, 4, or 16 bytes in length.
Presumably in a separate specification document you would agree on a key length for a given application.
In the sample video the key length is 16 bytes, but this is not guaranteed...
Reading the KLV data from stdout pipe:
When reading data from a pipe (in a real-time-like manner), we need to know the expected number of bytes to read.
That forces us to do partial parsing of the KLV data:
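- Read the "key" (assume a length of 16 bytes).
- Read the "length" - there is some challenge with the "BER length" standard.
- Read the "data" (the size to read is defined by the length).
After reading the key, the length and the data, we have one "KLV data packet" that we can send to the KLV data parser.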
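To make the "BER length" step more concrete, here is a minimal sketch of decoding the length field on its own. It assumes the usual BER rule: a first byte below 128 is the length itself (short form), otherwise the low 7 bits give the number of following bytes that hold the length (long form). The helper name read_ber_length is hypothetical, and the indefinite form (first byte 0x80) is not handled.

```python
from io import BytesIO

def read_ber_length(stream):
    """Read a BER-encoded length from a binary stream.

    Returns (length, raw_length_bytes), or None if the stream ends early.
    """
    first = stream.read(1)
    if len(first) < 1:
        return None
    if first[0] < 128:
        # Short form: the byte is the length.
        return first[0], first
    # Long form: the low 7 bits give the number of length bytes that follow.
    n = first[0] - 128
    rest = stream.read(n)
    if len(rest) < n:
        return None
    return int.from_bytes(rest, byteorder='big'), first + rest

short_form = read_ber_length(BytesIO(b'\x2a'))         # One byte: length 42
long_form = read_ber_length(BytesIO(b'\x82\x01\x00'))  # 0x82 -> two length bytes: 0x0100
print(short_form, long_form)
```

The raw length bytes are returned as well, because the full packet (key + length + data) has to be reassembled before it is passed to the parser.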
Here is a code sample that works with the Day Flight.mpg sample input file:
#!/usr/bin/env python3
import klvdata
import subprocess as sp
import shlex
import threading
import numpy as np
import cv2
from io import BytesIO
# Video reader thread.
def video_reader(pipe):
    cols, rows = 1280, 720  # Assume we know frame size is 1280x720

    counter = 0
    while True:
        raw_image = pipe.read(cols*rows*3)  # Read raw video frame

        # Break the loop when length is too small
        if len(raw_image) < cols*rows*3:
            break

        if (counter % 60) == 0:
            # Show video frame every 60 frames
            image = np.frombuffer(raw_image, np.uint8).reshape([rows, cols, 3])
            cv2.imshow('Video', image)  # Show video image for testing
            cv2.waitKey(1)
        counter += 1
# https://github.com/paretech/klvdata/tree/master/klvdata
def bytes_to_int(value, signed=False):
    """Return integer given bytes."""
    return int.from_bytes(bytes(value), byteorder='big', signed=signed)
# Data reader thread (read KLV data).
def data_reader(pipe):
    key_length = 16  # Assume key length is 16 bytes.

    # For testing - store the KLV data to data.bin (binary file).
    # The "with" block closes the file when the loop ends.
    with open('data.bin', 'wb') as f:
        while True:
            # https://en.wikipedia.org/wiki/KLV
            # The first few bytes are the Key, much like a key in a standard hash table data structure.
            # Keys can be 1, 2, 4, or 16 bytes in length.
            # Presumably in a separate specification document you would agree on a key length for a given application.
            key = pipe.read(key_length)  # Read the key

            if len(key) < key_length:
                break  # Break the loop when length is too small
            f.write(key)  # Write data to binary file for testing

            # https://github.com/paretech/klvdata/tree/master/klvdata
            # Length field
            len_byte = pipe.read(1)

            if len(len_byte) < 1:
                break  # Break the loop when length is too small
            f.write(len_byte)  # Write data to binary file for testing

            byte_length = bytes_to_int(len_byte)

            # https://github.com/paretech/klvdata/tree/master/klvdata
            if byte_length < 128:
                # BER Short Form
                length = byte_length
                ber_len_bytes = b''
            else:
                # BER Long Form
                ber_len = byte_length - 128
                ber_len_bytes = pipe.read(ber_len)

                if len(ber_len_bytes) < ber_len:
                    break  # Break the loop when length is too small
                f.write(ber_len_bytes)  # Write ber_len_bytes to binary file for testing

                length = bytes_to_int(ber_len_bytes)

            # Read the value (length bytes)
            value = pipe.read(length)
            if len(value) < length:
                break  # Break the loop when length is too small
            f.write(value)  # Write data to binary file for testing

            klv_data = key + len_byte + ber_len_bytes + value  # Concatenate key, length and data
            klv_data_as_bytes_io = BytesIO(klv_data)  # Wrap klv_data with BytesIO (before parsing)

            # Parse the KLV data
            for packet in klvdata.StreamParser(klv_data_as_bytes_io):
                metadata = packet.MetadataList()
                print(metadata)
                print()  # New line
# Execute FFmpeg as sub-process
# Map the video to stderr and map the data to stdout
process = sp.Popen(shlex.split('ffmpeg -hide_banner -loglevel quiet '  # Set loglevel to quiet to disable the prints to stderr
                               '-i "Day Flight.mpg" '  # Input video "Day Flight.mpg"
                               '-map 0:v -c:v rawvideo -pix_fmt bgr24 -f:v rawvideo pipe:2 '  # rawvideo format is mapped to stderr pipe (raw video codec with bgr24 pixel format)
                               '-map 0:d -c copy -copy_unknown -f:d data pipe:1 '  # Copy the data without decoding.
                               '-report'),  # Create a log file (because we can't see the statuses that are usually printed to stderr).
                   stdout=sp.PIPE, stderr=sp.PIPE)
# Start video reader thread (pass stderr pipe as argument).
video_thread = threading.Thread(target=video_reader, args=(process.stderr,))
video_thread.start()
# Start data reader thread (pass stdout pipe as argument).
data_thread = threading.Thread(target=data_reader, args=(process.stdout,))
data_thread.start()
# Wait for threads (and process) to finish.
video_thread.join()
data_thread.join()
process.wait()
The above code saves the data to data.bin (for testing).
data.bin may be used for a consistency check.
Execute FFmpeg CLI for extracting the data stream:
ffmpeg -y -i "Day Flight.mpg" -map 0:d -c copy -copy_unknown -f data raw.bin
Verify that the raw.bin and data.bin files are equal.
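One possible way to do the comparison from Python is sketched below, using the standard filecmp module. The helper name files_equal is hypothetical, and the sketch assumes both files are in the current working directory.

```python
import filecmp
import os

def files_equal(path_a, path_b):
    """Return True if both files have identical contents."""
    # shallow=False forces a comparison of the file contents,
    # rather than relying only on os.stat() signatures.
    return filecmp.cmp(path_a, path_b, shallow=False)

# Compare only when both files exist (they are created by the steps above).
if os.path.exists('raw.bin') and os.path.exists('data.bin'):
    print('Files are equal:', files_equal('raw.bin', 'data.bin'))
```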