我有一个这样的WebSocket连接管理器:

class ConnectionManager:
    def __init__(self) -> None:
        self.connections  = {}
 
    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket

    async def disconnect(self, user_id):
        websocket: WebSocket = self.connections[user_id]
        await websocket.close()
        del self.connections[user_id]

    async def send_messages(self, user_ids, message):
        for user_id in user_ids:
            websocket: WebSocket = self.connections[user_id]
            await websocket.send_json(message

和WebSocket路由:

@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str, redis :Annotated [Redis, Depends(get_redis)]):
    user_id = redis.get(token)
    if user_id:
        redis.expire(user_id)
    else:
        raise redis_error
    
    try:
        manager.connect(user_id, WebSocket)
    except WebSocketException:
        manager.disconnect(user_id)

我想存储用户连接,当Redis pubsub message通过时,处理消息,然后向一些用户发送WebSocket message.处理消息的模块不是Fastapi应用程序的一部分.

我试图通过实现threadingasyncioFastapi应用程序中实现这一点,但它们会中断Fastapi应用程序本身.

我如何在Fastapi application之外触发WebSocket objects的发送消息?

What I've tried:

redis = Redis(redis_host, redis_port)
pubsub = redis.pubsub()
pubsub.subscribe("channel_signal")

@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    message = await  
    pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
         # do something
    try:
        manager.connect(user_id, WebSocket)
    except WebSocketException:
        manager.disconnect(user_id)

但我从redis那里得到了pubSub错误,它说我还没有订阅,除非我这样做:


@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    redis = Redis(redis_host, redis_port)
    pubsub = redis.pubsub()
    pubsub.subscribe("channel_signal")

    message = await  
    pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
        # do something
    try:
        manager.connect(user_id, WebSocket)
    except WebSocketException:
        manager.disconnect(user_id)

但是这会 for each 连接到WebSocket的用户创建一个redis连接,有没有为所有用户全局定义一个redis连接的方法呢?

Update 1:

在Vonc Answer中,我写道:

from fastapi import FastAPI, WebSocket, WebSocketException
from v1.endpoints.user.auth import router as auth_router
from v1.endpoints.signals import router as signals_router
from configs.connection_config import redis_host, redis_port
import redis.asyncio as aioredis
import threading
import asyncio
import uuid

app = FastAPI()
app.include_router(auth_router, prefix="/users/auth", tags = ["auth"])
app.include_router(signals_router, prefix="/signals", tags = ["signals"])



class ConnectionManager:
    last_message = ""
    def __init__(self) -> None:
        self.connections  = {}
 
    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket

    async def disconnect(self, user_id):
        websocket: WebSocket = self.connections[user_id]
        await websocket.close()
        del self.connections[user_id]

    async def send_messages(self, user_ids, message):
        for user_id in user_ids:
            websocket: WebSocket = self.connections[user_id]
            await websocket.send_json(message)

manager = ConnectionManager()

@app.websocket("/ws")
async def ws(websocket: WebSocket):
    try:
        await manager.connect(str(uuid.uuid4()), websocket)
    except WebSocketException:
        await manager.disconnect(str(uuid.uuid4()))

redis_client = None
@app.on_event("startup")
async def startup_event_connect_redis():
    global redis_client
    redis_client = aioredis.Redis(host=redis_host, port=redis_port)

def listen_to_redis():
    pubsub = redis_client.pubsub()
    pubsub.subscribe("channel_signal")
    while True:
        message = pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            print(message["data"])

@app.on_event("startup")
async def startup_event_listen_redis():
    # Starting the separate thread to listen to Redis Pub/Sub messages
    threading.Thread(target=listen_to_redis, daemon=True).start()

因为aioredis在使用aioredisredispy版本时已被弃用.

我对这个部分有一个问题:

while True:
   message = pubsub.get_message(ignore_subscribe_messages=True)
   if message:
       print("hi")

if message总是实现的,并且at连续打印hi,因此任何事件都将被无限期地触发.

Update 2:

虽然我不能完全测试答案,因为fastapi的另一个问题(我将在这里为它打开一个新的线程),更新添加到实现的为所有用户定义一个全局redis连接并独立于fast api生命周期监听它的答案中,但我不得不使用实际的aioredis库而不是redispy版本.

推荐答案

要实现全局Redis连接并在FastAPI应用程序中跨不同的WebSocket连接使用它,您应该考虑创建一个shared Redis客户端,该客户端在启动FastAPI应用程序时启动.

要独立于FastAPI请求-响应生命周期来处理Redis发布订阅消息,您可以创建一个单独的线程或进程来监听Redis发布订阅消息并相应地触发WebSocket消息.

from fastapi import FastAPI, WebSocket
from aioredis import create_redis_pool
import threading
import json
import asyncio

app = FastAPI()

# Connection manager class definition (place your existing implementation here)
class ConnectionManager:
    # same as your code

# Global variables
manager = ConnectionManager()
redis_client = None

# Step 0: WebSocket Connection Request (Client)
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(user_id, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # (handle received messages if necessary)
    except:
        await manager.disconnect(user_id)

# Step 1: Global Redis Client and Connection Manager
@app.on_event("startup")
async def startup_event():
    global redis_client
    redis_client = await create_redis_pool("redis://localhost:6379")

# Step 2: Separate Thread for Redis Pub/Sub Listener
def listen_to_redis():
    pubsub = redis_client.pubsub()
    pubsub.subscribe("channel_signal")
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            process_redis_message(data)

@app.on_event("startup")
async def startup_event_2():
    # Starting the separate thread to listen to Redis Pub/Sub messages
    threading.Thread(target=listen_to_redis, daemon=True).start()

# Step 4: Processing Redis Messages
def process_redis_message(data):
    user_ids = data.get('user_ids')
    content = data.get('content')
    if user_ids and content:
        send_websocket_messages(user_ids, content)

# Step 5: Sending WebSocket Messages
def send_websocket_messages(user_ids, content):
    for user_id in user_ids:
        websocket = manager.connections.get(user_id)
        if websocket:
            asyncio.run(websocket.send_text(content))

从FastAPI应用程序外部向WebSocket连接发送消息的方式是通过Redis pubSub消息,遵循以下工作流程:

  External System           FastAPI App                       Redis Server
        |                         |                                   |
        |                         |                                   |
        |                         |--------(1) Initialize ----------->|
        |                         |        Redis Client               |
        |                         |                                   |
        |                         |-------(2) Start Thread ---------->|
        |                         |       (listen_to_redis)           |
        |                         |                                   |
 (0) WebSocket                    |                                   |
 Connection --------------------> |                                   |
 Request (Client)                 |                                   |
        |                         |                                   |
        |                         |                                   |
        |----(3) Publish Msg----->|                                   |
        |  to Redis channel       |                                   |
        |                         |                                   |
        |                         |                                   |
        |                         |<-------(4) Redis Listener --------|
        |                         |      (receive & process msg)      |
        |                         |                                   |
        |                         |                                   |
        |                         |------(5) WebSocket Msg ---------->|
        |                         |       (send to users)             |
        |                         |                                   |
  1. WebSocket Connection Request (Client):客户端通过向FastAPI应用程序中的WebSocket端点发送请求来启动WebSocket连接.

    Inside this endpoint, you use the connection manager to manage (store, retrieve, delete) these WebSocket connections.
    That step sets up the necessary infrastructure to manage WebSocket connections, which are used in step 5 to send messages to the connected clients.

    from fastapi import FastAPI, WebSocket
    
    app = FastAPI()
    manager = ConnectionManager()  # Make sure to define ConnectionManager class
    
    @app.websocket("/ws/{token}")
    async def websocket_endpoint(websocket: WebSocket, token: str):
        user_id = await redis.get(token)
        if user_id is None:
            # Handle unauthorized access
            return
    
        await manager.connect(user_id.decode('utf-8'), websocket)  # Assuming user_id is stored as bytes in Redis
        try:
            while True:
                data = await websocket.receive_text()
                # Handle incoming WebSocket messages (if any)
        except:
            await manager.disconnect(user_id.decode('utf-8'))
    
  2. Global Redis Client and Connection Manager:在您的FastAPI应用程序开始时建立一个全局Redis客户端和WebSocket连接管理器.连接管理器跟踪所有活动的WebSocket连接.

    from fastapi import FastAPI, WebSocket, Depends
    from aioredis import create_redis_pool
    
    app = FastAPI()
    redis_client = None
    manager = ConnectionManager()
    
    class ConnectionManager:
        # (same as your existing implementation)
    

    在您的FastAPI应用程序启动时初始化Redis客户端:

    @app.on_event("startup")
    async def startup_event():
        global redis
        redis_client = await create_redis_pool("redis://localhost:6379")  # replace with your Redis server details
    

    (我使用的是aioredis,这是一个Redis客户端,专为使用Python的Asyncio而设计.根据您正在使用的Redis客户端库调整Redis客户端的实例化和使用情况.)

  3. Separate Thread for Redis Pub/Sub Listener、单独发起一个线程,持续监听Redis发布订阅消息.

    That thread operates independently of the FastAPI application, in the sense that it is not bound to any specific route or endpoint in your FastAPI app.
    It can receive and process messages even if they are published from outside the FastAPI application - potentially from another service or application that has access to the same Redis instance.

    import threading
    
    def listen_to_redis():
        global redis
        pubsub = redis.pubsub()
        pubsub.subscribe("channel_signal")
    
        while True:
            message = pubsub.get_message(ignore_subscribe_messages=True)
            if message:
                user_ids = process_redis_message(message)  # Step 4 function called here
                send_websocket_messages(user_ids, message)  # Step 5 function called here
    
    # Starting the separate thread to listen to Redis Pub/Sub messages
    threading.Thread(target=listen_to_redis, daemon=True).start()
    
  4. Publishing Messages to Redis from Outside the FastAPI App:任何外部系统或服务想要向您的FastAPI应用程序管理的WebSocket连接发送消息,都可以通过将消息发布到您的FastAPI应用程序订阅的Redis频道(在本例中为"channel_signal")来实现.

    该外部系统/服务不需要与您的FastAPI应用程序直接交互;它只需要能够将消息发布到Redis通道.

    类似于:

    import redis
    
    redis_host = "your_redis_server_host"  # Replace with your Redis server host
    redis_port = "your_redis_server_port"  # Replace with your Redis server port
    
    client = redis.Redis(host=redis_host, port=redis_port)
    
    channel_name = "channel_signal"
    message = {"user_ids": ["123", "456"], "content": "Hello, World!"}
    
    client.publish(channel_name, json.dumps(message))
    
  5. Processing Redis Messages:当一条消息发布到Redis通道时,运行listen_to_redis函数的单独线程将获取该消息.

    在该函数中,您将实现处理Redis消息的逻辑,并确定消息应该发送到的user_ids个WebSocket连接.

    def process_redis_message(data):
        user_ids = data.get('user_ids')
        content = data.get('content')
        if user_ids and content:
            send_websocket_messages(user_ids, content)
    
  6. Sending WebSocket Messages:一旦识别了user_ids,listen_to_redis函数就调用连接管理器的send_messages方法将消息发送到适当的WebSocket连接.

    该方法迭代所标识的user_ids并将消息发送到每个对应的WebSocket连接.

    def send_websocket_messages(user_ids, content):
        for user_id in user_ids:
            websocket = manager.connections.get(user_id)
            if websocket:
                asyncio.run(websocket.send_text(content))
    

因此,要触发从FastAPI应用程序外部发送的WebSocket消息:

  • 外部系统/服务向Redis频道发布消息.
  • 在单独的线程中运行的listen_to_redis函数接收和处理此消息.
  • 然后,通过连接管理器的send_messages方法将处理后的消息发送到适当的WebSocket连接.

该机制应该允许消息通过Redis发布/订阅的中介从FastAPI应用程序外部发送到WebSocket连接.


我对这个部分有一个问题:

while True:
  message = pubsub.get_message(ignore_subscribe_messages=True)
  if message:
      print("hi")

if message总是实现的,并且at连续打印hi,因此任何事件都将被无限期地触发.

即使在没有消息可用的情况下,listen_to_redis函数,特别是pubsub.get_message(ignore_subscribe_messages=True)似乎总是返回非None的值,从而导致if message:语句的计算结果为True.

Your function listen_to_redis seems to be synchronous, while the rest of your FastAPI application is asynchronous.
And the get_message() (message loop) function is non-blocking, meaning that it will immediately return None if no message is available, leading to high CPU usage in a while True loop.

对于像处理WebSocket和Redis发布/订阅这样的异步操作,最好使用asyncio.您可以像这样异步收听Redis频道:

from asyncio import run

async def listen_to_redis():
    pubsub = await redis_client.subscribe("channel_signal")
    channel = pubsub[0]

    while await channel.wait_message():
        message = await channel.get(encoding="utf-8")
        print("Message:", message)

收听Redis的启动事件也可以是异步的,确保了与FastAPI更好的兼容性.

@app.on_event("startup")
async def startup_event_listen_redis():
    asyncio.create_task(listen_to_redis())

我们的目标是让您的Redis pub/subb能够异步运行,并且更好地适应FastAPI生态系统.请注意,这将需要使用与FastAPI的异步功能兼容的异步Redis客户端.

Python相关问答推荐

如何修复使用turtle和tkinter制作的绘画应用程序的撤销功能

拆分pandas列并创建包含这些拆分值计数的新列

将轨迹优化问题描述为NLP.如何用Gekko解决这个问题?当前面临异常:@错误:最大方程长度错误

Python多处理:当我在一个巨大的pandas数据框架上启动许多进程时,程序就会陷入困境

max_of_three使用First_select、second_select、

重新匹配{ }中包含的文本,其中文本可能包含{{var}

在Pandas DataFrame操作中用链接替换'方法的更有效方法

切片包括面具的第一个实例在内的眼镜的最佳方法是什么?

如何更改分组条形图中条形图的 colored颜色 ?

如何在WSL2中更新Python到最新版本(3.12.2)?

pandas:排序多级列

为什么Django管理页面和我的页面的其他CSS文件和图片都找不到?'

如何在FastAPI中为我上传的json文件提供索引ID?

在二维NumPy数组中,如何 Select 内部数组的第一个和第二个元素?这可以通过索引来实现吗?

处理Gekko的非最优解

递归函数修饰器

如何将相同组的值添加到嵌套的Pandas Maprame的倒数第二个索引级别

每次查询的流通股数量

PYTHON中的pd.wide_to_long比较慢

启动线程时,Python键盘模块冻结/不工作