批量写入 #
一、批量写入概述 #
1.1 为什么需要批量写入 #
text
单条写入 vs 批量写入:
单条写入
├── 每次HTTP请求开销
├── 网络延迟累积
├── 服务器处理开销大
└── 吞吐量低
批量写入
├── 减少HTTP请求次数
├── 降低网络延迟影响
├── 服务器批量处理
└── 吞吐量高
性能对比:
单条写入:~100 points/s
批量写入:~100,000 points/s
1.2 批量写入原理 #
text
批量写入流程:
客户端
├── 收集数据点
├── 缓冲队列
├── 达到阈值触发
└── 发送批量请求
│
▼
服务端
├── 接收批量数据
├── 解析行协议
├── 批量索引
└── 批量写入存储
二、HTTP批量写入 #
2.1 多行数据格式 #
bash
# Batch-write multiple points in one HTTP request
# (InfluxDB 2.x line protocol: one point per line, ns-precision timestamps)
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
  --header "Authorization: Token YOUR_TOKEN" \
  --header "Content-Type: text/plain" \
  --data-binary '
temperature,location=room1 value=23.5 1704067200000000000
temperature,location=room2 value=24.0 1704067200000000000
temperature,location=room3 value=22.8 1704067200000000000
cpu,host=server01 usage=78.5 1704067200000000000
cpu,host=server02 usage=65.2 1704067200000000000
memory,host=server01 used=8192i 1704067200000000000
'
2.2 批量大小限制 #
text
批量大小限制:
请求体大小
├── 默认最大:无明确限制
├── 建议:1-10MB
└── 过大会导致超时
数据点数量
├── 建议:5000-10000条/批
├── 最大:取决于服务器配置
└── 超时时间:默认30秒
网络考虑
├── 带宽限制
├── 延迟影响
└── 重传开销
2.3 从文件批量写入 #
bash
# Create a data file with line-protocol points
cat > data.txt << 'EOF'
temperature,location=room1 value=23.5 1704067200000000000
temperature,location=room2 value=24.0 1704067200000000000
temperature,location=room3 value=22.8 1704067200000000000
cpu,host=server01 usage=78.5 1704067200000000000
cpu,host=server02 usage=65.2 1704067200000000000
EOF
# Write directly from the file
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
  --header "Authorization: Token YOUR_TOKEN" \
  --data-binary @data.txt
# Write a gzip-compressed file (the Content-Encoding header is required)
gzip data.txt
curl -X POST "http://localhost:8086/api/v2/write?org=my-org&bucket=my-bucket" \
  --header "Authorization: Token YOUR_TOKEN" \
  --header "Content-Encoding: gzip" \
  --data-binary @data.txt.gz
三、客户端批处理配置 #
3.1 Python批处理配置 #
python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions, WriteType

client = InfluxDBClient(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="my-org"
)

# Configure client-side batching options.
write_options = WriteOptions(
    batch_size=5000,                # points per batch
    flush_interval=5000,            # flush interval (ms)
    jitter_interval=2000,           # jitter interval (ms)
    retry_interval=5000,            # retry interval (ms)
    max_retries=5,                  # maximum retry attempts
    max_retry_delay=30000,          # maximum retry delay (ms)
    exponential_base=2,             # exponential backoff base
    write_type=WriteType.batching   # write mode
)
write_api = client.write_api(write_options=write_options)

# Write a large volume of points; the client buffers and flushes batches.
for i in range(50000):
    point = Point("temperature") \
        .tag("location", f"room{i % 100}") \
        .field("value", 20 + i * 0.001)
    write_api.write(bucket="my-bucket", record=point)

# close() flushes any buffered points before shutdown.
write_api.close()
client.close()
3.2 批处理参数详解 #
text
批处理参数说明:
batch_size
├── 每批数据点数量
├── 达到数量立即发送
├── 推荐值:1000-10000
└── 过大可能导致超时
flush_interval
├── 刷新间隔(毫秒)
├── 定时发送未满批次
├── 推荐值:1000-10000
└── 平衡实时性和效率
jitter_interval
├── 抖动间隔(毫秒)
├── 随机化发送时间
├── 避免同时发送
└── 推荐值:0-2000
retry_interval
├── 重试间隔(毫秒)
├── 失败后等待时间
├── 推荐值:5000
└── 配合指数退避
max_retries
├── 最大重试次数
├── 放弃前的重试次数
├── 推荐值:3-5
└── 过多会延迟错误发现
exponential_base
├── 指数退避基数
├── 重试间隔倍数
├── 推荐值:2
└── 避免服务器过载
3.3 Java批处理配置 #
java
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.client.write.events.WriteSuccessEvent;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/** Demonstrates batched writes through the influxdb-client-java WriteApi. */
public class BatchWriteExample {
    public static void main(String[] args) {
        InfluxDBClient client = InfluxDBClientFactory.create(
            "http://localhost:8086",
            "YOUR_TOKEN".toCharArray()
        );
        // Create the batching write API (buffers points, flushes automatically).
        WriteApi writeApi = client.makeWriteApi();
        // Listen for successful-write events.
        writeApi.listenEvents(WriteSuccessEvent.class, event -> {
            System.out.println("写入成功: " + event.getBatchSize() + " 条数据");
        });
        // Build the batch of points.
        List<Point> points = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            Point point = Point.measurement("temperature")
                .addTag("location", "room" + (i % 100))
                .addField("value", 20 + i * 0.001)
                .time(Instant.now(), WritePrecision.NS);
            points.add(point);
        }
        writeApi.writePoints("my-bucket", "my-org", points);
        // Close: flushes pending batches before releasing resources.
        writeApi.close();
        client.close();
    }
}
3.4 Go批处理配置 #
go
package main
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
func main() {
	// Create a client with batching options: 5000 points per batch,
	// flushed at least every 5000 ms.
	client := influxdb2.NewClientWithOptions(
		"http://localhost:8086",
		"YOUR_TOKEN",
		influxdb2.DefaultOptions().
			SetBatchSize(5000).
			SetFlushInterval(5000),
	)
	defer client.Close()
	// Obtain the non-blocking (asynchronous) write API.
	writeAPI := client.WriteAPI("my-org", "my-bucket")
	// Drain async write errors in a goroutine so failures are at least logged.
	errorsCh := writeAPI.Errors()
	go func() {
		for err := range errorsCh {
			fmt.Printf("写入错误: %v\n", err)
		}
	}()
	// Queue 50k points; the client batches and sends in the background.
	for i := 0; i < 50000; i++ {
		point := write.NewPoint(
			"temperature",
			map[string]string{"location": fmt.Sprintf("room%d", i%100)},
			map[string]interface{}{"value": 20 + float64(i)*0.001},
			time.Now(),
		)
		writeAPI.WritePoint(point)
	}
	// Block until all buffered points have been sent.
	writeAPI.Flush()
}
四、批量写入策略 #
4.1 时间窗口策略 #
python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions
import time

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")

# Time-window strategy: flush every 5 s, or as soon as 1000 points accumulate.
write_options = WriteOptions(
    batch_size=1000,
    flush_interval=5000
)
write_api = client.write_api(write_options=write_options)

# Continuous ingestion loop (runs forever by design).
while True:
    # Generate a data point.
    # NOTE(review): generate_data_point() is assumed to be defined elsewhere.
    point = generate_data_point()
    write_api.write(bucket="my-bucket", record=point)
    # Auto-sent within 5 s, or immediately once 1000 points are buffered.
4.2 数量优先策略 #
python
# Quantity-first strategy: send as soon as the batch is full.
write_options = WriteOptions(
    batch_size=5000,
    flush_interval=60000  # 60 s timeout flush for straggler batches
)
write_api = client.write_api(write_options=write_options)

# Rapidly write a large volume of data.
for i in range(100000):
    # NOTE(review): generate_data_point() is assumed to be defined elsewhere.
    point = generate_data_point()
    write_api.write(bucket="my-bucket", record=point)
    # Every 5000 buffered points trigger an immediate send.
4.3 混合策略 #
python
# Hybrid strategy: balance latency against throughput.
write_options = WriteOptions(
    batch_size=2000,       # send once 2000 points are buffered...
    flush_interval=3000,   # ...or after 3 s, whichever comes first
    jitter_interval=500    # random jitter to de-synchronize senders
)
write_api = client.write_api(write_options=write_options)
五、并行批量写入 #
5.1 多线程写入 #
python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions
import threading
import queue
class BatchWriter:
    """Fan-out writer: N daemon threads drain a shared queue into InfluxDB.

    Each worker owns its own client/write_api pair so the clients' internal
    batching buffers are never shared across threads.
    """

    def __init__(self, url, token, org, bucket, num_workers=4):
        self.bucket = bucket
        self.queue = queue.Queue()  # holds points; None is the shutdown sentinel
        self.workers = []
        for i in range(num_workers):
            client = InfluxDBClient(url=url, token=token, org=org)
            write_api = client.write_api(
                write_options=WriteOptions(batch_size=5000, flush_interval=5000)
            )
            worker = threading.Thread(
                target=self._worker,
                args=(write_api,)
            )
            worker.daemon = True  # don't block interpreter exit
            worker.start()
            self.workers.append((worker, client, write_api))

    def _worker(self, write_api):
        """Consume points from the queue until the None sentinel arrives.

        Fix: a write failure previously killed the worker thread silently,
        stranding everything still queued. Failures are now logged and the
        worker keeps draining.
        """
        import logging
        log = logging.getLogger(__name__)
        while True:
            points = self.queue.get()
            if points is None:
                break
            try:
                write_api.write(bucket=self.bucket, record=points)
            except Exception:
                # Transient errors were already retried by the client's own
                # retry options; anything escaping here is logged, not fatal.
                log.exception("batch write failed; worker continues")

    def write(self, points):
        """Enqueue points (a Point, a list of Points, or a line-protocol str)."""
        self.queue.put(points)

    def close(self):
        """Signal every worker to stop, then flush and release all clients."""
        for _ in self.workers:
            self.queue.put(None)  # one sentinel per worker
        for worker, client, write_api in self.workers:
            worker.join()
            write_api.close()  # flushes buffered batches
            client.close()
# Example usage: four worker threads draining the shared queue.
writer = BatchWriter(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="my-org",
    bucket="my-bucket",
    num_workers=4
)

# Parallel write of 100k points.
for i in range(100000):
    point = Point("temperature").tag("location", f"room{i%100}").field("value", 20+i*0.001)
    writer.write(point)
writer.close()
5.2 异步写入 #
python
import asyncio
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS
async def write_batch(write_api, bucket, points):
    """Run a (blocking) client write in the default thread-pool executor.

    Fix: uses asyncio.get_running_loop() — calling get_event_loop() from
    inside a coroutine has been deprecated since Python 3.10.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        lambda: write_api.write(bucket=bucket, record=points)
    )
async def main():
    """Fire ten concurrent batch writes of 1000 points each."""
    client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")
    write_api = client.write_api(write_options=ASYNCHRONOUS)
    # Build ten batches of 1000 points, then launch them all at once.
    pending = []
    for batch_num in range(10):
        batch = []
        for i in range(1000):
            pt = Point("temperature") \
                .tag("location", f"room{i}") \
                .field("value", 20+i*0.1)
            batch.append(pt)
        pending.append(write_batch(write_api, "my-bucket", batch))
    # Run every write task concurrently and wait for all to finish.
    await asyncio.gather(*pending)
    client.close()
asyncio.run(main())
六、错误处理和重试 #
6.1 重试机制 #
python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

client = InfluxDBClient(url="http://localhost:8086", token="YOUR_TOKEN", org="my-org")

# Retry configuration: exponential backoff starting at 5 s, capped at 30 s.
write_options = WriteOptions(
    batch_size=5000,
    flush_interval=5000,
    retry_interval=5000,
    max_retries=5,
    max_retry_delay=30000,
    exponential_base=2
)
write_api = client.write_api(write_options=write_options)

# Error listener.
# NOTE(review): on_error is defined but never registered with the client
# here — shown only as a hook where custom handling would go.
def on_error(e):
    logger.error(f"写入失败: {e}")
    # Custom error-handling logic can be added here.

# Write data; exceptions that escape the client's retries are logged.
try:
    write_api.write(bucket="my-bucket", record="temperature value=23.5")
except Exception as e:
    logger.error(f"写入异常: {e}")
    # Manual retry logic could go here.
6.2 失败数据处理 #
python
import json
from datetime import datetime
class BatchWriterWithFallback:
    """Writer that journals failed batches to a JSON file for later retry."""

    def __init__(self, client, bucket, fallback_file="failed_writes.json"):
        self.client = client
        self.bucket = bucket
        self.fallback_file = fallback_file
        self.write_api = client.write_api()
        self.failed_writes = []  # in-memory journal, mirrored to fallback_file

    def write(self, points):
        """Write points; on failure, journal them instead of raising."""
        try:
            self.write_api.write(bucket=self.bucket, record=points)
        except Exception as e:
            # Fix: the original stored str(points) for lists, which produces
            # an unreplayable repr like "[<Point ...>]" — retry_failed would
            # then re-submit garbage. Stringify element-wise instead so the
            # journal entry can be re-submitted as a list of records.
            if isinstance(points, str):
                data = points
            elif isinstance(points, list):
                # NOTE(review): assumes str(point) yields valid line
                # protocol for the client's Point type — confirm.
                data = [str(p) for p in points]
            else:
                data = str(points)
            failed_record = {
                "timestamp": datetime.now().isoformat(),
                "error": str(e),
                "data": data
            }
            self.failed_writes.append(failed_record)
            self._save_failed_writes()

    def _save_failed_writes(self):
        """Persist the failure journal (overwrites the file each time)."""
        with open(self.fallback_file, 'w', encoding='utf-8') as f:
            json.dump(self.failed_writes, f, indent=2, ensure_ascii=False)

    def retry_failed(self):
        """Re-submit journaled batches; keep only the ones that fail again."""
        if not self.failed_writes:
            return
        still_failed = []
        for record in self.failed_writes:
            try:
                self.write_api.write(bucket=self.bucket, record=record["data"])
            except Exception as e:
                record["retry_error"] = str(e)
                still_failed.append(record)
        self.failed_writes = still_failed
        self._save_failed_writes()
七、性能优化 #
7.1 批量大小优化 #
text
批量大小选择指南:
小批量 (< 1000)
├── 优点:低延迟,实时性好
├── 缺点:吞吐量低,开销大
└── 适合:实时监控,告警场景
中批量 (1000-10000)
├── 优点:平衡延迟和吞吐
├── 缺点:需要合理配置
└── 适合:一般监控场景
大批量 (> 10000)
├── 优点:高吞吐,低开销
├── 缺点:高延迟,内存占用大
└── 适合:历史数据导入,批量处理
推荐配置:
├── 实时监控:batch_size=1000, flush_interval=1000
├── 一般场景:batch_size=5000, flush_interval=5000
└── 批量导入:batch_size=10000, flush_interval=10000
7.2 网络优化 #
python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions

# Enable gzip compression on the HTTP transport.
client = InfluxDBClient(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="my-org",
    gzip=True  # compress request/response bodies
)

# Configure batching.
write_options = WriteOptions(
    batch_size=5000,
    flush_interval=5000
)
write_api = client.write_api(write_options=write_options)
7.3 内存优化 #
python
# 使用生成器避免内存溢出
def generate_points(count):
    """Lazily yield `count` temperature points; a generator keeps memory flat."""
    for idx in range(count):
        point = Point("temperature")
        point = point.tag("location", f"room{idx%100}")
        yield point.field("value", 20+idx*0.001)
# 分批处理大数据集
def write_large_dataset(write_api, bucket, total_points, batch_size=5000):
    """Stream `total_points` generated points to InfluxDB in fixed-size chunks."""
    pending = []
    for pt in generate_points(total_points):
        pending.append(pt)
        if len(pending) < batch_size:
            continue
        write_api.write(bucket=bucket, record=pending)
        pending = []
    # Flush the final partial chunk, if any.
    if pending:
        write_api.write(bucket=bucket, record=pending)
八、监控批量写入 #
8.1 写入统计 #
python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions
import time
class MonitoredBatchWriter:
    """Batching writer that tracks cumulative throughput statistics."""

    def __init__(self, url, token, org, bucket):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.bucket = bucket
        self.total_points = 0
        self.total_batches = 0
        self.start_time = time.time()
        opts = WriteOptions(
            batch_size=5000,
            flush_interval=5000
        )
        self.write_api = self.client.write_api(write_options=opts)

    def write(self, points):
        """Forward points to the client and update the counters."""
        increment = len(points) if isinstance(points, list) else 1
        self.write_api.write(bucket=self.bucket, record=points)
        self.total_points += increment
        self.total_batches += 1

    def stats(self):
        """Return a snapshot of write-throughput figures so far."""
        elapsed = time.time() - self.start_time
        rate = self.total_points / elapsed if elapsed > 0 else 0
        return {
            "total_points": self.total_points,
            "total_batches": self.total_batches,
            "elapsed_seconds": elapsed,
            "points_per_second": rate
        }

    def close(self):
        """Flush buffered batches and release the client."""
        self.write_api.close()
        self.client.close()
# Example usage of the monitored writer.
writer = MonitoredBatchWriter(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="my-org",
    bucket="my-bucket"
)

# Write data.
# NOTE(review): Point must be imported from influxdb_client for this snippet.
for i in range(50000):
    point = Point("temperature").tag("location", f"room{i%100}").field("value", 20+i*0.001)
    writer.write(point)

# Print throughput statistics.
print(writer.stats())
writer.close()
九、完整示例 #
9.1 高性能批量写入器 #
python
#!/usr/bin/env python3
"""
高性能InfluxDB批量写入器
"""
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions
import time
import logging
from typing import List, Iterator
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class HighPerformanceWriter:
    """Round-robin batching writer across one or more InfluxDB clients.

    Each worker slot owns an independent client + write_api so their
    internal batching buffers never contend.
    """

    def __init__(
        self,
        url: str,
        token: str,
        org: str,
        bucket: str,
        batch_size: int = 5000,
        flush_interval: int = 5000,
        num_workers: int = 1
    ):
        self.bucket = bucket
        # Fix: remember batch_size so write_from_iterator honors it
        # (it previously chunked at a hard-coded 5000).
        self.batch_size = batch_size
        self.clients = []
        self.write_apis = []
        for _ in range(num_workers):
            client = InfluxDBClient(url=url, token=token, org=org)
            write_options = WriteOptions(
                batch_size=batch_size,
                flush_interval=flush_interval,
                retry_interval=5000,
                max_retries=5
            )
            write_api = client.write_api(write_options=write_options)
            self.clients.append(client)
            self.write_apis.append(write_api)
        self.current_worker = 0
        self.total_points = 0
        self.start_time = time.time()

    def write(self, points: List[Point]):
        """Send one batch via the next worker (round-robin rotation)."""
        write_api = self.write_apis[self.current_worker]
        write_api.write(bucket=self.bucket, record=points)
        self.total_points += len(points)
        self.current_worker = (self.current_worker + 1) % len(self.write_apis)

    def write_from_iterator(self, points_iter: Iterator[Point]):
        """Drain an iterator in chunks of the configured batch_size."""
        batch = []
        for point in points_iter:
            batch.append(point)
            if len(batch) >= self.batch_size:
                self.write(batch)
                batch = []
        if batch:  # flush the trailing partial chunk
            self.write(batch)

    def stats(self):
        """Return cumulative throughput figures (guards zero elapsed time,
        matching MonitoredBatchWriter's behavior)."""
        elapsed = time.time() - self.start_time
        rate = self.total_points / elapsed if elapsed > 0 else 0.0
        return {
            "total_points": self.total_points,
            "elapsed_seconds": round(elapsed, 2),
            "points_per_second": round(rate, 2)
        }

    def close(self):
        """Close every write_api (flushing buffers) and client, then log stats."""
        for write_api in self.write_apis:
            write_api.close()
        for client in self.clients:
            client.close()
        stats = self.stats()
        logger.info(f"写入完成: {stats}")
# Example usage
if __name__ == "__main__":
    def generate_test_data(count: int) -> Iterator[Point]:
        """Yield `count` synthetic temperature/humidity points."""
        for i in range(count):
            yield Point("temperature") \
                .tag("location", f"room{i%100}") \
                .tag("sensor", f"sensor{i%10}") \
                .field("value", 20 + i * 0.001) \
                .field("humidity", 50 + i * 0.01)

    writer = HighPerformanceWriter(
        url="http://localhost:8086",
        token="YOUR_TOKEN",
        org="my-org",
        bucket="my-bucket",
        batch_size=5000,
        flush_interval=5000,
        num_workers=2
    )
    # try/finally guarantees buffers are flushed even if writing fails.
    try:
        writer.write_from_iterator(generate_test_data(100000))
    finally:
        writer.close()
十、总结 #
批量写入要点:
- 合理配置批量大小:平衡吞吐量和延迟
- 使用客户端批处理:自动缓冲和发送
- 并行写入:多线程提高效率
- 错误处理:实现重试和降级
- 监控统计:跟踪写入性能
下一步,让我们学习基础查询!
最后更新:2026-03-27