写入数据 #
一、写入概述 #
1.1 写入方式 #
text
InfluxDB写入方式:
HTTP API
├── 最通用的方式
├── 支持所有语言
├── 使用行协议
└── 支持批量写入
CLI工具
├── 命令行操作
├── 适合测试和脚本
└── 支持文件导入
客户端库
├── 官方支持多语言
├── 高级API封装
└── 自动重试和批处理
Telegraf
├── 数据采集代理
├── 200+输入插件
└── 自动写入InfluxDB
1.2 写入流程 #
text
写入流程:
数据准备
│
▼
构建行协议
│
▼
发送HTTP请求
│
├── 认证(Token)
│
├── 指定组织和Bucket
│
└── 指定精度
│
▼
写入存储
│
▼
返回结果
二、HTTP API写入 #
2.1 基本写入 #
bash
# 基本写入请求
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
--header "Authorization: Token YOUR_TOKEN" \
--data-raw "temperature,location=room1 value=23.5"
# 带时间戳写入
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
--header "Authorization: Token YOUR_TOKEN" \
--data-raw "temperature,location=room1 value=23.5 1704067200000000000"
2.2 指定精度 #
bash
# 纳秒精度(默认)
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=ns" \
--header "Authorization: Token YOUR_TOKEN" \
--data-raw "temperature value=23.5 1704067200000000000"
# 毫秒精度
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=ms" \
--header "Authorization: Token YOUR_TOKEN" \
--data-raw "temperature value=23.5 1704067200000"
# 秒精度
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=s" \
--header "Authorization: Token YOUR_TOKEN" \
--data-raw "temperature value=23.5 1704067200"
2.3 批量写入 #
bash
# 批量写入多行数据
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
--header "Authorization: Token YOUR_TOKEN" \
--data-raw '
temperature,location=room1 value=23.5
temperature,location=room2 value=24.0
temperature,location=room3 value=22.8
cpu,host=server01 usage=78.5
memory,host=server01 used=8192i
'
2.4 使用文件写入 #
bash
# 从文件写入
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
--header "Authorization: Token YOUR_TOKEN" \
--data-binary @data.txt
# data.txt内容
temperature,location=room1 value=23.5 1704067200000000000
temperature,location=room2 value=24.0 1704067200000000000
temperature,location=room3 value=22.8 1704067200000000000
2.5 写入响应 #
text
HTTP响应状态码:
204 No Content
├── 写入成功
└── 无返回内容
400 Bad Request
├── 行协议语法错误
├── 数据类型错误
└── 检查请求格式
401 Unauthorized
├── Token无效
├── Token过期
└── 检查认证信息
403 Forbidden
├── 无写入权限
└── 检查Token权限
404 Not Found
├── Bucket不存在
├── 组织不存在
└── 检查资源名称
413 Request Entity Too Large
├── 请求体过大
└── 减小批量大小
429 Too Many Requests
├── 请求过于频繁
└── 降低写入速率
500 Internal Server Error
├── 服务器内部错误
└── 检查服务器日志
三、CLI写入 #
3.1 基本写入 #
bash
# 使用influx write命令
influx write \
--bucket my-bucket \
--org my-org \
"temperature,location=room1 value=23.5"
# 指定精度
influx write \
--bucket my-bucket \
--org my-org \
--precision s \
"temperature value=23.5 1704067200"
3.2 从文件写入 #
bash
# 从文件写入
influx write \
--bucket my-bucket \
--org my-org \
--file data.txt
# 从多个文件写入
influx write \
--bucket my-bucket \
--org my-org \
--file data1.txt \
--file data2.txt
# 从标准输入写入
cat data.txt | influx write \
--bucket my-bucket \
--org my-org \
--stdin
3.3 使用URL写入 #
bash
# 从URL写入
influx write \
--bucket my-bucket \
--org my-org \
--url https://example.com/data.txt
# 压缩文件
influx write \
--bucket my-bucket \
--org my-org \
--url https://example.com/data.txt.gz \
--gzip
3.4 写入选项 #
bash
# 完整选项示例
influx write \
--bucket my-bucket \
--org my-org \
--precision ms \
--format line-protocol \
--file data.txt \
--skipHeader 1 \
--debug
四、Python客户端写入 #
4.1 安装客户端 #
bash
# 安装influxdb-client
pip install influxdb-client
4.2 基本写入 #
python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# 连接配置
url = "http://localhost:8086"
token = "YOUR_TOKEN"
org = "my-org"
bucket = "my-bucket"
# 创建客户端
client = InfluxDBClient(url=url, token=token, org=org)
# 创建写入API
write_api = client.write_api(write_options=SYNCHRONOUS)
# 写入数据点
point = Point("temperature") \
.tag("location", "room1") \
.field("value", 23.5)
write_api.write(bucket=bucket, record=point)
# 关闭客户端
client.close()
4.3 批量写入 #
python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
# 创建多个数据点
points = [
Point("temperature").tag("location", "room1").field("value", 23.5),
Point("temperature").tag("location", "room2").field("value", 24.0),
Point("temperature").tag("location", "room3").field("value", 22.8),
]
# 批量写入
write_api.write(bucket="my-bucket", record=points)
client.close()
4.4 使用行协议 #
python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
# 使用行协议字符串
line_protocol = "temperature,location=room1 value=23.5"
write_api.write(bucket="my-bucket", record=line_protocol)
# 多行协议
line_protocols = """
temperature,location=room1 value=23.5
temperature,location=room2 value=24.0
temperature,location=room3 value=22.8
"""
write_api.write(bucket="my-bucket", record=line_protocols.strip())
client.close()
4.5 异步写入 #
python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
# 使用异步写入
write_api = client.write_api(write_options=ASYNCHRONOUS)
# 写入数据
point = Point("temperature").tag("location", "room1").field("value", 23.5)
write_api.write(bucket="my-bucket", record=point)
# 异步操作,不等待完成
print("写入请求已发送")
client.close()
4.6 批处理配置 #
python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions
client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
# 配置批处理选项
write_options = WriteOptions(
batch_size=1000, # 每批数量
flush_interval=1000, # 刷新间隔(ms)
jitter_interval=0, # 抖动间隔
retry_interval=5000, # 重试间隔(ms)
max_retries=5, # 最大重试次数
max_retry_delay=30000, # 最大重试延迟(ms)
exponential_base=2 # 指数退避基数
)
write_api = client.write_api(write_options=write_options)
# 写入大量数据
for i in range(10000):
point = Point("temperature") \
.tag("location", f"room{i % 10}") \
.field("value", 20 + i * 0.01)
write_api.write(bucket="my-bucket", record=point)
# 确保所有数据写入
write_api.close()
client.close()
五、其他语言客户端 #
5.1 Go客户端 #
go
package main
import (
"context"
"time"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
// 创建客户端
client := influxdb2.NewClient("http://localhost:8086", "YOUR_TOKEN")
defer client.Close()
// 获取写入API
writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
// 创建数据点
p := influxdb2.NewPoint(
"temperature",
map[string]string{"location": "room1"},
map[string]interface{}{"value": 23.5},
time.Now(),
)
// 写入数据
err := writeAPI.WritePoint(context.Background(), p)
if err != nil {
panic(err)
}
}
5.2 Java客户端 #
java
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import java.time.Instant;
public class InfluxDBWrite {
public static void main(String[] args) {
// 创建客户端
InfluxDBClient client = InfluxDBClientFactory.create(
"http://localhost:8086",
"YOUR_TOKEN".toCharArray()
);
// 创建写入API
WriteApi writeApi = client.getWriteApi();
// 创建数据点
Point point = Point.measurement("temperature")
.addTag("location", "room1")
.addField("value", 23.5)
.time(Instant.now(), WritePrecision.NS);
// 写入数据
writeApi.writePoint("my-bucket", "my-org", point);
// 关闭客户端
client.close();
}
}
5.3 JavaScript/Node.js客户端 #
javascript
const {InfluxDB, Point} = require('@influxdata/influxdb-client');
// 配置客户端
const client = new InfluxDB({
url: 'http://localhost:8086',
token: 'YOUR_TOKEN'
});
// 创建写入API
const writeApi = client.getWriteApi('my-org', 'my-bucket', 'ns');
// 创建数据点
const point = new Point('temperature')
.tag('location', 'room1')
.floatField('value', 23.5);
// 写入数据
writeApi.writePoint(point);
// 刷新并关闭
writeApi.close().then(() => {
console.log('写入完成');
});
六、写入最佳实践 #
6.1 批量写入 #
text
批量写入建议:
批量大小
├── 推荐:1000-5000条/批
├── 最大:取决于网络和服务器
└── 过大会导致超时
写入频率
├── 推荐:1-5秒/批
├── 避免频繁小批量
└── 平衡实时性和效率
示例配置
├── batch_size: 5000
├── flush_interval: 5000ms
└── 并发写入:2-4个连接
6.2 时间戳管理 #
text
时间戳最佳实践:
精度选择
├── 纳秒:高精度场景
├── 毫秒:一般监控场景
└── 秒:低精度场景
时间戳来源
├── 客户端时间:更准确的事件时间
├── 服务端时间:省略时间戳
└── 避免时钟不同步问题
时区处理
├── 使用UTC时间
├── RFC3339格式
└── 注意时区转换
6.3 错误处理 #
python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
try:
write_api.write(bucket="my-bucket", record="temperature value=23.5")
except Exception as e:
print(f"写入失败: {e}")
# 重试逻辑
# 记录错误日志
finally:
client.close()
6.4 性能优化 #
text
性能优化建议:
网络优化
├── 使用HTTP/2
├── 启用压缩
├── 保持连接
└── 就近部署
客户端优化
├── 批量写入
├── 异步写入
├── 连接池
└── 合理重试
数据优化
├── 合理的tag基数
├── 避免过多field
├── 精简数据点
└── 合理保留策略
七、完整示例 #
7.1 监控数据写入脚本 #
python
#!/usr/bin/env python3
import psutil
import time
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions
# 配置
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "YOUR_TOKEN"
INFLUX_ORG = "my-org"
INFLUX_BUCKET = "metrics"
def collect_system_metrics():
"""收集系统指标"""
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
points = [
Point("cpu")
.tag("host", "server01")
.field("usage", cpu_percent),
Point("memory")
.tag("host", "server01")
.field("total", memory.total)
.field("used", memory.used)
.field("percent", memory.percent),
Point("disk")
.tag("host", "server01")
.tag("path", "/")
.field("total", disk.total)
.field("used", disk.used)
.field("percent", disk.percent),
]
return points
def main():
client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_options = WriteOptions(batch_size=100, flush_interval=1000)
write_api = client.write_api(write_options=write_options)
try:
while True:
points = collect_system_metrics()
write_api.write(bucket=INFLUX_BUCKET, record=points)
print(f"写入 {len(points)} 个数据点")
time.sleep(10)
except KeyboardInterrupt:
print("\n停止收集")
finally:
write_api.close()
client.close()
if __name__ == "__main__":
main()
八、总结 #
数据写入要点:
- 选择合适方式:HTTP API、CLI或客户端库
- 批量写入:提高写入效率
- 错误处理:实现重试机制
- 性能优化:合理配置批处理参数
- 时间戳管理:选择合适的精度
下一步,让我们学习批量写入!
最后更新:2026-03-27