我在Golang的NAT客户端上编写包装器,我想获取处理函数,一旦我从NAT服务器获得消息,就可以从消费者那里调用它. 我想保留自定义订阅方法,直到它收到来自NAT的消息.
发布:
func (busConfig BusConfig) Publish(service string, data []byte) error {
pubErr := conn.Publish(service, data)
if pubErr != nil {
return pubErr
}
return nil
}
订阅:
func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
fmt.Println("Subscrbing on : ", subject)
//wg := sync.WaitGroup{}
//wg.Add(1)
subscription, err := conn.Subscribe(subject, func(msg *nats.Msg) {
go func() {
handler(msg.Data)
}()
//wg.Done()
})
if err != nil {
fmt.Println("Subscriber error : ", err)
}
//wg.Wait()
defer subscription.Unsubscribe()
}
测试用例:
func TestLifeCycleEvent(t *testing.T) {
busClient := GetBusClient()
busClient.Subscribe(SUBJECT, func(input []byte) {
fmt.Println("Life cycle event received :", string(input))
})
busClient.Publish(SUBJECT, []byte("complete notification"))
}
我看到消息被发布但没有订阅,我试图使用waitgroup保持订阅方法,但我认为这不是正确的解决方案.