批量写入 #

一、批量写入概述 #

1.1 为什么需要批量写入 #

text
单条写入 vs 批量写入:

单条写入
├── 每次HTTP请求开销
├── 网络延迟累积
├── 服务器处理开销大
└── 吞吐量低

批量写入
├── 减少HTTP请求次数
├── 降低网络延迟影响
├── 服务器批量处理
└── 吞吐量高

性能对比:
单条写入:~100 points/s
批量写入:~100,000 points/s

1.2 批量写入原理 #

text
批量写入流程:

客户端
├── 收集数据点
├── 缓冲队列
├── 达到阈值触发
└── 发送批量请求
        │
        ▼
服务端
├── 接收批量数据
├── 解析行协议
├── 批量索引
└── 批量写入存储

二、HTTP批量写入 #

2.1 多行数据格式 #

bash
# 批量写入多行数据
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
    --header "Authorization: Token YOUR_TOKEN" \
    --header "Content-Type: text/plain" \
    --data-binary '
temperature,location=room1 value=23.5 1704067200000000000
temperature,location=room2 value=24.0 1704067200000000000
temperature,location=room3 value=22.8 1704067200000000000
cpu,host=server01 usage=78.5 1704067200000000000
cpu,host=server02 usage=65.2 1704067200000000000
memory,host=server01 used=8192i 1704067200000000000
'

2.2 批量大小限制 #

text
批量大小限制:

请求体大小
├── 默认最大:无明确限制
├── 建议:1-10MB
└── 过大会导致超时

数据点数量
├── 建议:5000-10000条/批
├── 最大:取决于服务器配置
└── 超时时间:默认30秒

网络考虑
├── 带宽限制
├── 延迟影响
└── 重传开销

2.3 从文件批量写入 #

bash
# 创建数据文件
cat > data.txt << 'EOF'
temperature,location=room1 value=23.5 1704067200000000000
temperature,location=room2 value=24.0 1704067200000000000
temperature,location=room3 value=22.8 1704067200000000000
cpu,host=server01 usage=78.5 1704067200000000000
cpu,host=server02 usage=65.2 1704067200000000000
EOF

# 从文件写入
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
    --header "Authorization: Token YOUR_TOKEN" \
    --data-binary @data.txt

# 压缩文件写入
gzip data.txt
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
    --header "Authorization: Token YOUR_TOKEN" \
    --header "Content-Encoding: gzip" \
    --data-binary @data.txt.gz

三、客户端批处理配置 #

3.1 Python批处理配置 #

python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions, WriteType

client = InfluxDBClient(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="my-org"
)

# 配置批处理选项
write_options = WriteOptions(
    batch_size=5000,           # 每批数据点数量
    flush_interval=5000,       # 刷新间隔(毫秒)
    jitter_interval=2000,      # 抖动间隔(毫秒)
    retry_interval=5000,       # 重试间隔(毫秒)
    max_retries=5,             # 最大重试次数
    max_retry_delay=30000,     # 最大重试延迟(毫秒)
    exponential_base=2,        # 指数退避基数
    write_type=WriteType.batching  # 写入类型
)

write_api = client.write_api(write_options=write_options)

# 写入大量数据
for i in range(50000):
    point = Point("temperature") \
        .tag("location", f"room{i % 100}") \
        .field("value", 20 + i * 0.001)
    write_api.write(bucket="my-bucket", record=point)

# 确保所有数据刷新
write_api.close()
client.close()

3.2 批处理参数详解 #

text
批处理参数说明:

batch_size
├── 每批数据点数量
├── 达到数量立即发送
├── 推荐值:1000-10000
└── 过大可能导致超时

flush_interval
├── 刷新间隔(毫秒)
├── 定时发送未满批次
├── 推荐值:1000-10000
└── 平衡实时性和效率

jitter_interval
├── 抖动间隔(毫秒)
├── 随机化发送时间
├── 避免同时发送
└── 推荐值:0-2000

retry_interval
├── 重试间隔(毫秒)
├── 失败后等待时间
├── 推荐值:5000
└── 配合指数退避

max_retries
├── 最大重试次数
├── 放弃前的重试次数
├── 推荐值:3-5
└── 过多会延迟错误发现

exponential_base
├── 指数退避基数
├── 重试间隔倍数
├── 推荐值:2
└── 避免服务器过载

3.3 Java批处理配置 #

java
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.write.Point;
import com.influxdb.client.write.events.WriteSuccessEvent;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class BatchWriteExample {
    public static void main(String[] args) {
        InfluxDBClient client = InfluxDBClientFactory.create(
            "http://localhost:8086",
            "YOUR_TOKEN".toCharArray()
        );
        
        // 创建批处理写入API
        WriteApi writeApi = client.makeWriteApi();
        
        // 监听写入成功事件
        writeApi.listenEvents(WriteSuccessEvent.class, event -> {
            System.out.println("写入成功: " + event.getBatchSize() + " 条数据");
        });
        
        // 批量写入
        List<Point> points = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            Point point = Point.measurement("temperature")
                .addTag("location", "room" + (i % 100))
                .addField("value", 20 + i * 0.001)
                .time(Instant.now(), WritePrecision.NS);
            points.add(point);
        }
        
        writeApi.writePoints("my-bucket", "my-org", points);
        
        // 关闭
        writeApi.close();
        client.close();
    }
}

3.4 Go批处理配置 #

go
package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/influxdata/influxdb-client-go/v2"
    "github.com/influxdata/influxdb-client-go/v2/api"
    "github.com/influxdata/influxdb-client-go/v2/api/write"
)

func main() {
    // 创建客户端
    client := influxdb2.NewClientWithOptions(
        "http://localhost:8086",
        "YOUR_TOKEN",
        influxdb2.DefaultOptions().
            SetBatchSize(5000).
            SetFlushInterval(5000),
    )
    defer client.Close()
    
    // 获取写入API
    writeAPI := client.WriteAPI("my-org", "my-bucket")
    
    // 监听错误
    errorsCh := writeAPI.Errors()
    go func() {
        for err := range errorsCh {
            fmt.Printf("写入错误: %v\n", err)
        }
    }()
    
    // 批量写入
    for i := 0; i < 50000; i++ {
        point := write.NewPoint(
            "temperature",
            map[string]string{"location": fmt.Sprintf("room%d", i%100)},
            map[string]interface{}{"value": 20 + float64(i)*0.001},
            time.Now(),
        )
        writeAPI.WritePoint(point)
    }
    
    // 等待刷新
    writeAPI.Flush()
}

四、批量写入策略 #

4.1 时间窗口策略 #

python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions
import time

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")

# 时间窗口策略:每5秒或1000条发送
write_options = WriteOptions(
    batch_size=1000,
    flush_interval=5000
)

write_api = client.write_api(write_options=write_options)

# 持续写入
while True:
    # 生成数据点
    point = generate_data_point()
    write_api.write(bucket="my-bucket", record=point)
    
    # 5秒内自动发送,或达到1000条发送

4.2 数量优先策略 #

python
# 数量优先:达到数量立即发送
write_options = WriteOptions(
    batch_size=5000,
    flush_interval=60000  # 60秒超时
)

write_api = client.write_api(write_options=write_options)

# 快速写入大量数据
for i in range(100000):
    point = generate_data_point()
    write_api.write(bucket="my-bucket", record=point)
    # 达到5000条立即发送

4.3 混合策略 #

python
# 混合策略:平衡实时性和效率
write_options = WriteOptions(
    batch_size=2000,      # 2000条触发
    flush_interval=3000,  # 或3秒触发
    jitter_interval=500   # 添加抖动
)

write_api = client.write_api(write_options=write_options)

五、并行批量写入 #

5.1 多线程写入 #

python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions
import threading
import queue

class BatchWriter:
    def __init__(self, url, token, org, bucket, num_workers=4):
        self.bucket = bucket
        self.queue = queue.Queue()
        self.workers = []
        
        for i in range(num_workers):
            client = InfluxDBClient(url=url, token=token, org=org)
            write_api = client.write_api(
                write_options=WriteOptions(batch_size=5000, flush_interval=5000)
            )
            worker = threading.Thread(
                target=self._worker,
                args=(write_api,)
            )
            worker.daemon = True
            worker.start()
            self.workers.append((worker, client, write_api))
    
    def _worker(self, write_api):
        while True:
            points = self.queue.get()
            if points is None:
                break
            write_api.write(bucket=self.bucket, record=points)
    
    def write(self, points):
        self.queue.put(points)
    
    def close(self):
        for _ in self.workers:
            self.queue.put(None)
        for worker, client, write_api in self.workers:
            worker.join()
            write_api.close()
            client.close()

# 使用示例
writer = BatchWriter(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="my-org",
    bucket="my-bucket",
    num_workers=4
)

# 并行写入
for i in range(100000):
    point = Point("temperature").tag("location", f"room{i%100}").field("value", 20+i*0.001)
    writer.write(point)

writer.close()

5.2 异步写入 #

python
import asyncio
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS

async def write_batch(write_api, bucket, points):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(
        None,
        lambda: write_api.write(bucket=bucket, record=points)
    )

async def main():
    client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
    write_api = client.write_api(write_options=ASYNCHRONOUS)
    
    # 创建多个写入任务
    tasks = []
    for batch_num in range(10):
        points = [
            Point("temperature")
                .tag("location", f"room{i}")
                .field("value", 20+i*0.1)
            for i in range(1000)
        ]
        tasks.append(write_batch(write_api, "my-bucket", points))
    
    # 并行执行
    await asyncio.gather(*tasks)
    
    client.close()

asyncio.run(main())

六、错误处理和重试 #

6.1 重试机制 #

python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")

# 配置重试选项
write_options = WriteOptions(
    batch_size=5000,
    flush_interval=5000,
    retry_interval=5000,
    max_retries=5,
    max_retry_delay=30000,
    exponential_base=2
)

write_api = client.write_api(write_options=write_options)

# 监听错误
def on_error(e):
    logger.error(f"写入失败: {e}")
    # 可以添加自定义错误处理逻辑

# 写入数据
try:
    write_api.write(bucket="my-bucket", record="temperature value=23.5")
except Exception as e:
    logger.error(f"写入异常: {e}")
    # 手动重试逻辑

6.2 失败数据处理 #

python
import json
from datetime import datetime

class BatchWriterWithFallback:
    def __init__(self, client, bucket, fallback_file="failed_writes.json"):
        self.client = client
        self.bucket = bucket
        self.fallback_file = fallback_file
        self.write_api = client.write_api()
        self.failed_writes = []
    
    def write(self, points):
        try:
            self.write_api.write(bucket=self.bucket, record=points)
        except Exception as e:
            # 记录失败数据
            failed_record = {
                "timestamp": datetime.now().isoformat(),
                "error": str(e),
                "data": points if isinstance(points, str) else str(points)
            }
            self.failed_writes.append(failed_record)
            self._save_failed_writes()
    
    def _save_failed_writes(self):
        with open(self.fallback_file, 'w') as f:
            json.dump(self.failed_writes, f, indent=2)
    
    def retry_failed(self):
        if not self.failed_writes:
            return
        
        success = []
        for record in self.failed_writes:
            try:
                self.write_api.write(bucket=self.bucket, record=record["data"])
                success.append(record)
            except Exception as e:
                record["retry_error"] = str(e)
        
        # 移除成功重试的记录
        for record in success:
            self.failed_writes.remove(record)
        
        self._save_failed_writes()

七、性能优化 #

7.1 批量大小优化 #

text
批量大小选择指南:

小批量 (< 1000)
├── 优点:低延迟,实时性好
├── 缺点:吞吐量低,开销大
└── 适合:实时监控,告警场景

中批量 (1000-10000)
├── 优点:平衡延迟和吞吐
├── 缺点:需要合理配置
└── 适合:一般监控场景

大批量 (> 10000)
├── 优点:高吞吐,低开销
├── 缺点:高延迟,内存占用大
└── 适合:历史数据导入,批量处理

推荐配置:
├── 实时监控:batch_size=1000, flush_interval=1000
├── 一般场景:batch_size=5000, flush_interval=5000
└── 批量导入:batch_size=10000, flush_interval=10000

7.2 网络优化 #

python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions

# 启用压缩
client = InfluxDBClient(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="my-org",
    gzip=True  # 启用gzip压缩
)

# 配置批处理
write_options = WriteOptions(
    batch_size=5000,
    flush_interval=5000
)

write_api = client.write_api(write_options=write_options)

7.3 内存优化 #

python
# 使用生成器避免内存溢出
def generate_points(count):
    for i in range(count):
        yield Point("temperature") \
            .tag("location", f"room{i%100}") \
            .field("value", 20+i*0.001)

# 分批处理大数据集
def write_large_dataset(write_api, bucket, total_points, batch_size=5000):
    batch = []
    for point in generate_points(total_points):
        batch.append(point)
        if len(batch) >= batch_size:
            write_api.write(bucket=bucket, record=batch)
            batch = []
    
    # 写入剩余数据
    if batch:
        write_api.write(bucket=bucket, record=batch)

八、监控批量写入 #

8.1 写入统计 #

python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions
import time

class MonitoredBatchWriter:
    def __init__(self, url, token, org, bucket):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.bucket = bucket
        self.total_points = 0
        self.total_batches = 0
        self.start_time = time.time()
        
        write_options = WriteOptions(
            batch_size=5000,
            flush_interval=5000
        )
        self.write_api = self.client.write_api(write_options=write_options)
    
    def write(self, points):
        count = len(points) if isinstance(points, list) else 1
        self.write_api.write(bucket=self.bucket, record=points)
        self.total_points += count
        self.total_batches += 1
    
    def stats(self):
        elapsed = time.time() - self.start_time
        return {
            "total_points": self.total_points,
            "total_batches": self.total_batches,
            "elapsed_seconds": elapsed,
            "points_per_second": self.total_points / elapsed if elapsed > 0 else 0
        }
    
    def close(self):
        self.write_api.close()
        self.client.close()

# 使用示例
writer = MonitoredBatchWriter(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="my-org",
    bucket="my-bucket"
)

# 写入数据
for i in range(50000):
    point = Point("temperature").tag("location", f"room{i%100}").field("value", 20+i*0.001)
    writer.write(point)

# 打印统计
print(writer.stats())
writer.close()

九、完整示例 #

9.1 高性能批量写入器 #

python
#!/usr/bin/env python3
"""
高性能InfluxDB批量写入器
"""
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions
import time
import logging
from typing import List, Iterator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class HighPerformanceWriter:
    def __init__(
        self,
        url: str,
        token: str,
        org: str,
        bucket: str,
        batch_size: int = 5000,
        flush_interval: int = 5000,
        num_workers: int = 1
    ):
        self.bucket = bucket
        self.clients = []
        self.write_apis = []
        
        for _ in range(num_workers):
            client = InfluxDBClient(url=url, token=token, org=org)
            write_options = WriteOptions(
                batch_size=batch_size,
                flush_interval=flush_interval,
                retry_interval=5000,
                max_retries=5
            )
            write_api = client.write_api(write_options=write_options)
            self.clients.append(client)
            self.write_apis.append(write_api)
        
        self.current_worker = 0
        self.total_points = 0
        self.start_time = time.time()
    
    def write(self, points: List[Point]):
        write_api = self.write_apis[self.current_worker]
        write_api.write(bucket=self.bucket, record=points)
        self.total_points += len(points)
        self.current_worker = (self.current_worker + 1) % len(self.write_apis)
    
    def write_from_iterator(self, points_iter: Iterator[Point]):
        batch = []
        for point in points_iter:
            batch.append(point)
            if len(batch) >= 5000:
                self.write(batch)
                batch = []
        
        if batch:
            self.write(batch)
    
    def stats(self):
        elapsed = time.time() - self.start_time
        return {
            "total_points": self.total_points,
            "elapsed_seconds": round(elapsed, 2),
            "points_per_second": round(self.total_points / elapsed, 2)
        }
    
    def close(self):
        for write_api in self.write_apis:
            write_api.close()
        for client in self.clients:
            client.close()
        
        stats = self.stats()
        logger.info(f"写入完成: {stats}")

# 使用示例
if __name__ == "__main__":
    def generate_test_data(count: int) -> Iterator[Point]:
        for i in range(count):
            yield Point("temperature") \
                .tag("location", f"room{i%100}") \
                .tag("sensor", f"sensor{i%10}") \
                .field("value", 20 + i * 0.001) \
                .field("humidity", 50 + i * 0.01)
    
    writer = HighPerformanceWriter(
        url="http://localhost:8086",
        token="YOUR_TOKEN",
        org="my-org",
        bucket="my-bucket",
        batch_size=5000,
        flush_interval=5000,
        num_workers=2
    )
    
    try:
        writer.write_from_iterator(generate_test_data(100000))
    finally:
        writer.close()

十、总结 #

批量写入要点:

  1. 合理配置批量大小:平衡吞吐量和延迟
  2. 使用客户端批处理:自动缓冲和发送
  3. 并行写入:多线程提高效率
  4. 错误处理:实现重试和降级
  5. 监控统计:跟踪写入性能

下一步,让我们学习基础查询!

最后更新:2026-03-27