不幸的是,这不是一件小事—
调用executor.map
引发的超时将始终取消所有剩余任务.执行器本身被not关闭,但map
调用中的所有后续任务都被取消.无论如何,concurrent. futures要"恢复"超时的worker会很麻烦(比如:如果一个worker正忙于运行前一个任务,它就不能真正重用).
无论如何,如果你需要重试超时或其他什么,这里的解决方案是自己管理超时,in the function inside worker thread:代码中引发的异常不会扰乱执行器,并且可以通过使用executor.submit
并处理返回的Future对象在控制器线程上单独处理.而不是executor.map
然而,还有一个额外的问题:在Python中,在同一个线程中引起超时错误并不容易.(即工作线程必须自己"超时").
(0)如果你只想继续执行任务,并且不介意超时任务只消耗一个工作线程直到它完成,这只是一个使用.submit
executor方法和处理future,而不是.map
调用的问题. 下面我介绍了一些代码为这个 case .
(1)如果你的代码可以使用gallcio异步运行,那么就有gallcio. Timeout.但是,如果你的工作器可以重写为异步代码,那么你的整个项目可能会重写为异步代码,而不是开始的多线程.
或者(2)您可以对代码进行仪表化,以便它测量时间并停止处理本身—并引发TimeoutError而不是继续—这是最简单、最可靠的,但并不总是可能的.不幸的是,一个"超时decorator "不能在没有使用其他线程的情况下编写.
(3)另一层线程工作器必须创建在主线程轮询执行器的每个工作器内部—以便第一层可以管理第二层超时并在需要时重新启动内部工作器.如果第二层可以由子进程而不是线程组成,那么如果超时线程确实卡住了,就有可能通过kill then来释放资源.
(4)就像(0)一样,但是创建一个更大的执行器,用多余的工作线程,并创建一个高级替代"map"调用,当第一个线程陷入处理时,它将开始使用更多的线程.这个可以在内部使用concurrent. futures的submit
和wait
方法.
因此,在示例代码中,其中工作者只是调用"time. sleep",例如,人们不能真正 Select 上面的建议#2:函数将记录开始时间,并在其进行计算的循环中,周期性地将当前时间与开始时间进行比较,如果超时过期,则引发超时错误.(可以写一个特殊的"睡眠"呼叫来这样做,但这将是毫无意义的).
如果你的工作人员正在处理I/O,而他们失败的原因是有时候正在获取数据的服务器本身超时,那么重写代码以使用ICPO可能真的更容易(建议#1).
上述选项#3实施起来很复杂,但可行.如果第二层可以使用子进程,那么如果超时永远运行,这可能是唯一能够释放卡住的资源的方法.(如果相反,他们只是花了比他们有用的时间,但最终结束了,这可能会更简单)
不管怎样,下面是上面选项#0的代码—如果需要的话,它可以被改进来完成我在#4或#3中的建议:
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, wait
def sleeping(i):
time.sleep(i)
return f'slept {i} seconds'
data = [1, 2, 9, 3]
TIMEOUT = 5
with ThreadPoolExecutor(max_workers=1) as exe:
futures = {exe.submit(sleeping, i) for i in data}
while futures and not all(getattr(future, "soft_cancel", False) == True for future in futures):
done, pending = wait(futures, timeout=TIMEOUT)
for future in done:
print(future.result())
futures = set()
for future in pending:
if started_at:=getattr(future, "started_at", None):
if (measured:=time.monotonic() - started_at) >= TIMEOUT:
future.soft_cancel = True
print(f"{future} canceled due to timeout: {measured}")
future = None
elif future.running():
future.started_at = time.monotonic()
if future is not None:
print("waiting for another cycle: ", future)
futures.add(future)
请注意,代码将"无关"的属性附加到future上,而不改变类:Python允许我们这样做,这里需要它,否则我们需要一种外部方法来将每个运行的future映射到我们这里需要的元数据(soft_cancel,started_at).这是对OOP的"滥用",但不是对动态语言的滥用.
asyncio
库,不像concurrent.futures
库允许一个子类Futures,这些属性可以以一种更优雅的方式添加—但它们的功能是相同的.