写入数据 #

一、写入概述 #

1.1 写入方式 #

text
InfluxDB写入方式:

HTTP API
├── 最通用的方式
├── 支持所有语言
├── 使用行协议
└── 支持批量写入

CLI工具
├── 命令行操作
├── 适合测试和脚本
└── 支持文件导入

客户端库
├── 官方支持多语言
├── 高级API封装
└── 自动重试和批处理

Telegraf
├── 数据采集代理
├── 200+输入插件
└── 自动写入InfluxDB

1.2 写入流程 #

text
写入流程:

数据准备
    │
    ▼
构建行协议
    │
    ▼
发送HTTP请求
    │
    ├── 认证(Token)
    │
    ├── 指定组织和Bucket
    │
    └── 指定精度
            │
            ▼
        写入存储
            │
            ▼
        返回结果

二、HTTP API写入 #

2.1 基本写入 #

bash
# 基本写入请求
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
    --header "Authorization: Token YOUR_TOKEN" \
    --data-raw "temperature,location=room1 value=23.5"

# 带时间戳写入
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
    --header "Authorization: Token YOUR_TOKEN" \
    --data-raw "temperature,location=room1 value=23.5 1704067200000000000"

2.2 指定精度 #

bash
# 纳秒精度(默认)
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=ns" \
    --header "Authorization: Token YOUR_TOKEN" \
    --data-raw "temperature value=23.5 1704067200000000000"

# 毫秒精度
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=ms" \
    --header "Authorization: Token YOUR_TOKEN" \
    --data-raw "temperature value=23.5 1704067200000"

# 秒精度
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket&precision=s" \
    --header "Authorization: Token YOUR_TOKEN" \
    --data-raw "temperature value=23.5 1704067200"

2.3 批量写入 #

bash
# 批量写入多行数据
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
    --header "Authorization: Token YOUR_TOKEN" \
    --data-raw '
temperature,location=room1 value=23.5
temperature,location=room2 value=24.0
temperature,location=room3 value=22.8
cpu,host=server01 usage=78.5
memory,host=server01 used=8192i
'

2.4 使用文件写入 #

bash
# 从文件写入
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
    --header "Authorization: Token YOUR_TOKEN" \
    --data-binary @data.txt

# data.txt内容
temperature,location=room1 value=23.5 1704067200000000000
temperature,location=room2 value=24.0 1704067200000000000
temperature,location=room3 value=22.8 1704067200000000000

2.5 写入响应 #

text
HTTP响应状态码:

204 No Content
├── 写入成功
└── 无返回内容

400 Bad Request
├── 行协议语法错误
├── 数据类型错误
└── 检查请求格式

401 Unauthorized
├── Token无效
├── Token过期
└── 检查认证信息

403 Forbidden
├── 无写入权限
└── 检查Token权限

404 Not Found
├── Bucket不存在
├── 组织不存在
└── 检查资源名称

413 Request Entity Too Large
├── 请求体过大
└── 减小批量大小

429 Too Many Requests
├── 请求过于频繁
└── 降低写入速率

500 Internal Server Error
├── 服务器内部错误
└── 检查服务器日志

三、CLI写入 #

3.1 基本写入 #

bash
# 使用influx write命令
influx write \
    --bucket my-bucket \
    --org my-org \
    "temperature,location=room1 value=23.5"

# 指定精度
influx write \
    --bucket my-bucket \
    --org my-org \
    --precision s \
    "temperature value=23.5 1704067200"

3.2 从文件写入 #

bash
# 从文件写入
influx write \
    --bucket my-bucket \
    --org my-org \
    --file data.txt

# 从多个文件写入
influx write \
    --bucket my-bucket \
    --org my-org \
    --file data1.txt \
    --file data2.txt

# 从标准输入写入
cat data.txt | influx write \
    --bucket my-bucket \
    --org my-org \
    --stdin

3.3 使用URL写入 #

bash
# 从URL写入
influx write \
    --bucket my-bucket \
    --org my-org \
    --url https://example.com/data.txt

# 压缩文件
influx write \
    --bucket my-bucket \
    --org my-org \
    --url https://example.com/data.txt.gz \
    --gzip

3.4 写入选项 #

bash
# 完整选项示例
influx write \
    --bucket my-bucket \
    --org my-org \
    --precision ms \
    --format line-protocol \
    --file data.txt \
    --skipHeader 1 \
    --debug

四、Python客户端写入 #

4.1 安装客户端 #

bash
# 安装influxdb-client
pip install influxdb-client

4.2 基本写入 #

python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# 连接配置
url = "http://localhost:8086"
token = "YOUR_TOKEN"
org = "my-org"
bucket = "my-bucket"

# 创建客户端
client = InfluxDBClient(url=url, token=token, org=org)

# 创建写入API
write_api = client.write_api(write_options=SYNCHRONOUS)

# 写入数据点
point = Point("temperature") \
    .tag("location", "room1") \
    .field("value", 23.5)

write_api.write(bucket=bucket, record=point)

# 关闭客户端
client.close()

4.3 批量写入 #

python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

# 创建多个数据点
points = [
    Point("temperature").tag("location", "room1").field("value", 23.5),
    Point("temperature").tag("location", "room2").field("value", 24.0),
    Point("temperature").tag("location", "room3").field("value", 22.8),
]

# 批量写入
write_api.write(bucket="my-bucket", record=points)

client.close()

4.4 使用行协议 #

python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

# 使用行协议字符串
line_protocol = "temperature,location=room1 value=23.5"
write_api.write(bucket="my-bucket", record=line_protocol)

# 多行协议
line_protocols = """
temperature,location=room1 value=23.5
temperature,location=room2 value=24.0
temperature,location=room3 value=22.8
"""
write_api.write(bucket="my-bucket", record=line_protocols.strip())

client.close()

4.5 异步写入 #

python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")

# 使用异步写入
write_api = client.write_api(write_options=ASYNCHRONOUS)

# 写入数据
point = Point("temperature").tag("location", "room1").field("value", 23.5)
write_api.write(bucket="my-bucket", record=point)

# 异步操作,不等待完成
print("写入请求已发送")

client.close()

4.6 批处理配置 #

python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")

# 配置批处理选项
write_options = WriteOptions(
    batch_size=1000,        # 每批数量
    flush_interval=1000,    # 刷新间隔(ms)
    jitter_interval=0,      # 抖动间隔
    retry_interval=5000,    # 重试间隔(ms)
    max_retries=5,          # 最大重试次数
    max_retry_delay=30000,  # 最大重试延迟(ms)
    exponential_base=2      # 指数退避基数
)

write_api = client.write_api(write_options=write_options)

# 写入大量数据
for i in range(10000):
    point = Point("temperature") \
        .tag("location", f"room{i % 10}") \
        .field("value", 20 + i * 0.01)
    write_api.write(bucket="my-bucket", record=point)

# 确保所有数据写入
write_api.close()
client.close()

五、其他语言客户端 #

5.1 Go客户端 #

go
package main

import (
    "context"
    "time"
    
    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // 创建客户端
    client := influxdb2.NewClient("http://localhost:8086", "YOUR_TOKEN")
    defer client.Close()
    
    // 获取写入API
    writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
    
    // 创建数据点
    p := influxdb2.NewPoint(
        "temperature",
        map[string]string{"location": "room1"},
        map[string]interface{}{"value": 23.5},
        time.Now(),
    )
    
    // 写入数据
    err := writeAPI.WritePoint(context.Background(), p)
    if err != nil {
        panic(err)
    }
}

5.2 Java客户端 #

java
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;

import java.time.Instant;

public class InfluxDBWrite {
    public static void main(String[] args) {
        // 创建客户端
        InfluxDBClient client = InfluxDBClientFactory.create(
            "http://localhost:8086", 
            "YOUR_TOKEN".toCharArray()
        );
        
        // 创建写入API
        WriteApi writeApi = client.getWriteApi();
        
        // 创建数据点
        Point point = Point.measurement("temperature")
            .addTag("location", "room1")
            .addField("value", 23.5)
            .time(Instant.now(), WritePrecision.NS);
        
        // 写入数据
        writeApi.writePoint("my-bucket", "my-org", point);
        
        // 关闭客户端
        client.close();
    }
}

5.3 JavaScript/Node.js客户端 #

javascript
const {InfluxDB, Point} = require('@influxdata/influxdb-client');

// 配置客户端
const client = new InfluxDB({
    url: 'http://localhost:8086',
    token: 'YOUR_TOKEN'
});

// 创建写入API
const writeApi = client.getWriteApi('my-org', 'my-bucket', 'ns');

// 创建数据点
const point = new Point('temperature')
    .tag('location', 'room1')
    .floatField('value', 23.5);

// 写入数据
writeApi.writePoint(point);

// 刷新并关闭
writeApi.close().then(() => {
    console.log('写入完成');
});

六、写入最佳实践 #

6.1 批量写入 #

text
批量写入建议:

批量大小
├── 推荐:1000-5000条/批
├── 最大:取决于网络和服务器
└── 过大会导致超时

写入频率
├── 推荐:1-5秒/批
├── 避免频繁小批量
└── 平衡实时性和效率

示例配置
├── batch_size: 5000
├── flush_interval: 5000ms
└── 并发写入:2-4个连接

6.2 时间戳管理 #

text
时间戳最佳实践:

精度选择
├── 纳秒:高精度场景
├── 毫秒:一般监控场景
└── 秒:低精度场景

时间戳来源
├── 客户端时间:更准确的事件时间
├── 服务端时间:省略时间戳
└── 避免时钟不同步问题

时区处理
├── 使用UTC时间
├── RFC3339格式
└── 注意时区转换

6.3 错误处理 #

python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

try:
    write_api.write(bucket="my-bucket", record="temperature value=23.5")
except Exception as e:
    print(f"写入失败: {e}")
    # 重试逻辑
    # 记录错误日志
finally:
    client.close()

6.4 性能优化 #

text
性能优化建议:

网络优化
├── 使用HTTP/2
├── 启用压缩
├── 保持连接
└── 就近部署

客户端优化
├── 批量写入
├── 异步写入
├── 连接池
└── 合理重试

数据优化
├── 合理的tag基数
├── 避免过多field
├── 精简数据点
└── 合理保留策略

七、完整示例 #

7.1 监控数据写入脚本 #

python
#!/usr/bin/env python3
import psutil
import time
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions

# 配置
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "YOUR_TOKEN"
INFLUX_ORG = "my-org"
INFLUX_BUCKET = "metrics"

def collect_system_metrics():
    """收集系统指标"""
    cpu_percent = psutil.cpu_percent(interval=1)
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')
    
    points = [
        Point("cpu")
            .tag("host", "server01")
            .field("usage", cpu_percent),
        Point("memory")
            .tag("host", "server01")
            .field("total", memory.total)
            .field("used", memory.used)
            .field("percent", memory.percent),
        Point("disk")
            .tag("host", "server01")
            .tag("path", "/")
            .field("total", disk.total)
            .field("used", disk.used)
            .field("percent", disk.percent),
    ]
    return points

def main():
    client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
    
    write_options = WriteOptions(batch_size=100, flush_interval=1000)
    write_api = client.write_api(write_options=write_options)
    
    try:
        while True:
            points = collect_system_metrics()
            write_api.write(bucket=INFLUX_BUCKET, record=points)
            print(f"写入 {len(points)} 个数据点")
            time.sleep(10)
    except KeyboardInterrupt:
        print("\n停止收集")
    finally:
        write_api.close()
        client.close()

if __name__ == "__main__":
    main()

八、总结 #

数据写入要点:

  1. 选择合适方式:HTTP API、CLI或客户端库
  2. 批量写入:提高写入效率
  3. 错误处理:实现重试机制
  4. 性能优化:合理配置批处理参数
  5. 时间戳管理:选择合适的精度

下一步,让我们学习批量写入!

最后更新:2026-03-27