编写的代码只在调用端点时触发其他服务器,并等待来自Redis中特定通道的数据进入.我不想知道外部服务器的业务逻辑是由于延迟而完成的.

下面的call_external_server后台任务函数将通知外部服务器向Redis(发布/订阅)发送数据.然而,它不会执行.

以下是我的代码:

async def call_external_server(channel, text):
    print("call_external_server start")
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://localhost:9000/pub?channel={channel}&text={text}") as resp:
            print(resp)
    print("call_external_server finished")
    return {"response": "external_server is done"}


@app.websocket("/ws")
async def websocket_endpoint(channel: str, websocket: WebSocket, background_task: BackgroundTasks):
    await websocket.accept()
    client_info = dict(websocket.headers)
    text = client_info.get("text")
    redis_reader: redis.client.PubSub = await get_redis_pubsub()
    await redis_reader.subscribe(channel)
    

    # Problem is Here 
    background_task.add_task(call_external_server, channel, text)
    # Background task Doesn't work properly 

    try:
        while True:
            message = await redis_reader.get_message(ignore_subscribe_messages=True)
            if message is not None:
                decoded_msg = message["data"].decode()
                if decoded_msg == STOPWORD:
                    print("(Reader) STOP")
                    break
                await websocket.send_text(decoded_msg)
    except Exception as e:
        print(e)
        await websocket.close()
        return
    await websocket.close()
    return

推荐答案

BackgroundTask被执行after returning a response到客户端request(not在正在进行的websocket连接上)-关于后台任务的更多细节,参见this answer以及this answer和这related comment.AS explained by @tiangolo(FastAPI的创建者):

内部后台任务depend on a 100,它们是 执行after将该请求返回给客户端.在以下情况下 WebSocket,因为WebSocket还没有真正"完成",而是is an ongoing connection,这不是一个后台任务.


You could, however, have functions executed in the background, using one of the options described in this answer. If your background task is an async def function, you should rather use Option 3 (see the linked answer above for more details), i.e., using asyncio.create_task(). Have a look at this related answer as well.

示例

async def call_external_server(channel, text):
    pass


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
   await websocket.accept()
   # ...
   asyncio.create_task(call_external_server(channel, text))

我还建议查看this answer,以及this answerthis answer,了解如何在每次需要时使用spawn an HTTP Client once at application startup and reuse it,而不是每次调用终结点时都创建一个新连接(如您的问题所示).

Python相关问答推荐

使用numpy提取数据块

滚动和,句号来自Pandas列

如何将双框框列中的成对变成两个新列

OR—Tools中CP—SAT求解器的IntVar设置值

如何使用Python以编程方式判断和检索Angular网站的动态内容?

对象的`__call__`方法的setattr在Python中不起作用'

如何在UserSerializer中添加显式字段?

如何在图中标记平均点?

字符串合并语法在哪里记录

在两极中过滤

如何使regex代码只适用于空的目标单元格

寻找Regex模式返回与我当前函数类似的结果

matplotlib图中的复杂箭头形状

比Pandas 更好的 Select

Python协议不兼容警告

我可以同时更改多个图像吗?

奇怪的Base64 Python解码

大Pandas 中的群体交叉融合

为什么在安装了64位Python的64位Windows 10上以32位运行?

Fake pathlib.使用pyfakefs的类变量中的路径'