消息的发布和订阅
消息的发布和订阅
MQTT 的发布
PUBLISH 数据包
固定头
- 消息重复标识(DUP flag
) :1bit,0 或者1 ,当DUP flag = 1 的时候,代表该消息是一条重发消息,因Receiver 没有确认收到之前的消息而重新发送的。这个标识只在QoS 大于0 的消息中使用。 - QoS:2bit,0、
1 或者2 ,代表PUBLISH 消息的QoS level 。 Retain 标识(Retain flag) :1bit,0 或者1 。在从Client 发送到Broker 的PUBLISH 消息中被设为1 的时候,Broker 应该保存该条消息,当之后有任何新的Subscriber 订阅PUBLISH 消息中指定的主题时,都会先收到该条消息,这种消息也叫Retained 消息。在从Broker 发送到Client 的PUBLISH 消息中被设为1 的时候,代表该条消息是一条Retained 消息。
可变头
- 数据包标识( Packet Identifier
) :2 字节,用来标识一个唯一数据包。数据包标识只需要保证在从Sender 到Receiver 的一次消息交互(比如发送、应答为一次交互)中保持唯一就好,只在QoS 大于1 的消息中使用,因为QoS 大于1 的消息有应答流程。 - 主题名称(Topic Name
) :主题名称是一个UTF-8 编码的字符串,用来命名该消息发布到哪一个主题,Topic Name 可以是长度大于等于1 任何一个字符串(可包含空格) 。但是在实际项目中,我们最好还是遵循以下一些最优方法。
消息体
MQTT 的订阅
订阅主题的流程如下图所示:

Client 向Broker 发送一个SUBSCRIBE 数据包,该数据包中含有Client 想要订阅的主题和其他一些参数;Broker 收到SUBSCRIBE 数据包后,向Client 发送一个SUBACK 数据包作为应答。
SUBSCRIBE 数据包
可变头
数据包标识(Packet Identifier
消息体
-
订阅列表(List of Subscriptions
) :SUBSCRIBE 的消息体中包含Client 想要订阅的主题列表,列表中的每一项由订阅主题名和对应的QoS 组成。 主题名说明 主题名中可以包含通配符,单层通配符“+”和多层通配符“#”。使用包含通配符的主题名可以订阅满足匹配条件的所有主题。为了和PUBLISH 中的主题区分,我们叫SUBSCRIBE 中的主题名为主题过滤器(Topic Filter) 。 -
-
单层通配符“+”
: “+”可以用来指代任意一个层级。 举例: 如“sensor/+/tem”,可以匹配: -
- sensor/data/tem
- sensor/cmd/tem
-
不可以匹配:
-
-
sensor/data/01/tem
-
多层通配符“#”
: “#”和“+”的区别在于, “#”可以用来指代任意多个层。** 但是"# “必须是Topic Filter 的最后一个字符,同时必须跟在“/“后面,除非Topic Filter 只包含一个”#“这一个字符。** 如“#”是一个合法的Topic Filter ,而“sensor#”不是一个合法的Topic Filter 。 举例: 如“sensor/data/#”,可匹配: -
- sensor/data
- sensor/data/tem
- sensor/data/tem/01
- sensor/data/tem/01/02
-
不可以匹配:
-
-
- sensor/cmd/tem
-
SUBACK 数据包
为确认每一次的订阅,
可变头
数据包标识(Packet Identifier
消息体
返回码(return codes
返回码含义
返回码
MQTT 的取消订阅

Subscriber 向Broker 发送一个UNSUBSCRIBE 数据包,该数据包包含想要取消订阅的主题;Broker 收到UNSUBSCRIBE 数据包之后,向subscriber 发送一个UNSUBACK 数据包作为应答。
UNSUBSCRIBE 数据包
可变头
数据包标识(Packet Identifier
消息体
主题列表(List of Topics
UNSUBACK 数据包
可变头
数据包标识(Packet Identifier
消息体
4. 代码实践
4.1. 发布消息
向一个主题发布一条
import paho.mqtt.client as mqtt
def on_publish(client, userdata, mid):
print("message ID ", mid)
client.disconnect()
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.publish("test", payload="hello world", qos=1)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_publish = on_publish
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
运行上述代码,输出如下:
message ID 1
相应订阅了test
主题的订阅方输出如下:

4.2. 订阅消息
通常我们在建立和
import paho.mqtt.client as mqtt
'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):
print("granted_qos:", granted_qos)
'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):
print(message.topic, message.payload)
def on_connect(client, userdata, flags, rc):
if rc == 0 :
if flags["session present"] == 0:
print("subscribing")
client.subscribe("test", 1)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
运行上述代码得到如下输出结果:
subscribing
granted_qos: (1,)
当运行的发布消息中的代码之后,将输出:
test b'hello world'
当终止掉订阅消息中的代码运行之后,再次运行该代码,会发现什么都不输出。因为第一次运行的时候,
4.3. 取消订阅
在上述订阅消息中建立连接并订阅了相应主题的基础上,我们取消对之前订阅的主题
import paho.mqtt.client as mqtt
'''
当代理响应取消订阅请求时调用
'''
def on_unsubscribe(client, userdata, mid):
print("message id:", mid)
client.disconnect()
def on_connect(client, userdata, flags, rc):
if rc == 0 :
print("unsubscribing")
client.unsubscribe("test")
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_unsubscribe = on_unsubscribe
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
相应的输出如下:
unsubscribing
message id: 1
之后再运行订阅消息中的代码和发布消息中的代码,此时运行订阅消息的终端不再有输出。