在Python中,我实现了两种类型的队列读取

不同之处:

  1. 在主进程中创建并执行队列
  2. 队列在主进程中创建,并由其他进程执行.

但是有一个性能差异,我试着调试它,但我看不出为什么!

代码: Queue1.py

import multiprocessing
import time
import cProfile, pstats, io


def put_queue(queue):
    for i in range(500000):
        queue.put(i)

def get_queue(queue):
    pr = cProfile.Profile()
    pr.enable()
    print(queue.qsize())
    while queue.qsize() > 0:
        try:
            queue.get(block=False)
        except:
            pass
    pr.dump_stats("queue1.prof")
    pr.disable()
    s = io.StringIO()
    sortby = "cumtime"
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())


q1 = multiprocessing.Queue()
t1 = time.time()
put_queue(q1)
t2 = time.time()
print(t2-t1)

t1 = time.time()
p1 = multiprocessing.Process(target=get_queue, args=(q1,))
p1.start()
p1.join()
t2 = time.time()
print(t2-t1)

Queue2.py

import multiprocessing
import time
import cProfile, pstats, io


def put_queue(queue):
    for i in range(500000):
        queue.put(i)


def get_queue(queue):
    pr = cProfile.Profile()
    pr.enable()
    print(queue.qsize())
    while queue.qsize() > 0:
        try:
            queue.get(block=False)
        except:
            pass
    pr.dump_stats("queue2.prof")
    pr.disable()
    s = io.StringIO()
    sortby = "cumtime"
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())


q2 = multiprocessing.Queue()
t1 = time.time()
put_queue(q2)
t2 = time.time()
print(t2 - t1)

t1 = time.time()
get_queue(q2)
t2 = time.time()
print(t2 - t1)

python Queue2.py takes longer than queue1.py

我还会打印个人资料. enter image description here

Queue2.py cost muth time in built-in method posix.read.

我想知道确切的原因.

推荐答案

我不知道我能不能告诉你你观察你所看到的东西的"确切原因",但我有一个我认为是相当好的解释--即使它不能解释你所看到的all%.

首先,如果您阅读了关于multiprocessing.Queue的文档,您将看到不应该使用对方法qsize 102的调用(如果您遵循我下面对所发生的事情的解释,您很容易就会明白为什么会这样).让我们使用一个遵守文档的更简单的基准:

from multiprocessing import Queue, Process
import time

N = 500_000

def putter(queue):
    for i in range(N):
        queue.put(i)

def getter(queue):
    for i in range(N):
        queue.get()

def benchmark1(queue):
    t = time.time()
    putter(queue)
    getter(queue)
    elapsed = time.time() - t
    print('benchmark 1 time:', elapsed)


def benchmark2(queue):
    t = time.time()
    putter(queue)
    p = Process(target=getter, args=(queue,))
    p.start()
    p.join()
    elapsed = time.time() - t
    print('benchmark 2 time:', elapsed)

if __name__ == '__main__':
    queue = Queue()
    benchmark1(queue)
    benchmark2(queue)

打印:

benchmark 1 time: 19.12191128730774
benchmark 2 time: 8.261705160140991

事实上,我们看到getter在另一个进程中运行的代码版本的速度大约是getter的两倍.

Explanation

这两种情况显然是有区别的,我相信以下就可以解释这一点:

多处理队列构建在multiprocessing.Pipe个实例之上,该实例的容量非常有限.如果您try 通过管道发送数据,而另一端没有另一个线程/进程读取数据,那么在发送了很少的数据后,您很快就会被阻塞.然而,我们知道,如果我们定义一个容量无限的队列,我可以像在上面的基准测试中那样,在不阻塞的情况下处理500_000个put个请求.是什么魔法让这一切成为可能?

创建队列时,将创建一个collections.deque实例作为队列实现的一部分.当put完成后,你只是简单地附加到一个容量不受限制的双排上.此外,还会启动一个线程,等待DQUE获得数据,而实际上正是这个线程负责通过管道发送数据.因此,这个特殊的线程正在阻塞,而主线程可以继续向队列(实际上是底层出队)添加数据,而不会阻塞.

当我们在benchmark1中调用putter函数时,基本上所有的"PUT"数据仍在出列中.当主线程中运行的getter函数开始从队列中获取项时,另一个线程可以通过管道发送更多数据.因此,我们在这两个线程之间快速来回移动,其中一个线程基本上处于空闲状态.以下是其中的一小部分.为简单起见,我假设管道的容量只是一项:

  1. putter线程在管道上被阻塞(未运行),等待某个东西从管道获取数据.
  2. getter线程从队列(实际上是底层管道)获取一个项目,但现在队列暂时为空,直到推杆可以从其出列的下一个项目中取出并通过管道发送它.在此之前,获取程序处于等待状态.
  3. 推杆"醒来"不再受阻于管道,现在可以通过管道发送另一件物品.但现在它再次阻塞,直到发送了getter并可以从管道中获取项.

实际上,管道的容量可能是几个这样的小整数项,因此getter可能会运行并从队列中获取几个这样的项,从而允许推杆同时放置更多的项而不会阻塞.然而,much of the queueing logic is Python bytecode that needs to acquire the GIL before it can execute. In the multiprocessing case, 100, the two threads are running in different processes and so there is no GIL contention. This means items can be moved from the deque through the pipe in parallel with the getter retrieving them and I believe this accounts for the timing difference.

Python相关问答推荐

从webhook中的短代码(而不是电话号码)接收Twilio消息

Polars比较了两个预设-有没有方法在第一次不匹配时立即失败

对整个 pyramid 进行分组与对 pyramid 列子集进行分组

如何获取TFIDF Transformer中的值?

基于字符串匹配条件合并两个帧

如果值发生变化,则列上的极性累积和

joblib:无法从父目录的另一个子文件夹加载转储模型

Python列表不会在条件while循环中正确随机化'

名为__main__. py的Python模块在导入时不运行'

Matplotlib中的字体权重

Python—压缩叶 map html作为邮箱附件并通过sendgrid发送

如何使用正则表达式修改toml文件中指定字段中的参数值

利用SCIPY沿第一轴对数组进行内插

在pandas中,如何在由两列加上一个值列组成的枢轴期间或之后可靠地设置多级列的索引顺序,

无法在盐流道中获得柱子

如何在Pandas中用迭代器求一个序列的平均值?

Django更新视图未更新

某些值的数值幂和**之间的差异

PyTorch变压器编码器中的填充掩码问题

Pandas查找给定时间戳之前的最后一个值