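I'm facing a problem where, after several disconnects and reconnects, my MQTT client (let's call it A) stops receiving messages from another, publishing client (B).
If I manually subscribe with mosquitto_sub to the topic B is publishing to, I can see that all messages are published as expected. If I manually publish (with mosquitto_pub) to the topic A is subscribed to, A also receives those messages, so the subscription seems to work. Only when B publishes to that topic does A not receive the messages.
Both clients are connected to a Mosquitto broker (version 1.6.12). Everything runs on a Raspberry Pi CM3. The clients are written with the Paho MQTT library (version 1.4.2) and are started as system services. The clients are initialized with the following options: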
opts.SetCleanSession(false)
opts.SetKeepAlive(10 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.SetAutoReconnect(true)
opts.SetConnectTimeout(15 * time.Second)
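In addition, both clients get a unique ID. I looked at the Mosquitto log and noticed that some messages are sent to the reconnecting client before it subscribes.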
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m722, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m723, 'example-topic-1/b', ... (118 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m904, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1085, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1086, 'example-topic-1/b', ... (118 bytes))
1678786214: Sending PUBLISH to telemetry (d1, q1, r0, m1267, 'example-topic-1/a', ... (109 bytes))
...
...
1678786214: Sending PUBLISH to telemetry (d0, q1, r0, m2293, 'example-topic-1/c'1, ... (119 bytes))
1678786214: Received PUBLISH from telemetry (d0, q2, r1, m1, 'example-topic-2', ... (7 bytes))
1678786214: Sending PUBREC to telemetry (m1, rc0)
1678786214: Received PUBLISH from telemetry (d0, q2, r1, m2, 'example-topic-1/a', ... (109 bytes))
1678786214: Sending PUBREC to telemetry (m2, rc0)
1678786214: Received PUBREL from telemetry (Mid: 1)
1678786214: Sending PUBCOMP to telemetry (m1)
1678786214: Received PUBREL from telemetry (Mid: 2)
1678786214: Sending PUBCOMP to telemetry (m2)
1678786214: Received SUBSCRIBE from telemetry
1678786214: shutdown (QoS 2)
1678786214: telemetry 2 shutdown
1678786214: Sending SUBACK to telemetry
1678786214: Received SUBSCRIBE from telemetry
1678786214: example-topic-1/# (QoS 1)
1678786214: telemetry 1 example-topic-1/#
1678786214: Sending SUBACK to telemetry
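The documentation of the MQTT library states:
If a QOS1+ subscription has previously been created and you connect with CleanSession set to false, the broker may deliver retained messages before Subscribe can be called. To process these messages, either configure a handler with AddRoute or set a DefaultPublishHandler.
Does this mean that the messages will be lost, or does it actually block the topic in some way?
So far I have tried updating from the initial version 1.6.10 to 1.6.12, but that did not solve the problem. Restarting the Mosquitto service seems to resolve the issue, but that is not a real solution. My next step will be to set a DefaultPublishHandler to handle the messages that are sent too early.
Thanks in advance for any help you can provide. If more information is needed, please let me know!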
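To make the quoted documentation note more concrete, here is a toy model (not the Paho implementation) of how a client-side router could decide whether a QoS 1 message gets acknowledged. It assumes what the behaviour in this question suggests: a message that matches no registered route and has no default handler is dropped without a PUBACK. The names router, addRoute, dispatch and matches are hypothetical and only loosely mirror Paho's AddRoute and DefaultPublishHandler.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether an MQTT topic filter (with + and # wildcards)
// matches a concrete topic name. Simplified: no special handling of $-topics.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, part := range f {
		if part == "#" {
			return true // multi-level wildcard matches everything that follows
		}
		if i >= len(t) {
			return false
		}
		if part != "+" && part != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// router is a hypothetical stand-in for a client's message router.
type router struct {
	routes         []string     // topic filters that have a handler
	defaultHandler func(string) // nil models "no DefaultPublishHandler set"
}

func (r *router) addRoute(filter string) { r.routes = append(r.routes, filter) }

// dispatch returns true if the message would be handled and therefore
// acknowledged in this model, and false if it would stay unacknowledged.
func (r *router) dispatch(topic string) bool {
	for _, f := range r.routes {
		if matches(f, topic) {
			return true
		}
	}
	if r.defaultHandler != nil {
		r.defaultHandler(topic)
		return true
	}
	return false
}

func main() {
	r := &router{}
	// Session resumed: the broker re-sends a queued message before Subscribe ran.
	fmt.Println("before subscribe:", r.dispatch("topic1/telemetry_data_sent"))
	r.addRoute("topic1/#")
	fmt.Println("after subscribe:", r.dispatch("topic1/telemetry_data_sent"))
}
```

In this model, setting defaultHandler before connecting is what turns the early messages from "silently unacknowledged" into "handled and acknowledged".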
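Edit:
I experimented some more, looked at the Mosquitto log again, and could narrow the problem down a bit: it seems to be related to the QoS level of the messages.
While client A was not receiving any messages sent by client B, I registered a new client (using mosquitto_sub), let's call it C, which receives the messages from client B regardless of the QoS level of its subscription. When publishing with client C, client A only receives messages sent with QoS 0. If I specify QoS 1 or 2, client A receives nothing. Since all messages sent by client B have QoS 1, it seems the broker does not send any message with a QoS above 0 to client A, but does send them to other clients.
OrderMatters is set to false for all clients, and the only configuration of the broker is max_queued_messages 0.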
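One possible explanation for QoS 0 getting through while QoS 1 and 2 do not is an exhausted in-flight window on the broker side. The sketch below is a toy model, not Mosquitto's code: it assumes that each unacknowledged QoS 1/2 message occupies one slot, that QoS 0 messages bypass the window, and that messages beyond the limit are queued instead of sent. The type window, its methods and the limit of 20 are assumptions for illustration; check max_inflight_messages in your own broker configuration.

```go
package main

import "fmt"

// window models a per-client in-flight window on the broker (hypothetical).
type window struct {
	limit    int          // assumed value of max_inflight_messages
	inflight map[int]bool // message IDs sent but not yet acknowledged
	queued   int          // QoS > 0 messages waiting for a free slot
}

func newWindow(limit int) *window {
	return &window{limit: limit, inflight: make(map[int]bool)}
}

// send reports whether the message goes out on the wire right now.
func (w *window) send(mid, qos int) bool {
	if qos == 0 {
		return true // fire and forget, never tracked in the window
	}
	if len(w.inflight) >= w.limit {
		w.queued++ // held back until a slot is freed
		return false
	}
	w.inflight[mid] = true
	return true
}

// ack frees the slot held by mid, as a PUBACK or PUBCOMP would.
// Simplification: queued messages are not re-sent automatically here.
func (w *window) ack(mid int) { delete(w.inflight, mid) }

func main() {
	w := newWindow(20)
	// The client reconnects and never acknowledges the first 20 messages.
	for mid := 1; mid <= 20; mid++ {
		w.send(mid, 1)
	}
	fmt.Println("QoS 1 sent:", w.send(21, 1))
	fmt.Println("QoS 0 sent:", w.send(0, 0))
	w.ack(1)
	fmt.Println("QoS 1 sent after one ack:", w.send(22, 1))
}
```

If this model applies, the window would only drain once the client acknowledges the old messages, which would fit the observation that restarting the broker makes the problem disappear.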
Here are some relevant log excerpts.
Client B publishing with QoS 1
1678874048: Sending PUBACK to clientB (m732, rc0)
1678874048: Received PUBLISH from clientB (d0, q1, r0, m733, 'topic/b', ... (128 bytes))
1678874048: Sending PUBACK to clientB (m733, rc0)
1678874048: Received PUBLISH from clientB (d0, q1, r0, m734, 'topic/c', ... (135 bytes))
1678874048: Sending PUBACK to clientB (m734, rc0)
Note that no message is published to client A.
Client C publishing with QoS 1
1678874088: No will message specified.
1678874088: Sending CONNACK to mosq-glZkfjX79CriC0qOK5 (0, 0)
1678874088: Received PUBLISH from mosq-glZkfjX79CriC0qOK5 (d0, q1, r0, m1, 'topic/a', ... (130 bytes))
1678874088: Sending PUBACK to mosq-glZkfjX79CriC0qOK5 (m1, rc0)
1678874088: Received DISCONNECT from mosq-glZkfjX79CriC0qOK5
1678874088: Client mosq-glZkfjX79CriC0qOK5 disconnected.
The same happens when publishing with a new client.
Client C publishing with QoS 0
1678874043: No will message specified.
1678874043: Sending CONNACK to mosq-pba5gDo7lNe7Sqz5jd (0, 0)
1678874043: Received PUBLISH from mosq-pba5gDo7lNe7Sqz5jd (d0, q0, r0, m0, 'topic/a', ... (130 bytes))
1678874043: Sending PUBLISH to clientA (d0, q0, r0, m0, 'topic/a', ... (130 bytes))
1678874043: Received DISCONNECT from mosq-pba5gDo7lNe7Sqz5jd
1678874043: Client mosq-pba5gDo7lNe7Sqz5jd disconnected.
A publish with QoS 0 from a new client is delivered to client A.
Client C subscribing with QoS 1
1678874687: No will message specified.
1678874687: Sending CONNACK to mosq-vkwirAtvYxHwoLnCQH (0, 0)
1678874687: Received SUBSCRIBE from mosq-vkwirAtvYxHwoLnCQH
1678874687: topic/+ (QoS 1)
1678874687: mosq-vkwirAtvYxHwoLnCQH 1 topic/+
1678874687: Sending SUBACK to mosq-vkwirAtvYxHwoLnCQH
...
1678874695: Received PUBLISH from clientB (d0, q1, r0, m673, 'topic/a', ... (137 bytes))
1678874695: Sending PUBACK to clientB (m673, rc0)
1678874695: Sending PUBLISH to mosq-vkwirAtvYxHwoLnCQH (d0, q1, r0, m3, 'topic/a', ... (137 bytes))
1678874695: Received PUBLISH from clientB (d0, q1, r0, m674, 'topic/b', ... (130 bytes))
1678874695: Sending PUBACK to clientB (m674, rc0)
1678874695: Received PUBACK from mosq-vkwirAtvYxHwoLnCQH (Mid: 3, RC:0)
1678874695: Sending PUBLISH to mosq-vkwirAtvYxHwoLnCQH (d0, q1, r0, m4, 'topic/b', ... (130 bytes))
1678874695: Received PUBACK from mosq-vkwirAtvYxHwoLnCQH (Mid: 4, RC:0)
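Subscribing with a new client with QoS 1 works as well...
Honestly, I am a bit at a loss right now, because this behaviour looks very strange to me. The callback handlers do not seem to block on clientA's side, since messages with QoS 0 are still processed. Is there any setting or configuration that could break QoS 1 subscriptions?
Edit 2:
The following log basically covers the time before the services were restarted and the time after the restart that led to the described behaviour. Since more than these two clients send things regularly, I cut out some parts again, but everything concerning these two clients should be here: from the first connect and a few working publishes to the restart, after which no more messages are sent.
Another thing I noticed is that the client never calls Disconnect when the service is shut down. It tries to unsubscribe, but sometimes does not even get to finish that. Could this cause such a problem?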
1678880076: New client connected from 127.0.0.1 as clientA (p2, c0, k10).
1678880076: No will message specified.
1678880076: Sending CONNACK to clientA (1, 0)
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic1/matlab_version_expected', ... (152 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m18735, 'topic1/coreagent_ota_state', ... (112 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19039, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19041, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19331, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19333, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19453, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19454, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19575, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m19576, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20053, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20054, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20235, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20236, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20349, 'topic1/ostree_sha_rollback', ... (172 bytes))
... (lots of PUBLISH without PUBACK from clientA)
1678880076: Sending PUBLISH to clientA (d1, q1, r0, m20540, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received PUBLISH from clientA (d0, q2, r1, m1, 'conn-status', ... (7 bytes))
1678880076: Sending PUBREC to clientA (m1, rc0)
1678880076: Received PUBLISH from clientA (d0, q2, r1, m2, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBREC to clientA (m2, rc0)
1678880076: Received PUBREL from clientA (Mid: 1)
1678880076: Sending PUBCOMP to clientA (m1)
1678880076: Received PUBREL from clientA (Mid: 2)
1678880076: Sending PUBCOMP to clientA (m2)
1678880076: Sending PUBLISH to clientA (d0, q1, r0, m20720, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Received PUBLISH from clientA (d0, q2, r1, m3, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Sending PUBREC to clientA (m3, rc0)
1678880076: Received SUBSCRIBE from clientA
1678880076: shutdown (QoS 2)
1678880076: clientA 2 shutdown
1678880076: Sending SUBACK to clientA
1678880076: Received PUBREL from clientA (Mid: 3)
1678880076: Sending PUBCOMP to clientA (m3)
1678880076: Sending PUBLISH to clientA (d0, q1, r0, m20721, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received SUBSCRIBE from clientA
1678880076: telemetry/+ (QoS 1)
1678880076: clientA 1 telemetry/+
1678880076: Sending SUBACK to clientA
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20722, 'telemetry/geolocation', ... (51 bytes))
1678880076: Received PUBACK from clientA (Mid: 20722, RC:0)
1678880076: Received SUBSCRIBE from clientA
1678880076: telemetry-batch (QoS 1)
1678880076: clientA 1 telemetry-batch
1678880076: Sending SUBACK to clientA
1678880076: Received SUBSCRIBE from clientA
1678880076: topic1/# (QoS 1)
1678880076: clientA 1 topic1/#
1678880076: Sending SUBACK to clientA
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20723, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20724, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880076: Received PUBACK from clientA (Mid: 20723, RC:0)
1678880076: Received PUBACK from clientA (Mid: 20724, RC:0)
1678880076: Received PUBLISH from clientA (d0, q2, r1, m8, 'conn-status', ... (6 bytes))
1678880076: Sending PUBREC to clientA (m8, rc0)
1678880076: Received PUBREL from clientA (Mid: 8)
1678880076: Sending PUBCOMP to clientA (m8)
...
1678880083: New connection from 127.0.0.1 on port 1883.
1678880083: New client connected from 127.0.0.1 as clientB (p2, c0, k10).
1678880083: No will message specified.
1678880083: Sending CONNACK to clientB (1, 0)
1678880085: Received PUBLISH from clientA (d0, q2, r1, m9, 'session', ... (36 bytes))
1678880085: Sending PUBREC to clientA (m9, rc0)
1678880085: Received PUBREL from clientA (Mid: 9)
1678880085: Sending PUBCOMP to clientA (m9)
...
1678880086: Received PUBLISH from clientB (d0, q1, r0, m3, 'topic1/net_response', ... (118 bytes))
1678880086: Sending PUBACK to clientB (m3, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20725, 'topic1/net_response', ... (118 bytes))
1678880086: Received PUBACK from clientA (Mid: 20725, RC:0)
1678880086: Received PUBLISH from clientB (d0, q1, r0, m4, 'topic1/public_ip_addr', ... (116 bytes))
1678880086: Sending PUBACK to clientB (m4, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20726, 'topic1/public_ip_addr', ... (116 bytes))
1678880086: Received PUBACK from clientA (Mid: 20726, RC:0)
... (lots of PUBLISH with PUBACK from telemetry)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20742, 'topic1/swap', ... (169 bytes))
1678880086: Received PUBACK from clientA (Mid: 20742, RC:0)
1678880086: Received PUBLISH from clientB (d0, q1, r0, m21, 'topic1/syslog', ... (508219 bytes))
1678880086: Sending PUBACK to clientB (m21, rc0)
1678880086: Sending PUBLISH to clientA (d0, q1, r0, m20743, 'topic1/syslog', ... (508219 bytes))
1678880086: Received PUBACK from clientA (Mid: 20743, RC:0)
...
1678880088: Received PUBLISH from clientB (d0, q1, r0, m22, 'topic1/version_ca_lockdown', ... (133 bytes))
1678880088: Sending PUBACK to clientB (m22, rc0)
... (here the services get restarted and afterwards the subscription seems broken for QoS >= 1)
1678880118: Received UNSUBSCRIBE from clientA
1678880118: telemetry/+
1678880118: clientA telemetry/+
1678880118: Sending UNSUBACK to clientA
1678880118: Socket error on client clientA, disconnecting.
1678880119: Received PUBLISH from clientB (d0, q1, r0, m136, 'topic1/release_id', ... (126 bytes))
1678880119: Sending PUBACK to clientB (m136, rc0)
1678880119: Received PUBLISH from clientB (d0, q1, r0, m137, 'topic1/version_ca_configurator', ... (137 bytes))
1678880119: Sending PUBACK to clientB (m137, rc0)
...
1678880125: New connection from 127.0.0.1 on port 1883.
1678880125: New client connected from 127.0.0.1 as clientA (p2, c0, k10).
1678880125: No will message specified.
1678880125: Sending CONNACK to clientA (1, 0)
1678880125: Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic1/matlab_version_expected', ... (152 bytes))
1678880125: Sending PUBLISH to clientA (d1, q1, r0, m18735, 'topic1/coreagent_ota_state', ... (112 bytes))
... (again, lots of PUBLISH without PUBACK)
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20873, 'topic1/proc_led', ... (151 bytes))
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20874, 'topic1/coreagent_ota_state', ... (112 bytes))
1678880125: Sending PUBLISH to clientA (d0, q1, r0, m20875, 'topic1/ssid_wlan0', ... (99 bytes))
1678880125: Received PUBLISH from clientA (d0, q2, r1, m1, 'conn-status', ... (7 bytes))
1678880125: Sending PUBREC to clientA (m1, rc0)
1678880125: Received PUBLISH from clientA (d0, q2, r1, m2, 'topic1/telemetry_data_sent', ... (109 bytes))
1678880125: Sending PUBREC to clientA (m2, rc0)
1678880125: Received PUBREL from clientA (Mid: 1)
1678880125: Sending PUBCOMP to clientA (m1)
1678880125: Received PUBREL from clientA (Mid: 2)
1678880125: Sending PUBCOMP to clientA (m2)
1678880125: Received SUBSCRIBE from clientA
1678880125: shutdown (QoS 2)
1678880125: clientA 2 shutdown
1678880125: Sending SUBACK to clientA
1678880125: Received PUBLISH from clientA (d0, q2, r1, m4, 'topic1/telemetry_queue_size', ... (119 bytes))
1678880125: Sending PUBREC to clientA (m4, rc0)
1678880125: Received SUBSCRIBE from clientA
1678880125: telemetry/+ (QoS 1)
1678880125: clientA 1 telemetry/+
1678880125: Sending SUBACK to clientA
1678880125: Received PUBREL from clientA (Mid: 4)
1678880125: Sending PUBCOMP to clientA (m4)
1678880125: Received SUBSCRIBE from clientA
1678880125: telemetry-batch (QoS 1)
1678880125: clientA 1 telemetry-batch
1678880125: Sending SUBACK to clientA
1678880125: Received SUBSCRIBE from clientA
1678880125: topic1/# (QoS 1)
1678880125: clientA 1 topic1/#
1678880125: Sending SUBACK to clientA
1678880126: Received PUBLISH from clientA (d0, q2, r1, m8, 'conn-status', ... (6 bytes))
1678880126: Sending PUBREC to clientA (m8, rc0)
1678880126: Received PUBREL from clientA (Mid: 8)
1678880126: Sending PUBCOMP to clientA (m8)
...
1678880133: New connection from 127.0.0.1 on port 1883.
1678880133: New client connected from 127.0.0.1 as clientB (p2, c0, k10).
1678880133: No will message specified.
1678880133: Sending CONNACK to clientB (1, 0)
...
1678880135: Received PUBLISH from clientB (d0, q1, r0, m3, 'topic1/provision_date', ... (131 bytes))
1678880135: Sending PUBACK to clientB (m3, rc0)
1678880135: Received PUBLISH from clientB (d0, q1, r0, m4, 'topic1/processes', ... (99 bytes))
... (There are no more PUBLISH message from broker to clientA)
1678880135: Sending PUBACK to clientB (m20, rc0)
1678880135: Received PUBLISH from clientB (d0, q1, r0, m21, 'topic1/net_response', ... (118 bytes))
1678880135: Sending PUBACK to clientB (m21, rc0)
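The pattern visible in the log above (PUBLISH packets to clientA that never get a PUBACK) can also be checked mechanically. The following is a rough sketch that assumes the log line shapes shown in this post; the function name unacked and the regular expressions are my own and may need adjusting for other Mosquitto versions or log settings.

```go
package main

import (
	"bufio"
	"fmt"
	"regexp"
	"sort"
	"strconv"
	"strings"
)

var (
	// e.g. "Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic', ... (152 bytes))"
	pubRe = regexp.MustCompile(`Sending PUBLISH to (\S+) \(d\d, q1, r\d, m(\d+),`)
	// e.g. "Received PUBACK from clientA (Mid: 20722, RC:0)"
	ackRe = regexp.MustCompile(`Received PUBACK from (\S+) \(Mid: (\d+)`)
)

// unacked returns the sorted message IDs of QoS 1 PUBLISH packets sent to
// client for which no PUBACK shows up later in the given log text.
func unacked(log, client string) []int {
	pending := map[int]bool{}
	sc := bufio.NewScanner(strings.NewReader(log))
	for sc.Scan() {
		line := sc.Text()
		if m := pubRe.FindStringSubmatch(line); m != nil && m[1] == client {
			mid, _ := strconv.Atoi(m[2]) // digits only, guaranteed by the regex
			pending[mid] = true
			continue
		}
		if m := ackRe.FindStringSubmatch(line); m != nil && m[1] == client {
			mid, _ := strconv.Atoi(m[2])
			delete(pending, mid)
		}
	}
	mids := make([]int, 0, len(pending))
	for mid := range pending {
		mids = append(mids, mid)
	}
	sort.Ints(mids)
	return mids
}

func main() {
	// Three lines taken from the log in this post.
	log := `1678880076: Sending PUBLISH to clientA (d1, q1, r0, m18734, 'topic1/matlab_version_expected', ... (152 bytes))
1678880076: Sending PUBLISH to clientA (d0, q1, r1, m20722, 'telemetry/geolocation', ... (51 bytes))
1678880076: Received PUBACK from clientA (Mid: 20722, RC:0)`
	fmt.Println(unacked(log, "clientA"))
}
```

Running this over a full broker log and comparing the number of pending IDs with the broker's in-flight limit would show whether the window for clientA is full at the moment deliveries stop.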
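I am still working on a minimal reproducible example, but unfortunately I have not been able to reproduce the bug outside the embedded environment so far.
Edit 3:
For the sake of completeness: I managed to build a minimal reproducible example and thought I would post it as well.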
package main

import (
	"os"
	"os/signal"
	"syscall"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/sirupsen/logrus"
)

func main() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	stop := make(chan bool, 1)

	optsA := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
	logrus.Info("setup and connect clientA")
	clientA := setup(optsA, "clientA")
	if err := connect(clientA); err != nil {
		panic(err)
	}
	if tok := clientA.Subscribe("topic/+", 1, func(c mqtt.Client, m mqtt.Message) {
		logrus.Infof("received on topic %s ; message: %s", string(m.Topic()), string(m.Payload()))
	}); tok.Wait() && tok.Error() != nil {
		panic(tok.Error())
	}

	logrus.Infof("setup and connect clientB")
	optsB := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
	clientB := setup(optsB, "clientB")
	if err := connect(clientB); err != nil {
		panic(err)
	}
	logrus.Info("start publishing from clientB")
	go publish(clientB, stop)

	logrus.Info("let clientA receive some messages from clientB")
	time.Sleep(500 * time.Millisecond)
	clientA.Disconnect(10)

	logrus.Info("wait until ClientB published more than max_inflight_messages")
	time.Sleep(1100 * time.Millisecond)

	logrus.Infof("connecting clientA again")
	clientA = setup(optsA, "clientA")
	if err := connect(clientA); err != nil {
		panic(err)
	}

	logrus.Info("wait shortly before subscribing")
	time.Sleep(1 * time.Second)

	logrus.Info("subscribe with clientA")
	if tok := clientA.Subscribe("topic/+", 1, func(c mqtt.Client, m mqtt.Message) {
		logrus.Infof("received on topic %s ; message: %s", string(m.Topic()), string(m.Payload()))
	}); tok.Wait() && tok.Error() != nil {
		panic(tok.Error())
	}

	<-c
	stop <- true
}

func publish(client mqtt.Client, stop chan bool) {
	for {
		select {
		case <-stop:
			return
		default:
			if tok := client.Publish("topic/exmaple", 1, false, "message"); tok.Wait() && tok.Error() != nil {
				logrus.WithError(tok.Error()).Warnf("failed to publish, continuing")
				continue
			}
			time.Sleep(50 * time.Millisecond)
		}
	}
}

func connect(client mqtt.Client) error {
	if tok := client.Connect(); tok.Wait() && tok.Error() != nil {
		logrus.WithError(tok.Error()).Error("failed to connect")
		return tok.Error()
	}
	return nil
}

func setup(opts *mqtt.ClientOptions, id string) mqtt.Client {
	opts.SetClientID(id)
	opts.SetOrderMatters(false)
	opts.SetCleanSession(false)
	// opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
	// 	logrus.Infof("received message on topic %s which does not match any subscriptions (yet)", msg.Topic())
	// })
	opts.SetKeepAlive(10 * time.Second)
	opts.SetPingTimeout(1 * time.Second)
	opts.SetAutoReconnect(true)
	opts.SetConnectTimeout(15 * time.Second)
	cl := mqtt.NewClient(opts)
	return cl
}
As @Brits explained in his answer, if I uncomment the DefaultPublishHandler lines, the messages get acknowledged and the subscription keeps working.