时间太长;我没读过
warnings.catch_warnings()
上下文管理器是not thread safe.如何在并行处理环境中使用它?
出身背景
下面的代码使用Python的multiprocessing
模块并行处理来解决最大化问题.它获取一个(不可变的)小部件列表,将它们进行分区(参见Efficient multiprocessing of massive, brute force maximization in Python 3),找到所有分区的最大值("最终参与者"),然后找到这些"最终参与者"的最大值("冠军")如果我正确地理解了自己的代码(如果我理解了,我就不会在这里),我将与所有子进程共享内存,为它们提供输入小部件,并且multiprocessing
使用操作系统级别的管道和酸洗,在工作完成后将最终的小部件发送回主进程.
问题的根源
I want to catch the redundant widget warnings being caused by widgets' re-instantiation after the unpickling当小部件从进程间管道中出来时就会发生这种情况.当小部件对象实例化时,它们会验证自己的数据,并从Python standard warnings
模块发出警告,告诉应用程序的用户小部件怀疑用户的输入数据有问题.因为取消勾选会导致对象实例化,所以我对代码的理解意味着,每个小部件对象只重新实例化一次,当且仅当它从管道中出来后是最终对象时——请参阅下一节以了解为什么这是不正确的.
这些小部件在被蛙鸣之前就已经创建好了,所以用户已经痛苦地意识到自己的输入错误了,不想再听到它了.这些是我想通过warnings
模块的catch_warnings()
上下文管理器(即with
语句)捕捉到的警告.
失败的解决方案
在我的测试中,当多余的警告被发送到我下面标记为Line A和Line B之间的任何地方时,我缩小了范围.令我惊讶的是,这些警告并非在接近output_queue.get()
个地方发出.这对我来说意味着multiprocessing
使用酸洗将小部件发送给工人.
结果是,将warnings.catch_warnings()
创建的上下文管理器放在Line A到Line B之间,并在该上下文中设置正确的警告过滤器,并不能捕获警告.这对我来说意味着警告是在工作进程中发出的.将此上下文管理器置于工作代码周围也不会捕获警告.
密码
本例省略了用于确定问题大小是否太小而无法进行Forking 处理、导入多处理以及定义my_frobnal_counter
和my_load_balancer
的代码.
"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"
def frobnicate_parallel_worker(widgets, output_queue):
resultant_widget = max(widgets, key=my_frobnal_counter)
output_queue.put(resultant_widget)
def frobnicate_parallel(widgets):
output_queue = multiprocessing.Queue()
# partitions: Generator yielding tuples of sets
partitions = my_load_balancer(widgets)
processes = []
# Line A: Possible start of where the warnings are coming from.
for partition in partitions:
p = multiprocessing.Process(
target=frobnicate_parallel_worker,
args=(partition, output_queue))
processes.append(p)
p.start()
finalists = []
for p in processes:
finalists.append(output_queue.get())
# Avoid deadlocks in Unix by draining queue before joining processes
for p in processes:
p.join()
# Line B: Warnings no longer possible after here.
return max(finalists, key=my_frobnal_counter)