我try 为NatS限制队列编写订阅者:
sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
return err
}
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
if errors.Is(err, nats.ErrSlowConsumer) {
log.Printf("Slow consumer error returned. Waiting for reset...")
time.Sleep(50 * time.Millisecond)
continue
} else {
return err
}
}
msg.InProgress()
var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
msg.Term()
return err
}
actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
msg.Nak()
continue
}
callback, err := handler(&message)
if err == nil {
msg.Ack()
msg.Term()
} else {
msg.Nak()
return err
}
callback(ctx)
此代码的目标是使用关于多个主题的任何消息,并调用与该主题相关联的回调函数.这段代码可以工作,但我遇到的问题是,如果该函数没有返回错误,我希望在调用handler
之后删除该消息.我认为这就是msg.Term
正在做的事情,但我仍然看到队列中的所有消息.
我最初是围绕工作队列设计的,但我希望它能与多个订阅者一起工作,所以我不得不重新设计它.有什么办法能让这件事奏效吗?