MQTT 基本认知

Python013

MQTT 基本认知,第1张

物联网 (internet of thing) ,表示的是可以把一些带某些传感器的设备(终端),接入到互联网的行为。

通过互联网连接这些设备,这些设备就能够互相协作。

MQTT 就是这些设备之间数据通信的一个基于 TCP/IP 的协议。

每个终端都和实现了 MQTT 协议的代理/服务器相连。

通过 published MQTT 代理服务器的某个 主题 发送数据。

通过 subscription 从 MQTT 代理服务器获取自己订阅的 主题 数据。

MQTT 协议是一种轻量级的、灵活的网络协议。并且非常适合 IOT 的场景。

大多数开发人员已经熟悉了 HTTP WEB 协议。那么为什么不让 IOT 设置链接到 WEB 服务?

设备可以采用 HTTP 请求的形式发送数据,并采用 HTTP 响应的形式从服务器获取数据,接受更新。

因为对于 IOT 的设备来说,这种 主动请求-->被动等待应答的 数据传输模型存在严重的局限性:

那么,MQTT 为什么如此轻便且灵活?MQTT 协议的一个关键的特性是 发布/订阅模型 。它将数据的发布者和接受者分离。

一个设备终端既可以是数据的发布者 (published) 也可以是数据的订阅者 (subscription)

一个设备如果要发布数据,只需要往代理服务器中 相应的主题发布数据内容即可。

一个设备如果需要接受到数据,只需要在代理服务器中, 提前订阅自己需要关注的主题即可。

MQTT 最基本的体验,就是使用 mosquitto 。

Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支持发布/订阅的的消息推送模式,使设备对设备之间的短消息通信简单易用。

它可以理解成一个 MQTT 的代理服务器。

基本步骤如下:

安装成功截图

使用 brew services start mosquitto 启动 MQTT 服务

运行截图

然后再打开另外两个终端窗口,模拟两个IOT设备。A 订阅 MQTT 服务。B 向 MQTT 的服务发送数据。

A订阅当前MQTT的某个服务。

B向 MQTT 服务器发布(published) 数据。

然后,我们就可以在A控制台里看到由 B 通过 MQTT 服务发送的数据了。

基本流程图

控制台 A 向 MQTT 服务器订阅 dw/demo 服务,并被动的等待 MQTT 服务器返回数据。

控制台 B 主动的向 MQTT 服务器的 dw/demo 服务发送 published 数据,之后。服务器会主动向事先订阅了 dw/demo 的终端分发此消息。

MQTT 是一种链接协议,它指定了如何组织数据字节并通过 TCP/IP 网络传输它们。但实际上,开发人员并不需要链接这个链接协议的具体细节。我们只需要知道,每条消息都有一个命令和数据有效负载。该命令定义消息类型(比如 CONNECT 消息或者 SUB SCRIBE 消息)。所有的 MQTT 库和工具都提供了直接处理这些消息的基本方法,并且能自动填充一些必要的字段(在数据包的对应字节填充),比如消息和客户端 ID。

首先客户端发送一条 CONNECT消息 来链接代理。CONNECT 消息要求建立从客户端到代理服务器的链接。

CONNECT 命令的基本参数

当客户端向代理服务器发送一条 CONNECT 命令之后,服务器会调用 CONNACK 命令,告知服务链接的状态。

CONNACK 命令的基本参数

当客户端和服务器建立连接之后,客户端就可以向服务器订阅某些主题的。(发送一条或多条 SUBSCRIBE消息 )。

表明当服务器接受到其他终端推送的此主题数据时,服务器会默认发送给它。

SUBSCRIBE 参数列表

当客户端成功的向服务器订阅某个主题之后,服务器会返回一条 SUBACK 的消息,其中包含一个或者多个 returnCode 参数。

SUBACK消息参数

returnCode : 值 0 - 2 ,表示成功订阅,并返回这个订阅消息的 QOS。值 128 : 订阅失败。

既然客户端可以向服务器订阅某个主题,当然也可以取消订阅。

SUBSCRIBE 订阅命令相反的命令是 UNSUBSCRIBE 取消订阅命令。

此命令非常简单。只有一个topic(主题)参数。

上面讲的是订阅,订阅是需要有消息从服务器发送过来的。但是服务器本身基本不产生数据,那数据从何而来呢?

通过另外一个客户端执行 PUBLISH 命令,往代理服务器发送数据。并最终通过代理服务器将数据传递给订阅了此服务的客户端。

PUBLISH 消息参数

对于 MQTT 的一张基本理解图

基本流程图:

最后总结

参考资料: 初识 MQTT

在go语言中使用viper之类的库很方便的处理yaml配置文件,但是在c语言中就比较麻烦,经过一番思索和借助强大的github,发现了一个libyaml c库,但是网上的例子都比较麻烦,而且比较繁琐,就想法作了一个相对比较容易配置的解析应用,可以简单地类似viper 的模式进行配置实现不同的配置文件读取。如你的配置文件很复杂请按格式修改KeyValue 全局变量,欢迎大家一起完善

库请自行下载 GitHub - yaml/libyaml: Canonical source repository for LibYAML

直接上代码

yaml示例文件

%YAML 1.1

---

mqtt:

subtopic: "Control/#"

pubtopic: "bbt"

qos: 1

serveraddress: "tcp://192.168.0.25:1883"

clientid: "kvm_test"

writelog: false

writetodisk: false

outputfile: "./receivedMessages.txt"

hearttime: 30

#ifndef __CONFIG_H__

#define __CONFIG_H__

#ifdef __cplusplus

extern "C" {

#endif

/************************/

/* Minimum YAML version */

/************************/

#define YAML_VERSION_MAJOR 1

#define YAML_VERSION_MINOR 1

#define STRUCT_TYPE_NAME 100

#define INT_TYPE_NAME 101

#define STRING_TYPE_NAME 102

#define BOOL_TYPE_NAME 103

#define FLOAT_TYPE_NAME 104

#define MAP_TYPE_NAME 105

#define LIST_TYPE_NAME 106

typedef struct{

char *key

void *value

int valuetype

char *parent

}KeyValue,*pKeyValue

#ifdef __cplusplus

}

#endif

#endif

#include

#include

#include

#include

#include

#include

#include

#include "config.h"

typedef struct {

char *SUBTOPIC//string `yaml:"subtopic" mapstructure:"subtopic"` //"topic1"

char *PUBTOPIC//string `yaml:"pubtopic" mapstructure:"pubtopic"`

int QOS//byte `yaml:"qos" mapstructure:"qos"` //1

char *SERVERADDRESS//string `yaml:"serveraddress" mapstructure:"serveraddress"` //= "tcp://mosquitto:1883"

char *CLIENTID//string `yaml:"clientid" mapstructure:"clientid"` //= "mqtt_subscriber"

int HEARTTIME//int `yaml:"hearttime" mapstructure:"hearttime"`

// CommandLocalPath string `yam:"commanlocalpath"`

}mqttSection,*pmqttSection

typedef struct {

mqttSection Mqtt// `yaml:"mqtt" mapstructure:"mqtt"`

// KVM kvmSection `yaml:"kvm" mapstructure:"kvm"`

}ConfigT

ConfigT config

static KeyValue webrtcconfig[]={

{"mqtt",&config,STRUCT_TYPE_NAME,NULL},

{"subtopic",&(config.Mqtt.SUBTOPIC),STRING_TYPE_NAME,"mqtt"},

{"pubtopic",&(config.Mqtt.PUBTOPIC),STRING_TYPE_NAME,"mqtt"},

{"qos",&(config.Mqtt.QOS),INT_TYPE_NAME,"mqtt"},

{"serveraddress",&(config.Mqtt.SERVERADDRESS),STRING_TYPE_NAME,"mqtt"},

{"clientid",&(config.Mqtt.CLIENTID),STRING_TYPE_NAME,"mqtt"},

{"hearttime",&(config.Mqtt.HEARTTIME),INT_TYPE_NAME,"mqtt"},

{NULL,NULL,0,NULL},

}

int printConfig(ConfigT * pconfig){

if(pconfig==NULL) return -1

printf("mqtt:r ")

if(pconfig->Mqtt.SUBTOPIC!=NULL) {printf("subtopic: %sr ",pconfig->Mqtt.SUBTOPIC)}

if(pconfig->Mqtt.SUBTOPIC!=NULL) {printf("pubtopic: %sr ",pconfig->Mqtt.PUBTOPIC)}

printf("qos: %dr ",config.Mqtt.QOS)

if(pconfig->Mqtt.SERVERADDRESS!=NULL) {printf("serveraddress: %sr ",pconfig->Mqtt.SERVERADDRESS)}

if(pconfig->Mqtt.CLIENTID!=NULL) {printf("clientid: %sr ",pconfig->Mqtt.CLIENTID)}

printf("hearttime: %dr ",config.Mqtt.HEARTTIME)

}

int freeConfig(ConfigT * pconfig){

if(pconfig==NULL) return -1

if(pconfig->Mqtt.SERVERADDRESS!=NULL) {free(pconfig->Mqtt.SERVERADDRESS)}

if(pconfig->Mqtt.CLIENTID!=NULL) {free(pconfig->Mqtt.CLIENTID)}

if(pconfig->Mqtt.SUBTOPIC!=NULL) {free(pconfig->Mqtt.SUBTOPIC)}

}

char currentkey[100]

void getvalue(yaml_event_t event,pKeyValue *ppconfigs){

char *value = (char *)event.data.scalar.value

pKeyValue pconfig=*ppconfigs

char *pstringname

while(pconfig->key!=NULL){

if(currentkey[0]!=0){

if(!strcmp(currentkey,pconfig->key))

{

switch(pconfig->valuetype){

case STRING_TYPE_NAME:

pstringname=strdup(value)

printf("get string value %sr ",pstringname)

*((char**)pconfig->value)=pstringname

memset(currentkey, 0, sizeof(currentkey))

break

case INT_TYPE_NAME:

*((int*)(pconfig->value))=atoi(value)

memset(currentkey, 0, sizeof(currentkey))

break

case BOOL_TYPE_NAME:

if(!strcmp(value,"true")) *((bool*)(pconfig->value))=true

else *((bool*)(pconfig->value))=false

memset(currentkey, 0, sizeof(currentkey))

break

case FLOAT_TYPE_NAME:

*((float*)(pconfig->value))=atof(value)

memset(currentkey, 0, sizeof(currentkey))

break

case STRUCT_TYPE_NAME:

case MAP_TYPE_NAME:

case LIST_TYPE_NAME:

memset(currentkey, 0, sizeof(currentkey))

strncpy(currentkey,value,strlen(value))

break

default:

break

}

break

}

//continue

}else{

if(!strcmp(value,pconfig->key)){

strncpy(currentkey,pconfig->key,strlen(pconfig->key))

break

}

}

pconfig++

}

}

int Load_YAML_Config( char *yaml_file, KeyValue *(configs[]) )

{

struct stat filecheck

yaml_parser_t parser

yaml_event_t event

bool done = 0

unsigned char type = 0

unsigned char sub_type = 0

if (stat(yaml_file, &filecheck) != false )

{

printf("[%s, line %d] Cannot open configuration file '%s'! %s", __FILE__, __LINE__, yaml_file, strerror(errno) )

return -1

}

FILE *fh = fopen(yaml_file, "r")

if (!yaml_parser_initialize(&parser))

{

printf("[%s, line %d] Failed to initialize the libyaml parser. Abort!", __FILE__, __LINE__)

return -1

}

if (fh == NULL)

{

printf("[%s, line %d] Failed to open the configuration file '%s' Abort!", __FILE__, __LINE__, yaml_file)

return -1

}

memset(currentkey, 0, sizeof(currentkey))

/* Set input file */

yaml_parser_set_input_file(&parser, fh)

while(!done)

{

if (!yaml_parser_parse(&parser, &event))

{

/* Useful YAML vars: parser.context_mark.line+1, parser.context_mark.column+1, parser.problem, parser.problem_mark.line+1, parser.problem_mark.column+1 */

printf( "[%s, line %d] libyam parse error at line %ld in '%s'", __FILE__, __LINE__, parser.problem_mark.line+1, yaml_file)

}

if ( event.type == YAML_DOCUMENT_START_EVENT )

{

//yaml file first line is version

//%YAML 1.1

//---

yaml_version_directive_t *ver = event.data.document_start.version_directive

if ( ver == NULL )

{

printf( "[%s, line %d] Invalid configuration file. Configuration must start with "%%YAML 1.1"", __FILE__, __LINE__)

}

int major = ver->major

int minor = ver->minor

if (! (major == YAML_VERSION_MAJOR &&minor == YAML_VERSION_MINOR) )

{

printf( "[%s, line %d] Configuration has a invalid YAML version. Must be 1.1 or above", __FILE__, __LINE__)

return -1

}

}

else if ( event.type == YAML_STREAM_END_EVENT )

{

done = true

}

else if ( event.type == YAML_MAPPING_END_EVENT )

{

sub_type = 0

}

else if ( event.type == YAML_SCALAR_EVENT )

{

getvalue(event,configs)

}

}

return 0

}

int main(int argc, char *argv[]){

pKeyValue pconfig=&webrtcconfig[0]

Load_YAML_Config("../../etc/kvmagent.yml",&pconfig)

printConfig(&config)

freeConfig(&config)

}