代码:paho Retained消息代码

Retained消息和LWTKeep Alive

Retained消息

Retained消息是指在PUBLISH数据包中Retain标识设为1的消息,Broker收到这样的PUBLISH包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker会马上将这个消息发送给订阅者。有以下这些特点:

  • 一个Topic只能有一条Retained消息,发布新的Retained消息将覆盖老的Retained消息(所以想删除一个Retained消息也很简单,只要向这个主题发布一个Payload长度为0Retained消息就可以了
  • 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的Retained消息;
  • 只有新的订阅者才会收到Retained消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到Retained消息;
  • Broker收到Retained消息后,会单独保存一份,再向当前的订阅者发送一份普通的消息(Retained标识为0。当有新订阅者的时候, Broker会把保存的这条消息发给新订阅者(Retained标识为1

Retained消息和持久性会话的区别:

  • Retained消息是Broker为每一个Topic单独存储的;
  • 持久性会话是Broker为每一个Client单独存储的

1.1.代码实践

下面是publisher的代码,在发送消息时指定retaintrue

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.publish("test", payload="hello world", qos=0, retain=True)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
mqtt_client.on_connect = on_connect

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

下面是subscriber的代码

import paho.mqtt.client as mqtt

'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):
    print("granted_qos:", granted_qos)

'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):
    print("message retain", message.retain)
    print("message topic", message.topic)
    print("message payload", message.payload)

def on_connect(client, userdata, flags, rc):
    if rc == 0 :
        print("subscribing")
        client.subscribe("test", 0)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

在指定retainTrue的情况下,先运行publisher的代码,之后再运行subscriber的代码,在subscriber运行的终端界面输出如下信息:

subscribing
granted_qos: (0,)
message retain 1
message topic test
message payload b'hello world'

输出的信息中message retain的值为1,表示收到的消息为retained消息。

当再次运行publisher的代码,运行subscriber的控制台会输出如下内容:

message retain 0
message topic test
message payload b'hello world'

上述的输出结果同Retained消息特点中的第四点“Broker收到Retained消息后,会单独保存一份,再向当前的订阅者发送一份普通的消息”所述一致,因为当前订阅者已经订阅了相应的话题,当Broker收到Retained消息之后,先保存下来,然而因为这个消息对于当前已经订阅了相应话题的订阅者来说是一个普通的消息所以message retain的值为0

2. LWT(Last Will and Testament)

LWT是之前讲过的Client连接到Broker时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等。当Broker检测到Client非正常地断开连接的时候,就会向遗愿主题发布一条消息。遗愿相关的设置是在建立连接的时候,CONNECT数据包里面指定的。包括以下这些设置:

  • Will Flag:是否使用LWT
  • Will QoS:发布遗愿消息时使用的QoS
  • Will Retain:遗愿消息的Retain标识
  • Will Topic:遗愿主题名,不可使用通配符
  • Will Message:遗愿消息内容

Broker在以下情况下认为Client是非正常断开连接的:

  • Broker检测到底层的I/O异常;
  • Client未能在Keep Alive的间隔内和Broker之间有消息交互;
  • Client在关闭底层TCP连接前没有发送DISCONNECT数据包;
  • Broker因为协议错误关闭和Client的连接,比如Client发送了一个格式错误的MQTT数据包。

如果Client通过发布DISCONNECT数据包断开连接,是属于正常断开连接,不会触发LWT的机制,同时Broker会丢掉这个Client在连接时指定的LWT参数。

2.1.代码实践:监控Client的状态

Client在连接的时候,指定Will Topicwill_testWill Message"client is offline",并设置该消息的QoS1retain也置为True(设置为True表示会被Broker保留,同Retained消息。同时在连接成功之后,向主题will_test发布一个内容为"client is online"Retained消息。这样订阅者,无论在任何时候订阅"will_test",都会获取Client当前的连接状态。client_will.py代码如下:

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.publish("will_test", payload="client is online", qos=1, retain=True)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
mqtt_client.on_connect = on_connect
mqtt_client.will_set("will_test", payload="client is offline", qos=1, retain=True)
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

而负责监控的代码,则订阅will_test,订阅的QoS1client_monitor_will.py代码如下:

import paho.mqtt.client as mqtt

def on_message(client, userdata, message):
    print("message retain", message.retain)
    print("message payload", message.payload)

def on_connect(client, userdata, flags, rc):
    if rc == 0 :
        client.subscribe("will_test", 1)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

首先运行client_will.py,之后再运行client_monitor_will.py,终端输出如下信息:

message retain 1
message payload b'client is online'

因为client_will.py运行之后,发布了一个Retained消息,当运行client_monitor_will.py之后,因为订阅了相应的话题,所以会收到该消息。这时候终止掉client_will.py的运行,输出如下信息:

message retain 0
message payload b'client is offline'

因为在终止的时候已经订阅了相应的话题,所以当终止之后,虽然遗愿消息中的retain被设为1了,但是对当前的订阅者来说是普通消息,所以message retain0。当这个时候终止掉client_monitor_will.py的运行,再次重新运行client_monitor_will.py,输出如下信息:

message retain 1
message payload b'client is offline'

因为终止掉client_will.py的时候,发送的遗愿消息的retain被设为了1Broker会保证发送的遗愿消息,当新的订阅者出现的时候,会把这个Retained消息发送给订阅者。

3. Keep Alive(连接保活)

Broker需要知道Client是否正常地断开了和它的连接,以发送遗愿消息。实际上Client也需要能够很快地检测它失去了和Broker的连接,以便重新连接,虽然TCP协议在丢失连接时会通知上层应用,但是TCP有一个半打开连接的问题(half-open connection,在这种状态下,一端的TCP连接已经失效,但是另外一端并不知情,它认为连接依然是打开的,它需要很长的时间才能感知到对端连接已经断开了,这种情况在使用移动或者卫星网络的时候尤为常见。所以仅仅依赖TCP的连接状态检测是不够的,于是MQTT协议设计了一套Keep Alive机制。

MQTT协议是基于TCP的一个应用层协议

在建立连接的时候,我们可以传递一个Keep Alive参数,它的单位为秒,MQTT协议中规定:**1.5倍的Keep Alive(1.5*Keep Alive)的时间间隔内,如果Broker没有收到来自Client的任何数据包,那么Broker认为它和Client之间的连接已经断开;同样如果Client没有收到来自Broker的任何数据包,那么Client认为它和Broker之间的连接已经断开。**BrokerClient之间没有任何数据包传输的时候,MQTT中通过PINGREQ/PINGRESP来满足Keep Alive的约定和侦测连接状态。

  • PINGREQ

PINGREQ数据包中没有可变头和消息体,当Client在一个Keep Alive时间间隔内没有向Broker发送任何数据包,比如PUBLISHSUBSCRIBE的时候,它应该向Broker发送PINGREQ数据包。

  • PINGRESP

PINGRESP数据包中没有可变头和消息体,当Broker收到来自ClientPINGREQ数据包之后,它会回复Client一个PINGRESP数据包。

对于Keep ALive机制,还需要注意以下几点:

  • 如果在一个Keep Alive时间间隔内,ClientBroker有过数据包传输,比如PUBLISH数据包,Client就没有必要再使用PINGREQ了;
  • Keep Alive值是由Client指定,不同的Client可以指定不同的值;
  • Keep Alive的最大值为18小时1215秒即65535秒;
  • Keep Alive的值设为0的话,代表不使用Keep Alive机制