我正在做一个音频播放器,从udp套接字接收样本,一切正常.但当我实现丢失隐藏算法时,播放器未能以预期的速率保持沉默(每10毫秒发送一个160字节的列表).

在使用pyaudio播放音频时,使用阻塞调用write来播放一些示例,我注意到它在示例期间平均被阻塞.所以我创建了一个新的专用流程来播放样本.

主进程处理音频输出流,并使用多处理程序将结果发送给该进程.管我决定使用多重处理.管道,因为它应该比其他方式更快.

不幸的是,当我在虚拟机上运行该程序时,比特率是我在快速PC上获得的比特率的一半,这台PC并没有未能达到目标比特率.

经过一些测试,我得出结论,导致延迟的是管道的功能send.

我编写了一个简单的基准脚本(见下文),以查看各种传输到进程的方法之间的差异.脚本持续发送[b'\x00'*160]个字节,持续5秒,并计算字节对象总共发送了多少字节.我测试了以下发送方法:"不发送",多处理.管道,多处理.队列,多处理.经理,多处理.侦听器/客户端,最后是套接字.插座:

运行Windows 7 x64的"快速"PC的结果:

test_empty     :     1516076640
test_pipe      :       58155840
test_queue     :      233946880
test_manager   :        2853440
test_socket    :       55696160
test_named_pipe:       58363040

运行Windows 7 x64的VirtualBox虚拟机来宾和运行Windows 7 x64的主机的结果:

test_empty     :     1462706080
test_pipe      :       32444160
test_queue     :      204845600
test_manager   :         882560
test_socket    :       20549280
test_named_pipe:       35387840  

使用的脚本:

from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time

FS = "{:<15}:{:>15}"


def test_empty():
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]

        sent += len(data)
        if time.time()-s >= 5:
            break
    print(FS.format("test_empty", sent))


def pipe_void(pipe_in):
    while True:
        msg = pipe_in.recv()
        if msg == []:
            break


def test_pipe():
    pipe_out, pipe_in = Pipe()
    p = Process(target=pipe_void, args=(pipe_in,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        pipe_out.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    pipe_out.send([])
    p.join()
    print(FS.format("test_pipe", sent))


def queue_void(q):
    while True:
        msg = q.get()
        if msg == []:
            break


def test_queue():
    q = Queue()
    p = Process(target=queue_void, args=(q,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        q.put(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    q.put([])
    p.join()

    print(FS.format("test_queue", sent))


def manager_void(l, lock):
    msg = None
    while True:
        with lock:
            if len(l) > 0:
                msg = l.pop(0)
        if msg == []:
            break


def test_manager():
    with Manager() as manager:
        l = manager.list()
        lock = manager.Lock()
        p = Process(target=manager_void, args=(l, lock))
        p.start()
        s = time.time()
        sent = 0
        while True:
            data = b'\x00'*160
            lst = [data]
            with lock:
                l.append(lst)
            sent += len(data)
            if time.time()-s >= 5:
                break
        with lock:
            l.append([])
        p.join()

        print(FS.format("test_manager", sent))


def socket_void():
    addr = ('127.0.0.1', 20000)
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_socket():
    addr = ('127.0.0.1', 20000)
    listener = Listener(addr, "AF_INET")
    p = Process(target=socket_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_socket", sent))


def named_pipe_void():
    addr = '\\\\.\\pipe\\Test'
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_named_pipe():
    addr = '\\\\.\\pipe\\Test'
    listener = Listener(addr, "AF_PIPE")
    p = Process(target=named_pipe_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_named_pipe", sent))


if __name__ == "__main__":
    test_empty()
    test_pipe()
    test_queue()
    test_manager()
    test_socket()
    test_named_pipe()

问题

  • 如果队列使用管道,在这种情况下它比管道快多少?
  • 在发送延迟较低的情况下,如何保证从一个进程到另一个进程的恒定比特率流?

更新1

在我的程序中,在try 使用队列而不是管道之后.I got an enormous boost

在我的计算机上,使用管道我得到了+-16000 B/s,使用队列我得到了+-750万B/s.在虚拟机上,我得到了+-13000 B/s到650万B/s.这是使用队列而不是管道读取的大约500倍字节.

当然,我不会以每秒数百万字节的速度播放,我只会以正常的速度播放声音.(在我的例子中是16000 B/s,与上面的值一致)

推荐答案

我不能确定,但我认为您正在处理的问题是同步与异步I/O.我的猜测是,管道以某种方式以同步结束,队列以异步结束.下面的问题和答案或许能更好地回答为什么一方违约,另一方违约:

Synchronous/Asynchronous behaviour of python Pipes

Python-3.x相关问答推荐

如何在python中有效地使用多处理和pytube库来加快下载速度?

PythonPandas 创建一个列并添加到DataFrame

如何使用Python将嵌套的XML转换为CSV

Python webdrivermanager 和 Chrome 115.0 的 URL https://chromedriver.storage.googleapis.com/LATEST_RELEASE_115.0.5790 错误没有此类驱动程序

aiogram机器人中处理文本输入异常而不是按钮点击的回调函数.

Pandas 转换为日期时间

如何使用`re.findall`从字符串中提取数据

如何在Pandas 中按条件计算分组?

无法使用 Python 和 Selenium 检索 href 属性

两个Pandas数据框中的共同列列表

获取比较多列的最大值并返回特定值

如何模拟 Django 模型对象(及其方法)?

创建日志(log)文件

在计算之前删除包含某些值的组合

从 csv 中删除单行而不复制文件

连接 dict 值,它们是列表

Python 3.4 多处理队列比 Pipe 快,出乎意料

新项目:Python 2 还是 Python 3?

python中的绝对导入是什么?

如何从集合中删除多个元素?