连续查询 #
一、连续查询概述 #
1.1 什么是连续查询 #
text
连续查询定义:
概念
├── 预定义的查询
├── 自动定期执行
├── 结果写入目标Bucket
└── 类似数据库视图
作用
├── 数据降采样
├── 数据聚合
├── 减少查询计算
└── 提高查询性能
1.2 连续查询vs任务 #
text
对比:
特性 连续查询(CQ) 任务(Task)
─────────────────────────────────────────────
查询语言 Flux Flux
灵活性 中等 高
配置方式 Flux脚本 Flux脚本
触发方式 时间驱动 时间驱动
结果存储 写入Bucket 写入Bucket
复杂度 简单 复杂
推荐使用:
├── 简单聚合 → 连续查询
├── 复杂处理 → 任务
└── InfluxDB 2.x → 推荐使用任务
二、使用任务实现连续查询 #
2.1 基本模式 #
flux
// 使用任务实现连续查询
option task = {
name: "cq-cpu-mean",
every: 5m
}
from(bucket: "metrics-raw")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean)
|> to(bucket: "metrics-5m", org: "my-org")
2.2 多级降采样 #
flux
// 5分钟降采样
option task = {
name: "cq-5m",
every: 5m
}
from(bucket: "metrics-raw")
|> range(start: -5m)
|> aggregateWindow(every: 5m, fn: mean)
|> to(bucket: "metrics-5m")
// 1小时降采样
option task = {
name: "cq-1h",
every: 1h
}
from(bucket: "metrics-5m")
|> range(start: -1h)
|> aggregateWindow(every: 1h, fn: mean)
|> to(bucket: "metrics-1h")
// 1天降采样
option task = {
name: "cq-1d",
every: 24h
}
from(bucket: "metrics-1h")
|> range(start: -24h)
|> aggregateWindow(every: 1d, fn: mean)
|> to(bucket: "metrics-1d")
三、常见连续查询场景 #
3.1 数据降采样 #
flux
// CPU数据降采样
option task = {
name: "cq-downsample-cpu",
every: 5m
}
from(bucket: "metrics")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(
every: 5m,
fn: mean,
createEmpty: false
)
|> to(bucket: "metrics-downsampled")
3.2 数据聚合 #
flux
// 请求统计聚合
option task = {
name: "cq-request-stats",
every: 1h
}
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "http_requests")
|> group(columns: ["service", "endpoint"])
|> aggregateWindow(
every: 1h,
fn: sum,
createEmpty: false
)
|> to(bucket: "metrics-hourly")
3.3 数据转换 #
flux
// 温度单位转换
option task = {
name: "cq-temperature-convert",
every: 5m
}
from(bucket: "sensors")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.unit == "fahrenheit")
|> map(fn: (r) => ({
r with
_value: (r._value - 32.0) * 5.0 / 9.0,
unit: "celsius"
}))
|> to(bucket: "sensors-metric")
3.4 数据过滤 #
flux
// 过滤异常值
option task = {
name: "cq-filter-anomalies",
every: 5m
}
from(bucket: "metrics-raw")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._value >= 0.0 and r._value <= 100.0)
|> to(bucket: "metrics-clean")
四、高级连续查询 #
4.1 多Measurement处理 #
flux
// 处理多个measurement
option task = {
name: "cq-multi-measurement",
every: 5m
}
measurements = ["cpu", "memory", "disk"]
results = measurements |> map(fn: (m) =>
from(bucket: "metrics")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == m)
|> aggregateWindow(every: 5m, fn: mean)
)
union(tables: results)
|> to(bucket: "metrics-5m")
4.2 条件聚合 #
flux
// 条件聚合
option task = {
name: "cq-conditional",
every: 1h
}
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "http_requests")
|> group(columns: ["service", "status"])
|> aggregateWindow(
every: 1h,
fn: sum,
createEmpty: false
)
|> map(fn: (r) => ({
r with
status_type: if int(v: r.status) < 400 then "success" else "error"
}))
|> group(columns: ["service", "status_type"])
|> sum()
|> to(bucket: "metrics-hourly")
4.3 数据合并 #
flux
// 合并多个数据源
option task = {
name: "cq-merge-sources",
every: 5m
}
cpu = from(bucket: "metrics-cpu")
|> range(start: -5m)
|> aggregateWindow(every: 5m, fn: mean)
|> rename(columns: {"_value": "cpu"})
memory = from(bucket: "metrics-memory")
|> range(start: -5m)
|> aggregateWindow(every: 5m, fn: mean)
|> rename(columns: {"_value": "memory"})
join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])
|> map(fn: (r) => ({
_time: r._time,
host: r.host,
_measurement: "system",
_field: "combined_load",
_value: r.cpu * 0.5 + r.memory * 0.5
}))
|> to(bucket: "metrics-combined")
五、连续查询管理 #
5.1 查看连续查询 #
bash
# 查看所有任务(连续查询以任务形式存在)
influx task list
# 查看特定任务
influx task list --name "cq-cpu-mean"
5.2 更新连续查询 #
bash
# 更新任务
influx task update \
--id TASK_ID \
--every 10m
# 从文件更新
influx task update --id TASK_ID --file updated-cq.flux
5.3 删除连续查询 #
bash
# 删除任务
influx task delete --id TASK_ID
六、性能优化 #
6.1 执行时间优化 #
flux
// 使用offset避免数据延迟
option task = {
name: "cq-optimized",
every: 5m,
offset: 30s // 延迟30秒执行
}
from(bucket: "metrics")
|> range(start: -5m)
|> aggregateWindow(every: 5m, fn: mean)
|> to(bucket: "metrics-5m")
6.2 数据量优化 #
flux
// 先过滤再聚合
option task = {
name: "cq-filter-first",
every: 5m
}
from(bucket: "metrics")
|> range(start: -5m)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage"
)
|> aggregateWindow(every: 5m, fn: mean)
|> to(bucket: "metrics-5m")
七、最佳实践 #
7.1 设计建议 #
text
连续查询设计建议:
时间粒度
├── 原始数据:秒级
├── 5分钟:实时监控
├── 1小时:日常分析
└── 1天:历史趋势
保留策略
├── 原始数据:7天
├── 5分钟数据:30天
├── 1小时数据:90天
└── 1天数据:365天
聚合函数
├── 平均值:mean
├── 总和:sum
├── 最大值:max
├── 最小值:min
└── 计数:count
7.2 监控建议 #
text
连续查询监控:
执行状态
├── 定期检查任务日志
├── 监控执行时间
└── 检查失败次数
数据质量
├── 验证聚合结果
├── 检查数据完整性
└── 对比源数据和结果
性能指标
├── 执行耗时
├── 数据量变化
└── 资源使用
八、完整示例 #
8.1 监控数据降采样系统 #
flux
// 1分钟降采样
option task = {
name: "cq-monitor-1m",
every: 1m,
offset: 10s
}
from(bucket: "monitoring-raw")
|> range(start: -1m)
|> filter(fn: (r) =>
r._measurement == "cpu" or
r._measurement == "memory" or
r._measurement == "disk"
)
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
|> to(bucket: "monitoring-1m")
// 5分钟降采样
option task = {
name: "cq-monitor-5m",
every: 5m,
offset: 30s
}
from(bucket: "monitoring-1m")
|> range(start: -5m)
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> to(bucket: "monitoring-5m")
// 1小时降采样
option task = {
name: "cq-monitor-1h",
every: 1h,
offset: 1m
}
from(bucket: "monitoring-5m")
|> range(start: -1h)
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> to(bucket: "monitoring-1h")
九、总结 #
连续查询要点:
- 实现方式:使用任务实现连续查询
- 主要用途:数据降采样、聚合
- 多级降采样:构建数据层次结构
- 性能优化:合理设置执行时间和offset
- 监控管理:定期检查执行状态和数据质量
下一步,让我们学习用户管理!
最后更新:2026-03-27