连续聚合 #

一、连续聚合概述 #

1.1 什么是连续聚合 #

连续聚合(Continuous Aggregates)是TimescaleDB特有的功能,它自动维护聚合数据的物化视图。

text
连续聚合原理:

原始数据
├── sensor_data(原始超表)
│   └── 每秒多条数据
│
自动刷新
│
├── 连续聚合视图
│   └── 自动维护
│
聚合数据
├── hourly_stats(小时统计)
│   └── 每小时聚合
├── daily_stats(日统计)
│   └── 每天聚合
└── monthly_stats(月统计)
    └── 每月聚合

1.2 连续聚合优势 #

text
连续聚合优势:

性能提升
├── 预计算聚合结果
├── 查询速度快
├── 减少实时计算
└── 降低系统负载

自动维护
├── 自动刷新数据
├── 增量更新
├── 实时性好
└── 无需手动维护

存储优化
├── 数据压缩
├── 减少存储空间
└── 分层存储

查询简化
├── 简化复杂查询
├── 统一查询接口
└── 提高开发效率

1.3 连续聚合 vs 普通物化视图 #

text
对比普通物化视图:

特性              连续聚合        普通物化视图
─────────────────────────────────────────────
自动刷新          ✓              ✗
增量更新          ✓              ✗
实时查询          ✓              ✗
时序优化          ✓              ✗
压缩支持          ✓              ✗
保留策略          ✓              ✗

二、创建连续聚合 #

2.1 基本创建 #

sql
-- 创建原始表
CREATE TABLE sensor_data (
    time TIMESTAMPTZ NOT NULL,
    sensor_id INTEGER NOT NULL,
    temperature DOUBLE PRECISION,
    humidity DOUBLE PRECISION
);

SELECT create_hypertable('sensor_data', 'time');

-- 创建连续聚合
CREATE MATERIALIZED VIEW hourly_stats
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp,
    COUNT(*) AS reading_count
FROM sensor_data
GROUP BY bucket, sensor_id;

-- 查看连续聚合
SELECT * FROM timescaledb_information.continuous_aggregates;

2.2 创建参数 #

sql
-- 完整参数创建
CREATE MATERIALIZED VIEW daily_stats
WITH (
    timescaledb.continuous,
    timescaledb.materialized_only = false
) AS
SELECT 
    time_bucket('1 day', time) AS day,
    sensor_id,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp,
    COUNT(*) AS reading_count
FROM sensor_data
GROUP BY day, sensor_id
WITH DATA;

-- 参数说明
-- timescaledb.continuous: 启用连续聚合
-- timescaledb.materialized_only: 是否仅查询物化数据
-- WITH DATA: 立即填充数据
-- WITH NO DATA: 不填充数据

2.3 多级连续聚合 #

sql
-- 小时聚合
CREATE MATERIALIZED VIEW hourly_stats
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    AVG(temperature) AS avg_temp,
    COUNT(*) AS reading_count
FROM sensor_data
GROUP BY bucket, sensor_id;

-- 日聚合(基于小时聚合)
CREATE MATERIALIZED VIEW daily_stats
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 day', bucket) AS day,
    sensor_id,
    AVG(avg_temp) AS avg_temp,
    SUM(reading_count) AS total_readings
FROM hourly_stats
GROUP BY day, sensor_id;

-- 月聚合(基于日聚合)
CREATE MATERIALIZED VIEW monthly_stats
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 month', day) AS month,
    sensor_id,
    AVG(avg_temp) AS avg_temp,
    SUM(total_readings) AS total_readings
FROM daily_stats
GROUP BY month, sensor_id;

三、刷新策略 #

3.1 自动刷新 #

sql
-- 查看刷新策略
SELECT 
    view_name,
    materialization_hypertable_name,
    materialized_only
FROM timescaledb_information.continuous_aggregates;

-- 查看刷新作业
SELECT 
    job_id,
    schedule_interval,
    max_runtime,
    scheduled
FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate';

3.2 手动刷新 #

sql
-- 手动刷新整个视图
CALL refresh_continuous_aggregate('hourly_stats', NULL, NULL);

-- 刷新特定时间范围
CALL refresh_continuous_aggregate(
    'hourly_stats',
    NOW() - INTERVAL '2 hours',
    NOW()
);

-- 刷新历史数据
CALL refresh_continuous_aggregate(
    'hourly_stats',
    '2024-01-01'::timestamptz,
    '2024-01-31'::timestamptz
);

3.3 刷新策略配置 #

sql
-- 添加自定义刷新策略
SELECT add_continuous_aggregate_policy(
    'hourly_stats',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

-- 修改刷新策略
SELECT alter_job(
    job_id,
    schedule_interval => INTERVAL '30 minutes'
) FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate'
  AND hypertable_name = '_timescaledb_internal._materialized_hypertable_2';

-- 删除刷新策略
SELECT remove_continuous_aggregate_policy('hourly_stats');

四、查询连续聚合 #

4.1 直接查询 #

sql
-- 查询物化视图
SELECT * FROM hourly_stats
WHERE bucket > NOW() - INTERVAL '24 hours'
ORDER BY bucket DESC;

-- 实时查询(默认)
-- 结合物化数据和实时数据
SELECT * FROM hourly_stats
WHERE bucket > NOW() - INTERVAL '1 hour';

-- 仅查询物化数据
SET timescaledb.materialized_only = true;
SELECT * FROM hourly_stats;
SET timescaledb.materialized_only = false;

4.2 实时聚合 #

text
实时聚合原理:

查询时间范围
├── 历史数据
│   └── 使用物化数据
│
├── 最近数据
│   └── 实时计算
│
└── 合并结果
    └── 返回完整数据

4.3 查询优化 #

sql
-- 使用索引
CREATE INDEX idx_hourly_bucket ON hourly_stats (bucket DESC);

-- 使用WHERE条件
SELECT * FROM hourly_stats
WHERE bucket BETWEEN '2024-01-01' AND '2024-01-31'
  AND sensor_id = 1;

-- 使用聚合
SELECT 
    bucket,
    AVG(avg_temp) as overall_avg_temp
FROM hourly_stats
WHERE bucket > NOW() - INTERVAL '7 days'
GROUP BY bucket;

五、管理连续聚合 #

5.1 查看信息 #

sql
-- 查看所有连续聚合
SELECT 
    view_name,
    view_schema,
    materialization_hypertable_name,
    materialized_only
FROM timescaledb_information.continuous_aggregates;

-- 查看详细信息
SELECT * FROM timescaledb_information.continuous_aggregates;

-- 查看物化超表
SELECT 
    hypertable_name,
    num_chunks,
    table_size
FROM timescaledb_information.hypertables
WHERE hypertable_name LIKE '_materialized%';

-- 查看刷新状态
SELECT 
    view_name,
    last_refresh,
    next_refresh
FROM timescaledb_information.continuous_aggregate_stats;

5.2 修改连续聚合 #

sql
-- 修改物化属性
ALTER MATERIALIZED VIEW hourly_stats
SET (timescaledb.materialized_only = true);

-- 重命名
ALTER MATERIALIZED VIEW hourly_stats RENAME TO hourly_temperature_stats;

-- 添加索引
CREATE INDEX idx_hourly_sensor ON hourly_stats (sensor_id, bucket DESC);

5.3 删除连续聚合 #

sql
-- 删除连续聚合
DROP MATERIALIZED VIEW hourly_stats;

-- 级联删除
DROP MATERIALIZED VIEW hourly_stats CASCADE;

-- 删除前先删除刷新策略
SELECT remove_continuous_aggregate_policy('hourly_stats');
DROP MATERIALIZED VIEW hourly_stats;

六、高级特性 #

6.1 带过滤的聚合 #

sql
-- 创建带过滤的连续聚合
CREATE MATERIALIZED VIEW high_temp_stats
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    COUNT(*) FILTER (WHERE temperature > 30) AS high_temp_count,
    AVG(temperature) FILTER (WHERE temperature > 25) AS warm_avg_temp
FROM sensor_data
GROUP BY bucket, sensor_id;

6.2 多表聚合 #

sql
-- 创建传感器信息表
CREATE TABLE sensors (
    sensor_id INTEGER PRIMARY KEY,
    location VARCHAR(100)
);

-- 创建带JOIN的视图(需要特殊处理)
CREATE MATERIALIZED VIEW location_stats
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 hour', sd.time) AS bucket,
    s.location,
    AVG(sd.temperature) AS avg_temp
FROM sensor_data sd
JOIN sensors s ON sd.sensor_id = s.sensor_id
GROUP BY bucket, s.location;

6.3 自定义聚合函数 #

sql
-- 创建自定义聚合函数
CREATE OR REPLACE AGGREGATE temp_stats(DOUBLE PRECISION) (
    SFUNC = array_agg_transfn,
    STYPE = internal,
    FINALFUNC = array_agg_finalfn
);

-- 使用自定义聚合
CREATE MATERIALIZED VIEW custom_stats
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    temp_stats(temperature) AS temp_stats
FROM sensor_data
GROUP BY bucket, sensor_id;

七、压缩与保留 #

7.1 压缩连续聚合 #

sql
-- 启用压缩
ALTER MATERIALIZED VIEW hourly_stats SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'sensor_id',
    timescaledb.compress_orderby = 'bucket DESC'
);

-- 添加压缩策略
SELECT add_compression_policy(
    '_timescaledb_internal._materialized_hypertable_2',
    INTERVAL '30 days'
);

-- 查看压缩状态
SELECT 
    chunk_name,
    is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = '_timescaledb_internal._materialized_hypertable_2';

7.2 保留策略 #

sql
-- 添加保留策略
SELECT add_retention_policy(
    '_timescaledb_internal._materialized_hypertable_2',
    INTERVAL '365 days'
);

-- 查看保留策略
SELECT * FROM timescaledb_information.jobs
WHERE proc_name = 'policy_retention'
  AND hypertable_name LIKE '_materialized%';

八、性能优化 #

8.1 刷新优化 #

sql
-- 调整刷新间隔
SELECT alter_job(
    job_id,
    schedule_interval => INTERVAL '30 minutes',
    max_runtime => INTERVAL '10 minutes'
) FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate';

-- 调整刷新窗口
SELECT remove_continuous_aggregate_policy('hourly_stats');
SELECT add_continuous_aggregate_policy(
    'hourly_stats',
    start_offset => INTERVAL '6 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

8.2 查询优化 #

sql
-- 创建合适的索引
CREATE INDEX idx_hourly_sensor_bucket ON hourly_stats (sensor_id, bucket DESC);

-- 使用部分索引
CREATE INDEX idx_hourly_recent ON hourly_stats (bucket DESC)
WHERE bucket > NOW() - INTERVAL '30 days';

-- 更新统计信息
ANALYZE hourly_stats;

8.3 存储优化 #

sql
-- 查看存储大小
SELECT 
    view_name,
    pg_size_pretty(total_bytes) as size
FROM timescaledb_information.continuous_aggregates c
JOIN chunk_relation_size(c.materialization_hypertable_name) s ON true;

-- 清理旧数据
SELECT drop_chunks(
    '_timescaledb_internal._materialized_hypertable_2',
    older_than => INTERVAL '365 days'
);

九、监控与调试 #

9.1 监控刷新 #

sql
-- 查看刷新历史
SELECT 
    view_name,
    last_refresh_start,
    last_refresh_finish,
    last_refresh_duration
FROM timescaledb_information.continuous_aggregate_stats;

-- 查看刷新作业状态
SELECT 
    job_id,
    last_run,
    next_run,
    last_run_success,
    total_runs,
    total_failures
FROM timescaledb_information.job_stats
WHERE job_id IN (
    SELECT job_id FROM timescaledb_information.jobs
    WHERE proc_name = 'policy_refresh_continuous_aggregate'
);

9.2 调试问题 #

sql
-- 查看错误日志
SELECT * FROM timescaledb_information.job_errors
WHERE job_id IN (
    SELECT job_id FROM timescaledb_information.jobs
    WHERE proc_name = 'policy_refresh_continuous_aggregate'
);

-- 手动刷新测试
CALL refresh_continuous_aggregate('hourly_stats', NULL, NULL);

-- 检查数据一致性
SELECT 
    (SELECT count(*) FROM sensor_data WHERE time > NOW() - INTERVAL '1 hour') as raw_count,
    (SELECT sum(reading_count) FROM hourly_stats WHERE bucket > NOW() - INTERVAL '1 hour') as agg_count;

十、最佳实践 #

10.1 设计原则 #

text
连续聚合设计原则:

聚合粒度
├── 根据查询需求选择
├── 小时/天/周/月
└── 避免过细粒度

刷新策略
├── 根据数据延迟要求
├── 平衡性能和实时性
└── 避免频繁刷新

多级聚合
├── 从细粒度到粗粒度
├── 减少计算量
└── 提高查询性能

10.2 常见问题 #

sql
-- 问题1:刷新延迟
-- 解决:调整刷新策略
SELECT alter_job(
    job_id,
    schedule_interval => INTERVAL '15 minutes'
) FROM timescaledb_information.jobs
WHERE proc_name = 'policy_refresh_continuous_aggregate';

-- 问题2:存储过大
-- 解决:添加压缩和保留策略
ALTER MATERIALIZED VIEW hourly_stats SET (timescaledb.compress);
SELECT add_compression_policy(...);
SELECT add_retention_policy(...);

-- 问题3:查询性能差
-- 解决:创建索引
CREATE INDEX idx_hourly_sensor_bucket ON hourly_stats (sensor_id, bucket DESC);
ANALYZE hourly_stats;

十一、总结 #

连续聚合要点:

操作 命令 说明
创建 CREATE MATERIALIZED VIEW 创建连续聚合
刷新 CALL refresh_continuous_aggregate 手动刷新
策略 add_continuous_aggregate_policy 添加刷新策略
查询 SELECT FROM view 查询聚合数据

最佳实践:

  1. 聚合粒度:根据查询需求选择合适的粒度
  2. 刷新策略:平衡性能和实时性
  3. 多级聚合:从细粒度到粗粒度
  4. 压缩保留:添加压缩和保留策略

下一步,让我们学习数据压缩!

最后更新:2026-03-27