我正在开发一个库,它应该支持同步和异步用户,同时最大限度地减少库中的代码重复.理想的情况是实现库异步(因为库进行远程API调用),并通过包装器添加对同步的支持.请考虑将以下函数作为库的一部分.
# Library code
async def internal_function():
"""Internal part of my library doing HTTP call"""
pass
我的 idea 是提供一个包装器来检测库的用户是否使用异步.
# Library code
def api_call():
"""Public api of my library"""
if asyncio.get_event_loop().is_running():
# Async caller
return internal_function() # This returns a coroutine, so the caller needs to await it
# Sync caller, so we need to run the coroutine in a blocking way
return asyncio.run(internal_function())
起初,这似乎是解决之道.有了这个,我就可以支持
- 从异步函数调用该函数的用户,
- 从笔记本(也是异步)调用该函数的用户,以及
- 从纯Python脚本调用该函数的用户(这里它回退到阻塞
asyncio.run
).
但是,在某些情况下,函数是从事件循环内调用的,但直接调用方是同步函数.
# Code from users of the library
async def entrypoint():
"""This entrypoint is called in an event loop, e.g. within fastlib"""
return legacy_sync_code() # Call to sync function
def legacy_sync_code():
"""Legacy code that does not support async"""
# Call to library code from sync function,
# expect value not coroutine, could not use await
api_response = api_call()
return api_response.json() # Needs to be value, not coroutine
在最后一行中,json()
的调用失败.api_call()
错误地推断调用方可以等待响应,并返回协程.
为了支持这类用户,我需要
- 识别直接调用函数是否是同步且期望值而不是路由,
- 有办法以阻塞的方式等待
internal_function()
的结果.使用asyncio.run
只能在普通的Python脚本中工作,如果代码是从堆栈跟踪中更高层的事件循环中调用的,则会失败.
如果我的库提供两个函数(例如,api_call_async()
和api_call_sync()
),则可以减轻第一点.
我希望我的观点足够清楚.我不明白为什么这从根本上来说是不可能的,然而,如果Python的设计不允许我以完全透明的方式支持同步和异步用户,我可以接受.