Flux语法基础 #

一、Flux概述 #

1.1 什么是Flux #

Flux是InfluxDB 2.0引入的功能强大的数据脚本和查询语言,专门为处理时序数据而设计。

text
Flux特点:

功能强大
├── 灵活的数据转换
├── 丰富的函数库
├── 支持复杂查询
└── 可编写脚本

易于理解
├── 类似SQL语法
├── 管道操作符
├── 函数式编程
└── 链式调用

可扩展
├── 自定义函数
├── 外部包导入
├── 用户定义函数
└── 社区贡献

1.2 Flux vs InfluxQL #

text
对比:

特性          Flux                    InfluxQL
─────────────────────────────────────────────────
语法风格      函数式                  SQL风格
灵活性        高                      中等
复杂查询      支持                    有限
数据转换      强大                    基础
跨Bucket查询  支持                    不支持
学习曲线      较陡                    平缓
版本          InfluxDB 2.x+           InfluxDB 1.x

二、基本语法结构 #

2.1 查询结构 #

flux
// 基本查询结构
from(bucket: "my-bucket")        // 数据源
    |> range(start: -1h)         // 时间范围
    |> filter(fn: (r) =>         // 过滤条件
        r._measurement == "cpu"
    )

2.2 管道操作符 #

text
管道操作符 |>

作用:
├── 将前一个操作的输出
├── 作为下一个操作的输入
└── 实现链式调用

示例:
data |> function1() |> function2() |> function3()

等价于:
function3(function2(function1(data)))

2.3 注释 #

flux
// 单行注释

/*
 * 多行注释
 * 第二行
 */

# 也可以使用井号注释(不推荐)

三、数据源操作 #

3.1 from函数 #

flux
// 基本用法
from(bucket: "my-bucket")

// 指定组织
from(
    bucket: "my-bucket",
    org: "my-org"
)

// 使用bucket ID
from(bucketID: "1234567890abcdef")

3.2 时间范围 #

flux
// 相对时间
from(bucket: "my-bucket")
    |> range(start: -1h)              // 最近1小时

from(bucket: "my-bucket")
    |> range(start: -24h, stop: -1h)  // 1小时前到24小时前

// 绝对时间
from(bucket: "my-bucket")
    |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-02T00:00:00Z)

// 时间格式
from(bucket: "my-bucket")
    |> range(start: 2024-01-01T00:00:00.000000000Z)

3.3 时间表达式 #

flux
// 相对时间表达式
-1h      // 1小时前
-30m     // 30分钟前
-1d      // 1天前
-1w      // 1周前
-1mo     // 1月前
-1y      // 1年前

// 组合时间
-1h30m   // 1小时30分钟前

四、过滤操作 #

4.1 基本过滤 #

flux
// 过滤measurement
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")

// 过滤field
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field == "usage")

// 过滤tag
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host == "server01")

4.2 组合条件 #

flux
// AND条件
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and 
        r.host == "server01"
    )

// OR条件
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r.host == "server01" or 
        r.host == "server02"
    )

// 复杂条件
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        (r.host == "server01" or r.host == "server02") and
        r._field == "usage"
    )

4.3 过滤函数 #

flux
// 使用exists检查
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => exists r.region)

// 正则匹配
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host =~ /^server\d+$/)

// 包含检查
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => contains(value: r.host, set: ["server01", "server02"]))

五、数据类型 #

5.1 基本类型 #

flux
// 字符串
name = "InfluxDB"

// 整数
count = 100

// 浮点数
value = 23.5

// 布尔值
enabled = true

// 时间
timestamp = 2024-01-01T00:00:00Z

// 持续时间
duration = 1h30m

5.2 复合类型 #

flux
// 列表
servers = ["server01", "server02", "server03"]

// 字典
config = {
    host: "localhost",
    port: 8086,
    enabled: true
}

// 记录
record = {
    name: "cpu",
    value: 78.5,
    time: 2024-01-01T00:00:00Z
}

5.3 类型转换 #

flux
// 字符串转整数
int(v: "100")

// 字符串转浮点数
float(v: "23.5")

// 整数转字符串
string(v: 100)

// 时间转换
time(v: "2024-01-01T00:00:00Z")

六、函数定义 #

6.1 基本函数 #

flux
// 定义函数
greeting = (name) => {
    return "Hello, ${name}!"
}

// 调用函数
greeting(name: "InfluxDB")

// 简写形式
double = (x) => x * 2
double(x: 5)  // 返回 10

6.2 带默认参数 #

flux
// 默认参数
greet = (name="World") => {
    return "Hello, ${name}!"
}

greet()              // "Hello, World!"
greet(name: "Flux")  // "Hello, Flux!"

6.3 管道函数 #

flux
// 管道接收函数
addValue = (r, v=<-) => {
    return r + v
}

// 使用管道调用
5 |> addValue(v: 10)  // 返回 15

// 数据处理函数
multiply = (table=<-, factor) => {
    return table
        |> map(fn: (r) => ({ r with _value: r._value * factor }))
}

from(bucket: "my-bucket")
    |> range(start: -1h)
    |> multiply(factor: 2)

七、表流概念 #

7.1 表流结构 #

text
Flux表流:

输入数据流
├── 分组为多个表
├── 每个表有相同的组键
└── 表包含行记录

表结构:
┌────────────────────────────────────────────────┐
│ _time                │ _value │ _field │ host  │
├────────────────────────────────────────────────┤
│ 2024-01-01T00:00:00Z │ 78.5   │ usage  │ srv01 │
│ 2024-01-01T00:01:00Z │ 82.3   │ usage  │ srv01 │
└────────────────────────────────────────────────┘

组键(Group Key)
├── _field, host 等标签组合
├── 决定表的分组方式
└── 相同组键的记录在同一表

7.2 分组操作 #

flux
// 按指定列分组
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> group(columns: ["host"])

// 取消分组
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> group()

// 按多列分组
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> group(columns: ["host", "region"])

7.3 查看分组 #

flux
// 查看表的组键
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> group(columns: ["host"])
    |> keys()

八、常用操作符 #

8.1 比较操作符 #

flux
// 等于
r._value == 100

// 不等于
r._value != 100

// 大于
r._value > 100

// 小于
r._value < 100

// 大于等于
r._value >= 100

// 小于等于
r._value <= 100

8.2 逻辑操作符 #

flux
// 逻辑与
r._value > 50 and r._value < 100

// 逻辑或
r.host == "server01" or r.host == "server02"

// 逻辑非
not r._value > 100

8.3 算术操作符 #

flux
// 加法
r._value + 10

// 减法
r._value - 10

// 乘法
r._value * 2

// 除法
r._value / 2

// 取模
r._value % 10

// 幂运算
r._value ^ 2

九、变量和常量 #

9.1 变量定义 #

flux
// 定义变量
bucket_name = "my-bucket"
time_range = -1h

// 使用变量
from(bucket: bucket_name)
    |> range(start: time_range)

9.2 常量 #

flux
// 定义常量(不可变)
PI = 3.14159

// 使用常量
circle_area = (radius) => PI * radius ^ 2

十、条件表达式 #

10.1 if-then-else #

flux
// 条件表达式
result = if value > 100 then "high" else "low"

// 在map中使用
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({
        r with
        status: if r._value > 80 then "warning" 
                else if r._value > 90 then "critical"
                else "normal"
    }))

10.2 条件函数 #

flux
// 使用函数封装条件
getStatus = (value) => {
    if value > 90 then return "critical"
    else if value > 80 then return "warning"
    else if value > 70 then return "notice"
    else return "normal"
}

from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({ r with status: getStatus(value: r._value) }))

十一、字符串操作 #

11.1 字符串模板 #

flux
// 字符串插值
name = "InfluxDB"
message = "Hello, ${name}!"

// 多行字符串
query = "
    from(bucket: \"my-bucket\")
    |> range(start: -1h)
"

11.2 字符串函数 #

flux
// 字符串拼接
"Hello" + " " + "World"

// 字符串长度
strings.strlen(v: "InfluxDB")

// 转大写
strings.toUpper(v: "influxdb")

// 转小写
strings.toLower(v: "INFLUXDB")

// 替换
strings.replace(v: "hello world", t: "world", u: "InfluxDB")

// 分割
strings.split(v: "a,b,c", t: ",")

// 去除空格
strings.trim(v: "  hello  ")

十二、时间操作 #

12.1 时间函数 #

flux
// 当前时间
now()

// 解析时间
time(v: "2024-01-01T00:00:00Z")

// 格式化时间
import "date"
date.format(t: now(), format: "2006-01-02")

// 时间截断
date.truncate(t: now(), unit: 1h)

// 时间计算
date.add(t: now(), d: 1h)
date.sub(t: now(), d: 1h)

12.2 时间组件 #

flux
import "date"

// 获取年份
date.year(t: now())

// 获取月份
date.month(t: now())

// 获取日期
date.day(t: now())

// 获取小时
date.hour(t: now())

// 获取分钟
date.minute(t: now())

// 获取星期
date.weekDay(t: now())

十三、完整示例 #

13.1 基础查询 #

flux
// 查询CPU使用率
from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage" and
        r.host == "server01"
    )

13.2 数据转换 #

flux
// 计算百分比
from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "memory")
    |> map(fn: (r) => ({ 
        r with 
        _value: r._value * 100.0 
    }))

13.3 自定义函数查询 #

flux
// 定义可复用查询函数
queryCPU = (bucket, host, start=-1h) => {
    return from(bucket: bucket)
        |> range(start: start)
        |> filter(fn: (r) => 
            r._measurement == "cpu" and
            r.host == host
        )
}

// 调用函数
queryCPU(bucket: "metrics", host: "server01", start: -24h)

十四、总结 #

Flux语法要点:

  1. 管道操作:使用 |> 连接操作
  2. 函数式:支持自定义函数和链式调用
  3. 时间优先:range 是必需的操作
  4. 灵活过滤:支持复杂的过滤条件
  5. 类型丰富:支持多种数据类型和转换

下一步,让我们学习InfluxDB的数据类型!

最后更新:2026-03-27