消息的发布和订阅

消息的发布和订阅

MQTT 的发布

MQTT 发布中最重要的是 PUBLISH 数据包,PUBLISH 数据包是用于 sender 和 receiver 之间传输消息数据的。当 Publisher 要向某个 Topic 发布一条消息的时候,Publisher 会向 Broker 发送一个 PUBLISH 数据包;当 Broker 要将一条消息转发给订阅了某条主题的 Subscriber 时,Broker 也会向该 Subscriber 发送一个 PUBLISH 数据包。因为 PUBLISH 传输过程中涉及到了 QoS,Recevier 收到 sender 的 PUBLISH 数据包之后会根据 QoS 的不同,还有后续不同的应答流程(只有当 QoS 为 0 时,Receiver 不做任何应答),所以关于这个具体的流程,在 QoS 那一章节进行讲述。下面对 PUBLISH 数据包进行讲解:

PUBLISH 数据包

固定头

PUBLISH 的固定头包含了一下内容:

  • 消息重复标识(DUP flag):1bit,0 或者 1,当 DUP flag = 1 的时候,代表该消息是一条重发消息,因 Receiver 没有确认收到之前的消息而重新发送的。这个标识只在 QoS 大于 0 的消息中使用。
  • QoS:2bit,0、1 或者 2,代表 PUBLISH 消息的 QoS level。
  • Retain 标识(Retain flag):1bit,0 或者 1。在从 Client 发送到 Broker 的 PUBLISH 消息中被设为 1 的时候,Broker 应该保存该条消息,当之后有任何新的 Subscriber 订阅 PUBLISH 消息中指定的主题时,都会先收到该条消息,这种消息也叫 Retained 消息。在从 Broker 发送到 Client 的 PUBLISH 消息中被设为 1 的时候,代表该条消息是一条 Retained 消息。

可变头

  • 数据包标识( Packet Identifier):2 字节,用来标识一个唯一数据包。数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互(比如发送、应答为一次交互)中保持唯一就好,只在 QoS 大于 1 的消息中使用,因为 QoS 大于 1 的消息有应答流程。
  • 主题名称(Topic Name):主题名称是一个 UTF-8 编码的字符串,用来命名该消息发布到哪一个主题,Topic Name 可以是长度大于等于 1 任何一个字符串(可包含空格)。但是在实际项目中,我们最好还是遵循以下一些最优方法。

消息体

PUBLISH 数据包的消息体中包含的是该消息要发送的具体数据,数据可以是任何格式的:二进制数据、文本、JSON 等都可以。

MQTT 的订阅

订阅主题的流程如下图所示:

订阅流程

  • Client 向 Broker 发送一个 SUBSCRIBE 数据包,该数据包中含有 Client 想要订阅的主题和其他一些参数;
  • Broker 收到 SUBSCRIBE 数据包后,向 Client 发送一个 SUBACK 数据包作为应答。

SUBSCRIBE 数据包

可变头

数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

消息体

  • 订阅列表(List of Subscriptions):SUBSCRIBE 的消息体中包含 Client 想要订阅的主题列表,列表中的每一项由订阅主题名和对应的 QoS 组成。 主题名说明 主题名中可以包含通配符,单层通配符“+”和多层通配符“#”。使用包含通配符的主题名可以订阅满足匹配条件的所有主题。为了和 PUBLISH 中的主题区分,我们叫 SUBSCRIBE 中的主题名为主题过滤器(Topic Filter)。

    • 单层通配符“+”:“+”可以用来指代任意一个层级。 举例: 如“sensor/+/tem”,可以匹配:

      • sensor/data/tem
      • sensor/cmd/tem

不可以匹配:

    • sensor/data/01/tem

    • 多层通配符“#”:“#”和“+”的区别在于,“#”可以用来指代任意多个层。**但是"#“必须是 Topic Filter 的最后一个字符,同时必须跟在“/“后面,除非 Topic Filter 只包含一个”#“这一个字符。**如“#”是一个合法的 Topic Filter,而“sensor#”不是一个合法的 Topic Filter。 举例: 如“sensor/data/#”,可匹配:

      • sensor/data
      • sensor/data/tem
      • sensor/data/tem/01
      • sensor/data/tem/01/02

不可以匹配:

      • sensor/cmd/tem

SUBSCRIBE 数据包中 QoS 代表针对某一个或着某一组的主题,Client 希望 Broker 在转发来自这些主题的消息给它时,消息使用的 QoS 级别。

SUBACK 数据包

为确认每一次的订阅,Broker 收到 SUBSCRIBE 之后会回复一个 SUBACK 数据包作为应答。SUBACK 数据包包含以下内容:

可变头

数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

消息体

返回码(return codes):SUBBACK 数据包包含了一组返回码,返回码的数量和顺序和 SUBSCRIBE 数据包的订阅列表对应,用于标识订阅类别中的每一个订阅项的订阅结果。

返回码含义 0 订阅成功, 最大可用 QoS 为 01 订阅成功,最大可用 QoS 为 12 订阅成功, 最大可用 QoS 为 2128 订阅失败

返回码 0~2 表示订阅成功,并且 Broker 授予 Subscriber 不同等级的 QoS,这个等级可能会和 SubScriber 在 SUBSCRIBE 数据包中要求的不一样。128 表示订阅失败,可能是没有权限订阅这个主题,或者订阅主题的格式不对。

MQTT 的取消订阅

Subscriber 也可以取消对某些主题的订阅,取消订阅的流程如下图所示:

MQTT 的取消订阅

  • Subscriber 向 Broker 发送一个 UNSUBSCRIBE 数据包,该数据包包含想要取消订阅的主题;
  • Broker 收到 UNSUBSCRIBE 数据包之后,向 subscriber 发送一个 UNSUBACK 数据包作为应答。

UNSUBSCRIBE 数据包

可变头

数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

消息体

主题列表(List of Topics):UNSUBSCRIBE 的消息体中包含 Client 想要取消订阅的主题过滤器列表,这些主题过滤器和 SUBSCRIBE 数据包中一样,可以包含通配符。UNSUBSCRIBE 消息体里面不再包含主题过滤器对应的 QoS 了。

UNSUBACK 数据包

Broker 收到 UNSUBSCRIBE 数据包之后会回复一个 UNSUBACK 数据包作为应答。UNSUBACK 数据包内容如下:

可变头

数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

消息体

UNSUBACK 数据包没有消息体。

4. 代码实践

4.1. 发布消息

向一个主题发布一条 QoS 为 1 的数据包,发送成功之后断开连接:

import paho.mqtt.client as mqtt

def on_publish(client, userdata, mid):
    print("message ID ", mid)
    client.disconnect()

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.publish("test", payload="hello world", qos=1)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_pub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_publish = on_publish
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行上述代码,输出如下:

message ID  1

相应订阅了test主题的订阅方输出如下:

img

4.2. 订阅消息

通常我们在建立和 Broker 的连接之后就可以开始订阅了,如果你建立的是持久会话的连接,那么有可能 Broker 已经保存你在之前的连接时订阅的主题,你就没有必要再发起 SUBSCRIBE 请求了,这个小优化在网络带宽或者设备处理能力较差的情况尤为重要。相应的代码如下:

import paho.mqtt.client as mqtt

'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):
    print("granted_qos:", granted_qos)

'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):
    print(message.topic, message.payload)

def on_connect(client, userdata, flags, rc):
    if rc == 0 :
        if flags["session present"] == 0:
            print("subscribing")
            client.subscribe("test", 1)
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行上述代码得到如下输出结果:

subscribing
granted_qos: (1,)

当运行的发布消息中的代码之后,将输出:

test b'hello world'

当终止掉订阅消息中的代码运行之后,再次运行该代码,会发现什么都不输出。因为第一次运行的时候,Broker 上面没有保存这个 Client 的会话,所以需要进行订阅,当重新运行之后,因为 Broker 上面已经保存了这个 Client 的会话,所以就不需要再订阅了,你就不会看到订阅相关的输出了。

4.3. 取消订阅

在上述订阅消息中建立连接并订阅了相应主题的基础上,我们取消对之前订阅的主题

import paho.mqtt.client as mqtt

'''
当代理响应取消订阅请求时调用
'''
def on_unsubscribe(client, userdata, mid):
    print("message id:", mid)
    client.disconnect()

def on_connect(client, userdata, flags, rc):
    if rc == 0 :
        print("unsubscribing")
        client.unsubscribe("test")
    else:
        print("connection failed ", rc)

mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_unsubscribe = on_unsubscribe

mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

相应的输出如下:

unsubscribing
message id: 1

之后再运行订阅消息中的代码和发布消息中的代码,此时运行订阅消息的终端不再有输出。

上一页