我面临一个问题,在几次断开连接和重新连接之后,我的MQTT客户端(比方说A)停止接收来自另一个发布客户端(B)的消息.

如果我用mosquitto_sub手动订阅主题B正在发布,我可以看到所有消息都按预期发布.如果我手动发布(mosquitto_pub)到订阅的主题A,A也会收到这些消息,所以订阅似乎起作用了.只有当B发布到该主题时,A才不会接收消息.

这两个客户端都连接到蚊子代理(1.6.12版).一切都在RaspberryPI CM3上运行.客户端是使用PAHO MQTT库(1.4.2版)编写的,并作为系统服务启动.客户端使用以下选项进行初始化:

opts.SetCleanSession(false)
opts.SetKeepAlive(10 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.SetAutoReconnect(true)
opts.SetConnectTimeout(15 * time.Second)

此外,两个客户端都获得唯一的ID. 我查看了蚊子的日志(log),注意到一些消息在重新连接的客户订阅之前被发送到他.

1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m722, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m723, 'example-topic-1/b', ... (118 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m904, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1085, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1086, 'example-topic-1/b', ... (118 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1267, 'example-topic-1/a', ... (109 bytes))
...
...
1678786214: Sending PUBLISH to telemetry (d0, q1, r0, m2293, 'example-topic-1/c'1, ... (119 bytes))
1678786214: Received PUBLISH from telemetry (d0, q2, r1, m1, 'example-topic-2', ... (7 bytes))
1678786214: Sending PUBREC to telemetry (m1, rc0)
1678786214: Received PUBLISH from telemetry (d0, q2, r1, m2, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBREC to telemetry (m2, rc0)
1678786214: Received PUBREL from telemetry (Mid: 1)
1678786214: Sending PUBCOMP to telemetry (m1)
1678786214: Received PUBREL from telemetry (Mid: 2)
1678786214: Sending PUBCOMP to telemetry (m2)
1678786214: Received SUBSCRIBE from telemetry
1678786214:         shutdown (QoS 2)
1678786214: telemetry 2 shutdown
1678786214: Sending SUBACK to telemetry
1678786214: Received SUBSCRIBE from telemetry
1678786214:         example-topic-1/# (QoS 1)
1678786214: telemetry 1 example-topic-1/#
1678786214: Sending SUBACK to telemetry

在MQTT库的文档中,它声明

如果之前已经创建了QOS1+订阅,并且您使用设置为FALSE的CleanSession进行连接,则代理可能会在调用订阅之前交付保留的消息.要处理这些消息,可以使用Addroute配置处理程序,也可以设置DefaultPublishHandler.

这是否意味着消息将会丢失,或者它实际上是以某种方式阻止了主题?

到目前为止,我try 从最初的1.6.10版更新到1.6.12版,但没有解决问题.重新启动蚊子服务似乎解决了这个问题,但并不是真正的解决方案.我的下一步将是设置DefaultPublishHandler来处理过早发送的消息.

预先感谢您所能提供的任何帮助.如果需要更多信息,请让我知道!

编辑:

我try 了更多,并查看了蚊子到日志(log),可以将问题缩小一点,因为它似乎与消息的Qos级别有关.

当客户端A没有接收到客户端B发送的任何消息时,我try 注册一个新的客户端(使用mosquitto_sub),让我们称其为C,它从客户端B接收消息,而不考虑订阅的服务质量级别.当与客户端C一起发布时,客户端A仅接收以0的服务质量发送的消息.如果我将Qos指定为1或2,则客户端A不会收到任何内容.由于客户端B发送的所有消息的服务质量都为1,因此代理似乎不会将任何服务质量为0的消息发送到客户端A,而是发送到其他客户端.

对于所有客户端,OrderMatters都设置为FALSE,并且代理的唯一配置是max_queued_messages 0. 以下是一些相关的日志(log)片段.

使用QOS 1的客户端B发布

1678874048: Sending PUBACK to clientB (m732, rc0)
1678874048: Received PUBLISH from clientB (d0, q1, r0, m733, 'topic/b', ... (128 bytes))
1678874048: Sending PUBACK to clientB (m733, rc0)
1678874048: Received PUBLISH from clientB (d0, q1, r0, m734, 'topic/c', ... (135 bytes))
1678874048: Sending PUBACK to clientB (m734, rc0)

请注意,没有消息发布到客户端A.

使用QOS 1的客户端C发布

1678874088: No will message specified.
1678874088: Sending CONNACK to mosq-glZkfjX79CriC0qOK5 (0, 0)
1678874088: Received PUBLISH from mosq-glZkfjX79CriC0qOK5 (d0, q1, r0, m1, 'topic/a', ... (130 bytes))
1678874088: Sending PUBACK to mosq-glZkfjX79CriC0qOK5 (m1, rc0)
1678874088: Received DISCONNECT from mosq-glZkfjX79CriC0qOK5
1678874088: Client mosq-glZkfjX79CriC0qOK5 disconnected.

与使用新客户端发布相同.

具有服务质量0的客户端C发布

1678874043: No will message specified.
1678874043: Sending CONNACK to mosq-pba5gDo7lNe7Sqz5jd (0, 0)
1678874043: Received PUBLISH from mosq-pba5gDo7lNe7Sqz5jd (d0, q0, r0, m0, 'topic/a', ... (130 bytes))
1678874043: Sending PUBLISH to clientA (d0, q0, r0, m0, 'topic/a', ... (130 bytes))
1678874043: Received DISCONNECT from mosq-pba5gDo7lNe7Sqz5jd
1678874043: Client mosq-pba5gDo7lNe7Sqz5jd disconnected.

使用新客户端发布且Qos为0的发布将发布到客户端A.

客户端C订阅,服务质量为1

1678874687: No will message specified.
1678874687: Sending CONNACK to mosq-vkwirAtvYxHwoLnCQH (0, 0)
1678874687: Received SUBSCRIBE from mosq-vkwirAtvYxHwoLnCQH
1678874687:         topic/+ (QoS 1)
1678874687: mosq-vkwirAtvYxHwoLnCQH 1 topic/+
1678874687: Sending SUBACK to mosq-vkwirAtvYxHwoLnCQH
...
1678874695: Received PUBLISH from clientB (d0, q1, r0, m673, 'topic/a', ... (137 bytes))
1678874695: Sending PUBACK to clientB (m673, rc0)
1678874695: Sending PUBLISH to mosq-vkwirAtvYxHwoLnCQH (d0, q1, r0, m3, 'topic/a', ... (137 bytes))
1678874695: Received PUBLISH from clientB (d0, q1, r0, m674, 'topic/b', ... (130 bytes))
1678874695: Sending PUBACK to clientB (m674, rc0)
1678874695: Received PUBACK from mosq-vkwirAtvYxHwoLnCQH (Mid: 3, RC:0)
1678874695: Sending PUBLISH to mosq-vkwirAtvYxHwoLnCQH (d0, q1, r0, m4, 'topic/b', ... (130 bytes))
1678874695: Received PUBACK from mosq-vkwirAtvYxHwoLnCQH (Mid: 4, RC:0)

使用具有Qos 1的新客户端订阅也是有效的...

老实说,我现在有点想不起来了,因为我的行为很奇怪.回调处理程序似乎不会阻塞clientAs端,因为仍会处理服务质量为0的消息.是否有任何设置或配置可能会扰乱QOS 1订阅?

编辑2:

以下日志(log)基本上是重启服务之前和重启后的日志(log),导致了所描述的行为.因为不止这两个客户经常发送东西,所以我又剪掉了一些部分,但关于这两个客户的所有东西都应该在这里.从第一次连接和几个工作发布到重新启动和不再发送消息.

我注意到的另一件事是,当关闭服务时,客户端从不呼叫Disconnect.它试图取消订阅,但有时甚至无法完成.这会导致这样的问题吗?

1678880076: New client connected from 127.0.0.1 as clientA (p2, c0, k10).
1678880076: No will message specified.
1678880076: Sending CONNACK to clientA (1, 0)
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic1/matlab_version_expected', ... (152 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m18735, 'topic1/coreagent_ota_state', ... (112 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19039, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19041, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19331, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19333, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19453, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19454, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19575, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19576, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20053, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20054, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20235, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20236, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20349, 'topic1/ostree_sha_rollback', ... (172 bytes))
... (lots of PUBLISH withouth PUBACK from clientA)
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20540, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received PUBLISH from clientA (d0, q2, r1, m1, 'conn-status', ... (7 bytes))
1678880076: Sending PUBREC to clientA (m1, rc0)
1678880076: Received PUBLISH from clientA (d0, q2, r1, m2, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBREC to clientA (m2, rc0)
1678880076: Received PUBREL from clientA (Mid: 1)
1678880076: Sending PUBCOMP to clientA (m1)
1678880076: Received PUBREL from clientA (Mid: 2)
1678880076: Sending PUBCOMP to clientA (m2)
1678880076: Sending PUBLISH to clientA (d0, q1, r0, m20720, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Received PUBLISH from clientA (d0, q2, r1, m3, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBREC to clientA (m3, rc0)
1678880076: Received SUBSCRIBE from clientA
1678880076:         shutdown (QoS 2)
1678880076: clientA 2 shutdown
1678880076: Sending SUBACK to clientA
1678880076: Received PUBREL from clientA (Mid: 3)
1678880076: Sending PUBCOMP to clientA (m3)
1678880076: Sending PUBLISH to clientA (d0, q1, r0, m20721, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received SUBSCRIBE from clientA
1678880076:         telemetry/+ (QoS 1)
1678880076: clientA 1 telemetry/+
1678880076: Sending SUBACK to clientA
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20722, 'telemetry/geolocation', ... (51 bytes))
1678880076: Received PUBACK from clientA (Mid: 20722, RC:0)
1678880076: Received SUBSCRIBE from clientA
1678880076:         telemetry-batch (QoS 1)
1678880076: clientA 1 telemetry-batch
1678880076: Sending SUBACK to clientA
1678880076: Received SUBSCRIBE from clientA
1678880076:         topic1/# (QoS 1)
1678880076: clientA 1 topic1/#
1678880076: Sending SUBACK to clientA
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20723, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20724, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received PUBACK from clientA (Mid: 20723, RC:0)
1678880076: Received PUBACK from clientA (Mid: 20724, RC:0)
1678880076: Received PUBLISH from clientA (d0, q2, r1, m8, 'conn-status', ... (6 bytes))
1678880076: Sending PUBREC to clientA (m8, rc0)
1678880076: Received PUBREL from clientA (Mid: 8)
1678880076: Sending PUBCOMP to clientA (m8)
...
1678880083: New connection from 127.0.0.1 on port 1883.
1678880083: New client connected from 127.0.0.1 as clientB (p2, c0, k10).
1678880083: No will message specified.
1678880083: Sending CONNACK to clientB (1, 0)
1678880085: Received PUBLISH from clientA (d0, q2, r1, m9, 'session', ... (36 bytes))
1678880085: Sending PUBREC to clientA (m9, rc0)
1678880085: Received PUBREL from clientA (Mid: 9)
1678880085: Sending PUBCOMP to clientA (m9)
...
1678880086: Received PUBLISH from clientB (d0, q1, r0, m3, 'topic1/net_response', ... (118 bytes))
1678880086: Sending PUBACK to clientB (m3, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20725, 'topic1/net_response', ... (118 bytes))
1678880086: Received PUBACK from clientA (Mid: 20725, RC:0)
1678880086: Received PUBLISH from clientB (d0, q1, r0, m4, 'topic1/public_ip_addr', ... (116 bytes))
1678880086: Sending PUBACK to clientB (m4, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20726, 'topic1/public_ip_addr', ... (116 bytes))
1678880086: Received PUBACK from clientA (Mid: 20726, RC:0)
... (lots of PUBLISH with PUBACK from telemetry)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20742, 'topic1/swap', ... (169 bytes))
1678880086: Received PUBACK from clientA (Mid: 20742, RC:0)
1678880086: Received PUBLISH from clientB (d0, q1, r0, m21, 'topic1/syslog', ... (508219 bytes))
1678880086: Sending PUBACK to clientB (m21, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20743, 'topic1/syslog', ... (508219 bytes))
1678880086: Received PUBACK from clientA (Mid: 20743, RC:0)
...
1678880088: Received PUBLISH from clientB (d0, q1, r0, m22, 'topic1/version_ca_lockdown', ... (133 bytes))
1678880088: Sending PUBACK to clientB (m22, rc0)
... (here the services get restarted and afterwards the subscription seems broken for QoS >= 1)
1678880118: Received UNSUBSCRIBE from clientA
1678880118:         telemetry/+
1678880118: clientA telemetry/+
1678880118: Sending UNSUBACK to clientA
1678880118: Socket error on client clientA, disconnecting.
1678880119: Received PUBLISH from clientB (d0, q1, r0, m136, 'topic1/release_id', ... (126 bytes))
1678880119: Sending PUBACK to clientB (m136, rc0)
1678880119: Received PUBLISH from clientB (d0, q1, r0, m137, 'topic1/version_ca_configurator', ... (137 bytes))
1678880119: Sending PUBACK to clientB (m137, rc0)
...
1678880125: New connection from 127.0.0.1 on port 1883.
1678880125: New client connected from 127.0.0.1 as clientA (p2, c0, k10).
1678880125: No will message specified.
1678880125: Sending CONNACK to clientA (1, 0)
1678880125: Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic1/matlab_version_expected', ... (152 bytes))
1678880125: Sending PUBLISH to clientA (d1, q1, r0, m18735, 'topic1/coreagent_ota_state', ... (112 bytes))
... (again, lots of PUBLISH without PUBACK)
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20873, 'topic1/proc_led', ... (151 bytes))
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20874, 'topic1/coreagent_ota_state', ... (112 bytes))
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20875, 'topic1/ssid_wlan0', ... (99 bytes))
1678880125: Received PUBLISH from clientA (d0, q2, r1, m1, 'conn-status', ... (7 bytes))
1678880125: Sending PUBREC to clientA (m1, rc0)
1678880125: Received PUBLISH from clientA (d0, q2, r1, m2, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880125: Sending PUBREC to clientA (m2, rc0)
1678880125: Received PUBREL from clientA (Mid: 1)
1678880125: Sending PUBCOMP to clientA (m1)
1678880125: Received PUBREL from clientA (Mid: 2)
1678880125: Sending PUBCOMP to clientA (m2)
1678880125: Received SUBSCRIBE from clientA
1678880125:         shutdown (QoS 2)
1678880125: clientA 2 shutdown
1678880125: Sending SUBACK to clientA
1678880125: Received PUBLISH from clientA (d0, q2, r1, m4, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880125: Sending PUBREC to clientA (m4, rc0)
1678880125: Received SUBSCRIBE from clientA
1678880125:         telemetry/+ (QoS 1)
1678880125: clientA 1 telemetry/+
1678880125: Sending SUBACK to clientA
1678880125: Received PUBREL from clientA (Mid: 4)
1678880125: Sending PUBCOMP to clientA (m4)
1678880125: Received SUBSCRIBE from clientA
1678880125:         telemetry-batch (QoS 1)
1678880125: clientA 1 telemetry-batch
1678880125: Sending SUBACK to clientA
1678880125: Received SUBSCRIBE from clientA
1678880125:         topic1/# (QoS 1)
1678880125: clientA 1 topic1/#
1678880125: Sending SUBACK to clientA
1678880126: Received PUBLISH from clientA (d0, q2, r1, m8, 'conn-status', ... (6 bytes))
1678880126: Sending PUBREC to clientA (m8, rc0)
1678880126: Received PUBREL from clientA (Mid: 8)
1678880126: Sending PUBCOMP to clientA (m8)
...
1678880133: New connection from 127.0.0.1 on port 1883.
1678880133: New client connected from 127.0.0.1 as clientB (p2, c0, k10).
1678880133: No will message specified.
1678880133: Sending CONNACK to clientB (1, 0)
...
1678880135: Received PUBLISH from clientB (d0, q1, r0, m3, 'topic1/provision_date', ... (131 bytes))
1678880135: Sending PUBACK to clientB (m3, rc0)
1678880135: Received PUBLISH from clientB (d0, q1, r0, m4, 'topic1/processes', ... (99 bytes))
... (There are no more PUBLISH message from broker to clientA)
1678880135: Sending PUBACK to clientB (m20, rc0)
1678880135: Received PUBLISH from clientB (d0, q1, r0, m21, 'topic1/net_response', ... (118 bytes))
1678880135: Sending PUBACK to clientB (m21, rc0)

我仍在研究一个最小的、可重现的示例,但不幸的是,到目前为止还没有在嵌入式环境之外重现这个错误.

编辑3:

为了完整起见,我设法构建了一个最小的、可重现的示例,我想我也会把它张贴出来.

package main

import (
    "os"
    "os/signal"
    "syscall"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
    "github.com/sirupsen/logrus"
)

func main() {
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    stop := make(chan bool, 1)
    optsA := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
    logrus.Info("setup and connect clientA")
    clientA := setup(optsA, "clientA")
    if err := connect(clientA); err != nil {
        panic(err)
    }

    if tok := clientA.Subscribe("topic/+", 1, func(c mqtt.Client, m mqtt.Message) {
        logrus.Infof("received on topic %s ; message: %s", string(m.Topic()), string(m.Payload()))
    }); tok.Wait() && tok.Error() != nil {
        panic(tok.Error())
    }

    logrus.Infof("setup and connect clientB")
    optsB := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
    clientB := setup(optsB, "clientB")
    if err := connect(clientB); err != nil {
        panic(err)
    }

    logrus.Info("start publishing from clientB")
    go publish(clientB, stop)

    logrus.Info("let clientA receive some messages from clientB")
    time.Sleep(500 * time.Millisecond)

    clientA.Disconnect(10)

    logrus.Info("wait until ClientB published more than max_inflight_messages")
    time.Sleep(1100 * time.Millisecond)

    logrus.Infof("connecting clientA again")
    clientA = setup(optsA, "clientA")
    if err := connect(clientA); err != nil {
        panic(err)
    }

    logrus.Info("wait shortly before subscribing")
    time.Sleep(1 * time.Second)

    logrus.Info("subscribe with clientA")
    if tok := clientA.Subscribe("topic/+", 1, func(c mqtt.Client, m mqtt.Message) {
        logrus.Infof("received on topic %s ; message: %s", string(m.Topic()), string(m.Payload()))
    }); tok.Wait() && tok.Error() != nil {
        panic(tok.Error())
    }

    <-c
    stop <- true
}

func publish(client mqtt.Client, stop chan bool) {
    for {
        select {
        case <-stop:
            return
        default:
            if tok := client.Publish("topic/exmaple", 1, false, "message"); tok.Wait() && tok.Error() != nil {
                logrus.WithError(tok.Error()).Warnf("failed to publish, continuing")
                continue
            }
            time.Sleep(50 * time.Millisecond)
        }
    }
}

func connect(client mqtt.Client) error {
    if tok := client.Connect(); tok.Wait() && tok.Error() != nil {
        logrus.WithError(tok.Error()).Error("failed to connect")
        return tok.Error()
    }
    return nil
}

func setup(opts *mqtt.ClientOptions, id string) mqtt.Client {
    opts.SetClientID(id)
    opts.SetOrderMatters(false)
    opts.SetCleanSession(false)
    // opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
    //  logrus.Infof("received message on topic %s which does not match any subscriptions (yet)", msg.Topic())
    // })
    opts.SetKeepAlive(10 * time.Second)
    opts.SetPingTimeout(1 * time.Second)
    opts.SetAutoReconnect(true)
    opts.SetConnectTimeout(15 * time.Second)

    cl := mqtt.NewClient(opts)
    return cl
}

正如@Brits在他的回答中解释的那样,如果我取消对这DefaultPublishHandler条的 comments ,这些消息就会得到确认,订阅就会继续下go .

推荐答案

感谢你的日志(log);事实上,在连接建立后,Mosquito没有收到PUBACK分的消息,这让我找到了可能的原因.

对于Mosquito v1.6.x,max_inflight_messages默认为10;因此,在10条未确认的消息之后,Mosquito将不再发送任何消息.这就是它停止向clientA发送的原因.

如果没有处理程序,paho.mqtt.golang将不会确认消息(如果启用日志(log)记录,则会在发生这种情况时输出警告).这样做的理由在时间的迷雾中消失了(我添加了警告),但我怀疑这是因为,如果没有处理程序,消息就不能被说成已经处理(因此不应该被确认).早期版本的Mosquito用于重新发送尚未被确认的消息,但现在不再是这样了(并且在v5规范中被禁止),这意味着它实际上是一个永久的阻止.

在您的例子中,这两个因素是结合在一起的;您连接、接收10PUBLISH个包,然后订阅(设置处理程序),但在这一点上,Mosquito有10条消息在传输,不会再发送了.

解决办法是添加以下内容:

opts.SetDefaultPublishHandler(func(mqtt.Client, mqtt.Message) {})

这将添加一个默认的发布处理程序(忽略消息);该处理程序的存在意味着将确认PUBLISH个包.

ClientA是否需要在离线时接收消息(它正在从telemetry/+取消订阅,因此无论如何都不会收到这些消息).如果不是,则使用opts.SetCleanSession(true)是避免此问题的另一种方式.

如果您确实需要处理消息,那么在连接之前使用AddRoute来配置您的消息处理程序(我通常有一个通用的DefaultPublishHandler,它只记录消息,这样我就可以看到遗漏了什么).

Go相关问答推荐

运行add. inf,这样我们就可以在app.conf中使用. inf参数了?

禁用Golang中的终端

如何将文件从AWS S3存储桶复制到Azure BLOB存储

使用Digitorus/pdfsign在GO(Golang)中签署pdf文件

从使用Golang otelmux检测的Otel跟踪中获取trace_id

如何在出现错误时停止从通道读取?

Golang在不写入磁盘的情况下为jpeg图像生成一致的哈希

Golang Gorm Fiber - 如何将定义为别名的名称发送到索引模板?

Json.Unmarshal() 和 gin.BindJson() 之间的区别

用 fork 替换 Go 依赖:...用于两个不同的模块路径

启动套接字服务器会干扰 gRPC/http 客户端服务器通信 Golang

由于 main.go 文件中的本地包导入导致构建 docker 容器时出错

枚举的 Golang 验证器自定义验证规则

如何将多个切片打印为一个切片?

go:识别重新定义标志的包

如何在 golang 中同时加载 .env 文件和 os 环境变量

如何在 GORM 中获取字段值

递归数据 struct 解组在 Go Lang Protobuf 中给出错误无法解析无效的线格式数据

Golang 泛型

如何断言类型是指向golang中接口的指针