Flux函数库 #
一、函数库概述 #
1.1 内置包 #
text
Flux内置包:
核心包
├── universe - 基础函数
├── influxdata/influxdb - InfluxDB操作
└── builtin - 所有内置函数
常用包
├── date - 时间操作
├── strings - 字符串操作
├── math - 数学运算
├── json - JSON处理
└── csv - CSV处理
1.2 导入包 #
flux
// 导入包
import "date"
import "strings"
import "math"
// 使用导入的函数
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
hour: date.hour(t: r._time),
upper: strings.toUpper(v: r.host)
}))
二、时间函数库 #
2.1 date包 #
flux
import "date"
// 获取时间组件
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
year: date.year(t: r._time),
month: date.month(t: r._time),
day: date.day(t: r._time),
hour: date.hour(t: r._time),
minute: date.minute(t: r._time),
weekday: date.weekDay(t: r._time)
}))
// 时间格式化
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
time_str: date.format(t: r._time, format: "2006-01-02 15:04:05")
}))
// 时间计算
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
next_hour: date.add(t: r._time, d: 1h),
prev_hour: date.sub(t: r._time, d: 1h),
hour_start: date.truncate(t: r._time, unit: 1h)
}))
2.2 常用时间函数 #
flux
import "date"
// date.hour - 获取小时
date.hour(t: now())
// date.minute - 获取分钟
date.minute(t: now())
// date.day - 获取日期
date.day(t: now())
// date.weekDay - 获取星期(0-6)
date.weekDay(t: now())
// date.truncate - 截断时间
date.truncate(t: now(), unit: 1h)
// date.add - 时间加法
date.add(t: now(), d: 1h)
// date.sub - 时间减法
date.sub(t: now(), d: 1h)
三、字符串函数库 #
3.1 strings包 #
flux
import "strings"
// 大小写转换
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
upper: strings.toUpper(v: r.status),
lower: strings.toLower(v: r.status),
title: strings.title(v: r.status)
}))
// 字符串操作
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
length: strings.strlen(v: r.message),
trimmed: strings.trim(v: r.message),
replaced: strings.replace(v: r.message, t: "error", u: "ERROR")
}))
// 字符串检查
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
strings.containsStr(v: r.message, substr: "error") and
strings.hasPrefix(v: r.message, prefix: "ERR")
)
3.2 常用字符串函数 #
flux
import "strings"
// strings.toUpper - 转大写
strings.toUpper(v: "hello")
// strings.toLower - 转小写
strings.toLower(v: "HELLO")
// strings.trim - 去空格
strings.trim(v: " hello ")
// strings.replace - 替换
strings.replace(v: "hello world", t: "world", u: "InfluxDB")
// strings.split - 分割
strings.split(v: "a,b,c", t: ",")
// strings.join - 连接
strings.join(arr: ["a", "b", "c"], v: ",")
// strings.containsStr - 包含检查
strings.containsStr(v: "hello world", substr: "world")
// strings.hasPrefix - 前缀检查
strings.hasPrefix(v: "hello", prefix: "he")
// strings.hasSuffix - 后缀检查
strings.hasSuffix(v: "hello", suffix: "lo")
四、数学函数库 #
4.1 math包 #
flux
import "math"
// 数学运算
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
abs: math.abs(x: r._value),
sqrt: math.sqrt(x: r._value),
pow: math.pow(x: r._value, n: 2.0),
log: math.log(x: r._value),
log10: math.log10(x: r._value)
}))
// 取整函数
from(bucket: "my-bucket")
|> range(start: -1h)
|> map(fn: (r) => ({
r with
floor: math.floor(x: r._value),
ceil: math.ceil(x: r._value),
round: math.round(x: r._value)
}))
4.2 常用数学函数 #
flux
import "math"
// math.abs - 绝对值
math.abs(x: -10.5)
// math.sqrt - 平方根
math.sqrt(x: 16.0)
// math.pow - 幂运算
math.pow(x: 2.0, n: 3.0)
// math.floor - 向下取整
math.floor(x: 3.7)
// math.ceil - 向上取整
math.ceil(x: 3.2)
// math.round - 四舍五入
math.round(x: 3.5)
// math.log - 自然对数
math.log(x: 10.0)
// math.log10 - 常用对数
math.log10(x: 100.0)
// math.sin/cos/tan - 三角函数
math.sin(x: 1.0)
math.cos(x: 1.0)
math.tan(x: 1.0)
五、JSON函数库 #
5.1 json包 #
flux
import "json"
// 编码JSON
data = {
name: "InfluxDB",
version: "2.7",
features: ["time-series", "flux"]
}
json.encode(v: data)
// 解码JSON
jsonStr = "{\"name\":\"InfluxDB\",\"version\":\"2.7\"}"
json.decode(v: jsonStr)
5.2 JSON应用示例 #
flux
import "json"
import "http"
// 发送JSON数据
alertData = {
measurement: "cpu",
value: 95.5,
host: "server01",
timestamp: now()
}
http.post(
url: "https://api.example.com/alert",
headers: {"Content-Type": "application/json"},
data: json.encode(v: alertData)
)
六、HTTP函数库 #
6.1 http包 #
flux
import "http"
// GET请求
response = http.get(url: "https://api.example.com/data")
// POST请求
response = http.post(
url: "https://api.example.com/data",
headers: {"Content-Type": "application/json"},
data: "{\"key\":\"value\"}"
)
// 处理响应
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value > 80)
|> map(fn: (r) => {
response = http.post(
url: "https://hooks.example.com/alert",
headers: {"Content-Type": "application/json"},
data: "{\"host\":\"${r.host}\",\"value\":${string(v: r._value)}}"
)
return r
})
七、InfluxDB函数库 #
7.1 influxdb包 #
flux
import "influxdata/influxdb"
// 列出所有Bucket
influxdb.buckets()
// 列出所有组织
influxdb.orgs()
// 获取Cardinality
influxdb.cardinality(
bucket: "my-bucket",
start: -30d
)
// 获取Series数量
influxdb.series(
bucket: "my-bucket",
start: -30d
)
7.2 v1兼容包 #
flux
import "influxdata/influxdb/v1"
// 使用InfluxQL语法查询
v1.query(
bucket: "my-bucket",
query: "SELECT mean(\"value\") FROM \"cpu\" WHERE time > now() - 1h GROUP BY time(5m)"
)
八、自定义函数 #
8.1 基本自定义函数 #
flux
// 定义简单函数
double = (x) => x * 2
// 使用函数
double(x: 5) // 返回 10
// 带默认参数
greet = (name="World") => "Hello, ${name}!"
greet() // "Hello, World!"
greet(name: "Flux") // "Hello, Flux!"
8.2 管道函数 #
flux
// 定义管道函数
multiply = (tables=<-, factor) => {
return tables
|> map(fn: (r) => ({ r with _value: r._value * factor }))
}
// 使用管道函数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> multiply(factor: 100)
8.3 复杂自定义函数 #
flux
// 定义复杂函数
calculateStats = (tables=<-) => {
return tables
|> reduce(
fn: (r, accumulator) => ({
sum: r._value + accumulator.sum,
count: accumulator.count + 1.0,
min: if r._value < accumulator.min then r._value else accumulator.min,
max: if r._value > accumulator.max then r._value else accumulator.max
}),
identity: {sum: 0.0, count: 0.0, min: float(v: +∞), max: float(v: -∞)}
)
|> map(fn: (r) => ({
mean: r.sum / r.count,
min: r.min,
max: r.max,
count: r.count
}))
}
// 使用自定义函数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> calculateStats()
8.4 可复用查询函数 #
flux
// 定义可复用查询
queryMetrics = (bucket, measurement, field, start=-1h) => {
return from(bucket: bucket)
|> range(start: start)
|> filter(fn: (r) =>
r._measurement == measurement and
r._field == field
)
}
// 使用函数
queryMetrics(bucket: "metrics", measurement: "cpu", field: "usage", start: -24h)
|> mean()
queryMetrics(bucket: "metrics", measurement: "memory", field: "used", start: -1h)
|> max()
九、contrib包 #
9.1 使用contrib包 #
flux
// 导入contrib包
import "contrib/jsternberg/aggregate"
// 使用contrib函数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregate.rate(every: 5m, unit: 1s)
9.2 常用contrib包 #
text
常用contrib包:
contrib/RohanSreeramaoi/naiveBayesClassifier
├── 朴素贝叶斯分类器
└── 机器学习应用
contrib/anaisdg/anomalydetection
├── 异常检测
└── 数据分析
contrib/jsternberg/aggregate
├── 聚合扩展
└── rate计算
contrib/bonitoo-io/tickit
├── 告警处理
└── 通知发送
十、函数组合 #
10.1 函数链式调用 #
flux
// 组合多个函数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean)
|> map(fn: (r) => ({ r with _value: r._value * 100.0 }))
|> filter(fn: (r) => r._value > 50)
|> sort(columns: ["_value"], desc: true)
|> limit(n: 10)
10.2 函数封装 #
flux
// 封装复杂查询
monitorCPU = (bucket, threshold=80, start=-1h) => {
return from(bucket: bucket)
|> range(start: start)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage"
)
|> aggregateWindow(every: 5m, fn: mean)
|> filter(fn: (r) => r._value > threshold)
|> map(fn: (r) => ({
r with
status: "warning",
threshold: threshold
}))
}
// 使用封装函数
monitorCPU(bucket: "metrics", threshold: 75, start: -24h)
十一、总结 #
Flux函数库要点:
- 内置包:date、strings、math等常用包
- 导入语法:import “package”
- 自定义函数:封装可复用逻辑
- 管道函数:支持链式调用
- contrib包:社区贡献的扩展函数
下一步,让我们学习定时任务!
最后更新:2026-03-27