我需要帮助理解multiprocessing.Queue
.我面临的问题是,与对queue.put(...)
的调用和队列的缓冲区(deque)相比,从queue.get(...)
获取结果的速度要慢得多.
这个泄漏的抽象让我调查了队列的内部 struct .它直截了当的source code让我想到了deque implementation,这似乎也足够简单,我无法用它来解释我看到的行为.我还读到队列使用管道,但在源代码中似乎找不到.
我把它归结为一个再现问题的最小示例,并在下面指定了一个可能的输出.
import threading
import multiprocessing
import queue
q = None
def enqueue(item):
global q
if q is None:
q = multiprocessing.Queue()
process = threading.Thread(target=worker, args=(q,)) # or multiprocessing.Process Doesn't matter
process.start()
q.put(item)
print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
def worker(local_queue):
while True:
try:
while True: # get all items
item = local_queue.get(block=False)
print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
except queue.Empty:
print('empty')
if __name__ == '__main__':
for i in range(1, 100000, 1000):
enqueue(list(range(i)))
输出:
empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28
我希望您注意以下结果:在插入元素28001后,工作人员发现队列中没有剩余的元素,而还有几十个.因为同步,我可以只得到所有的,除了少数.但它只找到了two个!
这种模式还在继续.
这似乎与我放入队列的对象的大小有关.对于小型物体,比如i
而不是list(range(i))
,这个问题不会出现.但所讨论的对象的大小仍然是千字节,几乎不足以体现如此显著的延迟(在我的现实世界的非最小示例中,这很容易就花了几分钟)
My question specifically is:我如何在Python中的进程之间共享(不是这样)大量数据?