我刚刚开始使用Python中的多处理. 我在程序中使用multiprocessing.queues ,并将其从一个流程传递到另一个流程. 问题是,当我使用myprocess.join()时,只有当我要写入的队列为空时,它才会终止. 所以我想知道原因是什么,为什么以这种方式构建以及如何绕过它.我可以清空主流程中的所有队列,但我更喜欢它.

漏洞部分: 我向您解释的是包含超过100个元素的队列的基本条件.

在我的例子中,如果您输入in range(100):in range(100)的初始数据列表,那么p4.join()将按预期被调用,但如果您输入in range(100)0或更多,它将停止工作,正如我解释的那样.所以我有点迷失了.

编辑:该错误出现在输入数据大小为533或534个元素时

谢谢,

这里有一个小例子给您看.



import random
import multiprocessing
import time


def process1(data, queue1):
    for item in data:
        queue1.put(item)

    queue1.put(None)


def process2(queue1, queue2):
    while True:
        item = queue1.get()
        if item is None:
            queue2.put(None)
            break
        queue2.put(item * 2)


def process3(queue2, queue3):
    while True:
        item = queue2.get()
        if item is None:
            queue3.put(None)
            break
        queue3.put(item * 2)


def process4(queue3, queue4):
    while True:
        item = queue3.get()
        if item is None:
            queue4.put(None)
            print("None received")
            break
        queue4.put(item * 2)
    print("End process4")


if __name__ == "__main__":
    start_time = time.time()

    data = [random.randint(1, 10000) for _ in range(10000)]

    queue1 = multiprocessing.Queue()
    queue2 = multiprocessing.Queue()
    queue3 = multiprocessing.Queue()
    queue4 = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=process1, args=(data, queue1))
    p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
    p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
    p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print("Proc started")

    p1.join()
    print("p1 join")
    p2.join()
    print("p2 join")
    p3.join()
    print("p3 join")
    p4.join()
    print("p4 join")

    results = []
    print("While loop")
    while True:
        item = queue4.get()
        if item is None:
            break
        results.append(item)

    end_time = time.time()

    print(end_time - start_time)



推荐答案

该问题在"加入使用队列的流程"部分的https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming中进行了描述.

基本思想是,流程将等待然后终止,直到所有缓冲的项目由"feed"线程提供到底层管道.

import multiprocessing
import random
import time


def process1(data, queue1):
    for item in data:
        queue1.put(item)

    queue1.put(None)


def process2(queue1, queue2):
    while True:
        item = queue1.get()
        if item is None:
            queue2.put(None)
            break
        queue2.put(item * 2)


def process3(queue2, queue3):
    while True:
        item = queue2.get()
        if item is None:
            queue3.put(None)
            break
        queue3.put(item * 2)


def process4(queue3, queue4):
    # calling queue4.cancel_join_thread() will correctly terminate the process
    # queue4.cancel_join_thread()
    # BUT not all items are put into the queue4 (rest are discarded)

    # see https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    # section "Joining processes that use queues"

    while True:
        item = queue3.get()
        if item is None:
            queue4.put(None)
            print("None received")
            break
        queue4.put(item * 2)
    print("End process4")
    print(queue4.qsize())


if __name__ == "__main__":
    start_time = time.time()

    data = [random.randint(1, 10000) for _ in range(10000)]

    queue1 = multiprocessing.Queue()
    queue2 = multiprocessing.Queue()
    queue3 = multiprocessing.Queue()
    queue4 = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=process1, args=(data, queue1))
    p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
    p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
    p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print("Proc started")

    p1.join()
    print("p1 join")
    p2.join()
    print("p2 join")
    p3.join()
    print("p3 join")

    # don't call p4.join() here
    # p4.join()
    # print("p4 join")

    results = []
    print("While loop")
    while True:
        item = queue4.get()
        if item is None:
            break
        results.append(item)

    end_time = time.time()

    # call p4.join() here instead:
    p4.join()
    print("p4 join")

    print(end_time - start_time)

输出:

Proc started
p1 join
None received
End process4
10001
p2 join
p3 join
While loop
p4 join
0.1303730010986328

Python相关问答推荐

了解NP的形状.apply_along_axis输出

为什么pyright在这里发出类型不兼容错误?

七段显示不完整

使用SKLearn KMeans和外部生成的相关矩阵

从收件箱获取特定列中的重复行

当变量也可以是无或真时,判断是否为假

是否有方法将现有的X-Y图转换为X-Y-Y1图(以重新填充)?

将numpy数组与空数组相加

Python中两个矩阵的自定义Hadamard风格产物

Numpy索引argsorted使用integer数组,同时保留排序顺序

如何使用PyTest根据self 模拟具有副作用的属性

从包含数字和单词的文件中读取和获取数据集

Python中的负前瞻性regex遇到麻烦

如何使用没有Selenium的Python在百思买着陆页面上处理国家/地区 Select ?

如何根据另一列值用字典中的值替换列值

由于NEP 50,向uint 8添加-256的代码是否会在numpy 2中失败?

聚合具有重复元素的Python字典列表,并添加具有重复元素数量的新键

当独立的网络调用不应该互相阻塞时,'

使用BeautifulSoup抓取所有链接

dask无groupby(ddf. agg([min,max])?''''