Flux语法基础 #
一、Flux概述 #
1.1 什么是Flux #
Flux是InfluxDB 2.0引入的功能强大的数据脚本和查询语言,专门为处理时序数据而设计。
text
Flux特点:
功能强大
├── 灵活的数据转换
├── 丰富的函数库
├── 支持复杂查询
└── 可编写脚本
易于理解
├── 类似SQL语法
├── 管道操作符
├── 函数式编程
└── 链式调用
可扩展
├── 自定义函数
├── 外部包导入
├── 用户定义函数
└── 社区贡献
1.2 Flux vs InfluxQL #
text
对比:
特性 Flux InfluxQL
─────────────────────────────────────────────────
语法风格 函数式 SQL风格
灵活性 高 中等
复杂查询 支持 有限
数据转换 强大 基础
跨Bucket查询 支持 不支持
学习曲线 较陡 平缓
版本 InfluxDB 2.x+ InfluxDB 1.x
二、基本语法结构 #
2.1 查询结构 #
flux
// 基本查询结构
from(bucket: "my-bucket") // 数据源
|> range(start: -1h) // 时间范围
|> filter(fn: (r) => // 过滤条件
r._measurement == "cpu"
)
2.2 管道操作符 #
text
管道操作符 |>
作用:
├── 将前一个操作的输出
├── 作为下一个操作的输入
└── 实现链式调用
示例:
data |> function1() |> function2() |> function3()
等价于:
function3(function2(function1(data)))
2.3 注释 #
flux
// 单行注释
/*
* 多行注释
* 第二行
*/
# 也可以使用井号注释(不推荐)
三、数据源操作 #
3.1 from函数 #
flux
// 基本用法
from(bucket: "my-bucket")
// 指定组织
from(
bucket: "my-bucket",
org: "my-org"
)
// 使用bucket ID
from(bucketID: "1234567890abcdef")
3.2 时间范围 #
flux
// 相对时间
from(bucket: "my-bucket")
|> range(start: -1h) // 最近1小时
from(bucket: "my-bucket")
|> range(start: -24h, stop: -1h) // 1小时前到24小时前
// 绝对时间
from(bucket: "my-bucket")
|> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-02T00:00:00Z)
// 时间格式
from(bucket: "my-bucket")
|> range(start: 2024-01-01T00:00:00.000000000Z)
3.3 时间表达式 #
flux
// 相对时间表达式
-1h // 1小时前
-30m // 30分钟前
-1d // 1天前
-1w // 1周前
-1mo // 1月前
-1y // 1年前
// 组合时间
-1h30m // 1小时30分钟前
四、过滤操作 #
4.1 基本过滤 #
flux
// 过滤measurement
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
// 过滤field
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._field == "usage")
// 过滤tag
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host == "server01")
4.2 组合条件 #
flux
// AND条件
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.host == "server01"
)
// OR条件
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r.host == "server01" or
r.host == "server02"
)
// 复杂条件
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
(r.host == "server01" or r.host == "server02") and
r._field == "usage"
)
4.3 过滤函数 #
flux
// 使用exists检查
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => exists r.region)
// 正则匹配
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host =~ /^server\d+$/)
// 包含检查
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => contains(value: r.host, set: ["server01", "server02"]))
五、数据类型 #
5.1 基本类型 #
flux
// 字符串
name = "InfluxDB"
// 整数
count = 100
// 浮点数
value = 23.5
// 布尔值
enabled = true
// 时间
timestamp = 2024-01-01T00:00:00Z
// 持续时间
duration = 1h30m
5.2 复合类型 #
flux
// 列表
servers = ["server01", "server02", "server03"]
// 字典
config = {
host: "localhost",
port: 8086,
enabled: true
}
// 记录
record = {
name: "cpu",
value: 78.5,
time: 2024-01-01T00:00:00Z
}
5.3 类型转换 #
flux
// 字符串转整数
int(v: "100")
// 字符串转浮点数
float(v: "23.5")
// 整数转字符串
string(v: 100)
// 时间转换
time(v: "2024-01-01T00:00:00Z")
六、函数定义 #
6.1 基本函数 #
flux
// 定义函数
greeting = (name) => {
return "Hello, ${name}!"
}
// 调用函数
greeting(name: "InfluxDB")
// 简写形式
double = (x) => x * 2
double(x: 5) // 返回 10
6.2 带默认参数 #
flux
// 默认参数
greet = (name="World") => {
return "Hello, ${name}!"
}
greet() // "Hello, World!"
greet(name: "Flux") // "Hello, Flux!"
6.3 管道函数 #
flux
// 管道接收函数
addValue = (r, v=<-) => {
return r + v
}
// 使用管道调用
5 |> addValue(v: 10) // 返回 15
// 数据处理函数
multiply = (table=<-, factor) => {
return table
|> map(fn: (r) => ({ r with _value: r._value * factor }))
}
from(bucket: "my-bucket")
|> range(start: -1h)
|> multiply(factor: 2)
七、表流概念 #
7.1 表流结构 #
text
Flux表流:
输入数据流
├── 分组为多个表
├── 每个表有相同的组键
└── 表包含行记录
表结构:
┌────────────────────────────────────────────────┐
│ _time │ _value │ _field │ host │
├────────────────────────────────────────────────┤
│ 2024-01-01T00:00:00Z │ 78.5 │ usage │ srv01 │
│ 2024-01-01T00:01:00Z │ 82.3 │ usage │ srv01 │
└────────────────────────────────────────────────┘
组键(Group Key)
├── _field, host 等标签组合
├── 决定表的分组方式
└── 相同组键的记录在同一表
7.2 分组操作 #
flux
// 按指定列分组
from(bucket: "my-bucket")
|> range(start: -1h)
|> group(columns: ["host"])
// 取消分组
from(bucket: "my-bucket")
|> range(start: -1h)
|> group()
// 按多列分组
from(bucket: "my-bucket")
|> range(start: -1h)
|> group(columns: ["host", "region"])
7.3 查看分组 #
flux
// 查看表的组键
from(bucket: "my-bucket")
|> range(start: -1h)
|> group(columns: ["host"])
|> keys()
八、常用操作符 #
8.1 比较操作符 #
flux
// 等于
r._value == 100
// 不等于
r._value != 100
// 大于
r._value > 100
// 小于
r._value < 100
// 大于等于
r._value >= 100
// 小于等于
r._value <= 100
8.2 逻辑操作符 #
flux
// 逻辑与
r._value > 50 and r._value < 100
// 逻辑或
r.host == "server01" or r.host == "server02"
// 逻辑非
not r._value > 100
8.3 算术操作符 #
flux
// 加法
r._value + 10
// 减法
r._value - 10
// 乘法
r._value * 2
// 除法
r._value / 2
// 取模
r._value % 10
// 幂运算
r._value ^ 2
九、变量和常量 #
9.1 变量定义 #
flux
// 定义变量
bucket_name = "my-bucket"
time_range = -1h
// 使用变量
from(bucket: bucket_name)
|> range(start: time_range)
9.2 常量 #
flux
// 定义常量(不可变)
PI = 3.14159
// 使用常量
circle_area = (radius) => PI * radius ^ 2
十、条件表达式 #
10.1 if-then-else #
flux
// 条件表达式
result = if value > 100 then "high" else "low"
// 在map中使用
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
status: if r._value > 80 then "warning"
else if r._value > 90 then "critical"
else "normal"
}))
10.2 条件函数 #
flux
// 使用函数封装条件
getStatus = (value) => {
if value > 90 then return "critical"
else if value > 80 then return "warning"
else if value > 70 then return "notice"
else return "normal"
}
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({ r with status: getStatus(value: r._value) }))
十一、字符串操作 #
11.1 字符串模板 #
flux
// 字符串插值
name = "InfluxDB"
message = "Hello, ${name}!"
// 多行字符串
query = "
from(bucket: \"my-bucket\")
|> range(start: -1h)
"
11.2 字符串函数 #
flux
// 字符串拼接
"Hello" + " " + "World"
// 字符串长度
strings.strlen(v: "InfluxDB")
// 转大写
strings.toUpper(v: "influxdb")
// 转小写
strings.toLower(v: "INFLUXDB")
// 替换
strings.replace(v: "hello world", t: "world", u: "InfluxDB")
// 分割
strings.split(v: "a,b,c", t: ",")
// 去除空格
strings.trim(v: " hello ")
十二、时间操作 #
12.1 时间函数 #
flux
// 当前时间
now()
// 解析时间
time(v: "2024-01-01T00:00:00Z")
// 格式化时间
import "date"
date.format(t: now(), format: "2006-01-02")
// 时间截断
date.truncate(t: now(), unit: 1h)
// 时间计算
date.add(t: now(), d: 1h)
date.sub(t: now(), d: 1h)
12.2 时间组件 #
flux
import "date"
// 获取年份
date.year(t: now())
// 获取月份
date.month(t: now())
// 获取日期
date.day(t: now())
// 获取小时
date.hour(t: now())
// 获取分钟
date.minute(t: now())
// 获取星期
date.weekDay(t: now())
十三、完整示例 #
13.1 基础查询 #
flux
// 查询CPU使用率
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage" and
r.host == "server01"
)
13.2 数据转换 #
flux
// 计算百分比
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
|> map(fn: (r) => ({
r with
_value: r._value * 100.0
}))
13.3 自定义函数查询 #
flux
// 定义可复用查询函数
queryCPU = (bucket, host, start=-1h) => {
return from(bucket: bucket)
|> range(start: start)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.host == host
)
}
// 调用函数
queryCPU(bucket: "metrics", host: "server01", start: -24h)
十四、总结 #
Flux语法要点:
- 管道操作:使用
|>连接操作 - 函数式:支持自定义函数和链式调用
- 时间优先:range 是必需的操作
- 灵活过滤:支持复杂的过滤条件
- 类型丰富:支持多种数据类型和转换
下一步,让我们学习InfluxDB的数据类型!
最后更新:2026-03-27