我刚刚开始使用Python中的多处理.
我在程序中使用multiprocessing.queues
,并将其从一个流程传递到另一个流程.
问题是,当我使用myprocess.join()
时,只有当我要写入的队列为空时,它才会终止.
所以我想知道原因是什么,为什么以这种方式构建以及如何绕过它.我可以清空主流程中的所有队列,但我更喜欢它.
漏洞部分: 我向您解释的是包含超过100个元素的队列的基本条件.
在我的例子中,如果您输入in range(100)
:in range(100)
的初始数据列表,那么p4.join()
将按预期被调用,但如果您输入in range(100)
0或更多,它将停止工作,正如我解释的那样.所以我有点迷失了.
编辑:该错误出现在输入数据大小为533或534个元素时
谢谢,
这里有一个小例子给您看.
import random
import multiprocessing
import time
def process1(data, queue1):
for item in data:
queue1.put(item)
queue1.put(None)
def process2(queue1, queue2):
while True:
item = queue1.get()
if item is None:
queue2.put(None)
break
queue2.put(item * 2)
def process3(queue2, queue3):
while True:
item = queue2.get()
if item is None:
queue3.put(None)
break
queue3.put(item * 2)
def process4(queue3, queue4):
while True:
item = queue3.get()
if item is None:
queue4.put(None)
print("None received")
break
queue4.put(item * 2)
print("End process4")
if __name__ == "__main__":
start_time = time.time()
data = [random.randint(1, 10000) for _ in range(10000)]
queue1 = multiprocessing.Queue()
queue2 = multiprocessing.Queue()
queue3 = multiprocessing.Queue()
queue4 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=process1, args=(data, queue1))
p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))
p1.start()
p2.start()
p3.start()
p4.start()
print("Proc started")
p1.join()
print("p1 join")
p2.join()
print("p2 join")
p3.join()
print("p3 join")
p4.join()
print("p4 join")
results = []
print("While loop")
while True:
item = queue4.get()
if item is None:
break
results.append(item)
end_time = time.time()
print(end_time - start_time)