「学习笔记」MQTT协议详解与服务器搭建
文章目录
1. MQTT协议概述
MQTT(Message Queuing Telemetry Transport) 是一种基于 发布/订阅 模式的轻量级消息传输协议,专为低带宽、高延迟、不可靠网络环境设计。该协议由 IBM 于 1999 年发布,广泛应用于物联网(IoT)、智能家居、工业自动化等领域。
1.1 MQTT 的核心特点
- 轻量级:协议头部最小仅 2 字节,功耗低,适合资源受限的设备
- 发布/订阅模式:解耦生产者和消费者,通过主题(Topic)实现消息路由,支持一对多通信
- QoS 等级:提供三种消息传输服务质量等级
- QoS 0:最多一次,消息可能丢失
- QoS 1:至少一次,消息可能重复
- QoS 2:恰好一次,确保消息不丢失且不重复
- 会话保持:支持持久会话和自动重连
- 遗嘱消息(Will Message):客户端异常断开时,Broker 自动向指定主题发布预设消息
1.2 工作原理
MQTT 采用发布/订阅(Publish/Subscribe)模式,区别于传统的客户端/服务器模式:
graph LR
Publisher["发布者
(Publisher)"]
Broker["Broker
(服务器)"]
Subscriber["订阅者
(Subscriber)"]
Publisher -->|发布消息| Broker
Broker -->|推送消息| Subscriber
消息 由 主题(Topic)和 负载(Payload)组成。主题用于路由消息,负载是消息的具体内容。
- 发布者(Publisher):向指定主题发布消息
- 订阅者(Subscriber):订阅感兴趣的主题,接收相关消息
- Broker:负责接收发布者的消息并分发给匹配的订阅者
1.3 使用场景
- 物联网:传感器数据采集与传输(如温湿度监测)
- 智能家居:设备间的实时通信(如灯光控制)
- 车联网:车辆状态监控与远程控制
- 工业自动化:设备间的低延迟数据交换
2. MQTT协议核心概念
2.1 发布/订阅模式
发布/订阅模式的优势:
- 解耦:发布者和订阅者无需感知彼此的存在
- 异步:消息传递异步进行,提升系统响应速度
- 可扩展:可轻松添加或移除客户端,不影响整体架构
2.2 MQTT主题
主题(Topic)是消息路由的核心,采用类似文件系统路径的层级结构:
# 主题示例
home/livingroom/temperature # 客厅温度
home/bedroom/light/status # 卧室灯状态
home/bedroom/light/command # 卧室灯控制指令
iot/devices/sensor01/data # 传感器数据
主题层级分隔符:/
home/kitchen/temperature # 表示厨房的温度数据
单层通配符:+(匹配一个层级)
home/+/temperature # 匹配 home 下任意子目录的 temperature 主题
# 例如:home/kitchen/temperature、home/livingroom/temperature
多层通配符:#(匹配当前及所有子层级,必须放在主题末尾)
home/# # 匹配 home 下的所有主题
iot/devices/# # 匹配 iot/devices 下的所有主题
2.3 MQTT QoS 服务质量
MQTT 提供三种 QoS 等级:
| QoS 等级 | 描述 | 适用场景 |
|---|---|---|
| QoS 0 | 最多交付一次(At most once),消息可能丢失 | 实时性要求高、可容忍丢数据的场景,如环境传感器 |
| QoS 1 | 至少交付一次(At least once),消息可能重复 | 需要确保收到、不介意重复的场景,如状态通知 |
| QoS 2 | 恰好交付一次(Exactly once),确保不丢失不重复 | 关键命令、支付等对可靠性要求极高的场景 |
QoS 0(最多一次):
# 发布时指定 QoS 0
client.publish("topic", payload, qos=0)
QoS 1(至少一次):
# 发布时指定 QoS 1
client.publish("topic", payload, qos=1)
QoS 2(正好一次):
# 发布时指定 QoS 2
client.publish("topic", payload, qos=2)
2.4 MQTT 客户端会话
MQTT 客户端支持两种会话模式:
Clean Session(清理会话):
# 连接时指定 clean_session=True(默认)
client.connect(host, port, clean_session=True)
# True: 不保留会话,断开后订阅关系自动清除
# False: 保留会话,断开重连后自动恢复订阅
Keep Alive(心跳保活):
# 设置心跳间隔(秒)
client.connect(host, port, keepalive=60)
# 客户端需在指定时间内发送消息或 Ping 请求,否则服务器判定连接断开
2.5 遗嘱消息(Will)
遗嘱消息用于通知客户端异常断开:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc, properties=None):
print(f"连接返回码: {rc}")
client = mqtt.Client()
client.on_connect = on_connect
# 设置遗嘱消息
client.will_set(
topic="client/status",
payload="客户端异常断开",
qos=1,
retain=True
)
client.connect("localhost", 1883, 60)
3. Docker 搭建 MQTT 服务器
3.1 环境准备
确保已安装 Docker 和 Docker Compose:
# Centos 安装 Docker(若已安装可跳过)
# 备份原源文件
sudo mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.bak
# 下载阿里云CentOS 7源
sudo curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
# 清理缓存并生成新缓存
sudo yum clean all
sudo yum makecache
# 安装依赖工具
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
# 添加阿里云Docker源(国内推荐)
sudo yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 安装最新稳定版Docker
sudo yum install -y docker-ce docker-ce-cli containerd.io
# 启动 Docker 并设置开机自启
sudo systemctl start docker
sudo systemctl enable docker
# 检查 Docker 版本
docker --version
3.2 使用 Docker 部署 EMQX
EMQX 是高性能的开源 MQTT Broker,支持百万级并发连接,适用于生产级物联网场景。
创建 mqtt/docker-compose.yml:
services:
emqx:
image: emqx/emqx:5.8.0
container_name: emqx
ports:
- "1883:1883" # MQTT TCP 端口
- "8883:8883" # MQTT/TLS 端口
- "8083:8083" # WebSocket 端口
- "8084:8084" # WebSocket/TLS 端口
- "18083:18083" # Dashboard 端口
environment:
- EMQX_NAME=emqx
- EMQX_HOST=127.0.0.1
- EMQX_DASHBOARD__DEFAULT_USERNAME=admin
- EMQX_DASHBOARD__DEFAULT_PASSWORD=public
volumes:
- ./emqx_data:/opt/emqx/data
- ./emqx_log:/opt/emqx/log
restart: unless-stopped
启动服务:
# 创建目录
mkdir -p mqtt/emqx_data mqtt/emqx_log
# 启动 EMQX
cd mqtt
docker compose up -d
# 查看运行状态
docker compose ps
访问 Dashboard:http://localhost:18083,使用 admin/public 登录。
3.3 使用 Docker 部署 Mosquitto
Mosquitto 是轻量级的开源 MQTT Broker,适合学习和小规模部署。
创建 mosquitto/docker-compose.yml:
services:
mosquitto:
image: eclipse-mosquitto:2.0.18
container_name: mosquitto
ports:
- "1883:1883" # MQTT TCP 端口
- "9001:9001" # WebSocket 端口(可选)
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
restart: unless-stopped
创建配置文件 mosquitto/config/mosquitto.conf:
# 监听端口
listener 1883
# 允许匿名访问(生产环境建议关闭)
allow_anonymous true
# 日志级别
log_dest stdout
log_type error
log_type warning
log_type notice
log_type information
# 持久化存储
persistence true
persistence_location /mosquitto/data/
# 日志文件
persistence_file mosquitto.db
创建必要目录并启动:
# 创建目录
mkdir -p mosquitto/config mosquitto/data mosquitto/log
# 启动 Mosquitto
cd mosquitto
docker compose up -d
# 查看日志
docker compose logs -f
3.4 配置 MQTT 用户认证
为 Mosquitto 添加用户名密码认证:
创建密码文件 mosquitto/config/pwfile.conf:
# 进入容器生成密码
docker exec -it mosquitto sh
# 创建用户 admin(首次使用 -c 参数新建文件)
mosquitto_passwd -c /mosquitto/config/pwfile.conf admin
# 输入密码:admin123
# 创建用户 device01
mosquitto_passwd /mosquitto/config/pwfile.conf device01
# 输入密码:device123
更新配置文件 mosquitto/config/mosquitto.conf:
# 监听端口
listener 1883
# 启用密码认证
allow_anonymous false
password_file /mosquitto/config/pwfile.conf
# 日志配置
log_dest stdout
log_type error
log_type warning
log_type notice
log_type information
# 持久化
persistence true
persistence_location /mosquitto/data/
persistence_file mosquitto.db
重启服务:
docker compose down
docker compose up -d
3.5 配置 TLS/SSL 加密连接(一)
生成自签名证书:
# 创建证书目录
mkdir -p mosquitto/certs
# 生成CA证书
openssl req -x509 -newkey rsa:4096 -keyout ca.key -out ca.crt -days 365 -nodes \
-subj "/CN=localhost/O=MQTT CA"
# 生成服务器证书
openssl req -newkey rsa:4096 -keyout server.key -out server.csr \
-nodes -subj "/CN=localhost"
# 签发服务器证书
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out server.crt -days 365
更新配置文件 mosquitto/config/mosquitto.conf:
# TLS监听端口
listener 8883
# 证书配置
cafile /mosquitto/certs/ca.crt
certfile /mosquitto/certs/server.crt
keyfile /mosquitto/certs/server.key
# 要求客户端证书(可选)
# require_certificate true
# use_identity_as_username true
3.6 配置 TLS/SSL 加密连接(二)
通过 Nginx 转发 MQTT 连接,实现 TLS 加密传输。Nginx 可以作为统一的网关,隐藏后端 Mosquitto 的真实 IP,并集中管理域名和 SSL 证书。
由于 MQTT 协议分为 TCP 和 WebSocket 两种传输方式,Nginx 需要使用不同的模块来进行代理。以下是具体的配置方案:
3.6.1 前置检查:确认 Nginx 模块支持
Nginx 代理 TCP 协议需要 stream 模块。请先检查您的 Nginx 是否已编译该模块:
nginx -V 2>&1 | grep -- '--with-stream'
如果有输出,说明支持。如果没有,您可能需要重新编译 Nginx 或安装包含该模块的版本(如 OpenResty)。 安装完成后,您需要在主配置文件 nginx.conf 的最顶部(第一行)加载该模块:
load_module /usr/lib64/nginx/modules/ngx_stream_module.so;
(注:模块的实际路径可能因系统而异,安装时终端通常会提示该 .so 文件的具体位置,请根据实际情况修改)。
3.6.2 配置 TCP 协议代理(8883 端口)
TCP 代理需要配置在 nginx.conf 的 stream 块中(与 http 块同级)。这种四层代理不解析 HTTP 头,性能极高。
# ... 其他配置 ...
# ==========================================
# 1. 四层代理 (Stream) - 处理原生 MQTT TCP
# ==========================================
stream {
# --- 加密 TCP (8883) ---
upstream mqtt_tcp_ssl {
server 127.0.0.1:1883;
}
server {
listen 8883 ssl;
proxy_pass mqtt_tcp_ssl;
proxy_timeout 2h;
ssl_certificate /etc/nginx/cert/test.com.pem;
ssl_certificate_key /etc/nginx/cert/test.com.key;
ssl_protocols TLSv1.2 TLSv1.3;
}
}
3. 配置 WebSocket 协议代理(9002 端口)
WebSocket 代理需要配置在 http 块中。必须正确设置 Upgrade 和 Connection 请求头,才能实现协议的升级。
# ==========================================
# 2. 七层代理 (HTTP) - 处理 MQTT over WebSocket
# ==========================================
http {
# ... 其他 HTTP 配置 ...
# --- 加密 WebSocket (9002) ---
server {
listen 9002 ssl;
server_name mqtt.test.com;
ssl_certificate /etc/nginx/cert/test.com.pem;
ssl_certificate_key /etc/nginx/cert/test.com.key;
ssl_protocols TLSv1.2 TLSv1.3;
location /mqtt {
proxy_pass http://127.0.0.1:9001;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 300s;
}
}
# 其他站点配置
include /etc/nginx/conf.d/*.conf;
}
配置完成后,请务必使用 nginx -t 检查语法,然后执行 systemctl reload nginx 使配置生效。
- 最终效果:
| 客户端类型 | 协议 | 连接地址示例 | 说明 |
|---|---|---|---|
| IoT 设备/PC客户端 | TCP | mqtt://192.168.2.171:1883 |
非加密,适合内网或测试 |
| IoT 设备/PC客户端 | SSL | mqtts://mqtt.shuimuai.com:8883 |
加密连接,适合公网 |
| Web 浏览器 | WebSocket | ws://192.168.2.171:9001 |
非加密,用于开发调试 |
| Web 浏览器 | Secure WS | wss://mqtt.shuimuai.com:9002/mqtt |
生产环境推荐,浏览器强制要求加密 |
4. Python 客户端开发
4.1 安装 paho-mqtt 库
pip install paho-mqtt
4.2 基本发布者
创建 publisher.py:
import paho.mqtt.client as mqtt
import time
import json
# MQTT Broker配置
BROKER = "localhost"
PORT = 1883
TOPIC = "home/livingroom/temperature"
def on_connect(client, userdata, flags, rc, properties=None):
"""连接回调函数"""
if rc == 0:
print("连接成功!")
else:
print(f"连接失败,返回码: {rc}")
def on_publish(client, userdata, mid, properties=None):
"""发布回调函数"""
print(f"消息发送成功,mid: {mid}")
# 创建客户端
client = mqtt.Client(client_id="publisher_001")
# 设置回调函数
client.on_connect = on_connect
client.on_publish = on_publish
# 连接Broker
client.connect(BROKER, PORT, 60)
# 启动消息循环(处理网络流量)
client.loop_start()
# 发布消息
try:
for i in range(10):
# 模拟温度数据
temperature = 20 + (i % 10)
payload = json.dumps({
"temperature": temperature,
"humidity": 50 + i,
"timestamp": time.time()
})
# 发布消息
result = client.publish(TOPIC, payload, qos=1)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
print(f"发布成功: {payload}")
else:
print(f"发布失败: {result.rc}")
time.sleep(2)
except KeyboardInterrupt:
print("停止发布")
finally:
client.loop_stop()
client.disconnect()
4.3 基本订阅者
创建 subscriber.py:
import paho.mqtt.client as mqtt
import json
# MQTT Broker配置
BROKER = "localhost"
PORT = 1883
TOPIC = "home/#" # 订阅所有 home 下的主题
def on_connect(client, userdata, flags, rc, properties=None):
"""连接成功回调"""
print(f"连接成功,返回码: {rc}")
# 订阅主题
client.subscribe(TOPIC, qos=1)
print(f"已订阅主题: {TOPIC}")
def on_message(client, userdata, msg):
"""收到消息回调"""
topic = msg.topic
payload = msg.payload.decode()
qos = msg.qos
print(f"\n收到消息:")
print(f" 主题: {topic}")
print(f" QoS: {qos}")
print(f" 内容: {payload}")
# 解析JSON数据
try:
data = json.loads(payload)
if "temperature" in data:
print(f" 温度: {data['temperature']}°C")
if "humidity" in data:
print(f" 湿度: {data['humidity']}%")
except json.JSONDecodeError:
pass
def on_disconnect(client, userdata, rc, properties=None):
"""断开连接回调"""
print("连接断开")
# 创建客户端
client = mqtt.Client(client_id="subscriber_001")
# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# 连接Broker
client.connect(BROKER, PORT, 60)
# 保持连接
client.loop_forever()
4.4 带认证的连接
import paho.mqtt.client as mqtt
# 用户认证配置
USERNAME = "admin"
PASSWORD = "admin123"
# 创建客户端
client = mqtt.Client(client_id="authenticated_client")
# 设置用户名密码
client.username_pw_set(USERNAME, PASSWORD)
# 连接Broker
client.connect("localhost", 1883, 60)
# 发布消息
client.publish("test/topic", "Hello with auth!")
4.5 带TLS连接的客户端
import paho.mqtt.client as mqtt
import ssl
# 创建客户端
client = mqtt.Client(client_id="tls_client")
# 设置TLS配置
client.tls_set(
ca_certs="mosquitto/certs/ca.crt", # CA证书
certfile="client.crt", # 客户端证书(可选)
keyfile="client.key", # 客户端密钥(可选)
tls_version=ssl.PROTOCOL_TLSv1_2
)
# 如果证书是自签名的,需要跳过验证
client.tls_insecure_set(True)
# 连接
client.connect("localhost", 8883, 60)
4.6 异步客户端
使用 asyncio 实现异步消息处理:
import asyncio
from paho.mqtt import client as mqtt_client
import json
BROKER = 'localhost'
PORT = 1883
TOPIC = "home/sensor/#"
async def connect_mqtt():
"""连接 MQTT Broker"""
def on_connect(client, userdata, flags, rc, properties=None):
if rc == 0:
print("连接成功")
client.subscribe(TOPIC)
else:
print(f"连接失败: {rc}")
def on_message(client, userdata, msg):
print(f"收到: {msg.topic} -> {msg.payload.decode()}")
client = mqtt_client.Client(client_id='async_client')
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT)
client.loop_start()
return client
async def publish(client):
"""定时发布消息"""
await asyncio.sleep(1)
for i in range(5):
payload = json.dumps({"value": i * 10})
client.publish("home/sensor/data", payload)
print(f"发布: {payload}")
await asyncio.sleep(1)
async def main():
"""主函数"""
client = await connect_mqtt()
await asyncio.gather(
publish(client),
asyncio.sleep(10)
)
client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
5. JavaScript/Node.js 客户端开发
5.1 安装 mqtt.js
npm install mqtt
5.2 发布者示例
创建 publisher.js:
const mqtt = require('mqtt');
const { randomUUID } = require('crypto');
// Broker配置
const BROKER = 'mqtt://localhost:1883';
const TOPIC = 'home/livingroom/light';
// 创建客户端
const client = mqtt.connect(BROKER, {
clientId: 'node_publisher',
username: 'admin',
password: 'admin123'
});
client.on('connect', () => {
console.log('连接成功');
// 定时发布消息
let isOn = false;
setInterval(() => {
isOn = !isOn;
const payload = JSON.stringify({
id: randomUUID(),
status: isOn ? 'on' : 'off',
brightness: isOn ? 100 : 0,
timestamp: Date.now()
});
client.publish(TOPIC, payload, { qos: 1 }, (err) => {
if (err) {
console.error('发布失败:', err);
} else {
console.log(`发布成功: ${payload}`);
}
});
}, 2000);
});
// 错误处理
client.on('error', (err) => {
console.error('连接错误:', err);
});
process.on('SIGINT', () => {
client.end();
process.exit(0);
});
5.3 订阅者示例
创建 subscriber.js:
const mqtt = require('mqtt');
// Broker配置
const BROKER = 'mqtt://localhost:1883';
const TOPIC = 'home/#'; // 订阅所有 home 下的主题
// 创建客户端
const client = mqtt.connect(BROKER, {
clientId: 'node_subscriber',
username: 'admin',
password: 'admin123'
});
client.on('connect', () => {
console.log('订阅者连接成功');
client.subscribe(TOPIC, { qos: 1 }, (err, granted) => {
if (err) {
console.error('订阅失败:', err);
} else {
console.log(`已订阅主题:`, granted.map(t => t.topic));
}
});
});
client.on('message', (topic, message) => {
console.log('\n========== 新消息 ==========');
console.log(`主题: ${topic}`);
console.log(`内容: ${message.toString()}`);
try {
const data = JSON.parse(message.toString());
console.log('解析后:', data);
} catch (e) {
console.log('非JSON格式');
}
});
client.on('error', (err) => {
console.error('错误:', err);
});
client.on('close', () => {
console.log('连接关闭');
});
process.on('SIGINT', () => {
client.end();
process.exit(0);
});
5.4 WebSocket连接
在浏览器中使用MQTT:
const BROKER = 'ws://localhost:8083/mqtt';
const TOPIC = 'home/#';
const client = mqtt.connect(BROKER, {
clientId: 'browser_client',
username: 'admin',
password: 'admin123'
});
client.on('connect', () => {
console.log('WebSocket连接成功');
client.subscribe(TOPIC);
});
client.on('message', (topic, message) => {
console.log(`${topic}: ${message.toString()}`);
});
6. 总结
将 MQTT 应用到实际项目中时,需要重点考虑以下几个方面:
版本选择
- MQTT v3.1.1:兼容性最好,设备支持最广
- MQTT v5.0:当前最新标准,支持消息过期时间、请求/响应模式、用户属性、流控等特性。新项目强烈建议使用 v5.0
安全机制
- 传输加密:必须使用 TLS/SSL(端口 8883),禁止明文传输
- 认证:不要仅依赖 ClientID,生产环境必须开启 Username/Password、Token 或 X.509 证书认证
- ACL(访问控制):限制设备只能发布/订阅自身 Topic(如
device/{device_id}/status),防止恶意广播
高可用与扩展
- 集群部署:单机 Broker 存在单点故障风险,生产环境应使用集群方案(如 EMQX 集群、HiveMQ 集群)
- 消息桥接:使用规则引擎将 MQTT 消息桥接到 Kafka、MySQL、Redis 或时序数据库(InfluxDB/TDengine),实现业务解耦
弱网优化
- Keep Alive 设置:合理设置心跳间隔(太短费电,太长易断连)
- 协议选择:移动端可考虑 MQTT over QUIC,在弱网/高丢包环境下比 TCP 更稳定,且支持连接迁移(WiFi 与 4G 切换不断连)