过滤操作 #
一、filter函数详解 #
1.1 filter语法 #
flux
// 基本语法
filter(fn: (r) => condition)
// 参数说明
// r - 表示每一行记录
// condition - 返回布尔值的条件表达式
1.2 工作原理 #
text
filter工作流程:
输入表流
│
▼
遍历每条记录
│
├── 条件为true → 保留记录
│
└── 条件为false → 丢弃记录
│
▼
输出表流
二、基础过滤 #
2.1 按measurement过滤 #
flux
// 单个measurement
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
// 多个measurement(OR)
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" or
r._measurement == "memory"
)
// 正则匹配measurement
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement =~ /^cpu/)
2.2 按field过滤 #
flux
// 单个field
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._field == "usage")
// 多个field
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._field == "usage" or
r._field == "idle"
)
// 正则匹配field
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._field =~ /usage/)
2.3 按tag过滤 #
flux
// 单个tag值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host == "server01")
// 多个tag值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r.host == "server01" or
r.host == "server02" or
r.host == "server03"
)
// 多个tag组合
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r.host == "server01" and
r.region == "us-west"
)
三、值过滤 #
3.1 数值比较 #
flux
// 大于
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value > 50)
// 小于
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value < 100)
// 等于
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value == 75.5)
// 不等于
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value != 0)
// 范围
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value > 50 and r._value < 100)
// 大于等于/小于等于
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value >= 50 and r._value <= 100)
3.2 空值处理 #
flux
// 检查值存在
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => exists r._value)
// 检查值不存在
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => not exists r._value)
// Drop records whose _value is null.
// Flux has no `null` literal, so a comparison such as `r._value != null`
// does not compile; the `exists` operator is the supported null test.
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => exists r._value)
3.3 字符串匹配 #
flux
// Flux requires every import declaration to appear before any other
// statement in a script, so the strings package is imported once here
// for all of the examples below.
import "strings"

// Exact match
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.status == "active")

// Substring match
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => strings.containsStr(v: r.status, substr: "active"))

// Prefix match
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => strings.hasPrefix(v: r.status, prefix: "err"))

// Suffix match
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => strings.hasSuffix(v: r.status, suffix: "ing"))
四、正则表达式过滤 #
4.1 基本正则 #
flux
// 匹配正则
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host =~ /^server\d+$/)
// 不匹配正则
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host !~ /^server\d+$/)
4.2 正则示例 #
flux
// 匹配以server开头
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host =~ /^server/)
// 匹配数字结尾
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host =~ /\d+$/)
// 匹配特定模式
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host =~ /^server\d{2}$/)
// 匹配多个模式
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host =~ /^(server|client)\d+$/)
// 忽略大小写
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.status =~ /(?i)active/)
4.3 复杂正则 #
flux
// 匹配IP地址
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.ip =~ /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/)
// 匹配邮箱
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.email =~ /^[\w.-]+@[\w.-]+\.\w+$/)
// 匹配日期格式
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.date =~ /^\d{4}-\d{2}-\d{2}$/)
五、逻辑组合 #
5.1 AND组合 #
flux
// 多条件AND
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage" and
r.host == "server01" and
r._value > 50
)
5.2 OR组合 #
flux
// 多条件OR
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r.host == "server01" or
r.host == "server02" or
r.host == "server03"
)
// 使用contains简化OR
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
contains(value: r.host, set: ["server01", "server02", "server03"])
)
5.3 混合组合 #
flux
// AND和OR混合
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
(r.host == "server01" or r.host == "server02") and
r._value > 50
)
// 复杂组合
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
(r._measurement == "cpu" and r._value > 80) or
(r._measurement == "memory" and r._value > 90)
)
5.4 NOT组合 #
flux
// NOT条件
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
not (r.host == "server01" or r.host == "server02")
)
// 排除特定值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.host != "server01" and
r.host != "server02"
)
六、时间过滤 #
6.1 时间范围过滤 #
flux
// 过滤特定时间点之前
from(bucket: "my-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._time < 2024-01-01T12:00:00Z)
// 过滤特定时间点之后
from(bucket: "my-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._time > 2024-01-01T12:00:00Z)
// 时间范围
from(bucket: "my-bucket")
|> range(start: -24h)
|> filter(fn: (r) =>
r._time > 2024-01-01T10:00:00Z and
r._time < 2024-01-01T14:00:00Z
)
6.2 时间属性过滤 #
flux
import "date"
// 过滤工作时间(9-18点)
from(bucket: "my-bucket")
|> range(start: -7d)
|> filter(fn: (r) =>
date.hour(t: r._time) >= 9 and
date.hour(t: r._time) < 18
)
// 过滤工作日
from(bucket: "my-bucket")
|> range(start: -30d)
|> filter(fn: (r) =>
date.weekDay(t: r._time) >= 1 and
date.weekDay(t: r._time) <= 5
)
// 过滤特定月份
from(bucket: "my-bucket")
|> range(start: -1y)
|> filter(fn: (r) => date.month(t: r._time) == 1)
七、高级过滤技巧 #
7.1 使用变量 #
flux
// 定义过滤条件变量
measurement_filter = (r) => r._measurement == "cpu"
host_filter = (r) => r.host == "server01"
value_filter = (r) => r._value > 50
// 组合使用
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
measurement_filter(r: r) and
host_filter(r: r) and
value_filter(r: r)
)
7.2 动态过滤 #
flux
// 动态构建过滤条件
hosts = ["server01", "server02", "server03"]
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => contains(value: r.host, set: hosts))
7.3 条件过滤函数 #
flux
// 定义可复用的过滤函数
filterByMeasurement = (tables=<-, measurement) => {
return tables
|> filter(fn: (r) => r._measurement == measurement)
}
filterByHost = (tables=<-, hosts) => {
return tables
|> filter(fn: (r) => contains(value: r.host, set: hosts))
}
// 使用自定义函数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filterByMeasurement(measurement: "cpu")
|> filterByHost(hosts: ["server01", "server02"])
7.4 多条件过滤模式 #
flux
// 使用函数封装复杂过滤
complexFilter = (r, measurement, hosts, minValue, maxValue) => {
return r._measurement == measurement and
contains(value: r.host, set: hosts) and
r._value >= minValue and
r._value <= maxValue
}
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
complexFilter(
r: r,
measurement: "cpu",
hosts: ["server01", "server02"],
minValue: 50,
maxValue: 100
)
)
八、性能优化 #
8.1 过滤顺序 #
flux
// 推荐:先过滤索引字段
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu") // 先过滤measurement
|> filter(fn: (r) => r.host == "server01") // 再过滤tag
|> filter(fn: (r) => r._value > 50) // 最后过滤值
// 不推荐:先过滤值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value > 50) // 值过滤不使用索引
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r.host == "server01")
8.2 合并过滤 #
flux
// 推荐:合并为单个filter
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.host == "server01" and
r._value > 50
)
// 不推荐:多个filter链
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r.host == "server01")
|> filter(fn: (r) => r._value > 50)
8.3 使用contains优化 #
flux
// 使用contains替代多个OR
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
contains(value: r.host, set: ["server01", "server02", "server03"])
)
// 等价于
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r.host == "server01" or
r.host == "server02" or
r.host == "server03"
)
九、实际应用示例 #
9.1 监控告警过滤 #
flux
// 过滤CPU使用率超过阈值的记录
from(bucket: "metrics")
|> range(start: -5m)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage" and
r._value > 80
)
|> sort(columns: ["_value"], desc: true)
9.2 多条件业务查询 #
flux
// 查询生产环境特定服务的错误日志
from(bucket: "logs")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "application" and
r.env == "production" and
r.service =~ /^api-/ and
r.level == "error"
)
9.3 时间段分析 #
flux
import "date"
// 查询工作时间的高负载记录
from(bucket: "metrics")
|> range(start: -7d)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage" and
date.hour(t: r._time) >= 9 and
date.hour(t: r._time) < 18 and
date.weekDay(t: r._time) >= 1 and
date.weekDay(t: r._time) <= 5 and
r._value > 70
)
十、总结 #
过滤操作要点:
- 优先过滤索引字段:measurement和tag
- 合并过滤条件:减少filter调用
- 合理使用正则:简化复杂匹配
- 使用contains:简化多值OR查询的写法(注意:contains()通常无法下推到存储层执行,数据量较大时,使用OR条件或正则匹配往往更快)
- 自定义函数:封装可复用过滤逻辑
下一步,让我们学习聚合函数!
最后更新:2026-03-27