# 工业数据网关
## 一、项目概述
### 1.1 项目目标
构建一个工业数据网关,实现工业协议转换、数据采集、边缘计算和云平台对接。
```text
┌─────────────────────────────────────────────────────────┐
│                    工业数据网关架构                     │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   ┌─────────────────────────────────────────────────┐   │
│   │                云平台 / MES系统                 │   │
│   └────────────────────────┬────────────────────────┘   │
│                            │ MQTT/HTTP                  │
│   ┌────────────────────────┴────────────────────────┐   │
│   │                  工业数据网关                   │   │
│   │   ┌─────────────────────────────────────────┐   │   │
│   │   │               协议转换层                │   │   │
│   │   │      Modbus │ OPC-UA │ MQTT │ HTTP      │   │   │
│   │   └─────────────────────────────────────────┘   │   │
│   │                        │                        │   │
│   │   ┌─────────────────────────────────────────┐   │   │
│   │   │               数据处理层                │   │   │
│   │   │    采集 │ 缓存 │ 计算 │ 报警 │ 上传     │   │   │
│   │   └─────────────────────────────────────────┘   │   │
│   └─────────────────────────────────────────────────┘   │
│                            │                            │
│   ┌────────────────────────┴────────────────────────┐   │
│   │                    工业设备                     │   │
│   │          PLC │ 传感器 │ 变频器 │ 仪表           │   │
│   └─────────────────────────────────────────────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘
```
### 1.2 功能模块
| 模块 | 功能 |
|---|---|
| 协议转换 | Modbus RTU/TCP、OPC-UA转换 |
| 数据采集 | 定时采集、事件触发 |
| 边缘计算 | 数据过滤、聚合、计算 |
| 本地存储 | 断网缓存、历史数据 |
| 云端对接 | MQTT/HTTP数据上传 |
| 远程配置 | 参数远程配置 |
## 二、协议转换
### 2.1 Modbus RTU采集
java
package com.example.gateway.protocol;
import java.util.*;
import java.util.concurrent.*;
/**
 * Periodically reads holding registers from one Modbus RTU slave over a serial
 * port and hands the scaled readings to registered listeners.
 *
 * <p>Usage: declare registers with {@link #addRegister}, subscribe with
 * {@link #addListener}, then call {@link #startPolling}. All bus traffic runs on
 * a single scheduler thread, so requests never overlap on the wire.
 * {@link #addRegister} and {@link #addListener} may be called from other threads
 * while polling is active.
 */
public class ModbusRTUCollector {
    /** Function code used in every request built by this class. */
    private static final int FUNCTION_READ_HOLDING = 0x03;
    /** Bit set in the echoed function code when the slave answers with an exception frame. */
    private static final int EXCEPTION_FLAG = 0x80;
    /** Exception frame: slave id, function code, exception code, CRC low, CRC high. */
    private static final int EXCEPTION_FRAME_LENGTH = 5;
    /** Upper bound on registers per Read Holding Registers request (Modbus specification). */
    private static final int MAX_REGISTERS_PER_REQUEST = 125;
    /** Pause between writing a request and reading the reply, in milliseconds. */
    private static final long RESPONSE_DELAY_MS = 50;

    private final UARTManager uart;
    private final int slaveId;
    // Concurrent collections: the scheduler thread iterates these while callers may still add entries.
    private final Map<String, RegisterConfig> registers = new ConcurrentHashMap<>();
    private final List<DataListener> listeners = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Receives one scaled reading per register per polling cycle. */
    public interface DataListener {
        /**
         * @param name  register name given to {@link ModbusRTUCollector#addRegister}
         * @param value raw register value multiplied by the scale plus the offset
         */
        void onData(String name, double value);
    }

    /** Signals a malformed, corrupted, or error reply from the slave. */
    public static class ModbusException extends Exception {
        private static final long serialVersionUID = 1L;

        public ModbusException(String message) {
            super(message);
        }
    }

    /** Immutable description of one register (or register pair) to poll. */
    public static class RegisterConfig {
        public final String name;
        public final int address;
        public final int count;
        /** Caller-supplied type label; stored but not interpreted by this class. */
        public final String type;
        public final double scale;
        public final double offset;

        public RegisterConfig(String name, int address, int count,
                              String type, double scale, double offset) {
            this.name = name;
            this.address = address;
            this.count = count;
            this.type = type;
            this.scale = scale;
            this.offset = offset;
        }
    }

    /**
     * @param port     serial port identifier passed to {@code UARTManager}
     * @param baudRate serial baud rate passed to {@code UARTManager}
     * @param slaveId  Modbus slave address; its low byte is sent in every request
     */
    public ModbusRTUCollector(String port, int baudRate, int slaveId) {
        this.uart = new UARTManager(port, baudRate);
        this.slaveId = slaveId;
    }

    /**
     * Declares a register to be read on every polling cycle. Re-using a name
     * replaces the earlier declaration.
     *
     * @param name    unique key, also reported to listeners
     * @param address start address, 0..65535
     * @param count   number of consecutive 16-bit registers, 1..125; a count of 2
     *                is combined into one 32-bit value (first register is the high word),
     *                any other count reports only the first register
     * @param type    free-form type label kept in the configuration
     * @param scale   multiplier applied to the raw value
     * @param offset  constant added after scaling
     * @throws NullPointerException     if {@code name} is null
     * @throws IllegalArgumentException if {@code address} or {@code count} cannot be
     *                                  encoded in a valid request
     */
    public void addRegister(String name, int address, int count,
                            String type, double scale, double offset) {
        Objects.requireNonNull(name, "name");
        if (address < 0 || address > 0xFFFF) {
            throw new IllegalArgumentException("address out of range 0..65535: " + address);
        }
        if (count < 1 || count > MAX_REGISTERS_PER_REQUEST) {
            throw new IllegalArgumentException(
                    "count out of range 1.." + MAX_REGISTERS_PER_REQUEST + ": " + count);
        }
        registers.put(name, new RegisterConfig(name, address, count, type, scale, offset));
    }

    /**
     * Subscribes a listener to every successful reading.
     *
     * @throws NullPointerException if {@code listener} is null
     */
    public void addListener(DataListener listener) {
        listeners.add(Objects.requireNonNull(listener, "listener"));
    }

    /**
     * Starts polling immediately and then every {@code intervalMs} milliseconds.
     *
     * @param intervalMs polling period in milliseconds; must be positive
     */
    public void startPolling(int intervalMs) {
        scheduler.scheduleAtFixedRate(this::poll, 0,
                intervalMs, TimeUnit.MILLISECONDS);
    }

    /** Stops polling and interrupts a cycle that is currently in progress. */
    public void stop() {
        scheduler.shutdownNow();
    }

    /** Reads every declared register once; a failure on one register does not stop the others. */
    private void poll() {
        for (RegisterConfig reg : registers.values()) {
            try {
                int[] values = readHoldingRegisters(reg.address, reg.count);
                notifyListeners(reg.name, processValue(values, reg));
            } catch (InterruptedException e) {
                // Shutdown request: keep the interrupt flag set and abandon the cycle.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // Must not escape: an exception thrown here would cancel all future polling runs.
                System.err.println("读取失败: " + reg.name + " (" + e + ")");
            }
        }
    }

    /** Delivers a reading to each listener, isolating listeners from one another's failures. */
    private void notifyListeners(String name, double value) {
        for (DataListener listener : listeners) {
            try {
                listener.onData(name, value);
            } catch (RuntimeException e) {
                System.err.println("监听器处理失败: " + name + " (" + e + ")");
            }
        }
    }

    /** Sends one request and decodes the reply into {@code count} unsigned 16-bit values. */
    private int[] readHoldingRegisters(int address, int count) throws Exception {
        uart.write(buildRequest(address, count));
        Thread.sleep(RESPONSE_DELAY_MS);
        // Expected reply: slave id + function + byte count + 2 bytes per register + 2 CRC bytes.
        byte[] response = uart.read(3 + count * 2 + 2);
        return parseResponse(response, count);
    }

    /** Builds the 8-byte request frame: slave id, function, address, count (big-endian), CRC (low byte first). */
    private byte[] buildRequest(int address, int count) {
        byte[] request = new byte[8];
        request[0] = (byte) slaveId;
        request[1] = (byte) FUNCTION_READ_HOLDING;
        request[2] = (byte) ((address >> 8) & 0xFF);
        request[3] = (byte) (address & 0xFF);
        request[4] = (byte) ((count >> 8) & 0xFF);
        request[5] = (byte) (count & 0xFF);
        int crc = calculateCRC(request, 0, 6);
        request[6] = (byte) (crc & 0xFF);
        request[7] = (byte) ((crc >> 8) & 0xFF);
        return request;
    }

    /**
     * Validates a reply frame and extracts its register values.
     *
     * @param response bytes returned by the serial port; may be longer than the frame
     * @param count    number of registers that were requested
     * @return {@code count} values, each in the range 0..65535
     * @throws ModbusException if the frame is short, fails the CRC check, comes from
     *                         another slave, is an exception reply, or has an
     *                         unexpected function code or byte count
     */
    private int[] parseResponse(byte[] response, int count) throws ModbusException {
        if (response == null || response.length < EXCEPTION_FRAME_LENGTH) {
            throw new ModbusException("response too short: "
                    + (response == null ? 0 : response.length) + " bytes");
        }
        int function = response[1] & 0xFF;
        boolean exceptionReply = function == (FUNCTION_READ_HOLDING | EXCEPTION_FLAG);
        int frameLength = exceptionReply ? EXCEPTION_FRAME_LENGTH : 3 + count * 2 + 2;
        if (response.length < frameLength) {
            throw new ModbusException("response truncated: expected " + frameLength
                    + " bytes, got " + response.length);
        }

        int expectedCrc = calculateCRC(response, 0, frameLength - 2);
        int receivedCrc = (response[frameLength - 2] & 0xFF)
                | ((response[frameLength - 1] & 0xFF) << 8);
        if (expectedCrc != receivedCrc) {
            throw new ModbusException("CRC mismatch");
        }
        if ((response[0] & 0xFF) != (slaveId & 0xFF)) {
            throw new ModbusException("reply from unexpected slave " + (response[0] & 0xFF));
        }
        if (exceptionReply) {
            throw new ModbusException("slave returned exception code " + (response[2] & 0xFF));
        }
        if (function != FUNCTION_READ_HOLDING) {
            throw new ModbusException("unexpected function code " + function);
        }
        if ((response[2] & 0xFF) != count * 2) {
            throw new ModbusException("unexpected byte count " + (response[2] & 0xFF));
        }

        int[] values = new int[count];
        for (int i = 0; i < count; i++) {
            int high = response[3 + 2 * i] & 0xFF;
            int low = response[4 + 2 * i] & 0xFF;
            values[i] = (high << 8) | low;
        }
        return values;
    }

    /** Converts raw register words to an engineering value: {@code raw * scale + offset}. */
    private double processValue(int[] values, RegisterConfig reg) {
        int rawValue = values[0];
        if (reg.count == 2) {
            // First register is the high word; the result is a signed 32-bit integer.
            rawValue = (values[0] << 16) | values[1];
        }
        return rawValue * reg.scale + reg.offset;
    }

    /** Bitwise CRC-16 with initial value 0xFFFF and reflected polynomial 0xA001. */
    private int calculateCRC(byte[] data, int offset, int length) {
        int crc = 0xFFFF;
        for (int i = offset; i < offset + length; i++) {
            crc ^= (data[i] & 0xFF);
            for (int bit = 0; bit < 8; bit++) {
                boolean carry = (crc & 0x0001) != 0;
                crc >>= 1;
                if (carry) {
                    crc ^= 0xA001;
                }
            }
        }
        return crc;
    }
}
### 2.2 数据处理引擎
java
package com.example.gateway.engine;
import java.util.*;
import java.util.concurrent.*;
/**
 * Decouples data producers from consumers: {@link #submit} enqueues a sample and
 * a single background worker passes each sample, in arrival order, to every
 * registered {@link DataProcessor}.
 *
 * <p>Processors may be added from any thread, before or after {@link #start}.
 * After {@link #stop} the engine cannot be started again.
 */
public class DataProcessingEngine {
    private final BlockingQueue<DataPoint> queue = new LinkedBlockingQueue<>();
    // Copy-on-write: the worker iterates this list while other threads may register processors.
    private final List<DataProcessor> processors = new CopyOnWriteArrayList<>();
    // One worker only, so samples reach processors in the order they were submitted.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = false;

    /** One timestamped sample; the timestamp is the wall-clock time of construction. */
    public static class DataPoint {
        public final String tag;
        public final double value;
        public final long timestamp;

        public DataPoint(String tag, double value) {
            this.tag = tag;
            this.value = value;
            this.timestamp = System.currentTimeMillis();
        }
    }

    /** Consumer of samples; invoked on the engine's worker thread. */
    public interface DataProcessor {
        void process(DataPoint point);
    }

    /**
     * Registers a processor that receives every subsequently dispatched sample.
     *
     * @throws NullPointerException if {@code processor} is null
     */
    public void addProcessor(DataProcessor processor) {
        processors.add(Objects.requireNonNull(processor, "processor"));
    }

    /** Enqueues a sample for asynchronous processing; never blocks. */
    public void submit(String tag, double value) {
        queue.offer(new DataPoint(tag, value));
    }

    /**
     * Starts the background worker. Calling this while the engine is already
     * running has no effect, so two workers never compete for the same queue.
     */
    public synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        executor.submit(this::runWorker);
    }

    /**
     * Stops the worker. Samples still waiting in the queue are not processed.
     */
    public synchronized void stop() {
        running = false;
        // shutdownNow() interrupts the worker; plain shutdown() would leave it
        // blocked in queue.take() forever when the queue is empty.
        executor.shutdownNow();
    }

    /** Worker loop: blocks for the next sample and dispatches it until stopped or interrupted. */
    private void runWorker() {
        while (running) {
            DataPoint point;
            try {
                point = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            dispatch(point);
        }
    }

    /** Hands one sample to every processor; a failing processor does not affect the others. */
    private void dispatch(DataPoint point) {
        for (DataProcessor processor : processors) {
            try {
                processor.process(point);
            } catch (RuntimeException e) {
                // Swallowing here keeps the worker alive; letting it escape would
                // end the loop and silently stop all further processing.
                System.err.println("数据处理失败: " + point.tag + " (" + e + ")");
            }
        }
    }
}
## 三、边缘计算
### 3.1 数据聚合
java
package com.example.gateway.edge;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
 * Collects samples per tag and periodically reduces each tag's pending samples
 * to a single value (average, max, min, sum or count).
 *
 * <p>Only tags registered through {@link #configure} are buffered; samples for
 * other tags are ignored. {@link #addValue} may be called from any thread while
 * aggregation is running.
 */
public class DataAggregator {
    // Each buffer is a plain list guarded by its own monitor; see addValue/flush.
    private final Map<String, List<Double>> buffers = new ConcurrentHashMap<>();
    private final Map<String, AggregationConfig> configs = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Immutable aggregation settings for one tag. */
    public static class AggregationConfig {
        public final String tag;
        /** Stored for callers; the aggregation pass itself does not consult it. */
        public final int windowSize;
        public final AggregationType type;

        /**
         * @throws NullPointerException if {@code type} is null
         */
        public AggregationConfig(String tag, int windowSize, AggregationType type) {
            this.tag = tag;
            this.windowSize = windowSize;
            this.type = Objects.requireNonNull(type, "type");
        }
    }

    /** Reduction applied to the samples gathered between two aggregation passes. */
    public enum AggregationType {
        AVERAGE, MAX, MIN, SUM, COUNT
    }

    /**
     * Registers (or re-registers) a tag. Re-registering discards samples that
     * were pending for that tag.
     *
     * @throws NullPointerException if {@code tag} or {@code type} is null
     */
    public void configure(String tag, int windowSize, AggregationType type) {
        configs.put(tag, new AggregationConfig(tag, windowSize, type));
        buffers.put(tag, new ArrayList<>());
    }

    /** Buffers one sample for {@code tag}; does nothing if the tag was never configured. */
    public void addValue(String tag, double value) {
        List<Double> buffer = buffers.get(tag);
        if (buffer == null) {
            return;
        }
        synchronized (buffer) {
            buffer.add(value);
        }
    }

    /**
     * Runs {@link #flush} every {@code intervalSeconds} seconds on a background
     * thread, starting after one full interval.
     *
     * @param intervalSeconds period between passes; must be positive
     * @param callback        receives {@code (tag, aggregatedValue)} for every tag
     *                        that had pending samples
     * @throws NullPointerException if {@code callback} is null
     */
    public void startAggregation(int intervalSeconds,
                                 BiConsumer<String, Double> callback) {
        Objects.requireNonNull(callback, "callback");
        scheduler.scheduleAtFixedRate(() -> flush(callback),
                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Performs one aggregation pass on the calling thread: for every tag with
     * pending samples, removes those samples, reduces them and reports the result.
     * Exceptions thrown by the callback are logged and do not abort the pass.
     *
     * @param callback receives {@code (tag, aggregatedValue)}
     */
    public void flush(BiConsumer<String, Double> callback) {
        for (Map.Entry<String, List<Double>> entry : buffers.entrySet()) {
            String tag = entry.getKey();
            List<Double> buffer = entry.getValue();
            List<Double> window;
            // Snapshot and clear under one lock so a sample added concurrently is
            // either in this window or kept for the next one, never dropped.
            synchronized (buffer) {
                if (buffer.isEmpty()) {
                    continue;
                }
                window = new ArrayList<>(buffer);
                buffer.clear();
            }
            AggregationConfig config = configs.get(tag);
            if (config == null) {
                continue;
            }
            double result = aggregate(window, config.type);
            try {
                callback.accept(tag, result);
            } catch (RuntimeException e) {
                // An exception escaping a scheduleAtFixedRate task cancels all later runs.
                System.err.println("聚合回调失败: " + tag + " (" + e + ")");
            }
        }
    }

    /** Stops the periodic aggregation started by {@link #startAggregation}. */
    public void stop() {
        scheduler.shutdownNow();
    }

    /** Reduces {@code values} according to {@code type}; an empty list yields 0. */
    private double aggregate(List<Double> values, AggregationType type) {
        if (values.isEmpty()) {
            return 0;
        }
        switch (type) {
            case AVERAGE:
                return values.stream().mapToDouble(Double::doubleValue).average().orElse(0);
            case MAX:
                return values.stream().mapToDouble(Double::doubleValue).max().orElse(0);
            case MIN:
                return values.stream().mapToDouble(Double::doubleValue).min().orElse(0);
            case SUM:
                return values.stream().mapToDouble(Double::doubleValue).sum();
            case COUNT:
                return values.size();
            default:
                return 0;
        }
    }
}
## 四、云端对接
### 4.1 数据上传
java
package com.example.gateway.cloud;
import org.eclipse.paho.client.mqttv3.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * Publishes gateway readings to an MQTT broker under {@code gateway/<deviceId>/data}.
 *
 * <p>{@link #upload} only enqueues a JSON payload; a background worker started by
 * {@link #startUpload} publishes queued payloads at QoS 1. While the client is
 * disconnected, or when a publish call fails, the payload stays at the head of
 * the in-memory queue and is retried, so readings are held across outages for as
 * long as the process lives and memory allows.
 */
public class CloudConnector {
    /** Wait between attempts while the broker is unreachable or a publish failed. */
    private static final long RETRY_DELAY_MS = 1000;
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    private final MqttAsyncClient mqttClient;
    // Deque rather than queue: a payload that could not be sent is pushed back to the front.
    private final BlockingDeque<String> uploadQueue = new LinkedBlockingDeque<>();
    private final String deviceTopic;
    private final ExecutorService uploader = Executors.newSingleThreadExecutor();
    private boolean started = false; // guarded by this

    /**
     * Creates the client and begins connecting asynchronously; the constructor
     * returns before the connection is established.
     *
     * @param broker   broker URI passed to the MQTT client
     * @param deviceId used both as MQTT client id and in the topic name
     * @throws MqttException if the client cannot be created or the connect request is rejected
     */
    public CloudConnector(String broker, String deviceId) throws MqttException {
        this.deviceTopic = "gateway/" + deviceId;
        this.mqttClient = new MqttAsyncClient(broker, deviceId, null);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        // The callbacks only log. Connection state is read from the client when
        // publishing, because these callbacks fire for the initial connect only
        // and would go stale after a later connection loss.
        mqttClient.connect(options, null, new MqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                System.out.println("云端连接成功");
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                // NOTE(review): automatic reconnect appears to apply only after a first
                // successful connect; confirm whether an initial failure needs an explicit retry.
                System.err.println("云端连接失败: " + exception.getMessage());
            }
        });
    }

    /**
     * Queues one reading for upload as
     * {@code {"tag":"...","value":0.00,"timestamp":0}}; never blocks.
     *
     * @param tag       reading name; JSON-escaped before being embedded
     * @param value     reading, formatted with two decimals
     * @param timestamp caller-supplied timestamp, written verbatim
     */
    public void upload(String tag, double value, long timestamp) {
        // Locale.ROOT: the default locale may print a decimal comma, which is not valid JSON.
        String payload = String.format(Locale.ROOT,
                "{\"tag\":\"%s\",\"value\":%.2f,\"timestamp\":%d}",
                escapeJson(tag), value, timestamp);
        uploadQueue.offer(payload);
    }

    /** Starts the background upload worker; repeated calls have no further effect. */
    public synchronized void startUpload() {
        if (started) {
            return;
        }
        started = true;
        uploader.submit(this::uploadLoop);
    }

    /** Stops the upload worker and disconnects from the broker; queued payloads are not sent. */
    public void stop() {
        uploader.shutdownNow();
        try {
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
        } catch (MqttException e) {
            System.err.println("云端断开失败: " + e.getMessage());
        }
    }

    /** Worker loop: takes payloads in order and retries the head payload until it is accepted. */
    private void uploadLoop() {
        String topic = deviceTopic + "/data";
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String payload = uploadQueue.take();
                if (!tryPublish(topic, payload)) {
                    // Put it back first so an interrupt during the sleep cannot lose it.
                    uploadQueue.offerFirst(payload);
                    Thread.sleep(RETRY_DELAY_MS);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Hands one payload to the MQTT client.
     *
     * @return {@code true} if the client accepted the message for delivery,
     *         {@code false} if it is disconnected or rejected the call
     */
    private boolean tryPublish(String topic, String payload) {
        if (!mqttClient.isConnected()) {
            return false;
        }
        try {
            mqttClient.publish(topic, payload.getBytes(PAYLOAD_CHARSET), 1, false);
            return true;
        } catch (MqttException e) {
            System.err.println("上传失败,稍后重试: " + e.getMessage());
            return false;
        }
    }

    /** Escapes quotes, backslashes and control characters so {@code text} is a valid JSON string body. */
    private static String escapeJson(String text) {
        String source = String.valueOf(text);
        StringBuilder escaped = new StringBuilder(source.length() + 8);
        for (int i = 0; i < source.length(); i++) {
            char c = source.charAt(i);
            if (c == '"' || c == '\\') {
                escaped.append('\\').append(c);
            } else if (c < 0x20) {
                escaped.append(String.format(Locale.ROOT, "\\u%04x", (int) c));
            } else {
                escaped.append(c);
            }
        }
        return escaped.toString();
    }
}
## 五、总结
工业数据网关项目要点:
- 协议转换:支持多种工业协议
- 数据采集:定时采集与事件触发
- 边缘计算:本地数据处理与聚合
- 断网缓存:保证数据不丢失
- 云端对接:可靠的数据上传机制
至此,Java嵌入式开发文档全部完成!
最后更新:2026-03-27