过滤操作 #

一、filter函数详解 #

1.1 filter语法 #

flux
// 基本语法
filter(fn: (r) => condition)

// 参数说明
// r - 表示每一行记录
// condition - 返回布尔值的条件表达式

1.2 工作原理 #

text
filter工作流程:

输入表流
    │
    ▼
遍历每条记录
    │
    ├── 条件为true → 保留记录
    │
    └── 条件为false → 丢弃记录
            │
            ▼
        输出表流

二、基础过滤 #

2.1 按measurement过滤 #

flux
// 单个measurement
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")

// 多个measurement(OR)
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" or 
        r._measurement == "memory"
    )

// 正则匹配measurement
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement =~ /^cpu/)

2.2 按field过滤 #

flux
// 单个field
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field == "usage")

// 多个field
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._field == "usage" or 
        r._field == "idle"
    )

// 正则匹配field
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field =~ /usage/)

2.3 按tag过滤 #

flux
// 单个tag值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host == "server01")

// 多个tag值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r.host == "server01" or 
        r.host == "server02" or
        r.host == "server03"
    )

// 多个tag组合
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r.host == "server01" and 
        r.region == "us-west"
    )

三、值过滤 #

3.1 数值比较 #

flux
// 大于
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value > 50)

// 小于
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value < 100)

// 等于
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value == 75.5)

// 不等于
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value != 0)

// 范围
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value > 50 and r._value < 100)

// 大于等于/小于等于
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value >= 50 and r._value <= 100)

3.2 空值处理 #

flux
// 检查值存在
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => exists r._value)

// 检查值不存在
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => not exists r._value)

// 过滤空值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value != null)

3.3 字符串匹配 #

flux
// 精确匹配
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.status == "active")

// 包含字符串
import "strings"
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => strings.containsStr(v: r.status, substr: "active"))

// 前缀匹配
import "strings"
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => strings.hasPrefix(v: r.status, prefix: "err"))

// 后缀匹配
import "strings"
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => strings.hasSuffix(v: r.status, suffix: "ing"))

四、正则表达式过滤 #

4.1 基本正则 #

flux
// 匹配正则
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host =~ /^server\d+$/)

// 不匹配正则
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host !~ /^server\d+$/)

4.2 正则示例 #

flux
// 匹配以server开头
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host =~ /^server/)

// 匹配数字结尾
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host =~ /\d+$/)

// 匹配特定模式
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host =~ /^server\d{2}$/)

// 匹配多个模式
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host =~ /^(server|client)\d+$/)

// 忽略大小写
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.status =~ /(?i)active/)

4.3 复杂正则 #

flux
// 匹配IP地址
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.ip =~ /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/)

// 匹配邮箱
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.email =~ /^[\w.-]+@[\w.-]+\.\w+$/)

// 匹配日期格式
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.date =~ /^\d{4}-\d{2}-\d{2}$/)

五、逻辑组合 #

5.1 AND组合 #

flux
// 多条件AND
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage" and
        r.host == "server01" and
        r._value > 50
    )

5.2 OR组合 #

flux
// 多条件OR
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r.host == "server01" or
        r.host == "server02" or
        r.host == "server03"
    )

// 使用contains简化OR
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        contains(value: r.host, set: ["server01", "server02", "server03"])
    )

5.3 混合组合 #

flux
// AND和OR混合
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        (r.host == "server01" or r.host == "server02") and
        r._value > 50
    )

// 复杂组合
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        (r._measurement == "cpu" and r._value > 80) or
        (r._measurement == "memory" and r._value > 90)
    )

5.4 NOT组合 #

flux
// NOT条件
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        not (r.host == "server01" or r.host == "server02")
    )

// 排除特定值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r.host != "server01" and
        r.host != "server02"
    )

六、时间过滤 #

6.1 时间范围过滤 #

flux
// 过滤特定时间点之前
from(bucket: "my-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._time < 2024-01-01T12:00:00Z)

// 过滤特定时间点之后
from(bucket: "my-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._time > 2024-01-01T12:00:00Z)

// 时间范围
from(bucket: "my-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => 
        r._time > 2024-01-01T10:00:00Z and
        r._time < 2024-01-01T14:00:00Z
    )

6.2 时间属性过滤 #

flux
import "date"

// 过滤工作时间(9-18点)
from(bucket: "my-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => 
        date.hour(t: r._time) >= 9 and
        date.hour(t: r._time) < 18
    )

// 过滤工作日
from(bucket: "my-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => 
        date.weekDay(t: r._time) >= 1 and
        date.weekDay(t: r._time) <= 5
    )

// 过滤特定月份
from(bucket: "my-bucket")
    |> range(start: -1y)
    |> filter(fn: (r) => date.month(t: r._time) == 1)

七、高级过滤技巧 #

7.1 使用变量 #

flux
// 定义过滤条件变量
measurement_filter = (r) => r._measurement == "cpu"
host_filter = (r) => r.host == "server01"
value_filter = (r) => r._value > 50

// 组合使用
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        measurement_filter(r: r) and
        host_filter(r: r) and
        value_filter(r: r)
    )

7.2 动态过滤 #

flux
// 动态构建过滤条件
hosts = ["server01", "server02", "server03"]

from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => contains(value: r.host, set: hosts))

7.3 条件过滤函数 #

flux
// 定义可复用的过滤函数
filterByMeasurement = (tables=<-, measurement) => {
    return tables
        |> filter(fn: (r) => r._measurement == measurement)
}

filterByHost = (tables=<-, hosts) => {
    return tables
        |> filter(fn: (r) => contains(value: r.host, set: hosts))
}

// 使用自定义函数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filterByMeasurement(measurement: "cpu")
    |> filterByHost(hosts: ["server01", "server02"])

7.4 多条件过滤模式 #

flux
// 使用函数封装复杂过滤
complexFilter = (r, measurement, hosts, minValue, maxValue) => {
    return r._measurement == measurement and
           contains(value: r.host, set: hosts) and
           r._value >= minValue and
           r._value <= maxValue
}

from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        complexFilter(
            r: r,
            measurement: "cpu",
            hosts: ["server01", "server02"],
            minValue: 50,
            maxValue: 100
        )
    )

八、性能优化 #

8.1 过滤顺序 #

flux
// 推荐:先过滤索引字段
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")  // 先过滤measurement
    |> filter(fn: (r) => r.host == "server01")     // 再过滤tag
    |> filter(fn: (r) => r._value > 50)            // 最后过滤值

// 不推荐:先过滤值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value > 50)            // 值过滤不使用索引
    |> filter(fn: (r) => r._measurement == "cpu")
    |> filter(fn: (r) => r.host == "server01")

8.2 合并过滤 #

flux
// 推荐:合并为单个filter
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r.host == "server01" and
        r._value > 50
    )

// 不推荐:多个filter链
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> filter(fn: (r) => r.host == "server01")
    |> filter(fn: (r) => r._value > 50)

8.3 使用contains优化 #

flux
// 使用contains替代多个OR
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        contains(value: r.host, set: ["server01", "server02", "server03"])
    )

// 等价于
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r.host == "server01" or
        r.host == "server02" or
        r.host == "server03"
    )

九、实际应用示例 #

9.1 监控告警过滤 #

flux
// 过滤CPU使用率超过阈值的记录
from(bucket: "metrics")
    |> range(start: -5m)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage" and
        r._value > 80
    )
    |> sort(columns: ["_value"], desc: true)

9.2 多条件业务查询 #

flux
// 查询生产环境特定服务的错误日志
from(bucket: "logs")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "application" and
        r.env == "production" and
        r.service =~ /^api-/ and
        r.level == "error"
    )

9.3 时间段分析 #

flux
import "date"

// 查询工作时间的高负载记录
from(bucket: "metrics")
    |> range(start: -7d)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage" and
        date.hour(t: r._time) >= 9 and
        date.hour(t: r._time) < 18 and
        date.weekDay(t: r._time) >= 1 and
        date.weekDay(t: r._time) <= 5 and
        r._value > 70
    )

十、总结 #

过滤操作要点:

  1. 优先过滤索引字段:measurement和tag
  2. 合并过滤条件:减少filter调用
  3. 合理使用正则:简化复杂匹配
  4. 使用contains:优化多值OR查询
  5. 自定义函数:封装可复用过滤逻辑

下一步,让我们学习聚合函数!

最后更新:2026-03-27