# 环境监测站
## 一、项目概述
### 1.1 项目目标
构建一个环境监测站，实现多传感器数据采集、存储、分析和可视化展示。
```text
┌─────────────────────────────────────────────────────────┐
│                     环境监测站架构                      │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   ┌─────────────────────────────────────────────────┐   │
│   │                  Web可视化界面                  │   │
│   └────────────────────────┬────────────────────────┘   │
│                            │ HTTP/WebSocket             │
│   ┌────────────────────────┴────────────────────────┐   │
│   │               环境监测站 (树莓派)               │   │
│   │   ┌─────────────────────────────────────────┐   │   │
│   │   │               传感器阵列                │   │   │
│   │   │   温湿度 │ 气压 │ 光照 │ PM2.5 │ CO2    │   │   │
│   │   └─────────────────────────────────────────┘   │   │
│   │                        │                        │   │
│   │   ┌─────────────────────────────────────────┐   │   │
│   │   │               数据处理层                │   │   │
│   │   │    采集 │ 存储 │ 分析 │ 报警 │ 上传     │   │   │
│   │   └─────────────────────────────────────────┘   │   │
│   └─────────────────────────────────────────────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘
```
### 1.2 功能模块
| 模块 | 功能 |
|---|---|
| 数据采集 | 多传感器定时采集 |
| 数据存储 | 本地数据库存储 |
| 数据分析 | 统计分析、趋势预测 |
| 报警功能 | 阈值报警、异常检测 |
| 可视化 | Web界面实时展示 |
| 数据上传 | 云平台数据同步 |
## 二、硬件准备
### 2.1 硬件清单
| 设备 | 数量 | 用途 |
|---|---|---|
| Raspberry Pi 4B | 1 | 主控制器 |
| BME280 | 1 | 温湿度气压 |
| BH1750 | 1 | 光照强度 |
| PMS5003 | 1 | PM2.5检测 |
| MH-Z19C | 1 | CO2浓度 |
| LCD1602 | 1 | 本地显示 |
| 蜂鸣器 | 1 | 报警输出 |
## 三、软件实现
### 3.1 数据采集模块
java
package com.example.envmonitor.sensor;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.*;
/**
 * Polls every registered sensor on a fixed schedule, saves each reading
 * through {@code DataStorage} and passes it to {@code AlarmManager.check}.
 */
public class DataCollector {
    /** Longest time, in seconds, that {@link #stop()} waits for a running cycle. */
    private static final long STOP_TIMEOUT_SECONDS = 5;

    private final Map<String, Sensor> sensors = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(2);
    private final DataStorage storage;
    private final AlarmManager alarmManager;

    /** Immutable value object describing one reading of one sensor. */
    public static class SensorData {
        public final String sensorId;
        public final double value;
        public final String unit;
        public final LocalDateTime timestamp;

        /**
         * Creates a reading stamped with the current local time.
         *
         * @param sensorId identifier the sensor was registered under
         * @param value    measured value
         * @param unit     unit string reported by the sensor
         */
        public SensorData(String sensorId, double value, String unit) {
            this(sensorId, value, unit, LocalDateTime.now());
        }

        /**
         * Creates a reading with an explicit timestamp, e.g. when a previously
         * stored reading is rebuilt and its original time must be kept.
         *
         * @param sensorId  identifier the sensor was registered under
         * @param value     measured value
         * @param unit      unit string reported by the sensor
         * @param timestamp time the reading was taken; must not be null
         * @throws NullPointerException if {@code timestamp} is null
         */
        public SensorData(String sensorId, double value, String unit,
                LocalDateTime timestamp) {
            if (timestamp == null) {
                throw new NullPointerException("timestamp");
            }
            this.sensorId = sensorId;
            this.value = value;
            this.unit = unit;
            this.timestamp = timestamp;
        }
    }

    /**
     * @param storage      destination for every collected reading
     * @param alarmManager receives every collected value for threshold checks
     */
    public DataCollector(DataStorage storage, AlarmManager alarmManager) {
        this.storage = storage;
        this.alarmManager = alarmManager;
    }

    /**
     * Registers a sensor; an existing sensor with the same id is replaced.
     *
     * @param id     key used as the reading's sensor id
     * @param sensor sensor to poll
     */
    public void addSensor(String id, Sensor sensor) {
        sensors.put(id, sensor);
    }

    /**
     * Schedules the collection cycle, starting immediately and repeating at a
     * fixed rate. Each call schedules an additional periodic task.
     *
     * @param intervalSeconds period between cycles in seconds
     * @throws IllegalArgumentException if {@code intervalSeconds} is not
     *         positive (raised by {@code scheduleAtFixedRate})
     */
    public void start(int intervalSeconds) {
        scheduler.scheduleAtFixedRate(this::collect, 0,
                intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Runs one collection cycle over all registered sensors. The read, save
     * and alarm steps are isolated from each other so that a storage failure
     * neither hides an alarm nor is reported as a sensor read failure.
     */
    private void collect() {
        for (Map.Entry<String, Sensor> entry : sensors.entrySet()) {
            String sensorId = entry.getKey();
            Sensor sensor = entry.getValue();

            double value;
            String unit;
            try {
                value = sensor.read();
                unit = sensor.getUnit();
            } catch (Exception e) {
                System.err.println("传感器读取失败: " + sensorId);
                e.printStackTrace();
                continue; // nothing to store or check without a reading
            }

            try {
                storage.save(new SensorData(sensorId, value, unit));
            } catch (Exception e) {
                System.err.println("数据保存失败: " + sensorId);
                e.printStackTrace();
            }

            try {
                alarmManager.check(sensorId, value);
            } catch (Exception e) {
                System.err.println("报警检查失败: " + sensorId);
                e.printStackTrace();
            }
        }
    }

    /**
     * Stops scheduling new cycles and waits up to
     * {@value #STOP_TIMEOUT_SECONDS} seconds for a running cycle to finish
     * before cancelling it. If the calling thread is interrupted while
     * waiting, the cycle is cancelled and the interrupt flag is restored.
     */
    public void stop() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
### 3.2 数据存储模块
java
package com.example.envmonitor.storage;
import java.sql.*;
import java.time.LocalDateTime;
import java.util.*;
/**
 * {@code DataStorage} implementation that keeps readings in a single SQLite
 * table ({@code sensor_data}) accessed through one JDBC connection.
 *
 * <p>The data-access methods are {@code synchronized} because they all share
 * that one connection; elsewhere in this project {@code save} is invoked from
 * the collector's scheduler thread while {@code query}/{@code getLatest} are
 * invoked from the web server's handler threads.
 */
public class SQLiteStorage implements DataStorage, AutoCloseable {
    private final String dbPath;
    private Connection connection;

    /**
     * Opens (or creates) the database file and makes sure the schema exists.
     *
     * @param dbPath path appended to the {@code jdbc:sqlite:} URL prefix
     * @throws NullPointerException if {@code dbPath} is null
     * @throws SQLException if the connection or the schema cannot be set up
     */
    public SQLiteStorage(String dbPath) throws SQLException {
        this.dbPath = Objects.requireNonNull(dbPath, "dbPath");
        initialize();
    }

    /**
     * Connects and creates the table and index when they are missing. The
     * connection is closed again if schema creation fails, so a failed
     * constructor does not leak it.
     */
    private void initialize() throws SQLException {
        Connection conn = DriverManager.getConnection("jdbc:sqlite:" + dbPath);
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("""
                    CREATE TABLE IF NOT EXISTS sensor_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    sensor_id TEXT NOT NULL,
                    value REAL NOT NULL,
                    unit TEXT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                    """);
            stmt.execute("""
                    CREATE INDEX IF NOT EXISTS idx_sensor_time
                    ON sensor_data(sensor_id, timestamp)
                    """);
        } catch (SQLException | RuntimeException e) {
            try {
                conn.close();
            } catch (SQLException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
        connection = conn;
    }

    /**
     * Inserts one reading; the timestamp is stored as the text produced by
     * {@code LocalDateTime.toString()}.
     *
     * @param data reading to persist
     * @throws SQLException if the insert fails
     */
    @Override
    public synchronized void save(DataCollector.SensorData data) throws SQLException {
        String sql = "INSERT INTO sensor_data(sensor_id, value, unit, timestamp) VALUES(?,?,?,?)";
        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            pstmt.setString(1, data.sensorId);
            pstmt.setDouble(2, data.value);
            pstmt.setString(3, data.unit);
            pstmt.setString(4, data.timestamp.toString());
            pstmt.executeUpdate();
        }
    }

    /**
     * Returns the readings of one sensor whose stored timestamp text lies
     * between {@code start} and {@code end} (inclusive), ordered by timestamp.
     *
     * <p>NOTE(review): the stored timestamp column is not propagated to the
     * returned objects. The three-argument {@code SensorData} constructor used
     * here stamps each object with the current time; restoring the original
     * time requires a constructor that accepts a timestamp.
     *
     * @param sensorId sensor whose readings are wanted
     * @param start    lower bound of the time window
     * @param end      upper bound of the time window
     * @return matching readings, possibly empty, never null
     * @throws SQLException if the query fails
     */
    @Override
    public synchronized List<DataCollector.SensorData> query(String sensorId,
            LocalDateTime start, LocalDateTime end) throws SQLException {
        String sql = "SELECT sensor_id, value, unit FROM sensor_data WHERE sensor_id = ? "
                + "AND timestamp BETWEEN ? AND ? ORDER BY timestamp";
        List<DataCollector.SensorData> results = new ArrayList<>();
        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            pstmt.setString(1, sensorId);
            pstmt.setString(2, start.toString());
            pstmt.setString(3, end.toString());
            try (ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    results.add(new DataCollector.SensorData(
                            rs.getString("sensor_id"),
                            rs.getDouble("value"),
                            rs.getString("unit")));
                }
            }
        }
        return results;
    }

    /**
     * Returns, for every sensor id present in the table, the value of the row
     * with the highest {@code id}.
     *
     * @return map from sensor id to its most recently inserted value
     * @throws SQLException if the query fails
     */
    @Override
    public synchronized Map<String, Double> getLatest() throws SQLException {
        String sql = "SELECT sensor_id, value FROM sensor_data "
                + "WHERE id IN (SELECT MAX(id) FROM sensor_data GROUP BY sensor_id)";
        Map<String, Double> latest = new HashMap<>();
        try (Statement stmt = connection.createStatement();
                ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                latest.put(rs.getString("sensor_id"), rs.getDouble("value"));
            }
        }
        return latest;
    }

    /**
     * Closes the underlying connection if one was opened.
     *
     * @throws SQLException if closing the connection fails
     */
    @Override
    public synchronized void close() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }
}
### 3.3 报警管理模块
java
package com.example.envmonitor.alarm;
import java.util.*;
import java.util.function.Consumer;
/**
 * Holds one threshold rule per sensor and notifies registered listeners when
 * a checked value violates its rule.
 *
 * <p>Types from {@code java.time} and {@code java.util.concurrent} are written
 * fully qualified because this file's import list only covers
 * {@code java.util.*} and {@code java.util.function.Consumer}.
 */
public class AlarmManager {
    private final Map<String, AlarmRule> rules =
            new java.util.concurrent.ConcurrentHashMap<>();
    // Copy-on-write so check() can iterate while another thread adds a listener.
    private final List<Consumer<AlarmEvent>> listeners =
            new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Allowed value range for one sensor plus the message used when it is violated. */
    public static class AlarmRule {
        public final String sensorId;
        public final double minValue;
        public final double maxValue;
        public final String message;

        public AlarmRule(String sensorId, double min, double max, String message) {
            this.sensorId = sensorId;
            this.minValue = min;
            this.maxValue = max;
            this.message = message;
        }
    }

    /** A single rule violation, stamped with the time the event object was created. */
    public static class AlarmEvent {
        public final String sensorId;
        public final double value;
        public final String message;
        public final java.time.LocalDateTime timestamp;

        public AlarmEvent(String sensorId, double value, String message) {
            this.sensorId = sensorId;
            this.value = value;
            this.message = message;
            this.timestamp = java.time.LocalDateTime.now();
        }
    }

    /**
     * Sets the rule for a sensor, replacing any previous rule for the same id.
     *
     * @param sensorId sensor the rule applies to
     * @param min      lowest value that does not raise an alarm
     * @param max      highest value that does not raise an alarm
     * @param message  text carried by events raised for this rule
     */
    public void addRule(String sensorId, double min, double max, String message) {
        rules.put(sensorId, new AlarmRule(sensorId, min, max, message));
    }

    /**
     * Registers a listener that receives every future alarm event.
     *
     * @param listener callback to invoke; must not be null
     * @throws NullPointerException if {@code listener} is null
     */
    public void addListener(Consumer<AlarmEvent> listener) {
        listeners.add(Objects.requireNonNull(listener, "listener"));
    }

    /**
     * Compares a value with the sensor's rule and notifies all listeners when
     * it lies outside {@code [minValue, maxValue]}. A NaN value is treated as
     * a violation, since it compares false against both bounds and would
     * otherwise pass silently. Sensors without a rule are ignored.
     *
     * <p>A listener that throws is logged to {@code System.err} and does not
     * prevent the remaining listeners from being notified.
     *
     * @param sensorId sensor the value belongs to
     * @param value    value to check
     */
    public void check(String sensorId, double value) {
        AlarmRule rule = rules.get(sensorId);
        if (rule == null) {
            return;
        }
        boolean violated = Double.isNaN(value)
                || value < rule.minValue
                || value > rule.maxValue;
        if (!violated) {
            return;
        }
        AlarmEvent event = new AlarmEvent(sensorId, value, rule.message);
        for (Consumer<AlarmEvent> listener : listeners) {
            try {
                listener.accept(event);
            } catch (RuntimeException e) {
                System.err.println("报警监听器执行失败: " + sensorId);
                e.printStackTrace();
            }
        }
    }
}
### 3.4 Web服务模块
java
package com.example.envmonitor.web;
import com.sun.net.httpserver.*;
import java.io.*;
import java.net.*;
import java.util.*;
/**
 * Minimal HTTP front end built on the JDK {@code HttpServer}: serves the
 * latest readings, a history window and static files from the {@code /web}
 * classpath folder.
 *
 * <p>Types from {@code java.time}, {@code java.util.concurrent} and
 * {@code java.nio.charset} are written fully qualified because this file's
 * import list does not cover those packages.
 */
public class WebServer {
    private static final int WORKER_THREADS = 4;
    private static final String NOT_FOUND_JSON = "{\"error\":\"Not found\"}";

    private HttpServer server;
    private java.util.concurrent.ExecutorService executor;
    private final DataStorage storage;
    private final int port;

    /**
     * @param port    TCP port to listen on
     * @param storage source of the data served by the API endpoints
     */
    public WebServer(int port, DataStorage storage) {
        this.port = port;
        this.storage = storage;
    }

    /**
     * Binds the port, registers the three handlers and starts serving on a
     * fixed pool of worker threads.
     *
     * @throws IOException if the server socket cannot be created
     */
    public void start() throws IOException {
        server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/api/current", this::handleCurrent);
        server.createContext("/api/history", this::handleHistory);
        server.createContext("/", this::handleStatic);
        // Kept in a field so stop() can shut the worker threads down.
        executor = java.util.concurrent.Executors.newFixedThreadPool(WORKER_THREADS);
        server.setExecutor(executor);
        server.start();
        System.out.println("Web服务器启动,端口: " + port);
    }

    /** Answers with a JSON object mapping each sensor id to its latest value. */
    private void handleCurrent(HttpExchange exchange) throws IOException {
        String body;
        try {
            Map<String, Double> data = storage.getLatest();
            StringBuilder json = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<String, Double> entry : data.entrySet()) {
                if (!first) {
                    json.append(',');
                }
                json.append('"').append(escapeJson(entry.getKey())).append("\":")
                        .append(jsonNumber(entry.getValue()));
                first = false;
            }
            body = json.append('}').toString();
        } catch (Exception e) {
            sendJson(exchange, 500, errorJson(describe(e)));
            return;
        }
        // Sent outside the try so an I/O failure while writing the 200 response
        // is not answered with a second (500) response on the same exchange.
        sendJson(exchange, 200, body);
    }

    /**
     * Answers with a JSON array of {@code {"time", "value"}} objects for the
     * sensor named by {@code sensor} (default {@code temperature}) over the
     * last {@code hours} hours (default 24). A non-numeric or non-positive
     * {@code hours} yields a 400 response.
     */
    private void handleHistory(HttpExchange exchange) throws IOException {
        Map<String, String> params = parseQuery(exchange.getRequestURI().getRawQuery());
        String sensorId = params.getOrDefault("sensor", "temperature");

        int hours;
        try {
            hours = Integer.parseInt(params.getOrDefault("hours", "24"));
        } catch (NumberFormatException e) {
            sendJson(exchange, 400, errorJson("Invalid 'hours' parameter"));
            return;
        }
        if (hours <= 0) {
            sendJson(exchange, 400, errorJson("'hours' must be positive"));
            return;
        }

        var end = java.time.LocalDateTime.now();
        var start = end.minusHours(hours);

        String body;
        try {
            List<DataCollector.SensorData> data = storage.query(sensorId, start, end);
            StringBuilder json = new StringBuilder("[");
            boolean first = true;
            for (DataCollector.SensorData d : data) {
                if (!first) {
                    json.append(',');
                }
                json.append("{\"time\":\"").append(d.timestamp)
                        .append("\",\"value\":").append(jsonNumber(d.value)).append('}');
                first = false;
            }
            body = json.append(']').toString();
        } catch (Exception e) {
            sendJson(exchange, 500, errorJson(describe(e)));
            return;
        }
        sendJson(exchange, 200, body);
    }

    /**
     * Serves a classpath resource below {@code /web}; {@code /} maps to
     * {@code /index.html}. Paths containing a {@code ..} segment or a
     * backslash are answered with 404 so a request cannot step out of the
     * {@code /web} folder.
     */
    private void handleStatic(HttpExchange exchange) throws IOException {
        String path = exchange.getRequestURI().getPath();
        if ("/".equals(path)) {
            path = "/index.html";
        }
        if (!isSafeResourcePath(path)) {
            sendJson(exchange, 404, NOT_FOUND_JSON);
            return;
        }
        // try-with-resources accepts a null resource, so the stream is closed
        // on every path, including when sending the headers fails.
        try (InputStream is = getClass().getResourceAsStream("/web" + path)) {
            if (is == null) {
                sendJson(exchange, 404, NOT_FOUND_JSON);
                return;
            }
            exchange.getResponseHeaders().set("Content-Type", getContentType(path));
            exchange.sendResponseHeaders(200, 0);
            try (OutputStream os = exchange.getResponseBody()) {
                is.transferTo(os);
            }
        }
    }

    /** Returns false when the path has a {@code ..} segment or a backslash. */
    private static boolean isSafeResourcePath(String path) {
        if (path == null || path.indexOf('\\') >= 0) {
            return false;
        }
        for (String segment : path.split("/")) {
            if ("..".equals(segment)) {
                return false;
            }
        }
        return true;
    }

    /** Writes {@code json} as the complete response body, encoded as UTF-8. */
    private void sendJson(HttpExchange exchange, int code, String json) throws IOException {
        exchange.getResponseHeaders().set("Content-Type", "application/json");
        byte[] bytes = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        exchange.sendResponseHeaders(code, bytes.length);
        try (OutputStream os = exchange.getResponseBody()) {
            os.write(bytes);
        }
    }

    /**
     * Splits a raw (still percent-encoded) query string into decoded
     * key/value pairs. Only the first {@code =} separates key from value;
     * pairs without a key or without a value are ignored, and a later
     * occurrence of a key overrides an earlier one.
     */
    private Map<String, String> parseQuery(String query) {
        Map<String, String> params = new HashMap<>();
        if (query == null) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0 || eq == pair.length() - 1) {
                continue;
            }
            try {
                params.put(
                        URLDecoder.decode(pair.substring(0, eq),
                                java.nio.charset.StandardCharsets.UTF_8),
                        URLDecoder.decode(pair.substring(eq + 1),
                                java.nio.charset.StandardCharsets.UTF_8));
            } catch (IllegalArgumentException ignored) {
                // Malformed percent-escape: drop this pair, keep the others.
            }
        }
        return params;
    }

    /** Maps a file extension to a content type, defaulting to binary. */
    private String getContentType(String path) {
        if (path.endsWith(".html")) return "text/html";
        if (path.endsWith(".css")) return "text/css";
        if (path.endsWith(".js")) return "application/javascript";
        return "application/octet-stream";
    }

    /** Builds a {@code {"error": "..."}} body with the message safely escaped. */
    private static String errorJson(String message) {
        return "{\"error\":\"" + escapeJson(message) + "\"}";
    }

    /** Uses the exception message, or its class name when there is none. */
    private static String describe(Exception e) {
        String message = e.getMessage();
        return message != null ? message : e.getClass().getSimpleName();
    }

    /** Renders a number as JSON; null, NaN and infinities become {@code null}. */
    private static String jsonNumber(Double value) {
        if (value == null || !Double.isFinite(value)) {
            return "null";
        }
        return value.toString();
    }

    /** Escapes quotes, backslashes and control characters for a JSON string. */
    private static String escapeJson(String text) {
        if (text == null) {
            return "";
        }
        StringBuilder out = new StringBuilder(text.length() + 8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /** Stops the HTTP server immediately and releases the worker threads. */
    public void stop() {
        if (server != null) {
            server.stop(0);
        }
        if (executor != null) {
            executor.shutdown();
        }
    }
}
## 四、总结
环境监测站项目要点：
- 多传感器集成:统一管理多种传感器
- 数据持久化:SQLite本地存储
- 报警机制:阈值检测与通知
- Web可视化:HTTP服务提供数据接口
- 模块化设计:采集、存储、展示分离
下一章我们将学习工业数据网关项目。
最后更新：2026-03-27