「学习笔记」MQTT协议详解与服务器搭建

文章目录

1. MQTT协议概述

MQTT(Message Queuing Telemetry Transport) 是一种基于 发布/订阅 模式的轻量级消息传输协议,专为低带宽、高延迟、不可靠网络环境设计。该协议由 IBM 于 1999 年发布,广泛应用于物联网(IoT)、智能家居、工业自动化等领域。

1.1 MQTT 的核心特点

  • 轻量级:协议头部最小仅 2 字节,功耗低,适合资源受限的设备
  • 发布/订阅模式:解耦生产者和消费者,通过主题(Topic)实现消息路由,支持一对多通信
  • QoS 等级:提供三种消息传输服务质量等级
    • QoS 0:最多一次,消息可能丢失
    • QoS 1:至少一次,消息可能重复
    • QoS 2:恰好一次,确保消息不丢失且不重复
  • 会话保持:支持持久会话和自动重连
  • 遗嘱消息(Will Message):客户端异常断开时,Broker 自动向指定主题发布预设消息

1.2 工作原理

MQTT 采用发布/订阅(Publish/Subscribe)模式,区别于传统的客户端/服务器模式:

graph LR
    Publisher["发布者
(Publisher)"] Broker["Broker
(服务器)"] Subscriber["订阅者
(Subscriber)"] Publisher -->|发布消息| Broker Broker -->|推送消息| Subscriber

消息主题(Topic)和 负载(Payload)组成。主题用于路由消息,负载是消息的具体内容。

  • 发布者(Publisher):向指定主题发布消息
  • 订阅者(Subscriber):订阅感兴趣的主题,接收相关消息
  • Broker:负责接收发布者的消息并分发给匹配的订阅者

1.3 使用场景

  • 物联网:传感器数据采集与传输(如温湿度监测)
  • 智能家居:设备间的实时通信(如灯光控制)
  • 车联网:车辆状态监控与远程控制
  • 工业自动化:设备间的低延迟数据交换

2. MQTT协议核心概念

2.1 发布/订阅模式

发布/订阅模式的优势

  • 解耦:发布者和订阅者无需感知彼此的存在
  • 异步:消息传递异步进行,提升系统响应速度
  • 可扩展:可轻松添加或移除客户端,不影响整体架构

2.2 MQTT主题

主题(Topic)是消息路由的核心,采用类似文件系统路径的层级结构:

# 主题示例
home/livingroom/temperature    # 客厅温度
home/bedroom/light/status      # 卧室灯状态
home/bedroom/light/command     # 卧室灯控制指令
iot/devices/sensor01/data      # 传感器数据

主题层级分隔符/

home/kitchen/temperature  # 表示厨房的温度数据

单层通配符+(匹配一个层级)

home/+/temperature  # 匹配 home 下任意子目录的 temperature 主题
# 例如:home/kitchen/temperature、home/livingroom/temperature

多层通配符#(匹配当前及所有子层级,必须放在主题末尾)

home/#              # 匹配 home 下的所有主题
iot/devices/#       # 匹配 iot/devices 下的所有主题

2.3 MQTT QoS 服务质量

MQTT 提供三种 QoS 等级:

QoS 等级 描述 适用场景
QoS 0 最多交付一次(At most once),消息可能丢失 实时性要求高、可容忍丢数据的场景,如环境传感器
QoS 1 至少交付一次(At least once),消息可能重复 需要确保收到、不介意重复的场景,如状态通知
QoS 2 恰好交付一次(Exactly once),确保不丢失不重复 关键命令、支付等对可靠性要求极高的场景

QoS 0(最多一次)

# 发布时指定 QoS 0
client.publish("topic", payload, qos=0)

QoS 1(至少一次)

# 发布时指定 QoS 1
client.publish("topic", payload, qos=1)

QoS 2(正好一次)

# 发布时指定 QoS 2
client.publish("topic", payload, qos=2)

2.4 MQTT 客户端会话

MQTT 客户端支持两种会话模式:

Clean Session(清理会话)

# 连接时指定 clean_session=True(默认)
client.connect(host, port, clean_session=True)
# True:  不保留会话,断开后订阅关系自动清除
# False: 保留会话,断开重连后自动恢复订阅

Keep Alive(心跳保活)

# 设置心跳间隔(秒)
client.connect(host, port, keepalive=60)
# 客户端需在指定时间内发送消息或 Ping 请求,否则服务器判定连接断开

2.5 遗嘱消息(Will)

遗嘱消息用于通知客户端异常断开:

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc, properties=None):
    print(f"连接返回码: {rc}")

client = mqtt.Client()
client.on_connect = on_connect

# 设置遗嘱消息
client.will_set(
    topic="client/status",
    payload="客户端异常断开",
    qos=1,
    retain=True
)

client.connect("localhost", 1883, 60)

3. Docker 搭建 MQTT 服务器

3.1 环境准备

确保已安装 Docker 和 Docker Compose:

# Centos 安装 Docker(若已安装可跳过)
# 备份原源文件
sudo mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.bak
# 下载阿里云CentOS 7源
sudo curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
# 清理缓存并生成新缓存
sudo yum clean all
sudo yum makecache
# 安装依赖工具
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
# 添加阿里云Docker源(国内推荐)
sudo yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 安装最新稳定版Docker
sudo yum install -y docker-ce docker-ce-cli containerd.io
# 启动 Docker 并设置开机自启
sudo systemctl start docker
sudo systemctl enable docker
# 检查 Docker 版本
docker --version

3.2 使用 Docker 部署 EMQX

EMQX 是高性能的开源 MQTT Broker,支持百万级并发连接,适用于生产级物联网场景。

创建 mqtt/docker-compose.yml

services:
  emqx:
    image: emqx/emqx:5.8.0
    container_name: emqx
    ports:
      - "1883:1883"      # MQTT TCP 端口
      - "8883:8883"      # MQTT/TLS 端口
      - "8083:8083"      # WebSocket 端口
      - "8084:8084"      # WebSocket/TLS 端口
      - "18083:18083"    # Dashboard 端口
    environment:
      - EMQX_NAME=emqx
      - EMQX_HOST=127.0.0.1
      - EMQX_DASHBOARD__DEFAULT_USERNAME=admin
      - EMQX_DASHBOARD__DEFAULT_PASSWORD=public
    volumes:
      - ./emqx_data:/opt/emqx/data
      - ./emqx_log:/opt/emqx/log
    restart: unless-stopped

启动服务:

# 创建目录
mkdir -p mqtt/emqx_data mqtt/emqx_log

# 启动 EMQX
cd mqtt
docker compose up -d

# 查看运行状态
docker compose ps

访问 Dashboard:http://localhost:18083,使用 admin/public 登录。

3.3 使用 Docker 部署 Mosquitto

Mosquitto 是轻量级的开源 MQTT Broker,适合学习和小规模部署。

创建 mosquitto/docker-compose.yml

services:
  mosquitto:
    image: eclipse-mosquitto:2.0.18
    container_name: mosquitto
    ports:
      - "1883:1883"      # MQTT TCP 端口
      - "9001:9001"      # WebSocket 端口(可选)
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    restart: unless-stopped

创建配置文件 mosquitto/config/mosquitto.conf

# 监听端口
listener 1883

# 允许匿名访问(生产环境建议关闭)
allow_anonymous true

# 日志级别
log_dest stdout
log_type error
log_type warning
log_type notice
log_type information

# 持久化存储
persistence true
persistence_location /mosquitto/data/

# 日志文件
persistence_file mosquitto.db

创建必要目录并启动:

# 创建目录
mkdir -p mosquitto/config mosquitto/data mosquitto/log

# 启动 Mosquitto
cd mosquitto
docker compose up -d

# 查看日志
docker compose logs -f

3.4 配置 MQTT 用户认证

为 Mosquitto 添加用户名密码认证:

创建密码文件 mosquitto/config/pwfile.conf

# 进入容器生成密码
docker exec -it mosquitto sh

# 创建用户 admin(首次使用 -c 参数新建文件)
mosquitto_passwd -c /mosquitto/config/pwfile.conf admin
# 输入密码:admin123

# 创建用户 device01
mosquitto_passwd /mosquitto/config/pwfile.conf device01
# 输入密码:device123

更新配置文件 mosquitto/config/mosquitto.conf

# 监听端口
listener 1883

# 启用密码认证
allow_anonymous false
password_file /mosquitto/config/pwfile.conf

# 日志配置
log_dest stdout
log_type error
log_type warning
log_type notice
log_type information

# 持久化
persistence true
persistence_location /mosquitto/data/
persistence_file mosquitto.db

重启服务:

docker compose down
docker compose up -d

3.5 配置 TLS/SSL 加密连接(一)

生成自签名证书:

# 创建证书目录
mkdir -p mosquitto/certs

# 生成CA证书
openssl req -x509 -newkey rsa:4096 -keyout ca.key -out ca.crt -days 365 -nodes \
    -subj "/CN=localhost/O=MQTT CA"

# 生成服务器证书
openssl req -newkey rsa:4096 -keyout server.key -out server.csr \
    -nodes -subj "/CN=localhost"

# 签发服务器证书
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key \
    -CAcreateserial -out server.crt -days 365

更新配置文件 mosquitto/config/mosquitto.conf

# TLS监听端口
listener 8883

# 证书配置
cafile /mosquitto/certs/ca.crt
certfile /mosquitto/certs/server.crt
keyfile /mosquitto/certs/server.key

# 要求客户端证书(可选)
# require_certificate true
# use_identity_as_username true

3.6 配置 TLS/SSL 加密连接(二)

通过 Nginx 转发 MQTT 连接,实现 TLS 加密传输。Nginx 可以作为统一的网关,隐藏后端 Mosquitto 的真实 IP,并集中管理域名和 SSL 证书。

由于 MQTT 协议分为 TCP 和 WebSocket 两种传输方式,Nginx 需要使用不同的模块来进行代理。以下是具体的配置方案:

3.6.1 前置检查:确认 Nginx 模块支持

Nginx 代理 TCP 协议需要 stream 模块。请先检查您的 Nginx 是否已编译该模块:

nginx -V 2>&1 | grep -- '--with-stream'

如果有输出,说明支持。如果没有,您可能需要重新编译 Nginx 或安装包含该模块的版本(如 OpenResty)。 安装完成后,您需要在主配置文件 nginx.conf 的最顶部(第一行)加载该模块:

load_module /usr/lib64/nginx/modules/ngx_stream_module.so;

(注:模块的实际路径可能因系统而异,安装时终端通常会提示该 .so 文件的具体位置,请根据实际情况修改)。

3.6.2 配置 TCP 协议代理(8883 端口)

TCP 代理需要配置在 nginx.confstream 块中(与 http 块同级)。这种四层代理不解析 HTTP 头,性能极高。

# ... 其他配置 ...

# ==========================================
# 1. 四层代理 (Stream) - 处理原生 MQTT TCP
# ==========================================
stream {
    # --- 加密 TCP (8883) ---
    upstream mqtt_tcp_ssl {
        server 127.0.0.1:1883;
    }
    server {
        listen 8883 ssl;
        proxy_pass mqtt_tcp_ssl;
        proxy_timeout 2h;

        ssl_certificate /etc/nginx/cert/test.com.pem;
        ssl_certificate_key /etc/nginx/cert/test.com.key;
        ssl_protocols TLSv1.2 TLSv1.3;
    }
}

3. 配置 WebSocket 协议代理(9002 端口)

WebSocket 代理需要配置在 http 块中。必须正确设置 UpgradeConnection 请求头,才能实现协议的升级。

# ==========================================
# 2. 七层代理 (HTTP) - 处理 MQTT over WebSocket
# ==========================================
http {
    # ... 其他 HTTP 配置 ...

    # --- 加密 WebSocket (9002) ---
    server {
        listen 9002 ssl;
        server_name mqtt.test.com;

        ssl_certificate /etc/nginx/cert/test.com.pem;
        ssl_certificate_key /etc/nginx/cert/test.com.key;
        ssl_protocols TLSv1.2 TLSv1.3;
        
        location /mqtt {
            proxy_pass http://127.0.0.1:9001;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_read_timeout 300s;
        }
    }

    # 其他站点配置
    include /etc/nginx/conf.d/*.conf;
}

配置完成后,请务必使用 nginx -t 检查语法,然后执行 systemctl reload nginx 使配置生效。

  • 最终效果:
客户端类型 协议 连接地址示例 说明
IoT 设备/PC客户端 TCP mqtt://192.168.2.171:1883 非加密,适合内网或测试
IoT 设备/PC客户端 SSL mqtts://mqtt.shuimuai.com:8883 加密连接,适合公网
Web 浏览器 WebSocket ws://192.168.2.171:9001 非加密,用于开发调试
Web 浏览器 Secure WS wss://mqtt.shuimuai.com:9002/mqtt 生产环境推荐,浏览器强制要求加密

4. Python 客户端开发

4.1 安装 paho-mqtt 库

pip install paho-mqtt

4.2 基本发布者

创建 publisher.py

import paho.mqtt.client as mqtt
import time
import json

# MQTT Broker配置
BROKER = "localhost"
PORT = 1883
TOPIC = "home/livingroom/temperature"

def on_connect(client, userdata, flags, rc, properties=None):
    """连接回调函数"""
    if rc == 0:
        print("连接成功!")
    else:
        print(f"连接失败,返回码: {rc}")

def on_publish(client, userdata, mid, properties=None):
    """发布回调函数"""
    print(f"消息发送成功,mid: {mid}")

# 创建客户端
client = mqtt.Client(client_id="publisher_001")

# 设置回调函数
client.on_connect = on_connect
client.on_publish = on_publish

# 连接Broker
client.connect(BROKER, PORT, 60)

# 启动消息循环(处理网络流量)
client.loop_start()

# 发布消息
try:
    for i in range(10):
        # 模拟温度数据
        temperature = 20 + (i % 10)
        payload = json.dumps({
            "temperature": temperature,
            "humidity": 50 + i,
            "timestamp": time.time()
        })
        
        # 发布消息
        result = client.publish(TOPIC, payload, qos=1)
        
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            print(f"发布成功: {payload}")
        else:
            print(f"发布失败: {result.rc}")
        
        time.sleep(2)

except KeyboardInterrupt:
    print("停止发布")

finally:
    client.loop_stop()
    client.disconnect()

4.3 基本订阅者

创建 subscriber.py

import paho.mqtt.client as mqtt
import json

# MQTT Broker配置
BROKER = "localhost"
PORT = 1883
TOPIC = "home/#"  # 订阅所有 home 下的主题

def on_connect(client, userdata, flags, rc, properties=None):
    """连接成功回调"""
    print(f"连接成功,返回码: {rc}")
    # 订阅主题
    client.subscribe(TOPIC, qos=1)
    print(f"已订阅主题: {TOPIC}")

def on_message(client, userdata, msg):
    """收到消息回调"""
    topic = msg.topic
    payload = msg.payload.decode()
    qos = msg.qos
    
    print(f"\n收到消息:")
    print(f"  主题: {topic}")
    print(f"  QoS: {qos}")
    print(f"  内容: {payload}")
    
    # 解析JSON数据
    try:
        data = json.loads(payload)
        if "temperature" in data:
            print(f"  温度: {data['temperature']}°C")
        if "humidity" in data:
            print(f"  湿度: {data['humidity']}%")
    except json.JSONDecodeError:
        pass

def on_disconnect(client, userdata, rc, properties=None):
    """断开连接回调"""
    print("连接断开")

# 创建客户端
client = mqtt.Client(client_id="subscriber_001")

# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# 连接Broker
client.connect(BROKER, PORT, 60)

# 保持连接
client.loop_forever()

4.4 带认证的连接

import paho.mqtt.client as mqtt

# 用户认证配置
USERNAME = "admin"
PASSWORD = "admin123"

# 创建客户端
client = mqtt.Client(client_id="authenticated_client")

# 设置用户名密码
client.username_pw_set(USERNAME, PASSWORD)

# 连接Broker
client.connect("localhost", 1883, 60)

# 发布消息
client.publish("test/topic", "Hello with auth!")

4.5 带TLS连接的客户端

import paho.mqtt.client as mqtt
import ssl

# 创建客户端
client = mqtt.Client(client_id="tls_client")

# 设置TLS配置
client.tls_set(
    ca_certs="mosquitto/certs/ca.crt",  # CA证书
    certfile="client.crt",               # 客户端证书(可选)
    keyfile="client.key",                # 客户端密钥(可选)
    tls_version=ssl.PROTOCOL_TLSv1_2
)

# 如果证书是自签名的,需要跳过验证
client.tls_insecure_set(True)

# 连接
client.connect("localhost", 8883, 60)

4.6 异步客户端

使用 asyncio 实现异步消息处理:

import asyncio
from paho.mqtt import client as mqtt_client
import json

BROKER = 'localhost'
PORT = 1883
TOPIC = "home/sensor/#"

async def connect_mqtt():
    """连接 MQTT Broker"""
    def on_connect(client, userdata, flags, rc, properties=None):
        if rc == 0:
            print("连接成功")
            client.subscribe(TOPIC)
        else:
            print(f"连接失败: {rc}")
    
    def on_message(client, userdata, msg):
        print(f"收到: {msg.topic} -> {msg.payload.decode()}")
    
    client = mqtt_client.Client(client_id='async_client')
    client.on_connect = on_connect
    client.on_message = on_message
    
    client.connect(BROKER, PORT)
    client.loop_start()
    
    return client

async def publish(client):
    """定时发布消息"""
    await asyncio.sleep(1)
    for i in range(5):
        payload = json.dumps({"value": i * 10})
        client.publish("home/sensor/data", payload)
        print(f"发布: {payload}")
        await asyncio.sleep(1)

async def main():
    """主函数"""
    client = await connect_mqtt()
    await asyncio.gather(
        publish(client),
        asyncio.sleep(10)
    )
    client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

5. JavaScript/Node.js 客户端开发

5.1 安装 mqtt.js

npm install mqtt

5.2 发布者示例

创建 publisher.js

const mqtt = require('mqtt');
const { randomUUID } = require('crypto');

// Broker配置
const BROKER = 'mqtt://localhost:1883';
const TOPIC = 'home/livingroom/light';

// 创建客户端
const client = mqtt.connect(BROKER, {
    clientId: 'node_publisher',
    username: 'admin',
    password: 'admin123'
});

client.on('connect', () => {
    console.log('连接成功');
    
    // 定时发布消息
    let isOn = false;
    setInterval(() => {
        isOn = !isOn;
        const payload = JSON.stringify({
            id: randomUUID(),
            status: isOn ? 'on' : 'off',
            brightness: isOn ? 100 : 0,
            timestamp: Date.now()
        });
        
        client.publish(TOPIC, payload, { qos: 1 }, (err) => {
            if (err) {
                console.error('发布失败:', err);
            } else {
                console.log(`发布成功: ${payload}`);
            }
        });
    }, 2000);
});

// 错误处理
client.on('error', (err) => {
    console.error('连接错误:', err);
});

process.on('SIGINT', () => {
    client.end();
    process.exit(0);
});

5.3 订阅者示例

创建 subscriber.js

const mqtt = require('mqtt');

// Broker配置
const BROKER = 'mqtt://localhost:1883';
const TOPIC = 'home/#';  // 订阅所有 home 下的主题

// 创建客户端
const client = mqtt.connect(BROKER, {
    clientId: 'node_subscriber',
    username: 'admin',
    password: 'admin123'
});

client.on('connect', () => {
    console.log('订阅者连接成功');
    client.subscribe(TOPIC, { qos: 1 }, (err, granted) => {
        if (err) {
            console.error('订阅失败:', err);
        } else {
            console.log(`已订阅主题:`, granted.map(t => t.topic));
        }
    });
});

client.on('message', (topic, message) => {
    console.log('\n========== 新消息 ==========');
    console.log(`主题: ${topic}`);
    console.log(`内容: ${message.toString()}`);
    
    try {
        const data = JSON.parse(message.toString());
        console.log('解析后:', data);
    } catch (e) {
        console.log('非JSON格式');
    }
});

client.on('error', (err) => {
    console.error('错误:', err);
});

client.on('close', () => {
    console.log('连接关闭');
});

process.on('SIGINT', () => {
    client.end();
    process.exit(0);
});

5.4 WebSocket连接

在浏览器中使用MQTT:

const BROKER = 'ws://localhost:8083/mqtt';
const TOPIC = 'home/#';

const client = mqtt.connect(BROKER, {
    clientId: 'browser_client',
    username: 'admin',
    password: 'admin123'
});

client.on('connect', () => {
    console.log('WebSocket连接成功');
    client.subscribe(TOPIC);
});

client.on('message', (topic, message) => {
    console.log(`${topic}: ${message.toString()}`);
});

6. 总结

将 MQTT 应用到实际项目中时,需要重点考虑以下几个方面:

版本选择

  • MQTT v3.1.1:兼容性最好,设备支持最广
  • MQTT v5.0:当前最新标准,支持消息过期时间、请求/响应模式、用户属性、流控等特性。新项目强烈建议使用 v5.0

安全机制

  • 传输加密:必须使用 TLS/SSL(端口 8883),禁止明文传输
  • 认证:不要仅依赖 ClientID,生产环境必须开启 Username/Password、Token 或 X.509 证书认证
  • ACL(访问控制):限制设备只能发布/订阅自身 Topic(如 device/{device_id}/status),防止恶意广播

高可用与扩展

  • 集群部署:单机 Broker 存在单点故障风险,生产环境应使用集群方案(如 EMQX 集群、HiveMQ 集群)
  • 消息桥接:使用规则引擎将 MQTT 消息桥接到 Kafka、MySQL、Redis 或时序数据库(InfluxDB/TDengine),实现业务解耦

弱网优化

  • Keep Alive 设置:合理设置心跳间隔(太短费电,太长易断连)
  • 协议选择:移动端可考虑 MQTT over QUIC,在弱网/高丢包环境下比 TCP 更稳定,且支持连接迁移(WiFi 与 4G 切换不断连)
END .

相关系列文章

×