首先,我要感谢StackOverflow社区多年来为我提供的巨大帮助,我不必问任何问题.

我对多进程比较陌生;不久前,我成功地使用了多重处理.以一种非常简单的方式创建池,在子进程之间我不需要任何反馈.

我试图从一个类中构建一个并行回火蒙特卡罗算法.

基本类大致如下:

import numpy as np

class monte_carlo:

    def __init__(self):
        self.x=np.ones((1000,3))
        self.E=np.mean(self.x)
        self.Elist=[]

    def simulation(self,temperature):
        self.T=temperature
        for i in range(3000):
            self.MC_step()
            if i%10==0:
                self.Elist.append(self.E)
        return

    def MC_step(self):
        x=self.x.copy()
        k = np.random.randint(1000)
        x[k] = (x[k] + np.random.uniform(-1,1,3))
        temp_E=np.mean(self.x)
        if np.random.random()<np.exp((self.E-temp_E)/self.T):
            self.E=temp_E
            self.x=x
        return

显然,我简化了很多(实际的类有500行!),为了简单起见,我们构建了假函数:__init__将一组参数作为参数,除了自身之外,还有更多的度量列表.Elist,以及许多从self派生的array.我用来计算它们的X.关键的一点是,这个类的每个实例都包含很多我想保存在内存中的信息,我不想一次又一次地复制这些信息,以避免急剧减速.否则我只会使用多重处理.池模块.

现在,我想用伪代码进行并行化:

def proba(dE,pT):
    return np.exp(-dE/pT)  
          
Tlist=[1.1,1.2,1.3]
N=len(Tlist)
G=[]
for _ in range(N):
    G.append(monte_carlo())

for _ in range(5):

    for i in range(N): # this loop should be ran in multiprocess
        G[i].simulation(Tlist[i])
    
    for i in range(N//2): 
        dE=G[i].E-G[i+1].E
        pT=G[i].T + G[i+1].T
        p=proba(dE,pT) # (proba is a function, giving a probability depending on dE)
        if np.random.random() < p: 
             T_temp = G[i].T
             G[i].T = G[i+1].T
             G[i+1].T = T_temp

综合:我想在并行子进程中运行monte-carlo类的多个实例,参数T的值不同,然后定期暂停所有操作以更改不同的T,然后从暂停的地方再次运行子进程/类实例.

提前谢谢,如果我不清楚,很抱歉...

编辑:

编辑2:

Edit3:Charchit answer对测试代码非常有效,无论是在我的个人机器上还是在我通常用来运行代码的远程机器上.因此,我认为这是公认的答案.

Unable to init server: Could not connect: Connection refused

(CMC_temper_all.py:55509): Gtk-WARNING **: ##:##:##:###: Locale not supported by C library.
    Using the fallback 'C' locale.
Unable to init server: Could not connect: Connection refused

(CMC_temper_all.py:55509): Gdk-CRITICAL **: ##:##:##:###: 

gdk_cursor_new_for_display: assertion 'GDK_IS_DISPLAY (display)' failed

(CMC_temper_all.py:55509): Gdk-CRITICAL **: ##:##:##:###: gdk_cursor_new_for_display: assertion 'GDK_IS_DISPLAY (display)' failed

"##:#:#:#:#:#:#:##是(或似乎是)IP地址.

我认为这应该发表一个新的问题,但我还是把它放在这里,以防有人有一个快速的答案.

推荐答案

你要找的是sharing state between processes.根据文档,您可以创建shared memory,这对它可以存储的数据有限制,并且不是线程安全的,但提供了更好的速度和性能;或者可以使用server processesmanagers.我们将使用后者,因为您希望共享用户定义数据类型的整个对象.请记住,使用管理器会影响代码的速度,这取决于向托管对象传递和接收的参数的复杂性.

经理、代理人和酸洗

如上所述,管理器创建服务器进程来存储对象,并允许通过代理访问对象.我回答了一个问题,更详细地介绍了它们的工作方式,以及如何创建合适的代理here.我们将使用链接答案中定义的相同代理,有some种变体.也就是说,我已经将__getattr__中的工厂功能替换为可以使用pickle进行酸洗的功能.这意味着您可以运行使用此代理创建的托管对象的实例方法,而无需使用multiprocess.结果是此修改的代理:

from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np


class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result

解决方案

现在,我们只需要确保在创建monte_carlo的对象时,使用管理器和上述代理即可.为此,我们创建了一个名为create的类构造函数.应使用此函数创建monte_carlo的所有对象.这样,最终的代码如下所示:

from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np


class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result


class monte_carlo:

    def __init__(self, ):
        self.x = np.ones((1000, 3))
        self.E = np.mean(self.x)
        self.Elist = []
        self.T = None

    def simulation(self, temperature):
        self.T = temperature
        for i in range(3000):
            self.MC_step()
            if i % 10 == 0:
                self.Elist.append(self.E)
        return

    def MC_step(self):
        x = self.x.copy()
        k = np.random.randint(1000)
        x[k] = (x[k] + np.random.uniform(-1, 1, 3))
        temp_E = np.mean(self.x)
        if np.random.random() < np.exp((self.E - temp_E) / self.T):
            self.E = temp_E
            self.x = x
        return

    @classmethod
    def create(cls, *args, **kwargs):
        # Register class
        class_str = cls.__name__
        BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
        # Start a manager process
        manager = BaseManager()
        manager.start()

        # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
        inst = eval("manager.{}(*args, **kwargs)".format(class_str))
        return inst


def proba(dE,pT):
    return np.exp(-dE/pT)


if __name__ == "__main__":
    Tlist = [1.1, 1.2, 1.3]
    N = len(Tlist)
    G = []

    # Create our managed instances
    for _ in range(N):
        G.append(monte_carlo.create())

    for _ in range(5):

        #  Run simulations in the manager server
        results = []
        with Pool(8) as pool:

            for i in range(N):  # this loop should be ran in multiprocess
                results.append(pool.apply_async(G[i].simulation, (Tlist[i], )))

            # Wait for the simulations to complete
            for result in results:
                result.get()

        for i in range(N // 2):
            dE = G[i].E - G[i + 1].E
            pT = G[i].T + G[i + 1].T
            p = proba(dE, pT)  # (proba is a function, giving a probability depending on dE)
            if np.random.random() < p:
                T_temp = Tlist[i]
                Tlist[i] = Tlist[i + 1]
                Tlist[i + 1] = T_temp

    print(Tlist)

这符合您想要的标准.它根本不创建任何副本,而是将simulation方法调用的所有参数在池中序列化,并发送到实际存储对象的管理器服务器.它在那里执行,结果(如果有)被序列化并在主进程中返回.所有这些,只使用内置的!

Output

[1.2, 1.1, 1.3]

Edit

由于您使用的是Linux,我建议您在if __name__ ...子句中使用multiprocessing.set_start_method将start方法设置为"spawn".这样做将确保子进程不能访问子句中定义的变量.

Python-3.x相关问答推荐

在numpy. linalg的qr之后使用scipy. integrate中的solve_ivp时出现了一个奇怪的错误

使用pybind11时,在sys.exit(0)处成功完成测试后,Python单元测试冻结

循环遍历数据框以提取特定值

While循环不停止地等待,直到时间.睡眠结束

当索引大于一个整数而小于前一个索引时,我如何返回列值?

将列表转换为 pandas 数据框,其中列表包含字典

如何将函数映射到所有命名元组的元素?

如何通过 python 使用 auth no priv 获取 SNMPv3?

我想使用命令提示符安装 cv2

单击图形时 plotly graph_objects 持久性数据

总结基于条件的值,如果不匹配则保留当前值

你如何表达一个没有参数的 Python Callable?

如何通过python打开文件

使用 Python3 与 HDFS 交互的最佳模块是什么?

如何使用 asyncio 添加连接超时?

在不关心项目的情况下运行生成器功能的更简单方法

如何使用已打开并使用登录凭据登录的浏览器

将 numpy.float64 列表快速转换为 Python 中的浮点数

连接 dict 值,它们是列表

用 Anaconda 安装了一个包,无法在 Python 中导入