我在Golang的NAT客户端上编写包装器,我想获取处理函数,一旦我从NAT服务器获得消息,就可以从消费者那里调用它. 我想保留自定义订阅方法,直到它收到来自NAT的消息.

发布:

func (busConfig BusConfig) Publish(service string, data []byte) error {
    pubErr := conn.Publish(service, data)
    if pubErr != nil {
        return pubErr
    }
    return nil
}

订阅:

func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
    fmt.Println("Subscrbing on : ", subject)

    //wg := sync.WaitGroup{}
    //wg.Add(1)
    subscription, err := conn.Subscribe(subject, func(msg *nats.Msg) {
        go func() {
            handler(msg.Data)
        }()
        //wg.Done()
    })
    if err != nil {
        fmt.Println("Subscriber error : ", err)
    }
    //wg.Wait()
    defer subscription.Unsubscribe()

}

测试用例:

func TestLifeCycleEvent(t *testing.T) {
    busClient := GetBusClient()
    busClient.Subscribe(SUBJECT, func(input []byte) {
        fmt.Println("Life cycle event received :", string(input))
    })

    busClient.Publish(SUBJECT, []byte("complete notification"))
}

我看到消息被发布但没有订阅,我试图使用waitgroup保持订阅方法,但我认为这不是正确的解决方案.

推荐答案

据我所知,我认为在NAT中,即使我们没有提供回复地址,我们也需要响应消息,这样它才能响应消息.

func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
    subscription, err := conn.Subscribe(subject, func(msg *nats.Msg) {
        go func() {
            handler(msg.Data)
            msg.Respond(nil)
        }()
      })
   }

Go相关问答推荐

在字符串与字符串子切片上使用len进行位转移产生意外输出

Go协议缓冲区导入问题

难以为多个平台添加Go Bazel构建选项

链自定义GRPC客户端拦截器/DialOptions

../golang/pkg/mod/github.com/wmentor/lemmas@v0.0.6/processor.go:72:9:未定义:令牌.进程

如何在S汇编器中更高效地将全局数据加载到霓虹灯寄存器?

如何找到一个空闲的TPM句柄来保存新的密钥对对象?

如何在正则表达式中使整个单词可选?

GORM中是否可能自动迁移具有循环关系的表?

下载和合并时输出文件已损坏

闭包所处的环境范围是什么?

如何使用泛型将接口转换为指定类型

泛型:实现嵌套接口

当函数返回一个函数时,为什么 Go 泛型会失败?

使用 GO 在侧 tar 文件中提取 tar 文件的最快方法

Go AST:获取所有 struct

curl:(56)Recv失败:连接由golang中的对等方与docker重置

为什么import和ImportSpec之间可以出现多行注释,而PackageName和ImportPath之间不能出现?

Go模板中的浮点除法

comparable和any有什么区别?