定时任务 #

一、任务概述 #

1.1 什么是任务 #

text
InfluxDB任务:

定义
├── 定期执行的Flux脚本
├── 自动化数据处理
└── 无需人工干预

用途
├── 数据降采样
├── 数据聚合
├── 数据清理
├── 告警检查
└── 数据备份

1.2 任务结构 #

text
任务结构:

option task = {
    name: "task-name",       // 任务名称
    every: 1h,               // 执行间隔
    offset: 0m,              // 偏移时间
    cron: "0 * * * *",       // Cron表达式(可选)
}

// Flux查询语句
from(bucket: "source")
    |> range(start: -task.every)
    // ... 数据处理
    |> to(bucket: "destination")

二、创建任务 #

2.1 Web界面创建 #

text
创建步骤:

1. 登录InfluxDB UI
   └── http://localhost:8086

2. 进入任务页面
   └── 左侧菜单 → Tasks

3. 创建任务
   ├── 点击 "Create Task"
   ├── 输入任务名称
   ├── 设置执行计划
   └── 编写Flux脚本

4. 保存任务
   └── 点击 "Save"

2.2 CLI创建 #

bash
# 从文件创建任务
influx task create --file task.flux

# task.flux内容
option task = {
    name: "downsample-cpu",
    every: 1h
}

from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 5m, fn: mean)
    |> to(bucket: "metrics-downsampled", org: "my-org")

2.3 API创建 #

bash
# 使用API创建任务
curl -X POST "http://localhost:8086/api/v2/tasks" \
    --header "Authorization: Token YOUR_TOKEN" \
    --header "Content-Type: application/json" \
    --data '{
        "name": "downsample-cpu",
        "every": "1h",
        "flux": "option task = {name: \"downsample-cpu\", every: 1h}\n\nfrom(bucket: \"metrics\")\n    |> range(start: -1h)\n    |> filter(fn: (r) => r._measurement == \"cpu\")\n    |> aggregateWindow(every: 5m, fn: mean)\n    |> to(bucket: \"metrics-downsampled\")"
    }'

三、任务调度 #

3.1 固定间隔 #

flux
// 每小时执行
option task = {
    name: "hourly-task",
    every: 1h
}

// 每5分钟执行
option task = {
    name: "5min-task",
    every: 5m
}

// 每天执行
option task = {
    name: "daily-task",
    every: 24h
}

3.2 Cron表达式 #

flux
// 每小时整点执行
option task = {
    name: "cron-hourly",
    cron: "0 * * * *"
}

// 每天凌晨2点执行
option task = {
    name: "cron-daily",
    cron: "0 2 * * *"
}

// 每周一早上8点执行
option task = {
    name: "cron-weekly",
    cron: "0 8 * * 1"
}

// 每月1号凌晨执行
option task = {
    name: "cron-monthly",
    cron: "0 0 1 * *"
}
text
Cron表达式格式:

字段        允许值           特殊字符
─────────────────────────────────────
分钟        0-59            * / , -
小时        0-23            * / , -
日期        1-31            * / , ?
月份        1-12            * / , -
星期        0-6             * / , ?

特殊字符:
* - 任意值
/ - 间隔
, - 列表
- - 范围
? - 不指定

3.3 偏移时间 #

flux
// 延迟5分钟执行
option task = {
    name: "offset-task",
    every: 1h,
    offset: 5m
}

// 等待数据完全写入后再处理
option task = {
    name: "late-data-task",
    every: 1h,
    offset: 10m
}

四、常用任务示例 #

4.1 数据降采样 #

flux
// 降采样任务:将原始数据聚合为5分钟间隔
option task = {
    name: "downsample-metrics",
    every: 5m
}

from(bucket: "metrics-raw")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> aggregateWindow(every: 5m, fn: mean)
    |> to(bucket: "metrics-downsampled", org: "my-org")

4.2 数据聚合 #

flux
// 每小时聚合任务
option task = {
    name: "hourly-aggregation",
    every: 1h
}

from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "http_requests")
    |> group(columns: ["service", "endpoint"])
    |> aggregateWindow(every: 1h, fn: sum)
    |> to(bucket: "metrics-hourly", org: "my-org")

4.3 数据清理 #

flux
// 清理过期数据
option task = {
    name: "cleanup-old-data",
    every: 24h
}

import "date"

from(bucket: "logs")
    |> range(start: -30d, stop: -7d)
    |> filter(fn: (r) => r._measurement == "application")
    |> delete()

4.4 告警检查 #

flux
// CPU使用率告警任务
option task = {
    name: "cpu-alert",
    every: 5m
}

import "http"
import "json"

threshold = 80.0

from(bucket: "metrics")
    |> range(start: -5m)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage"
    )
    |> mean()
    |> filter(fn: (r) => r._value > threshold)
    |> map(fn: (r) => {
        http.post(
            url: "https://hooks.slack.com/services/xxx",
            headers: {"Content-Type": "application/json"},
            data: json.encode(v: {
                text: "CPU告警: ${r.host} 使用率 ${string(v: r._value)}%"
            })
        )
        return r
    })

4.5 数据迁移 #

flux
// 数据迁移任务
option task = {
    name: "data-migration",
    every: 1h
}

from(bucket: "source-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> to(bucket: "dest-bucket", org: "my-org")

五、管理任务 #

5.1 查看任务 #

bash
# 列出所有任务
influx task list

# 查看特定任务
influx task list --name "downsample-cpu"

# 查看任务详情
influx task list --id TASK_ID

5.2 更新任务 #

bash
# 更新任务
influx task update \
    --id TASK_ID \
    --name "new-task-name" \
    --every 30m

# 从文件更新
influx task update --id TASK_ID --file updated-task.flux

5.3 启用/禁用任务 #

bash
# 禁用任务
influx task update --id TASK_ID --status inactive

# 启用任务
influx task update --id TASK_ID --status active

5.4 删除任务 #

bash
# 删除任务
influx task delete --id TASK_ID

# 按名称删除
influx task delete --name "old-task"

六、任务日志 #

6.1 查看日志 #

bash
# 查看任务运行日志
influx task log list --id TASK_ID

# 查看特定运行的日志
influx task log list --id TASK_ID --run-id RUN_ID

6.2 Web界面查看 #

text
查看任务日志:

1. 进入Tasks页面
2. 点击任务名称
3. 选择"Run History"标签
4. 查看运行记录和日志

七、任务监控 #

7.1 运行状态 #

text
任务状态:

active
├── 任务正在运行
├── 按计划执行
└── 正常状态

inactive
├── 任务已暂停
├── 不会执行
└── 手动禁用

failed
├── 执行失败
├── 需要检查
└── 查看日志排查

7.2 监控任务执行 #

flux
// 监控任务执行情况
import "influxdata/influxdb"
import "influxdata/influxdb/monitor"

monitor.from(start: -1h)
    |> filter(fn: (r) => r._measurement == "task")
    |> filter(fn: (r) => r.status == "failed")

八、高级任务技巧 #

8.1 条件执行 #

flux
// 条件执行任务
option task = {
    name: "conditional-task",
    every: 1h
}

result = from(bucket: "metrics")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> mean()

// 只有超过阈值才写入
result
    |> filter(fn: (r) => r._value > 70)
    |> to(bucket: "alerts")

8.2 多Bucket操作 #

flux
// 多Bucket数据聚合
option task = {
    name: "multi-bucket-task",
    every: 1h
}

cpu = from(bucket: "metrics-cpu")
    |> range(start: -1h)
    |> aggregateWindow(every: 1h, fn: mean)

memory = from(bucket: "metrics-memory")
    |> range(start: -1h)
    |> aggregateWindow(every: 1h, fn: mean)

union(tables: [cpu, memory])
    |> to(bucket: "metrics-combined")

8.3 错误处理 #

flux
// 带错误处理的任务
option task = {
    name: "error-handling-task",
    every: 5m
}

import "experimental"

from(bucket: "metrics")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "cpu")
    |> experimental.set(o: {error: "none"})
    |> to(bucket: "processed")

九、任务最佳实践 #

9.1 性能优化 #

text
任务性能建议:

时间范围
├── 使用适当的时间范围
├── 避免查询过大范围
└── 使用task.every变量

数据处理
├── 先过滤再处理
├── 使用聚合减少数据量
└── 批量写入

执行时间
├── 合理设置执行间隔
├── 避免任务重叠
└── 使用offset延迟

9.2 可靠性建议 #

text
任务可靠性建议:

错误处理
├── 添加错误日志
├── 实现重试逻辑
└── 监控任务状态

数据一致性
├── 使用幂等操作
├── 处理重复数据
└── 检查数据完整性

监控告警
├── 监控任务执行
├── 失败时发送告警
└── 定期检查日志

十、完整示例 #

10.1 综合监控任务 #

flux
// 综合监控任务
option task = {
    name: "comprehensive-monitoring",
    every: 5m,
    offset: 1m
}

import "http"
import "json"
import "date"

// 阈值配置
thresholds = {
    cpu: 80.0,
    memory: 85.0,
    disk: 90.0
}

// 发送告警函数
sendAlert = (metric, host, value, threshold) => {
    http.post(
        url: "https://hooks.slack.com/services/xxx",
        headers: {"Content-Type": "application/json"},
        data: json.encode(v: {
            text: "⚠️ ${metric}告警\n主机: ${host}\n当前值: ${string(v: value)}%\n阈值: ${string(v: threshold)}%"
        })
    )
}

// 检查CPU
cpuAlerts = from(bucket: "metrics")
    |> range(start: -5m)
    |> filter(fn: (r) => 
        r._measurement == "cpu" and
        r._field == "usage"
    )
    |> mean()
    |> filter(fn: (r) => r._value > thresholds.cpu)
    |> map(fn: (r) => {
        sendAlert(metric: "CPU", host: r.host, value: r._value, threshold: thresholds.cpu)
        return r
    })

// 检查内存
memoryAlerts = from(bucket: "metrics")
    |> range(start: -5m)
    |> filter(fn: (r) => 
        r._measurement == "memory" and
        r._field == "usage"
    )
    |> mean()
    |> filter(fn: (r) => r._value > thresholds.memory)
    |> map(fn: (r) => {
        sendAlert(metric: "内存", host: r.host, value: r._value, threshold: thresholds.memory)
        return r
    })

// 合并结果
union(tables: [cpuAlerts, memoryAlerts])
    |> to(bucket: "alerts-history")

十一、总结 #

定时任务要点:

  1. 任务结构:option task + Flux脚本
  2. 调度方式:every或cron
  3. 常用场景:降采样、聚合、告警
  4. 管理操作:创建、更新、删除、监控
  5. 最佳实践:性能优化、错误处理

下一步,让我们学习连续查询!

最后更新:2026-03-27