MQTT 系列之 MQTT broker 的连接

Python015

MQTT 系列之 MQTT broker 的连接,第1张

client 在可以发布和订阅消息之前,必须先连接到 broker,下面我们来看一下连接到 broker 的流程。

连接的建立由 Client 端发起,Client 端首先向 broker 发送一个 CONNECT 数据包,CONNECT 数据包包含以下内容(这里我们略过 fixed header)。

在 CONNECT 数据包可变头中,含有以下信息。

协议名称(Protocol name) :值固定为 MQTT 字符。

协议版本 :对于 MQTT3.1.1 来说,值为 4.

用户名标识 :消息体中是否有用户名字段, 1bit , 0 或 1。

密码标识 :消息体中是否有密码字段,1bit,0 或 1。

遗愿消息 Retain 标志(will retain) :标识遗愿消息是否是 retain 消息,1 bit,0 或 1。

遗愿消息 QoS 标志 (will QoS) :标志遗愿消息的 QoS 是否存在,1 bit,0 或 1。

会话清除标志(clean session) :标识 Client 是否建立一个持久化的会话,1bit,0或1,当 clean session 的标识为 0 时,代表 client 希望建立一个之久的会话连接,broker 间存储该 client 订阅的主题和未接受的消息,否则broker不会存储这些数据,同时在建立连接时清除这个 client 之前的持久化会话保存的数据。

心跳保持(keep alive) :设置一个单位为秒的时间间隔,client 和 broker 之间在这个时间内至少需要进行一次消息交互,否则 client 和 broker 会认为它们之间的连接已断开。

CONNECT 数据包的消息体中包含了以下数据。

客户端标识符(client identifier) :Client Identifier 是用来标识 Client 身份的字段,在 MQTT3.1.1 的版本中,这个字段的长度是 1 到 23个字节,而且只能包含数字和26个字母(包括大小写),broker 通过这个字段来区分不同的 client。所有在连接的时候,client应该保证它的 identifier 是唯一的,通常我们可以使用比如 UUID,唯一的设备硬件标识,或者 Android 设备的 DEVICE_ID 等作为 Client identifier 的取值来源。MQTT 协议中要求 Client 连接时必须带上 Client Identifier,但是也允许 broker 在实现时 Client Identifier 为空,这时 Broker 会为 Client 分配一个内部唯一的 Identifier。如果你需要使用持久化会话,那就必须自己为 Client 设定一个唯一的 Identifier。

用户名(Username) :如果可变头中的用户标识设为 1,那么消息体中将包含用户名字段,Broker 可以使用用户名和密码来对接入的 Client 进行验证,只允许已经授权的 Client 接入。注意不同的 Client 需要使用不同的 Client Identifier,但它们可以使用同样的用户名和密码进行连接。

密码(password) :如果可变头中的密码标识为1,那么消息体中将包含密码字段。

遗愿主题(will topic) :如果可变头中遗愿标识设为1,那么消息体中将包遗愿主题,当 Client 非正常地中断连接的时候,Broker将向指定的遗愿主题中发布遗愿消息。

遗愿消息(will message) :如果可变头中的遗愿标志为1,那么消息体中将包含遗愿消息,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布由该字段指定的内容。

当 broker 收到 client 的 CONNECT 数据包之后,将检查并校验 CONNECT 数据包的内容,之后回复 Client 一个 CONNACK 数据包。

CONNACK 数据包包含以下内容(这里略过 Fixed header)。

CONNACK 数据包的可变头中,含有如下信息:

会话存在标识(Session Present Flag) :用于标识在 Broker 上,是否存在该 client(用 client identifier 区分)的持久性会话,1bit,0或1。当 Client 在连接时设置 clean session = 1 ,则 CONNACK 中的 Session Present Flag 始终为 0;当 client 在连接时设置 clean session = 0 时,那么分为两种情况:如果 broker 上面保存了这个 Client 之前留下的持久性会话,那么 CONNACK 中的 session present flag 值为 1,如果 broker 没有保存该 client 的任何会话数据,那么 CONNACK 中 session present flag 值为 0.

连接返回码: 用于标识 client 是否连接建立成功,连接返回码如下:

在这里强调一下 code 4 和 5。Return Code 4 在 MQTT 协议中的含义是 Username 和 Password 的格式不正确,但是在大部分的 Broker 实现中,在使用错误的用户名密码时,得到的返回码也是 4。所以这里我们认为 4 就是代表错误的用户名或密码。Return Code 5 一般在 Broker 不使用用户名和密码而使用 IP 地址或者 Client Identifier 进行验证的时候使用,来标识 Client 没有通过验证。

CONNACK 没有 payload。

Client 主动关闭连接的流程很简单,只需要向 broker 发送一个 DISCONNECT 数据包就可以了。DISCONNECT 数据包没有可变头和消息体。在 Client 发送完 DISCONNECT 后,无需等待 broker 的回复(broker 也不会有回复),直接关闭底层的 tcp 连接即可。

MQTT 协议规定 Broker 在没有收到 Client 的 DISCONNECT 数据包之前都应该保持和 Client 连接,只有 Broker 在 Keep Alive 的时间间隔里,没有收到 Client 的任何 MQTT 数据包的时候会主动关闭连接。一些 Broker 的实现在 MQTT 协议上做了一些拓展,支持 Client 的连接管理,可以主动地断开和某个 Client 的连接。

Broker 主动关闭连接之前不会向 Client 发送任何 MQTT 数据包,直接关闭底层的 TCP 连接就完事了。

MQTT协议是广泛应用的物联网协议,使用测试MQTT协议需要MQTT的代理。有两种方法使用MQTT服务,一是租用现成的MQTT服务器,如阿里云,百度云,华为云等公用的云平台提供的MQTT服务,使用公用的MQTT服务器的好处是省事,但如果仅仅用于测试学习还需要注册帐号,灵活性差些,有的平台还需要付费。另一方法是自己使用开源的MQTT组件来搭建。

MQTT服务器非常多,如apache的ActiveMQ,emtqqd,HiveMQ,Emitter,Mosquitto,Moquette等等。

这里介绍的是用轻量级的mosquitto开源项目来搭建一个属于自己的MQTT服务器。

第一步:需要安装一台linux主机,这不多介绍,可以使用真机安装也可以使用虚拟机安装。如果仅仅是自己测试使用都可以。

第二步:下载mosquitto需要的依赖

sudo apt-get install libssl-devsudo apt-get install uuid-devsudo apt-get install cmake

第三步:下载mosquitto并解压,现在mosquitto官网最新的版本是1.5.1

tar xzvf mosquitto-1.5.1.tar.gz

第四步:编译

cd mosquitto-1.5.1/

make

make install

第五步:启动mosquitto

./mosquitto -v

1535473957: mosquitto version 1.5.1 starting

1535473957: Using default config.

1535473957: Opening ipv4 listen socket on port 1883.

1535473957: Opening ipv6 listen socket on port 1883.

这时候mosquitto就会以默认的参数启动。如果需要带配置文件可以修改配置文件mosquitto.conf,

启动时候加上参数 -c,

./mosquitto -c mosquitto.conf

可以看到,mosquitto监听的端口为1883.

这时候我们的MQTT服务器就搭建好了。可找一个mqtt客户端来测试一下。

先发布一个主题“home/garden/fountain/2”

内容是“hello world”

这时候在mosquitto会打印出下面的log

535474247: New connection from 192.168.1.105 on port 1883.

1535474247: New client connected from 192.168.1.105 as MQTT_FX_Client (c1, k60).

1535474247: No will message specified.

1535474247: Sending CONNACK to MQTT_FX_Client (0, 0)

1535474307: Received PINGREQ from MQTT_FX_Client

1535474307: Sending PINGRESP to MQTT_FX_Client

1535474339: Received PUBLISH from MQTT_FX_Client (d0, q0, r0, m0, 'home/garden/fountain/2', ... (12 bytes))

1535474367: Received PINGREQ from MQTT_FX_Client

1535474367: Sending PINGRESP to MQTT_FX_Client

订阅主题“home/garden/fountain/2”

可以看到收到了自己发布的消息。

用wireshark抓包

可以看到抓到了一个MQTT的publish的报文。

最近在着手研究使用网页控制硬件的交互过程,发现mqtt协议有很多种语言的封装,整个思路也很清晰,就是一个客户端进行发布和订阅+服务器中间代理的过程,于是开始学习使用MQTT框架搭建交互平台。

客户端使用了MQTT.js,服务器使用mosca(基于nodejs)。

QoS在MQTT中有(摘自 MQ 遥测传输 (MQTT) V3.1 协议规范 ):

MQTT.js只是支持了MQTT协议,并没有支持QoS,也就是说,只支持最低级别的“至多一次”(QoS0)。

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。

这里只了解消息体,固定头和可变头并不需要我们手动写。

payload消息体包含CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息:

(1)Connect    与服务器建立连接。

(2)Disconnect    与服务器断开TCP/IP会话。

(3)Subscribe    订阅。

(4)UnSubscribe    取消订阅。

(5)Publish    发送消息请求,发送完成后返回应用程序线程。

二、实现(具体的API稍后呈现)