我有一个小实用程序,可以并行使用asyncio
调用同步代码.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from asyncio import AbstractEventLoop, BaseEventLoop
async def call_many_async(fun, many_kwargs):
return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
def call_many(fun, many_kwargs):
loop = asyncio.get_event_loop()
if loop.is_running():
print('running loop scheduling there')
# implement the correct run inside the loop, without the run_until_complete which is crashing, because the loop already runs
future = asyncio.run_coroutine_threadsafe(call_many_async(fun, many_kwargs),
loop)
print('got the future')
res = future.result()
print('got the result')
return res
else:
return loop.run_until_complete(call_many_async(fun, many_kwargs))
而且在Python中使用时效果良好
import time
def something_complex(param) -> int:
print(f"call started with {param=}")
time.sleep(0.1) # calling some time-costly API
print("call ended")
return 3 # returning the result
results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
从Python中运行没有任何问题,但我在Inbox中使用它时遇到了问题,我只是得到
running loop scheduling there
got the future
它永远挂着.
本来我刚刚
def call_many(fun, many_kwargs):
loop = asyncio.get_event_loop()
return loop.run_until_complete(call_many_async(fun, many_kwargs))
但我遇到了错误
RuntimeError: This event loop is already running
如何解决?
当然
results = await call_many_async(something_complex, ({"param": i} for i in range(1, 5)))
assert len(results) == 4
有效,但我想使用call_many
作为我将从jupyter笔记本中调用的更大代码库的一部分.
我已经阅读了https://blog.jupyter.org/ipython-7-0-async-repl-a35ce050f7f7,但没有找到解决方案,因为我不想直接从jupyter笔记本单元调用同步代码,而是从同步代码调用.
我想避免使用async def call_many(fun, many_kwargs)
的解决方案,因为重点是能够使用从多个地方调用此函数的代码,而不需要具有相同的同步和同步等效功能.
我看过How do I run Python asyncio code in a Jupyter notebook?,但这解释了如何直接调用Delivercio代码,我在上面解释了我对此不感兴趣.