定时任务 #
一、任务概述 #
1.1 什么是任务 #
text
InfluxDB任务:
定义
├── 定期执行的Flux脚本
├── 自动化数据处理
└── 无需人工干预
用途
├── 数据降采样
├── 数据聚合
├── 数据清理
├── 告警检查
└── 数据备份
1.2 任务结构 #
text
任务结构:
option task = {
name: "task-name", // 任务名称
every: 1h, // 执行间隔
offset: 0m, // 偏移时间
cron: "0 * * * *", // Cron表达式(可选)
}
// Flux查询语句
from(bucket: "source")
|> range(start: -task.every)
// ... 数据处理
|> to(bucket: "destination")
二、创建任务 #
2.1 Web界面创建 #
text
创建步骤:
1. 登录InfluxDB UI
└── http://localhost:8086
2. 进入任务页面
└── 左侧菜单 → Tasks
3. 创建任务
├── 点击 "Create Task"
├── 输入任务名称
├── 设置执行计划
└── 编写Flux脚本
4. 保存任务
└── 点击 "Save"
2.2 CLI创建 #
bash
# 从文件创建任务
influx task create --file task.flux
# task.flux内容
option task = {
name: "downsample-cpu",
every: 1h
}
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean)
|> to(bucket: "metrics-downsampled", org: "my-org")
2.3 API创建 #
bash
# 使用API创建任务
curl -X POST "http://localhost:8086/api/v2/tasks" \
--header "Authorization: Token YOUR_TOKEN" \
--header "Content-Type: application/json" \
--data '{
"name": "downsample-cpu",
"every": "1h",
"flux": "option task = {name: \"downsample-cpu\", every: 1h}\n\nfrom(bucket: \"metrics\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"cpu\")\n |> aggregateWindow(every: 5m, fn: mean)\n |> to(bucket: \"metrics-downsampled\")"
}'
三、任务调度 #
3.1 固定间隔 #
flux
// 每小时执行
option task = {
name: "hourly-task",
every: 1h
}
// 每5分钟执行
option task = {
name: "5min-task",
every: 5m
}
// 每天执行
option task = {
name: "daily-task",
every: 24h
}
3.2 Cron表达式 #
flux
// 每小时整点执行
option task = {
name: "cron-hourly",
cron: "0 * * * *"
}
// 每天凌晨2点执行
option task = {
name: "cron-daily",
cron: "0 2 * * *"
}
// 每周一早上8点执行
option task = {
name: "cron-weekly",
cron: "0 8 * * 1"
}
// 每月1号凌晨执行
option task = {
name: "cron-monthly",
cron: "0 0 1 * *"
}
text
Cron表达式格式:
字段 允许值 特殊字符
─────────────────────────────────────
分钟 0-59 * / , -
小时 0-23 * / , -
日期 1-31 * / , ?
月份 1-12 * / , -
星期 0-6 * / , ?
特殊字符:
* - 任意值
/ - 间隔
, - 列表
- - 范围
? - 不指定
3.3 偏移时间 #
flux
// 延迟5分钟执行
option task = {
name: "offset-task",
every: 1h,
offset: 5m
}
// 等待数据完全写入后再处理
option task = {
name: "late-data-task",
every: 1h,
offset: 10m
}
四、常用任务示例 #
4.1 数据降采样 #
flux
// 降采样任务:将原始数据聚合为5分钟间隔
option task = {
name: "downsample-metrics",
every: 5m
}
from(bucket: "metrics-raw")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean)
|> to(bucket: "metrics-downsampled", org: "my-org")
4.2 数据聚合 #
flux
// 每小时聚合任务
option task = {
name: "hourly-aggregation",
every: 1h
}
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "http_requests")
|> group(columns: ["service", "endpoint"])
|> aggregateWindow(every: 1h, fn: sum)
|> to(bucket: "metrics-hourly", org: "my-org")
4.3 数据清理 #
flux
// 清理过期数据
option task = {
name: "cleanup-old-data",
every: 24h
}
import "date"
from(bucket: "logs")
|> range(start: -30d, stop: -7d)
|> filter(fn: (r) => r._measurement == "application")
|> delete()
4.4 告警检查 #
flux
// CPU使用率告警任务
option task = {
name: "cpu-alert",
every: 5m
}
import "http"
import "json"
threshold = 80.0
from(bucket: "metrics")
|> range(start: -5m)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage"
)
|> mean()
|> filter(fn: (r) => r._value > threshold)
|> map(fn: (r) => {
http.post(
url: "https://hooks.slack.com/services/xxx",
headers: {"Content-Type": "application/json"},
data: json.encode(v: {
text: "CPU告警: ${r.host} 使用率 ${string(v: r._value)}%"
})
)
return r
})
4.5 数据迁移 #
flux
// 数据迁移任务
option task = {
name: "data-migration",
every: 1h
}
from(bucket: "source-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> to(bucket: "dest-bucket", org: "my-org")
五、管理任务 #
5.1 查看任务 #
bash
# 列出所有任务
influx task list
# 查看特定任务
influx task list --name "downsample-cpu"
# 查看任务详情
influx task list --id TASK_ID
5.2 更新任务 #
bash
# 更新任务
influx task update \
--id TASK_ID \
--name "new-task-name" \
--every 30m
# 从文件更新
influx task update --id TASK_ID --file updated-task.flux
5.3 启用/禁用任务 #
bash
# 禁用任务
influx task update --id TASK_ID --status inactive
# 启用任务
influx task update --id TASK_ID --status active
5.4 删除任务 #
bash
# 删除任务
influx task delete --id TASK_ID
# 按名称删除
influx task delete --name "old-task"
六、任务日志 #
6.1 查看日志 #
bash
# 查看任务运行日志
influx task log list --id TASK_ID
# 查看特定运行的日志
influx task log list --id TASK_ID --run-id RUN_ID
6.2 Web界面查看 #
text
查看任务日志:
1. 进入Tasks页面
2. 点击任务名称
3. 选择"Run History"标签
4. 查看运行记录和日志
七、任务监控 #
7.1 运行状态 #
text
任务状态:
active
├── 任务正在运行
├── 按计划执行
└── 正常状态
inactive
├── 任务已暂停
├── 不会执行
└── 手动禁用
failed
├── 执行失败
├── 需要检查
└── 查看日志排查
7.2 监控任务执行 #
flux
// 监控任务执行情况
import "influxdata/influxdb"
import "influxdata/influxdb/monitor"
monitor.from(start: -1h)
|> filter(fn: (r) => r._measurement == "task")
|> filter(fn: (r) => r.status == "failed")
八、高级任务技巧 #
8.1 条件执行 #
flux
// 条件执行任务
option task = {
name: "conditional-task",
every: 1h
}
result = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> mean()
// 只有超过阈值才写入
result
|> filter(fn: (r) => r._value > 70)
|> to(bucket: "alerts")
8.2 多Bucket操作 #
flux
// 多Bucket数据聚合
option task = {
name: "multi-bucket-task",
every: 1h
}
cpu = from(bucket: "metrics-cpu")
|> range(start: -1h)
|> aggregateWindow(every: 1h, fn: mean)
memory = from(bucket: "metrics-memory")
|> range(start: -1h)
|> aggregateWindow(every: 1h, fn: mean)
union(tables: [cpu, memory])
|> to(bucket: "metrics-combined")
8.3 错误处理 #
flux
// 带错误处理的任务
option task = {
name: "error-handling-task",
every: 5m
}
import "experimental"
from(bucket: "metrics")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu")
|> experimental.set(o: {error: "none"})
|> to(bucket: "processed")
九、任务最佳实践 #
9.1 性能优化 #
text
任务性能建议:
时间范围
├── 使用适当的时间范围
├── 避免查询过大范围
└── 使用task.every变量
数据处理
├── 先过滤再处理
├── 使用聚合减少数据量
└── 批量写入
执行时间
├── 合理设置执行间隔
├── 避免任务重叠
└── 使用offset延迟
9.2 可靠性建议 #
text
任务可靠性建议:
错误处理
├── 添加错误日志
├── 实现重试逻辑
└── 监控任务状态
数据一致性
├── 使用幂等操作
├── 处理重复数据
└── 检查数据完整性
监控告警
├── 监控任务执行
├── 失败时发送告警
└── 定期检查日志
十、完整示例 #
10.1 综合监控任务 #
flux
// 综合监控任务
option task = {
name: "comprehensive-monitoring",
every: 5m,
offset: 1m
}
import "http"
import "json"
import "date"
// 阈值配置
thresholds = {
cpu: 80.0,
memory: 85.0,
disk: 90.0
}
// 发送告警函数
sendAlert = (metric, host, value, threshold) => {
http.post(
url: "https://hooks.slack.com/services/xxx",
headers: {"Content-Type": "application/json"},
data: json.encode(v: {
text: "⚠️ ${metric}告警\n主机: ${host}\n当前值: ${string(v: value)}%\n阈值: ${string(v: threshold)}%"
})
)
}
// 检查CPU
cpuAlerts = from(bucket: "metrics")
|> range(start: -5m)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage"
)
|> mean()
|> filter(fn: (r) => r._value > thresholds.cpu)
|> map(fn: (r) => {
sendAlert(metric: "CPU", host: r.host, value: r._value, threshold: thresholds.cpu)
return r
})
// 检查内存
memoryAlerts = from(bucket: "metrics")
|> range(start: -5m)
|> filter(fn: (r) =>
r._measurement == "memory" and
r._field == "usage"
)
|> mean()
|> filter(fn: (r) => r._value > thresholds.memory)
|> map(fn: (r) => {
sendAlert(metric: "内存", host: r.host, value: r._value, threshold: thresholds.memory)
return r
})
// 合并结果
union(tables: [cpuAlerts, memoryAlerts])
|> to(bucket: "alerts-history")
十一、总结 #
定时任务要点:
- 任务结构:option task + Flux脚本
- 调度方式:every或cron
- 常用场景:降采样、聚合、告警
- 管理操作:创建、更新、删除、监控
- 最佳实践:性能优化、错误处理
下一步,让我们学习连续查询!
最后更新:2026-03-27