环境监测站 #

一、项目概述 #

1.1 项目目标 #

构建一个环境监测站，实现多传感器数据采集、存储、分析和可视化展示。

text
┌─────────────────────────────────────────────────────────┐
│                  环境监测站架构                          │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────────────────────────────────────────┐   │
│  │              Web可视化界面                       │   │
│  └────────────────────────┬────────────────────────┘   │
│                           │ HTTP/WebSocket              │
│  ┌────────────────────────┴────────────────────────┐   │
│  │              环境监测站 (树莓派)                  │   │
│  │  ┌─────────────────────────────────────────┐    │   │
│  │  │           传感器阵列                     │    │   │
│  │  │  温湿度 │ 气压 │ 光照 │ PM2.5 │ CO2    │    │   │
│  │  └─────────────────────────────────────────┘    │   │
│  │                      │                          │   │
│  │  ┌─────────────────────────────────────────┐    │   │
│  │  │           数据处理层                     │    │   │
│  │  │  采集 │ 存储 │ 分析 │ 报警 │ 上传     │    │   │
│  │  └─────────────────────────────────────────┘    │   │
│  └─────────────────────────────────────────────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘

1.2 功能模块 #

| 模块 | 功能 |
| --- | --- |
| 数据采集 | 多传感器定时采集 |
| 数据存储 | 本地数据库存储 |
| 数据分析 | 统计分析、趋势预测 |
| 报警功能 | 阈值报警、异常检测 |
| 可视化 | Web界面实时展示 |
| 数据上传 | 云平台数据同步 |

二、硬件准备 #

2.1 硬件清单 #

| 设备 | 数量 | 用途 |
| --- | --- | --- |
| Raspberry Pi 4B | 1 | 主控制器 |
| BME280 | 1 | 温湿度气压 |
| BH1750 | 1 | 光照强度 |
| PMS5003 | 1 | PM2.5检测 |
| MH-Z19C | 1 | CO2浓度 |
| LCD1602 | 1 | 本地显示 |
| 蜂鸣器 | 1 | 报警输出 |

三、软件实现 #

3.1 数据采集模块 #

java
package com.example.envmonitor.sensor;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Periodically reads every registered sensor, persists the reading and
 * forwards it to the alarm manager.
 *
 * <p>Sensors are kept in a concurrent map, so {@link #addSensor} may be called
 * while collection is running.
 */
public class DataCollector {

    private final Map<String, Sensor> sensors = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
        Executors.newScheduledThreadPool(2);
    private final DataStorage storage;
    private final AlarmManager alarmManager;

    /** Immutable value object describing a single sensor reading. */
    public static class SensorData {
        public final String sensorId;
        public final double value;
        public final String unit;
        public final LocalDateTime timestamp;

        /**
         * Creates a reading stamped with the current local time.
         *
         * @param sensorId id the sensor was registered under
         * @param value    measured value
         * @param unit     unit reported by the sensor
         */
        public SensorData(String sensorId, double value, String unit) {
            this(sensorId, value, unit, LocalDateTime.now());
        }

        /**
         * Creates a reading with an explicit timestamp, for callers that need to
         * rebuild a reading taken earlier (for example from persisted rows).
         *
         * @param sensorId  id the sensor was registered under
         * @param value     measured value
         * @param unit      unit reported by the sensor
         * @param timestamp time at which the value was measured
         */
        public SensorData(String sensorId, double value, String unit,
                LocalDateTime timestamp) {
            this.sensorId = sensorId;
            this.value = value;
            this.unit = unit;
            this.timestamp = timestamp;
        }
    }

    /**
     * @param storage      sink every reading is saved to
     * @param alarmManager receives every reading for threshold checking
     */
    public DataCollector(DataStorage storage, AlarmManager alarmManager) {
        this.storage = storage;
        this.alarmManager = alarmManager;
    }

    /**
     * Registers a sensor, replacing any sensor previously registered under the
     * same id.
     */
    public void addSensor(String id, Sensor sensor) {
        sensors.put(id, sensor);
    }

    /**
     * Starts collecting immediately and then every {@code intervalSeconds}.
     *
     * <p>Each call schedules an additional periodic task, so this should be
     * called once per collector.
     *
     * @param intervalSeconds period between collection rounds, in seconds
     */
    public void start(int intervalSeconds) {
        scheduler.scheduleAtFixedRate(this::collect, 0,
            intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Runs one collection round over all registered sensors.
     *
     * <p>Reading, saving and alarm checking are guarded separately: a failing
     * sensor does not affect the others, and a storage failure must not
     * prevent the alarm check for a value that was read successfully. Nothing
     * is allowed to escape, because an exception thrown from a task scheduled
     * with {@code scheduleAtFixedRate} suppresses all subsequent executions.
     */
    private void collect() {
        for (Map.Entry<String, Sensor> entry : sensors.entrySet()) {
            String sensorId = entry.getKey();
            Sensor sensor = entry.getValue();

            SensorData data;
            try {
                data = new SensorData(sensorId, sensor.read(), sensor.getUnit());
            } catch (Exception e) {
                System.err.println("传感器读取失败: " + sensorId);
                e.printStackTrace();
                continue;
            }

            try {
                storage.save(data);
            } catch (Exception e) {
                System.err.println("数据保存失败: " + sensorId);
                e.printStackTrace();
            }

            try {
                alarmManager.check(sensorId, data.value);
            } catch (Exception e) {
                System.err.println("报警检查失败: " + sensorId);
                e.printStackTrace();
            }
        }
    }

    /**
     * Stops scheduling further collection rounds. A round that is already
     * running is allowed to finish; this method does not wait for it.
     */
    public void stop() {
        scheduler.shutdown();
    }
}

3.2 数据存储模块 #

java
package com.example.envmonitor.storage;

import java.sql.*;
import java.time.LocalDateTime;
import java.util.*;

/**
 * {@link DataStorage} implementation backed by a single SQLite database file.
 *
 * <p>One JDBC connection is opened in the constructor and reused for every
 * call until {@link #close()} is invoked.
 */
public class SQLiteStorage implements DataStorage, AutoCloseable {

    private final String dbPath;
    private Connection connection;

    /**
     * Opens (or creates) the database at {@code dbPath} and ensures the schema
     * exists.
     *
     * @param dbPath path of the SQLite database file
     * @throws SQLException if the connection cannot be opened or the schema
     *                      cannot be created
     */
    public SQLiteStorage(String dbPath) throws SQLException {
        this.dbPath = dbPath;
        initialize();
    }

    /**
     * Opens the connection and creates the {@code sensor_data} table and its
     * lookup index when they do not exist yet.
     */
    private void initialize() throws SQLException {
        connection = DriverManager.getConnection("jdbc:sqlite:" + dbPath);

        try (Statement stmt = connection.createStatement()) {
            stmt.execute("""
                CREATE TABLE IF NOT EXISTS sensor_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    sensor_id TEXT NOT NULL,
                    value REAL NOT NULL,
                    unit TEXT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """);

            stmt.execute("""
                CREATE INDEX IF NOT EXISTS idx_sensor_time 
                ON sensor_data(sensor_id, timestamp)
            """);
        } catch (SQLException e) {
            // The constructor is about to fail, so nobody will ever be able to
            // call close(); release the connection here instead of leaking it.
            try {
                connection.close();
            } catch (SQLException closeError) {
                e.addSuppressed(closeError);
            }
            connection = null;
            throw e;
        }
    }

    /**
     * Inserts one reading. The timestamp is stored as the ISO-8601 text
     * produced by {@code LocalDateTime.toString()}.
     *
     * @throws SQLException if the insert fails
     */
    @Override
    public void save(DataCollector.SensorData data) throws SQLException {
        String sql = "INSERT INTO sensor_data(sensor_id, value, unit, timestamp) VALUES(?,?,?,?)";

        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            pstmt.setString(1, data.sensorId);
            pstmt.setDouble(2, data.value);
            pstmt.setString(3, data.unit);
            pstmt.setString(4, data.timestamp.toString());
            pstmt.executeUpdate();
        }
    }

    /**
     * Returns the readings of one sensor whose stored timestamp lies between
     * {@code start} and {@code end} (inclusive), ordered by timestamp.
     *
     * <p>NOTE(review): {@code SensorData(String, double, String)} stamps each
     * object with {@code LocalDateTime.now()}, so the objects returned here do
     * not carry the value of the stored {@code timestamp} column. Restoring it
     * needs a way to pass the timestamp into {@code SensorData}.
     *
     * @throws SQLException if the query fails
     */
    @Override
    public List<DataCollector.SensorData> query(String sensorId,
            LocalDateTime start, LocalDateTime end) throws SQLException {

        String sql = "SELECT * FROM sensor_data WHERE sensor_id = ? " +
                     "AND timestamp BETWEEN ? AND ? ORDER BY timestamp";

        List<DataCollector.SensorData> results = new ArrayList<>();

        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            pstmt.setString(1, sensorId);
            pstmt.setString(2, start.toString());
            pstmt.setString(3, end.toString());

            // Close the cursor explicitly instead of relying on the statement
            // to do it.
            try (ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    results.add(new DataCollector.SensorData(
                        rs.getString("sensor_id"),
                        rs.getDouble("value"),
                        rs.getString("unit")
                    ));
                }
            }
        }

        return results;
    }

    /**
     * Returns, for every sensor that has at least one row, the value of the
     * row with the highest {@code id}, keyed by sensor id.
     *
     * @throws SQLException if the query fails
     */
    @Override
    public Map<String, Double> getLatest() throws SQLException {
        Map<String, Double> latest = new HashMap<>();

        String sql = "SELECT sensor_id, value FROM sensor_data " +
                     "WHERE id IN (SELECT MAX(id) FROM sensor_data GROUP BY sensor_id)";

        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                latest.put(rs.getString("sensor_id"), rs.getDouble("value"));
            }
        }

        return latest;
    }

    /**
     * Closes the underlying connection if one is open.
     *
     * @throws SQLException if closing the connection fails
     */
    @Override
    public void close() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }
}

3.3 报警管理模块 #

java
package com.example.envmonitor.alarm;

import java.util.*;
import java.util.function.Consumer;

/**
 * Checks sensor readings against per-sensor threshold rules and notifies
 * registered listeners when a reading falls outside its allowed range.
 *
 * <p>Rules and listeners are held in concurrent collections, so they may be
 * registered while {@link #check} is running on another thread.
 */
public class AlarmManager {

    // java.util.concurrent and java.time are referenced by their fully
    // qualified names because this file only imports java.util.* and Consumer.
    private final Map<String, AlarmRule> rules =
        new java.util.concurrent.ConcurrentHashMap<>();
    // Copy-on-write so a listener can be added while an event is being
    // dispatched without a ConcurrentModificationException.
    private final List<Consumer<AlarmEvent>> listeners =
        new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Allowed value range for one sensor, plus the message used on violation. */
    public static class AlarmRule {
        public final String sensorId;
        public final double minValue;
        public final double maxValue;
        public final String message;

        public AlarmRule(String sensorId, double min, double max, String message) {
            this.sensorId = sensorId;
            this.minValue = min;
            this.maxValue = max;
            this.message = message;
        }
    }

    /** A single threshold violation, stamped with the time it was detected. */
    public static class AlarmEvent {
        public final String sensorId;
        public final double value;
        public final String message;
        public final java.time.LocalDateTime timestamp;

        public AlarmEvent(String sensorId, double value, String message) {
            this.sensorId = sensorId;
            this.value = value;
            this.message = message;
            this.timestamp = java.time.LocalDateTime.now();
        }
    }

    /**
     * Sets the rule for a sensor, replacing any rule previously registered for
     * the same sensor id.
     *
     * @param sensorId id of the sensor the rule applies to
     * @param min      lowest value that does not raise an alarm
     * @param max      highest value that does not raise an alarm
     * @param message  text attached to events raised by this rule
     */
    public void addRule(String sensorId, double min, double max, String message) {
        rules.put(sensorId, new AlarmRule(sensorId, min, max, message));
    }

    /**
     * Registers a listener that is called for every alarm event.
     *
     * @throws NullPointerException if {@code listener} is null
     */
    public void addListener(Consumer<AlarmEvent> listener) {
        listeners.add(Objects.requireNonNull(listener, "listener"));
    }

    /**
     * Compares a reading with the rule of its sensor and, when it is below the
     * minimum or above the maximum, delivers one {@link AlarmEvent} to every
     * listener in registration order. Sensors without a rule are ignored.
     *
     * <p>Every listener is called even if an earlier one throws. The first
     * exception is rethrown once all listeners have run, with any later ones
     * attached as suppressed exceptions.
     *
     * @param sensorId id of the sensor that produced the reading
     * @param value    the reading to check
     */
    public void check(String sensorId, double value) {
        AlarmRule rule = rules.get(sensorId);
        if (rule == null) {
            return;
        }

        if (value < rule.minValue || value > rule.maxValue) {
            AlarmEvent event = new AlarmEvent(sensorId, value, rule.message);

            RuntimeException firstFailure = null;
            for (Consumer<AlarmEvent> listener : listeners) {
                try {
                    listener.accept(event);
                } catch (RuntimeException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            if (firstFailure != null) {
                throw firstFailure;
            }
        }
    }
}

3.4 Web服务模块 #

java
package com.example.envmonitor.web;

import com.sun.net.httpserver.*;
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Minimal HTTP front end for the monitoring station, built on the JDK's
 * {@code com.sun.net.httpserver.HttpServer}.
 *
 * <ul>
 *   <li>{@code /api/current} – latest value of every sensor as a JSON object</li>
 *   <li>{@code /api/history?sensor=ID&hours=N} – readings of one sensor over
 *       the last N hours as a JSON array</li>
 *   <li>everything else – static files served from the {@code /web} classpath
 *       directory</li>
 * </ul>
 */
public class WebServer {

    private HttpServer server;
    // Kept so stop() can shut the pool down.
    private ExecutorService executor;
    private final DataStorage storage;
    private final int port;

    /**
     * @param port    TCP port to listen on
     * @param storage source of the data served by the API endpoints
     */
    public WebServer(int port, DataStorage storage) {
        this.port = port;
        this.storage = storage;
    }

    /**
     * Binds the port, registers the handlers and starts serving requests on a
     * pool of four worker threads.
     *
     * @throws IOException if the server socket cannot be created
     */
    public void start() throws IOException {
        server = HttpServer.create(new InetSocketAddress(port), 0);

        server.createContext("/api/current", this::handleCurrent);
        server.createContext("/api/history", this::handleHistory);
        server.createContext("/", this::handleStatic);

        executor = Executors.newFixedThreadPool(4);
        server.setExecutor(executor);
        server.start();

        System.out.println("Web服务器启动,端口: " + port);
    }

    /** Serves the latest value per sensor as {@code {"sensorId": value, ...}}. */
    private void handleCurrent(HttpExchange exchange) throws IOException {
        String body;
        try {
            Map<String, Double> data = storage.getLatest();

            StringBuilder json = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<String, Double> entry : data.entrySet()) {
                if (!first) json.append(",");
                json.append("\"").append(escapeJson(entry.getKey())).append("\":")
                    .append(entry.getValue());
                first = false;
            }
            json.append("}");
            body = json.toString();
        } catch (Exception e) {
            sendJson(exchange, 500, errorJson(e.getMessage()));
            return;
        }
        // Sent outside the try so a failed write is not answered with a second
        // response on the same exchange.
        sendJson(exchange, 200, body);
    }

    /**
     * Serves the history of one sensor. {@code sensor} defaults to
     * {@code temperature} and {@code hours} to 24; an {@code hours} value that
     * is not a positive integer is answered with 400.
     */
    private void handleHistory(HttpExchange exchange) throws IOException {
        Map<String, String> params = parseQuery(exchange.getRequestURI().getRawQuery());

        String sensorId = params.getOrDefault("sensor", "temperature");
        int hours;
        try {
            hours = Integer.parseInt(params.getOrDefault("hours", "24"));
        } catch (NumberFormatException e) {
            sendJson(exchange, 400, errorJson("Invalid hours parameter"));
            return;
        }
        if (hours < 1) {
            sendJson(exchange, 400, errorJson("Invalid hours parameter"));
            return;
        }

        LocalDateTime end = LocalDateTime.now();
        LocalDateTime start = end.minusHours(hours);

        String body;
        try {
            List<DataCollector.SensorData> data = storage.query(sensorId, start, end);

            StringBuilder json = new StringBuilder("[");
            boolean first = true;
            for (DataCollector.SensorData d : data) {
                if (!first) json.append(",");
                json.append("{\"time\":\"").append(d.timestamp)
                    .append("\",\"value\":").append(d.value).append("}");
                first = false;
            }
            json.append("]");
            body = json.toString();
        } catch (Exception e) {
            sendJson(exchange, 500, errorJson(e.getMessage()));
            return;
        }
        sendJson(exchange, 200, body);
    }

    /**
     * Serves a classpath resource from {@code /web}; {@code /} maps to
     * {@code /index.html}. Unknown resources are answered with 404.
     */
    private void handleStatic(HttpExchange exchange) throws IOException {
        String path = exchange.getRequestURI().getPath();
        if ("/".equals(path)) path = "/index.html";

        // The path comes straight from the client; refuse parent-directory
        // segments so a request cannot climb out of /web.
        if (path.contains("..")) {
            sendJson(exchange, 404, "{\"error\":\"Not found\"}");
            return;
        }

        // A null resource is permitted in try-with-resources and simply not
        // closed, so the stream is released on every path.
        try (InputStream is = getClass().getResourceAsStream("/web" + path)) {
            if (is == null) {
                sendJson(exchange, 404, "{\"error\":\"Not found\"}");
                return;
            }
            exchange.getResponseHeaders().set("Content-Type", getContentType(path));
            exchange.sendResponseHeaders(200, 0);

            try (OutputStream os = exchange.getResponseBody()) {
                is.transferTo(os);
            }
        }
    }

    /** Writes {@code json} as a UTF-8 encoded response with the given status. */
    private void sendJson(HttpExchange exchange, int code, String json) throws IOException {
        exchange.getResponseHeaders().set("Content-Type", "application/json; charset=utf-8");
        // Explicit charset: error messages may contain non-ASCII text and the
        // platform default is not guaranteed to be UTF-8.
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        exchange.sendResponseHeaders(code, bytes.length);
        try (OutputStream os = exchange.getResponseBody()) {
            os.write(bytes);
        }
    }

    /** Builds an {@code {"error": "..."}} body; a null message becomes "null". */
    private static String errorJson(String message) {
        return "{\"error\":\"" + escapeJson(String.valueOf(message)) + "\"}";
    }

    /** Escapes quotes, backslashes and control characters for a JSON string. */
    private static String escapeJson(String text) {
        StringBuilder out = new StringBuilder(text.length() + 8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /**
     * Parses a raw (still percent-encoded) query string into a map.
     *
     * <p>Pairs are split before decoding so encoded {@code &} and {@code =}
     * inside a value survive. Pairs without a value, with an empty value or
     * with malformed percent-encoding are skipped.
     */
    private Map<String, String> parseQuery(String query) {
        Map<String, String> params = new HashMap<>();
        if (query != null) {
            for (String param : query.split("&")) {
                String[] pair = param.split("=", 2);
                if (pair.length == 2 && !pair[1].isEmpty()) {
                    try {
                        params.put(
                            URLDecoder.decode(pair[0], StandardCharsets.UTF_8),
                            URLDecoder.decode(pair[1], StandardCharsets.UTF_8));
                    } catch (IllegalArgumentException ignored) {
                        // Malformed %-escape from the client: drop this pair so
                        // the caller falls back to its default.
                    }
                }
            }
        }
        return params;
    }

    /** Maps a file extension to a Content-Type; unknown types are binary. */
    private String getContentType(String path) {
        if (path.endsWith(".html")) return "text/html";
        if (path.endsWith(".css")) return "text/css";
        if (path.endsWith(".js")) return "application/javascript";
        return "application/octet-stream";
    }

    /**
     * Stops the HTTP server immediately and shuts down its worker pool.
     * Safe to call when the server was never started.
     */
    public void stop() {
        if (server != null) {
            server.stop(0);
        }
        if (executor != null) {
            // The pool was created in start() and its non-daemon threads would
            // otherwise stay alive after the server has stopped.
            executor.shutdown();
        }
    }
}

四、总结 #

环境监测站项目要点:

  1. 多传感器集成:统一管理多种传感器
  2. 数据持久化:SQLite本地存储
  3. 报警机制:阈值检测与通知
  4. Web可视化:HTTP服务提供数据接口
  5. 模块化设计:采集、存储、展示分离

下一章我们将学习工业数据网关项目。

最后更新:2026-03-27