聚合函数 #

一、聚合概述 #

1.1 什么是聚合 #

text
聚合操作:

定义
├── 将多条记录合并为一条
├── 计算统计值
└── 减少数据量

常见聚合
├── mean - 平均值
├── sum - 总和
├── count - 计数
├── max - 最大值
├── min - 最小值
└── median - 中位数

1.2 聚合流程 #

text
聚合流程:

输入数据
├── 多条记录
├── 按组键分组
└── 每组独立聚合
        │
        ▼
聚合计算
├── 应用聚合函数
├── 生成统计值
└── 输出单条记录
        │
        ▼
输出结果
└── 每组一条记录

二、基础聚合函数 #

2.1 mean - 平均值 #

flux
// 计算平均值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> mean()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> mean(column: "_value")

// 分组后计算平均值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["host"])
    |> mean()

2.2 sum - 总和 #

flux
// 计算总和
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "requests")
    |> sum()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "requests")
    |> sum(column: "_value")

// 分组求和
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "requests")
    |> group(columns: ["service"])
    |> sum()

2.3 count - 计数 #

flux
// 计数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> count()

// 指定列计数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> count(column: "_value")

// 分组计数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["host"])
    |> count()

2.4 max - 最大值 #

flux
// 最大值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> max()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> max(column: "_value")

// 分组最大值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["host"])
    |> max()

2.5 min - 最小值 #

flux
// 最小值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> min()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> min(column: "_value")

// 分组最小值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["host"])
    |> min()

三、统计聚合函数 #

3.1 median - 中位数 #

flux
// 中位数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> median()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> median(column: "_value")

3.2 stddev - 标准差 #

flux
// 标准差
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> stddev()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> stddev(column: "_value")

3.3 variance - 方差 #

flux
// 方差
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> variance()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> variance(column: "_value")

3.4 spread - 极差 #

flux
// 极差(最大值-最小值)
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> spread()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> spread(column: "_value")

3.5 skew - 偏度 #

flux
// 偏度
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> skew()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> skew(column: "_value")

3.6 kurtosis - 峰度 #

flux
// 峰度
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> kurtosis()

// 指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> kurtosis(column: "_value")

四、百分位数 #

4.1 quantile - 分位数 #

flux
// 中位数(50%分位数)
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> quantile(q: 0.5)

// 90%分位数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> quantile(q: 0.9)

// 95%分位数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> quantile(q: 0.95)

// 99%分位数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> quantile(q: 0.99)

4.2 多分位数 #

flux
// 计算多个分位数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> quantiles(q: [0.25, 0.5, 0.75, 0.9, 0.95, 0.99])

五、时间窗口聚合 #

5.1 aggregateWindow #

flux
// 每5分钟计算平均值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 5m, fn: mean)

// 每10分钟计算最大值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 10m, fn: max)

// 每小时计算总和
from(bucket: "my-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "requests")
    |> aggregateWindow(every: 1h, fn: sum)

// 每5分钟计数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "events")
    |> aggregateWindow(every: 5m, fn: count)

5.2 窗口参数 #

flux
// 完整参数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(
        every: 5m,           // 窗口大小
        period: 5m,          // 窗口周期
        offset: 0s,          // 偏移量
        fn: mean,            // 聚合函数
        location: "UTC",     // 时区
        createEmpty: true    // 创建空窗口
    )

5.3 窗口对齐 #

flux
// 从整点开始
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 5m, fn: mean, offset: 0s)

// 从第5分钟开始
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 5m, fn: mean, offset: 5s)

5.4 空窗口处理 #

flux
// 创建空窗口
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: true)

// 不创建空窗口
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

六、自定义聚合 #

6.1 自定义聚合函数 #

flux
// 自定义聚合函数
customAggregate = (tables=<-, column="_value") => {
    return tables
        |> reduce(
            fn: (r, accumulator) => ({
                sum: r._value + accumulator.sum,
                count: accumulator.count + 1.0
            }),
            identity: {sum: 0.0, count: 0.0}
        )
        |> map(fn: (r) => ({ r with _value: r.sum / r.count }))
}

// 使用自定义聚合
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> customAggregate()

6.2 reduce函数 #

flux
// 使用reduce计算多个统计值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> reduce(
        fn: (r, accumulator) => ({
            sum: r._value + accumulator.sum,
            count: accumulator.count + 1.0,
            min: if r._value < accumulator.min then r._value else accumulator.min,
            max: if r._value > accumulator.max then r._value else accumulator.max
        }),
        identity: {sum: 0.0, count: 0.0, min: float(v: +∞), max: float(v: -∞)}
    )
    |> map(fn: (r) => ({
        _value: r.sum / r.count,
        mean: r.sum / r.count,
        min: r.min,
        max: r.max,
        count: r.count
    }))

七、分组聚合 #

7.1 分组后聚合 #

flux
// 按主机分组计算平均值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["host"])
    |> mean()

// 按多列分组
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["host", "region"])
    |> mean()

7.2 多字段聚合 #

flux
// 按字段分组聚合
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["_field"])
    |> mean()

// 取消分组后聚合
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group()
    |> mean()

八、聚合结果处理 #

8.1 重命名结果 #

flux
// 重命名聚合结果
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> mean()
    |> rename(columns: {"_value": "avg_value"})

8.2 添加列 #

flux
// 添加计算列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> mean()
    |> map(fn: (r) => ({ 
        r with 
        avg_value: r._value,
        threshold: 80.0,
        status: if r._value > 80.0 then "warning" else "normal"
    }))

8.3 格式化输出 #

flux
// 格式化输出
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["host"])
    |> mean()
    |> keep(columns: ["host", "_value"])
    |> rename(columns: {"_value": "avg_cpu"})

九、实际应用示例 #

9.1 服务器性能统计 #

flux
// 计算每台服务器的CPU平均使用率
from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage"
    )
    |> group(columns: ["host"])
    |> mean()
    |> rename(columns: {"_value": "avg_cpu_usage"})
    |> sort(columns: ["avg_cpu_usage"], desc: true)

9.2 请求统计 #

flux
// 每小时请求数统计
from(bucket: "metrics")
    |> range(start: -24h)
    |> filter(fn: (r) => 
        r._measurement == "http_requests" and
        r._field == "count"
    )
    |> aggregateWindow(every: 1h, fn: sum)
    |> sort(columns: ["_time"])

9.3 响应时间分析 #

flux
// 响应时间百分位数
from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "http_requests" and
        r._field == "latency"
    )
    |> group(columns: ["service"])
    |> quantiles(q: [0.5, 0.9, 0.95, 0.99])

9.4 错误率计算 #

flux
// 计算错误率
total = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "http_requests" and
        r._field == "count"
    )
    |> sum()

errors = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "http_requests" and
        r._field == "error_count"
    )
    |> sum()

join(tables: {total: total, errors: errors}, on: ["_time"])
    |> map(fn: (r) => ({
        _time: r._time,
        _value: r._value_errors / r._value_total * 100.0
    }))

十、性能考虑 #

10.1 聚合优化 #

text
聚合优化建议:

数据量
├── 先过滤再聚合
├── 缩小时间范围
└── 合理分组

窗口大小
├── 窗口越大,数据越少
├── 窗口越小,数据越多
└── 根据需求选择

聚合函数
├── mean/sum/count 快
├── quantile 较慢
└── reduce 最灵活但较慢

10.2 内存使用 #

text
内存使用考虑:

聚合前
├── 数据量大
├── 内存占用高
└── 需要流式处理

聚合后
├── 数据量小
├── 内存占用低
└── 结果易于处理

十一、总结 #

聚合函数要点:

  1. 基础聚合:mean、sum、count、max、min
  2. 统计聚合:median、stddev、variance、spread
  3. 分位数:quantile、quantiles
  4. 时间窗口:aggregateWindow
  5. 自定义聚合:reduce

下一步,让我们学习数据转换!

最后更新:2026-03-27