RocksDB批量写入 #

一、WriteBatch基础 #

1.1 什么是WriteBatch #

WriteBatch是RocksDB提供的原子批量写入机制,可以将多个写操作打包成一个原子操作执行。

text
WriteBatch特点:
├── 原子性 - 全部成功或全部失败
├── 高性能 - 减少IO次数
├── 有序性 - 操作按顺序执行
└── 一致性 - 保证数据一致性

1.2 基本使用 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/write_batch.h>

#include <chrono>
#include <iostream>
#include <limits>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    
    rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
    if (!status.ok()) {
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }
    
    // 创建WriteBatch
    rocksdb::WriteBatch batch;
    
    // 添加操作到batch
    batch.Put("key1", "value1");
    batch.Put("key2", "value2");
    batch.Put("key3", "value3");
    batch.Delete("key1");  // 可以包含删除操作
    
    // 原子执行
    status = db->Write(rocksdb::WriteOptions(), &batch);
    
    if (status.ok()) {
        std::cout << "Batch write successful!" << std::endl;
    } else {
        std::cerr << "Batch write failed: " << status.ToString() << std::endl;
    }
    
    delete db;
    return 0;
}

1.3 WriteBatch操作类型 #

cpp
rocksdb::WriteBatch batch;

// Put: queue an insert/overwrite of key1.
batch.Put("key1", "value1");

// Delete: queue the removal of key2.
batch.Delete("key2");

// SingleDelete: queue a single-delete of key3.
// NOTE(review): RocksDB documents extra preconditions for SingleDelete
// (the key must not have been overwritten) — confirm before relying on it.
batch.SingleDelete("key3");

// DeleteRange: queue the removal of a whole key range.
// NOTE(review): presumably the range is [start_key, end_key), end exclusive — confirm.
batch.DeleteRange("start_key", "end_key");

// Merge: queue a merge operand for key4.
// NOTE(review): presumably requires a merge operator configured in Options — confirm.
batch.Merge("key4", "merge_value");

// Attach another batch's serialized bytes as log data.
// NOTE(review): this is not real nesting — PutLogData appears to record an
// opaque blob only, so the Put of key5 is presumably NOT applied to the
// database when `batch` is written. Confirm against the RocksDB API docs.
rocksdb::WriteBatch sub_batch;
sub_batch.Put("key5", "value5");
batch.PutLogData(sub_batch.Data());  // store sub_batch's serialized contents as log data

二、批量写入场景 #

2.1 批量数据导入 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <vector>
#include <iostream>

// Imports `data` into `db` in chunks of at most `batch_size` records.
//
// Each chunk is committed with its own Write call, so atomicity holds per
// chunk only: when a write fails the import stops, the error is printed to
// stderr, and chunks committed earlier stay in the database.
//
// db         - open database to write to.
// data       - key/value pairs to import, written in iteration order.
// batch_size - number of records per Write call (0 behaves like 1).
void BatchImport(rocksdb::DB* db,
                 const std::vector<std::pair<std::string, std::string>>& data,
                 size_t batch_size = 1000) {
    rocksdb::WriteBatch batch;
    size_t count = 0;

    for (const auto& [key, value] : data) {
        batch.Put(key, value);
        ++count;

        if (count < batch_size) {
            continue;
        }

        // The chunk is full: commit it and start a new one.
        const rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);
        if (!status.ok()) {
            std::cerr << "Batch write failed: " << status.ToString() << std::endl;
            return;
        }
        batch.Clear();
        count = 0;
    }

    // Commit the trailing partial chunk; its status must be checked as well,
    // otherwise the last records can be lost silently.
    if (count > 0) {
        const rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);
        if (!status.ok()) {
            std::cerr << "Batch write failed: " << status.ToString() << std::endl;
        }
    }
}

int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::DB::Open(options, "/tmp/testdb", &db);
    
    // 准备大量数据
    std::vector<std::pair<std::string, std::string>> data;
    for (int i = 0; i < 100000; i++) {
        data.push_back({"key_" + std::to_string(i), "value_" + std::to_string(i)});
    }
    
    // 批量导入
    BatchImport(db, data, 1000);
    
    std::cout << "Import completed!" << std::endl;
    
    delete db;
    return 0;
}

2.2 原子更新多个键 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

// Moves `amount` from `from_account` to `to_account`.
//
// Both balance updates are committed in one WriteBatch, so either both are
// written or neither is. The balances are read BEFORE the write without any
// isolation, though: a concurrent writer can change them in between and its
// update would be overwritten.
// NOTE(review): for real concurrent use, consider RocksDB's transaction APIs
// (TransactionDB / OptimisticTransactionDB) — confirm which fits.
//
// db           - open database holding the balances as decimal strings.
// from_account - key of the account to debit.
// to_account   - key of the account to credit.
// amount       - amount to move; must not be negative.
//
// Returns true when the transfer was committed (or was a no-op self-transfer
// with sufficient balance); false when an account is missing, a stored
// balance is not a valid int, the amount is negative, the balance is
// insufficient, the credit would overflow, or the write fails.
bool Transfer(rocksdb::DB* db,
              const std::string& from_account,
              const std::string& to_account,
              int amount) {
    // A negative amount would pass the balance check and move money backwards.
    if (amount < 0) {
        return false;
    }

    // Read the current balances.
    std::string from_balance_str;
    rocksdb::Status status = db->Get(rocksdb::ReadOptions(), from_account, &from_balance_str);
    if (!status.ok()) return false;

    std::string to_balance_str;
    status = db->Get(rocksdb::ReadOptions(), to_account, &to_balance_str);
    if (!status.ok()) return false;

    int from_balance = 0;
    int to_balance = 0;
    try {
        from_balance = std::stoi(from_balance_str);
        to_balance = std::stoi(to_balance_str);
    } catch (const std::exception&) {
        // Stored value is not a valid int; report failure like any other error.
        return false;
    }

    // Check the balance.
    if (from_balance < amount) {
        return false;
    }

    // Self-transfer: writing both Puts to the same key would keep only the
    // credited value and create money, so treat it as a no-op.
    if (from_account == to_account) {
        return true;
    }

    // Signed overflow is undefined behaviour; refuse instead.
    if (to_balance > std::numeric_limits<int>::max() - amount) {
        return false;
    }

    // Commit both updates together.
    rocksdb::WriteBatch batch;
    batch.Put(from_account, std::to_string(from_balance - amount));
    batch.Put(to_account, std::to_string(to_balance + amount));

    status = db->Write(rocksdb::WriteOptions(), &batch);
    return status.ok();
}

int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::DB::Open(options, "/tmp/bank_db", &db);
    
    // 初始化账户
    db->Put(rocksdb::WriteOptions(), "account:A", "1000");
    db->Put(rocksdb::WriteOptions(), "account:B", "500");
    
    // 转账
    if (Transfer(db, "account:A", "account:B", 200)) {
        std::cout << "Transfer successful!" << std::endl;
    }
    
    delete db;
    return 0;
}

2.3 批量删除 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <vector>

// Deletes every key in `keys` with a single batched Write.
// A failed write is reported on stderr; nothing is returned to the caller.
void BatchDelete(rocksdb::DB* db, const std::vector<std::string>& keys) {
    rocksdb::WriteBatch pending;
    for (auto it = keys.begin(); it != keys.end(); ++it) {
        pending.Delete(*it);
    }

    const rocksdb::Status result = db->Write(rocksdb::WriteOptions(), &pending);
    if (result.ok()) {
        return;
    }
    std::cerr << "Batch delete failed: " << result.ToString() << std::endl;
}

// 使用示例
int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::DB::Open(options, "/tmp/testdb", &db);
    
    // 写入测试数据
    for (int i = 0; i < 100; i++) {
        db->Put(rocksdb::WriteOptions(), "key_" + std::to_string(i), "value");
    }
    
    // 批量删除
    std::vector<std::string> keys_to_delete;
    for (int i = 20; i < 40; i++) {
        keys_to_delete.push_back("key_" + std::to_string(i));
    }
    BatchDelete(db, keys_to_delete);
    
    delete db;
    return 0;
}

三、WriteBatch高级用法 #

3.1 设置回调 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <rocksdb/write_batch_base.h>

// Handler that prints each Put and Delete it is given to stdout.
// The column family id is received but not used, and both callbacks always
// report success.
class BatchHandler : public rocksdb::WriteBatch::Handler {
public:
    // Prints "Put: <key> = <value>".
    rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key,
                          const rocksdb::Slice& value) override {
        const auto key_text = key.ToString();
        const auto value_text = value.ToString();
        std::cout << "Put: " << key_text << " = " << value_text << std::endl;
        return rocksdb::Status::OK();
    }

    // Prints "Delete: <key>".
    rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
        const auto key_text = key.ToString();
        std::cout << "Delete: " << key_text << std::endl;
        return rocksdb::Status::OK();
    }
};

// Walks `batch` with a BatchHandler, which prints every Put and Delete.
// A non-OK status from Iterate is reported on stderr instead of being dropped.
void IterateBatch(const rocksdb::WriteBatch& batch) {
    BatchHandler handler;
    const rocksdb::Status status = batch.Iterate(&handler);
    if (!status.ok()) {
        std::cerr << "Batch iteration failed: " << status.ToString() << std::endl;
    }
}

3.2 获取批次大小 #

cpp
#include <rocksdb/write_batch.h>

// Prints the batch's data size in bytes and its operation count to stdout.
void PrintBatchInfo(const rocksdb::WriteBatch& batch) {
    const auto byte_size = batch.GetDataSize();
    const auto operation_count = batch.Count();

    std::cout << "Batch size: " << byte_size << " bytes" << std::endl;
    std::cout << "Count: " << operation_count << std::endl;
}

3.3 合并WriteBatch #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

void MergeBatches(rocksdb::DB* db) {
    rocksdb::WriteBatch batch1;
    batch1.Put("key1", "value1");
    batch1.Put("key2", "value2");
    
    rocksdb::WriteBatch batch2;
    batch2.Put("key3", "value3");
    batch2.Delete("key1");
    
    // 合并批次
    rocksdb::WriteBatch merged_batch;
    merged_batch.Put("key1", "value1");
    merged_batch.Put("key2", "value2");
    merged_batch.Put("key3", "value3");
    merged_batch.Delete("key1");
    
    db->Write(rocksdb::WriteOptions(), &merged_batch);
}

四、性能优化 #

4.1 选择合适的批次大小 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <chrono>

// Measures how long it takes to write `total_records` records with each of
// several batch sizes and prints the elapsed milliseconds per size.
//
// Every round writes the same keys (key_0 .. key_<total_records-1>), so the
// rounds after the first overwrite existing keys. A round whose write fails
// is reported on stderr and gets no timing line, because its elapsed time
// would not describe a completed workload.
void BenchmarkBatchSizes(rocksdb::DB* db, int total_records) {
    const std::vector<size_t> batch_sizes = {1, 10, 100, 1000, 10000};

    for (const size_t batch_size : batch_sizes) {
        // steady_clock is monotonic; high_resolution_clock may follow wall-clock adjustments.
        const auto start = std::chrono::steady_clock::now();

        rocksdb::WriteBatch batch;
        rocksdb::Status status;  // default-constructed status is OK
        for (int i = 0; i < total_records && status.ok(); i++) {
            batch.Put("key_" + std::to_string(i), "value_" + std::to_string(i));

            if (batch.Count() >= batch_size) {
                status = db->Write(rocksdb::WriteOptions(), &batch);
                batch.Clear();
            }
        }

        // Write the remaining records.
        if (status.ok() && batch.Count() > 0) {
            status = db->Write(rocksdb::WriteOptions(), &batch);
        }

        if (!status.ok()) {
            std::cerr << "Batch size " << batch_size << ": write failed: "
                      << status.ToString() << std::endl;
            continue;
        }

        const auto end = std::chrono::steady_clock::now();
        const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

        std::cout << "Batch size " << batch_size << ": "
                  << duration.count() << " ms" << std::endl;
    }
}

4.2 批次大小建议 #

| 场景 | 推荐批次大小 | 说明 |
| --- | --- | --- |
| 实时写入 | 1-100 | 低延迟优先 |
| 批量导入 | 1000-10000 | 高吞吐优先 |
| 混合场景 | 100-1000 | 平衡延迟和吞吐 |

4.3 禁用WAL提高性能 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

// Imports all of `data` in one batch with the write-ahead log disabled.
//
// The whole data set is held in a single WriteBatch, so memory use grows
// with `data`. A failed write is reported on stderr.
// NOTE(review): with the WAL disabled, records that have not yet been
// flushed are presumably lost on a crash — only use this for data that can
// be re-imported, and confirm whether an explicit flush is needed afterwards.
void BatchImportNoWAL(rocksdb::DB* db,
                      const std::vector<std::pair<std::string, std::string>>& data) {
    rocksdb::WriteBatch batch;

    for (const auto& [key, value] : data) {
        batch.Put(key, value);
    }

    // Trade durability for speed: skip the WAL and do not sync.
    rocksdb::WriteOptions write_options;
    write_options.disableWAL = true;
    write_options.sync = false;

    const rocksdb::Status status = db->Write(write_options, &batch);
    if (!status.ok()) {
        std::cerr << "Batch write failed: " << status.ToString() << std::endl;
    }
}

五、错误处理 #

5.1 处理写入失败 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <iostream>

// Writes `batch`, retrying up to `max_retries` attempts in total.
//
// Requires <thread> and <chrono> for the back-off sleep.
//
// db          - open database to write to.
// batch       - batch to commit; the same batch is resubmitted on each attempt.
// max_retries - maximum number of attempts; values <= 0 make no attempt.
//
// Returns true as soon as one attempt succeeds. Returns false when all
// attempts fail or when the status reports corruption, which is not retried.
// After an I/O error the function waits 100 ms * attempt number before the
// next attempt; other errors are retried immediately.
bool WriteWithRetry(rocksdb::DB* db, rocksdb::WriteBatch& batch, int max_retries = 3) {
    for (int attempt = 1; attempt <= max_retries; ++attempt) {
        const rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);

        if (status.ok()) {
            return true;
        }

        std::cerr << "Write attempt " << attempt << " failed: "
                  << status.ToString() << std::endl;

        if (status.IsCorruption()) {
            // Corrupted data will not fix itself; do not retry.
            return false;
        }

        // Back off only when another attempt follows; sleeping after the last
        // failure would just delay the error.
        const bool has_next_attempt = attempt < max_retries;
        if (status.IsIOError() && has_next_attempt) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100 * attempt));
        }
    }

    return false;
}

5.2 批次过大处理 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

// Writes `batch`, first warning on stderr when its data size exceeds 4 MB.
// The warning is advisory only: the batch is written either way. A failed
// write is reported on stderr.
void WriteLargeBatch(rocksdb::DB* db, rocksdb::WriteBatch& batch) {
    constexpr size_t kRecommendedMaxBytes = 4 * 1024 * 1024;  // 4MB

    const bool oversized = batch.GetDataSize() > kRecommendedMaxBytes;
    if (oversized) {
        std::cerr << "Warning: Batch size (" << batch.GetDataSize()
                  << ") exceeds recommended limit" << std::endl;
    }

    // sync = false means the write is not synced to disk before returning;
    // the Write call itself is still a blocking call.
    rocksdb::WriteOptions write_options;
    write_options.sync = false;

    const rocksdb::Status result = db->Write(write_options, &batch);
    if (result.ok()) {
        return;
    }
    std::cerr << "Write failed: " << result.ToString() << std::endl;
}

六、实际应用示例 #

6.1 批量用户数据更新 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <vector>
#include <string>

// Plain user record. The numeric fields are zero-initialised so that a
// default-constructed User never holds indeterminate values.
struct User {
    int id = 0;          // numeric user identifier
    std::string name;    // user name
    int age = 0;         // age
    std::string email;   // e-mail address
};

// Stores users under "user:<id>" together with two secondary index entries,
// "index:name:<name>" and "index:email:<email>", both mapping to the id.
// Records and their index entries are always changed in the same WriteBatch.
class UserRepository {
public:
    UserRepository(rocksdb::DB* db) : db_(db) {}

    // Saves all users and their index entries in one batch.
    // Index entries of a previously stored name/email are not removed when a
    // user is saved again with different values.
    // Throws std::runtime_error when the write fails.
    void BatchSaveUsers(const std::vector<User>& users) {
        rocksdb::WriteBatch batch;

        for (const auto& user : users) {
            const std::string id_text = std::to_string(user.id);
            batch.Put("user:" + id_text, SerializeUser(user));

            // Keep the secondary indexes in step with the record.
            batch.Put("index:name:" + user.name, id_text);
            batch.Put("index:email:" + user.email, id_text);
        }

        const rocksdb::Status status = db_->Write(rocksdb::WriteOptions(), &batch);
        if (!status.ok()) {
            throw std::runtime_error("Batch save failed: " + status.ToString());
        }
    }

    // Deletes the given users and their index entries in one batch.
    // Ids that are not stored are skipped.
    // Throws std::runtime_error when a record cannot be read or is malformed,
    // or when the write fails; std::stoi's exceptions propagate when a stored
    // id or age is not a valid int. Nothing is deleted in any of these cases.
    void BatchDeleteUsers(const std::vector<int>& user_ids) {
        rocksdb::WriteBatch batch;

        for (int user_id : user_ids) {
            const std::string key = "user:" + std::to_string(user_id);

            // The stored record is needed to know which index entries to remove.
            std::string value;
            const rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), key, &value);
            if (status.IsNotFound()) {
                continue;
            }
            if (!status.ok()) {
                // Deleting the record without its indexes would leave them dangling.
                throw std::runtime_error("Batch delete failed: " + status.ToString());
            }

            const User user = DeserializeUser(value);
            batch.Delete("index:name:" + user.name);
            batch.Delete("index:email:" + user.email);
            batch.Delete(key);
        }

        const rocksdb::Status status = db_->Write(rocksdb::WriteOptions(), &batch);
        if (!status.ok()) {
            throw std::runtime_error("Batch delete failed: " + status.ToString());
        }
    }

private:
    rocksdb::DB* db_;

    // Encodes a user as "<id>|<name>|<age>|<email>".
    static std::string SerializeUser(const User& user) {
        return std::to_string(user.id) + "|" + user.name + "|" +
               std::to_string(user.age) + "|" + user.email;
    }

    // Decodes the format produced by SerializeUser.
    // The id ends at the first '|', the email starts after the last '|', and
    // the age sits between the last two separators, so a '|' inside the name
    // is tolerated; a '|' inside the email is not.
    // Throws std::runtime_error when fewer than three separators are present.
    static User DeserializeUser(const std::string& value) {
        const std::size_t id_end = value.find('|');
        const std::size_t email_sep = value.rfind('|');
        if (id_end == std::string::npos || id_end == email_sep) {
            throw std::runtime_error("Malformed user record: " + value);
        }
        const std::size_t age_sep = value.rfind('|', email_sep - 1);
        if (age_sep == std::string::npos || age_sep == id_end) {
            throw std::runtime_error("Malformed user record: " + value);
        }

        User user;
        user.id = std::stoi(value.substr(0, id_end));
        user.name = value.substr(id_end + 1, age_sep - id_end - 1);
        user.age = std::stoi(value.substr(age_sep + 1, email_sep - age_sep - 1));
        user.email = value.substr(email_sep + 1);
        return user;
    }
};

6.2 时序数据批量写入 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <chrono>
#include <iomanip>
#include <sstream>

class TimeSeriesDB {
public:
    TimeSeriesDB(rocksdb::DB* db) : db_(db) {}
    
    // 批量写入时序数据
    void WriteMetrics(const std::string& metric_name,
                      const std::vector<std::pair<int64_t, double>>& points) {
        rocksdb::WriteBatch batch;
        
        for (const auto& [timestamp, value] : points) {
            std::string key = EncodeKey(metric_name, timestamp);
            std::string value_str = std::to_string(value);
            batch.Put(key, value_str);
        }
        
        db_->Write(rocksdb::WriteOptions(), &batch);
    }
    
    // 批量写入多个指标
    void WriteMultipleMetrics(
        const std::vector<std::pair<std::string, double>>& metrics,
        int64_t timestamp) {
        
        rocksdb::WriteBatch batch;
        
        for (const auto& [name, value] : metrics) {
            std::string key = EncodeKey(name, timestamp);
            batch.Put(key, std::to_string(value));
        }
        
        db_->Write(rocksdb::WriteOptions(), &batch);
    }

private:
    rocksdb::DB* db_;
    
    std::string EncodeKey(const std::string& metric, int64_t timestamp) {
        std::ostringstream oss;
        oss << metric << ":" << std::setw(20) << std::setfill('0') << timestamp;
        return oss.str();
    }
};

七、总结 #

7.1 WriteBatch优势 #

| 优势 | 说明 |
| --- | --- |
| 原子性 | 所有操作要么全部成功,要么全部失败 |
| 高性能 | 减少IO次数,提高写入吞吐 |
| 一致性 | 保证数据状态一致 |
| 灵活性 | 支持多种操作类型组合 |

7.2 最佳实践 #

  1. 选择合适的批次大小:根据场景平衡延迟和吞吐
  2. 处理写入失败:实现重试机制
  3. 监控批次大小:避免批次过大
  4. 合理使用WAL:仅当数据可以重新导入、能够容忍崩溃后丢失时才禁用WAL,否则保持启用
  5. 批量操作原子性:利用WriteBatch保证一致性

下一步,让我们学习迭代器的使用!

最后更新:2026-03-27