基础查询 #
一、查询概述 #
1.1 Flux查询结构 #
text
Flux查询基本结构:
from(bucket: "bucket-name") // 数据源
|> range(start: -1h) // 时间范围
|> filter(fn: (r) => ...) // 过滤条件
|> ... // 其他操作
1.2 查询执行流程 #
text
查询执行流程:
1. 数据源选择
└── from() 指定Bucket
2. 时间范围限定
└── range() 必需操作
3. 数据过滤
└── filter() 筛选数据
4. 数据处理
└── 聚合、转换等
5. 结果输出
└── 返回表流
二、from函数 #
2.1 基本用法 #
flux
// 从指定Bucket查询
from(bucket: "my-bucket")
// 指定组织和Bucket
from(
bucket: "my-bucket",
org: "my-org"
)
// 使用Bucket ID
from(bucketID: "1234567890abcdef")
2.2 查询示例 #
flux
// 最简单的查询
from(bucket: "my-bucket")
// 注意:必须配合range使用
from(bucket: "my-bucket")
|> range(start: -1h)
三、range函数 #
3.1 时间范围 #
flux
// 最近1小时
from(bucket: "my-bucket")
|> range(start: -1h)
// 最近24小时
from(bucket: "my-bucket")
|> range(start: -24h)
// 最近7天
from(bucket: "my-bucket")
|> range(start: -7d)
// 最近30天
from(bucket: "my-bucket")
|> range(start: -30d)
3.2 绝对时间 #
flux
// 使用绝对时间
from(bucket: "my-bucket")
|> range(
start: 2024-01-01T00:00:00Z,
stop: 2024-01-02T00:00:00Z
)
// 单个绝对时间
from(bucket: "my-bucket")
|> range(start: 2024-01-01T00:00:00Z)
3.3 时间表达式 #
flux
// 时间表达式
from(bucket: "my-bucket")
|> range(start: -1h30m) // 1小时30分钟前
// 使用变量
start_time = -24h
from(bucket: "my-bucket")
|> range(start: start_time)
// 使用函数
import "date"
from(bucket: "my-bucket")
|> range(start: date.sub(from: now(), d: 1h))
3.4 时间单位 #
text
时间单位:
单位 符号 示例
────────────────────────────
纳秒 ns 100ns
微秒 us 100us
毫秒 ms 100ms
秒 s 100s
分钟 m 30m
小时 h 1h
天 d 7d
周 w 1w
月 mo 1mo
年 y 1y
四、filter函数 #
4.1 基本过滤 #
flux
// 过滤measurement
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
// 过滤field
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._field == "usage")
// 过滤tag
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host == "server01")
4.2 多条件过滤 #
flux
// AND条件
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.host == "server01"
)
// OR条件
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r.host == "server01" or
r.host == "server02"
)
// 复杂条件
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
(r.host == "server01" or r.host == "server02") and
r._field == "usage"
)
4.3 值过滤 #
flux
// 过滤数值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value > 50)
// 范围过滤
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._value > 50 and r._value < 100)
// 存在性检查
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => exists r._value)
4.4 正则过滤 #
flux
// 正则匹配
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host =~ /^server\d+$/)
// 正则不匹配
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r.host !~ /^server\d+$/)
4.5 多个filter #
flux
// 链式filter
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r.host == "server01")
|> filter(fn: (r) => r._field == "usage")
// 等价于单个filter
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.host == "server01" and
r._field == "usage"
)
五、查询结果结构 #
5.1 表流格式 #
text
查询结果结构:
表流(Table Stream)
├── 多个表(Table)
│ ├── 组键(Group Key)
│ └── 行记录(Records)
│ ├── _time: 时间戳
│ ├── _value: 值
│ ├── _field: 字段名
│ ├── _measurement: 度量名
│ └── tags: 标签
└── ...
示例输出:
_time _value _field _measurement host
────────────────────────────────────────────────────────
2024-01-01T00:00:00Z 78.5 usage cpu server01
2024-01-01T00:01:00Z 82.3 usage cpu server01
2024-01-01T00:02:00Z 75.1 usage cpu server01
5.2 默认列 #
text
默认列说明:
列名 说明
────────────────────────────────
_time 时间戳
_value 字段值
_field 字段名
_measurement 度量名
start 时间范围开始
stop 时间范围结束
六、排序和限制 #
6.1 排序 #
flux
// 按值排序
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> sort(columns: ["_value"], desc: true)
// 按时间排序
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> sort(columns: ["_time"])
// 多列排序
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> sort(columns: ["host", "_value"])
6.2 限制结果 #
flux
// 限制返回数量
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> limit(n: 10)
// 限制并跳过
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> limit(n: 10, offset: 5)
6.3 取前N条 #
flux
// 取最大的10个值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> sort(columns: ["_value"], desc: true)
|> limit(n: 10)
// 取最小的10个值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> sort(columns: ["_value"])
|> limit(n: 10)
七、分组操作 #
7.1 group函数 #
flux
// 按指定列分组
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["host"])
// 按多列分组
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group(columns: ["host", "region"])
// 取消分组
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> group()
7.2 分组效果 #
text
分组前后对比:
分组前(一个表):
_time _value host
────────────────────────────────────
2024-01-01T00:00:00Z 78.5 server01
2024-01-01T00:00:00Z 65.2 server02
2024-01-01T00:01:00Z 82.3 server01
2024-01-01T00:01:00Z 70.1 server02
分组后(group by host):
表1 (host=server01):
_time _value
────────────────────────────
2024-01-01T00:00:00Z 78.5
2024-01-01T00:01:00Z 82.3
表2 (host=server02):
_time _value
────────────────────────────
2024-01-01T00:00:00Z 65.2
2024-01-01T00:01:00Z 70.1
八、去重操作 #
8.1 distinct函数 #
flux
// 获取不重复的值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> keep(columns: ["host"])
|> distinct(column: "host")
8.2 unique函数 #
flux
// 获取唯一值
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> unique(column: "host")
九、列操作 #
9.1 保留列 #
flux
// 只保留指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> keep(columns: ["_time", "_value", "host"])
9.2 删除列 #
flux
// 删除指定列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> drop(columns: ["start", "stop"])
// 删除匹配的列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> drop(fn: (column) => column =~ /^_start/)
9.3 重命名列 #
flux
// 重命名列
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> rename(columns: {"_value": "cpu_usage"})
十、CLI查询 #
10.1 基本查询 #
bash
# 使用influx query命令
influx query 'from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")'
# 使用文件
influx query --file query.flux
# 指定组织
influx query 'from(bucket: "my-bucket") |> range(start: -1h)' \
--org my-org
10.2 输出格式 #
bash
# 表格格式(默认)
influx query 'from(bucket: "my-bucket") |> range(start: -1h)'
# CSV格式
influx query 'from(bucket: "my-bucket") |> range(start: -1h)' \
--raw
# JSON格式
influx query 'from(bucket: "my-bucket") |> range(start: -1h)' \
--json
十一、API查询 #
11.1 HTTP查询 #
bash
# 使用HTTP API查询
curl -X POST "http://localhost:8086/api/v2/query?org=my-org" \
--header "Authorization: Token YOUR_TOKEN" \
--header "Content-Type: application/vnd.flux" \
--data 'from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")'
11.2 Python查询 #
python
from influxdb_client import InfluxDBClient
client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
query_api = client.query_api()
# 执行查询
query = '''
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
'''
tables = query_api.query(query)
# 处理结果
for table in tables:
for record in table.records:
print(f"{record.get_time()}: {record.get_value()}")
client.close()
11.3 流式查询 #
python
from influxdb_client import InfluxDBClient
client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
query_api = client.query_api()
query = '''
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
'''
# 流式处理
query_api.query_stream(query, record_consumer=lambda record: print(record))
client.close()
十二、查询优化 #
12.1 优化建议 #
text
查询优化建议:
时间范围
├── 尽量缩小时间范围
├── range是必需的
└── 避免查询过长时间
过滤条件
├── 先过滤measurement
├── 再过滤tag
└── 最后过滤field
分组操作
├── 合理使用group
├── 避免过度分组
└── 注意基数问题
索引利用
├── tag查询使用索引
├── field查询不使用索引
└── 优先使用tag过滤
12.2 性能对比 #
flux
// 慢查询(先查field)
from(bucket: "my-bucket")
|> range(start: -30d)
|> filter(fn: (r) => r._value > 50)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r.host == "server01")
// 快查询(先查measurement和tag)
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r.host == "server01")
|> filter(fn: (r) => r._value > 50)
十三、完整示例 #
13.1 服务器监控查询 #
flux
// 查询服务器CPU使用率
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage" and
r.host == "server01"
)
|> sort(columns: ["_time"])
13.2 多服务器查询 #
flux
// 查询多个服务器的CPU使用率
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage" and
(r.host == "server01" or r.host == "server02")
)
|> group(columns: ["host"])
13.3 异常值查询 #
flux
// 查询CPU使用率超过80%的记录
from(bucket: "metrics")
|> range(start: -24h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage" and
r._value > 80
)
|> sort(columns: ["_value"], desc: true)
|> limit(n: 100)
十四、总结 #
基础查询要点:
- 必需操作:from + range
- 过滤优先:先过滤measurement和tag
- 时间范围:尽量缩小查询范围
- 合理分组:根据需求选择分组方式
- 结果处理:使用sort、limit等处理结果
下一步,让我们学习过滤操作!
最后更新:2026-03-27