QoS
MQTT 的QoS 介绍
MQTT 中的QoS 等级
- QoS0,At most once,至多一次;
- QoS1,At least once,至少一次;
- QoS2,Exactly once,确保只有一次。
QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说Sender 尽力向Receiver 发送消息,如果发送失败,也就算了;QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说Sender 向Receiver 发送消息,如果发送失败,会继续重试,直到Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;QoS2 代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说Sender 尽力向Receiver 发送消息,如果发送失败,会继续重试,直到Receiver 收到消息为止,同时保证Receiver 不会因为消息重传而收到重复的消息。
QoS0

QoS1

Sender 向Receiver 发送一个带有数据的PUBLISH 包,并在本地保存这个PUBLISH 包;Receiver 收到PUBLISH 包以后,向Sender 发送一个PUBACK 数据包,PUBACK 数据包没有消息体(Payload) ,在可变头中有一个包标识(Packet Identifier) ,和它收到的PUBLISH 包中的Packet Identifier 一致。Sender 收到PUBACK 之后,根据PUBACK 包中的Packet Identifier 找到本地保存的PUBLISH 包,然后丢弃掉,一次消息的发送完成。
但是消息传递流程中可能会出现问题:
- 如果
Sender 在一段时间内没有收到PUBLISH 包对应的PUBACK ,它将该PUBLISH 包的DUP 标识设为1 (代表是重新发送的PUBLISH 包) ,然后重新发送该PUBLISH 包。 Receiver 可能会重复收到消息,需自行去重。
QoS2
相比

Sender 发送QoS 为2 的PUBLISH 数据包,数据包Packet Identifier 为P ,并在本地保存该PUBLISH 包;Receiver 收到PUBLISH 数据包后,在本地保存PUBLISH 包的Packet Identifier P ,并回复Sender 一个PUBREC 数据包,PUBREC 数据包可变头中的Packet Identifier 为P ,没有消息体(Payload) ;- 当
Sender 收到PUBREC ,它就可以安全的丢弃掉初始Packet Identifier 为P 的PUBLISH 数据包。同时保存该PUBREC 数据包,并回复Receiver 一个PUBREL 数据包,PUBREL 数据包可变头中的Packet Identifier 为P ,没有消息体; - 当
Receiver 收到PUBREL 数据包,它可以丢掉保存的PUBLISH 包的Packet Identifier P ,并回复Sender 一个可变头中Packet Identifier 为P ,没有消息体(Payload)的PUBCOMP 数据包; - 当
Sender 收到PUBCOMP 包,那么认为传输已完成,则丢掉对应的PUBREC 数据包;
上面是一次完整无误的传输过程,然而传输过程中可能会出现以下情况:
- 情况
1 :Sender 发送PUBLISH 数据包给Receiver 的时候,发送失败; - 情况
2 :Sender 已经成功发送PUBLISH 数据包给Receiver 了,但是Receiver 发送PUBREC 数据包失败; - 情况
3 :Sender 已经成功收到了PUBREC 数据包,但是PUBREL 数据包发送失败; - 情况
4 :Receiver 已经收到了PUBREL 数据包,但是发送PUBCOMP 数据包时发送失败
针对上述的问题,较为详细的处理方法如下:
- 不管是情况
1 还是情况2 ,因为Sender 在一定时间内没有收到PUBREC ,那么它会把PUBLISH 包的DUP 标识设为1 ,重新发送该PUBLISH 数据包; - 不管是情况
3 还是情况4 ,因为Sender 在一定时间内没有收到PUBCOMP 包,那么它会重新发送PUBREL 数据包; - 针对情况
2 ,Receiver 可能会收到多个重复的PUBLISH 包,更加完善的处理如下:Receiver 在收到PUBLISH 数据包之后,马上回复一个PUBREC 数据包。并会在本地保存PUBLISH 包的Packet Identifier P ,不管之后因为重传多少次这个Packet Identifier 为P 的数据包,Receiver 都认为是重复的,丢弃。同时Receiver 接收到QoS 为2 的PUBLISH 数据包后,** 并不马上投递给上层,** 而是在本地做持久化,将消息保存起来(这里需要是持久化而不是保存在内存) 。 - 针对情况
4 ,更加完善的处理如下:Receiver 收到PUBREL 数据包后,正式将消息递交给上层应用层,投递之后销毁Packet Identifier P ,并发送PUBCOMP 数据包,销毁之前的持久化消息。之后不管接收到多少个PUBREL 数据包,因为没有Packet Identifier P ,直接回复PUBCOMP 数据包即可。
QoS 降级
在
Actual Subscribe QoS = MIN(Publish QoS, Subscribe QoS)
如下面代码所示:该
import paho.mqtt.client as mqtt
'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):
print("granted_qos:", granted_qos)
'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):
print("message qos", message.qos)
print("message topic", message.topic)
print("message payload", message.payload)
def on_connect(client, userdata, flags, rc):
if rc == 0 :
print("subscribing")
client.subscribe("test", 1)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
运行上述代码输出的结果为:
subscribing
granted_qos: (1,)
之后运行下面的
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.publish("test", payload="hello world", qos=0)
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
结果上面运行
message qos 0
message topic test
message payload b'hello world'
上述的结果表示,订阅者收到的消息的
QoS 和会话
如果
QoS 等级使用建议
在以下情况下你可以选择
Client 和Broker 之间的网络连接非常稳定,例如一个通过有线网络连接到Broker 的测试用Client ;- 可以接受丢失部分消息,比如你有一个传感器以非常短的间隔发布状态数据,所以丢一些也可以接受;
- 你不需要离线消息。
在以下情况下你应该选择
- 你需要接收所有的消息,而且你的应用可以接受并处理重复的消息;
- 你无法接受
QoS2 带来的额外开销,QoS1 发送消息的速度比QoS2 快很多。
在以下情况下你应该选择
- 你的应用必须接收到所有的消息,而且你的应用在重复的消息下无法正常工作,同时你也能接受
QoS2 带来的额外开销。