IG902上传数据到MQTT云平台EMQ(2)
6.配置客户端数据存储到Mysql
1)手动新建数据表,或根据附件mqtt.sql导入数据库。
修改文件emqx_backend_mysql.conf
# etc/plugins/emqx_backend_mysql.conf
## 服务器地址
auth.mysql.server=127.0.0.1:3306
## 连接池大小
auth.mysql.pool=8
auth.mysql.username=mysql数据库名
auth.mysql.password=数据库密码
auth.mysql.database=mqtt数据库
auth.mysql.query_timeout=5s
2)建立相关表格MySQL设备在线状态表
mqtt_client 存储设备在线状态:
DROPTABLEIFEXISTS`mqtt_client`;
CREATETABLE`mqtt_client`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`clientid`varchar(64) DEFAULTNULL,
`state`varchar(3) DEFAULTNULL,
`node`varchar(64) DEFAULTNULL,
`online_at`datetime DEFAULTNULL,
`offline_at`datetime DEFAULTNULL,
`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,
PRIMARY KEY(`id`),
KEY`mqtt_client_idx`(`clientid`),
UNIQUEKEY`mqtt_client_key`(`clientid`),
INDEXtopic_index(`id`, `clientid`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
1 rows in set(0.00sec)
MySQL主题订阅表
mqtt_sub 存储设备的主题订阅关系:
DROPTABLEIFEXISTS`mqtt_sub`;
CREATETABLE`mqtt_sub`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`clientid`varchar(64) DEFAULTNULL,
`topic`varchar(180) DEFAULTNULL,
`qos`tinyint(1) DEFAULTNULL,
`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,
PRIMARY KEY(`id`),
KEY`mqtt_sub_idx`(`clientid`,`topic`,`qos`),
UNIQUEKEY`mqtt_sub_key`(`clientid`,`topic`),
INDEXtopic_index(`id`, `topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
MySQL消息存储表
mqtt_msg 存储MQTT 消息:
DROPTABLEIFEXISTS`mqtt_msg`;
CREATETABLE`mqtt_msg`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`msgid`varchar(64) DEFAULTNULL,
`topic`varchar(180) NOTNULL,
`sender`varchar(64) DEFAULTNULL,
`node`varchar(64) DEFAULTNULL,
`qos`tinyint(1) NOTNULLDEFAULT'0',
`retain`tinyint(1) DEFAULTNULL,
`payload`blob,
`arrived`datetime NOTNULL,
PRIMARY KEY(`id`),
INDEXtopic_index(`id`, `topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
MySQL保留消息表
mqtt_retain 存储retain 消息:
DROPTABLEIFEXISTS`mqtt_retain`;
CREATETABLE`mqtt_retain`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`topic`varchar(180) DEFAULTNULL,
`msgid`varchar(64) DEFAULTNULL,
`sender`varchar(64) DEFAULTNULL,
`node`varchar(64) DEFAULTNULL,
`qos`tinyint(1) DEFAULTNULL,
`payload`blob,
`arrived`timestampNOTNULLDEFAULTCURRENT_TIMESTAMP,
PRIMARY KEY(`id`),
UNIQUEKEY`mqtt_retain_key`(`topic`),
INDEXtopic_index(`id`, `topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
select* frommqtt_retain wheretopic = "retain";
MySQL消息确认表
mqtt_acked 存储客户端消息确认:
DROPTABLEIFEXISTS`mqtt_acked`;
CREATETABLE`mqtt_acked`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`clientid`varchar(64) DEFAULTNULL,
`topic`varchar(180) DEFAULTNULL,
`mid`int(11) unsignedDEFAULTNULL,
`created`timestampNULLDEFAULTNULL,
PRIMARY KEY(`id`),
UNIQUEKEY`mqtt_acked_key`(`clientid`,`topic`),
INDEXtopic_index(`id`, `topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
3)启动服务mysql存储方法一:emqx_ctl plugins load emqx_backend_mysql方法二:通过插件启动对应插件emqx_backend_mysql
4)通过mysql查看客户端发送数据
EMQ配置文档https://docs.emqx.io/enterprise/latest/cn/
7.数据显示通过软件Grafana
1)下载链接:https://grafana.com/grafana/download安装完成后访问服务器地址:3000进入管理界面,此处访问10.5.20.245:3000
2)设置相关参数
3)测试连接mysql是否可以连接
4)创建视图表格
5)编辑表格
6)设置数据库及查询语句。
8.尝试修改上传值
查看传输的数据
9.EMQ平台下发数据到客户端
1)编辑订阅
相关程序参考http://app.ig.inhand.com.cn/
2)填写代码
import logging
import json
defctl_test(topic, payload, wizard_api): #定义订阅主函数
logging.info(topic) #打印订阅主题,假定topic为write/plc
logging.info(payload) #打印订阅数据,假定payload数据为{"method":"setValue", "TagName":"SP1", "TagValue":12.3}
payload = json.loads(payload) #反序列化订阅数据
ifpayload["method"] == "setValue": #检测是否为写入数据
message = {payload["TagName"]:payload["TagValue"]} #定义下发消息,包括下发的变量名称和变量值
wizard_api.write_plc_values(message) #调用wizard_api模块中的write_plc_values方法,将message字典中的数据下发至指定变量
3)EMQ服务器端连接websocket
4)发布信息到客户端,填写对应客户端订阅主题,及修改字段{"method":"setValue", "TagName":"字段名", "TagValue":修改值为}此处修改客户端字段temperature修改值为12.3此处输入{"method":"setValue", "TagName":"temperature", "TagValue":12.3} 点击发布按钮
查看值是否修改(modbus中值和变量列表中值都被修改)
相关日志
扩展 python实现对数据的修改
import paho.mqtt.client as mqtt
# 连接成功回调
defon_connect(client, userdata, flags, rc):
print('Connected with result code 'str(rc))
client.subscribe('to_service')
# 消息接收回调
defon_message(client, userdata, msg):
print(msg.topic " "str(msg.payload))
client = mqtt.Client()
# 指定回调函数
client.on_connect = on_connect
client.on_message = on_message
# 建立连接
client.username_pw_set('用户名','密码')
client.connect('服务器地址', 1883, 60)
# 发布消息修改对象修改值
client.publish('to_client',payload='{"method":"setValue", "TagName":"T1", "TagValue":12}',qos=1)
client.loop_forever()
Python paho-mqtt 模块使用 https://www.jianshu.com/p/ef546f476322
运行程序查看数据变化
其它:
相关启动命令
启动mysql: systemctl start mysqld
启动EMQ: ./emqx/bin/emqx start
启动grafana:service grafana-server start
如果访问服务器IP:端口无法访问的情况
1.检查服务是否开启
2.centos下防火墙开启相关接口
查看命令
开启相关端口例:开启tcp协议端口3306 firewall-cmd --permanent --add-port=3306/tcp