我有一个程序,其中有很多音乐甲板(甲板1,甲板2,music_clip_deck,speackers_deck,ip_call_1,ip_call_2,ip_call_3).每一副牌在一个单独的过程中工作.我用来裁剪mp3文件/重传流/来自麦克风的语音/来自aiortc—pyav的语音的块时间是125msec.之后,我填充了一些队列(每个单独的进程一个队列),我发送最后一个队列到最后一个线程,以便在听到和传输给客户端之前进行最后的音频处理.

我如何同步所有进程在一起,使每个进程的一个while运行时间正好需要125 msec?

以下是一个帮助数字:

enter image description here

这种方法可能根本没有帮助:

class Deck_1_Proc(Process):
...
...
...
    def run(self):
        while(True):
            t1 = time.time()
            ...
            ...
            ...
            t2 = time.time()
            if t2 - t1 < 0.125:
                time.sleep(0.125 - (t2 - t1))

也许一个更好的方法应该是使用javascript setInterval和time参数:125msec

from threading import Event, Thread

def call_repeatedly(interval, func, *args):
    stopped = Event()
    def loop():
        while not stopped.wait(interval): # the first call is in `interval` secs
            func(*args)
    Thread(target=loop).start()    
    return stopped.set

#call:
cancel_future_calls = call_repeatedly(0.125, run)
#stopping to app termination:
cancel_future_calls()

推荐答案

主要问题是大多数定时器漂移,sleepis not accuratenot even QTimer,所以一个稳定的定时器(在某种意义上,第is not accurate个滴答接近12.5秒)必须做这样的事情.

import time
from multiprocessing import Condition
def infinite_heartbreat(cv: Condition):
    next_beat = time.time()
    while True:
        next_beat += 0.125
        time_to_sleep = next_beat - time.time()
        if time_to_sleep > 0:
            time.sleep(time_to_sleep)
        with cv:
            cv.notify_all()

您可以使用Condition Variable轻松地同步所有进程以同时唤醒,但如果其中一个进程滞后几毫秒,则可能需要multiprocessing.Value来确保它们只在没有滞后的情况下等待,如下所示:

import threading
import time
from multiprocessing import Condition, Value, Process, Event
def infinite_heartbreat(cv: Condition, frame: Value, quit_event: Event):
    next_beat = time.time()
    while True:
        next_beat += 0.125
        time_to_sleep = next_beat - time.time()
        if time_to_sleep > 0:
            time.sleep(time_to_sleep)
        with cv:
            frame.value += 1
            cv.notify_all()
            if quit_event.is_set():
                return

def worker(cv, frame_number, worker_id, quit_event: Event):
    current_frame = 0
    while True:
        with cv:
            cv.wait_for(lambda: current_frame <= frame_number.value)
            if quit_event.is_set():
                return
        print(f"processed frame {current_frame} in worker {worker_id}")
        current_frame += 1

if __name__ == "__main__":
    condition = Condition()
    frame = Value('q', lock=False)
    quit_event = Event()
    processes = []
    for i in range(4):
        process = Process(target=worker, args=(condition, frame, i, quit_event))
        process.start()
        processes.append(process)
    tr = threading.Thread(target=infinite_heartbreat, args=(condition,frame, quit_event))
    tr.start()
    time.sleep(5)
    quit_event.set()

编辑:以multiprocessing.Event的形式添加了一个quit_event,因为它是一个原子.

Edit2:根据@Booboo的 comments ,将值更改为无锁的签名q,这将保存文件描述符并允许负帧编号(因为-1有其用途).

Python相关问答推荐

具有多个选项的计数_匹配

如何自动抓取以下CSV

如果条件为真,则Groupby.mean()

删除任何仅包含字符(或不包含其他数字值的邮政编码)的观察

如何记录脚本输出

为什么sys.exit()不能与subproccess.run()或subprocess.call()一起使用

如何使用Python以编程方式判断和检索Angular网站的动态内容?

cv2.matchTemplate函数匹配失败

在Python中,从给定范围内的数组中提取索引组列表的更有效方法

在单次扫描中创建列表

Flash只从html表单中获取一个值

为什么常规操作不以其就地对应操作为基础?

如何在GEKKO中使用复共轭物

在极点中读取、扫描和接收有什么不同?

如何在一组行中找到循环?

一维不匹配两个数组上的广义ufunc

根据过滤后的牛郎星图表中的数据计算新系列

使用xlsxWriter在EXCEL中为数据帧的各行上色

用LAKEF划分实木地板AWS Wrangler

有什么方法可以在不对多索引DataFrame的列进行排序的情况下避免词法排序警告吗?