聚合函数 #
一、聚合概述 #
1.1 什么是聚合 #
text
聚合操作:
定义
├── 将多条记录合并为一条
├── 计算统计值
└── 减少数据量
常见聚合
├── mean - 平均值
├── sum - 总和
├── count - 计数
├── max - 最大值
├── min - 最小值
└── median - 中位数
1.2 聚合流程 #
text
聚合流程:
输入数据
├── 多条记录
├── 按组键分组
└── 每组独立聚合
│
▼
聚合计算
├── 应用聚合函数
├── 生成统计值
└── 输出单条记录
│
▼
输出结果
└── 每组一条记录
二、基础聚合函数 #
2.1 mean - 平均值 #
flux
// 计算平均值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> mean()
// 指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> mean(column: "_value")
// 分组后计算平均值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["host"])
|> mean()
2.2 sum - 总和 #
flux
// 计算总和
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "requests")
|> sum()
// 指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "requests")
|> sum(column: "_value")
// 分组求和
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "requests")
|> group(columns: ["service"])
|> sum()
2.3 count - 计数 #
flux
// 计数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> count()
// 指定列计数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> count(column: "_value")
// 分组计数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["host"])
|> count()
2.4 max - 最大值 #
flux
// 最大值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> max()
// 指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> max(column: "_value")
// 分组最大值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["host"])
|> max()
2.5 min - 最小值 #
flux
// 最小值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> min()
// 指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> min(column: "_value")
// 分组最小值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["host"])
|> min()
三、统计聚合函数 #
3.1 median - 中位数 #
flux
// 中位数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> median()
// 指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> median(column: "_value")
3.2 stddev - 标准差 #
flux
// 标准差
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> stddev()
// 指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> stddev(column: "_value")
3.3 variance - 方差 #
flux
// Variance: Flux has no built-in variance(), so square the sample standard deviation.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> stddev()
    |> map(fn: (r) => ({r with _value: r._value * r._value}))

// Same computation with the input column named explicitly
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> stddev(column: "_value")
    |> map(fn: (r) => ({r with _value: r._value * r._value}))
3.4 spread - 极差 #
flux
// 极差(最大值-最小值)
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> spread()
// 指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> spread(column: "_value")
3.5 skew - 偏度 #
flux
// 偏度
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> skew()
// 指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> skew(column: "_value")
3.6 kurtosis - 峰度 #
flux
// Kurtosis: Flux has no built-in kurtosis(), so this helper derives it from power sums
// collected with reduce(). It returns Pearson kurtosis m4 / m2^2 using population moments
// (a normal distribution yields 3.0). The result is written to `_value`.
// `column` selects the float input column.
kurtosis = (tables=<-, column="_value") =>
    tables
        // Record fields must be referenced by a literal name, so copy the chosen column to a fixed one.
        |> duplicate(column: column, as: "_x")
        |> reduce(
            identity: {n: 0.0, s1: 0.0, s2: 0.0, s3: 0.0, s4: 0.0},
            fn: (r, accumulator) => ({
                n: accumulator.n + 1.0,
                s1: accumulator.s1 + r._x,
                s2: accumulator.s2 + r._x * r._x,
                s3: accumulator.s3 + r._x * r._x * r._x,
                s4: accumulator.s4 + r._x * r._x * r._x * r._x,
            }),
        )
        |> map(
            fn: (r) => {
                mu = r.s1 / r.n
                // Central moments expressed through raw moments.
                m2 = r.s2 / r.n - mu * mu
                m4 = r.s4 / r.n - 4.0 * mu * r.s3 / r.n + 6.0 * mu * mu * r.s2 / r.n - 3.0 * mu * mu * mu * mu

                return {r with _value: m4 / (m2 * m2)}
            },
        )
        |> drop(columns: ["n", "s1", "s2", "s3", "s4"])

// Kurtosis of the default `_value` column
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> kurtosis()

// Kurtosis with the input column named explicitly
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> kurtosis(column: "_value")
四、百分位数 #
4.1 quantile - 分位数 #
flux
// 中位数(50%分位数)
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> quantile(q: 0.5)
// 90%分位数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> quantile(q: 0.9)
// 95%分位数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> quantile(q: 0.95)
// 99%分位数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> quantile(q: 0.99)
4.2 多分位数 #
flux
// Several quantiles at once: Flux has no quantiles() function, so run quantile()
// once per level on the same input and merge the results with union().
data = from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")

// Compute one quantile and label each output row with the level it represents.
quantileOf = (tables=<-, q) =>
    tables
        |> quantile(q: q)
        |> set(key: "quantile", value: string(v: q))

union(
    tables: [
        data |> quantileOf(q: 0.25),
        data |> quantileOf(q: 0.5),
        data |> quantileOf(q: 0.75),
        data |> quantileOf(q: 0.9),
        data |> quantileOf(q: 0.95),
        data |> quantileOf(q: 0.99),
    ],
)
五、时间窗口聚合 #
5.1 aggregateWindow #
flux
// 每5分钟计算平均值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean)
// 每10分钟计算最大值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 10m, fn: max)
// 每小时计算总和
from(bucket: "my-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "requests")
|> aggregateWindow(every: 1h, fn: sum)
// 每5分钟计数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "events")
|> aggregateWindow(every: 5m, fn: count)
5.2 窗口参数 #
flux
// All aggregateWindow parameters spelled out
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(
        every: 5m, // interval between window starts
        period: 5m, // duration of each window (defaults to `every`)
        offset: 0s, // shift applied to window boundaries
        fn: mean, // aggregate or selector applied to each window
        location: {zone: "UTC", offset: 0h}, // time zone as a location record, not a string
        createEmpty: true, // also emit a row for windows that contain no data
    )
5.3 窗口对齐 #
flux
// Default alignment: an offset of 0s leaves the window boundaries unshifted
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean, offset: 0s)
// Shifted alignment. NOTE(review): the original comment said "start from the 5th minute",
// but the offset below is 5s (five seconds), so each window boundary moves by 5 seconds.
// Confirm whether 5m was intended.
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean, offset: 5s)
5.4 空窗口处理 #
flux
// 创建空窗口
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
// 不创建空窗口
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
六、自定义聚合 #
6.1 自定义聚合函数 #
flux
// Custom mean built on reduce().
//   tables: piped-forward input stream
//   column: name of the float column to average (default "_value")
// The average is written to `_value`; the intermediate `sum` and `count` columns are kept.
customAggregate = (tables=<-, column="_value") => {
    return tables
        // Record fields must be referenced by a literal name, so copy the requested
        // column to a fixed name that the reducer can read.
        |> duplicate(column: column, as: "_agg_input")
        |> reduce(
            fn: (r, accumulator) => ({
                sum: r._agg_input + accumulator.sum,
                count: accumulator.count + 1.0,
            }),
            identity: {sum: 0.0, count: 0.0},
        )
        |> map(fn: (r) => ({r with _value: r.sum / r.count}))
}
// 使用自定义聚合
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> customAggregate()
6.2 reduce函数 #
flux
// Compute several statistics in a single pass with reduce()
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> reduce(
        fn: (r, accumulator) => ({
            sum: r._value + accumulator.sum,
            count: accumulator.count + 1.0,
            min: if r._value < accumulator.min then r._value else accumulator.min,
            max: if r._value > accumulator.max then r._value else accumulator.max,
        }),
        // Seed min/max with +Inf/-Inf so the first row always replaces them.
        // Flux has no infinity literal; float() parses the strings "+Inf" and "-Inf".
        identity: {sum: 0.0, count: 0.0, min: float(v: "+Inf"), max: float(v: "-Inf")},
    )
    |> map(
        fn: (r) => ({
            _value: r.sum / r.count,
            mean: r.sum / r.count,
            min: r.min,
            max: r.max,
            count: r.count,
        }),
    )
七、分组聚合 #
7.1 分组后聚合 #
flux
// 按主机分组计算平均值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["host"])
|> mean()
// 按多列分组
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["host", "region"])
|> mean()
7.2 多字段聚合 #
flux
// 按字段分组聚合
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["_field"])
|> mean()
// 取消分组后聚合
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group()
|> mean()
八、聚合结果处理 #
8.1 重命名结果 #
flux
// 重命名聚合结果
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> mean()
|> rename(columns: {"_value": "avg_value"})
8.2 添加列 #
flux
// 添加计算列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> mean()
|> map(fn: (r) => ({
r with
avg_value: r._value,
threshold: 80.0,
status: if r._value > 80.0 then "warning" else "normal"
}))
8.3 格式化输出 #
flux
// 格式化输出
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["host"])
|> mean()
|> keep(columns: ["host", "_value"])
|> rename(columns: {"_value": "avg_cpu"})
九、实际应用示例 #
9.1 服务器性能统计 #
flux
// Average CPU usage per server, highest first
from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
    |> group(columns: ["host"])
    |> mean()
    |> rename(columns: {"_value": "avg_cpu_usage"})
    // After the per-host mean every host sits in its own one-row table, and sort()
    // only orders rows within a table. Merge into a single table so the sort ranks hosts.
    |> group()
    |> sort(columns: ["avg_cpu_usage"], desc: true)
9.2 请求统计 #
flux
// 每小时请求数统计
from(bucket: "metrics")
|> range(start: -24h)
|> filter(fn: (r) =>
r._measurement == "http_requests" and
r._field == "count"
)
|> aggregateWindow(every: 1h, fn: sum)
|> sort(columns: ["_time"])
9.3 响应时间分析 #
flux
// Response-time percentiles per service. Flux has no quantiles() function, so run
// quantile() once per level and merge the results with union().
latency = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "http_requests" and r._field == "latency")
    |> group(columns: ["service"])

// Compute one quantile and label each output row with the level it represents.
percentile = (tables=<-, q) =>
    tables
        |> quantile(q: q)
        |> set(key: "quantile", value: string(v: q))

union(
    tables: [
        latency |> percentile(q: 0.5),
        latency |> percentile(q: 0.9),
        latency |> percentile(q: 0.95),
        latency |> percentile(q: 0.99),
    ],
)
9.4 错误率计算 #
flux
// Error rate (%) over the last hour
total = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "http_requests" and r._field == "count")
    // Collapse all series into one table keyed only by the query bounds, so the
    // sum is a single row that still carries _start/_stop for the join below.
    |> group(columns: ["_start", "_stop"])
    |> sum()

errors = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "http_requests" and r._field == "error_count")
    |> group(columns: ["_start", "_stop"])
    |> sum()

// sum() drops _time, so join on the shared query bounds instead.
join(tables: {total: total, errors: errors}, on: ["_start", "_stop"])
    |> map(
        fn: (r) => ({
            _time: r._stop,
            // Convert to float first: counters are typically integers, and integer
            // division would truncate (and cannot be multiplied by 100.0).
            _value: float(v: r._value_errors) / float(v: r._value_total) * 100.0,
        }),
    )
十、性能考虑 #
10.1 聚合优化 #
text
聚合优化建议:
数据量
├── 先过滤再聚合
├── 缩小时间范围
└── 合理分组
窗口大小
├── 窗口越大,输出的数据点越少
├── 窗口越小,输出的数据点越多
└── 根据需求选择
聚合函数
├── mean/sum/count 快
├── quantile 较慢
└── reduce 最灵活但较慢
10.2 内存使用 #
text
内存使用考虑:
聚合前
├── 数据量大
├── 内存占用高
└── 需要流式处理
聚合后
├── 数据量小
├── 内存占用低
└── 结果易于处理
十一、总结 #
聚合函数要点:
- 基础聚合:mean、sum、count、max、min
- 统计聚合:median、stddev、spread、skew(variance、kurtosis 不是 Flux 内置函数,需通过 stddev 或 reduce 自行计算)
- 分位数:quantile(Flux 没有内置的 quantiles,多个分位数可多次调用 quantile 后用 union 合并)
- 时间窗口:aggregateWindow
- 自定义聚合:reduce
下一步,让我们学习数据转换!
最后更新:2026-03-27