03. 连接与断开
MQTT 的连接过程

Client 发送CONNECT 数据包给Broker Broker 在收到CONNECT 数据包之后,给Client 返回一个CONNACK 数据包
CONNECT 数据包
连接的建立由
可变头
在
- 协议名称(Protocol Name
) :值固定为字符 “MQTT”。 - 协议版本(Protocol Level
) :对MQTT 3.1.1 来说,该值为4 。 - 用户名标识(User Name Flag
) :消息体中是否有用户名字段,1bit,0 或者1 。 - 密码标识(Password Flag
) :消息体中是否有密码字段,1bit,0 或者1 。 - 遗愿消息
Retain 标识(Will Retain) :标识遗愿消息是否是Retain 消息,1bit,0 或者1 。 - 遗愿消息
QOS 标识(Will QOS) :标识遗愿消息的QOS ,2bit,0、1 或者2 。 - 遗愿标识(Will Flag
) :标识是否使用遗愿消息,1bit,0 或者1 。 - 会话清除标识(Clean Session
) :标识Client 是否建立一个持久化的会话,1bit,0 或者1 。当该标识设为0 时,代表Client 希望建立一个持久会话的连接,Broker 将存储该Client 订阅的主题和未接受的消息,否则Broker 不会存储这些数据,同时在建立连接时清楚这个Client 之前存在的持久化会话所保存的数据。 - 连接保活(Keep Alive
) :设置一个以秒为单位的时间间隔,Client 和Broker 之间在这个时间间隔之内需要至少一次消息交互,否则Client 和Broker 会认为它们之间的连接已经断开。
消息体
-
客户端标识符(Client Identifier
) :Client Identifier 是用来标识Client 身份的字段,在MQTT 3.1.1 的版本中,这个字段的长度是1 到23 个字节,而且只能包含数字和26 个字母(包括大小写) ,Broker 通过这个字段来区分不同的Client 。所以在连接的时候,应该保证Client Identifier 是唯一的,所以我们可以使用UUID ,唯一的设备硬件标识,或者在Android 设备中使用的话,可以使用DEVICE_ID 等作为Client Identifier 的取值来源。MQTT 协议中要求Client 连接时必须带上Client Identifier ,但是也允许Broker 在Client Identifier 为空时,会为Client 分配一个内部唯一的Identifier 。如果需要持久化会话的话,那必须为Client 设定一个唯一的Identifier 。 -
用户名(Username
) :如果可变头中的用户名标识设为1 ,那么消息体中将包含用户名字段,Broker 可以使用用户名和密码来对接入的Client 进行验证,只允许已授权的Client 接入。注意不同的Client 需要使用不同的Client Identifier ,但它们可以使用同样的用户名和密码进行连接。 -
密码(Password
) :如果可变头中的密码标识设为1 ,那么消息体中将包含密码字段。 -
遗愿主题(Will Topic
) :如果可变头中的遗愿标识设为1 ,那么消息体中将包含遗愿主题,当Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布遗愿消息。 -
遗愿消息(Will Message
) :如果可变头中的遗愿标识设为1 ,那么消息体中将包含遗愿消息,当Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布由该字段指定的内容。
CONNACK 数据包
当
可变头
-
会话存在标识(Session Present Flag
) :用于标识在Broker 上是否已存在该Client (用Client Identifier 区分)的持久性会话,1bit,0 或者1 。当Client 在连接时设置Clean Session=1
(会话清除标识见CONNECT 数据包的可变头) ,则CONNACK 中的Session Present Flag
始终为0 ;当Client 在连接时设置Clean Session=0
,那么存在下面两种情况 -
- 如果
Broker 上面保留了这个Client 之前留下的持久性会话,那么CONNACK 中的Session Present Flag
值为1 ; - 如果
Broker 上面没有保存这个Client 之前留下的会话数据,那么CONNACK 中的Session Present Flag
值为0 ;
- 如果
- 连接返回码(Connect Return code
) :用于标识Client 是Broker 的连接是否建立成功,连接返回码有以下一些值:Return code 连接状态0 连接已经建立1 连接被拒绝,不允许的协议版本2 连接被拒绝,Client Identifier 被拒绝3 连接被拒绝,服务器不可用4 连接被拒绝,错误的用户名或密码5 连接被拒绝,未授权Return code=2
代表的是Client Identifier 格式不规范,比如长度超过23 个字符,包含了不允许的字符等(部分Broker 的实现在协议标准上做了扩展,比如允许超过23 个字符的Client Identifer 等)Return code=4
在MQTT 协议中的含义是Username 和Password 的格式不正确,但是在大部分的Broker 实现中,使用错误的用户名和密码时返回的也是4, 所以可以认为4 表示就是错误的用户名或者密码;Return code=5
一般表示Broker 不使用用户名和密码验证而是采用IP 地址或者Client Identifier 进行认证的时候使用,来标识Client 没有通过验证。
消息头
MQTT 断开过程
- 一种是
Client 主动关闭连接 - 一种是
Broker 主动关闭连接
Client 主动关闭连接
为什么
Broker 主动关闭连接
3. 代码实践
下面使用
3.1. 建立持久会话的连接
import paho.mqtt.client as mqtt
'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
print("return code:", rc)
print("session present:", flags['session present'])
# 通过client_id指定Client Identifier
# 设置clean_session为False表示要建立一个持久性会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=False)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
运行
return code: 0
session present: 0
demo_mqtt
的SessionPresent
为
return code: 0
session present: 1
3.2. 建立非持久会话的连接
相比建立持久会话的连接的代码,只需要将clean_session
指定为了
import paho.mqtt.client as mqtt
'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
print("return code:", rc)
print("session present:", flags['session present'])
# 通过client_id指定Client Identifier
# 设置clean_session为True表示要建立一个非持久性的会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=True)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
运行该代码,输出结果为:
return code: 0
session present: 0
并且无论运行多少次,SessionPresent
都是为
paho mqtt 的Python 版本,默认clean_session
为True
3.3. 使用相同的Client Identifier 进行连接
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("return code:", rc)
print("session present:", flags['session present'])
mqtt_client = mqtt.Client(client_id="demo_mqtt")
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
分别在两个终端中运行上述同样的代码,那么两个终端中会不停打印如下内容:
return code: 0
session present: 0
return code: 0
session present: 0
......
因为当两个loop_forever()
函数,这个函数会一直阻塞,直到disconnect()
,并且这个函数会在断开后自动重连。所以当连接被