连续查询 #

一、连续查询概述 #

1.1 什么是连续查询 #

text
连续查询定义:

概念
├── 预定义的查询
├── 自动定期执行
├── 结果写入目标Bucket
└── 类似数据库视图

作用
├── 数据降采样
├── 数据聚合
├── 减少查询计算
└── 提高查询性能

1.2 连续查询vs任务 #

text
对比:

特性          连续查询(CQ)        任务(Task)
─────────────────────────────────────────────
查询语言      Flux               Flux
灵活性        中等               高
配置方式      Flux脚本           Flux脚本
触发方式      时间驱动           时间驱动
结果存储      写入Bucket         写入Bucket
复杂度        简单               复杂

推荐使用:
├── 简单聚合 → 连续查询
├── 复杂处理 → 任务
└── InfluxDB 2.x → 推荐使用任务

二、使用任务实现连续查询 #

2.1 基本模式 #

flux
// 使用任务实现连续查询
option task = {
    name: "cq-cpu-mean",
    every: 5m
}

from(bucket: "metrics-raw")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 5m, fn: mean)
    |> to(bucket: "metrics-5m", org: "my-org")

2.2 多级降采样 #

flux
// 5分钟降采样
option task = {
    name: "cq-5m",
    every: 5m
}

from(bucket: "metrics-raw")
    |> range(start: -5m)
    |> aggregateWindow(every: 5m, fn: mean)
    |> to(bucket: "metrics-5m")

// 1小时降采样
option task = {
    name: "cq-1h",
    every: 1h
}

from(bucket: "metrics-5m")
    |> range(start: -1h)
    |> aggregateWindow(every: 1h, fn: mean)
    |> to(bucket: "metrics-1h")

// 1天降采样
option task = {
    name: "cq-1d",
    every: 24h
}

from(bucket: "metrics-1h")
    |> range(start: -24h)
    |> aggregateWindow(every: 1d, fn: mean)
    |> to(bucket: "metrics-1d")

三、常见连续查询场景 #

3.1 数据降采样 #

flux
// CPU数据降采样
option task = {
    name: "cq-downsample-cpu",
    every: 5m
}

from(bucket: "metrics")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(
        every: 5m,
        fn: mean,
        createEmpty: false
    )
    |> to(bucket: "metrics-downsampled")

3.2 数据聚合 #

flux
// 请求统计聚合
option task = {
    name: "cq-request-stats",
    every: 1h
}

from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "http_requests")
    |> group(columns: ["service", "endpoint"])
    |> aggregateWindow(
        every: 1h,
        fn: sum,
        createEmpty: false
    )
    |> to(bucket: "metrics-hourly")

3.3 数据转换 #

flux
// 温度单位转换
option task = {
    name: "cq-temperature-convert",
    every: 5m
}

from(bucket: "sensors")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> filter(fn: (r) => r.unit == "fahrenheit")
    |> map(fn: (r) => ({
        r with
        _value: (r._value - 32.0) * 5.0 / 9.0,
        unit: "celsius"
    }))
    |> to(bucket: "sensors-metric")

3.4 数据过滤 #

flux
// 过滤异常值
option task = {
    name: "cq-filter-anomalies",
    every: 5m
}

from(bucket: "metrics-raw")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> filter(fn: (r) => r._value >= 0.0 and r._value <= 100.0)
    |> to(bucket: "metrics-clean")

四、高级连续查询 #

4.1 多Measurement处理 #

flux
// 处理多个measurement
option task = {
    name: "cq-multi-measurement",
    every: 5m
}

measurements = ["cpu", "memory", "disk"]

results = measurements |> map(fn: (m) =>
    from(bucket: "metrics")
        |> range(start: -5m)
        |> filter(fn: (r) => r._measurement == m)
        |> aggregateWindow(every: 5m, fn: mean)
)

union(tables: results)
    |> to(bucket: "metrics-5m")

4.2 条件聚合 #

flux
// 条件聚合
option task = {
    name: "cq-conditional",
    every: 1h
}

from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "http_requests")
    |> group(columns: ["service", "status"])
    |> aggregateWindow(
        every: 1h,
        fn: sum,
        createEmpty: false
    )
    |> map(fn: (r) => ({
        r with
        status_type: if int(v: r.status) < 400 then "success" else "error"
    }))
    |> group(columns: ["service", "status_type"])
    |> sum()
    |> to(bucket: "metrics-hourly")

4.3 数据合并 #

flux
// 合并多个数据源
option task = {
    name: "cq-merge-sources",
    every: 5m
}

cpu = from(bucket: "metrics-cpu")
    |> range(start: -5m)
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "cpu"})

memory = from(bucket: "metrics-memory")
    |> range(start: -5m)
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "memory"})

join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])
    |> map(fn: (r) => ({
        _time: r._time,
        host: r.host,
        _measurement: "system",
        _field: "combined_load",
        _value: r.cpu * 0.5 + r.memory * 0.5
    }))
    |> to(bucket: "metrics-combined")

五、连续查询管理 #

5.1 查看连续查询 #

bash
# 查看所有任务(连续查询以任务形式存在)
influx task list

# 查看特定任务
influx task list --name "cq-cpu-mean"

5.2 更新连续查询 #

bash
# 更新任务
influx task update \
    --id TASK_ID \
    --every 10m

# 从文件更新
influx task update --id TASK_ID --file updated-cq.flux

5.3 删除连续查询 #

bash
# 删除任务
influx task delete --id TASK_ID

六、性能优化 #

6.1 执行时间优化 #

flux
// 使用offset避免数据延迟
option task = {
    name: "cq-optimized",
    every: 5m,
    offset: 30s  // 延迟30秒执行
}

from(bucket: "metrics")
    |> range(start: -5m)
    |> aggregateWindow(every: 5m, fn: mean)
    |> to(bucket: "metrics-5m")

6.2 数据量优化 #

flux
// 先过滤再聚合
option task = {
    name: "cq-filter-first",
    every: 5m
}

from(bucket: "metrics")
    |> range(start: -5m)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage"
    )
    |> aggregateWindow(every: 5m, fn: mean)
    |> to(bucket: "metrics-5m")

七、最佳实践 #

7.1 设计建议 #

text
连续查询设计建议:

时间粒度
├── 原始数据:秒级
├── 5分钟:实时监控
├── 1小时:日常分析
└── 1天:历史趋势

保留策略
├── 原始数据:7天
├── 5分钟数据:30天
├── 1小时数据:90天
└── 1天数据:365天

聚合函数
├── 平均值:mean
├── 总和:sum
├── 最大值:max
├── 最小值:min
└── 计数:count

7.2 监控建议 #

text
连续查询监控:

执行状态
├── 定期检查任务日志
├── 监控执行时间
└── 检查失败次数

数据质量
├── 验证聚合结果
├── 检查数据完整性
└── 对比源数据和结果

性能指标
├── 执行耗时
├── 数据量变化
└── 资源使用

八、完整示例 #

8.1 监控数据降采样系统 #

flux
// 1分钟降采样
option task = {
    name: "cq-monitor-1m",
    every: 1m,
    offset: 10s
}

from(bucket: "monitoring-raw")
    |> range(start: -1m)
    |> filter(fn: (r) => 
        r._measurement == "cpu" or
        r._measurement == "memory" or
        r._measurement == "disk"
    )
    |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
    |> to(bucket: "monitoring-1m")

// 5分钟降采样
option task = {
    name: "cq-monitor-5m",
    every: 5m,
    offset: 30s
}

from(bucket: "monitoring-1m")
    |> range(start: -5m)
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
    |> to(bucket: "monitoring-5m")

// 1小时降采样
option task = {
    name: "cq-monitor-1h",
    every: 1h,
    offset: 1m
}

from(bucket: "monitoring-5m")
    |> range(start: -1h)
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> to(bucket: "monitoring-1h")

九、总结 #

连续查询要点:

  1. 实现方式:使用任务实现连续查询
  2. 主要用途:数据降采样、聚合
  3. 多级降采样:构建数据层次结构
  4. 性能优化:合理设置执行时间和offset
  5. 监控管理:定期检查执行状态和数据质量

下一步,让我们学习用户管理!

最后更新:2026-03-27