全栈物联网数据监控:从MQTT设备到Web看板
概述
本文档记录了在Ubuntu 24.02服务器上部署完整MQTT物联网数据采集系统的全过程。系统实现了从设备数据采集、MQTT消息处理、MySQL数据存储到Web实时展示的全链路功能。本教程旨在为日后系统维护提供完整的参考文档。
环境准备
1. 设备清单
- 物联网设备:ML307R,运行度云DTU固件
- 服务器:阿里云轻量应用服务器
- 操作系统:Ubuntu 24.02 LTS
- 核心服务:EMQX 5.0+,MySQL 8.0,Python 3.10+
2. 网络配置
服务器公网IP:自行查询EMQX端口:1883(MQTT)、18083(管理界面)Web应用端口:5001第一部分:服务器基础环境搭建
1.1 系统更新与基础工具安装
# 更新系统包sudo apt update && sudo apt upgrade -y
# 安装必要工具sudo apt install -y vim curl wget git net-tools1.2 防火墙配置(阿里云控制台)
在阿里云服务器安全组中开放以下端口:
- 1883(MQTT协议)
- 18083(EMQX Dashboard)
- 5001(Web应用)
- 3306(MySQL,建议仅内网访问)
# 本地防火墙检查(可选)sudo ufw statussudo ufw allow 1883/tcpsudo ufw allow 18083/tcpsudo ufw allow 5001/tcp
第二部分:MQTT服务部署(EMQX)
2.1 EMQX安装
# 下载最新版EMQXcurl -s https://assets.emqx.com/scripts/install-emqx.sh | sudo bash
# 安装EMQXsudo apt install emqx -y
# 启动服务sudo systemctl start emqxsudo systemctl enable emqx
# 检查状态sudo systemctl status emqx2.2 EMQX基础配置
- 访问管理界面:
http://[服务器IP]:18083 - 默认账号:admin / public
- 创建应用用户(用于数据采集服务):
- 用户名:emqx_data
- 密码:123456
2.3 WebSocket调试
- 使用EMQX内置的
第三部分:物联网设备配置
3.1 ML307R设备配置
通过设备Web管理界面配置:
网络通道参数
- 通道类型:MQTT
- 服务器地址:服务器公网IP
- 端口:1883
- Client ID:device_001(按实际设备编号)
主题设置
- 订阅主题:
test/topic/down(接收服务器指令) - 发布主题:
test/topic/up(发送传感器数据)test/topic/command(发送指令请求)
QoS设置
- QoS等级:1
- Retain:0
- 心跳间隔:600000ms(10分钟)
- 心跳消息:
hello
3.2 数据格式示例
设备发送的数据格式应为JSON:
{ "id": "device_001", "params": { "TDS": {"value": 125.5}, "COD": {"value": 15.3}, "pH": {"value": 7.2}, "Tem": {"value": 25.6}, "Hum": {"value": 65.8} }}注意:度云在多个发布通道之间进行切换时,发送指令前需要添加指令指明发布通道
指令格式MQPUB,0,0,数据
第四部分:数据处理服务部署
4.1. 主要功能文件
- 文件位置:通常放置在
/home/admin/mqtt_to_mysql.py(根据实际部署路径调整)。 - 作用:作为数据处理服务,从MQTT代理(如EMQX)订阅指定主题的消息,将接收到的数据(如传感器数据)实时解析并存储到MySQL数据库。
- 实现功能:
- 使用
paho-mqtt库连接MQTT代理,订阅主题(例如sensor/data)。 - 解析JSON格式的MQTT消息,提取关键字段(如时间戳、设备ID、数值)。
- 使用
mysql-connector-python库连接MySQL数据库,将数据插入到预定义的表中(例如sensor_logs)。 - 包含错误处理机制,确保网络中断或数据库异常时能重连和日志记录。
- 使用
2. 服务的自启动设置及维护
-
创建systemd服务文件:
-
在
/etc/systemd/system/目录下创建服务文件mqtt_to_mysql.service:[Unit]Description=MQTT to MySQL Data Processing ServiceAfter=network.target mysql.serviceRequires=mysql.service[Service]User=adminWorkingDirectory=/home/adminExecStart=/home/admin/mqtt_venv/bin/python3 /home/admin/mqtt_to_mysql.pyRestart=alwaysRestartSec=10Environment="PYTHONUNBUFFERED=1"StandardOutput=syslogStandardError=syslogSyslogIdentifier=mqtt_to_mysql[Install]WantedBy=multi-user.target -
说明:
After=mysql.service确保MySQL先启动。- 使用虚拟环境
mqtt_venv中的Python解释器执行脚本。 Restart=always使服务崩溃后自动重启。
-
-
自启动设置:
- 重新加载systemd配置:
sudo systemctl daemon-reload - 启用开机自启动:
sudo systemctl enable mqtt_to_mysql.service - 启动服务:
sudo systemctl start mqtt_to_mysql.service
- 重新加载systemd配置:
-
维护操作:
- 检查服务状态:
sudo systemctl status mqtt_to_mysql.service - 停止服务:
sudo systemctl stop mqtt_to_mysql.service - 重启服务:
sudo systemctl restart mqtt_to_mysql.service - 查看日志:
sudo journalctl -u mqtt_to_mysql.service -f
- 检查服务状态:
3. MySQL的设置,维护,查询
-
设置:
-
登录MySQL:
sudo mysql -u root -p -
创建数据库(例如
emqx_data):CREATE DATABASE emqx_data; -
创建用户并授权(例如用户
emqx_user,密码123456):CREATE USER 'emqx_user'@'localhost' IDENTIFIED BY '123456';GRANT ALL PRIVILEGES ON emqx_data.* TO 'emqx_user'@'localhost';FLUSH PRIVILEGES; -
创建数据表(示例):
USE emqx_data;CREATE TABLE sensor_logs (id INT AUTO_INCREMENT PRIMARY KEY,timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,device_id VARCHAR(50),value FLOAT,topic VARCHAR(255));
-
-
维护:
- 定期备份数据库:使用
mysqldump -u emqx_user -p emqx_data > backup.sql - 监控性能:检查MySQL错误日志(
/var/log/mysql/error.log)或使用SHOW PROCESSLIST; - 优化表:定期运行
OPTIMIZE TABLE sensor_logs;
- 定期备份数据库:使用
-
查询:
- 连接数据库:
mysql -u emqx_user -p emqx_data - 示例查询:
- 查看最新数据:
SELECT * FROM sensor_logs ORDER BY timestamp DESC LIMIT 10; - 统计设备数据量:
SELECT device_id, COUNT(*) FROM sensor_logs GROUP BY device_id;
- 查看最新数据:
- 连接数据库:
第五部分:数据库部署
5.1 MySQL安装
# 安装MySQLsudo apt install mysql-server -y
# 启动服务sudo systemctl start mysqlsudo systemctl enable mysql
# 运行安全脚本sudo mysql_secure_installation5.2 创建数据库和用户
-- 登录MySQLsudo mysql
-- 创建数据库CREATE DATABASE emqx_data CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
-- 创建用户CREATE USER 'emqx_user'@'localhost' IDENTIFIED BY '123456';
-- 授予权限GRANT ALL PRIVILEGES ON emqx_data.* TO 'emqx_user'@'localhost';
-- 刷新权限FLUSH PRIVILEGES;
-- 退出EXIT;5.3 创建数据表
-- 切换到emqx_data数据库USE emqx_data;
-- 创建数据表CREATE TABLE IF NOT EXISTS mqtt_msg ( id INT AUTO_INCREMENT PRIMARY KEY, device_id VARCHAR(50), tds FLOAT, cod FLOAT, toc FLOAT, uv254 FLOAT, ph FLOAT, temp FLOAT, hum FLOAT, zhexian1 FLOAT, zhexian2 FLOAT, raw_data TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_created_at (created_at), INDEX idx_device_id (device_id));
-- 创建指令表CREATE TABLE IF NOT EXISTS mqtt_command ( id INT AUTO_INCREMENT PRIMARY KEY, command_type VARCHAR(50) COMMENT '指令类型', device_id VARCHAR(50) COMMENT '目标设备ID', sender VARCHAR(50) COMMENT '指令发送者', raw_command TEXT COMMENT '原始指令内容', status VARCHAR(20) DEFAULT 'pending' COMMENT '指令状态', result TEXT COMMENT '执行结果', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, executed_at TIMESTAMP NULL COMMENT '指令执行时间', INDEX idx_status (status), INDEX idx_device_id (device_id), INDEX idx_created_at (created_at));
-- 创建下发记录表CREATE TABLE IF NOT EXISTS mqtt_downlink ( id INT AUTO_INCREMENT PRIMARY KEY, command_id INT, device_id VARCHAR(50), topic VARCHAR(255), payload TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_command_id (command_id), INDEX idx_device_id (device_id));5.4 数据库维护命令
# 查看数据统计mysql -u emqx_user -p123456 -D emqx_data -e "SELECT '数据统计' AS '';SELECT COUNT(*) AS 总记录数, COUNT(DISTINCT device_id) AS 设备数量, MIN(created_at) AS 最早记录, MAX(created_at) AS 最新记录FROM mqtt_msg;"
# 查看最近数据mysql -u emqx_user -p123456 -D emqx_data --table -e "SELECT id, device_id, ROUND(tds,1) AS TDS, ROUND(cod,1) AS COD, ROUND(ph,1) AS pH, ROUND(temp,1) AS 温度, ROUND(hum,1) AS 湿度, DATE_FORMAT(created_at, '%m-%d %H:%i') AS 时间FROM mqtt_msgORDER BY id DESCLIMIT 10;"
# 清空数据表(谨慎使用)mysql -u emqx_user -p123456 -D emqx_data -e "TRUNCATE TABLE mqtt_msg;TRUNCATE TABLE mqtt_command;"5.5 服务的介绍、运行与维护
服务介绍
MQTT to MySQL Super Subscriber 是一个数据处理后台服务,负责从MQTT代理(如EMQX)实时订阅消息,并将接收到的传感器数据解析存储到MySQL数据库中。该服务作为数据采集与存储的核心组件,为Web监控界面提供数据源。
服务配置
基于提供的服务文件,主要配置如下:
[Unit]Description=MQTT to MySQL Super SubscriberAfter=network.target emqx.service
[Service]Type=simpleUser=adminWorkingDirectory=/home/admin/ExecStart=/home/admin/mqtt_venv/bin/python3 /home/admin/mqtt_to_mysql.pyRestart=alwaysRestartSec=5
[Install]WantedBy=multi-user.target配置说明:
- 工作目录:
/home/admin/(确保脚本和相关文件在此目录) - 执行命令:使用Python虚拟环境运行
mqtt_to_mysql.py脚本 - 重启策略:服务异常退出时自动重启,间隔5秒
- 依赖关系:在网络和EMQX服务启动后运行
服务运行与管理
1. 服务设置步骤
# 创建服务文件sudo nano /etc/systemd/system/mqtt_to_mysql.service
# 将上述服务配置粘贴到文件中# 保存并退出编辑器
# 重新加载systemd配置sudo systemctl daemon-reload
# 启用开机自启动sudo systemctl enable mqtt_to_mysql.service
# 启动服务sudo systemctl start mqtt_to_mysql.service2. 日常维护命令
# 查看服务状态(运行状态、日志信息)sudo systemctl status mqtt_to_mysql.service
# 启动服务sudo systemctl start mqtt_to_mysql.service
# 停止服务sudo systemctl stop mqtt_to_mysql.service
# 重启服务(适用于更新代码后)sudo systemctl restart mqtt_to_mysql.service
# 查看实时日志sudo journalctl -u mqtt_to_mysql.service -f
# 查看最近100行日志sudo journalctl -u mqtt_to_mysql.service -n 100
# 禁用开机自启动sudo systemctl disable mqtt_to_mysql.service3. 服务监控
# 检查服务是否正在运行systemctl is-active mqtt_to_mysql.service
# 检查服务是否启用开机启动systemctl is-enabled mqtt_to_mysql.service
# 查看服务的依赖关系systemctl list-dependencies mqtt_to_mysql.service4. 故障排查
当服务出现问题时,可采取以下步骤:
-
检查配置语法:
Terminal window sudo systemctl cat mqtt_to_mysql.service -
检查详细日志:
Terminal window # 查看所有相关日志sudo journalctl -u mqtt_to_mysql.service --since "1 hour ago"# 按优先级过滤日志sudo journalctl -u mqtt_to_mysql.service -p err -
手动测试脚本:
Terminal window # 切换到服务运行用户su - admin# 手动运行脚本,检查是否有错误/home/admin/mqtt_venv/bin/python3 /home/admin/mqtt_to_mysql.py -
检查依赖服务:
Terminal window # 确保EMQX和MySQL服务正常运行systemctl status emqx.servicesystemctl status mysql.service
注意事项
- 权限管理:确保
admin用户对脚本文件和相关目录有执行权限 - 虚拟环境:确认Python虚拟环境已正确安装所有依赖包
- 资源监控:定期检查服务的内存和CPU使用情况,避免资源泄漏
- 日志轮转:配置日志轮转,避免日志文件过大占满磁盘空间
- 备份策略:定期备份服务配置文件和数据库数据
第六部分:Web监控界面部署
6.1 py文件的位置和运行思路
- 文件位置:
/home/admin/mqtt_dashboard/app.py - 运行思路:
- 基于Flask框架构建Web服务,提供数据查询API和页面渲染
- 连接MySQL数据库,查询水质监控数据(TDS、COD、pH、温度、湿度等)
- 通过RESTful API提供JSON格式数据给前端界面
- 集成简单的数据管理功能(如清空数据库记录)
- 支持多数据源查询,包括实时采样数据和历史趋势数据
6.2 html文件的运行思路
- 文件结构:位于
/home/admin/mqtt_dashboard/templates/目录(或作为静态文件) - 运行思路:
- 使用Bootstrap 5框架构建响应式界面,适配不同设备
- 集成ECharts图表库展示水质参数实时趋势曲线
- 前端JavaScript通过定时轮询(默认5秒)从后端API获取最新数据
- 实现模块化设计,包含三个主要面板:
- 趋势分析面板:实时曲线展示各水质参数变化
- 指令记录面板:显示最近的控制指令执行状态
- 采样数据面板:表格展示最新30条采样数据
- 支持手动刷新、暂停自动更新、清空数据库等交互功能
6.3 服务的运行与维护
-
服务配置:
Terminal window # 创建服务文件sudo nano /etc/systemd/system/mqtt_dashboard.service# 内容参考(基于提供的配置):[Unit]Description=MQTT Data Dashboard Web ServiceAfter=network.target mysql.service[Service]User=adminWorkingDirectory=/home/admin/mqtt_dashboardExecStart=/home/admin/mqtt_venv/bin/python3 app.pyRestart=alwaysRestartSec=10Environment="PYTHONUNBUFFERED=1"StandardOutput=syslogStandardError=syslogSyslogIdentifier=mqtt_dashboard[Install]WantedBy=multi-user.target -
服务管理命令:
Terminal window # 重载配置sudo systemctl daemon-reload# 启用开机启动sudo systemctl enable mqtt_dashboard.service# 启动服务sudo systemctl start mqtt_dashboard.service# 查看状态sudo systemctl status mqtt_dashboard.service# 查看日志sudo journalctl -u mqtt_dashboard.service -f# 停止服务sudo systemctl stop mqtt_dashboard.service# 重启服务sudo systemctl restart mqtt_dashboard.service
6.4 网页的使用方式
-
访问界面:
- 在浏览器输入服务器IP地址及端口(默认5001),如:
http://192.168.1.100:5001
- 在浏览器输入服务器IP地址及端口(默认5001),如:
-
主要功能模块:
- 趋势图表:实时展示各水质参数变化趋势,鼠标悬停可查看具体数值
- 指令记录:显示最近发送的控制指令及执行状态
- 采样数据:表格形式展示最近30条水质采样记录
-
操作按钮功能:
- 🔄 手动刷新:立即获取最新数据,无需等待自动更新
- ⏸️/▶️ 暂停/继续自动更新:切换自动更新功能(默认每5秒更新)
- 🗑️ 清空数据库:弹出确认对话框,可选择清空采样数据或指令记录
-
性能监控:
- 右下角显示系统状态,包括:
- 最后更新时间
- 请求响应时间
- 自动更新状态
- 右下角显示系统状态,包括:
-
快捷键支持:
- F5键:手动刷新数据
- 空格键:切换自动更新状态
-
错误处理:
- 网络异常时显示错误提示,3秒后自动重试
- 数据加载时显示加载指示器,避免用户误操作
-
页面优化特性:
- 页面不可见时自动暂停更新,恢复可见后继续
- 防抖机制防止频繁请求
- 数据缓存避免重复加载
第七部分:系统监控与维护(未实现)
7.1 服务状态检查脚本
创建 check_services.sh:
#!/bin/bashecho "=== 服务状态检查 ==="echo "检查时间: $(date)"
echo -e "\n1. EMQX服务:"sudo systemctl is-active emqx
echo -e "\n2. MySQL服务:"sudo systemctl is-active mysql
echo -e "\n3. 数据采集服务:"sudo systemctl is-active mqtt-collector
echo -e "\n4. Web服务:"sudo systemctl is-active mqtt-web
echo -e "\n5. 端口监听状态:"netstat -tulpn | grep -E '(1883|18083|5001|3306)'
echo -e "\n6. 数据采集服务日志(最近5条):"sudo journalctl -u mqtt-collector -n 5 --no-pager
echo -e "\n7. 数据库连接测试:"mysql -u emqx_user -p123456 -e "SELECT '数据库连接正常' AS Status;" 2>/dev/null || echo "数据库连接失败"
echo -e "\n=== 检查完成 ==="7.2 数据备份脚本
创建 backup_database.sh:
#!/bin/bashBACKUP_DIR="/root/backups"DATE=$(date +%Y%m%d_%H%M%S)BACKUP_FILE="$BACKUP_DIR/emqx_data_$DATE.sql"
mkdir -p $BACKUP_DIR
echo "开始备份数据库..."mysqldump -u emqx_user -p123456 emqx_data > $BACKUP_FILE
if [ $? -eq 0 ]; then echo "备份成功: $BACKUP_FILE"
# 压缩备份文件 gzip $BACKUP_FILE
# 删除7天前的备份 find $BACKUP_DIR -name "*.sql.gz" -mtime +7 -delete
echo "备份完成,文件已压缩。"else echo "备份失败!" exit 1fi7.3 自动清理旧数据
创建 clean_old_data.sh:
#!/bin/bash# 自动清理30天前的数据
echo "开始清理旧数据..."echo "清理时间: $(date)"
# 清理30天前的数据记录mysql -u emqx_user -p123456 -D emqx_data -e "DELETE FROM mqtt_msg WHERE created_at < DATE_SUB(NOW(), INTERVAL 30 DAY);DELETE FROM mqtt_command WHERE created_at < DATE_SUB(NOW(), INTERVAL 30 DAY);DELETE FROM mqtt_downlink WHERE created_at < DATE_SUB(NOW(), INTERVAL 30 DAY);"
echo "数据清理完成。"第八部分:故障排查
8.1 常见问题及解决方案
问题1:MQTT连接失败
检查步骤:1. 确认EMQX服务运行状态:sudo systemctl status emqx2. 检查端口监听:netstat -tulpn | grep 18833. 检查防火墙设置:sudo ufw status4. 检查阿里云安全组规则问题2:数据库连接失败
检查步骤:1. 确认MySQL服务状态:sudo systemctl status mysql2. 检查用户权限:mysql -u emqx_user -p1234563. 检查数据库是否存在:SHOW DATABASES;问题3:Web界面无法访问
检查步骤:1. 确认Flask服务状态:sudo systemctl status mqtt-web2. 检查端口监听:netstat -tulpn | grep 50013. 查看服务日志:sudo journalctl -u mqtt-web -f问题4:数据未保存到数据库
检查步骤:1. 检查数据采集服务日志:sudo journalctl -u mqtt-collector -f2. 检查MQTT主题订阅状态3. 检查数据库表结构是否匹配8.2 日志查看命令
# 查看实时日志sudo journalctl -f
# 查看指定服务日志sudo journalctl -u mqtt-collector -fsudo journalctl -u mqtt-web -fsudo journalctl -u emqx -f
# 查看特定时间段的日志sudo journalctl -u mqtt-collector --since "2024-01-01 00:00:00" --until "2024-01-01 23:59:59"
# 查看错误日志sudo journalctl -u mqtt-collector -p err部署完成检查清单
- EMQX服务正常运行
- MySQL服务正常运行
- 数据采集服务正常运行
- Web监控服务正常运行
- 设备能够成功连接MQTT
- 数据能够成功保存到数据库
- Web界面能够正常显示数据
- 防火墙和安全组配置正确
- 所有密码已修改为安全密码
- 定时备份任务已设置
后续维护建议
-
定期检查:
- 每周检查服务状态
- 每月检查磁盘空间
- 每季度更新系统和软件
-
监控报警:
- 设置服务异常报警
- 监控磁盘使用率
- 监控数据库连接数
-
文档更新:
- 记录每次配置变更
- 更新设备清单
- 备份配置文件
最后更新:2026/1/11