MQTT协议 #

一、MQTT协议概述 #

1.1 MQTT简介 #

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议。

text
┌─────────────────────────────────────────────────────────┐
│                    MQTT架构模型                          │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐            │
│  │ 发布者  │    │ 订阅者  │    │ 发布者  │            │
│  │Publisher│    │Subscriber│   │Publisher│            │
│  └────┬────┘    └────┬────┘    └────┬────┘            │
│       │              │              │                  │
│       │   Publish    │   Subscribe │                  │
│       │              │              │                  │
│       └──────────────┼──────────────┘                  │
│                      │                                 │
│               ┌──────┴──────┐                          │
│               │   Broker    │                          │
│               │  (服务器)   │                          │
│               └─────────────┘                          │
│                                                         │
└─────────────────────────────────────────────────────────┘

1.2 MQTT特性 #

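| 特性 | 说明 |
| --- | --- |
| 轻量级 | 固定头部最小仅2字节 |
| 发布/订阅 | 解耦发布者和订阅者 |
| QoS支持 | 三种消息质量等级 |
| 遗嘱消息 | 客户端异常断开时,由Broker代为发布预设的通知消息 |
| 持久会话 | 客户端离线期间保留订阅关系和离线消息 |
| 主题过滤 | 通过主题层级和通配符实现灵活的消息路由 |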

1.3 QoS等级 #

| QoS级别 | 说明 | 使用场景 |
| --- | --- | --- |
| QoS 0 | 最多一次 | 传感器数据,可丢失 |
| QoS 1 | 至少一次 | 重要数据,可重复 |
| QoS 2 | 恰好一次 | 关键数据,不丢失不重复 |

二、Eclipse Paho客户端 #

2.1 添加依赖 #

xml
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

2.2 MQTT客户端封装 #

java
package com.example.network.mqtt;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.function.Consumer;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Convenience wrapper around Paho's {@link MqttAsyncClient} that offers blocking
 * connect / subscribe / publish calls and lambda-friendly event hooks.
 *
 * <p>Text payloads are always encoded as UTF-8 so the bytes on the wire do not depend on
 * the platform default charset.
 *
 * <p>NOTE(review): the connection is opened with a clean session <em>and</em> automatic
 * reconnect. Presumably the broker forgets this client's subscriptions when the session is
 * re-created, so callers may have to subscribe again after a reconnect — confirm against the
 * broker and the Paho documentation.
 */
public class MQTTClient {

    /** QoS used by the overloads that do not take an explicit QoS argument. */
    private static final int DEFAULT_QOS = 1;

    private MqttAsyncClient client;
    private final String broker;
    private final String clientId;
    private final String username;
    private final String password;

    private Consumer<String> connectHandler;
    private Consumer<String> disconnectHandler;
    private Consumer<MqttMessage> messageHandler;

    /**
     * Creates a client that connects without credentials.
     *
     * @param broker   broker URI, e.g. {@code tcp://localhost:1883}
     * @param clientId MQTT client identifier
     */
    public MQTTClient(String broker, String clientId) {
        this(broker, clientId, null, null);
    }

    /**
     * Creates a client that authenticates when both {@code username} and {@code password}
     * are non-null.
     *
     * @param broker   broker URI, e.g. {@code tcp://localhost:1883}
     * @param clientId MQTT client identifier
     * @param username user name, or {@code null} for an anonymous connection
     * @param password password, or {@code null} for an anonymous connection
     */
    public MQTTClient(String broker, String clientId, String username, String password) {
        this.broker = broker;
        this.clientId = clientId;
        this.username = username;
        this.password = password;
    }

    /**
     * Opens the connection and blocks until the broker has accepted it, then invokes the
     * handler registered with {@link #onConnect(Consumer)} with this client's id.
     *
     * <p>If the connection attempt fails, the freshly created Paho client is closed before
     * the exception is rethrown, so a failed attempt does not leak resources.
     *
     * @throws MqttException if the client cannot be created or the connection fails
     */
    public void connect() throws MqttException {
        MqttAsyncClient newClient = new MqttAsyncClient(broker, clientId, new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(30);
        options.setKeepAliveInterval(60);

        if (username != null && password != null) {
            options.setUserName(username);
            options.setPassword(password.toCharArray());
        }

        newClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                String reason = (cause != null) ? cause.getMessage() : null;
                System.out.println("连接丢失: " + reason);
                if (disconnectHandler != null) {
                    disconnectHandler.accept(reason);
                }
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                if (messageHandler != null) {
                    messageHandler.accept(message);
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Publishing is synchronous in this wrapper; nothing to do here.
            }
        });

        try {
            newClient.connect(options).waitForCompletion();
        } catch (MqttException e) {
            // Release the half-initialised client; keep the connect failure as the primary error.
            try {
                newClient.close();
            } catch (MqttException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        client = newClient;

        if (connectHandler != null) {
            connectHandler.accept(clientId);
        }

        System.out.println("已连接到MQTT Broker: " + broker);
    }

    /**
     * Disconnects (when connected) and always closes the underlying Paho client, so its
     * resources are released even if the connection had already dropped. Calling this method
     * before {@link #connect()} or more than once is a no-op.
     *
     * @throws MqttException if disconnecting or closing fails; when both fail, the close
     *                       failure is attached as a suppressed exception
     */
    public void disconnect() throws MqttException {
        MqttAsyncClient current = client;
        if (current == null) {
            return;
        }

        MqttException failure = null;
        try {
            if (current.isConnected()) {
                current.disconnect().waitForCompletion();
            }
        } catch (MqttException e) {
            failure = e;
        }

        try {
            current.close();
            client = null;
        } catch (MqttException e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Subscribes to {@code topic} with QoS 1; matching messages go to the
     * {@link #onMessage(Consumer)} handler.
     *
     * @param topic topic filter to subscribe to
     * @throws MqttException if the client is not connected or the subscription fails
     */
    public void subscribe(String topic) throws MqttException {
        subscribe(topic, DEFAULT_QOS);
    }

    /**
     * Subscribes to {@code topic} with the given QoS and blocks until the broker confirms.
     *
     * @param topic topic filter to subscribe to
     * @param qos   requested QoS level (0, 1 or 2)
     * @throws MqttException if the client is not connected or the subscription fails
     */
    public void subscribe(String topic, int qos) throws MqttException {
        requireClient().subscribe(topic, qos).waitForCompletion();
        System.out.println("已订阅主题: " + topic);
    }

    /**
     * Subscribes to {@code topic} with QoS 1 and routes matching messages to {@code handler}.
     *
     * @param topic   topic filter to subscribe to
     * @param handler callback invoked for every message that matches {@code topic}
     * @throws MqttException if the client is not connected or the subscription fails
     */
    public void subscribe(String topic, Consumer<MqttMessage> handler) throws MqttException {
        subscribe(topic, DEFAULT_QOS, handler);
    }

    /**
     * Subscribes to {@code topic} and routes matching messages to {@code handler} through a
     * Paho per-subscription listener.
     *
     * <p>NOTE(review): Paho appears to hand a message to the default
     * {@link #onMessage(Consumer)} handler only when no per-subscription listener matched
     * it — confirm against the Paho documentation if both mechanisms are used together.
     *
     * @param topic   topic filter to subscribe to
     * @param qos     requested QoS level (0, 1 or 2)
     * @param handler callback invoked for every message that matches {@code topic}
     * @throws MqttException        if the client is not connected or the subscription fails
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    public void subscribe(String topic, int qos, Consumer<MqttMessage> handler)
            throws MqttException {
        Objects.requireNonNull(handler, "handler");
        requireClient()
            .subscribe(topic, qos, (arrivedTopic, message) -> handler.accept(message))
            .waitForCompletion();
        System.out.println("已订阅主题: " + topic);
    }

    /**
     * Cancels the subscription for {@code topic} and blocks until the broker confirms.
     *
     * @param topic topic filter to unsubscribe from
     * @throws MqttException if the client is not connected or the request fails
     */
    public void unsubscribe(String topic) throws MqttException {
        requireClient().unsubscribe(topic).waitForCompletion();
        System.out.println("已取消订阅: " + topic);
    }

    /**
     * Publishes a UTF-8 text payload with QoS 1, not retained.
     *
     * @param topic   topic to publish to
     * @param payload message text
     * @throws MqttException if the client is not connected or publishing fails
     */
    public void publish(String topic, String payload) throws MqttException {
        publish(topic, payload, DEFAULT_QOS, false);
    }

    /**
     * Publishes a UTF-8 text payload and blocks until delivery completes.
     *
     * @param topic    topic to publish to
     * @param payload  message text, encoded as UTF-8
     * @param qos      QoS level (0, 1 or 2)
     * @param retained whether the broker should retain the message
     * @throws MqttException if the client is not connected or publishing fails
     */
    public void publish(String topic, String payload, int qos, boolean retained)
            throws MqttException {
        publish(topic, payload.getBytes(StandardCharsets.UTF_8), qos, retained);
    }

    /**
     * Publishes a binary payload and blocks until delivery completes.
     *
     * @param topic    topic to publish to
     * @param payload  raw message bytes
     * @param qos      QoS level (0, 1 or 2)
     * @param retained whether the broker should retain the message
     * @throws MqttException if the client is not connected or publishing fails
     */
    public void publish(String topic, byte[] payload, int qos, boolean retained)
            throws MqttException {
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        message.setRetained(retained);

        requireClient().publish(topic, message).waitForCompletion();
    }

    /** Returns {@code true} when a client exists and reports an open connection. */
    public boolean isConnected() {
        return client != null && client.isConnected();
    }

    /** Registers the callback invoked with the client id after {@link #connect()} succeeds. */
    public void onConnect(Consumer<String> handler) {
        this.connectHandler = handler;
    }

    /** Registers the callback invoked with the failure message when the connection is lost. */
    public void onDisconnect(Consumer<String> handler) {
        this.disconnectHandler = handler;
    }

    /** Registers the default callback for incoming messages. */
    public void onMessage(Consumer<MqttMessage> handler) {
        this.messageHandler = handler;
    }

    /**
     * Returns the live Paho client, or fails with a "client not connected" error instead of
     * a bare {@link NullPointerException} when {@link #connect()} has not succeeded yet.
     */
    private MqttAsyncClient requireClient() throws MqttException {
        MqttAsyncClient current = client;
        if (current == null) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
        }
        return current;
    }

    /**
     * Demo: connects to a local broker, subscribes to two sensor topics and publishes ten
     * rounds of random readings, five seconds apart.
     */
    public static void main(String[] args) throws MqttException, InterruptedException {
        MQTTClient mqtt = new MQTTClient("tcp://localhost:1883", "java-client");

        mqtt.onConnect(id -> System.out.println("客户端已连接: " + id));
        mqtt.onDisconnect(reason -> System.out.println("连接断开: " + reason));
        mqtt.onMessage(msg -> System.out.println(
            "收到消息: " + new String(msg.getPayload(), StandardCharsets.UTF_8)));

        mqtt.connect();
        try {
            mqtt.subscribe("sensor/temperature");
            mqtt.subscribe("sensor/humidity");

            for (int i = 0; i < 10; i++) {
                String temp = String.format("%.1f", 20 + Math.random() * 10);
                mqtt.publish("sensor/temperature", temp);

                String humidity = String.format("%.1f", 40 + Math.random() * 20);
                mqtt.publish("sensor/humidity", humidity);

                Thread.sleep(5000);
            }
        } finally {
            // Always release the connection, even if publishing fails or the thread is interrupted.
            mqtt.disconnect();
        }
    }
}

2.3 MQTT设备管理器 #

java
package com.example.network.mqtt;

import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

/**
 * Device-side helper built on {@link MQTTClient}: it publishes status and sensor data under
 * {@code device/{deviceId}/...}, subscribes to the device's command and config topics, and
 * runs periodic publishing tasks on a small scheduler.
 *
 * <p>Paho's MQTT v3 {@code MqttMessage} carries neither the topic nor user properties, so
 * messages cannot be routed by inspecting the message itself. Per-topic handlers are
 * therefore registered at subscription time.
 *
 * <p>NOTE(review): per-topic handlers rely on an
 * {@code MQTTClient.subscribe(String, Consumer<MqttMessage>)} overload — confirm that
 * {@code MQTTClient} provides it.
 */
public class MQTTDeviceManager {

    private final MQTTClient mqtt;
    private final String devicePrefix;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    private final Map<String, Object> deviceData = new ConcurrentHashMap<>();

    /**
     * @param broker   broker URI, e.g. {@code tcp://localhost:1883}
     * @param deviceId device identifier; used as the MQTT client id and in the topic prefix
     */
    public MQTTDeviceManager(String broker, String deviceId) {
        this.mqtt = new MQTTClient(broker, deviceId);
        this.devicePrefix = "device/" + deviceId;
    }

    /**
     * Connects to the broker and subscribes to this device's {@code command/#} and
     * {@code config/#} topics.
     *
     * @throws MqttException if connecting or subscribing fails
     */
    public void connect() throws MqttException {
        mqtt.connect();

        subscribe(devicePrefix + "/command/#");
        subscribe(devicePrefix + "/config/#");
    }

    /**
     * Stops scheduling further periodic tasks and disconnects from the broker.
     *
     * @throws MqttException if disconnecting fails
     */
    public void disconnect() throws MqttException {
        scheduler.shutdown();
        mqtt.disconnect();
    }

    /**
     * Subscribes to {@code topic} without a dedicated handler.
     *
     * @throws MqttException if the subscription fails
     */
    public void subscribe(String topic) throws MqttException {
        mqtt.subscribe(topic);
    }

    /**
     * Subscribes to {@code topic} and delivers matching messages to {@code handler}.
     *
     * @param topic   topic filter to subscribe to
     * @param handler callback for messages received on {@code topic}
     * @throws MqttException if the subscription fails
     */
    public void subscribe(String topic, Consumer<MqttMessage> handler) throws MqttException {
        mqtt.subscribe(topic, handler);
    }

    /**
     * Publishes {@code status} as a retained QoS 1 message on {@code {prefix}/status}.
     *
     * @throws MqttException if publishing fails
     */
    public void publishStatus(String status) throws MqttException {
        mqtt.publish(devicePrefix + "/status", status, 1, true);
    }

    /**
     * Publishes a single reading on {@code {prefix}/data/{sensor}} with QoS 0.
     *
     * @throws MqttException if publishing fails
     */
    public void publishData(String sensor, String value) throws MqttException {
        mqtt.publish(devicePrefix + "/data/" + sensor, value, 0, false);
    }

    /**
     * Publishes {@code data} as a flat JSON object on {@code {prefix}/data/{sensor}} with
     * QoS 0. Keys and textual values are JSON-escaped; numbers, booleans and {@code null}
     * are written unquoted; any other value is written as the JSON string of its
     * {@code toString()}.
     *
     * @throws MqttException if publishing fails
     */
    public void publishData(String sensor, Map<String, Object> data) throws MqttException {
        mqtt.publish(devicePrefix + "/data/" + sensor, toJson(data), 0, false);
    }

    /**
     * Publishes an "online" status and a heartbeat timestamp immediately and then every
     * {@code intervalSeconds} seconds.
     */
    public void startHeartbeat(int intervalSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                publishStatus("online");
                publishData("heartbeat",
                    Map.of("timestamp", System.currentTimeMillis()));
            } catch (MqttException | RuntimeException e) {
                // An exception escaping a periodic task cancels all later runs, so report
                // the failure here and let the next heartbeat try again.
                e.printStackTrace();
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Calls {@code dataProvider} immediately and then every {@code intervalSeconds} seconds,
     * publishing each returned value as a reading of {@code sensor}.
     */
    public void startDataPublishing(String sensor, Callable<String> dataProvider,
                                    int intervalSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                publishData(sensor, dataProvider.call());
            } catch (Exception e) {
                // Callable.call() may throw anything; keep the schedule alive and report it.
                e.printStackTrace();
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    /** Stores an arbitrary value under {@code key} in the in-memory device data map. */
    public void setDeviceData(String key, Object value) {
        deviceData.put(key, value);
    }

    /** Returns the value stored under {@code key}, or {@code null} if there is none. */
    public Object getDeviceData(String key) {
        return deviceData.get(key);
    }

    /** Serialises a flat map into a JSON object without inserting whitespace. */
    private static String toJson(Map<String, Object> data) {
        StringBuilder json = new StringBuilder("{");
        String separator = "";
        for (Map.Entry<String, Object> entry : data.entrySet()) {
            json.append(separator);
            appendJsonString(json, entry.getKey());
            json.append(':');

            Object value = entry.getValue();
            if (value == null || value instanceof Number || value instanceof Boolean) {
                json.append(value);
            } else {
                appendJsonString(json, String.valueOf(value));
            }
            separator = ",";
        }
        return json.append('}').toString();
    }

    /** Appends {@code text} as a quoted JSON string, escaping quotes, backslashes and control characters. */
    private static void appendJsonString(StringBuilder out, String text) {
        out.append('"');
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        out.append('"');
    }

    /**
     * Demo: connects as device {@code embedded-001}, listens for LED commands and publishes
     * heartbeat, temperature and humidity data for one minute.
     */
    public static void main(String[] args) throws MqttException, InterruptedException {
        MQTTDeviceManager device = new MQTTDeviceManager(
            "tcp://localhost:1883", "embedded-001");

        device.connect();
        try {
            device.publishStatus("online");

            device.subscribe("device/embedded-001/command/led", msg -> {
                String command = new String(
                    msg.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("收到LED命令: " + command);
            });

            device.startHeartbeat(30);

            device.startDataPublishing("temperature",
                () -> String.format("%.1f", 20 + Math.random() * 10), 5);

            device.startDataPublishing("humidity",
                () -> String.format("%.1f", 40 + Math.random() * 20), 5);

            Thread.sleep(60000);

            device.publishStatus("offline");
        } finally {
            // Stop the scheduler and release the connection even if something above failed.
            device.disconnect();
        }
    }
}

三、主题设计模式 #

3.1 主题层级结构 #

text
设备主题结构:
device/{device_id}/status          - 设备状态
device/{device_id}/data/{sensor}   - 传感器数据
device/{device_id}/command/{cmd}   - 命令控制
device/{device_id}/config/{key}    - 配置更新
device/{device_id}/event/{type}    - 事件通知

3.2 主题管理器 #

java
package com.example.network.mqtt;

import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.function.Consumer;

/**
 * Builds and parses topics of the form {@code {baseTopic}/{deviceId}/{category}/{name}} and
 * forwards publish / subscribe calls to an {@link MQTTClient}.
 *
 * <p>NOTE(review): the subscribe methods call
 * {@code MQTTClient.subscribe(String, Consumer<MqttMessage>)} — confirm that
 * {@code MQTTClient} provides this overload.
 */
public class TopicManager {

    private final MQTTClient mqtt;
    private final String baseTopic;

    /**
     * @param mqtt      client used for all publish and subscribe calls
     * @param baseTopic first part of every topic, e.g. {@code device}
     */
    public TopicManager(MQTTClient mqtt, String baseTopic) {
        this.mqtt = mqtt;
        this.baseTopic = baseTopic;
    }

    /** Publishes {@code status} retained with QoS 1 on {@code {base}/{deviceId}/status}. */
    public void publishStatus(String deviceId, String status) throws MqttException {
        mqtt.publish(deviceTopic(deviceId, "status"), status, 1, true);
    }

    /** Publishes {@code value} with QoS 0 on {@code {base}/{deviceId}/data/{sensor}}. */
    public void publishData(String deviceId, String sensor, String value)
            throws MqttException {
        mqtt.publish(deviceTopic(deviceId, "data/" + sensor), value, 0, false);
    }

    /** Publishes {@code data} with QoS 1 on {@code {base}/{deviceId}/event/{eventType}}. */
    public void publishEvent(String deviceId, String eventType, String data)
            throws MqttException {
        mqtt.publish(deviceTopic(deviceId, "event/" + eventType), data, 1, false);
    }

    /** Subscribes {@code handler} to every topic below {@code {base}/{deviceId}/}. */
    public void subscribeDevice(String deviceId, Consumer<MqttMessage> handler)
            throws MqttException {
        mqtt.subscribe(deviceTopic(deviceId, "#"), handler);
    }

    /** Subscribes {@code handler} to every topic below {@code {base}/{deviceId}/command/}. */
    public void subscribeCommand(String deviceId, Consumer<MqttMessage> handler)
            throws MqttException {
        mqtt.subscribe(deviceTopic(deviceId, "command/#"), handler);
    }

    /** Subscribes {@code handler} to the status topic of every device ({@code +} wildcard). */
    public void subscribeAllDevices(Consumer<MqttMessage> handler)
            throws MqttException {
        mqtt.subscribe(deviceTopic("+", "status"), handler);
    }

    /** Subscribes {@code handler} to the given sensor's data topic on every device. */
    public void subscribeSensor(String sensor, Consumer<MqttMessage> handler)
            throws MqttException {
        mqtt.subscribe(deviceTopic("+", "data/" + sensor), handler);
    }

    /**
     * Returns the second {@code /}-separated level of {@code topic}. This is the device id
     * only when the base topic is a single level such as {@code device}.
     *
     * @return the second topic level, or {@code null} if {@code topic} is {@code null} or
     *         has fewer than two levels
     */
    public static String parseDeviceId(String topic) {
        if (topic == null) {
            return null;
        }
        String[] levels = topic.split("/");
        return levels.length >= 2 ? levels[1] : null;
    }

    /**
     * Returns the sensor name of a {@code {base}/{deviceId}/data/{sensor}} topic.
     *
     * @return the fourth topic level, or {@code null} if {@code topic} is {@code null} or
     *         is not a data topic
     */
    public static String parseSensorName(String topic) {
        return nameInCategory(topic, "data");
    }

    /**
     * Returns the command name of a {@code {base}/{deviceId}/command/{cmd}} topic.
     *
     * @return the fourth topic level, or {@code null} if {@code topic} is {@code null} or
     *         is not a command topic
     */
    public static String parseCommand(String topic) {
        return nameInCategory(topic, "command");
    }

    /** Joins base topic, device id (or wildcard) and the remaining path with {@code /}. */
    private String deviceTopic(String deviceId, String suffix) {
        return baseTopic + "/" + deviceId + "/" + suffix;
    }

    /** Returns the fourth level of {@code topic} when its third level equals {@code category}, else {@code null}. */
    private static String nameInCategory(String topic, String category) {
        if (topic == null) {
            return null;
        }
        String[] levels = topic.split("/");
        if (levels.length >= 4 && category.equals(levels[2])) {
            return levels[3];
        }
        return null;
    }
}

四、遗嘱消息 #

4.1 配置遗嘱消息 #

java
package com.example.network.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Demonstrates a Last Will: the client registers a retained "offline" message on its status
 * topic before connecting, then publishes a retained "online" message on the same topic.
 */
public class WillMessageExample {

    /**
     * Connects to a local broker with a will configured and announces the device as online.
     * The client is left connected when this method returns, as in the original example.
     *
     * @throws MqttException if creating the client, connecting or publishing fails
     */
    public static void main(String[] args) throws MqttException {
        String broker = "tcp://localhost:1883";
        String clientId = "device-with-will";
        // The will and the online announcement must share one topic so that the retained
        // status is always overwritten by the latest state.
        String statusTopic = "device/" + clientId + "/status";

        MqttAsyncClient client = new MqttAsyncClient(broker, clientId,
            new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setAutomaticReconnect(true);

        String willMessage = "{\"status\":\"offline\",\"reason\":\"unexpected\"}";
        options.setWill(statusTopic,
            willMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8), 1, true);

        try {
            client.connect(options).waitForCompletion();
        } catch (MqttException e) {
            // Release the unused client; keep the connect failure as the primary error.
            try {
                client.close();
            } catch (MqttException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        // Wait for the publish to finish so the message below is only printed once the
        // online status has actually been delivered.
        client.publish(statusTopic,
            "{\"status\":\"online\"}".getBytes(java.nio.charset.StandardCharsets.UTF_8), 1, true)
            .waitForCompletion();

        System.out.println("设备已上线,遗嘱消息已设置");
    }
}

五、总结 #

MQTT协议要点:

  1. 轻量级:适合资源受限的嵌入式设备
  2. 发布/订阅:解耦消息发布者和订阅者
  3. QoS选择:根据数据重要性选择合适的QoS级别
  4. 主题设计:合理设计主题层级结构
  5. 遗嘱消息:处理设备异常断开的情况

下一章我们将学习CoAP协议,了解另一种物联网通信协议。

最后更新:2026-03-27