Flux函数库 #

一、函数库概述 #

1.1 内置包 #

text
Flux内置包:

核心包
├── universe - 基础函数
├── influxdata/influxdb - InfluxDB操作
└── builtin - 所有内置函数

常用包
├── date - 时间操作
├── strings - 字符串操作
├── math - 数学运算
├── json - JSON处理
└── csv - CSV处理

1.2 导入包 #

flux
// 导入包
import "date"
import "strings"
import "math"

// 使用导入的函数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({
        r with
        hour: date.hour(t: r._time),
        upper: strings.toUpper(v: r.host)
    }))

二、时间函数库 #

2.1 date包 #

flux
import "date"

// 获取时间组件
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({
        r with
        year: date.year(t: r._time),
        month: date.month(t: r._time),
        day: date.day(t: r._time),
        hour: date.hour(t: r._time),
        minute: date.minute(t: r._time),
        weekday: date.weekDay(t: r._time)
    }))

// 时间格式化
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({
        r with
        time_str: date.format(t: r._time, format: "2006-01-02 15:04:05")
    }))

// 时间计算
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({
        r with
        next_hour: date.add(t: r._time, d: 1h),
        prev_hour: date.sub(t: r._time, d: 1h),
        hour_start: date.truncate(t: r._time, unit: 1h)
    }))

2.2 常用时间函数 #

flux
import "date"

// date.hour - 获取小时
date.hour(t: now())

// date.minute - 获取分钟
date.minute(t: now())

// date.day - 获取日期
date.day(t: now())

// date.weekDay - 获取星期(0-6)
date.weekDay(t: now())

// date.truncate - 截断时间
date.truncate(t: now(), unit: 1h)

// date.add - 时间加法
date.add(t: now(), d: 1h)

// date.sub - 时间减法
date.sub(t: now(), d: 1h)

三、字符串函数库 #

3.1 strings包 #

flux
import "strings"

// 大小写转换
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({
        r with
        upper: strings.toUpper(v: r.status),
        lower: strings.toLower(v: r.status),
        title: strings.title(v: r.status)
    }))

// 字符串操作
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({
        r with
        length: strings.strlen(v: r.message),
        trimmed: strings.trim(v: r.message),
        replaced: strings.replace(v: r.message, t: "error", u: "ERROR")
    }))

// 字符串检查
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        strings.containsStr(v: r.message, substr: "error") and
        strings.hasPrefix(v: r.message, prefix: "ERR")
    )

3.2 常用字符串函数 #

flux
import "strings"

// strings.toUpper - 转大写
strings.toUpper(v: "hello")

// strings.toLower - 转小写
strings.toLower(v: "HELLO")

// strings.trim - 去空格
strings.trim(v: "  hello  ")

// strings.replace - 替换
strings.replace(v: "hello world", t: "world", u: "InfluxDB")

// strings.split - 分割
strings.split(v: "a,b,c", t: ",")

// strings.join - 连接
strings.join(arr: ["a", "b", "c"], v: ",")

// strings.containsStr - 包含检查
strings.containsStr(v: "hello world", substr: "world")

// strings.hasPrefix - 前缀检查
strings.hasPrefix(v: "hello", prefix: "he")

// strings.hasSuffix - 后缀检查
strings.hasSuffix(v: "hello", suffix: "lo")

四、数学函数库 #

4.1 math包 #

flux
import "math"

// 数学运算
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({
        r with
        abs: math.abs(x: r._value),
        sqrt: math.sqrt(x: r._value),
        pow: math.pow(x: r._value, n: 2.0),
        log: math.log(x: r._value),
        log10: math.log10(x: r._value)
    }))

// 取整函数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> map(fn: (r) => ({
        r with
        floor: math.floor(x: r._value),
        ceil: math.ceil(x: r._value),
        round: math.round(x: r._value)
    }))

4.2 常用数学函数 #

flux
import "math"

// math.abs - 绝对值
math.abs(x: -10.5)

// math.sqrt - 平方根
math.sqrt(x: 16.0)

// math.pow - 幂运算
math.pow(x: 2.0, n: 3.0)

// math.floor - 向下取整
math.floor(x: 3.7)

// math.ceil - 向上取整
math.ceil(x: 3.2)

// math.round - 四舍五入
math.round(x: 3.5)

// math.log - 自然对数
math.log(x: 10.0)

// math.log10 - 常用对数
math.log10(x: 100.0)

// math.sin/cos/tan - 三角函数
math.sin(x: 1.0)
math.cos(x: 1.0)
math.tan(x: 1.0)

五、JSON函数库 #

5.1 json包 #

flux
import "json"

// 编码JSON
data = {
    name: "InfluxDB",
    version: "2.7",
    features: ["time-series", "flux"]
}
json.encode(v: data)

// 解码JSON
jsonStr = "{\"name\":\"InfluxDB\",\"version\":\"2.7\"}"
json.decode(v: jsonStr)

5.2 JSON应用示例 #

flux
import "json"
import "http"

// 发送JSON数据
alertData = {
    measurement: "cpu",
    value: 95.5,
    host: "server01",
    timestamp: now()
}

http.post(
    url: "https://api.example.com/alert",
    headers: {"Content-Type": "application/json"},
    data: json.encode(v: alertData)
)

六、HTTP函数库 #

6.1 http包 #

flux
import "http"

// GET请求
response = http.get(url: "https://api.example.com/data")

// POST请求
response = http.post(
    url: "https://api.example.com/data",
    headers: {"Content-Type": "application/json"},
    data: "{\"key\":\"value\"}"
)

// 处理响应
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value > 80)
    |> map(fn: (r) => {
        response = http.post(
            url: "https://hooks.example.com/alert",
            headers: {"Content-Type": "application/json"},
            data: "{\"host\":\"${r.host}\",\"value\":${string(v: r._value)}}"
        )
        return r
    })

七、InfluxDB函数库 #

7.1 influxdb包 #

flux
import "influxdata/influxdb"

// 列出所有Bucket
influxdb.buckets()

// 列出所有组织
influxdb.orgs()

// 获取Cardinality
influxdb.cardinality(
    bucket: "my-bucket",
    start: -30d
)

// 获取Series数量
influxdb.series(
    bucket: "my-bucket",
    start: -30d
)

7.2 v1兼容包 #

flux
import "influxdata/influxdb/v1"

// 使用InfluxQL语法查询
v1.query(
    bucket: "my-bucket",
    query: "SELECT mean(\"value\") FROM \"cpu\" WHERE time > now() - 1h GROUP BY time(5m)"
)

八、自定义函数 #

8.1 基本自定义函数 #

flux
// 定义简单函数
double = (x) => x * 2

// 使用函数
double(x: 5)  // 返回 10

// 带默认参数
greet = (name="World") => "Hello, ${name}!"

greet()              // "Hello, World!"
greet(name: "Flux")  // "Hello, Flux!"

8.2 管道函数 #

flux
// 定义管道函数
multiply = (tables=<-, factor) => {
    return tables
        |> map(fn: (r) => ({ r with _value: r._value * factor }))
}

// 使用管道函数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> multiply(factor: 100)

8.3 复杂自定义函数 #

flux
// 定义复杂函数
calculateStats = (tables=<-) => {
    return tables
        |> reduce(
            fn: (r, accumulator) => ({
                sum: r._value + accumulator.sum,
                count: accumulator.count + 1.0,
                min: if r._value < accumulator.min then r._value else accumulator.min,
                max: if r._value > accumulator.max then r._value else accumulator.max
            }),
            identity: {sum: 0.0, count: 0.0, min: float(v: +∞), max: float(v: -∞)}
        )
        |> map(fn: (r) => ({
            mean: r.sum / r.count,
            min: r.min,
            max: r.max,
            count: r.count
        }))
}

// 使用自定义函数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> calculateStats()

8.4 可复用查询函数 #

flux
// 定义可复用查询
queryMetrics = (bucket, measurement, field, start=-1h) => {
    return from(bucket: bucket)
        |> range(start: start)
        |> filter(fn: (r) => 
            r._measurement == measurement and
            r._field == field
        )
}

// 使用函数
queryMetrics(bucket: "metrics", measurement: "cpu", field: "usage", start: -24h)
    |> mean()

queryMetrics(bucket: "metrics", measurement: "memory", field: "used", start: -1h)
    |> max()

九、contrib包 #

9.1 使用contrib包 #

flux
// 导入contrib包
import "contrib/jsternberg/aggregate"

// 使用contrib函数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregate.rate(every: 5m, unit: 1s)

9.2 常用contrib包 #

text
常用contrib包:

contrib/RohanSreeramaoi/naiveBayesClassifier
├── 朴素贝叶斯分类器
└── 机器学习应用

contrib/anaisdg/anomalydetection
├── 异常检测
└── 数据分析

contrib/jsternberg/aggregate
├── 聚合扩展
└── rate计算

contrib/bonitoo-io/tickit
├── 告警处理
└── 通知发送

十、函数组合 #

10.1 函数链式调用 #

flux
// 组合多个函数
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 5m, fn: mean)
    |> map(fn: (r) => ({ r with _value: r._value * 100.0 }))
    |> filter(fn: (r) => r._value > 50)
    |> sort(columns: ["_value"], desc: true)
    |> limit(n: 10)

10.2 函数封装 #

flux
// 封装复杂查询
monitorCPU = (bucket, threshold=80, start=-1h) => {
    return from(bucket: bucket)
        |> range(start: start)
        |> filter(fn: (r) => 
            r._measurement == "cpu" and
            r._field == "usage"
        )
        |> aggregateWindow(every: 5m, fn: mean)
        |> filter(fn: (r) => r._value > threshold)
        |> map(fn: (r) => ({
            r with
            status: "warning",
            threshold: threshold
        }))
}

// 使用封装函数
monitorCPU(bucket: "metrics", threshold: 75, start: -24h)

十一、总结 #

Flux函数库要点:

  1. 内置包:date、strings、math等常用包
  2. 导入语法:import “package”
  3. 自定义函数:封装可复用逻辑
  4. 管道函数:支持链式调用
  5. contrib包:社区贡献的扩展函数

下一步,让我们学习定时任务!

最后更新:2026-03-27