我需要帮助理解multiprocessing.Queue.我面临的问题是,与对queue.put(...)的调用和队列的缓冲区(deque)相比,从queue.get(...)获取结果的速度要慢得多.

这个泄漏的抽象让我调查了队列的内部 struct .它直截了当的source code让我想到了deque implementation,这似乎也足够简单,我无法用它来解释我看到的行为.我还读到队列使用管道,但在源代码中似乎找不到.

我把它归结为一个再现问题的最小示例,并在下面指定了一个可能的输出.

import threading
import multiprocessing
import queue

q = None
def enqueue(item):
    global q
    if q is None:
        q = multiprocessing.Queue()
        process = threading.Thread(target=worker, args=(q,))  # or multiprocessing.Process Doesn't matter
        process.start()
    q.put(item)
    print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')


def worker(local_queue):
    while True:
        try:
            while True:  # get all items
                item = local_queue.get(block=False)
                print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
        except queue.Empty:
            print('empty')


if __name__ == '__main__':
    for i in range(1, 100000, 1000):
        enqueue(list(range(i)))

输出:

empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28

我希望您注意以下结果:在插入元素28001后,工作人员发现队列中没有剩余的元素,而还有几十个.因为同步,我可以只得到所有的,除了少数.但它只找到了two个!

这种模式还在继续.

这似乎与我放入队列的对象的大小有关.对于小型物体,比如i而不是list(range(i)),这个问题不会出现.但所讨论的对象的大小仍然是千字节,几乎不足以体现如此显著的延迟(在我的现实世界的非最小示例中,这很容易就花了几分钟)

My question specifically is:我如何在Python中的进程之间共享(不是这样)大量数据?

推荐答案

我也遇到了这个问题.我发送的是大型numpy数组(约300MB),在mp时速度非常慢.队列get().

看了几眼Python 之后.7 mp的源代码.在队列中,我发现最慢的部分(在类unix系统上)是socket_connection.c分之_conn_recvall(),但我没有深入研究.

为了解决这个问题,我构建了一个实验包FMQ.

这个项目的灵感来自多处理的使用.队列(mp.队列).议员.由于管道(在类Unix系统上)的速度限制,大型数据项的队列速度较慢.

和议员.队列处理进程间传输时,FMQ实现了一个窃取者线程,它从mp中窃取一个项目.在任何项目可用时排队,并将其放入队列.队列然后,使用者进程可以从队列中获取数据.马上排队.

加速是基于这样的假设:生产者和消费者过程都是计算密集型的(因此需要多处理),数据量大(例如,>;50 227x227图像).否则议员.使用多处理或队列的队列.带线程的队列就足够了.

fmq.队列就像议员一样容易使用.队列

请注意,由于该项目处于早期阶段,目前仍有大约Known Issues个项目.

Python-3.x相关问答推荐

如何转换Pandas中的数据,以使我 Select 的列名变为行值并增加行?

我不能使用拆分来分隔数据

在 python 中使用正则表达式在行尾查找特定元素

安装没有 sudo 权限的 python3 和 pip3

列出相同索引的Pandas

Python ** 用于负数

XPATH:使用 .find_elements_by_xpath 为未知数量的 xpath 输入值

用于 BIG 数组计算的多处理池映射比预期的要慢

根据另一列值对多个数据框列进行分组

BeautifulSoup 和 pd.read_html - 如何将链接保存到最终数据框中的单独列中?

使用一周的特定第一天将每日日期转换为每周

迭代dict值

为什么等效的 Python 代码要慢得多

如何模拟 open(...).write() 而不会出现没有这样的文件或目录错误?

如何区分文件之类的对象和文件路径之类的对象

TypeError:多个基地有实例布局冲突

如何从 Python 3 导入 FileNotFoundError?

如何避免使用我的 python 包构建 C 库?

在 macbook pro M1 上安装 Tensorflow 时出现zsh:非法硬件指令 python

在 Visual Studio Code 中调试 Scrapy 项目