MQTT协议 #
一、MQTT协议概述 #
1.1 MQTT简介 #
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议。
text
┌─────────────────────────────────────────────────────────┐
│                      MQTT架构模型                       │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   ┌─────────┐        ┌──────────┐        ┌─────────┐    │
│   │ 发布者  │        │  订阅者  │        │ 发布者  │    │
│   │Publisher│        │Subscriber│        │Publisher│    │
│   └────┬────┘        └─────┬────┘        └────┬────┘    │
│        │                   │                  │         │
│        │ Publish           │ Subscribe        │         │
│        │                   │                  │         │
│        └───────────────────┼──────────────────┘         │
│                            │                            │
│                     ┌──────┴──────┐                     │
│                     │   Broker    │                     │
│                     │  (服务器)   │                     │
│                     └─────────────┘                     │
│                                                         │
└─────────────────────────────────────────────────────────┘
1.2 MQTT特性 #
| 特性 | 说明 |
|---|---|
| 轻量级 | 固定报头最小仅2字节,协议开销小 |
| 发布/订阅 | 解耦发布者和订阅者 |
| QoS支持 | 三种消息质量等级 |
| 遗嘱消息 | 异常断开时通知 |
| 持久会话 | 断线期间Broker保留订阅关系,并缓存QoS 1/2的离线消息 |
| 主题过滤 | 灵活的消息路由 |
1.3 QoS等级 #
| QoS级别 | 说明 | 使用场景 |
|---|---|---|
| QoS 0 | 最多一次 | 传感器数据,可丢失 |
| QoS 1 | 至少一次 | 重要数据,可重复 |
| QoS 2 | 恰好一次 | 关键数据,不丢失不重复 |
二、Eclipse Paho客户端 #
2.1 添加依赖 #
xml
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
2.2 MQTT客户端封装 #
java
package com.example.network.mqtt;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Blocking convenience wrapper around Paho's {@link MqttAsyncClient}.
 *
 * <p>Each public operation waits for the underlying asynchronous token to
 * complete before returning. String payloads are always encoded as UTF-8.
 *
 * <p>The connection uses a clean session together with automatic reconnect,
 * so the broker forgets this client's subscriptions whenever the link drops.
 * To compensate, every successful subscription is remembered and replayed
 * once the client has reconnected.
 */
public class MQTTClient {

    /** One remembered subscription, replayed after an automatic reconnect. */
    private static final class Subscription {
        final int qos;
        /** Per-topic handler, or {@code null} when the global handler is used. */
        final Consumer<MqttMessage> handler;

        Subscription(int qos, Consumer<MqttMessage> handler) {
            this.qos = qos;
            this.handler = handler;
        }
    }

    private final String broker;
    private final String clientId;
    private final String username;
    private final String password;

    /** Topic filter -> subscription details, kept for replay after a reconnect. */
    private final Map<String, Subscription> subscriptions = new ConcurrentHashMap<>();

    // These fields are written by the caller's thread and read from Paho's
    // callback thread, hence volatile.
    private volatile MqttAsyncClient client;
    private volatile Consumer<String> connectHandler;
    private volatile Consumer<String> disconnectHandler;
    private volatile Consumer<MqttMessage> messageHandler;

    /**
     * Creates a client that connects without credentials.
     *
     * @param broker   broker URI, e.g. {@code tcp://localhost:1883}
     * @param clientId client identifier presented to the broker
     */
    public MQTTClient(String broker, String clientId) {
        this(broker, clientId, null, null);
    }

    /**
     * Creates a client with optional credentials.
     *
     * @param broker   broker URI, e.g. {@code tcp://localhost:1883}
     * @param clientId client identifier presented to the broker
     * @param username user name; credentials are only sent when both
     *                 {@code username} and {@code password} are non-null
     * @param password password, see {@code username}
     */
    public MQTTClient(String broker, String clientId, String username, String password) {
        this.broker = broker;
        this.clientId = clientId;
        this.username = username;
        this.password = password;
    }

    /**
     * Connects to the broker and blocks until the connection is established.
     * Calling this while already connected is a no-op, so a second client with
     * the same client id is never opened next to the live one.
     *
     * @throws MqttException if the connection attempt fails
     */
    public void connect() throws MqttException {
        if (isConnected()) {
            return;
        }

        final MqttAsyncClient newClient =
                new MqttAsyncClient(broker, clientId, new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(30);
        options.setKeepAliveInterval(60);
        if (username != null && password != null) {
            options.setUserName(username);
            options.setPassword(password.toCharArray());
        }

        // A fresh clean-session connection starts without any subscriptions.
        subscriptions.clear();

        newClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                // The initial connect has nothing to replay; only a reconnect
                // needs the subscriptions restored.
                if (reconnect) {
                    resubscribeAll(newClient);
                }
            }

            @Override
            public void connectionLost(Throwable cause) {
                String reason = (cause != null) ? cause.getMessage() : null;
                System.out.println("连接丢失: " + reason);
                Consumer<String> handler = disconnectHandler;
                if (handler != null) {
                    handler.accept(reason);
                }
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                Consumer<MqttMessage> handler = messageHandler;
                if (handler != null) {
                    handler.accept(message);
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Nothing to do: publish() already waits for completion.
            }
        });

        try {
            newClient.connect(options).waitForCompletion();
        } catch (MqttException e) {
            closeQuietly(newClient);
            throw e;
        }
        client = newClient;

        Consumer<String> handler = connectHandler;
        if (handler != null) {
            handler.accept(clientId);
        }
        System.out.println("已连接到MQTT Broker: " + broker);
    }

    /**
     * Disconnects from the broker and releases the client if it is connected.
     *
     * @throws MqttException if the disconnect or close fails
     */
    public void disconnect() throws MqttException {
        MqttAsyncClient active = client;
        if (active != null && active.isConnected()) {
            active.disconnect().waitForCompletion();
            active.close();
            subscriptions.clear();
            client = null;
        }
    }

    /**
     * Subscribes to {@code topic} at QoS 1; messages go to the global handler.
     *
     * @throws MqttException if not connected or the subscription fails
     */
    public void subscribe(String topic) throws MqttException {
        subscribe(topic, 1);
    }

    /**
     * Subscribes to {@code topic}; messages go to the global handler.
     *
     * @param topic topic filter, may contain wildcards
     * @param qos   requested quality of service (0, 1 or 2)
     * @throws MqttException if not connected or the subscription fails
     */
    public void subscribe(String topic, int qos) throws MqttException {
        subscribe(topic, qos, null);
    }

    /**
     * Subscribes to {@code topic} at QoS 1 with a dedicated handler.
     *
     * @param topic   topic filter, may contain wildcards
     * @param handler receives messages matching this filter; when {@code null}
     *                the global handler set via {@link #onMessage} is used
     * @throws MqttException if not connected or the subscription fails
     */
    public void subscribe(String topic, Consumer<MqttMessage> handler) throws MqttException {
        subscribe(topic, 1, handler);
    }

    /**
     * Subscribes to {@code topic} with a dedicated handler.
     *
     * @param topic   topic filter, may contain wildcards
     * @param qos     requested quality of service (0, 1 or 2)
     * @param handler receives messages matching this filter; when {@code null}
     *                the global handler set via {@link #onMessage} is used
     * @throws MqttException if not connected or the subscription fails
     */
    public void subscribe(String topic, int qos, Consumer<MqttMessage> handler)
            throws MqttException {
        Subscription subscription = new Subscription(qos, handler);
        sendSubscribe(requireClient(), topic, subscription).waitForCompletion();
        // Remember only subscriptions the broker actually accepted.
        subscriptions.put(topic, subscription);
        System.out.println("已订阅主题: " + topic);
    }

    /**
     * Cancels the subscription for {@code topic}.
     *
     * @throws MqttException if not connected or the request fails
     */
    public void unsubscribe(String topic) throws MqttException {
        requireClient().unsubscribe(topic).waitForCompletion();
        subscriptions.remove(topic);
        System.out.println("已取消订阅: " + topic);
    }

    /**
     * Publishes a UTF-8 text payload at QoS 1, not retained.
     *
     * @throws MqttException if not connected or the publish fails
     */
    public void publish(String topic, String payload) throws MqttException {
        publish(topic, payload, 1, false);
    }

    /**
     * Publishes a UTF-8 text payload.
     *
     * @param topic    destination topic
     * @param payload  text to send, encoded as UTF-8
     * @param qos      quality of service (0, 1 or 2)
     * @param retained whether the broker should retain the message
     * @throws MqttException if not connected or the publish fails
     */
    public void publish(String topic, String payload, int qos, boolean retained)
            throws MqttException {
        publish(topic, payload.getBytes(StandardCharsets.UTF_8), qos, retained);
    }

    /**
     * Publishes a binary payload and waits for delivery to complete.
     *
     * @param topic    destination topic
     * @param payload  raw bytes to send
     * @param qos      quality of service (0, 1 or 2)
     * @param retained whether the broker should retain the message
     * @throws MqttException if not connected or the publish fails
     */
    public void publish(String topic, byte[] payload, int qos, boolean retained)
            throws MqttException {
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        message.setRetained(retained);
        requireClient().publish(topic, message).waitForCompletion();
    }

    /** Returns {@code true} when a client exists and reports itself connected. */
    public boolean isConnected() {
        MqttAsyncClient active = client;
        return active != null && active.isConnected();
    }

    /** Registers a handler invoked with the client id after {@link #connect()} succeeds. */
    public void onConnect(Consumer<String> handler) {
        this.connectHandler = handler;
    }

    /** Registers a handler invoked with the failure message when the connection is lost. */
    public void onDisconnect(Consumer<String> handler) {
        this.disconnectHandler = handler;
    }

    /** Registers the global handler for messages that have no per-topic handler. */
    public void onMessage(Consumer<MqttMessage> handler) {
        this.messageHandler = handler;
    }

    /** Returns the current client or fails with a "not connected" MqttException. */
    private MqttAsyncClient requireClient() throws MqttException {
        MqttAsyncClient active = client;
        if (active == null) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
        }
        return active;
    }

    /** Issues the subscribe request without waiting for it to complete. */
    private static IMqttToken sendSubscribe(MqttAsyncClient target, String topic,
            Subscription subscription) throws MqttException {
        final Consumer<MqttMessage> handler = subscription.handler;
        if (handler == null) {
            return target.subscribe(topic, subscription.qos);
        }
        IMqttMessageListener listener = (arrivedTopic, message) -> handler.accept(message);
        return target.subscribe(topic, subscription.qos, listener);
    }

    /** Replays every remembered subscription on a freshly reconnected client. */
    private void resubscribeAll(MqttAsyncClient target) {
        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
            try {
                // Deliberately not awaited: this runs on Paho's callback
                // thread, which must not be blocked.
                sendSubscribe(target, entry.getKey(), entry.getValue());
            } catch (MqttException e) {
                System.err.println("重新订阅失败: " + entry.getKey() + " - " + e.getMessage());
            }
        }
    }

    /** Best-effort close used while reporting a more important failure. */
    private static void closeQuietly(MqttAsyncClient target) {
        try {
            target.close();
        } catch (MqttException ignored) {
            // The original connect failure is what the caller needs to see.
        }
    }

    public static void main(String[] args) throws MqttException, InterruptedException {
        MQTTClient mqtt = new MQTTClient("tcp://localhost:1883", "java-client");
        mqtt.onConnect(id -> System.out.println("客户端已连接: " + id));
        mqtt.onDisconnect(reason -> System.out.println("连接断开: " + reason));
        mqtt.onMessage(msg -> System.out.println(
                "收到消息: " + new String(msg.getPayload(), StandardCharsets.UTF_8)));
        mqtt.connect();
        try {
            mqtt.subscribe("sensor/temperature");
            mqtt.subscribe("sensor/humidity");
            for (int i = 0; i < 10; i++) {
                String temp = String.format("%.1f", 20 + Math.random() * 10);
                mqtt.publish("sensor/temperature", temp);
                String humidity = String.format("%.1f", 40 + Math.random() * 20);
                mqtt.publish("sensor/humidity", humidity);
                Thread.sleep(5000);
            }
        } finally {
            // Release the connection even when the demo loop is interrupted.
            mqtt.disconnect();
        }
    }
}
2.3 MQTT设备管理器 #
java
package com.example.network.mqtt;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
 * Device-side helper that publishes status and sensor data under
 * {@code device/<deviceId>/...} and listens for commands and configuration.
 *
 * <p>Per-topic handlers are registered through
 * {@code MQTTClient.subscribe(String, Consumer<MqttMessage>)}, the same
 * overload {@code TopicManager} uses. The mqttv3 {@code MqttMessage} carries
 * no topic or user properties, so a handler cannot be looked up from the
 * message itself.
 */
public class MQTTDeviceManager {
    private final MQTTClient mqtt;
    private final String devicePrefix;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    /** Topic filter -> handler, kept so handlers registered early can be applied on connect. */
    private final Map<String, Consumer<MqttMessage>> topicHandlers = new ConcurrentHashMap<>();
    private final Map<String, Object> deviceData = new ConcurrentHashMap<>();

    /**
     * @param broker   broker URI, e.g. {@code tcp://localhost:1883}
     * @param deviceId device identifier; also used as the MQTT client id
     */
    public MQTTDeviceManager(String broker, String deviceId) {
        this.mqtt = new MQTTClient(broker, deviceId);
        this.devicePrefix = "device/" + deviceId;
    }

    /**
     * Connects, subscribes to this device's command and config subtrees, and
     * applies any handlers that were registered before the connection existed.
     *
     * @throws MqttException if connecting or subscribing fails
     */
    public void connect() throws MqttException {
        mqtt.connect();
        subscribe(devicePrefix + "/command/#");
        subscribe(devicePrefix + "/config/#");
        for (Map.Entry<String, Consumer<MqttMessage>> entry : topicHandlers.entrySet()) {
            mqtt.subscribe(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Stops the periodic tasks, waits briefly for running ones to finish, then
     * disconnects from the broker.
     *
     * @throws MqttException if the disconnect fails
     */
    public void disconnect() throws MqttException {
        scheduler.shutdown();
        try {
            // Let an in-flight publish finish before the connection goes away.
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        mqtt.disconnect();
    }

    /**
     * Subscribes to {@code topic} without a dedicated handler.
     *
     * @throws MqttException if the subscription fails
     */
    public void subscribe(String topic) throws MqttException {
        mqtt.subscribe(topic);
    }

    /**
     * Registers {@code handler} for {@code topic}. When the client is already
     * connected the subscription is made immediately; otherwise it is made by
     * the next {@link #connect()}.
     *
     * @param topic   topic filter, may contain wildcards
     * @param handler receives the messages matching {@code topic}
     * @throws MqttException if the subscription fails
     */
    public void subscribe(String topic, Consumer<MqttMessage> handler) throws MqttException {
        topicHandlers.put(topic, handler);
        if (mqtt.isConnected()) {
            mqtt.subscribe(topic, handler);
        }
    }

    /** Publishes {@code status} as a retained QoS 1 message on the status topic. */
    public void publishStatus(String status) throws MqttException {
        mqtt.publish(devicePrefix + "/status", status, 1, true);
    }

    /** Publishes one sensor reading as a non-retained QoS 0 message. */
    public void publishData(String sensor, String value) throws MqttException {
        mqtt.publish(devicePrefix + "/data/" + sensor, value, 0, false);
    }

    /**
     * Publishes {@code data} as a flat JSON object on the sensor's data topic.
     * Numbers, booleans and {@code null} are written as JSON literals; every
     * other value is written as an escaped JSON string.
     */
    public void publishData(String sensor, Map<String, Object> data) throws MqttException {
        mqtt.publish(devicePrefix + "/data/" + sensor, toJson(data), 0, false);
    }

    /**
     * Publishes an "online" status and a heartbeat timestamp at a fixed rate,
     * starting immediately.
     *
     * @param intervalSeconds period between heartbeats, in seconds
     */
    public void startHeartbeat(int intervalSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                publishStatus("online");
                publishData("heartbeat",
                        Map.of("timestamp", System.currentTimeMillis()));
            } catch (MqttException | RuntimeException e) {
                // An exception escaping a fixed-rate task suppresses all later
                // runs, so runtime failures are reported here as well.
                e.printStackTrace();
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Publishes the value returned by {@code dataProvider} at a fixed rate,
     * starting immediately.
     *
     * @param sensor          sensor name appended to the data topic
     * @param dataProvider    supplies the value to publish on each run
     * @param intervalSeconds period between publications, in seconds
     */
    public void startDataPublishing(String sensor, Callable<String> dataProvider,
            int intervalSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                publishData(sensor, dataProvider.call());
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt so shutdownNow() can stop the worker.
                    Thread.currentThread().interrupt();
                }
                e.printStackTrace();
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    /** Stores a value in the in-memory device data map. */
    public void setDeviceData(String key, Object value) {
        deviceData.put(key, value);
    }

    /** Returns the stored value for {@code key}, or {@code null} if absent. */
    public Object getDeviceData(String key) {
        return deviceData.get(key);
    }

    /** Serializes a flat map as a JSON object. */
    private static String toJson(Map<String, Object> data) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> entry : data.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            appendJsonString(json, String.valueOf(entry.getKey()));
            json.append(':');
            Object value = entry.getValue();
            if (value == null || value instanceof Number || value instanceof Boolean) {
                json.append(value);
            } else {
                appendJsonString(json, value.toString());
            }
        }
        return json.append('}').toString();
    }

    /** Appends {@code text} as a quoted JSON string with the required escapes. */
    private static void appendJsonString(StringBuilder out, String text) {
        out.append('"');
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        out.append('"');
    }

    public static void main(String[] args) throws MqttException, InterruptedException {
        MQTTDeviceManager device = new MQTTDeviceManager(
                "tcp://localhost:1883", "embedded-001");
        device.connect();
        device.publishStatus("online");
        device.subscribe("device/embedded-001/command/led", msg -> {
            String command = new String(msg.getPayload(),
                    java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("收到LED命令: " + command);
        });
        device.startHeartbeat(30);
        device.startDataPublishing("temperature",
                () -> String.format("%.1f", 20 + Math.random() * 10), 5);
        device.startDataPublishing("humidity",
                () -> String.format("%.1f", 40 + Math.random() * 20), 5);
        Thread.sleep(60000);
        device.publishStatus("offline");
        device.disconnect();
    }
}
三、主题设计模式 #
3.1 主题层级结构 #
text
设备主题结构:
device/{device_id}/status - 设备状态
device/{device_id}/data/{sensor} - 传感器数据
device/{device_id}/command/{cmd} - 命令控制
device/{device_id}/config/{key} - 配置更新
device/{device_id}/event/{type} - 事件通知
3.2 主题管理器 #
java
package com.example.network.mqtt;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.function.Consumer;
/**
 * Builds and parses topics of the form
 * {@code <baseTopic>/<deviceId>/<category>/<name>} and forwards publish and
 * subscribe calls to the wrapped {@code MQTTClient}.
 */
public class TopicManager {
    private final MQTTClient mqtt;
    private final String baseTopic;

    /**
     * @param mqtt      client used for every publish and subscribe call
     * @param baseTopic prefix placed in front of every generated topic
     */
    public TopicManager(MQTTClient mqtt, String baseTopic) {
        this.mqtt = mqtt;
        this.baseTopic = baseTopic;
    }

    /** Publishes a retained QoS 1 status message for {@code deviceId}. */
    public void publishStatus(String deviceId, String status) throws MqttException {
        mqtt.publish(deviceTopic(deviceId, "status"), status, 1, true);
    }

    /** Publishes a non-retained QoS 0 reading for one sensor of {@code deviceId}. */
    public void publishData(String deviceId, String sensor, String value)
            throws MqttException {
        mqtt.publish(deviceTopic(deviceId, "data/" + sensor), value, 0, false);
    }

    /** Publishes a non-retained QoS 1 event of the given type for {@code deviceId}. */
    public void publishEvent(String deviceId, String eventType, String data)
            throws MqttException {
        mqtt.publish(deviceTopic(deviceId, "event/" + eventType), data, 1, false);
    }

    /** Subscribes {@code handler} to every topic below {@code deviceId}. */
    public void subscribeDevice(String deviceId, Consumer<MqttMessage> handler)
            throws MqttException {
        mqtt.subscribe(deviceTopic(deviceId, "#"), handler);
    }

    /** Subscribes {@code handler} to every command topic of {@code deviceId}. */
    public void subscribeCommand(String deviceId, Consumer<MqttMessage> handler)
            throws MqttException {
        mqtt.subscribe(deviceTopic(deviceId, "command/#"), handler);
    }

    /** Subscribes {@code handler} to the status topic of every device. */
    public void subscribeAllDevices(Consumer<MqttMessage> handler)
            throws MqttException {
        mqtt.subscribe(deviceTopic("+", "status"), handler);
    }

    /** Subscribes {@code handler} to the named sensor's data topic on every device. */
    public void subscribeSensor(String sensor, Consumer<MqttMessage> handler)
            throws MqttException {
        mqtt.subscribe(deviceTopic("+", "data/" + sensor), handler);
    }

    /**
     * Returns the second level of {@code topic}, or {@code null} when the
     * topic has fewer than two levels.
     */
    public static String parseDeviceId(String topic) {
        String[] levels = topic.split("/");
        return levels.length < 2 ? null : levels[1];
    }

    /**
     * Returns the fourth level of {@code topic} when its third level is
     * {@code data}; otherwise {@code null}.
     */
    public static String parseSensorName(String topic) {
        return fourthLevelWhenThirdIs("data", topic);
    }

    /**
     * Returns the fourth level of {@code topic} when its third level is
     * {@code command}; otherwise {@code null}.
     */
    public static String parseCommand(String topic) {
        return fourthLevelWhenThirdIs("command", topic);
    }

    /** Joins the base topic, a device id (or wildcard) and the remaining path. */
    private String deviceTopic(String deviceId, String remainder) {
        return baseTopic + "/" + deviceId + "/" + remainder;
    }

    /** Shared lookup behind the sensor-name and command parsers. */
    private static String fourthLevelWhenThirdIs(String category, String topic) {
        String[] levels = topic.split("/");
        if (levels.length < 4 || !category.equals(levels[2])) {
            return null;
        }
        return levels[3];
    }
}
四、遗嘱消息 #
4.1 配置遗嘱消息 #
java
package com.example.network.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Demonstrates registering a Last Will message: the will is attached to the
 * connect options, and after connecting the device announces itself as online
 * on the same retained status topic.
 */
public class WillMessageExample {
    /**
     * Connects with a retained QoS 1 will on the device status topic, then
     * publishes a retained "online" status and waits for it to be delivered.
     * The client is left connected when this method returns.
     *
     * @throws MqttException if connecting or publishing fails
     */
    public static void main(String[] args) throws MqttException {
        String broker = "tcp://localhost:1883";
        String clientId = "device-with-will";
        MqttAsyncClient client = new MqttAsyncClient(broker, clientId,
                new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setAutomaticReconnect(true);

        // The will and the online message share one retained topic, so the
        // topic always holds the most recent status.
        String statusTopic = "device/" + clientId + "/status";
        String willMessage = "{\"status\":\"offline\",\"reason\":\"unexpected\"}";
        // Explicit UTF-8 keeps the payload bytes independent of the platform default.
        options.setWill(statusTopic,
                willMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8), 1, true);

        client.connect(options).waitForCompletion();

        // Wait for the publish to complete before reporting success.
        client.publish(statusTopic,
                "{\"status\":\"online\"}".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                1, true).waitForCompletion();
        System.out.println("设备已上线,遗嘱消息已设置");
    }
}
五、总结 #
MQTT协议要点:
- 轻量级:适合资源受限的嵌入式设备
- 发布/订阅:解耦消息发布者和订阅者
- QoS选择:根据数据重要性选择合适的QoS级别
- 主题设计:合理设计主题层级结构
- 遗嘱消息:处理设备异常断开的情况
下一章我们将学习CoAP协议,了解另一种物联网通信协议。
最后更新:2026-03-27