连续聚合 #
一、连续聚合概述 #
1.1 什么是连续聚合 #
连续聚合(Continuous Aggregates)是TimescaleDB特有的功能:它是建立在超表之上的一种物化视图,TimescaleDB会按照刷新策略增量地维护其中的聚合结果,无需手动重算整张视图。
text
连续聚合原理:
原始数据
├── sensor_data(原始超表)
│ └── 每秒多条数据
│
自动刷新
│
├── 连续聚合视图
│ └── 自动维护
│
聚合数据
├── hourly_stats(小时统计)
│ └── 每小时聚合
├── daily_stats(日统计)
│ └── 每天聚合
└── monthly_stats(月统计)
└── 每月聚合
1.2 连续聚合优势 #
text
连续聚合优势:
性能提升
├── 预计算聚合结果
├── 查询速度快
├── 减少实时计算
└── 降低系统负载
自动维护
├── 自动刷新数据
├── 增量更新
├── 实时性好
└── 无需手动维护
存储优化
├── 数据压缩
├── 减少存储空间
└── 分层存储
查询简化
├── 简化复杂查询
├── 统一查询接口
└── 提高开发效率
1.3 连续聚合 vs 普通物化视图 #
text
对比普通物化视图(注:"自动刷新"需要配置刷新策略,"实时查询"需要视图设置 timescaledb.materialized_only = false):
特性 连续聚合 普通物化视图
─────────────────────────────────────────────
自动刷新 ✓ ✗
增量更新 ✓ ✗
实时查询 ✓ ✗
时序优化 ✓ ✗
压缩支持 ✓ ✗
保留策略 ✓ ✗
二、创建连续聚合 #
2.1 基本创建 #
sql
-- Raw readings table; converted into a hypertable partitioned on "time" below.
CREATE TABLE sensor_data (
    time        TIMESTAMPTZ      NOT NULL,
    sensor_id   INTEGER          NOT NULL,
    temperature DOUBLE PRECISION,
    humidity    DOUBLE PRECISION
);

SELECT create_hypertable('sensor_data', 'time');

-- Hourly per-sensor rollup, declared as a continuous aggregate so that
-- TimescaleDB maintains the materialized results.
CREATE MATERIALIZED VIEW hourly_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    AVG(temperature)            AS avg_temp,
    MAX(temperature)            AS max_temp,
    MIN(temperature)            AS min_temp,
    COUNT(*)                    AS reading_count
FROM sensor_data
GROUP BY bucket, sensor_id;

-- List every continuous aggregate in the database.
SELECT *
FROM timescaledb_information.continuous_aggregates;
2.2 创建参数 #
sql
-- Create a continuous aggregate with its options spelled out.
-- NOTE: the hierarchical example in section 2.3 also creates a view named
-- daily_stats; drop one before creating the other if you run both.
CREATE MATERIALIZED VIEW daily_stats
WITH (
    timescaledb.continuous,
    timescaledb.materialized_only = false
) AS
SELECT
    time_bucket('1 day', time) AS day,
    sensor_id,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp,
    COUNT(*)         AS reading_count
FROM sensor_data
GROUP BY day, sensor_id
WITH DATA;

-- Option reference:
-- timescaledb.continuous        : turns the materialized view into a continuous aggregate.
-- timescaledb.materialized_only : true  -> queries return materialized buckets only;
--                                 false -> real-time aggregation: recent rows that are not
--                                          materialized yet are aggregated at query time
--                                          and merged into the result.
--                                 The default changed (false before TimescaleDB 2.13,
--                                 true from 2.13), so set it explicitly as above.
-- WITH DATA    : materialize the existing history immediately (can take a long time
--                on a large hypertable).
-- WITH NO DATA : create the view empty and fill it later with
--                refresh_continuous_aggregate or a refresh policy.
2.3 多级连续聚合 #
sql
-- Level 1: hourly rollup.
-- Besides avg_temp it keeps SUM and non-NULL COUNT of temperature so that
-- coarser levels can compute an exact, reading-weighted average. Averaging
-- hourly averages (AVG(avg_temp)) is wrong whenever hours contain different
-- numbers of readings.
-- NOTE: same view name as section 2.1; drop that view first if you ran it.
CREATE MATERIALIZED VIEW hourly_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    AVG(temperature)            AS avg_temp,
    SUM(temperature)            AS sum_temp,
    COUNT(temperature)          AS temp_count,
    COUNT(*)                    AS reading_count
FROM sensor_data
GROUP BY bucket, sensor_id;

-- Level 2: daily rollup built on hourly_stats (a continuous aggregate on top
-- of another one; requires a TimescaleDB version with hierarchical continuous
-- aggregate support).
CREATE MATERIALIZED VIEW daily_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 day', bucket)                AS day,
    sensor_id,
    -- Weighted average; NULLIF avoids division by zero when every reading
    -- of the day had a NULL temperature (result is then NULL, like AVG).
    SUM(sum_temp) / NULLIF(SUM(temp_count), 0) AS avg_temp,
    SUM(sum_temp)                               AS sum_temp,
    SUM(temp_count)                             AS temp_count,
    SUM(reading_count)                          AS total_readings
FROM hourly_stats
GROUP BY day, sensor_id;

-- Level 3: monthly rollup built on daily_stats, using the same weighting.
CREATE MATERIALIZED VIEW monthly_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 month', day)                 AS month,
    sensor_id,
    SUM(sum_temp) / NULLIF(SUM(temp_count), 0) AS avg_temp,
    SUM(total_readings)                         AS total_readings
FROM daily_stats
GROUP BY month, sensor_id;
三、刷新策略 #
3.1 自动刷新 #
sql
-- Continuous aggregates and whether they serve materialized data only.
SELECT
    view_name,
    materialization_hypertable_name,
    materialized_only
FROM timescaledb_information.continuous_aggregates;

-- Refresh-policy jobs together with the continuous aggregate each refreshes.
-- The policy job is attached to the aggregate's materialization hypertable,
-- so match on that hypertable's schema and name. The config column holds
-- the policy's start_offset / end_offset.
SELECT
    ca.view_name,
    j.job_id,
    j.schedule_interval,
    j.max_runtime,
    j.scheduled,
    j.config
FROM timescaledb_information.jobs AS j
INNER JOIN timescaledb_information.continuous_aggregates AS ca
    ON  ca.materialization_hypertable_schema = j.hypertable_schema
    AND ca.materialization_hypertable_name   = j.hypertable_name
WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
ORDER BY ca.view_name;
3.2 手动刷新 #
sql
-- Manual refreshes. refresh_continuous_aggregate is a procedure: invoke it
-- with CALL, outside an explicit transaction block. The window is half-open,
-- [window_start, window_end), and only buckets fully inside it are refreshed.

-- Refresh the whole view (NULL = unbounded on that side).
CALL refresh_continuous_aggregate('hourly_stats', NULL, NULL);

-- Refresh a recent time range.
CALL refresh_continuous_aggregate(
    'hourly_stats',
    NOW() - INTERVAL '2 hours',
    NOW()
);

-- Refresh all of January 2024. The end bound is exclusive, so use
-- 2024-02-01; ending at 2024-01-31 would skip every bucket of Jan 31.
CALL refresh_continuous_aggregate(
    'hourly_stats',
    '2024-01-01'::timestamptz,
    '2024-02-01'::timestamptz
);
3.3 刷新策略配置 #
sql
-- Add a refresh policy: every hour, refresh the window from 3 hours ago up to
-- 1 hour ago (the most recent, still-changing hour is left out).
SELECT add_continuous_aggregate_policy(
    'hourly_stats',
    start_offset      => INTERVAL '3 hours',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

-- Change the schedule of hourly_stats' refresh policy.
-- jobs.hypertable_name is never schema-qualified (the schema is in
-- hypertable_schema), and the internal hypertable number differs per
-- database, so look the job up through the view name instead.
SELECT alter_job(
    j.job_id,
    schedule_interval => INTERVAL '30 minutes'
)
FROM timescaledb_information.jobs AS j
INNER JOIN timescaledb_information.continuous_aggregates AS ca
    ON  ca.materialization_hypertable_schema = j.hypertable_schema
    AND ca.materialization_hypertable_name   = j.hypertable_name
WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
  AND ca.view_name = 'hourly_stats';

-- Remove the refresh policy.
SELECT remove_continuous_aggregate_policy('hourly_stats');
四、查询连续聚合 #
4.1 直接查询 #
sql
-- A continuous aggregate is queried like any other view.
SELECT * FROM hourly_stats
WHERE bucket > NOW() - INTERVAL '24 hours'
ORDER BY bucket DESC;

-- Real-time aggregation: with timescaledb.materialized_only = false the view
-- combines materialized buckets with buckets computed on the fly from raw
-- rows that are not materialized yet. This was the default before
-- TimescaleDB 2.13; newer versions create views with materialized_only = true,
-- so enable it explicitly.
ALTER MATERIALIZED VIEW hourly_stats SET (timescaledb.materialized_only = false);
SELECT * FROM hourly_stats
WHERE bucket > NOW() - INTERVAL '1 hour';

-- Materialized data only. materialized_only is a per-view option, not a
-- session setting, so it is switched with ALTER MATERIALIZED VIEW
-- (a "SET timescaledb.materialized_only" statement does not control it).
ALTER MATERIALIZED VIEW hourly_stats SET (timescaledb.materialized_only = true);
SELECT * FROM hourly_stats;
ALTER MATERIALIZED VIEW hourly_stats SET (timescaledb.materialized_only = false);
4.2 实时聚合 #
text
实时聚合原理(仅当视图设置 timescaledb.materialized_only = false 时生效;TimescaleDB 2.13 起新建视图默认关闭):
查询时间范围
├── 历史数据
│ └── 使用物化数据
│
├── 最近数据
│ └── 实时计算
│
└── 合并结果
└── 返回完整数据
4.3 查询优化 #
sql
-- Index on the bucket column to speed up time-range scans.
CREATE INDEX idx_hourly_bucket ON hourly_stats (bucket DESC);

-- Filter with a half-open time range. BETWEEN ... AND '2024-01-31' would stop
-- at 2024-01-31 00:00 and drop the remaining hourly buckets of that day.
SELECT * FROM hourly_stats
WHERE bucket >= '2024-01-01'::timestamptz
  AND bucket <  '2024-02-01'::timestamptz
  AND sensor_id = 1;

-- Further aggregate across sensors per hour (an unweighted mean of the
-- per-sensor hourly averages), in chronological order.
SELECT
    bucket,
    AVG(avg_temp) AS overall_avg_temp
FROM hourly_stats
WHERE bucket > NOW() - INTERVAL '7 days'
GROUP BY bucket
ORDER BY bucket;
五、管理连续聚合 #
5.1 查看信息 #
sql
-- All continuous aggregates and their materialization hypertables.
SELECT
    view_schema,
    view_name,
    materialization_hypertable_schema,
    materialization_hypertable_name,
    materialized_only
FROM timescaledb_information.continuous_aggregates;

-- Full details (view definition, compression flag, ...).
SELECT * FROM timescaledb_information.continuous_aggregates;

-- Chunk count and on-disk size of each materialization hypertable.
-- timescaledb_information.hypertables has no size column, so the size comes
-- from hypertable_size() on the materialization hypertable.
SELECT
    ca.view_name,
    ca.materialization_hypertable_name,
    count(ch.chunk_name) AS num_chunks,
    pg_size_pretty(
        hypertable_size(
            format('%I.%I',
                   ca.materialization_hypertable_schema,
                   ca.materialization_hypertable_name)::regclass
        )
    ) AS table_size
FROM timescaledb_information.continuous_aggregates AS ca
LEFT JOIN timescaledb_information.chunks AS ch
    ON  ch.hypertable_schema = ca.materialization_hypertable_schema
    AND ch.hypertable_name   = ca.materialization_hypertable_name
GROUP BY
    ca.view_name,
    ca.materialization_hypertable_schema,
    ca.materialization_hypertable_name;

-- Refresh status. The old continuous_aggregate_stats view no longer exists in
-- TimescaleDB 2.x; refresh runs are reported per policy job in job_stats.
SELECT
    ca.view_name,
    js.last_run_started_at,
    js.last_successful_finish,
    js.next_start
FROM timescaledb_information.job_stats AS js
INNER JOIN timescaledb_information.jobs AS j
    ON j.job_id = js.job_id
INNER JOIN timescaledb_information.continuous_aggregates AS ca
    ON  ca.materialization_hypertable_schema = j.hypertable_schema
    AND ca.materialization_hypertable_name   = j.hypertable_name
WHERE j.proc_name = 'policy_refresh_continuous_aggregate';
5.2 修改连续聚合 #
sql
-- Change the materialization option (see section 4.1 for its meaning).
ALTER MATERIALIZED VIEW hourly_stats
SET (timescaledb.materialized_only = true);

-- Add an index. This must run before the rename below: afterwards the view is
-- no longer reachable under the name hourly_stats.
CREATE INDEX idx_hourly_sensor ON hourly_stats (sensor_id, bucket DESC);

-- Rename the view. Later examples in this document still refer to
-- hourly_stats, so rename it back (or adjust them) if you continue.
ALTER MATERIALIZED VIEW hourly_stats RENAME TO hourly_temperature_stats;
5.3 删除连续聚合 #
sql
-- Optional: remove the refresh policy explicitly before dropping. Dropping the
-- continuous aggregate also removes its policies, so this step is not required.
SELECT remove_continuous_aggregate_policy('hourly_stats', if_exists => true);

-- Drop the continuous aggregate. IF EXISTS makes the statement safe to re-run.
-- A plain DROP fails if another view depends on it (e.g. daily_stats built
-- on hourly_stats in section 2.3).
DROP MATERIALIZED VIEW IF EXISTS hourly_stats;

-- Alternative to the statement above: CASCADE also drops dependent views
-- (such as daily_stats / monthly_stats from section 2.3).
DROP MATERIALIZED VIEW IF EXISTS hourly_stats CASCADE;
六、高级特性 #
6.1 带过滤的聚合 #
sql
-- Continuous aggregate using conditional aggregation (FILTER clauses):
--   high_temp_count : number of readings above 30 in the hour
--   warm_avg_temp   : average of readings above 25 (NULL if there are none)
CREATE MATERIALIZED VIEW high_temp_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time)                      AS bucket,
    sensor_id,
    COUNT(*)         FILTER (WHERE temperature > 30) AS high_temp_count,
    AVG(temperature) FILTER (WHERE temperature > 25) AS warm_avg_temp
FROM sensor_data
GROUP BY bucket, sensor_id;
6.2 多表聚合 #
sql
-- Lookup table with the location of each sensor.
CREATE TABLE sensors (
    sensor_id INTEGER PRIMARY KEY,
    location  VARCHAR(100)
);

-- Continuous aggregate that joins the hypertable with the lookup table.
-- NOTE(review): joins in continuous aggregates are only available in newer
-- TimescaleDB 2.x releases and are restricted (hypertable joined to a regular
-- table on an equality condition). Edits to the sensors table are not
-- tracked, so already-materialized buckets keep the old location.
-- Check the docs for your version.
CREATE MATERIALIZED VIEW location_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', d.time) AS bucket,
    si.location,
    AVG(d.temperature)            AS avg_temp
FROM sensor_data AS d
INNER JOIN sensors AS si
    ON d.sensor_id = si.sensor_id
GROUP BY bucket, si.location;
6.3 自定义聚合函数 #
sql
-- Custom aggregate that collects all temperatures of a group into an array
-- (the same building blocks PostgreSQL's array_agg uses).
-- array_agg_finalfn takes (internal, anynonarray); FINALFUNC_EXTRA passes the
-- extra argument and lets the result type resolve to double precision[].
-- Without it the final function cannot be found and CREATE AGGREGATE fails.
CREATE OR REPLACE AGGREGATE temp_stats(DOUBLE PRECISION) (
    SFUNC = array_agg_transfn,
    STYPE = internal,
    FINALFUNC = array_agg_finalfn,
    FINALFUNC_EXTRA
);

-- Use the custom aggregate in a continuous aggregate.
-- NOTE: storing every raw value per bucket largely defeats the storage
-- savings of a continuous aggregate; prefer a compact summary in practice.
CREATE MATERIALIZED VIEW custom_stats
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    temp_stats(temperature)     AS temp_stats
FROM sensor_data
GROUP BY bucket, sensor_id;
七、压缩与保留 #
7.1 压缩连续聚合 #
sql
-- Enable compression on the continuous aggregate.
ALTER MATERIALIZED VIEW hourly_stats SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'sensor_id',
    timescaledb.compress_orderby = 'bucket DESC'
);

-- Add the compression policy on the continuous aggregate itself. The internal
-- _materialized_hypertable_N name differs per database and should not be
-- targeted directly. compress_after should be larger than the refresh
-- policy's start_offset so that chunks still being refreshed are not compressed.
SELECT add_compression_policy(
    'hourly_stats',
    compress_after => INTERVAL '30 days'
);

-- Compression state of the chunks behind hourly_stats. Chunks belong to the
-- materialization hypertable; chunks.hypertable_name is never
-- schema-qualified, so match schema and name separately through the view.
SELECT
    ch.chunk_name,
    ch.is_compressed
FROM timescaledb_information.chunks AS ch
INNER JOIN timescaledb_information.continuous_aggregates AS ca
    ON  ch.hypertable_schema = ca.materialization_hypertable_schema
    AND ch.hypertable_name   = ca.materialization_hypertable_name
WHERE ca.view_name = 'hourly_stats'
ORDER BY ch.range_start;
7.2 保留策略 #
sql
-- Add a retention policy on the continuous aggregate (drops its chunks older
-- than one year). This is independent of any retention on sensor_data.
SELECT add_retention_policy(
    'hourly_stats',
    drop_after => INTERVAL '365 days'
);

-- Retention jobs of all continuous aggregates. Matching through the view
-- avoids LIKE '_materialized%', where "_" is a single-character wildcard.
SELECT
    ca.view_name,
    j.*
FROM timescaledb_information.jobs AS j
INNER JOIN timescaledb_information.continuous_aggregates AS ca
    ON  ca.materialization_hypertable_schema = j.hypertable_schema
    AND ca.materialization_hypertable_name   = j.hypertable_name
WHERE j.proc_name = 'policy_retention';
八、性能优化 #
8.1 刷新优化 #
sql
-- Tune the refresh job of hourly_stats only. Without the view filter, every
-- continuous-aggregate refresh policy in the database would be changed.
SELECT alter_job(
    j.job_id,
    schedule_interval => INTERVAL '30 minutes',
    max_runtime       => INTERVAL '10 minutes'
)
FROM timescaledb_information.jobs AS j
INNER JOIN timescaledb_information.continuous_aggregates AS ca
    ON  ca.materialization_hypertable_schema = j.hypertable_schema
    AND ca.materialization_hypertable_name   = j.hypertable_name
WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
  AND ca.view_name = 'hourly_stats';

-- Widen the refresh window by replacing the policy with one using new offsets.
SELECT remove_continuous_aggregate_policy('hourly_stats');
SELECT add_continuous_aggregate_policy(
    'hourly_stats',
    start_offset      => INTERVAL '6 hours',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);
8.2 查询优化 #
sql
-- Composite index for "one sensor, recent buckets" lookups.
CREATE INDEX idx_hourly_sensor_bucket ON hourly_stats (sensor_id, bucket DESC);

-- Partial index over recent buckets. Index predicates may only use IMMUTABLE
-- functions, so NOW() is rejected here. Use a fixed cutoff and recreate the
-- index periodically if it should follow a sliding window.
CREATE INDEX idx_hourly_recent ON hourly_stats (bucket DESC)
WHERE bucket > '2026-01-01 00:00:00+00'::timestamptz;

-- Refresh planner statistics.
ANALYZE hourly_stats;
8.3 存储优化 #
sql
-- Size of each continuous aggregate's materialization hypertable.
-- chunk_relation_size() no longer exists in TimescaleDB 2.x; hypertable_size()
-- returns the total bytes (table, indexes, TOAST) of a hypertable.
SELECT
    ca.view_name,
    pg_size_pretty(
        hypertable_size(
            format('%I.%I',
                   ca.materialization_hypertable_schema,
                   ca.materialization_hypertable_name)::regclass
        )
    ) AS size
FROM timescaledb_information.continuous_aggregates AS ca;

-- Drop materialized chunks older than one year. Target the continuous
-- aggregate by name instead of a hard-coded internal hypertable.
SELECT drop_chunks(
    'hourly_stats',
    older_than => INTERVAL '365 days'
);
九、监控与调试 #
9.1 监控刷新 #
sql
-- Refresh history per continuous aggregate. continuous_aggregate_stats was
-- removed in TimescaleDB 2.x; refresh runs are reported by the policy job in
-- job_stats.
SELECT
    ca.view_name,
    js.last_run_started_at,
    js.last_successful_finish,
    js.last_run_duration,
    js.last_run_status
FROM timescaledb_information.job_stats AS js
INNER JOIN timescaledb_information.jobs AS j
    ON j.job_id = js.job_id
INNER JOIN timescaledb_information.continuous_aggregates AS ca
    ON  ca.materialization_hypertable_schema = j.hypertable_schema
    AND ca.materialization_hypertable_name   = j.hypertable_name
WHERE j.proc_name = 'policy_refresh_continuous_aggregate';

-- Status counters of all refresh jobs.
SELECT
    js.job_id,
    js.last_run_started_at,
    js.next_start,
    js.last_run_status,
    js.total_runs,
    js.total_failures
FROM timescaledb_information.job_stats AS js
WHERE js.job_id IN (
    SELECT j.job_id
    FROM timescaledb_information.jobs AS j
    WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
);
9.2 调试问题 #
sql
-- Errors recorded for refresh-policy jobs.
SELECT * FROM timescaledb_information.job_errors
WHERE job_id IN (
    SELECT job_id FROM timescaledb_information.jobs
    WHERE proc_name = 'policy_refresh_continuous_aggregate'
);

-- Trigger a full refresh by hand to reproduce a failing job.
CALL refresh_continuous_aggregate('hourly_stats', NULL, NULL);

-- Consistency check for the previous complete hour.
-- Both sides use the same half-open bucket window [start_ts, end_ts). A
-- sliding "last 60 minutes" raw window would never line up with bucket
-- start times. The counts agree only once that hour is materialized (or
-- real-time aggregation is on) and late-arriving rows have been refreshed.
WITH check_window AS (
    SELECT
        time_bucket('1 hour', NOW()) - INTERVAL '1 hour' AS start_ts,
        time_bucket('1 hour', NOW())                     AS end_ts
),
raw AS (
    SELECT count(*) AS raw_count
    FROM sensor_data AS sd
    CROSS JOIN check_window AS w
    WHERE sd.time >= w.start_ts
      AND sd.time <  w.end_ts
),
agg AS (
    -- SUM over zero rows is NULL; report 0 so the two counts are comparable.
    SELECT COALESCE(sum(hs.reading_count), 0) AS agg_count
    FROM hourly_stats AS hs
    CROSS JOIN check_window AS w
    WHERE hs.bucket >= w.start_ts
      AND hs.bucket <  w.end_ts
)
SELECT
    raw.raw_count,
    agg.agg_count
FROM raw
CROSS JOIN agg;
十、最佳实践 #
10.1 设计原则 #
text
连续聚合设计原则:
聚合粒度
├── 根据查询需求选择
├── 小时/天/周/月
└── 避免过细粒度
刷新策略
├── 根据数据延迟要求
├── 平衡性能和实时性
└── 避免频繁刷新
多级聚合
├── 从细粒度到粗粒度
├── 减少计算量
└── 提高查询性能
10.2 常见问题 #
sql
-- Problem 1: refresh lag.
-- Fix: run hourly_stats' refresh policy more often. The filter limits the
-- change to this view instead of every refresh policy in the database.
SELECT alter_job(
    j.job_id,
    schedule_interval => INTERVAL '15 minutes'
)
FROM timescaledb_information.jobs AS j
INNER JOIN timescaledb_information.continuous_aggregates AS ca
    ON  ca.materialization_hypertable_schema = j.hypertable_schema
    AND ca.materialization_hypertable_name   = j.hypertable_name
WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
  AND ca.view_name = 'hourly_stats';

-- Problem 2: the aggregate takes too much storage.
-- Fix: enable compression and add compression + retention policies on the
-- continuous aggregate itself.
ALTER MATERIALIZED VIEW hourly_stats SET (timescaledb.compress);
SELECT add_compression_policy('hourly_stats', compress_after => INTERVAL '30 days');
SELECT add_retention_policy('hourly_stats', drop_after => INTERVAL '365 days');

-- Problem 3: slow queries.
-- Fix: index the common access path and refresh statistics. IF NOT EXISTS
-- keeps this re-runnable when the index from section 8.2 already exists.
CREATE INDEX IF NOT EXISTS idx_hourly_sensor_bucket ON hourly_stats (sensor_id, bucket DESC);
ANALYZE hourly_stats;
十一、总结 #
连续聚合要点:
| 操作 | 命令 | 说明 |
|---|---|---|
| 创建 | CREATE MATERIALIZED VIEW | 创建连续聚合 |
| 刷新 | CALL refresh_continuous_aggregate | 手动刷新 |
| 策略 | add_continuous_aggregate_policy | 添加刷新策略 |
| 查询 | SELECT ... FROM <视图名> | 查询聚合数据 |
最佳实践:
- 聚合粒度:根据查询需求选择合适的粒度
- 刷新策略:平衡性能和实时性
- 多级聚合:从细粒度到粗粒度
- 压缩保留:添加压缩和保留策略
下一步,让我们学习数据压缩!
最后更新:2026-03-27