RocksDB批量写入 #
一、WriteBatch基础 #
1.1 什么是WriteBatch #
WriteBatch是RocksDB提供的原子批量写入机制,可以将多个写操作打包成一个原子操作执行。
text
WriteBatch特点:
├── 原子性 - 全部成功或全部失败
├── 高性能 - 减少IO次数
├── 有序性 - 操作按顺序执行
└── 一致性 - 批次内的修改同时生效,读取方不会看到只应用了一部分的中间状态
1.2 基本使用 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/write_batch.h>
#include <iostream>
// Opens (or creates) the database at /tmp/testdb, applies one WriteBatch
// atomically and reports the outcome.
// Returns 0 on success, 1 if opening the DB or writing the batch fails.
int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  if (!status.ok()) {
    std::cerr << "Open failed: " << status.ToString() << std::endl;
    return 1;
  }

  // Build the batch; nothing reaches the database until Write() below.
  rocksdb::WriteBatch batch;
  batch.Put("key1", "value1");
  batch.Put("key2", "value2");
  batch.Put("key3", "value3");
  // Operations are applied in insertion order, so this Delete overrides the
  // earlier Put of "key1": after the write, key1 does not exist.
  batch.Delete("key1");

  // Apply every operation of the batch as one atomic unit.
  status = db->Write(rocksdb::WriteOptions(), &batch);
  const bool write_ok = status.ok();
  if (write_ok) {
    std::cout << "Batch write successful!" << std::endl;
  } else {
    std::cerr << "Batch write failed: " << status.ToString() << std::endl;
  }

  delete db;
  // Report a failed write through the exit code instead of always returning 0.
  return write_ok ? 0 : 1;
}
1.3 WriteBatch操作类型 #
cpp
rocksdb::WriteBatch batch;
// Put: insert a key or overwrite its current value.
batch.Put("key1", "value1");
// Delete: remove a key.
batch.Delete("key2");
// SingleDelete: a cheaper delete that is only valid for a key that was Put
// exactly once and never overwritten or merged since; otherwise the result
// is undefined.
batch.SingleDelete("key3");
// DeleteRange: remove every key in the half-open range [start_key, end_key).
batch.DeleteRange("start_key", "end_key");
// Merge: requires Options::merge_operator to be configured on the DB;
// without one, writing this batch fails.
batch.Merge("key4", "merge_value");
// PutLogData: attach an opaque blob that is written to the WAL only. It is
// never applied to the database and does NOT nest another WriteBatch; only
// WAL readers (e.g. WriteBatch::Handler::LogData) ever see it.
batch.PutLogData("import-job: 42");
二、批量写入场景 #
2.1 批量数据导入 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <vector>
#include <iostream>
// Writes `data` into `db` in chunks of `batch_size` entries, one WriteBatch
// per chunk.
//
// Each chunk is atomic on its own, but the import as a whole is not: if a
// write fails, earlier chunks stay committed. The first failed write
// (including the final partial chunk) is reported on stderr and stops the
// import. A batch_size of 0 behaves like 1 (every entry is committed alone).
void BatchImport(rocksdb::DB* db,
                 const std::vector<std::pair<std::string, std::string>>& data,
                 size_t batch_size = 1000) {
  rocksdb::WriteBatch batch;
  size_t count = 0;

  // Commits the pending batch. Every write, including the trailing partial
  // one, goes through here so that no Write status is ignored.
  auto commit = [&]() -> bool {
    const rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);
    if (!status.ok()) {
      std::cerr << "Batch write failed: " << status.ToString() << std::endl;
      return false;
    }
    batch.Clear();
    count = 0;
    return true;
  };

  for (const auto& [key, value] : data) {
    batch.Put(key, value);
    // Commit once the chunk is full; abort the import on the first failure.
    if (++count >= batch_size && !commit()) {
      return;
    }
  }

  // Commit whatever is left over after the last full chunk.
  if (count > 0) {
    commit();
  }
}
int main() {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::DB::Open(options, "/tmp/testdb", &db);
// 准备大量数据
std::vector<std::pair<std::string, std::string>> data;
for (int i = 0; i < 100000; i++) {
data.push_back({"key_" + std::to_string(i), "value_" + std::to_string(i)});
}
// 批量导入
BatchImport(db, data, 1000);
std::cout << "Import completed!" << std::endl;
delete db;
return 0;
}
2.2 原子更新多个键 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
// Atomic transfer: moves `amount` from `from_account` to `to_account`.
//
// Balances are stored as decimal strings. Returns false, without writing
// anything, in any of these cases:
//   - `amount` is negative;
//   - both keys are the same account;
//   - either account cannot be read;
//   - either balance is not a valid int;
//   - the source has insufficient funds;
//   - the destination balance would overflow int.
// Also returns false if the final write fails.
//
// NOTE: only the final two-key write is atomic. The Get/Get/Write sequence
// is an unisolated read-modify-write, so concurrent Transfer calls touching
// the same account can lose updates. Serialize callers externally, or use a
// TransactionDB with GetForUpdate when concurrency matters.
bool Transfer(rocksdb::DB* db,
              const std::string& from_account,
              const std::string& to_account,
              int amount) {
  // A negative amount would move money backwards and bypass the funds check.
  if (amount < 0) {
    return false;
  }
  // With identical keys the second Put overwrites the first, so the balance
  // would grow by `amount` instead of staying unchanged.
  if (from_account == to_account) {
    return false;
  }

  // Read the current balances.
  std::string from_balance_str;
  std::string to_balance_str;
  rocksdb::Status status =
      db->Get(rocksdb::ReadOptions(), from_account, &from_balance_str);
  if (!status.ok()) return false;
  status = db->Get(rocksdb::ReadOptions(), to_account, &to_balance_str);
  if (!status.ok()) return false;

  int from_balance = 0;
  int to_balance = 0;
  try {
    from_balance = std::stoi(from_balance_str);
    to_balance = std::stoi(to_balance_str);
  } catch (const std::exception&) {
    // std::stoi throws on non-numeric or out-of-range data; report that as a
    // failed transfer instead of letting the exception escape.
    return false;
  }

  // Check the funds.
  if (from_balance < amount) {
    return false;
  }
  // Compute in long long and reject results that do not fit in int, instead
  // of overflowing (signed overflow is undefined behavior).
  const long long new_to_balance = static_cast<long long>(to_balance) + amount;
  if (new_to_balance != static_cast<int>(new_to_balance)) {
    return false;
  }

  // Apply both updates as one atomic batch.
  rocksdb::WriteBatch batch;
  batch.Put(from_account, std::to_string(from_balance - amount));
  batch.Put(to_account, std::to_string(new_to_balance));
  return db->Write(rocksdb::WriteOptions(), &batch).ok();
}
int main() {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::DB::Open(options, "/tmp/bank_db", &db);
// 初始化账户
db->Put(rocksdb::WriteOptions(), "account:A", "1000");
db->Put(rocksdb::WriteOptions(), "account:B", "500");
// 转账
if (Transfer(db, "account:A", "account:B", 200)) {
std::cout << "Transfer successful!" << std::endl;
}
delete db;
return 0;
}
2.3 批量删除 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <vector>
// Deletes every key listed in `keys` through one atomic WriteBatch.
// If the write fails, the error is printed to stderr; because the batch is
// applied atomically, no key of the batch is removed in that case.
void BatchDelete(rocksdb::DB* db, const std::vector<std::string>& keys) {
  rocksdb::WriteBatch deletions;
  for (const std::string& k : keys) {
    deletions.Delete(k);
  }

  const rocksdb::Status result = db->Write(rocksdb::WriteOptions(), &deletions);
  if (result.ok()) {
    return;
  }
  std::cerr << "Batch delete failed: " << result.ToString() << std::endl;
}
// 使用示例
int main() {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::DB::Open(options, "/tmp/testdb", &db);
// 写入测试数据
for (int i = 0; i < 100; i++) {
db->Put(rocksdb::WriteOptions(), "key_" + std::to_string(i), "value");
}
// 批量删除
std::vector<std::string> keys_to_delete;
for (int i = 20; i < 40; i++) {
keys_to_delete.push_back("key_" + std::to_string(i));
}
BatchDelete(db, keys_to_delete);
delete db;
return 0;
}
三、WriteBatch高级用法 #
3.1 设置回调 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <rocksdb/write_batch_base.h>
// Prints each record of a WriteBatch as WriteBatch::Iterate replays it.
//
// Besides Put and Delete, it also overrides SingleDelete, Merge and
// DeleteRange. NOTE: the base Handler's defaults for some of these report
// "not implemented" instead of succeeding (check rocksdb/write_batch.h for
// your version), which would make Iterate fail on batches that use them.
// Column-family ids are ignored: records of every column family print alike.
class BatchHandler : public rocksdb::WriteBatch::Handler {
 public:
  rocksdb::Status PutCF(uint32_t /*column_family_id*/, const rocksdb::Slice& key,
                        const rocksdb::Slice& value) override {
    std::cout << "Put: " << key.ToString() << " = " << value.ToString() << std::endl;
    return rocksdb::Status::OK();
  }

  rocksdb::Status DeleteCF(uint32_t /*column_family_id*/,
                           const rocksdb::Slice& key) override {
    std::cout << "Delete: " << key.ToString() << std::endl;
    return rocksdb::Status::OK();
  }

  rocksdb::Status SingleDeleteCF(uint32_t /*column_family_id*/,
                                 const rocksdb::Slice& key) override {
    std::cout << "SingleDelete: " << key.ToString() << std::endl;
    return rocksdb::Status::OK();
  }

  rocksdb::Status MergeCF(uint32_t /*column_family_id*/, const rocksdb::Slice& key,
                          const rocksdb::Slice& value) override {
    std::cout << "Merge: " << key.ToString() << " <- " << value.ToString() << std::endl;
    return rocksdb::Status::OK();
  }

  rocksdb::Status DeleteRangeCF(uint32_t /*column_family_id*/,
                                const rocksdb::Slice& begin_key,
                                const rocksdb::Slice& end_key) override {
    std::cout << "DeleteRange: [" << begin_key.ToString() << ", "
              << end_key.ToString() << ")" << std::endl;
    return rocksdb::Status::OK();
  }
};
// Prints every record in `batch` through BatchHandler.
// If WriteBatch::Iterate returns a non-OK status (e.g. a handler callback
// reported an error), it is printed to stderr instead of being dropped.
void IterateBatch(const rocksdb::WriteBatch& batch) {
  BatchHandler handler;
  const rocksdb::Status status = batch.Iterate(&handler);
  if (!status.ok()) {
    std::cerr << "Iterate failed: " << status.ToString() << std::endl;
  }
}
3.2 获取批次大小 #
cpp
#include <rocksdb/write_batch.h>
// Prints two statistics of `batch`:
//   - the value of GetDataSize(), labelled in bytes;
//   - the number of operations it holds (Count()).
void PrintBatchInfo(const rocksdb::WriteBatch& batch) {
  const auto data_bytes = batch.GetDataSize();
  const auto op_count = batch.Count();
  std::cout << "Batch size: " << data_bytes << " bytes" << std::endl
            << "Count: " << op_count << std::endl;
}
3.3 合并WriteBatch #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
// Builds two independent WriteBatches, combines them into one batch and
// commits the combined batch atomically.
//
// WriteBatch does not expose a public "append another batch" operation, so
// the records of each source batch are replayed into `merged_batch` through
// a WriteBatch::Handler. This keeps the merged result in sync with
// batch1/batch2 rather than re-typing their operations by hand.
void MergeBatches(rocksdb::DB* db) {
  // Replays Put/Delete records into `target_`. Column-family ids are ignored,
  // so every record is appended to the default column family. Other record
  // types fall back to the base Handler behavior.
  class AppendHandler : public rocksdb::WriteBatch::Handler {
   public:
    explicit AppendHandler(rocksdb::WriteBatch* target) : target_(target) {}

    rocksdb::Status PutCF(uint32_t /*column_family_id*/, const rocksdb::Slice& key,
                          const rocksdb::Slice& value) override {
      return target_->Put(key, value);
    }

    rocksdb::Status DeleteCF(uint32_t /*column_family_id*/,
                             const rocksdb::Slice& key) override {
      return target_->Delete(key);
    }

   private:
    rocksdb::WriteBatch* target_;
  };

  rocksdb::WriteBatch batch1;
  batch1.Put("key1", "value1");
  batch1.Put("key2", "value2");

  rocksdb::WriteBatch batch2;
  batch2.Put("key3", "value3");
  batch2.Delete("key1");

  // Merge the batches: batch1 first, then batch2. The order matters because
  // later records win, so batch2's Delete("key1") overrides batch1's Put.
  rocksdb::WriteBatch merged_batch;
  AppendHandler appender(&merged_batch);
  if (!batch1.Iterate(&appender).ok() || !batch2.Iterate(&appender).ok()) {
    // Never commit a partially merged batch.
    return;
  }

  // NOTE(review): the function returns void, so a failed Write is not
  // surfaced to the caller.
  db->Write(rocksdb::WriteOptions(), &merged_batch);
}
四、性能优化 #
4.1 选择合适的批次大小 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <chrono>
// Benchmarks writing `total_records` key/value pairs for each of several
// batch sizes and prints the elapsed wall-clock time per batch size.
//
// NOTE: every run writes the same keys ("key_0" ...) into the same DB, so
// later runs overwrite data from earlier runs. Treat the numbers as
// indicative rather than as isolated measurements.
// A run whose write fails is reported as a failure instead of as a timing.
void BenchmarkBatchSizes(rocksdb::DB* db, int total_records) {
  const std::vector<size_t> batch_sizes = {1, 10, 100, 1000, 10000};
  const rocksdb::WriteOptions write_options;

  for (size_t batch_size : batch_sizes) {
    // steady_clock is monotonic; high_resolution_clock may alias a clock that
    // can be adjusted while the benchmark runs.
    const auto start = std::chrono::steady_clock::now();

    rocksdb::WriteBatch batch;
    rocksdb::Status status;  // default-constructed Status is OK
    for (int i = 0; i < total_records && status.ok(); i++) {
      batch.Put("key_" + std::to_string(i), "value_" + std::to_string(i));
      if (batch.Count() >= batch_size) {
        status = db->Write(write_options, &batch);
        batch.Clear();
      }
    }
    // Write the remaining records.
    if (status.ok() && batch.Count() > 0) {
      status = db->Write(write_options, &batch);
    }

    const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start);
    if (!status.ok()) {
      std::cerr << "Batch size " << batch_size
                << ": write failed: " << status.ToString() << std::endl;
      continue;
    }
    std::cout << "Batch size " << batch_size << ": "
              << duration.count() << " ms" << std::endl;
  }
}
4.2 批次大小建议 #
| 场景 | 推荐批次大小 | 说明 |
|---|---|---|
| 实时写入 | 1-100 | 低延迟优先 |
| 批量导入 | 1000-10000 | 高吞吐优先 |
| 混合场景 | 100-1000 | 平衡延迟和吞吐 |
4.3 禁用WAL提高性能 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
// Loads all of `data` with a single WriteBatch, with the write-ahead log
// disabled and sync off.
//
// Skipping the WAL makes the import faster, but the data may be lost if the
// process crashes before it is persisted. The whole dataset is accumulated
// in one in-memory batch. The status returned by Write is not checked.
void BatchImportNoWAL(rocksdb::DB* db,
                      const std::vector<std::pair<std::string, std::string>>& data) {
  rocksdb::WriteOptions no_wal;
  no_wal.disableWAL = true;
  no_wal.sync = false;

  rocksdb::WriteBatch all_rows;
  for (const auto& entry : data) {
    all_rows.Put(entry.first, entry.second);
  }

  db->Write(no_wal, &all_rows);
}
五、错误处理 #
5.1 处理写入失败 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <chrono>
#include <iostream>
#include <thread>
// Writes `batch` with default WriteOptions. Failures that look transient are
// retried, with `max_retries` attempts in total.
//
// Retried statuses (linear backoff of 100 ms, 200 ms, ... between attempts):
//   IOError, Busy, TryAgain, TimedOut.
// Any other failure (e.g. Corruption, InvalidArgument, NotSupported) would
// fail the same way on every attempt, so it returns false immediately.
// Returns true on the first successful attempt. Returns false once all
// attempts have failed, or without trying if max_retries <= 0.
// Every failed attempt is logged to stderr.
bool WriteWithRetry(rocksdb::DB* db, rocksdb::WriteBatch& batch, int max_retries = 3) {
  for (int attempt = 1; attempt <= max_retries; ++attempt) {
    const rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);
    if (status.ok()) {
      return true;
    }
    std::cerr << "Write attempt " << attempt << " failed: "
              << status.ToString() << std::endl;

    const bool transient = status.IsIOError() || status.IsBusy() ||
                           status.IsTryAgain() || status.IsTimedOut();
    if (!transient) {
      // Permanent error (corruption, bad argument, ...): retrying cannot help.
      return false;
    }
    // Back off before the next attempt; don't sleep after the last one.
    if (attempt < max_retries) {
      std::this_thread::sleep_for(std::chrono::milliseconds(100 * attempt));
    }
  }
  return false;
}
5.2 批次过大处理 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
// Writes `batch` to `db`.
//
// If the batch's GetDataSize() exceeds 4 MB, a warning is printed to stderr
// first. The limit is advisory only: the batch is written either way. The
// write is issued with sync = false. A failed write is printed to stderr and
// not otherwise reported.
void WriteLargeBatch(rocksdb::DB* db, rocksdb::WriteBatch& batch) {
  constexpr size_t kMaxBatchSize = 4 * 1024 * 1024;  // 4 MB soft limit

  const size_t batch_bytes = batch.GetDataSize();
  if (batch_bytes > kMaxBatchSize) {
    std::cerr << "Warning: Batch size (" << batch_bytes
              << ") exceeds recommended limit" << std::endl;
  }

  rocksdb::WriteOptions write_options;
  // Non-synced write: do not wait for an fsync before returning.
  write_options.sync = false;

  const rocksdb::Status write_status = db->Write(write_options, &batch);
  if (!write_status.ok()) {
    std::cerr << "Write failed: " << write_status.ToString() << std::endl;
  }
}
六、实际应用示例 #
6.1 批量用户数据更新 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <vector>
#include <string>
// A user record as persisted by UserRepository.
// `id` and `age` are zero-initialized so that a default-constructed User
// never carries indeterminate values.
struct User {
  int id{0};
  std::string name;
  int age{0};
  std::string email;
};
class UserRepository {
public:
UserRepository(rocksdb::DB* db) : db_(db) {}
// 批量保存用户
void BatchSaveUsers(const std::vector<User>& users) {
rocksdb::WriteBatch batch;
for (const auto& user : users) {
std::string key = "user:" + std::to_string(user.id);
std::string value = SerializeUser(user);
batch.Put(key, value);
// 更新索引
batch.Put("index:name:" + user.name, std::to_string(user.id));
batch.Put("index:email:" + user.email, std::to_string(user.id));
}
rocksdb::Status status = db_->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok()) {
throw std::runtime_error("Batch save failed: " + status.ToString());
}
}
// 批量删除用户
void BatchDeleteUsers(const std::vector<int>& user_ids) {
rocksdb::WriteBatch batch;
for (int user_id : user_ids) {
std::string key = "user:" + std::to_string(user_id);
// 先读取用户信息以删除索引
std::string value;
rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), key, &value);
if (status.ok()) {
User user = DeserializeUser(value);
batch.Delete("index:name:" + user.name);
batch.Delete("index:email:" + user.email);
}
batch.Delete(key);
}
db_->Write(rocksdb::WriteOptions(), &batch);
}
private:
rocksdb::DB* db_;
std::string SerializeUser(const User& user) {
return std::to_string(user.id) + "|" + user.name + "|" +
std::to_string(user.age) + "|" + user.email;
}
User DeserializeUser(const std::string& value) {
User user;
// 解析逻辑...
return user;
}
};
6.2 时序数据批量写入 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <chrono>
#include <iomanip>
#include <sstream>
class TimeSeriesDB {
public:
TimeSeriesDB(rocksdb::DB* db) : db_(db) {}
// 批量写入时序数据
void WriteMetrics(const std::string& metric_name,
const std::vector<std::pair<int64_t, double>>& points) {
rocksdb::WriteBatch batch;
for (const auto& [timestamp, value] : points) {
std::string key = EncodeKey(metric_name, timestamp);
std::string value_str = std::to_string(value);
batch.Put(key, value_str);
}
db_->Write(rocksdb::WriteOptions(), &batch);
}
// 批量写入多个指标
void WriteMultipleMetrics(
const std::vector<std::pair<std::string, double>>& metrics,
int64_t timestamp) {
rocksdb::WriteBatch batch;
for (const auto& [name, value] : metrics) {
std::string key = EncodeKey(name, timestamp);
batch.Put(key, std::to_string(value));
}
db_->Write(rocksdb::WriteOptions(), &batch);
}
private:
rocksdb::DB* db_;
std::string EncodeKey(const std::string& metric, int64_t timestamp) {
std::ostringstream oss;
oss << metric << ":" << std::setw(20) << std::setfill('0') << timestamp;
return oss.str();
}
};
七、总结 #
7.1 WriteBatch优势 #
| 优势 | 说明 |
|---|---|
| 原子性 | 所有操作要么全部成功,要么全部失败 |
| 高性能 | 减少IO次数,提高写入吞吐 |
| 一致性 | 读取方只会看到批次生效前或生效后的完整状态,不会看到中间状态 |
| 灵活性 | 支持多种操作类型组合 |
7.2 最佳实践 #
- 选择合适的批次大小:根据场景平衡延迟和吞吐
- 处理写入失败:只对IOError、Busy、TryAgain等瞬时错误做有限次退避重试;Corruption、InvalidArgument等错误重试无效,应立即失败
- 监控批次大小:避免批次过大
- 合理使用WAL:根据可靠性需求选择
- 批量操作原子性:利用WriteBatch保证一致性
下一步,让我们学习迭代器的使用!
最后更新:2026-03-27