背景:

我们有一个微服务,它使用(订阅)来自50多个RabbitMQ队列的消息.

为这个队列生成消息发生在两个地方

  1. 应用程序进程在遇到短时间延迟执行的业务逻辑(如发送邮箱或通知另一个服务)时,应用程序会直接将消息发送到exchange(exchange反过来会将消息发送到队列).

  2. 当我们遇到长时间/延迟执行业务逻辑时,我们有messages个表,其中包含一段时间后必须执行的消息条目.

现在我们有了cron worker,它每10分钟运行一次,扫描messages个表并将消息推送到RabbitMQ.

脚本:

假设messages表中有10000条消息,它们将在下一次cron运行时排队,

  1. 上午9:00-Cron worker运行,并将10000条消息排入RabbitMQ队列.
  2. 我们确实有一些用户正在监听队列并开始使用消息,但由于系统中的一些问题或第三方响应时间延迟,每条消息需要1 Min分钟才能完成.
  3. 上午9点10分-现在cron worker再次运行接下来的10分钟,看到还有9000多条消息尚未完成,时间也过了,所以它再次将9000多条重复消息推送到队列中.

Note: The subscribers which consumes the messages are idempotent, so there is no issue in duplicate processing

我脑子里有个设计 idea ,但不是最好的逻辑

我可以有4种状态(RequiresQueuing、Queued、Completed、Failed)

  1. 无论何时插入信息,我都可以将状态设置为RequiresQueuing
  2. 接下来,当cron-worker成功地拾取并将消息推送到队列时,我可以将其设置为Queued
  3. 订阅服务器完成后,将队列状态标记为Completed / Failed.

上述逻辑存在一个问题,比如RabbitMQ以某种方式出现故障,或者在某些使用中,我们需要清除队列以进行维护.

现在标记为Queued的消息处于错误状态,因为它们必须再次被识别,并且需要手动更改状态.

另一个例子

假设我将RabbitMQ队列命名为(事件)

此事件队列有5个订阅者,每个订阅者从队列中获取1条消息,并使用REST API将此事件发布到另一个微服务(事件聚合器).每次API调用通常需要50毫秒.

用例:

  1. 由于高负载,生成的事件数变为3倍.
  2. 此外,接受事件的微服务(事件聚合器)的处理速度也变慢,响应时间从50毫秒增加到1分钟.
  3. Cron workers遵循上面提到的设计,每分钟将消息排队一次.现在队列变得太大了,但我也不能增加订阅者的数量,因为依赖的微服务(事件聚合器)也落后了.

现在的问题是,如果继续将消息发送到事件队列,这只会使队列inflating .

https://www.rabbitmq.com/memory.html-在阅读本页时,我发现rabbitmq甚至不会接受高水印分数(默认值为40%)的连接.当然这是可以改变的,但这需要人工干预.

因此,如果队列长度增加,它会影响rabbitmq内存,这就是我考虑在生产者级别进行节流的原因.

问题

  1. 我如何限制我的cron工作者跳过特定的运行,或者以某种方式判断队列,并确定它已被重载,因此不推送消息?
  2. 我如何处理上面提到的用例?有解决我问题的设计吗?有人面临同样的问题吗?

提前谢谢.

答复

使用queueCount判断节流的已接受答案注释

推荐答案

您可以结合QoS(服务质量)和手动确认来解决这个问题.

假设您有1个发布者和5个工作脚本.假设这些数据来自同一个队列.每个工作脚本需要1分钟来处理一条消息.您可以在通道级别设置QoS.如果将其设置为1,那么在这种情况下,每个工作脚本将只分配一条消息.因此,我们一次处理5条消息.在5个工作脚本之一执行手动确认之前,不会传递新消息.

如果要提高消息处理的吞吐量,可以增加工作 node 数.

基于消息状态更新表的 idea 不是一个好的 Select ,DB轮询是系统使用队列的主要原因,它会导致扩展问题.在某一点上,您必须更新这些表,并且由于锁定和隔离级别,您可能会遇到瓶颈.

Node.js相关问答推荐

无法在我的 node 项目中转让Google Drive v3 API中的所有权

如何在不丢失其他键的情况下解开子文档数组,然后反转该过程?

构建期间 Docker 容器中的 npm 安装失败

Mongodb - 在数组数组中查找()

Typescript :泛型类又扩展了另一个泛型类

如何获取需要加载cheerio的网站部分数据?

我应该如何解决这个 Angular node 包模块依赖冲突?

module.exports=require('other') 和临时变量有什么区别?

Typescript typeRoots 未检测到类型定义

如何使用 Jest 模拟异步函数的延迟时间

discordjs如何添加所有意图/权限

2 x 匹配标准显示没有结果

在安装 tensorflow 时遇到问题

响应发送 200 而不是 403

如何将`yarn.lock`与`package.json`同步?

如何为 node.js 服务器分配域名?

nodeJS - 如何使用 express 创建和读取会话

Node.js 中的 PHP exit()/die() 类似功能是什么

如何断言不为空?

'node' 未被识别为内部或外部命令