IG902上传数据到MQTT云平台EMQ
EMQX 是全球领先的开源物联网MQTT 5.0服务器,高并发、低延时,内置强大的规则引擎,支持边缘及云端部署,是5G时代大型物联网应用首选技术方案。IG902边缘计算网关,可以通过MQTT协议接入EMQ服务器,帮助您快速搭建IIoT计算方案。
1. IG902初始化配置
1)开启边缘计算安装相应APP device_supervisor并启动APP。
2)添加设备选择右侧栏目设备监控-》设备列表右上角操作中的“ ”号添加设备。
3)添加变量
2.模拟传感器数据上传:
1)使用软件modbus Slave
2) 添加模拟数据
3)启动数据发送connection-》connect
3.配置设备云服务及搭建EMQ服务器
1)在IG Web端查看接收数据
4.搭建EMQ服务器
1)下载链接 https://www.emqx.io/cn/downloads#enterprise
2)以Linux系统为例注册账户下载免费的license 文件,替换etc目录下的对应文件。
3)安装目录bin目录下启动系统 通过./empx start命令启动。
4)浏览器访问http://服务器地址:18083/#/login 输入用户名和密码用户名admin 密码public登录管理页面
5.EMQ平台配置用户名认证
1)关闭匿名登录
配置匿名认证开关:
# 修改etc/emqx.conf
## Value: true | false
allow_anonymous=false
方式一:
修改etc/plugins文件夹下的
emqx_auth_username.conf文件
# etc/plugins/emqx_auth_username.conf
auth.user.password_hash = plain
## 第一组认证数据
auth.user.1.username = admin
auth.user.1.password = public
## 第二组认证数据
auth.user.2.username = test
auth.user.2.password = test
启动认证服务
./emqx_ctlplugins loademqx_auth_username或在web界面开启服务
重启emqx服务
./bin/emqx stop
./bin/emqx start
查看运行状态
./bin/emqx_ctlstatus
方式二: 1)安装mysql 数据库
mysql安装
检查系统本身是否有预装的mysql
rpm -qa | grep mysql #检查是否安装了mysql
rpm -qa | grep mariadb #检查是否安装了mariadb
rpm -e xxx#一般使用此命令即可卸载成功
rpm -e --nodeps xxx #卸载不成功时使用此命令强制卸载)
安装
wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
//下载mysql的rpm的包
yum -y installmysql57-community-release-el7-10.noarch.rpm
yum -y installmysql-community-server
2)创建相关表格:
>mysql –u root –p
createdatabase mqtt;
CREATETABLE`mqtt_user`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`username`varchar(100) DEFAULTNULL,
`password`varchar(100) DEFAULTNULL,
`salt`varchar(35) DEFAULTNULL,
`is_superuser`tinyint(1) DEFAULT0,
`created`datetime DEFAULTNULL,
PRIMARY KEY(`id`),
UNIQUEKEY`mqtt_username`(`username`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4;
3) 修改文件
# etc/plugins/emqx_auth_mysql.conf
## 服务器地址
auth.mysql.server = 127.0.0.1:3306
## 连接池大小
auth.mysql.pool = 8
auth.mysql.username = mysql数据库名
auth.mysql.password = 数据库密码
auth.mysql.database = mqtt数据库
auth.mysql.query_timeout = 5s
4)启动服务mysql认证服务emqx_ctl plugins load emqx_auth_mysql 或通过web管理端插件开启
注意:如果服务未启动查看报错内容解决。1.配置文件出错(检查后修改)。2.mysql未开启远程管理权限
5)添加测试用户
mysql -u root -p
usemysql;
updateusersethost=’%’ whereuser=‘root’ andhost=‘localhost’;
添加认证用户test密码test
INSERTINTO`mqtt_user`( `username`, `password`, `salt`)
VALUES
('test', ' f2ca1bb6c7e907d06dafe4687e579fce76b37e4e93b7605022da52e6ccc26fd2', NULL);
密码采用sha256加密
6)客户端测试使用软件MQTTX 链接:https://mqttx.app/cn/
7)点击connect,提示连接成功
8)IG连接需要添加用户名密码
9)编辑发布脚本
相关发布脚本参考链接:
6.配置客户端数据存储到Mysql
1)手动新建数据表,或根据附件mqtt.sql导入数据库。
修改文件emqx_backend_mysql.conf
# etc/plugins/emqx_backend_mysql.conf
## 服务器地址
auth.mysql.server=127.0.0.1:3306
## 连接池大小
auth.mysql.pool=8
auth.mysql.username=mysql数据库名
auth.mysql.password=数据库密码
auth.mysql.database=mqtt数据库
auth.mysql.query_timeout=5s
2)建立相关表格MySQL设备在线状态表
mqtt_client 存储设备在线状态:
DROPTABLEIFEXISTS`mqtt_client`;
CREATETABLE`mqtt_client`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`clientid`varchar(64) DEFAULTNULL,
`state`varchar(3) DEFAULTNULL,
`node`varchar(64) DEFAULTNULL,
`online_at`datetime DEFAULTNULL,
`offline_at`datetime DEFAULTNULL,
`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,
PRIMARY KEY(`id`),
KEY`mqtt_client_idx`(`clientid`),
UNIQUEKEY`mqtt_client_key`(`clientid`),
INDEXtopic_index(`id`, `clientid`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
1 rows in set(0.00sec)
MySQL主题订阅表
mqtt_sub 存储设备的主题订阅关系:
DROPTABLEIFEXISTS`mqtt_sub`;
CREATETABLE`mqtt_sub`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`clientid`varchar(64) DEFAULTNULL,
`topic`varchar(180) DEFAULTNULL,
`qos`tinyint(1) DEFAULTNULL,
`created`timestampNULLDEFAULTCURRENT_TIMESTAMP,
PRIMARY KEY(`id`),
KEY`mqtt_sub_idx`(`clientid`,`topic`,`qos`),
UNIQUEKEY`mqtt_sub_key`(`clientid`,`topic`),
INDEXtopic_index(`id`, `topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
MySQL消息存储表
mqtt_msg 存储MQTT 消息:
DROPTABLEIFEXISTS`mqtt_msg`;
CREATETABLE`mqtt_msg`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`msgid`varchar(64) DEFAULTNULL,
`topic`varchar(180) NOTNULL,
`sender`varchar(64) DEFAULTNULL,
`node`varchar(64) DEFAULTNULL,
`qos`tinyint(1) NOTNULLDEFAULT'0',
`retain`tinyint(1) DEFAULTNULL,
`payload`blob,
`arrived`datetime NOTNULL,
PRIMARY KEY(`id`),
INDEXtopic_index(`id`, `topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
MySQL保留消息表
mqtt_retain 存储retain 消息:
DROPTABLEIFEXISTS`mqtt_retain`;
CREATETABLE`mqtt_retain`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`topic`varchar(180) DEFAULTNULL,
`msgid`varchar(64) DEFAULTNULL,
`sender`varchar(64) DEFAULTNULL,
`node`varchar(64) DEFAULTNULL,
`qos`tinyint(1) DEFAULTNULL,
`payload`blob,
`arrived`timestampNOTNULLDEFAULTCURRENT_TIMESTAMP,
PRIMARY KEY(`id`),
UNIQUEKEY`mqtt_retain_key`(`topic`),
INDEXtopic_index(`id`, `topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
select* frommqtt_retain wheretopic = "retain";
MySQL消息确认表
mqtt_acked 存储客户端消息确认:
DROPTABLEIFEXISTS`mqtt_acked`;
CREATETABLE`mqtt_acked`(
`id`int(11) unsignedNOTNULLAUTO_INCREMENT,
`clientid`varchar(64) DEFAULTNULL,
`topic`varchar(180) DEFAULTNULL,
`mid`int(11) unsignedDEFAULTNULL,
`created`timestampNULLDEFAULTNULL,
PRIMARY KEY(`id`),
UNIQUEKEY`mqtt_acked_key`(`clientid`,`topic`),
INDEXtopic_index(`id`, `topic`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8MB4;
3)启动服务mysql存储方法一:emqx_ctl plugins load emqx_backend_mysql方法二:通过插件启动对应插件emqx_backend_mysql
4)通过mysql查看客户端发送数据
EMQ配置文档https://docs.emqx.io/enterprise/latest/cn/
7.数据显示通过软件Grafana
1)下载链接:https://grafana.com/grafana/download安装完成后访问服务器地址:3000进入管理界面,此处访问10.5.20.245:3000
2)设置相关参数
3)测试连接mysql是否可以连接
4)创建视图表格
5)编辑表格
6)设置数据库及查询语句。
8.尝试修改上传值
查看传输的数据
9.EMQ平台下发数据到客户端
1)编辑订阅
2)填写代码
import logging
import json
defctl_test(topic, payload, wizard_api): #定义订阅主函数
logging.info(topic) #打印订阅主题,假定topic为write/plc
logging.info(payload) #打印订阅数据,假定payload数据为{"method":"setValue", "TagName":"SP1", "TagValue":12.3}
payload = json.loads(payload) #反序列化订阅数据
ifpayload["method"] == "setValue": #检测是否为写入数据
message = {payload["TagName"]:payload["TagValue"]} #定义下发消息,包括下发的变量名称和变量值
wizard_api.write_plc_values(message) #调用wizard_api模块中的write_plc_values方法,将message字典中的数据下发至指定变量
3)EMQ服务器端连接websocket
4)发布信息到客户端,填写对应客户端订阅主题,及修改字段{"method":"setValue", "TagName":"字段名", "TagValue":修改值为}此处修改客户端字段temperature修改值为12.3此处输入{"method":"setValue", "TagName":"temperature", "TagValue":12.3} 点击发布按钮
查看值是否修改(modbus中值和变量列表中值都被修改)
相关日志
扩展 python实现对数据的修改
import paho.mqtt.client as mqtt
# 连接成功回调
defon_connect(client, userdata, flags, rc):
print('Connected with result code 'str(rc))
client.subscribe('to_service')
# 消息接收回调
defon_message(client, userdata, msg):
print(msg.topic " "str(msg.payload))
client = mqtt.Client()
# 指定回调函数
client.on_connect = on_connect
client.on_message = on_message
# 建立连接
client.username_pw_set('用户名','密码')
client.connect('服务器地址', 1883, 60)
# 发布消息修改对象修改值
client.publish('to_client',payload='{"method":"setValue", "TagName":"T1", "TagValue":12}',qos=1)
client.loop_forever()
Python paho-mqtt 模块使用 https://www.jianshu.com/p/ef546f476322
运行程序查看数据变化
其它:
相关启动命令
启动mysql: systemctl start mysqld
启动EMQ: ./emqx/bin/emqx start
启动grafana:service grafana-server start
如果访问服务器IP:端口无法访问的情况
1.检查服务是否开启
2.centos下防火墙开启相关接口
查看命令
开启相关端口例:开启tcp协议端口3306 firewall-cmd --permanent --add-port=3306/tcp