IG902上传数据到MQTT云平台EMQ(2)

6.配置客户端数据存储到Mysql

1)手动新建数据表,或根据附件mqtt.sql导入数据库。

修改文件emqx_backend_mysql.conf

# etc/plugins/emqx_backend_mysql.conf

## 服务器地址

auth.mysql.server=127.0.0.1:3306

## 连接池大小

auth.mysql.pool=8

auth.mysql.username=mysql数据库名

auth.mysql.password=数据库密码

auth.mysql.database=mqtt数据库

auth.mysql.query_timeout=5s


2)建立相关表格MySQL设备在线状态表

mqtt_client 存储设备在线状态:

DROPTABLEIFEXISTS`mqtt_client`;

CREATETABLE`mqtt_client`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`clientid`varchar(64) DEFAULTNULL,

`state`varchar(3) DEFAULTNULL,

`node`varchar(64) DEFAULTNULL,

`online_at`datetime DEFAULTNULL,

`offline_at`datetime DEFAULTNULL,

`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,

PRIMARY KEY(`id`),

KEY`mqtt_client_idx`(`clientid`),

UNIQUEKEY`mqtt_client_key`(`clientid`),

INDEXtopic_index(`id`, `clientid`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;

1 rows in set(0.00sec)


MySQL主题订阅表

mqtt_sub 存储设备的主题订阅关系:

DROPTABLEIFEXISTS`mqtt_sub`;

CREATETABLE`mqtt_sub`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`clientid`varchar(64) DEFAULTNULL,

`topic`varchar(180) DEFAULTNULL,

`qos`tinyint(1) DEFAULTNULL,

`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,

PRIMARY KEY(`id`),

KEY`mqtt_sub_idx`(`clientid`,`topic`,`qos`),

UNIQUEKEY`mqtt_sub_key`(`clientid`,`topic`),

INDEXtopic_index(`id`, `topic`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;


MySQL消息存储表

mqtt_msg 存储MQTT 消息:

DROPTABLEIFEXISTS`mqtt_msg`;

CREATETABLE`mqtt_msg`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`msgid`varchar(64) DEFAULTNULL,

`topic`varchar(180) NOTNULL,

`sender`varchar(64) DEFAULTNULL,

`node`varchar(64) DEFAULTNULL,

`qos`tinyint(1) NOTNULLDEFAULT'0',

`retain`tinyint(1) DEFAULTNULL,

`payload`blob,

`arrived`datetime NOTNULL,

PRIMARY KEY(`id`),

INDEXtopic_index(`id`, `topic`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;


MySQL保留消息表

mqtt_retain 存储retain 消息:

DROPTABLEIFEXISTS`mqtt_retain`;

CREATETABLE`mqtt_retain`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`topic`varchar(180) DEFAULTNULL,

`msgid`varchar(64) DEFAULTNULL,

`sender`varchar(64) DEFAULTNULL,

`node`varchar(64) DEFAULTNULL,

`qos`tinyint(1) DEFAULTNULL,

`payload`blob,

`arrived`timestampNOTNULLDEFAULTCURRENT_TIMESTAMP,

PRIMARY KEY(`id`),

UNIQUEKEY`mqtt_retain_key`(`topic`),

INDEXtopic_index(`id`, `topic`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;

select* frommqtt_retain wheretopic = "retain";


MySQL消息确认表

mqtt_acked 存储客户端消息确认:

DROPTABLEIFEXISTS`mqtt_acked`;

CREATETABLE`mqtt_acked`(

`id`int(11) unsignedNOTNULLAUTO_INCREMENT,

`clientid`varchar(64) DEFAULTNULL,

`topic`varchar(180) DEFAULTNULL,

`mid`int(11) unsignedDEFAULTNULL,

`created`timestampNULLDEFAULTNULL,

PRIMARY KEY(`id`),

UNIQUEKEY`mqtt_acked_key`(`clientid`,`topic`),

INDEXtopic_index(`id`, `topic`)

) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;

3)启动服务mysql存储方法一:emqx_ctl plugins load emqx_backend_mysql方法二:通过插件启动对应插件emqx_backend_mysql

/uploads/article/20201120/ad7364d5e2dbd484be1322602b78fb76.png

4)通过mysql查看客户端发送数据

/uploads/article/20201120/f19d595df5b5531465d3d5fb11a571e3.png

EMQ配置文档https://docs.emqx.io/enterprise/latest/cn/

7.数据显示通过软件Grafana

1)下载链接:https://grafana.com/grafana/download安装完成后访问服务器地址:3000进入管理界面,此处访问10.5.20.245:3000

/uploads/article/20201120/bbd98f99dab9323a86f94caf4ac0e125.png

2设置相关参数

/uploads/article/20201120/a4bf3166dd7c0ff53526aacaf987ada3.png

3)测试连接mysql是否可以连接

/uploads/article/20201120/48e110b633c848516279965d30db2932.png

4)创建视图表格

/uploads/article/20201120/369140450097d0a1ba0f03506f10b2fa.png

5)编辑表格

/uploads/article/20201120/505205a0041cc3c3c17c7c74eb051c87.png

6)设置数据库及查询语句。

/uploads/article/20201120/f9151e36a2d956183940f6feaee40fda.png

      8.尝试修改上传值

/uploads/article/20201120/11e4fe1198ef33ca815977978d052387.png

查看传输的数据

/uploads/article/20201120/13b2e6b6fb09ba43e6c04688d642fec3.png

   9.EMQ平台下发数据到客户端

1)编辑订阅

/uploads/article/20201120/72b43d0a8e1f143d23f16732ad794ab8.png

相关程序参考http://app.ig.inhand.com.cn/

2)填写代码

import logging

import json

defctl_test(topic, payload, wizard_api): #定义订阅主函数

logging.info(topic) #打印订阅主题,假定topic为write/plc

logging.info(payload) #打印订阅数据,假定payload数据为{"method":"setValue", "TagName":"SP1", "TagValue":12.3}

payload = json.loads(payload) #反序列化订阅数据

ifpayload["method"] == "setValue": #检测是否为写入数据

message = {payload["TagName"]:payload["TagValue"]} #定义下发消息,包括下发的变量名称和变量值

wizard_api.write_plc_values(message) #调用wizard_api模块中的write_plc_values方法,将message字典中的数据下发至指定变量


3EMQ服务器端连接websocket

/uploads/article/20201120/f9fbd2dbd14ac6ca3df35006e6c4cc46.png

4)发布信息到客户端,填写对应客户端订阅主题,及修改字段{"method":"setValue", "TagName":"字段名", "TagValue":修改值为}此处修改客户端字段temperature修改值为12.3此处输入{"method":"setValue", "TagName":"temperature", "TagValue":12.3}  点击发布按钮

/uploads/article/20201120/bcd8b2c811c3901e0a77e373e2bdc15d.png

查看值是否修改(modbus中值和变量列表中值都被修改)

/uploads/article/20201120/2d025a3fa9e222222173911e950987a8.png

相关日志

/uploads/article/20201120/65307ed650b734b7b3a137f0380644aa.png

扩展 python实现对数据的修改

import paho.mqtt.client as mqtt

# 连接成功回调

defon_connect(client, userdata, flags, rc):

print('Connected with result code 'str(rc))

client.subscribe('to_service')

# 消息接收回调

defon_message(client, userdata, msg):

print(msg.topic " "str(msg.payload))

client = mqtt.Client()

# 指定回调函数

client.on_connect = on_connect

client.on_message = on_message

# 建立连接

client.username_pw_set('用户名','密码')

client.connect('服务器地址', 1883, 60)

# 发布消息修改对象修改值

client.publish('to_client',payload='{"method":"setValue", "TagName":"T1", "TagValue":12}',qos=1)

client.loop_forever()


   Python paho-mqtt 模块使用   https://www.jianshu.com/p/ef546f476322  

运行程序查看数据变化

/uploads/article/20201120/cd08e6d0f46391e6bbb9348947794d74.png
/uploads/article/20201120/a84fff3630193312edd79728c08559ef.png

其它:

相关启动命令

启动mysql  systemctl start mysqld

启动EMQ   ./emqx/bin/emqx start

启动grafana:service grafana-server start

如果访问服务器IP:端口无法访问的情况

1.检查服务是否开启

2.centos下防火墙开启相关接口

查看命令

/uploads/article/20201120/ed799d60e8eb407257e44848806c220e.png

开启相关端口例:开启tcp协议端口3306 firewall-cmd --permanent --add-port=3306/tcp

0 个评论

要回复文章请先登录注册