基础查询 #

一、查询概述 #

1.1 Flux查询结构 #

text
Flux查询基本结构:

from(bucket: "bucket-name")    // 数据源
    |> range(start: -1h)        // 时间范围
    |> filter(fn: (r) => ...)   // 过滤条件
    |> ...                      // 其他操作

1.2 查询执行流程 #

text
查询执行流程:

1. 数据源选择
   └── from() 指定Bucket

2. 时间范围限定
   └── range() 必需操作

3. 数据过滤
   └── filter() 筛选数据

4. 数据处理
   └── 聚合、转换等

5. 结果输出
   └── 返回表流

二、from函数 #

2.1 基本用法 #

flux
// 从指定Bucket查询
from(bucket: "my-bucket")

// 指定组织和Bucket
from(
    bucket: "my-bucket",
    org: "my-org"
)

// 使用Bucket ID
from(bucketID: "1234567890abcdef")

2.2 查询示例 #

flux
// 最简单的查询
from(bucket: "my-bucket")

// 注意:必须配合range使用
from(bucket: "my-bucket")
    |> range(start: -1h)

三、range函数 #

3.1 时间范围 #

flux
// 最近1小时
from(bucket: "my-bucket")
    |> range(start: -1h)

// 最近24小时
from(bucket: "my-bucket")
    |> range(start: -24h)

// 最近7天
from(bucket: "my-bucket")
    |> range(start: -7d)

// 最近30天
from(bucket: "my-bucket")
    |> range(start: -30d)

3.2 绝对时间 #

flux
// 使用绝对时间
from(bucket: "my-bucket")
    |> range(
        start: 2024-01-01T00:00:00Z,
        stop: 2024-01-02T00:00:00Z
    )

// 单个绝对时间
from(bucket: "my-bucket")
    |> range(start: 2024-01-01T00:00:00Z)

3.3 时间表达式 #

flux
// 时间表达式
from(bucket: "my-bucket")
    |> range(start: -1h30m)  // 1小时30分钟前

// 使用变量
start_time = -24h
from(bucket: "my-bucket")
    |> range(start: start_time)

// 使用函数
import "date"
from(bucket: "my-bucket")
    |> range(start: date.sub(from: now(), d: 1h))

3.4 时间单位 #

text
时间单位:

单位      符号      示例
────────────────────────────
纳秒      ns        100ns
微秒      us        100us
毫秒      ms        100ms
秒        s         100s
分钟      m         30m
小时      h         1h
天        d         7d
周        w         1w
月        mo        1mo
年        y         1y

四、filter函数 #

4.1 基本过滤 #

flux
// 过滤measurement
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")

// 过滤field
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field == "usage")

// 过滤tag
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host == "server01")

4.2 多条件过滤 #

flux
// AND条件
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and 
        r.host == "server01"
    )

// OR条件
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r.host == "server01" or 
        r.host == "server02"
    )

// 复杂条件
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        (r.host == "server01" or r.host == "server02") and
        r._field == "usage"
    )

4.3 值过滤 #

flux
// 过滤数值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value > 50)

// 范围过滤
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._value > 50 and r._value < 100)

// 存在性检查
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => exists r._value)

4.4 正则过滤 #

flux
// 正则匹配
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host =~ /^server\d+$/)

// 正则不匹配
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r.host !~ /^server\d+$/)

4.5 多个filter #

flux
// 链式filter
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> filter(fn: (r) => r.host == "server01")
    |> filter(fn: (r) => r._field == "usage")

// 等价于单个filter
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r.host == "server01" and
        r._field == "usage"
    )

五、查询结果结构 #

5.1 表流格式 #

text
查询结果结构:

表流(Table Stream)
├── 多个表(Table)
│   ├── 组键(Group Key)
│   └── 行记录(Records)
│       ├── _time: 时间戳
│       ├── _value: 值
│       ├── _field: 字段名
│       ├── _measurement: 度量名
│       └── tags: 标签
└── ...

示例输出:
_time                _value  _field  _measurement  host
────────────────────────────────────────────────────────
2024-01-01T00:00:00Z 78.5    usage   cpu           server01
2024-01-01T00:01:00Z 82.3    usage   cpu           server01
2024-01-01T00:02:00Z 75.1    usage   cpu           server01

5.2 默认列 #

text
默认列说明:

列名            说明
────────────────────────────────
_time          时间戳
_value         字段值
_field         字段名
_measurement   度量名
start          时间范围开始
stop           时间范围结束

六、排序和限制 #

6.1 排序 #

flux
// 按值排序
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> sort(columns: ["_value"], desc: true)

// 按时间排序
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> sort(columns: ["_time"])

// 多列排序
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> sort(columns: ["host", "_value"])

6.2 限制结果 #

flux
// 限制返回数量
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> limit(n: 10)

// 限制并跳过
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> limit(n: 10, offset: 5)

6.3 取前N条 #

flux
// 取最大的10个值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> sort(columns: ["_value"], desc: true)
    |> limit(n: 10)

// 取最小的10个值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> sort(columns: ["_value"])
    |> limit(n: 10)

七、分组操作 #

7.1 group函数 #

flux
// 按指定列分组
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["host"])

// 按多列分组
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group(columns: ["host", "region"])

// 取消分组
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> group()

7.2 分组效果 #

text
分组前后对比:

分组前(一个表):
_time                _value  host
────────────────────────────────────
2024-01-01T00:00:00Z 78.5    server01
2024-01-01T00:00:00Z 65.2    server02
2024-01-01T00:01:00Z 82.3    server01
2024-01-01T00:01:00Z 70.1    server02

分组后(group by host):

表1 (host=server01):
_time                _value
────────────────────────────
2024-01-01T00:00:00Z 78.5
2024-01-01T00:01:00Z 82.3

表2 (host=server02):
_time                _value
────────────────────────────
2024-01-01T00:00:00Z 65.2
2024-01-01T00:01:00Z 70.1

八、去重操作 #

8.1 distinct函数 #

flux
// 获取不重复的值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> keep(columns: ["host"])
    |> distinct(column: "host")

8.2 unique函数 #

flux
// 获取唯一值
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> unique(column: "host")

九、列操作 #

9.1 保留列 #

flux
// 只保留指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> keep(columns: ["_time", "_value", "host"])

9.2 删除列 #

flux
// 删除指定列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> drop(columns: ["start", "stop"])

// 删除匹配的列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> drop(fn: (column) => column =~ /^_start/)

9.3 重命名列 #

flux
// 重命名列
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> rename(columns: {"_value": "cpu_usage"})

十、CLI查询 #

10.1 基本查询 #

bash
# 使用influx query命令
influx query 'from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")'

# 使用文件
influx query --file query.flux

# 指定组织
influx query 'from(bucket: "my-bucket") |> range(start: -1h)' \
    --org my-org

10.2 输出格式 #

bash
# 表格格式(默认)
influx query 'from(bucket: "my-bucket") |> range(start: -1h)'

# CSV格式
influx query 'from(bucket: "my-bucket") |> range(start: -1h)' \
    --raw

# JSON格式
influx query 'from(bucket: "my-bucket") |> range(start: -1h)' \
    --json

十一、API查询 #

11.1 HTTP查询 #

bash
# 使用HTTP API查询
curl -X POST "http://localhost:8086/api/v2/query?org=my-org" \
    --header "Authorization: Token YOUR_TOKEN" \
    --header "Content-Type: application/vnd.flux" \
    --data 'from(bucket: "my-bucket")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "cpu")'

11.2 Python查询 #

python
from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")

query_api = client.query_api()

# 执行查询
query = '''
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
'''

tables = query_api.query(query)

# 处理结果
for table in tables:
    for record in table.records:
        print(f"{record.get_time()}: {record.get_value()}")

client.close()

11.3 流式查询 #

python
from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
query_api = client.query_api()

query = '''
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
'''

# 流式处理
query_api.query_stream(query, record_consumer=lambda record: print(record))

client.close()

十二、查询优化 #

12.1 优化建议 #

text
查询优化建议:

时间范围
├── 尽量缩小时间范围
├── range是必需的
└── 避免查询过长时间

过滤条件
├── 先过滤measurement
├── 再过滤tag
└── 最后过滤field

分组操作
├── 合理使用group
├── 避免过度分组
└── 注意基数问题

索引利用
├── tag查询使用索引
├── field查询不使用索引
└── 优先使用tag过滤

12.2 性能对比 #

flux
// 慢查询(先查field)
from(bucket: "my-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r._value > 50)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> filter(fn: (r) => r.host == "server01")

// 快查询(先查measurement和tag)
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> filter(fn: (r) => r.host == "server01")
    |> filter(fn: (r) => r._value > 50)

十三、完整示例 #

13.1 服务器监控查询 #

flux
// 查询服务器CPU使用率
from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage" and
        r.host == "server01"
    )
    |> sort(columns: ["_time"])

13.2 多服务器查询 #

flux
// 查询多个服务器的CPU使用率
from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage" and
        (r.host == "server01" or r.host == "server02")
    )
    |> group(columns: ["host"])

13.3 异常值查询 #

flux
// 查询CPU使用率超过80%的记录
from(bucket: "metrics")
    |> range(start: -24h)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage" and
        r._value > 80
    )
    |> sort(columns: ["_value"], desc: true)
    |> limit(n: 100)

十四、总结 #

基础查询要点:

  1. 必需操作:from + range
  2. 过滤优先:先过滤measurement和tag
  3. 时间范围:尽量缩小查询范围
  4. 合理分组:根据需求选择分组方式
  5. 结果处理:使用sort、limit等处理结果

下一步,让我们学习过滤操作!

最后更新:2026-03-27