数据转换 #
一、数据转换概述 #
1.1 什么是数据转换 #
text
数据转换:
定义
├── 修改数据结构
├── 添加/删除列
├── 改变数据格式
└── 合并/拆分数据
常见操作
├── map - 映射转换
├── pivot - 行列转换
├── join - 数据连接
├── union - 数据合并
└── schema修改
1.2 转换流程 #
text
数据转换流程:
输入数据
│
▼
转换操作
├── 映射
├── 过滤
├── 分组
├── 连接
└── 格式化
│
▼
输出数据
二、map函数 #
2.1 基本用法 #
flux
// 修改值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> map(fn: (r) => ({
r with
_value: r._value * 100.0
}))
// 添加新列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> map(fn: (r) => ({
r with
status: if r._value > 80 then "warning" else "normal"
}))
// 修改多列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> map(fn: (r) => ({
r with
_value: r._value * 100.0,
unit: "percent",
timestamp: r._time
}))
2.2 条件映射 #
flux
// 条件转换
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> map(fn: (r) => ({
r with
level: if r._value >= 90 then "critical"
else if r._value >= 80 then "warning"
else if r._value >= 70 then "notice"
else "normal"
}))
// 值转换
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "status")
|> map(fn: (r) => ({
r with
status_code: if r._value == "active" then 1
else if r._value == "inactive" then 0
else -1
}))
2.3 类型转换 #
flux
// 类型转换
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> map(fn: (r) => ({
r with
_value: float(v: r._value),
host_id: int(v: r.host_id)
}))
// 字符串格式化
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> map(fn: (r) => ({
r with
display: "CPU: ${string(v: r._value)}%"
}))
2.4 时间操作 #
flux
import "date"
import "strings"

// Extract calendar components from each point's timestamp.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(
        fn: (r) => ({r with
            hour: date.hour(t: r._time),
            // The date package has no `day` function; monthDay returns the
            // day of the month.
            day: date.monthDay(t: r._time),
            weekday: date.weekDay(t: r._time)
        })
    )

// Render the timestamp as "YYYY-MM-DD HH:MM:SS".
// Flux has no date.format function. string(v: <time>) produces an RFC3339
// timestamp such as "2006-01-02T15:04:05.000000000Z", so the date and time
// portions are sliced out of that string instead.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(
        fn: (r) => {
            ts = string(v: r._time)

            return {r with
                time_str: strings.substring(v: ts, start: 0, end: 10) + " " + strings.substring(v: ts, start: 11, end: 19)
            }
        }
    )
三、pivot函数 #
3.1 基本用法 #
flux
// 行列转换
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
// 结果示例
// 转换前:
// _time _field _value
// 2024-01-01T00:00:00Z usage 78.5
// 2024-01-01T00:00:00Z idle 21.5
// 转换后:
// _time usage idle
// 2024-01-01T00:00:00Z 78.5 21.5
3.2 多列pivot #
flux
// 按标签和字段pivot
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> pivot(
rowKey: ["_time", "host"],
columnKey: ["_field"],
valueColumn: "_value"
)
3.3 pivot应用场景 #
flux
// 比较不同指标
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "system")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => ({
_time: r._time,
total: r.cpu + r.memory
}))
四、join函数 #
4.1 基本连接 #
flux
// 连接两个查询
cpu = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> rename(columns: {"_value": "cpu_value"})
memory = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
|> rename(columns: {"_value": "memory_value"})
join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])
4.2 连接类型 #
flux
// 内连接(默认)
join(tables: {t1: table1, t2: table2}, on: ["_time"])
// 使用join.time
import "join"
cpu = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
memory = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
join.time(
left: cpu,
right: memory,
as: (l, r) => ({
l with
memory_value: r._value
})
)
4.3 计算派生值 #
flux
// 计算CPU和内存的总负载
cpu = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
|> aggregateWindow(every: 5m, fn: mean)
|> rename(columns: {"_value": "cpu_usage"})
memory = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory" and r._field == "usage")
|> aggregateWindow(every: 5m, fn: mean)
|> rename(columns: {"_value": "memory_usage"})
join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])
|> map(fn: (r) => ({
_time: r._time,
host: r.host,
total_load: r.cpu_usage + r.memory_usage
}))
五、union函数 #
5.1 基本合并 #
flux
// 合并多个查询结果
cpu = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
memory = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
union(tables: [cpu, memory])
5.2 多表合并 #
flux
// Merge several measurements into a single stream of tables.
// map() only transforms the rows of a piped-in stream of tables; it cannot
// iterate over an array literal. The shared query is therefore wrapped in a
// helper function that is called once per measurement.
fetch = (m) =>
    from(bucket: "metrics")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == m)

union(tables: [fetch(m: "cpu"), fetch(m: "memory"), fetch(m: "disk")])
六、schema操作 #
6.1 keep - 保留列 #
flux
// 只保留指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> keep(columns: ["_time", "_value", "host"])
6.2 drop - 删除列 #
flux
// Drop columns by name.
// range() adds the query bounds as "_start" and "_stop" (leading underscore);
// the un-prefixed names "start" and "stop" do not match any column.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> drop(columns: ["_start", "_stop"])

// Drop columns selected by a predicate on the column name.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> drop(fn: (column) => column =~ /^_start/)
6.3 rename - 重命名列 #
flux
// 重命名单列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> rename(columns: {"_value": "cpu_usage"})
// 重命名多列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> rename(columns: {
"_value": "cpu_usage",
"host": "server_name"
})
6.4 duplicate - 复制列 #
flux
// 复制列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> duplicate(column: "_value", as: "original_value")
七、时间转换 #
7.1 时间格式化 #
flux
import "strings"

// Render the timestamp as "YYYY-MM-DD HH:MM:SS".
// Flux has no date.format function and does not understand Go-style layout
// strings. string(v: <time>) produces an RFC3339 timestamp such as
// "2006-01-02T15:04:05.000000000Z"; the wanted parts are sliced out of it.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(
        fn: (r) => {
            ts = string(v: r._time)

            return {r with
                time_str: strings.substring(v: ts, start: 0, end: 10) + " " + strings.substring(v: ts, start: 11, end: 19)
            }
        }
    )

// Common slices of ts = "2006-01-02T15:04:05.000000000Z":
//   strings.substring(v: ts, start: 0,  end: 10)        -> "2006-01-02"            (date)
//   strings.substring(v: ts, start: 11, end: 19)        -> "15:04:05"              (time)
//   date + " " + time                                   -> "2006-01-02 15:04:05"   (date and time)
//   strings.substring(v: ts, start: 0, end: 19) + "Z"   -> "2006-01-02T15:04:05Z"  (ISO 8601, second precision)
7.2 时间计算 #
flux
import "date"

// Shift each timestamp forward and backward by one hour.
// date.add takes the base time as `to` and date.sub takes it as `from`;
// neither function has a `t` parameter.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(
        fn: (r) => ({r with
            next_hour: date.add(d: 1h, to: r._time),
            prev_hour: date.sub(d: 1h, from: r._time)
        })
    )
// 时间截断
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> map(fn: (r) => ({
r with
hour_start: date.truncate(t: r._time, unit: 1h),
day_start: date.truncate(t: r._time, unit: 1d)
}))
八、数学运算 #
8.1 基本运算 #
flux
// math.sqrt below needs the math package; imports must come before any
// other statement in a Flux script.
import "math"

// Basic arithmetic on the value column. The literals are floats so that both
// operands of every operator have the same type as a float _value.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(
        fn: (r) => ({r with
            doubled: r._value * 2.0,
            half: r._value / 2.0,
            plus_ten: r._value + 10.0,
            minus_five: r._value - 5.0
        })
    )

// Powers and roots.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> map(
        fn: (r) => ({r with
            // Use a float exponent (2.0, not 2) to match the float base.
            squared: r._value ^ 2.0,
            sqrt: math.sqrt(x: r._value)
        })
    )
8.2 数学函数 #
flux
import "math"
// 数学函数
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> map(fn: (r) => ({
r with
abs: math.abs(x: r._value),
floor: math.floor(x: r._value),
ceil: math.ceil(x: r._value),
round: math.round(x: r._value),
log: math.log(x: r._value),
log10: math.log10(x: r._value)
}))
九、字符串操作 #
9.1 字符串函数 #
flux
import "strings"

// Case conversion, length and whitespace trimming.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "logs")
    |> map(
        fn: (r) => ({r with
            upper: strings.toUpper(v: r.message),
            lower: strings.toLower(v: r.message),
            length: strings.strlen(v: r.message),
            // strings.trim requires a `cutset` argument; trimSpace is the
            // variant that strips leading and trailing whitespace.
            trimmed: strings.trimSpace(v: r.message)
        })
    )

// Replace every occurrence of a substring.
// strings.replace additionally requires `i` (the number of replacements);
// replaceAll replaces all matches and needs only v, t and u.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "logs")
    |> map(fn: (r) => ({r with replaced: strings.replaceAll(v: r.message, t: "error", u: "ERROR")}))

// Split a string.
// strings.split returns an array of strings, and an array cannot be stored
// as a column value. The pieces are joined back into one comma-separated
// string so the result stays a plain string column.
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "logs")
    |> map(
        fn: (r) => {
            pieces = strings.split(v: r.message, t: " ")

            return {r with parts: strings.joinStr(arr: pieces, v: ",")}
        }
    )
十、实际应用示例 #
10.1 综合性能指标 #
flux
// Compute a weighted performance score from CPU, memory and disk usage.
// Each input is averaged into 5-minute windows and its value column renamed
// so the three values stay distinguishable after joining.
cpu = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "cpu"})

memory = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "memory" and r._field == "usage")
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "memory"})

disk = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "disk" and r._field == "usage")
    |> aggregateWindow(every: 5m, fn: mean)
    |> rename(columns: {"_value": "disk"})

// join() takes its inputs only through `tables`, does not accept piped
// input, and joins exactly two streams. Three streams are therefore joined
// in two steps through an intermediate variable.
cpu_memory = join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])

join(tables: {cpu_memory: cpu_memory, disk: disk}, on: ["_time", "host"])
    |> map(
        fn: (r) => {
            // Weights: CPU 40%, memory 30%, disk 30%. Computed once and
            // reused for both the score and the status columns.
            score = r.cpu * 0.4 + r.memory * 0.3 + r.disk * 0.3

            return {
                _time: r._time,
                host: r.host,
                performance_score: score,
                status: if score > 80.0 then "warning" else "normal"
            }
        }
    )
10.2 数据格式化输出 #
flux
import "math"
import "strings"

// Produce a human-readable hourly CPU report.
// math.round needs the math package. Flux has no date.format function, so
// the "YYYY-MM-DD HH:MM" string is sliced out of the RFC3339 text that
// string(v: <time>) returns (for example "2006-01-02T15:04:05.000000000Z").
from(bucket: "metrics")
    |> range(start: -1d)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 1h, fn: mean)
    |> map(
        fn: (r) => {
            ts = string(v: r._time)

            return {
                _time: r._time,
                time_str: strings.substring(v: ts, start: 0, end: 10) + " " + strings.substring(v: ts, start: 11, end: 16),
                host: r.host,
                // Round to two decimal places before rendering as a percentage.
                cpu_usage: "${string(v: math.round(x: r._value * 100.0) / 100.0)}%",
                status: if r._value > 80 then "⚠️ Warning" else "✅ Normal"
            }
        }
    )
    |> keep(columns: ["time_str", "host", "cpu_usage", "status"])
十一、总结 #
数据转换要点:
- map函数:灵活的数据映射和转换
- pivot函数:行列转换,便于比较
- join函数:连接多个数据源
- union函数:合并多个结果集
- schema操作:修改数据结构
下一步,让我们学习Flux函数库!
最后更新:2026-03-27