I have a WebSocket connection manager like this:
class ConnectionManager:
    def __init__(self) -> None:
        self.connections = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket

    async def disconnect(self, user_id):
        websocket: WebSocket = self.connections[user_id]
        await websocket.close()
        del self.connections[user_id]

    async def send_messages(self, user_ids, message):
        for user_id in user_ids:
            websocket: WebSocket = self.connections[user_id]
            await websocket.send_json(message)
and a WebSocket route:
@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str, redis: Annotated[Redis, Depends(get_redis)]):
    user_id = redis.get(token)
    if user_id:
        redis.expire(user_id)
    else:
        raise redis_error
    try:
        # connect() is a coroutine and needs the websocket instance, not the class
        await manager.connect(user_id, websocket)
    except WebSocketException:
        await manager.disconnect(user_id)
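I want to store user connections and, when a Redis pubsub message comes through, process the message and then send a WebSocket message to some of the users. The module that processes the messages is not part of the FastAPI application.

I tried to do this inside the FastAPI application using threading and asyncio, but both interfere with the FastAPI application itself.

How can I trigger sending a message on the WebSocket objects from outside the FastAPI application?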
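One way to frame the goal above: the external module never touches WebSocket objects. It only publishes a message naming the recipients, and a handler inside the FastAPI process looks the sockets up. A minimal sketch, assuming a hypothetical JSON envelope with `user_ids` and `payload` keys (neither is in my code), with a stand-in `FakeWebSocket` class so it runs without FastAPI or Redis:

```python
import asyncio
import json


class FakeWebSocket:
    """Stand-in for fastapi.WebSocket; it only records what was sent."""

    def __init__(self) -> None:
        self.sent = []

    async def send_json(self, data) -> None:
        self.sent.append(data)


class ConnectionManager:
    def __init__(self) -> None:
        self.connections = {}

    async def send_messages(self, user_ids, message) -> None:
        for user_id in user_ids:
            websocket = self.connections.get(user_id)
            if websocket is not None:  # skip users who are not connected
                await websocket.send_json(message)


async def dispatch(manager: ConnectionManager, raw: str) -> None:
    """Decode one published message and forward it to the named users.

    The envelope layout {"user_ids": [...], "payload": ...} is hypothetical.
    """
    envelope = json.loads(raw)
    await manager.send_messages(envelope["user_ids"], envelope["payload"])


async def demo():
    manager = ConnectionManager()
    alice, bob = FakeWebSocket(), FakeWebSocket()
    manager.connections = {"alice": alice, "bob": bob}
    # What the external module would publish to the channel:
    raw = json.dumps({"user_ids": ["alice", "carol"], "payload": {"signal": "buy"}})
    await dispatch(manager, raw)
    return alice.sent, bob.sent


alice_sent, bob_sent = asyncio.run(demo())
```

Here `carol` is not connected, so she is skipped instead of raising a `KeyError`, and `bob` receives nothing because he is not named in the envelope.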
What I've tried:
redis = Redis(redis_host, redis_port)
pubsub = redis.pubsub()
pubsub.subscribe("channel_signal")


@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    message = await pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
        pass  # do something
    try:
        await manager.connect(user_id, websocket)
    except WebSocketException:
        await manager.disconnect(user_id)
But I get a pubsub error from Redis saying that I haven't subscribed yet, unless I do this:
@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    redis = Redis(redis_host, redis_port)
    pubsub = redis.pubsub()
    pubsub.subscribe("channel_signal")
    message = await pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
        pass  # do something
    try:
        await manager.connect(user_id, websocket)
    except WebSocketException:
        await manager.disconnect(user_id)
But this creates a Redis connection for each user who connects to the WebSocket. Is there a way to define one Redis connection globally for all users?
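A single listener can be shared by all users if it is started once per process rather than once per connection. The outline below is a sketch under assumptions, not a tested integration: `listen` expects an object with an awaitable `get_message(ignore_subscribe_messages=..., timeout=...)`, as the pub/sub object in `redis.asyncio` provides, and `FakePubSub` is a hypothetical stand-in so the loop can run without a Redis server.

```python
import asyncio


class FakePubSub:
    """Hypothetical stand-in mimicking an async pub/sub object."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def get_message(self, ignore_subscribe_messages=False, timeout=0.0):
        await asyncio.sleep(0)  # yield control, as a real network read would
        return self._messages.pop(0) if self._messages else None


async def listen(pubsub, handle, stop: asyncio.Event) -> None:
    """Poll one shared subscription and pass each message's data to `handle`."""
    while not stop.is_set():
        # The await matters: without it, `message` would be a coroutine object.
        message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
        if message is not None:
            await handle(message["data"])
        else:
            await asyncio.sleep(0.01)  # avoid a busy loop while idle


async def demo():
    received = []

    async def handle(data):
        received.append(data)

    pubsub = FakePubSub([
        {"type": "message", "data": b"one"},
        {"type": "message", "data": b"two"},
    ])
    stop = asyncio.Event()
    # In an application this task would be created once, at startup.
    task = asyncio.create_task(listen(pubsub, handle, stop))
    await asyncio.sleep(0.1)
    stop.set()
    await task
    return received


received = asyncio.run(demo())
```

With the real library, the pub/sub object would presumably be created once from a shared client and subscribed to `channel_signal` before the task starts; that wiring is left out here because it needs a running server.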
Update 1:
Following VonC's answer, I wrote:
from fastapi import FastAPI, WebSocket, WebSocketException
from v1.endpoints.user.auth import router as auth_router
from v1.endpoints.signals import router as signals_router
from configs.connection_config import redis_host, redis_port
import redis.asyncio as aioredis
import threading
import asyncio
import uuid

app = FastAPI()
app.include_router(auth_router, prefix="/users/auth", tags=["auth"])
app.include_router(signals_router, prefix="/signals", tags=["signals"])


class ConnectionManager:
    last_message = ""

    def __init__(self) -> None:
        self.connections = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket

    async def disconnect(self, user_id):
        websocket: WebSocket = self.connections[user_id]
        await websocket.close()
        del self.connections[user_id]

    async def send_messages(self, user_ids, message):
        for user_id in user_ids:
            websocket: WebSocket = self.connections[user_id]
            await websocket.send_json(message)


manager = ConnectionManager()


@app.websocket("/ws")
async def ws(websocket: WebSocket):
    try:
        await manager.connect(str(uuid.uuid4()), websocket)
    except WebSocketException:
        await manager.disconnect(str(uuid.uuid4()))


redis_client = None


@app.on_event("startup")
async def startup_event_connect_redis():
    global redis_client
    redis_client = aioredis.Redis(host=redis_host, port=redis_port)


def listen_to_redis():
    pubsub = redis_client.pubsub()
    pubsub.subscribe("channel_signal")
    while True:
        message = pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            print(message["data"])


@app.on_event("startup")
async def startup_event_listen_redis():
    # Starting the separate thread to listen to Redis Pub/Sub messages
    threading.Thread(target=listen_to_redis, daemon=True).start()
I am using the redispy version of aioredis here because the standalone aioredis is deprecated.
I have a problem with this part:
while True:
    message = pubsub.get_message(ignore_subscribe_messages=True)
    if message:
        print("hi")
The `if message` check always passes and "hi" is printed continuously, so any event would be triggered indefinitely.
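A likely explanation, assuming `pubsub` comes from `redis.asyncio` as in the code above: there `get_message` is a coroutine function, so calling it without `await` returns a coroutine object rather than a message, and a coroutine object is always truthy. The stand-in below shows the effect with plain asyncio (`get_message` here is a hypothetical local function, not the library's):

```python
import asyncio


async def get_message():
    """Hypothetical async getter that has no message to return."""
    return None


async def demo():
    not_awaited = get_message()       # a coroutine object, not a result
    truthy_without_await = bool(not_awaited)
    not_awaited.close()               # suppress the "never awaited" warning
    awaited = await get_message()     # the actual return value
    return truthy_without_await, awaited


truthy_without_await, awaited = asyncio.run(demo())
```

If that is the cause, the loop would have to run as a coroutine on the event loop and await each call, rather than run in a plain thread.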
Update 2:
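I couldn't fully test the answer because of another problem with FastAPI (I will open a new thread for it here). Still, the update added to the answer does implement what I asked for: defining one global Redis connection for all users and listening on it independently of the FastAPI lifecycle. However, I had to use the actual aioredis library instead of the redispy version.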