背景:
我们有一个微服务,它使用(订阅)来自50多个RabbitMQ队列的消息.
为这个队列生成消息发生在两个地方
-
应用程序进程在遇到短时间延迟执行的业务逻辑(如发送邮箱或通知另一个服务)时,应用程序会直接将消息发送到exchange(exchange反过来会将消息发送到队列).
-
当我们遇到长时间/延迟执行业务逻辑时,我们有
messages
个表,其中包含一段时间后必须执行的消息条目.
现在我们有了cron worker,它每10分钟运行一次,扫描messages
个表并将消息推送到RabbitMQ.
脚本:
假设messages表中有10000条消息,它们将在下一次cron运行时排队,
- 上午9:00-Cron worker运行,并将10000条消息排入RabbitMQ队列.
- 我们确实有一些用户正在监听队列并开始使用消息,但由于系统中的一些问题或第三方响应时间延迟,每条消息需要
1 Min
分钟才能完成. - 上午9点10分-现在cron worker再次运行接下来的10分钟,看到还有9000多条消息尚未完成,时间也过了,所以它再次将9000多条重复消息推送到队列中.
Note: The subscribers which consumes the messages are idempotent, so there is no issue in duplicate processing
我脑子里有个设计 idea ,但不是最好的逻辑
我可以有4种状态(RequiresQueuing、Queued、Completed、Failed)
- 无论何时插入信息,我都可以将状态设置为
RequiresQueuing
- 接下来,当cron-worker成功地拾取并将消息推送到队列时,我可以将其设置为
Queued
- 订阅服务器完成后,将队列状态标记为
Completed / Failed
.
上述逻辑存在一个问题,比如RabbitMQ以某种方式出现故障,或者在某些使用中,我们需要清除队列以进行维护.
现在标记为Queued
的消息处于错误状态,因为它们必须再次被识别,并且需要手动更改状态.
另一个例子
假设我将RabbitMQ队列命名为(事件)
此事件队列有5个订阅者,每个订阅者从队列中获取1条消息,并使用REST API将此事件发布到另一个微服务(事件聚合器).每次API调用通常需要50毫秒.
用例:
- 由于高负载,生成的事件数变为3倍.
- 此外,接受事件的微服务(事件聚合器)的处理速度也变慢,响应时间从50毫秒增加到1分钟.
- Cron workers遵循上面提到的设计,每分钟将消息排队一次.现在队列变得太大了,但我也不能增加订阅者的数量,因为依赖的微服务(事件聚合器)也落后了.
现在的问题是,如果继续将消息发送到事件队列,这只会使队列inflating .
https://www.rabbitmq.com/memory.html-在阅读本页时,我发现rabbitmq甚至不会接受高水印分数(默认值为40%)的连接.当然这是可以改变的,但这需要人工干预.
因此,如果队列长度增加,它会影响rabbitmq内存,这就是我考虑在生产者级别进行节流的原因.
问题
- 我如何限制我的cron工作者跳过特定的运行,或者以某种方式判断队列,并确定它已被重载,因此不推送消息?
- 我如何处理上面提到的用例?有解决我问题的设计吗?有人面临同样的问题吗?
提前谢谢.
答复
使用queueCount判断节流的已接受答案注释