import multiprocessing as mp
pool = mp.Pool()

def calc(i):
    return i * 2

def done(results):
    for result in results:
        print(result)

def loop():
    pool.map_async(calc, [0, 1, 2, 3], callback = done):

while True:
    loop()

我使用的是这个简化示例中描述的线程池设置.它按预期工作:主循环要求池在每次运行时通过map_async计算一组结果,池使用calc函数来处理给定的项目,当准备就绪时,调用done函数,其中包含所有结果的列表,在这种情况下总是[0, 2, 4, 6].

我想实现的目标:我希望线程池在任何结果完成处理时运行回调函数,或者将其添加到主线程可以随时查看和修改的列表中.在计算出最后一个结果后,池停止调用回调,直到主循环再次触发它并重复该过程.

目标是让工作线程尽可能快地提交处理后的结果,让主线程在执行时找到尽可能多的结果,即使它们还没有全部准备好.使用result.get()不是一个选项,因为获取结果不能阻塞主线程,如果你想在同步模式下运行,这是一个可选的设置.我很好使用池中定义的回调,或者将每个完成的结果添加到数组中,只要主循环可以看到并处理完成的项目,然后丢弃.

值得注意的是,主线程并不强制将数组提供给线程池,[0, 1, 2, 3]是类中的常量变量,但calc函数工作的其他变量需要在每次调用时从主线程更新.每个线程都会分配一个项目...例如,我的数组假设有4个进程,每个进程都负责计算其中一个数字.我不指望它能像我想象的那样工作,但请让我知道什么是最接近的.

推荐答案

仅当所有结果就绪时才调用map_async的回调函数.

如果你想在函数返回时立即调用回调函数,你可以迭代地将作业(job)提交到apply_async:

import multiprocessing as mp
from time import sleep
from random import random

def calc(i):
    sleep(random())
    return i * 2

def done(result):
    print(result)

def main():
    pool = mp.Pool()
    while True:
        for i in [0, 1, 2, 3]:
            pool.apply_async(calc, (i,), callback=done)

if __name__ == '__main__':
    main()

演示:https://replit.com/@blhsing1/StormyKnottyGeeklog

Python相关问答推荐

在Python中,如何初始化集合列表脚本的输出

修剪Python框架中的尾随NaN值

绘制系列时如何反转轴?

是什么导致对Python脚本的jQuery Ajax调用引发500错误?

将行从一个DF添加到另一个DF

通过交换 node 对链接列表进行 Select 排序

2维数组9x9,不使用numpy.数组(MutableSequence的子类)

我从带有langchain的mongoDB中的vector serch获得一个空数组

numba jitClass,记录类型为字符串

Excel图表-使用openpyxl更改水平轴与Y轴相交的位置(Python)

在Pandas DataFrame操作中用链接替换'方法的更有效方法

运行终端命令时出现问题:pip start anonymous"

如何在solve()之后获得症状上的等式的值

如何将多进程池声明为变量并将其导入到另一个Python文件

优化器的运行顺序影响PyTorch中的预测

ThreadPoolExecutor和单个线程的超时

导入...从...混乱

在Django admin中自动完成相关字段筛选

如何合并两个列表,并获得每个索引值最高的列表名称?

如何排除prefecture_related中查询集为空的实例?