Data Transformation

1. Data Transformation Overview

1.1 What Is Data Transformation

text
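Data transformation:

Definition
├── Modify the data structure
├── Add or remove columns
├── Change the data format
└── Merge or split data

Common operations
├── map - transform each row
├── pivot - turn rows into columns
├── join - join tables
├── union - merge tables
└── schema changes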

1.2 Transformation Flow

text
Data transformation flow:

Input data
    │
    ▼
Transformation operations
├── Map
├── Filter
├── Group
├── Join
└── Format
        │
        ▼
Output data
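
The flow above can be sketched as a single pipeline. This is a minimal illustration rather than a recommended pattern: the rows built with `array.from` are hypothetical stand-ins for a real bucket query, and the 50 percent threshold and the `display` column are assumptions made for the example.

```flux
import "array"

// Hypothetical input rows standing in for a real bucket query.
input =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, host: "a", _value: 0.785},
            {_time: 2024-01-01T00:01:00Z, host: "a", _value: 0.412},
            {_time: 2024-01-01T00:00:00Z, host: "b", _value: 0.903},
        ],
    )

input
    // Map: convert the ratio to a percentage
    |> map(fn: (r) => ({r with _value: r._value * 100.0}))
    // Filter: keep rows above 50 percent
    |> filter(fn: (r) => r._value > 50.0)
    // Group: one output table per host
    |> group(columns: ["host"])
    // Format: add a display string
    |> map(fn: (r) => ({r with display: "${string(v: r._value)}%"}))
```

Because the input is defined inline, the sketch can be tried without any data in InfluxDB.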

2. The map Function

2.1 Basic Usage

flux
// Modify a value
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        _value: r._value * 100.0
    }))

// Add a new column
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        status: if r._value > 80 then "warning" else "normal"
    }))

// Modify several columns
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        _value: r._value * 100.0,
        unit: "percent",
        timestamp: r._time
    }))

2.2 Conditional Mapping

flux
// Conditional transformation
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        level: if r._value >= 90 then "critical"
               else if r._value >= 80 then "warning"
               else if r._value >= 70 then "notice"
               else "normal"
    }))

// Value conversion: map string values to numeric codes
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "status")
    |> map(fn: (r) => ({
        r with
        status_code: if r._value == "active" then 1
                     else if r._value == "inactive" then 0
                     else -1
    }))

2.3 Type Conversion

flux
// Type conversion
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        _value: float(v: r._value),
        host_id: int(v: r.host_id)
    }))

// String formatting
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        display: "CPU: ${string(v: r._value)}%"
    }))

2.4 Time Operations

flux
import "date"

// Extract time components
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        hour: date.hour(t: r._time),
        day: date.monthDay(t: r._time),
        weekday: date.weekDay(t: r._time)
    }))

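// Format the time as a string
// Assumption: the Flux standard library has no date.format function, so this
// converts the timestamp to its RFC3339 string with string() instead.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        time_str: string(v: r._time)
    }))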

3. The pivot Function

3.1 Basic Usage

flux
// Pivot rows into columns
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Example result
// Before:
// _time                _field  _value
// 2024-01-01T00:00:00Z usage   78.5
// 2024-01-01T00:00:00Z idle    21.5

// After:
// _time                usage  idle
// 2024-01-01T00:00:00Z 78.5   21.5
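
The before and after tables above can be reproduced without a bucket. The sketch below assumes nothing beyond the two example rows, which are built inline with `array.from`.

```flux
import "array"

// The two "before" rows from the example above.
array.from(
    rows: [
        {_time: 2024-01-01T00:00:00Z, _field: "usage", _value: 78.5},
        {_time: 2024-01-01T00:00:00Z, _field: "idle", _value: 21.5},
    ],
)
    // Each distinct _field value becomes its own column.
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```

Both rows share the same `_time`, so they should collapse into a single row with a `usage` column and an `idle` column.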

3.2 Multi-Column pivot

flux
// Pivot by tag and field
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> pivot(
        rowKey: ["_time", "host"],
        columnKey: ["_field"],
        valueColumn: "_value"
    )

3.3 pivot Use Cases

flux
// Compare different metrics
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "system")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> map(fn: (r) => ({
        _time: r._time,
        total: r.cpu + r.memory
    }))

4. The join Function

4.1 Basic Join

flux
// Join two queries
cpu = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> rename(columns: {"_value": "cpu_value"})

memory = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "memory")
    |> rename(columns: {"_value": "memory_value"})

join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])
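
To see what the join keeps, here is a small self-contained sketch. The rows are hypothetical and built with `array.from`; only the column names `cpu_value` and `memory_value` come from the query above.

```flux
import "array"

// Hypothetical CPU rows for two hosts.
cpu =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, host: "a", cpu_value: 78.5},
            {_time: 2024-01-01T00:00:00Z, host: "b", cpu_value: 12.0},
        ],
    )

// Hypothetical memory rows for one host only.
memory =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, host: "a", memory_value: 64.2},
        ],
    )

// Rows are matched on both _time and host.
join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])
```

Since `join()` performs an inner join, host "b" has no matching memory row and should be absent from the result.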

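4.2 Join Types

flux
import "join"

// Inner join (the default for the universe join() function).
// table1 and table2 are placeholders, so the call is shown as a comment:
// join(tables: {t1: table1, t2: table2}, on: ["_time"])

// Using join.time from the join package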

cpu = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")

memory = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "memory")

join.time(
    left: cpu,
    right: memory,
    as: (l, r) => ({
        l with
        memory_value: r._value
    })
)

4.3 Computing Derived Values

flux
// Compute the combined CPU and memory load
cpu = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "cpu_usage"})

memory = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "memory" and r._field == "usage")
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "memory_usage"})

join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])
    |> map(fn: (r) => ({
        _time: r._time,
        host: r.host,
        total_load: r.cpu_usage + r.memory_usage
    }))

5. The union Function

5.1 Basic Union

flux
// Merge several query results
cpu = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")

memory = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "memory")

union(tables: [cpu, memory])

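5.2 Merging Multiple Tables

flux
// Merge several measurements
// An array cannot be piped into map(), which only accepts a stream of tables,
// so an illustrative helper (getMeasurement) builds one stream per measurement.
getMeasurement = (name) =>
    from(bucket: "metrics")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == name)

union(tables: [getMeasurement(name: "cpu"), getMeasurement(name: "memory"), getMeasurement(name: "disk")])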

6. Schema Operations

6.1 keep - Keep Columns

flux
// Keep only the listed columns
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> keep(columns: ["_time", "_value", "host"])

6.2 drop - Drop Columns

flux
// Drop the listed columns (range() names them _start and _stop)
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> drop(columns: ["_start", "_stop"])

// Drop columns with a predicate function
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> drop(fn: (column) => column =~ /^_start/)

6.3 rename - Rename Columns

flux
// Rename one column
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> rename(columns: {"_value": "cpu_usage"})

// Rename several columns
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> rename(columns: {
        "_value": "cpu_usage",
        "host": "server_name"
    })

6.4 duplicate - Duplicate a Column

flux
// Duplicate a column
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> duplicate(column: "_value", as: "original_value")

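7. Time Conversion

7.1 Time Formatting

flux
import "strings"

// Format the time
// Assumption: the Flux standard library has no date.format function, so the
// pieces are sliced out of the RFC3339 string that string() returns,
// for example "2024-01-01T00:00:00.000000000Z".
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => {
        s = string(v: r._time)

        return {r with time_str: strings.substring(v: s, start: 0, end: 10) + " " + strings.substring(v: s, start: 11, end: 19)}
    })

// Common slices of the RFC3339 string s
// strings.substring(v: s, start: 0, end: 10)   - date
// strings.substring(v: s, start: 11, end: 19)  - time
// both joined with a space                     - date and time
// s itself                                     - ISO format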

7.2 Time Arithmetic

flux
import "date"

// Time offset
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        next_hour: date.add(d: 1h, to: r._time),
        prev_hour: date.sub(d: 1h, from: r._time)
    }))

// Time truncation
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        hour_start: date.truncate(t: r._time, unit: 1h),
        day_start: date.truncate(t: r._time, unit: 1d)
    }))

8. Math Operations

8.1 Basic Arithmetic

flux
import "math"

// Arithmetic operations
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        doubled: r._value * 2.0,
        half: r._value / 2.0,
        plus_ten: r._value + 10.0,
        minus_five: r._value - 5.0
    }))

// Exponentiation (both operands of ^ are floats here)
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        squared: r._value ^ 2.0,
        sqrt: math.sqrt(x: r._value)
    }))

8.2 Math Functions

flux
import "math"

// Math functions
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(fn: (r) => ({
        r with
        abs: math.abs(x: r._value),
        floor: math.floor(x: r._value),
        ceil: math.ceil(x: r._value),
        round: math.round(x: r._value),
        log: math.log(x: r._value),
        log10: math.log10(x: r._value)
    }))

9. String Operations

9.1 String Functions

flux
import "strings"

// String operations
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "logs")
    |> map(fn: (r) => ({
        r with
        upper: strings.toUpper(v: r.message),
        lower: strings.toLower(v: r.message),
        length: strings.strlen(v: r.message),
        trimmed: strings.trimSpace(v: r.message)
    }))

// String replacement (replaceAll replaces every match)
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "logs")
    |> map(fn: (r) => ({
        r with
        replaced: strings.replaceAll(v: r.message, t: "error", u: "ERROR")
    }))

// String splitting
// A column cannot hold an array, so keep one element of the split result.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "logs")
    |> map(fn: (r) => ({
        r with
        first_word: strings.split(v: r.message, t: " ")[0]
    }))

10. Practical Examples

10.1 Combined Performance Metric

flux
// Compute a combined performance score
cpu = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "cpu"})

memory = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "memory" and r._field == "usage")
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "memory"})

disk = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "disk" and r._field == "usage")
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "disk"})

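// join() takes exactly two streams and no piped input, so join in two steps
cpu_memory = join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])

join(tables: {cpu_memory: cpu_memory, disk: disk}, on: ["_time", "host"])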
    |> map(fn: (r) => ({
        _time: r._time,
        host: r.host,
        performance_score: (r.cpu * 0.4 + r.memory * 0.3 + r.disk * 0.3),
        status: if (r.cpu * 0.4 + r.memory * 0.3 + r.disk * 0.3) > 80 then "warning"
                else "normal"
    }))

10.2 Formatted Output

flux
import "math"
import "strings"

// Format a report
// Assumption: Flux has no date.format function, so time_str is sliced from
// the RFC3339 string returned by string().
from(bucket: "metrics")
    |> range(start: -1d)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 1h, fn: mean)
    |> map(fn: (r) => ({
        _time: r._time,
        time_str: strings.replaceAll(v: strings.substring(v: string(v: r._time), start: 0, end: 16), t: "T", u: " "),
        host: r.host,
        cpu_usage: "${string(v: math.round(x: r._value * 100.0) / 100.0)}%",
        status: if r._value > 80 then "⚠️ Warning" else "✅ Normal"
    }))
    |> keep(columns: ["time_str", "host", "cpu_usage", "status"])

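11. Summary

Key points of data transformation:

  1. map: flexible per-row mapping and conversion
  2. pivot: turns rows into columns, which makes metrics easier to compare
  3. join: joins multiple data sources
  4. union: merges multiple result sets
  5. Schema operations: modify the data structure

Next, we will look at the Flux function library.

Last updated: 2026-03-27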