工业数据网关 #

一、项目概述 #

1.1 项目目标 #

构建一个工业数据网关,实现工业协议转换、数据采集、边缘计算和云平台对接。

text
┌─────────────────────────────────────────────────────────┐
│                  工业数据网关架构                        │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────────────────────────────────────────┐   │
│  │              云平台 / MES系统                     │   │
│  └────────────────────────┬────────────────────────┘   │
│                           │ MQTT/HTTP                   │
│  ┌────────────────────────┴────────────────────────┐   │
│  │              工业数据网关                         │   │
│  │  ┌─────────────────────────────────────────┐    │   │
│  │  │           协议转换层                     │    │   │
│  │  │  Modbus │ OPC-UA │ MQTT │ HTTP        │    │   │
│  │  └─────────────────────────────────────────┘    │   │
│  │                      │                          │   │
│  │  ┌─────────────────────────────────────────┐    │   │
│  │  │           数据处理层                     │    │   │
│  │  │  采集 │ 缓存 │ 计算 │ 报警 │ 上传     │    │   │
│  │  └─────────────────────────────────────────┘    │   │
│  └─────────────────────────────────────────────────┘   │
│                           │                             │
│  ┌────────────────────────┴────────────────────────┐   │
│  │              工业设备                            │   │
│  │  PLC │ 传感器 │ 变频器 │ 仪表                  │   │
│  └─────────────────────────────────────────────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘

1.2 功能模块 #

| 模块 | 功能 |
| --- | --- |
| 协议转换 | Modbus RTU/TCP、OPC-UA 协议转换 |
| 数据采集 | 定时采集、事件触发 |
| 边缘计算 | 数据过滤、聚合、计算 |
| 本地存储 | 断网缓存、历史数据 |
| 云端对接 | 通过 MQTT/HTTP 上传数据 |
| 远程配置 | 参数远程配置 |

二、协议转换 #

2.1 Modbus RTU采集 #

java
package com.example.gateway.protocol;

import java.util.*;
import java.util.concurrent.*;

/**
 * Periodically reads holding registers (function code 0x03) from one Modbus RTU
 * slave over a serial port and forwards each scaled value to the registered
 * listeners.
 *
 * <p>Registers may be added and listeners attached while polling is running;
 * both collections are concurrent. All bus traffic happens on the single
 * scheduler thread, so requests never overlap on the wire.
 */
public class ModbusRTUCollector {

    /** Modbus function code "Read Holding Registers". */
    private static final int FUNCTION_READ_HOLDING_REGISTERS = 0x03;
    /** Bit set in the function code of a Modbus exception response. */
    private static final int EXCEPTION_FLAG = 0x80;
    /** Largest register count a single 0x03 request may ask for. */
    private static final int MAX_REGISTERS_PER_REQUEST = 125;
    /** Pause between writing the request and reading the reply. */
    private static final long RESPONSE_DELAY_MS = 50;
    /** Shortest valid frame: slave id, function code, one data byte, two CRC bytes. */
    private static final int MIN_FRAME_LENGTH = 5;

    private final UARTManager uart;
    private final int slaveId;
    // Concurrent: addRegister() runs on the caller's thread while poll()
    // iterates on the scheduler thread.
    private final Map<String, RegisterConfig> registers = new ConcurrentHashMap<>();
    private final List<DataListener> listeners = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    /** Receives every value produced by a successful register read. */
    public interface DataListener {
        /**
         * @param name  the register name given to {@link #addRegister}
         * @param value the raw register value after {@code raw * scale + offset}
         */
        void onData(String name, double value);
    }

    /** Immutable description of one polled register (or register pair). */
    public static class RegisterConfig {
        public final String name;
        public final int address;
        public final int count;
        // NOTE(review): type is stored but not consulted when decoding a
        // value; confirm the intended meaning before relying on it.
        public final String type;
        public final double scale;
        public final double offset;

        public RegisterConfig(String name, int address, int count,
                             String type, double scale, double offset) {
            this.name = name;
            this.address = address;
            this.count = count;
            this.type = type;
            this.scale = scale;
            this.offset = offset;
        }
    }

    /**
     * @param port     serial port identifier handed to {@code UARTManager}
     * @param baudRate serial baud rate handed to {@code UARTManager}
     * @param slaveId  Modbus slave address placed in every request
     */
    public ModbusRTUCollector(String port, int baudRate, int slaveId) {
        this.uart = new UARTManager(port, baudRate);
        this.slaveId = slaveId;
    }

    /**
     * Registers a value to be polled; a later call with the same name replaces
     * the earlier configuration.
     *
     * @param name    unique key, also reported to listeners
     * @param address starting register address
     * @param count   number of 16-bit registers to read (1..125)
     * @param type    free-form type label, stored as-is
     * @param scale   multiplier applied to the raw value
     * @param offset  constant added after scaling
     * @throws NullPointerException     if {@code name} is null
     * @throws IllegalArgumentException if {@code count} is outside 1..125
     */
    public void addRegister(String name, int address, int count,
                           String type, double scale, double offset) {
        Objects.requireNonNull(name, "name");
        if (count < 1 || count > MAX_REGISTERS_PER_REQUEST) {
            throw new IllegalArgumentException(
                "count must be in 1.." + MAX_REGISTERS_PER_REQUEST + ": " + count);
        }
        registers.put(name, new RegisterConfig(name, address, count, type, scale, offset));
    }

    /**
     * Subscribes a listener to every successfully read value.
     *
     * @throws NullPointerException if {@code listener} is null
     */
    public void addListener(DataListener listener) {
        listeners.add(Objects.requireNonNull(listener, "listener"));
    }

    /**
     * Starts polling all registers immediately and then every
     * {@code intervalMs} milliseconds.
     */
    public void startPolling(int intervalMs) {
        scheduler.scheduleAtFixedRate(this::poll, 0,
            intervalMs, TimeUnit.MILLISECONDS);
    }

    /** Stops polling and interrupts a read that is in progress. */
    public void stop() {
        scheduler.shutdownNow();
    }

    /** Reads every configured register once; one failing register does not stop the rest. */
    private void poll() {
        for (RegisterConfig reg : registers.values()) {
            try {
                int[] values = readHoldingRegisters(reg.address, reg.count);
                notifyListeners(reg.name, processValue(values, reg));
            } catch (InterruptedException e) {
                // Shutdown was requested: keep the interrupt flag and end this cycle.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                System.err.println("读取失败: " + reg.name + " (" + e + ")");
            }
        }
    }

    /** Delivers one value to all listeners; a failing listener does not affect the others. */
    private void notifyListeners(String name, double value) {
        for (DataListener listener : listeners) {
            try {
                listener.onData(name, value);
            } catch (RuntimeException e) {
                System.err.println("监听器处理失败: " + name + " (" + e + ")");
            }
        }
    }

    /** Sends one read request and returns the register values from the validated reply. */
    private int[] readHoldingRegisters(int address, int count) throws Exception {
        byte[] request = buildRequest(address, count);
        uart.write(request);

        Thread.sleep(RESPONSE_DELAY_MS);

        // Reply layout: slave id, function code, byte count, data, CRC (2 bytes).
        byte[] response = uart.read(3 + count * 2 + 2);
        return parseResponse(response, count);
    }

    /** Builds the 8-byte request frame: header, big-endian address and count, CRC low byte first. */
    private byte[] buildRequest(int address, int count) {
        byte[] request = new byte[8];
        request[0] = (byte) slaveId;
        request[1] = (byte) FUNCTION_READ_HOLDING_REGISTERS;
        request[2] = (byte) ((address >> 8) & 0xFF);
        request[3] = (byte) (address & 0xFF);
        request[4] = (byte) ((count >> 8) & 0xFF);
        request[5] = (byte) (count & 0xFF);

        int crc = calculateCRC(request, 0, 6);
        request[6] = (byte) (crc & 0xFF);
        request[7] = (byte) ((crc >> 8) & 0xFF);

        return request;
    }

    /**
     * Validates a reply frame and extracts its big-endian 16-bit register values.
     *
     * @throws IllegalStateException if the frame is missing, too short, comes
     *         from another slave, is an exception response, or fails the
     *         byte-count or CRC check
     */
    private int[] parseResponse(byte[] response, int expectedCount) {
        if (response == null || response.length < MIN_FRAME_LENGTH) {
            throw new IllegalStateException("响应帧过短");
        }
        if ((response[0] & 0xFF) != (slaveId & 0xFF)) {
            throw new IllegalStateException("从站地址不匹配: " + (response[0] & 0xFF));
        }

        int function = response[1] & 0xFF;
        if (function == (FUNCTION_READ_HOLDING_REGISTERS | EXCEPTION_FLAG)) {
            throw new IllegalStateException("从站返回异常码: " + (response[2] & 0xFF));
        }
        if (function != FUNCTION_READ_HOLDING_REGISTERS) {
            throw new IllegalStateException("功能码不匹配: " + function);
        }

        int dataLength = expectedCount * 2;
        int frameLength = 3 + dataLength + 2;
        if (response.length < frameLength || (response[2] & 0xFF) != dataLength) {
            throw new IllegalStateException("响应长度不正确");
        }

        // CRC is transmitted low byte first, mirroring buildRequest().
        int received = (response[frameLength - 2] & 0xFF)
            | ((response[frameLength - 1] & 0xFF) << 8);
        if (received != calculateCRC(response, 0, frameLength - 2)) {
            throw new IllegalStateException("CRC校验失败");
        }

        int[] values = new int[expectedCount];
        for (int i = 0; i < expectedCount; i++) {
            int high = response[3 + 2 * i] & 0xFF;
            int low = response[4 + 2 * i] & 0xFF;
            values[i] = (high << 8) | low;
        }
        return values;
    }

    /**
     * Turns raw registers into an engineering value: {@code raw * scale + offset}.
     * A two-register read is combined into one 32-bit integer, first register
     * as the high word; any other count uses only the first register.
     */
    private double processValue(int[] values, RegisterConfig reg) {
        int rawValue = values[0];
        if (reg.count == 2) {
            rawValue = ((values[0] & 0xFFFF) << 16) | (values[1] & 0xFFFF);
        }

        return rawValue * reg.scale + reg.offset;
    }

    /** Computes the 16-bit CRC (initial value 0xFFFF, reflected polynomial 0xA001). */
    private int calculateCRC(byte[] data, int offset, int length) {
        int crc = 0xFFFF;
        for (int i = offset; i < offset + length; i++) {
            crc ^= (data[i] & 0xFF);
            for (int j = 0; j < 8; j++) {
                if ((crc & 0x0001) != 0) {
                    crc = (crc >> 1) ^ 0xA001;
                } else {
                    crc >>= 1;
                }
            }
        }
        return crc;
    }
}

2.2 数据处理引擎 #

java
package com.example.gateway.engine;

import java.util.*;
import java.util.concurrent.*;

/**
 * Hands submitted data points to a chain of processors on a background thread.
 *
 * <p>Points are queued by {@link #submit} and consumed in submission order by
 * a single worker. Processors may be added at any time, including while the
 * engine is running.
 */
public class DataProcessingEngine {

    private final BlockingQueue<DataPoint> queue = new LinkedBlockingQueue<>();
    // Copy-on-write: addProcessor() may run while the worker is iterating.
    private final List<DataProcessor> processors = new CopyOnWriteArrayList<>();
    // Only one worker task is ever submitted, so one thread is enough.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = false;

    /** One tagged sample, timestamped with the wall clock at construction. */
    public static class DataPoint {
        public final String tag;
        public final double value;
        public final long timestamp;

        public DataPoint(String tag, double value) {
            this.tag = tag;
            this.value = value;
            this.timestamp = System.currentTimeMillis();
        }
    }

    /** A processing step invoked once for every data point. */
    public interface DataProcessor {
        void process(DataPoint point);
    }

    /**
     * Appends a processor to the chain.
     *
     * @throws NullPointerException if {@code processor} is null
     */
    public void addProcessor(DataProcessor processor) {
        processors.add(Objects.requireNonNull(processor, "processor"));
    }

    /** Queues a new data point for processing; never blocks. */
    public void submit(String tag, double value) {
        queue.offer(new DataPoint(tag, value));
    }

    /**
     * Starts the worker. Calling it again while the engine is running has no
     * effect. The engine cannot be restarted after {@link #stop()}, because
     * the executor has been shut down by then.
     */
    public synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        executor.submit(this::consumeLoop);
    }

    /**
     * Stops the worker. Points still queued are left unprocessed.
     */
    public synchronized void stop() {
        running = false;
        // shutdownNow() interrupts the worker; a plain shutdown() would leave
        // it blocked in queue.take() forever.
        executor.shutdownNow();
    }

    /** Takes points off the queue until the engine is stopped or the thread is interrupted. */
    private void consumeLoop() {
        while (running) {
            DataPoint point;
            try {
                point = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            dispatch(point);
        }
    }

    /** Runs every processor on one point; a failing processor does not stop the others. */
    private void dispatch(DataPoint point) {
        for (DataProcessor processor : processors) {
            try {
                processor.process(point);
            } catch (RuntimeException e) {
                // An escaping exception would end the worker task silently.
                System.err.println("数据处理失败: " + point.tag + " (" + e + ")");
            }
        }
    }
}

三、边缘计算 #

3.1 数据聚合 #

java
package com.example.gateway.edge;

import java.util.*;
import java.util.concurrent.*;

/**
 * Buffers values per tag and periodically reduces each buffer to one number
 * (average, max, min, sum or count), which is handed to a callback.
 *
 * <p>{@link #addValue} may be called from any thread while aggregation is
 * running. Each buffer is drained under its own lock, so a value is either
 * included in the current result or kept for the next one.
 */
public class DataAggregator {

    // Each buffer is a plain list guarded by synchronized (buffer).
    private final Map<String, List<Double>> buffers = new ConcurrentHashMap<>();
    private final Map<String, AggregationConfig> configs = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    /** Immutable aggregation settings for one tag. */
    public static class AggregationConfig {
        public final String tag;
        // NOTE(review): windowSize is stored but not used by this class;
        // confirm whether it was meant to cap the buffer.
        public final int windowSize;
        public final AggregationType type;

        public AggregationConfig(String tag, int windowSize, AggregationType type) {
            this.tag = tag;
            this.windowSize = windowSize;
            this.type = type;
        }
    }

    /** Supported reductions. */
    public enum AggregationType {
        AVERAGE, MAX, MIN, SUM, COUNT
    }

    /**
     * Enables aggregation for a tag. Configuring a tag again replaces its
     * settings and discards the values buffered so far.
     *
     * @throws NullPointerException if {@code tag} or {@code type} is null
     */
    public void configure(String tag, int windowSize, AggregationType type) {
        Objects.requireNonNull(tag, "tag");
        Objects.requireNonNull(type, "type");
        configs.put(tag, new AggregationConfig(tag, windowSize, type));
        buffers.put(tag, new ArrayList<>());
    }

    /** Buffers one value; values for tags that were never configured are ignored. */
    public void addValue(String tag, double value) {
        List<Double> buffer = buffers.get(tag);
        if (buffer != null) {
            synchronized (buffer) {
                buffer.add(value);
            }
        }
    }

    /**
     * Runs {@link #flush} every {@code intervalSeconds} seconds, first after
     * one full interval.
     *
     * @param callback receives the tag and its aggregated value
     */
    public void startAggregation(int intervalSeconds,
            java.util.function.BiConsumer<String, Double> callback) {
        Objects.requireNonNull(callback, "callback");
        scheduler.scheduleAtFixedRate(() -> flush(callback),
            intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Aggregates and empties every non-empty buffer once, on the calling
     * thread. A callback failure for one tag is logged and does not affect
     * the other tags.
     *
     * @param callback receives the tag and its aggregated value
     */
    public void flush(java.util.function.BiConsumer<String, Double> callback) {
        for (Map.Entry<String, List<Double>> entry : buffers.entrySet()) {
            String tag = entry.getKey();
            AggregationConfig config = configs.get(tag);
            if (config == null) {
                continue;
            }

            List<Double> drained = drain(entry.getValue());
            if (drained.isEmpty()) {
                continue;
            }

            double result = aggregate(drained, config.type);
            try {
                callback.accept(tag, result);
            } catch (RuntimeException e) {
                // An escaping exception would cancel the periodic task.
                System.err.println("聚合回调失败: " + tag + " (" + e + ")");
            }
        }
    }

    /** Stops periodic aggregation. Values still buffered can be emitted with {@link #flush}. */
    public void stop() {
        scheduler.shutdownNow();
    }

    /** Copies out and clears a buffer in one step, so concurrent adds are not lost. */
    private List<Double> drain(List<Double> buffer) {
        synchronized (buffer) {
            if (buffer.isEmpty()) {
                return Collections.emptyList();
            }
            List<Double> snapshot = new ArrayList<>(buffer);
            buffer.clear();
            return snapshot;
        }
    }

    /** Reduces the values with the given aggregation type; an empty list yields 0. */
    private double aggregate(List<Double> values, AggregationType type) {
        if (values.isEmpty()) {
            return 0;
        }

        switch (type) {
            case AVERAGE:
                return values.stream().mapToDouble(Double::doubleValue).average().orElse(0);
            case MAX:
                return values.stream().mapToDouble(Double::doubleValue).max().orElse(0);
            case MIN:
                return values.stream().mapToDouble(Double::doubleValue).min().orElse(0);
            case SUM:
                return values.stream().mapToDouble(Double::doubleValue).sum();
            case COUNT:
                return values.size();
            default:
                return 0;
        }
    }
}

四、云端对接 #

4.1 数据上传 #

java
package com.example.gateway.cloud;

import org.eclipse.paho.client.mqttv3.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Publishes data points to an MQTT broker as JSON under
 * {@code gateway/<deviceId>/data}.
 *
 * <p>{@link #upload} only queues a payload in memory; a background thread
 * started by {@link #startUpload} sends queued payloads in order. While the
 * client is disconnected the payload being sent is held and retried rather
 * than dropped. The queue is unbounded and is not persisted.
 */
public class CloudConnector {

    /** Pause between attempts while disconnected or after a failed publish. */
    private static final long RETRY_DELAY_MS = 1000;
    /** Publish failures tolerated for one payload before it is dropped. */
    private static final int MAX_PUBLISH_ATTEMPTS = 3;
    private static final int QOS = 1;

    private final MqttAsyncClient mqttClient;
    private final BlockingQueue<String> uploadQueue = new LinkedBlockingQueue<>();
    private final String deviceTopic;
    // Guarded by synchronized methods; null until startUpload() is called.
    private ExecutorService uploadExecutor;

    /**
     * Creates the client and starts connecting asynchronously; the result of
     * the connection attempt is only logged.
     *
     * @param broker   broker URI passed to {@code MqttAsyncClient}
     * @param deviceId used as the MQTT client id and in the topic name
     * @throws MqttException if the client cannot be created or the connect
     *                       request cannot be issued
     */
    public CloudConnector(String broker, String deviceId) throws MqttException {
        this.deviceTopic = "gateway/" + deviceId;

        mqttClient = new MqttAsyncClient(broker, deviceId, null);

        MqttConnectOptions options = new MqttConnectOptions();
        // NOTE(review): confirm whether automatic reconnect also covers a
        // failed initial connect, or only a connection lost later.
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);

        mqttClient.connect(options, null, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                System.out.println("云端连接成功");
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                System.err.println("云端连接失败: " + exception.getMessage());
            }
        });
    }

    /**
     * Queues one data point for upload; never blocks.
     *
     * @param tag       data point name, JSON-escaped before use
     * @param value     value, written with two decimals
     * @param timestamp timestamp written as-is
     */
    public void upload(String tag, double value, long timestamp) {
        // Locale.ROOT keeps '.' as the decimal separator whatever the
        // platform locale is; a ',' would make the JSON invalid.
        String payload = String.format(Locale.ROOT,
            "{\"tag\":\"%s\",\"value\":%.2f,\"timestamp\":%d}",
            escapeJson(String.valueOf(tag)), value, timestamp);

        uploadQueue.offer(payload);
    }

    /** Starts the background upload thread; later calls have no effect. */
    public synchronized void startUpload() {
        if (uploadExecutor != null) {
            return;
        }
        uploadExecutor = Executors.newSingleThreadExecutor();
        uploadExecutor.submit(this::uploadLoop);
    }

    /** Stops the upload thread. Payloads still queued stay in the queue. */
    public synchronized void stop() {
        if (uploadExecutor != null) {
            uploadExecutor.shutdownNow();
        }
    }

    /** Sends queued payloads one at a time until the thread is interrupted. */
    private void uploadLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                publishWithRetry(uploadQueue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits for a connection and publishes one payload. Waiting for the
     * connection is unbounded; publish failures are retried up to
     * {@code MAX_PUBLISH_ATTEMPTS} times, so that one payload the client
     * keeps rejecting cannot block the queue forever.
     */
    private void publishWithRetry(String payload) throws InterruptedException {
        byte[] body = payload.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        int failures = 0;

        while (true) {
            if (mqttClient.isConnected()) {
                try {
                    mqttClient.publish(deviceTopic + "/data", body, QOS, false);
                    return;
                } catch (MqttException e) {
                    failures++;
                    if (failures >= MAX_PUBLISH_ATTEMPTS) {
                        System.err.println("数据上传失败,已放弃: " + e.getMessage());
                        return;
                    }
                    System.err.println("数据上传失败,稍后重试: " + e.getMessage());
                }
            }
            Thread.sleep(RETRY_DELAY_MS);
        }
    }

    /** Escapes quotes, backslashes and control characters for use inside a JSON string. */
    private static String escapeJson(String text) {
        StringBuilder out = new StringBuilder(text.length() + 8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format(Locale.ROOT, "\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }
}

五、总结 #

工业数据网关项目要点:

  1. 协议转换:支持多种工业协议
  2. 数据采集:定时采集与事件触发
  3. 边缘计算:本地数据处理与聚合
  4. 断网缓存:断网期间在本地缓存数据,恢复连接后补传,避免数据丢失
  5. 云端对接:可靠的数据上传机制

至此,Java嵌入式开发文档全部完成!

最后更新:2026-03-27