下面的代码为第三个result生成TimeoutError.

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def sleeping(i):
    time.sleep(i)

    return f'slept {i} seconds'


with ThreadPoolExecutor(max_workers=1) as exe:
    try:
        for result in exe.map(sleeping, [1, 2, 10, 3], timeout=5):
            print(result)
    except TimeoutError:
        print('Timeout waiting for map()')

问题是当前代码判断每个结果是否发生超时,然后整个线程池被关闭.我想完成所有的任务. 预期输出:

slept 1 seconds
slept 2 seconds
Time- out waiting for map()
slept 3 seconds

请咨询

推荐答案

不幸的是,这不是一件小事—

调用executor.map引发的超时将始终取消所有剩余任务.执行器本身被not关闭,但map调用中的所有后续任务都被取消.无论如何,concurrent. futures要"恢复"超时的worker会很麻烦(比如:如果一个worker正忙于运行前一个任务,它就不能真正重用).

无论如何,如果你需要重试超时或其他什么,这里的解决方案是自己管理超时,in the function inside worker thread:代码中引发的异常不会扰乱执行器,并且可以通过使用executor.submit并处理返回的Future对象在控制器线程上单独处理.而不是executor.map

然而,还有一个额外的问题:在Python中,在同一个线程中引起超时错误并不容易.(即工作线程必须自己"超时").

(0)如果你只想继续执行任务,并且不介意超时任务只消耗一个工作线程直到它完成,这只是一个使用.submit executor方法和处理future,而不是.map调用的问题. 下面我介绍了一些代码为这个 case .

(1)如果你的代码可以使用gallcio异步运行,那么就有gallcio. Timeout.但是,如果你的工作器可以重写为异步代码,那么你的整个项目可能会重写为异步代码,而不是开始的多线程.

或者(2)您可以对代码进行仪表化,以便它测量时间并停止处理本身—并引发TimeoutError而不是继续—这是最简单、最可靠的,但并不总是可能的.不幸的是,一个"超时decorator "不能在没有使用其他线程的情况下编写.

(3)另一层线程工作器必须创建在主线程轮询执行器的每个工作器内部—以便第一层可以管理第二层超时并在需要时重新启动内部工作器.如果第二层可以由子进程而不是线程组成,那么如果超时线程确实卡住了,就有可能通过kill then来释放资源.

(4)就像(0)一样,但是创建一个更大的执行器,用多余的工作线程,并创建一个高级替代"map"调用,当第一个线程陷入处理时,它将开始使用更多的线程.这个可以在内部使用concurrent. futures的submitwait方法.

因此,在示例代码中,其中工作者只是调用"time. sleep",例如,人们不能真正 Select 上面的建议#2:函数将记录开始时间,并在其进行计算的循环中,周期性地将当前时间与开始时间进行比较,如果超时过期,则引发超时错误.(可以写一个特殊的"睡眠"呼叫来这样做,但这将是毫无意义的).

如果你的工作人员正在处理I/O,而他们失败的原因是有时候正在获取数据的服务器本身超时,那么重写代码以使用ICPO可能真的更容易(建议#1).

上述选项#3实施起来很复杂,但可行.如果第二层可以使用子进程,那么如果超时永远运行,这可能是唯一能够释放卡住的资源的方法.(如果相反,他们只是花了比他们有用的时间,但最终结束了,这可能会更简单)

不管怎样,下面是上面选项#0的代码—如果需要的话,它可以被改进来完成我在#4或#3中的建议:

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, wait


def sleeping(i):
    time.sleep(i)

    return f'slept {i} seconds'




data = [1, 2, 9, 3]
TIMEOUT = 5

with ThreadPoolExecutor(max_workers=1) as exe:

    futures = {exe.submit(sleeping, i) for i in data}
    while futures and not all(getattr(future, "soft_cancel", False) == True for future in futures):
        done, pending = wait(futures, timeout=TIMEOUT)
        for future in done:
            print(future.result())
        futures = set()
        for future in pending:
            if started_at:=getattr(future, "started_at", None):
                if (measured:=time.monotonic() - started_at) >= TIMEOUT:
                    future.soft_cancel = True
                    print(f"{future} canceled due to timeout: {measured}")
                    future = None
            elif future.running():
                future.started_at = time.monotonic()
            if future is not None:
                print("waiting for another cycle: ", future)
                futures.add(future)

请注意,代码将"无关"的属性附加到future上,而不改变类:Python允许我们这样做,这里需要它,否则我们需要一种外部方法来将每个运行的future映射到我们这里需要的元数据(soft_cancel,started_at).这是对OOP的"滥用",但不是对动态语言的滥用.

asyncio库,不像concurrent.futures库允许一个子类Futures,这些属性可以以一种更优雅的方式添加—但它们的功能是相同的.

Python相关问答推荐

剧作家Python没有得到回应

Pandas 填充条件是另一列

如何比较numPy数组中的两个图像以获取它们不同的像素

难以在Manim中正确定位对象

'discord.ext. commanders.cog没有属性监听器'

在Pandas DataFrame操作中用链接替换'方法的更有效方法

当从Docker的--env-file参数读取Python中的环境变量时,每个\n都会添加一个\'.如何没有额外的?

如何在Polars中从列表中的所有 struct 中 Select 字段?

迭代嵌套字典的值

如何更新pandas DataFrame上列标题的de值?

Pandas GroupBy可以分成两个盒子吗?

为什么\b在这个正则表达式中不解释为反斜杠

如何在Python中使用Pandas将R s Tukey s HSD表转换为相关矩阵''

使用Python查找、替换和调整PDF中的图像'

在numpy数组中寻找楼梯状 struct

GPT python SDK引入了大量开销/错误超时

一个telegram 机器人应该发送一个测验如何做?""

Python协议不兼容警告

为什么我的scipy.optimize.minimize(method=";newton-cg";)函数停留在局部最大值上?

如何在基于时间的数据帧中添加计算值