我try 为NatS限制队列编写订阅者:

sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
    return err
}

msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
    if errors.Is(err, nats.ErrSlowConsumer) {
        log.Printf("Slow consumer error returned. Waiting for reset...")
        time.Sleep(50 * time.Millisecond)
        continue
    } else {
        return err
    }
}

msg.InProgress()

var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
    msg.Term()
    return err
}

actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
    msg.Nak()
    continue
}

callback, err := handler(&message)
if err == nil {
    msg.Ack()
    msg.Term()
} else {
    msg.Nak()
    return err
}

callback(ctx)

此代码的目标是使用关于多个主题的任何消息,并调用与该主题相关联的回调函数.这段代码可以工作,但我遇到的问题是,如果该函数没有返回错误,我希望在调用handler之后删除该消息.我认为这就是msg.Term正在做的事情,但我仍然看到队列中的所有消息.

我最初是围绕工作队列设计的,但我希望它能与多个订阅者一起工作,所以我不得不重新设计它.有什么办法能让这件事奏效吗?

推荐答案

根据提供的代码,我假设您在使用Jetstream库创建订阅时没有提供流和消费者信息.

SubscribeSync方法的documentation中,它说当流和消费者信息不被提供时,库将创建一个短暂的消费者,并且消费者的名称由服务器挑选.它还试图找出订阅是针对哪个流的.

以下是我认为在您的代码中会发生的情况:

  • 当您调用SubscribeSync方法时,将使用您提供的主题创建一个短暂的使用者.
  • 当调用msg.Ackmsg.Term时,您确实确认了该消息,但仅针对该当前消费者.
  • 下次调用SubscribeSync方法时,将创建一个新的临时使用者,其中包含您已在另一个使用者上删除的消息.这就是Jetstream的流、消费者和订阅概念的设计方式.

根据你想要实现的目标,这里有一些建议:

  • 使用普通的NatS Core库来处理发布/订阅或队列.不要使用JetStream.NatS Core库直接处理主题,而Jetstream库在没有提供信息的情况下在幕后创建额外的东西(流和消费者).
  • 使用Jetstream,但要自己创建一个流和一个持久消费者,要么通过代码,要么直接在NAS服务器上.通过这种方式,已经定义了流和消费者,您应该能够使其按预期工作.

Go相关问答推荐

如何描述OpenAPI规范中围棋的数据类型.JSON?

戈姆:如何将一对一联系起来?

GO框架Echo中间件的使用

如何在链接中写入链接

在 Go 中将元数据从一个 JPEG 复制到另一个

生成一个 CSV/Excel,在 Golang 中该列的下拉选项中指定值

golang yaml 马歇尔网址

Golang 中具体类型的错误片段

使用图像解码 JPEG 时 colored颜色 不正确.解码并写入 PDF?

将 big.Int 转换为 [2]int64,反之亦然和二进制补码

如何使用 Docker 引擎 SDK 和 Golang 运行 docker 挂载卷

Golang 构建多平台问题

查找、解析和验证邮箱地址

Dockerfile 问题 - 为什么找不到二进制 dlv - 没有这样的文件或目录

在 Go 中读取数字行

Go:等待多个通道的性能损失

使用反射在 struct 内迭代切片 struct

Golang - 使用正则表达式提取链接

AWS EKS 上的 Golang REST API 部署因 CrashLoopBackOff 而失败

如何动态解析 Go Fiber 中的请求正文?