RocksDB合并操作 #

一、Merge操作概述 #

1.1 什么是Merge操作 #

Merge操作是RocksDB提供的增量更新机制,允许在不读取原值的情况下进行更新操作。

text
传统更新流程:
Get(key) → 读取原值 → 修改 → Put(key, new_value)
         需要两次IO

Merge更新流程:
Merge(key, delta) → 直接合并
         只需一次IO

1.2 Merge的优势 #

| 优势 | 说明 |
| --- | --- |
| 减少IO | 不需要先读取原值 |
| 原子性 | 合并操作是原子的 |
| 高效 | 特别适合计数器、列表追加 |
| 灵活 | 可自定义合并逻辑 |

1.3 基本使用 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <iostream>

// Demonstrates the basic Merge workflow: open a database configured with a
// string-append merge operator, merge three operands into the key "list",
// then read back and print the combined value.
//
// Returns 0 on success, 1 if opening the database or any later operation
// fails.
//
// NOTE(review): the headers included by this example may not declare
// rocksdb::StringAppendOperator — confirm which header exposes it in your
// RocksDB build.
int main() {
    rocksdb::DB* db = nullptr;
    rocksdb::Options options;
    options.create_if_missing = true;

    // Install the merge operator before opening the database. The delimiter is
    // passed explicitly so the result matches the comma-separated output
    // documented below.
    options.merge_operator.reset(new rocksdb::StringAppendOperator(","));

    rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
    if (!status.ok()) {
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }

    // Each Merge records one operand; stop at the first failure.
    const char* const items[] = {"item1", "item2", "item3"};
    for (const char* item : items) {
        status = db->Merge(rocksdb::WriteOptions(), "list", item);
        if (!status.ok()) {
            break;
        }
    }

    // Read back the merged value only if every Merge succeeded.
    std::string value;
    if (status.ok()) {
        status = db->Get(rocksdb::ReadOptions(), "list", &value);
    }

    int exit_code = 0;
    if (status.ok()) {
        std::cout << "Merged value: " << value << std::endl;
        // On a fresh database this prints: item1,item2,item3
    } else {
        std::cerr << "Operation failed: " << status.ToString() << std::endl;
        exit_code = 1;
    }

    // Close the database on both the success and the failure path.
    delete db;
    return exit_code;
}

二、内置Merge操作符 #

2.1 StringAppendOperator #

cpp
#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>

// 字符串追加操作符
// 将多个值用分隔符连接

void UseStringAppendOperator() {
    rocksdb::Options options;
    options.create_if_missing = true;
    
    // 使用逗号作为分隔符
    options.merge_operator.reset(new rocksdb::StringAppendOperator(","));
    
    rocksdb::DB* db;
    rocksdb::DB::Open(options, "/tmp/testdb", &db);
    
    // 追加字符串
    db->Merge(rocksdb::WriteOptions(), "tags", "tag1");
    db->Merge(rocksdb::WriteOptions(), "tags", "tag2");
    db->Merge(rocksdb::WriteOptions(), "tags", "tag3");
    
    // 结果: "tag1,tag2,tag3"
    
    delete db;
}

2.2 UInt64AddOperator #

cpp
#include <rocksdb/db.h>
#include <rocksdb/utilities/agg_merge.h>

// 64位整数加法操作符
// 用于计数器场景

void UseUInt64AddOperator() {
    rocksdb::Options options;
    options.create_if_missing = true;
    
    // 使用整数加法操作符
    options.merge_operator.reset(new rocksdb::UInt64AddOperator());
    
    rocksdb::DB* db;
    rocksdb::DB::Open(options, "/tmp/testdb", &db);
    
    // 初始化计数器
    db->Put(rocksdb::WriteOptions(), "counter", rocksdb::Slice("\x00\x00\x00\x00\x00\x00\x00\x01", 8));
    
    // 增量更新
    db->Merge(rocksdb::WriteOptions(), "counter", rocksdb::Slice("\x00\x00\x00\x00\x00\x00\x00\x05", 8));
    db->Merge(rocksdb::WriteOptions(), "counter", rocksdb::Slice("\x00\x00\x00\x00\x00\x00\x00\x03", 8));
    
    // 结果: 1 + 5 + 3 = 9
    
    delete db;
}

三、自定义Merge操作符 #

3.1 实现自定义操作符 #

cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>
#include <iostream>

// Custom counter merge operator.
//
// Values and operands are signed 64-bit integers stored as decimal text
// (the same format FormatInt64 writes). Merging adds the operand to the
// existing value; a missing existing value counts as 0.
//
// Merge returns false — reporting a failed merge to RocksDB instead of
// throwing — when either input is not a complete decimal integer or when the
// sum would overflow int64_t.
class CounterMergeOperator : public rocksdb::AssociativeMergeOperator {
public:
    // Combines `existing_value` (may be null) with the operand `value` and
    // writes the decimal sum to `new_value`. `key` and `logger` are unused.
    virtual bool Merge(const rocksdb::Slice& key,
                       const rocksdb::Slice* existing_value,
                       const rocksdb::Slice& value,
                       std::string* new_value,
                       rocksdb::Logger* logger) const override {
        (void)key;
        (void)logger;

        // Parse the existing value, if any.
        int64_t existing = 0;
        if (existing_value && !ParseInt64(*existing_value, &existing)) {
            return false;
        }

        // Parse the increment.
        int64_t delta = 0;
        if (!ParseInt64(value, &delta)) {
            return false;
        }

        // Signed overflow is undefined behaviour, so reject it up front.
        if ((delta > 0 && existing > INT64_MAX - delta) ||
            (delta < 0 && existing < INT64_MIN - delta)) {
            return false;
        }

        *new_value = FormatInt64(existing + delta);
        return true;
    }

    virtual const char* Name() const override {
        return "CounterMergeOperator";
    }

private:
    // Parses `s` as decimal text into `*out`. Returns false when `s` is empty,
    // is not a number, has trailing characters, or is out of range.
    //
    // Values are always treated as text: an 8-character decimal such as
    // "12345678" must not be mistaken for an 8-byte binary integer.
    static bool ParseInt64(const rocksdb::Slice& s, int64_t* out) {
        const std::string text = s.ToString();
        try {
            std::size_t consumed = 0;
            const long long parsed = std::stoll(text, &consumed);
            if (consumed != text.size()) {
                return false;
            }
            *out = parsed;
            return true;
        } catch (...) {
            // std::stoll reports bad or out-of-range input by throwing; no
            // exception may escape into RocksDB's merge machinery.
            return false;
        }
    }

    // Serialises `value` as decimal text.
    static std::string FormatInt64(int64_t value) {
        return std::to_string(value);
    }
};

// 使用自定义操作符
int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    options.merge_operator.reset(new CounterMergeOperator());
    
    rocksdb::DB::Open(options, "/tmp/testdb", &db);
    
    // 使用Merge操作
    db->Put(rocksdb::WriteOptions(), "counter", "100");
    db->Merge(rocksdb::WriteOptions(), "counter", "10");
    db->Merge(rocksdb::WriteOptions(), "counter", "5");
    
    std::string value;
    db->Get(rocksdb::ReadOptions(), "counter", &value);
    
    std::cout << "Counter: " << value << std::endl;  // 输出: 115
    
    delete db;
    return 0;
}

3.2 实现列表合并操作符 #

cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>
#include <sstream>

// List-append merge operator.
//
// Each merged operand is appended to the stored value, separated by the
// delimiter given at construction (a comma by default). When there is no
// stored value yet, the operand becomes the value unchanged.
class ListMergeOperator : public rocksdb::AssociativeMergeOperator {
public:
    explicit ListMergeOperator(const std::string& delimiter = ",")
        : delimiter_(delimiter) {}

    // Writes "<existing><delimiter><value>" to `new_value`, or just `value`
    // when `existing_value` is null. Always reports success. `key` and
    // `logger` are not used.
    virtual bool Merge(const rocksdb::Slice& key,
                       const rocksdb::Slice* existing_value,
                       const rocksdb::Slice& value,
                       std::string* new_value,
                       rocksdb::Logger* logger) const override {

        std::string joined;
        if (existing_value != nullptr) {
            // Existing content comes first, followed by the separator.
            joined.append(existing_value->data(), existing_value->size());
            joined.append(delimiter_);
        }
        joined.append(value.data(), value.size());

        *new_value = joined;
        return true;
    }

    virtual const char* Name() const override {
        return "ListMergeOperator";
    }

private:
    // Separator inserted between list entries.
    std::string delimiter_;
};

3.3 实现集合合并操作符 #

cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>
#include <unordered_set>
#include <sstream>

// 集合合并操作符(去重)
class SetMergeOperator : public rocksdb::AssociativeMergeOperator {
public:
    explicit SetMergeOperator(const std::string& delimiter = ",") 
        : delimiter_(delimiter) {}
    
    virtual bool Merge(const rocksdb::Slice& key,
                       const rocksdb::Slice* existing_value,
                       const rocksdb::Slice& value,
                       std::string* new_value,
                       rocksdb::Logger* logger) const override {
        
        std::unordered_set<std::string> elements;
        
        // 解析现有元素
        if (existing_value) {
            ParseSet(*existing_value, &elements);
        }
        
        // 添加新元素
        elements.insert(value.ToString());
        
        // 序列化结果
        *new_value = SerializeSet(elements);
        
        return true;
    }
    
    virtual const char* Name() const override {
        return "SetMergeOperator";
    }

private:
    std::string delimiter_;
    
    void ParseSet(const rocksdb::Slice& s, 
                  std::unordered_set<std::string>* elements) const {
        std::istringstream iss(s.ToString());
        std::string item;
        while (std::getline(iss, item, delimiter_[0])) {
            if (!item.empty()) {
                elements->insert(item);
            }
        }
    }
    
    std::string SerializeSet(const std::unordered_set<std::string>& elements) const {
        std::string result;
        bool first = true;
        for (const auto& elem : elements) {
            if (!first) {
                result += delimiter_;
            }
            result += elem;
            first = false;
        }
        return result;
    }
};

四、高级Merge操作符 #

4.1 实现通用Merge操作符 #

cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>
#include <memory>

// Operation types.
// NOTE(review): nothing in this listing refers to MergeOp; the
// GenericMergeOperator that follows selects its operation from the first
// character of each operand instead. Confirm whether this enum is still
// needed.
enum class MergeOp {
    ADD,
    SUB,
    SET,
    APPEND,
    DELETE
};

// 通用合并操作符
class GenericMergeOperator : public rocksdb::MergeOperator {
public:
    virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                             MergeOperationOutput* merge_out) const override {
        
        std::string result;
        
        // 处理现有值
        if (merge_in.existing_value) {
            result = merge_in.existing_value->ToString();
        }
        
        // 处理所有合并操作
        for (const auto& operand : merge_in.operand_list) {
            ApplyOperation(operand, &result);
        }
        
        merge_out->new_value = result;
        return true;
    }
    
    virtual bool PartialMergeMulti(const rocksdb::Slice& key,
                                   const std::vector<rocksdb::Slice>& operand_list,
                                   std::string* new_value,
                                   rocksdb::Logger* logger) const override {
        
        std::string result;
        for (const auto& operand : operand_list) {
            ApplyOperation(operand, &result);
        }
        *new_value = result;
        return true;
    }
    
    virtual const char* Name() const override {
        return "GenericMergeOperator";
    }

private:
    void ApplyOperation(const rocksdb::Slice& operand, std::string* result) const {
        // 解析操作类型和值
        if (operand.size() < 2) return;
        
        char op = operand[0];
        std::string value = operand.ToString().substr(1);
        
        switch (op) {
            case '+':  // 加法
                *result = std::to_string(std::stoll(*result) + std::stoll(value));
                break;
            case '-':  // 减法
                *result = std::to_string(std::stoll(*result) - std::stoll(value));
                break;
            case '=':  // 设置
                *result = value;
                break;
            case 'A':  // 追加
                *result += value;
                break;
        }
    }
};

4.2 JSON合并操作符 #

cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>

// 简单JSON合并操作符
class JsonMergeOperator : public rocksdb::AssociativeMergeOperator {
public:
    virtual bool Merge(const rocksdb::Slice& key,
                       const rocksdb::Slice* existing_value,
                       const rocksdb::Slice& value,
                       std::string* new_value,
                       rocksdb::Logger* logger) const override {
        
        // 简化实现:合并JSON字段
        // 实际应用中应使用JSON库
        
        std::string existing = existing_value ? existing_value->ToString() : "{}";
        std::string update = value.ToString();
        
        // 简单合并逻辑(实际应使用JSON库)
        if (existing == "{}") {
            *new_value = update;
        } else {
            // 移除结尾的 }
            existing.pop_back();
            // 移除开头的 {
            update = update.substr(1);
            *new_value = existing + "," + update;
        }
        
        return true;
    }
    
    virtual const char* Name() const override {
        return "JsonMergeOperator";
    }
};

五、Merge与WriteBatch #

5.1 批量Merge操作 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <rocksdb/merge_operator.h>

// Queues three Merge operations in one WriteBatch and applies them with a
// single atomic write. A failed write is reported on stderr.
void BatchMerge(rocksdb::DB* db) {
    rocksdb::WriteBatch batch;

    // Queue the Merge operations.
    batch.Merge("counter1", "10");
    batch.Merge("counter2", "20");
    batch.Merge("counter3", "30");

    // Apply atomically; do not drop the outcome silently.
    const rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);
    if (!status.ok()) {
        std::cerr << "Batch merge failed: " << status.ToString() << std::endl;
    }
}

5.2 跨列族Merge #

cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <rocksdb/column_family.h>

// Queues one Merge per column family in a single WriteBatch and applies both
// with one write. A failed write is reported on stderr.
void CrossColumnFamilyMerge(
    rocksdb::DB* db,
    rocksdb::ColumnFamilyHandle* cf1,
    rocksdb::ColumnFamilyHandle* cf2) {

    rocksdb::WriteBatch batch;

    // Merges that target different column families.
    batch.Merge(cf1, "key1", "value1");
    batch.Merge(cf2, "key2", "value2");

    // Do not drop the outcome silently.
    const rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);
    if (!status.ok()) {
        std::cerr << "Cross-column-family merge failed: "
                  << status.ToString() << std::endl;
    }
}

六、实际应用示例 #

6.1 计数器服务 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
#include <string>

// Named counters stored as decimal text and updated through Merge.
//
// Does not own the database handle. The database is expected to have been
// opened with a merge operator that adds decimal operands — TODO confirm
// against the code that opens it.
class CounterService {
public:
    CounterService(rocksdb::DB* db) : db_(db) {}

    // Adds `delta` to the counter via Merge. Failures are reported on stderr.
    void Increment(const std::string& counter_name, int64_t delta) {
        const std::string delta_str = std::to_string(delta);
        const rocksdb::Status status =
            db_->Merge(rocksdb::WriteOptions(), counter_name, delta_str);
        if (!status.ok()) {
            std::cerr << "Increment failed for " << counter_name << ": "
                      << status.ToString() << std::endl;
        }
    }

    // Returns the counter's current value, or 0 if it does not exist.
    // Read errors and non-numeric stored values are reported on stderr and
    // also yield 0, rather than throwing.
    int64_t Get(const std::string& counter_name) {
        std::string value;
        const rocksdb::Status status =
            db_->Get(rocksdb::ReadOptions(), counter_name, &value);

        // A counter that was never written reads as 0.
        if (status.IsNotFound()) {
            return 0;
        }
        if (!status.ok()) {
            std::cerr << "Get failed for " << counter_name << ": "
                      << status.ToString() << std::endl;
            return 0;
        }

        try {
            return std::stoll(value);
        } catch (...) {
            // std::stoll throws on malformed or out-of-range text.
            std::cerr << "Counter " << counter_name
                      << " holds a non-numeric value" << std::endl;
            return 0;
        }
    }

    // Overwrites the counter with "0". Failures are reported on stderr.
    void Reset(const std::string& counter_name) {
        const rocksdb::Status status =
            db_->Put(rocksdb::WriteOptions(), counter_name, "0");
        if (!status.ok()) {
            std::cerr << "Reset failed for " << counter_name << ": "
                      << status.ToString() << std::endl;
        }
    }

private:
    rocksdb::DB* db_;  // not owned
};

6.2 标签系统 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
#include <string>
#include <vector>
#include <sstream>

// Per-entity tag lists stored under "tags:<entity_id>" and extended through
// Merge.
//
// Does not own the database handle. `delimiter` is the character GetTags
// splits on; it must match the separator used by the database's merge
// operator — TODO confirm against the code that opens the database.
class TagService {
public:
    TagService(rocksdb::DB* db, char delimiter = ',')
        : db_(db), delimiter_(delimiter) {}

    // Appends `tag` to the entity's tag list. Empty tags and tags containing
    // the delimiter are rejected, because GetTags would drop or split them
    // when reading back. Rejections and write failures are reported on stderr.
    void AddTag(const std::string& entity_id, const std::string& tag) {
        if (tag.empty() || tag.find(delimiter_) != std::string::npos) {
            std::cerr << "Rejected invalid tag for " << entity_id << std::endl;
            return;
        }

        const rocksdb::Status status =
            db_->Merge(rocksdb::WriteOptions(), "tags:" + entity_id, tag);
        if (!status.ok()) {
            std::cerr << "AddTag failed for " << entity_id << ": "
                      << status.ToString() << std::endl;
        }
    }

    // Returns the entity's tags in stored order, skipping empty entries.
    // Returns an empty vector when the read does not succeed (including when
    // the entity has no tags).
    std::vector<std::string> GetTags(const std::string& entity_id) {
        std::string value;
        rocksdb::Status status = db_->Get(rocksdb::ReadOptions(),
                                          "tags:" + entity_id, &value);

        std::vector<std::string> tags;
        if (status.ok()) {
            std::istringstream iss(value);
            std::string tag;
            while (std::getline(iss, tag, delimiter_)) {
                if (!tag.empty()) {
                    tags.push_back(tag);
                }
            }
        }
        return tags;
    }

private:
    rocksdb::DB* db_;  // not owned
    char delimiter_;   // separator between tags in the stored value
};

七、最佳实践 #

7.1 Merge使用建议 #

| 建议 | 说明 |
| --- | --- |
| 适合场景 | 计数器、列表追加、集合合并 |
| 避免场景 | 需要读取原值的复杂操作 |
| 性能优化 | 批量Merge减少IO |
| 错误处理 | 实现完整的错误处理逻辑 |
| 测试验证 | 充分测试合并逻辑正确性 |

7.2 性能考虑 #

text
Merge性能特点:

1. 写入性能
   - 单次Merge只需一次IO
   - 批量Merge更高效

2. 读取性能
   - 需要执行合并逻辑
   - 合并链越长,读取越慢

3. Compaction影响
   - Compaction时会执行合并
   - 减少合并链长度

4. 建议
   - 定期触发Compaction
   - 避免过长的合并链

八、总结 #

8.1 Merge API速查 #

| 操作 | 方法 | 说明 |
| --- | --- | --- |
| 执行Merge | `db->Merge(WriteOptions(), key, value)` | 执行合并操作 |
| 设置操作符 | `options.merge_operator.reset(new MyOperator())` | 设置自定义操作符 |
| 批量Merge | `WriteBatch::Merge(key, value)` | 批量合并操作 |

8.2 关键要点 #

  1. 减少IO:Merge操作不需要先读取原值
  2. 原子性:合并操作是原子的
  3. 自定义逻辑:可实现任意合并逻辑
  4. 性能考虑:注意合并链长度
  5. 测试验证:充分测试合并正确性

下一步,让我们学习事务处理!

最后更新:2026-03-27