03.连接与断开

MQTT的连接过程

Client建立到Broker的连接过程如下图所示:

Client 连接到 Broker

  • Client发送CONNECT数据包给Broker
  • Broker在收到CONNECT数据包之后,给Client返回一个CONNACK数据包

CONNECT数据包

连接的建立由Client发起,Client端首先向Broker发送一个CONNECT数据包,CONNECT数据包包含以下内容。

可变头

CONNECT数据包可变头中,包含以下信息:

  • 协议名称(Protocol Name:值固定为字符 “MQTT”。
  • 协议版本(Protocol Level:对MQTT 3.1.1来说,该值为4
  • 用户名标识(User Name Flag:消息体中是否有用户名字段,1bit,0或者1
  • 密码标识(Password Flag:消息体中是否有密码字段,1bit,0或者1
  • 遗愿消息Retain标识(Will Retain:标识遗愿消息是否是Retain消息,1bit,0或者1
  • 遗愿消息QOS标识(Will QOS:标识遗愿消息的QOS,2bit,0、1或者2
  • 遗愿标识(Will Flag:标识是否使用遗愿消息,1bit,0或者1
  • 会话清除标识(Clean Session:标识Client是否建立一个持久化的会话,1bit,0或者1。当该标识设为0时,代表Client希望建立一个持久会话的连接,Broker将存储该Client订阅的主题和未接受的消息,否则Broker不会存储这些数据,同时在建立连接时清楚这个Client之前存在的持久化会话所保存的数据。
  • 连接保活(Keep Alive:设置一个以秒为单位的时间间隔,ClientBroker之间在这个时间间隔之内需要至少一次消息交互,否则ClientBroker会认为它们之间的连接已经断开。

消息体

CONNECT数据包的消息体中包含以下数据:

  • 客户端标识符(Client IdentifierClient Identifier是用来标识Client身份的字段,在MQTT 3.1.1的版本中,这个字段的长度是123个字节,而且只能包含数字和26个字母(包括大小写Broker通过这个字段来区分不同的Client。所以在连接的时候,应该保证Client Identifier是唯一的,所以我们可以使用UUID,唯一的设备硬件标识,或者在Android设备中使用的话,可以使用DEVICE_ID等作为Client Identifier的取值来源。MQTT协议中要求Client连接时必须带上Client Identifier,但是也允许BrokerClient Identifier为空时,会为Client分配一个内部唯一的Identifier。如果需要持久化会话的话,那必须为Client设定一个唯一的Identifier

  • 用户名(Username:如果可变头中的用户名标识设为1,那么消息体中将包含用户名字段,Broker可以使用用户名和密码来对接入的Client进行验证,只允许已授权的Client接入。注意不同的Client需要使用不同的Client Identifier,但它们可以使用同样的用户名和密码进行连接。

  • 密码(Password:如果可变头中的密码标识设为1,那么消息体中将包含密码字段。

  • 遗愿主题(Will Topic:如果可变头中的遗愿标识设为1,那么消息体中将包含遗愿主题,当Client非正常地中断连接的时候,Broker将向指定的遗愿主题中发布遗愿消息。

  • 遗愿消息(Will Message:如果可变头中的遗愿标识设为1,那么消息体中将包含遗愿消息,当Client非正常地中断连接的时候,Broker将向指定的遗愿主题中发布由该字段指定的内容。

CONNACK数据包

Broker收到ClientCONNECT数据包之后,将检查并检验CONNECT数据包的内容,之后回复Client一个CONNACK数据包。CONNACK数据包包含以下内容。

可变头

CONNACK数据包的可变头中,包含以下信息:

  • 会话存在标识(Session Present Flag:用于标识在Broker上是否已存在该Client(用Client Identifier区分)的持久性会话,1bit,0或者1。当Client在连接时设置Clean Session=1(会话清除标识见CONNECT数据包的可变头,则CONNACK中的Session Present Flag始终为0;当Client在连接时设置 Clean Session=0,那么存在下面两种情况

    • 如果Broker上面保留了这个Client之前留下的持久性会话,那么CONNACK中的Session Present Flag值为1
    • 如果Broker上面没有保存这个Client之前留下的会话数据,那么CONNACK中的Session Present Flag值为0

Session Present Flag这个特性是在MQTT3.1.1版本中新加入的,之前的版本中没有这个标识

  • 连接返回码(Connect Return code:用于标识ClientBroker的连接是否建立成功,连接返回码有以下一些值: Return code连接状态0连接已经建立1连接被拒绝,不允许的协议版本2连接被拒绝,Client Identifier被拒绝3连接被拒绝,服务器不可用4连接被拒绝,错误的用户名或密码5连接被拒绝,未授权Return code=2 代表的是Client Identifier格式不规范,比如长度超过23个字符,包含了不允许的字符等(部分Broker的实现在协议标准上做了扩展,比如允许超过23个字符的Client Identifer等) Return code=4MQTT协议中的含义是UsernamePassword的格式不正确,但是在大部分的Broker实现中,使用错误的用户名和密码时返回的也是4,所以可以认为4表示就是错误的用户名或者密码; Return code=5一般表示Broker不使用用户名和密码验证而是采用IP地址或者Client Identifier进行认证的时候使用,来标识Client没有通过验证。

消息头

CONNACK没有消息体。综上所述当ClientBroker发送了CONNECT数据包并获得了Return code0CONNACK数据包后,则代表连接建立成功了。之后则可以进行消息的发布和订阅了。

MQTT断开过程

MQTT的断开过程分为以下两种:

  • 一种是Client主动关闭连接
  • 一种是Broker主动关闭连接

Client主动关闭连接

Client主动关闭连接的流程非常简单,只需要ClientBroker发送一个DISCONNECT数据包就可以了。DISCONNECT数据包没有可变头(Variable header)和消息体(Payload。在Client发送完DISCONNECT之后,就可以关闭底层的TCP连接了,不需要等待Broker的回复(Broker也不会对DISCONNECT数据包回复

为什么Client关闭TCP连接之前,要发送一个和Broker没有交互的数据包,而不是关闭底层的TCP连接的?因为这涉及到MQTT协议的一个特性,在MQTT协议中,Broker需要判断Client是否是正常的断开连接。当Broker收到ClientDISCONNECT数据包的时候,Broker则认为Client是正常断开连接的,那么会丢弃当前连接指定的遗愿消息。如果Broker检测到Client连接丢失,但是又没有收到DISCONNECT数据包,则认为Client是非正常断开的,就会向在连接的时候指定的遗愿主题发布遗愿消息。

Broker主动关闭连接

MQTT协议规定Broker在没有收到ClientDISCONNECT数据包之前都应该和Client保持连接。只有当BrokerKeep Alive的时间间隔内,没有收到Client的任何MQTT数据包的时候会主动关闭连接。一些Broker的实现在MQTT协议上做了一些拓展,支持Client的连接管理,可以主动和某个Client断开连接。

Broker主动关闭连接之前不会向Client发送任何MQTT数据包,而是直接关闭底层的TCP连接。

3.代码实践

下面使用Pythonpaho mqtt库来实现MQTT的连接,Broker的话使用自己搭建的mosquitto

3.1.建立持久会话的连接

import paho.mqtt.client as mqtt

'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
    print("return code:", rc)
    print("session present:", flags['session present'])


# 通过client_id指定Client Identifier
# 设置clean_session为False表示要建立一个持久性会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=False)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行Python代码之后,输出结果如下所示:

return code: 0
session present: 0

return code0表示连接已成功建立,因为demo_mqttClient第一次建立,所以SessionPresent0。再次运行,输出结果变成如下:

return code: 0
session present: 1

3.2.建立非持久会话的连接

相比建立持久会话的连接的代码,只需要将clean_session指定为了True即可建立一个非持久会话的连接了。

import paho.mqtt.client as mqtt

'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
    print("return code:", rc)
    print("session present:", flags['session present'])


# 通过client_id指定Client Identifier
# 设置clean_session为True表示要建立一个非持久性的会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=True)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行该代码,输出结果为:

return code: 0
session present: 0

并且无论运行多少次,SessionPresent都是为0

paho mqttPython版本,默认clean_sessionTrue

3.3.使用相同的Client Identifier进行连接

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("return code:", rc)
    print("session present:", flags['session present'])

mqtt_client = mqtt.Client(client_id="demo_mqtt")
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

分别在两个终端中运行上述同样的代码,那么两个终端中会不停打印如下内容:

return code: 0
session present: 0
return code: 0
session present: 0
......

因为当两个Client中使用同样的Client Identifie进行连接时,那么第二个Client连接成功后,Broker会关闭和第一个已经连接上的Client连接。然而因为我们使用了loop_forever()函数,这个函数会一直阻塞,直到Client调用disconnect(),并且这个函数会在断开后自动重连。所以当连接被Broker关闭时,它又会尝试重新连接,结果就是这两个Client交替地把对方顶下线,因此在使用中我们需要保证每一个设备使用的Client Identifier是唯一的。

上一页
下一页