如何更新下面的multi_proc_parallel_functions函数以接受参数列表.这是使用multiprocess模块.

请注意,我将在AWS Lambda中使用此功能,其他多处理模块可能会在Lambda上出现问题.

下面的adder个函数只是用于演示问题的玩具函数.

import multiprocess as mp

def parallel_functions(function,send_end):

    send_end.send(function())


def multi_proc_parallel_functions(function_list,target_func):
    jobs = []
    pipe_list = []

    for function in function_list:

        recv_end, send_end = mp.Pipe(False)
        p = mp.Process(target=target_func, args=(function,send_end))

        jobs.append(p)

        pipe_list.append(recv_end)
        p.start()

    result_list = [x.recv() for x in pipe_list]

    for proc in jobs:
        proc.join()

    return result_list
    

def adder10():
    return np.random.randint(5) + 10

def adder1000():
    return np.random.randint(5) + 1000

创建函数列表

function_list = [adder10,adder10,adder10,adder1000]

运行所有功能

multi_proc_parallel_functions(function_list,parallel_functions)

[13, 13, 13, 1003]

如何更新multi_proc_parallel_functions以接受不同长度的参数列表,这些参数将根据函数的不同而变化,如下所示:

def adder10(x,y):
    return np.random.randint(5) + 10 + x * y

def adder1000(a,b, c):
    return np.random.randint(5) + 1000 -a + b +c

我认为这需要*args.

推荐答案

这是一种方法(使用位置参数支持):

import multiprocessing as mp

def parallel_functions(function, send_end, *args):
    send_end.send(function(*args))

def multi_proc_parallel_functions(function_list, target_func):
    jobs = []
    pipe_list = []

    for (function, *args) in function_list:
        recv_end, send_end = mp.Pipe(False)
        p = mp.Process(target=target_func, args=(function, send_end, *args))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    result_list = [x.recv() for x in pipe_list]

    for proc in jobs:
        proc.join()

    return result_list
import numpy as np

def adder10(x, y):
    return np.random.randint(5) + 10 + x * y

def adder1000(a, b, c):
    return np.random.randint(5) + 1000 -a + b +c

multi_proc_parallel_functions(
  [ (adder10, 5, 4),
    (adder10, 1, 2),
    (adder1000, 5, 6, 7) ],
  parallel_functions
)

请注意,多处理模块的工作方式将取决于您是在Windows、macOS还是Linux上.

在Linux上,创建mp.Process的默认方法是使用fork系统调用,这意味着函数/其参数不需要可序列化/可以pickle.子进程将从父进程继承内存.macOS支持fork,Windows不支持.

在Windows/macOS上,默认情况下使用spawn系统调用.这要求发送到子进程的所有内容都可以序列化/可能进行pickle.例如,这意味着您将无法发送lambda表达式或动态创建的函数.

可以在Linux(使用您的原始实现)上工作,但不能在Windows(默认情况下为macOS)上工作的示例:

multi_proc_parallel_functions(
  [ lambda: adder10(5, 4),
    lambda: adder10(1, 2),
    lambda: adder1000(5, 6, 7) ],
  parallel_functions
)

# spawn: _pickle.PicklingError: Can't pickle <function <lambda> at 0x7fce7cc43010>: attribute lookup <lambda> on __main__ failed
# fork: [30, 12, 1008]

Python相关问答推荐

替换字符串中的多个重叠子字符串

将特定列信息移动到当前行下的新行

Pandas 滚动最接近的价值

删除任何仅包含字符(或不包含其他数字值的邮政编码)的观察

追溯(最近最后一次调用):文件C:\Users\Diplom/PycharmProject\Yolo01\Roboflow-4.py,第4行,在模块导入roboflow中

为什么符号没有按顺序添加?

pandas滚动和窗口中有效观察的最大数量

从groupby执行计算后创建新的子框架

如何调整QscrollArea以正确显示内部正在变化的Qgridlayout?

对象的`__call__`方法的setattr在Python中不起作用'

如何在Pyplot表中舍入值

Python避免mypy在相互引用中从另一个类重定义类时失败

当单元测试失败时,是否有一个惯例会抛出许多类似的错误消息?

Pandas—MultiIndex Resample—我不想丢失其他索引的信息´

如何训练每一个pandaprame行的线性回归并生成斜率

按列表分组到新列中

如何在Polars中将列表中的新列添加到现有的数据帧中?

如何在不遇到IndexError的情况下将基数10的整数转换为基数80?

使代码更快地解决哪个字母代表给定公式中的哪个数字

S最大值除以最小值,然后减1的结果是什么?