此代码未按预期工作的原因是,多进程不与子进程共享其状态.这意味着您启动的每个进程p1
、p2
和p3
都会获得类Shop
对象的副本.它不是同一个对象.有two种方法可以解决这个问题,与进程共享实例属性stock
,或者共享整个对象本身.第二种方法可能更适合于更大的用例,如果shop对象保存了需要在进程之间共享的其他数据.
Method 1:
要仅共享stock
实例变量的值,可以使用multiprocessing.Value.使用此方法创建共享整数并访问其值的方法如下:
shared_int = multiprocessing.Value('i', 5)
print(f'Value is {shared_int.value}') # 5
根据您的用例进行调整,代码将变成:
import multiprocessing
class Shop:
def __init__(self, stock=5):
self.stock = multiprocessing.Value('i', stock)
def get_item(self, l, x):
l.acquire()
if self.stock.value >= x:
self.stock.value -= x
print(f"{self.stock.value} = remaining")
l.release()
if __name__ == "__main__":
l = multiprocessing.Lock()
obj = Shop()
p1 = multiprocessing.Process(target=obj.get_item, args=(l, 1))
p2 = multiprocessing.Process(target=obj.get_item, args=(l, 1))
p3 = multiprocessing.Process(target=obj.get_item, args=(l, 1))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print("Final: ", obj.stock.value)
Output
4 = remaining
3 = remaining
2 = remaining
Final: 2
Method 2
共享整个复杂对象是一个更复杂的过程.我最近提出了一个关于共享复杂对象(如本例中的类store 对象)的类似问题,该问题也涵盖了下面提供的代码背后的推理.我建议您阅读一下,因为它更详细地解释了底部提供的代码背后的逻辑.这个用例唯一的主要区别是您希望使用multiprocess,一个多处理分支,而不是多处理.该库的工作原理与内置的多处理相同,只是它提供了我们所需要的更好的酸洗支持.
基本上,您需要使用multiprocessing.Managers来共享状态,并使用合适的代理来访问状态.下面代码中提供的ObjProxy
就是这样一个代理,它共享名称空间和实例方法(除了受保护/私有属性之外).一旦有了这些,您只需要使用管理器和代理创建Shop
类的对象.这是使用新添加的Shop
类create
方法完成的.这是一个类构造函数,Shop
的所有对象都应仅使用此方法创建,而不是直接调用构造函数.完整代码:
import multiprocess
from multiprocess import Manager, Process
from multiprocess.managers import NamespaceProxy, BaseManager
import types
class ObjProxy(NamespaceProxy):
"""Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
pickable and can its state can be shared among different processes. """
def __getattr__(self, name):
result = super().__getattr__(name)
if isinstance(result, types.MethodType):
def wrapper(*args, **kwargs):
return self._callmethod(name, args, kwargs)
return wrapper
return result
class Shop:
def __init__(self, stock=5):
self.stock = stock
@classmethod
def create(cls, *args, **kwargs):
# Register class
class_str = cls.__name__
BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
# Start a manager process
manager = BaseManager()
manager.start()
# Create and return this proxy instance. Using this proxy allows sharing of state between processes.
inst = eval("manager.{}(*args, **kwargs)".format(class_str))
return inst
def get_item(self, l, x):
with l:
if self.stock >= x:
self.stock -= x
print(f"{self.stock} = remaining")
def k(self, l, n):
pass
if __name__ == "__main__":
manager = Manager()
l = manager.Lock()
obj = Shop.create()
p1 = Process(target=obj.get_item, args=(l, 1, ))
p2 = Process(target=obj.get_item, args=(l, 1, ))
p3 = Process(target=obj.get_item, args=(l, 1, ))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print("Final: ", obj.stock)
Output
4 = remaining
3 = remaining
2 = remaining
Final: 2
Note:这两行的说明:
manager = Manager()
l = manager.Lock()
在您的示例中,之前我们不需要为锁创建管理器(以及随后的代理)的原因如下所述.如果不创建代理,它就不能使用上面的代码,原因是我们不再在主进程中创建进程,并且锁在当前进程内存空间中不存在(因为为复杂对象创建管理器以共享其状态产生了自己的服务器进程)