我有一个小实用程序,可以并行使用asyncio调用同步代码.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from asyncio import AbstractEventLoop, BaseEventLoop

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
    
def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        print('running loop scheduling there')
        # implement the correct run inside the loop, without the run_until_complete which is crashing, because the loop already runs
        future = asyncio.run_coroutine_threadsafe(call_many_async(fun, many_kwargs),
                                                loop)
        print('got the future')
        res = future.result()
        print('got the result')
        return res
    else:
        return loop.run_until_complete(call_many_async(fun, many_kwargs))

而且在Python中使用时效果良好

import time
def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1) # calling some time-costly API
    print("call ended")
    return 3 # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))

从Python中运行没有任何问题,但我在Inbox中使用它时遇到了问题,我只是得到

running loop scheduling there
got the future

它永远挂着.

本来我刚刚

def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(call_many_async(fun, many_kwargs))

但我遇到了错误

RuntimeError: This event loop is already running

如何解决?

当然

results = await call_many_async(something_complex, ({"param": i} for i in range(1, 5)))
assert len(results) == 4

有效,但我想使用call_many作为我将从jupyter笔记本中调用的更大代码库的一部分. 我已经阅读了https://blog.jupyter.org/ipython-7-0-async-repl-a35ce050f7f7,但没有找到解决方案,因为我不想直接从jupyter笔记本单元调用同步代码,而是从同步代码调用.

我想避免使用async def call_many(fun, many_kwargs)的解决方案,因为重点是能够使用从多个地方调用此函数的代码,而不需要具有相同的同步和同步等效功能.

我看过How do I run Python asyncio code in a Jupyter notebook?,但这解释了如何直接调用Delivercio代码,我在上面解释了我对此不感兴趣.

推荐答案

jupyter在主线程中运行自己的eventloop,这就是为什么您可以直接从jupyter调用await,也可以调用asyncio.run_coroutine_threadsafe个状态:

此函数旨在从与 事件循环正在运行的那个.

所以在同一个头上等待它是一个僵局.我可能会在另一个线程中运行您的eventloop.

import threading
import asyncio

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
    
def call_many(fun, many_kwargs):
    result = None
    def run_func():
        nonlocal result
        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(call_many_async(fun, many_kwargs))
    thread = threading.Thread(target=run_func)
    thread.start()
    thread.join()
    return result

import time
def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1) # calling some time-costly API
    print("call ended")
    return 3 # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))

否则,您完全可以将async个调用从jupyter的主执行点一直堆叠到您的代码,方法是将所有函数分别设置为asyncawait.

import asyncio

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
    
async def call_many(fun, many_kwargs):
    return await call_many_async(fun, many_kwargs)

fut = call_many(something_complex, ({"param": i} for i in range(1, 5)))
results = await fut

最后,如果您只使用to_thread,您可能只需创建并使用ThreadPoolExecutor并直接调用其.map函数,而不必创建或使用事件循环.

from concurrent.futures import ThreadPoolExecutor

def call_many(fun, many_kwargs):
    result = None
    with ThreadPoolExecutor() as pool:
          return list(pool.map(fun, many_kwargs))

import time
def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1) # calling some time-costly API
    print("call ended")
    return 3 # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))

Python相关问答推荐

GEKKO:已知延迟的延迟系统的参数估计

如何编写一个正规表达式来查找序列中具有2个或更多相同辅音的所有单词

NumPy中的右矩阵划分,还有比NP.linalg.inv()更好的方法吗?

如何销毁框架并使其在tkinter中看起来像以前的样子?

三个给定的坐标可以是矩形的点吗

使用numpy提取数据块

如何将ctyles.POINTER(ctyles.c_float)转换为int?

当使用keras.utils.Image_dataset_from_directory仅加载测试数据集时,结果不同

如何使用Python以编程方式判断和检索Angular网站的动态内容?

如何从数据库上传数据到html?

ThreadPoolExecutor和单个线程的超时

计算天数

Python中的变量每次增加超过1

如果初始groupby找不到满足掩码条件的第一行,我如何更改groupby列,以找到它?

在Python中计算连续天数

处理Gekko的非最优解

从源代码显示不同的输出(机器学习)(Python)

无法在Spyder上的Pandas中将本地CSV转换为数据帧

使用类型提示进行类型转换

使用python playwright从 Select 子菜单中 Select 值