I have been experimenting with Python's multiprocessing module, and I would like to understand how arguments are handled by the spawned processes under different parallelization approaches. Here is the code I am using:

import os
import time
import multiprocessing


class StateClass:
    def __init__(self):
        self.state = 0

    def __call__(self):
        return f"I am {id(self)}: {self.state}"


CONTEXT = multiprocessing.get_context("fork")

nb_workers = 2

stato = StateClass()


def wrapped_work_function(a1, a2, sss, qqq):
    time.sleep(a1 + 1)
    if a1 == 0:
        sss.state = 0
    else:
        sss.state = 123
    for eee in a2:
        time.sleep(a1 + 1)
        sss.state += eee
        print(
            f"Worker {a1} in process {os.getpid()} (parent process {os.getppid()}): {eee}, {sss()}"
        )
    return sss


print("main", id(stato), stato)

manager = CONTEXT.Manager()
master_workers_queue = manager.Queue()

work_args_list = [
    (
        worker_index,
        [iii for iii in range(4)],
        stato,
        master_workers_queue,
    )
    for worker_index in range(nb_workers)
]

pool = CONTEXT.Pool(nb_workers)
result = pool.starmap_async(wrapped_work_function, work_args_list)

pool.close()
pool.join()
print("Finish")
bullo = result.get(timeout=100)
bullo.append(stato)
for sss in bullo:
    print(sss, id(sss), sss.state)

For example, this gives me the following output:

main 140349939506416 <__main__.StateClass object at 0x7fa5c449dcf0>
Worker 0 in process 9075 (parent process 9047): 0, I am 140350069832528: 0
Worker 0 in process 9075 (parent process 9047): 1, I am 140350069832528: 1
Worker 1 in process 9077 (parent process 9047): 0, I am 140350069832528: 123
Worker 0 in process 9075 (parent process 9047): 2, I am 140350069832528: 3
Worker 0 in process 9075 (parent process 9047): 3, I am 140350069832528: 6
Worker 1 in process 9077 (parent process 9047): 1, I am 140350069832528: 124
Worker 1 in process 9077 (parent process 9047): 2, I am 140350069832528: 126
Worker 1 in process 9077 (parent process 9047): 3, I am 140350069832528: 129
Finish
<__main__.StateClass object at 0x7fa5c43ac190> 140349938516368 6
<__main__.StateClass object at 0x7fa5c43ac4c0> 140349938517184 129
<__main__.StateClass object at 0x7fa5c449dcf0> 140349939506416 0

The initial class instance stato has id 140349939506416 and, as I expected, keeps it for its whole lifetime. Through the starmap_async call I do get two distinct instances of the same class (one per worker/process), which I can modify and whose state attributes persist until the end of the script. However, these instances initially share the same id (140350069832528), and at the end of the script they each have yet another id, which also differs from the id of the original instance.

Answer

First, when I run this (Debian Linux, Python 3.9.7), I find that the ids of the sss instances in the two child processes are not the same:

main 140614771273680 <__main__.StateClass object at 0x7fe36d7defd0>
Worker 0 in process 19 (parent process 13): 0, I am 140614770671776: 0
Worker 0 in process 19 (parent process 13): 1, I am 140614770671776: 1
Worker 1 in process 20 (parent process 13): 0, I am 140614761373648: 123
Worker 0 in process 19 (parent process 13): 2, I am 140614770671776: 3
Worker 0 in process 19 (parent process 13): 3, I am 140614770671776: 6
Worker 1 in process 20 (parent process 13): 1, I am 140614761373648: 124
Worker 1 in process 20 (parent process 13): 2, I am 140614761373648: 126
Worker 1 in process 20 (parent process 13): 3, I am 140614761373648: 129
Finish
<__main__.StateClass object at 0x7fe36ce7b7f0> 140614761428976 6
<__main__.StateClass object at 0x7fe36ce7b520> 140614761428256 129
<__main__.StateClass object at 0x7fe36d7defd0> 140614771273680 0

Even though you are forking new processes, the stato instances in your work_args_list list are passed to your worker function as sss. Arguments passed to a pool worker function running in a different process/address space are transferred with pickle, which serializes and then deserializes the instance, producing a copy that, once deserialized, will in general have a different id. In this particular case, because you are using the fork start method, each process also inherits the global variable stato, which should therefore have the same id in every process/address space. We can verify this by modifying wrapped_work_function to print out the id of stato, thus:

def wrapped_work_function(a1, a2, sss, qqq):
    print('The id of the inherited stato is', id(stato))
    time.sleep(a1 + 1)
    if a1 == 0:
        sss.state = 0
    else:
        sss.state = 123
    for eee in a2:
        time.sleep(a1 + 1)
        sss.state += eee
        print(
            f"Worker {a1} in process {os.getpid()} (parent process {os.getppid()}): {eee}, {sss()}"
        )
    return sss

Then the printout is:

main 140456701534160 <__main__.StateClass object at 0x7fbe9fcd1fd0>
The id of the inherited stato is 140456701534160
The id of the inherited stato is 140456701534160
Worker 0 in process 43 (parent process 37): 0, I am 140456700920112: 0
Worker 0 in process 43 (parent process 37): 1, I am 140456700920112: 1
Worker 1 in process 44 (parent process 37): 0, I am 140456700920112: 123
Worker 0 in process 43 (parent process 37): 2, I am 140456700920112: 3
Worker 0 in process 43 (parent process 37): 3, I am 140456700920112: 6
Worker 1 in process 44 (parent process 37): 1, I am 140456700920112: 124
Worker 1 in process 44 (parent process 37): 2, I am 140456700920112: 126
Worker 1 in process 44 (parent process 37): 3, I am 140456700920112: 129
Finish
<__main__.StateClass object at 0x7fbe9f36e880> 140456691689600 6
<__main__.StateClass object at 0x7fbe9f36eb20> 140456691690272 129
<__main__.StateClass object at 0x7fbe9fcd1fd0> 140456701534160 0

Every address space sees the same id for stato, namely 140456701534160. And if every address space sees the same id for the inherited stato, then the id of sss (which should be a separate copy of stato) cannot be the same as the id of stato. When I run the code they do have different ids, as I would expect. The sss instances running in different address spaces may happen to share the same id with one another, but that is not guaranteed (on a second run they were the same).
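As a quick illustration (a minimal sketch, not taken from the question; the worker function make_and_report is hypothetical), ids reported by different processes belong to separate address spaces and are not comparable with one another:

import multiprocessing
import os


def make_and_report(q):
    # Each process creates its own object; its id is only meaningful
    # inside that process's address space.
    obj = object()
    q.put((os.getpid(), id(obj)))


if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")
    q = ctx.SimpleQueue()
    procs = [ctx.Process(target=make_and_report, args=(q,)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    while not q.empty():
        print(q.get())  # the two ids may or may not coincide; either is fine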

But even if the two instances have the same id and the same address, they are two instances living in two different processes and thus two different address spaces, which is why they can maintain distinct states. When your worker function returns sss, it is likewise passed back to the main process with pickle, which serializes and deserializes the instance and thereby effectively produces a copy of the original. That is why the returned instances have different ids.
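The effect of that pickle round trip can be reproduced in a single process (a minimal sketch, assuming the same StateClass as above): serializing and deserializing an instance yields a new object with the same state but a different id:

import pickle


class StateClass:
    def __init__(self):
        self.state = 0

    def __call__(self):
        return f"I am {id(self)}: {self.state}"


original = StateClass()
original.state = 129

# This mimics what the pool does with arguments and return values:
# serialize the instance, then deserialize it into a fresh object.
copy = pickle.loads(pickle.dumps(original))

print(id(original), original.state)   # original id, state 129
print(id(copy), copy.state)           # a different id, same state
print(original is copy)               # False: the round trip made a copy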

As an aside: you have bullo = result.get(timeout=100), which can in principle time out. But before that statement you call pool.close() and pool.join(), and those two calls wait for all submitted tasks to complete. So by the time you call result.get, the tasks are guaranteed to be finished and a timeout exception can never be raised.
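If you actually want the timeout to be meaningful, one option (a minimal sketch, not from the original code; square and the 100-second timeout are placeholders) is to drop the join() and let get() itself block on the result:

import multiprocessing


def square(x):
    return x * x


if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(2) as pool:
        result = pool.starmap_async(square, [(i,) for i in range(4)])
        # No pool.join() here: get() blocks until the tasks finish,
        # so the timeout can actually fire if they take too long.
        values = result.get(timeout=100)
    print(values)  # [0, 1, 4, 9]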
