我需要在OpenCV/Python中实时处理视频流和klvdata流.我使用FFMPEG读取文件或流,因为OpenCV不保留klvdata.我通过子流程模块将数据传递给OpenCV.

我的问题是,我不知道如何将视频和klvdata同时映射到同一个子流程管道?

我的代码:

#!/usr/bin/env python3
import sys, json, klvdata;
from subprocess import PIPE
import subprocess as sp
import cv2
import numpy

command = ['ffmpeg',
    '-i', 'DayFlight.mpg',
    '-map', '0:0',
    '-map', '0:d',        
    '-pix_fmt', 'bgr24',
    '-c:v', 'rawvideo',      
    '-an','-sn',              
    '-f', 'image2pipe', '-',
    '-c:d', 'copy',
    '-f','data',
    ]

pipe = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=10**8)

while True:
   raw_image = pipe.stdout.read(1280*720*3)
   image =  numpy.fromstring(raw_image, dtype='uint8')
   image = image.reshape((720,1280,3))          
   if image is not None:
      cv2.imshow('Video', image)
   if cv2.waitKey(1) & 0xFF == ord('q'):
      break
   for packet in klvdata.StreamParser(pipe.stdout): 
      metadata = packet.MetadataList()
      print(metadata)
pipe.stdout.flush()
cv2.destroyAllWindows()

产生以下错误:

Traceback (most recent call last):
  File "test_cv.py", line 32, in <module>
    metadata = packet.MetadataList()
AttributeError: 'UnknownElement' object has no attribute 'MetadataList'

非常感谢您的帮助.

推荐答案

为了分割视频和数据,我们可以将视频流映射到stderr管道,将KLV数据流映射到stdout管道.

在my following answer中,同样的技术用于分离视频和音频.

当每个视频帧都有专用KLV数据(按顺序同步)时,视频帧和相应数据之间的精确同步相对简单.

The Day Flight.mpg sample file has much fewer data packets than frames, and accurate synchronization is not possible using the suggested solution (I don't think it is possible using the pipes approach).
We may still apply some coarse synchronization - assume the data and the frame are read in time proximity.

分割视频和数据的建议方法:

                                            -----------
                                       --->| Raw Video | ---> stderr (pipe)
 -----------        -------------     |     -----------    
| Input     |      | FFmpeg      |    |
| Video with| ---> | sub-process | ---      
| Data      |      |             |    |    
 -----------        -------------     |     -----------
                                       --->| KLV data  | ---> stdout (pipe)
                                            -----------

视频和数据在两个单独的线程中读取:

  • 视频阅读器线程-读取原始视频帧(BGR格式).
  • 数据读取器线程-读取和解析KLV数据.

根据Wikipedia,KLV格式没有得到很好的定义:

Keys can be 1, 2, 4, or 16 bytes in length.
Presumably in a separate specification document you would agree on a key length for a given application.

在示例视频中,密钥长度为16字节,但不能保证...


Reading the KLV data from stdout pipe:
When reading data from a pipe (in real-time like manner), we need to know the expected number of bytes to read.
That forces us to do partial parsing of the KLV data:

  • 读取"密钥"(假设长度为16字节).
  • 阅读"长度"——对于"BER长度"标准,存在一些挑战.
  • 读取"数据"(要读取的大小由长度定义).

读取密钥、长度和数据后,我们有一个"KLV数据包",我们可以发送到KLV data parser.


下面是一个使用Day Flight.mpg个示例输入文件的代码示例:

#!/usr/bin/env python3
import klvdata
import subprocess as sp
import shlex
import threading
import numpy as np
import cv2
from io import BytesIO

# Video reader thread.
def video_reader(pipe):
    cols, rows = 1280, 720  # Assume we know frame size is 1280x720

    counter = 0
    while True:
        raw_image = pipe.read(cols*rows*3)  # Read raw video frame

        # Break the loop when length is too small
        if len(raw_image) < cols*rows*3:
            break

        if (counter % 60) == 0:
            # Show video frame evey 60 frames
            image = np.frombuffer(raw_image, np.uint8).reshape([rows, cols, 3])
            cv2.imshow('Video', image) # Show video image for testing
            cv2.waitKey(1)
        counter += 1



# https://github.com/paretech/klvdata/tree/master/klvdata
def bytes_to_int(value, signed=False):
    """Return integer given bytes."""
    return int.from_bytes(bytes(value), byteorder='big', signed=signed)


# Data reader thread (read KLV data).
def data_reader(pipe):
    key_length = 16  # Assume key length is 16 bytes.

    f = open('data.bin', 'wb')  # For testing - store the KLV data to data.bin (binary file)

    while True:
        # https://en.wikipedia.org/wiki/KLV
        # The first few bytes are the Key, much like a key in a standard hash table data structure.
        # Keys can be 1, 2, 4, or 16 bytes in length.
        # Presumably in a separate specification document you would agree on a key length for a given application.
        key = pipe.read(key_length)  # Read the key
        
        if len(key) < key_length:
            break  # Break the loop when length is too small
        f.write(key)  # Write data to binary file for testing

        # https://github.com/paretech/klvdata/tree/master/klvdata
        # Length field
        len_byte = pipe.read(1)

        if len(len_byte) < 1:
            break  # Break the loop when length is too small
        f.write(len_byte)  # Write data to binary file for testing

        byte_length = bytes_to_int(len_byte)

        # https://github.com/paretech/klvdata/tree/master/klvdata                                                
        if byte_length < 128:
            # BER Short Form
            length = byte_length
            ber_len_bytes = b''
        else:
            # BER Long Form
            ber_len = byte_length - 128
            ber_len_bytes = pipe.read(ber_len)

            if len(ber_len_bytes) < ber_len:
                break  # Break the loop when length is too small
            f.write(ber_len_bytes)  # Write ber_len_bytes to binary file for testing

            length = bytes_to_int(ber_len_bytes)

        # Read the value (length bytes)
        value = pipe.read(length)
        if len(value) < length:
            break  # Break the loop when length is too small
        f.write(value)  # Write data to binary file for testing

        klv_data = key + len_byte + ber_len_bytes + value  # Concatenate key length and data
        klv_data_as_bytes_io = BytesIO(klv_data)  # Wrap klv_data with BytesIO (before parsing)

        # Parse the KLV data
        for packet in klvdata.StreamParser(klv_data_as_bytes_io): 
            metadata = packet.MetadataList()
            print(metadata)
            print() # New line

# Execute FFmpeg as sub-process
# Map the video to stderr and map the data to stdout
process = sp.Popen(shlex.split('ffmpeg -hide_banner -loglevel quiet '                        # Set loglevel to quiet for disabling the prints ot stderr
                               '-i "Day Flight.mpg" '                                        # Input video "Day Flight.mpg"
                               '-map 0:v -c:v rawvideo -pix_fmt bgr24 -f:v rawvideo pipe:2 ' # rawvideo format is mapped to stderr pipe (raw video codec with bgr24 pixel format)
                               '-map 0:d -c copy -copy_unknown -f:d data pipe:1 '            # Copy the data without ddecoding.
                               '-report'),                                                   # Create a log file (because we can't the statuses that are usually printed to stderr).
                                stdout=sp.PIPE, stderr=sp.PIPE)


# Start video reader thread (pass stderr pipe as argument).
video_thread = threading.Thread(target=video_reader, args=(process.stderr,))
video_thread.start()

# Start data reader thread (pass stdout pipe as argument).
data_thread = threading.Thread(target=data_reader, args=(process.stdout,))
data_thread.start()


# Wait for threads (and process) to finish.
video_thread.join()
data_thread.join()
process.wait()

The above code saves the data to data.bin (for testing).
data.bin may be used for consistency check.
Execute FFmpeg CLI for extracting the data stream:

ffmpeg -y -i "Day Flight.mpg" -map 0:d -c copy -copy_unknown -f data raw.bin

验证raw.bindata.bin个文件是否相等.

Python相关问答推荐

如何在PIL、Python中对图像应用彩色面膜?

按照行主要蛇扫描顺序对点列表进行排序

通过交换 node 对链接列表进行 Select 排序

Chatgpt API不断返回错误:404未能从API获取响应

如何在图片中找到这个化学测试条?OpenCV精明边缘检测不会绘制边界框

如何比较numPy数组中的两个图像以获取它们不同的像素

在Wayland上使用setCellWidget时,try 编辑QTable Widget中的单元格时,PyQt 6崩溃

当从Docker的--env-file参数读取Python中的环境变量时,每个\n都会添加一个\'.如何没有额外的?

Python+线程\TrocessPoolExecutor

pandas:排序多级列

pandas在第1列的id,第2列的标题,第3列的值,第3列的值?

Pandas Loc Select 到NaN和值列表

基于行条件计算(pandas)

在代码执行后关闭ChromeDriver窗口

为什么'if x is None:pass'比'x is None'单独使用更快?

为什么调用函数的值和次数不同,递归在代码中是如何工作的?

如何使用正则表达式修改toml文件中指定字段中的参数值

如何在Python 3.9.6和MacOS Sonoma 14.3.1下安装Pyregion

裁剪数字.nd数组引发-ValueError:无法将空图像写入JPEG

用来自另一个数据框的列特定标量划分Polars数据框中的每一列,