代码:paho Retained 消息代码
Retained 消息和LWT 和Keep Alive
Retained 消息
- 一个
Topic 只能有一条Retained 消息,发布新的Retained 消息将覆盖老的Retained 消息(所以想删除一个Retained 消息也很简单,只要向这个主题发布一个Payload 长度为0 的Retained 消息就可以了) ; - 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的
Retained 消息; - 只有新的订阅者才会收到
Retained 消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到Retained 消息; Broker 收到Retained 消息后,会单独保存一份,再向当前的订阅者发送一份普通的消息(Retained 标识为0 ) 。当有新订阅者的时候,Broker 会把保存的这条消息发给新订阅者(Retained 标识为1 ) 。
Retained 消息是Broker 为每一个Topic 单独存储的;- 持久性会话是
Broker 为每一个Client 单独存储的
1.1. 代码实践
下面是
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.publish("test", payload="hello world", qos=0, retain=True)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
下面是
import paho.mqtt.client as mqtt
'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):
print("granted_qos:", granted_qos)
'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):
print("message retain", message.retain)
print("message topic", message.topic)
print("message payload", message.payload)
def on_connect(client, userdata, flags, rc):
if rc == 0 :
print("subscribing")
client.subscribe("test", 0)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
在指定retain
为True
的情况下,先运行
subscribing
granted_qos: (0,)
message retain 1
message topic test
message payload b'hello world'
输出的信息中message retain
的值为
当再次运行
message retain 0
message topic test
message payload b'hello world'
上述的输出结果同message retain
的值为
2. LWT(Last Will and Testament)
- Will Flag:是否使用
LWT - Will QoS:发布遗愿消息时使用的
QoS - Will Retain:遗愿消息的
Retain 标识 - Will Topic:遗愿主题名,不可使用通配符
- Will Message:遗愿消息内容
Broker 检测到底层的I/O 异常;Client 未能在Keep Alive 的间隔内和Broker 之间有消息交互;Client 在关闭底层TCP 连接前没有发送DISCONNECT 数据包;Broker 因为协议错误关闭和Client 的连接,比如Client 发送了一个格式错误的MQTT 数据包。
如果
2.1. 代码实践:监控Client 的状态
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.publish("will_test", payload="client is online", qos=1, retain=True)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
mqtt_client.on_connect = on_connect
mqtt_client.will_set("will_test", payload="client is offline", qos=1, retain=True)
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
而负责监控的代码,则订阅
import paho.mqtt.client as mqtt
def on_message(client, userdata, message):
print("message retain", message.retain)
print("message payload", message.payload)
def on_connect(client, userdata, flags, rc):
if rc == 0 :
client.subscribe("will_test", 1)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
首先运行
message retain 1
message payload b'client is online'
因为
message retain 0
message payload b'client is offline'
因为在终止的时候已经订阅了相应的话题,所以当终止之后,虽然遗愿消息中的
message retain 1
message payload b'client is offline'
因为终止掉
3. Keep Alive(连接保活)
MQTT 协议是基于TCP 的一个应用层协议
在建立连接的时候,我们可以传递一个
- PINGREQ
- PINGRESP
对于
- 如果在一个
Keep Alive 时间间隔内,Client 和Broker 有过数据包传输,比如PUBLISH 数据包,Client 就没有必要再使用PINGREQ 了; Keep Alive 值是由Client 指定,不同的Client 可以指定不同的值;Keep Alive 的最大值为18 小时12 分15 秒即65535 秒;Keep Alive 的值设为0 的话,代表不使用Keep Alive 机制