RocksDB合并操作 #
一、Merge操作概述 #
1.1 什么是Merge操作 #
Merge操作是RocksDB提供的增量更新机制,允许在不读取原值的情况下进行更新操作。
text
传统更新流程:
Get(key) → 读取原值 → 修改 → Put(key, new_value)
需要两次IO
Merge更新流程:
Merge(key, delta) → 仅写入合并操作数,实际合并推迟到读取或Compaction时执行
写入时只需一次IO
1.2 Merge的优势 #
| 优势 | 说明 |
|---|---|
| 减少IO | 不需要先读取原值 |
| 原子性 | 合并操作是原子的 |
| 高效 | 特别适合计数器、列表追加 |
| 灵活 | 可自定义合并逻辑 |
1.3 基本使用 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <iostream>
// Demonstrates DB::Merge with a string-append operator: three operands are
// merged under the key "list" and the combined value is printed.
// Returns 0 on success and 1 as soon as any RocksDB call fails.
int main() {
    rocksdb::DB* db = nullptr;
    rocksdb::Options options;
    options.create_if_missing = true;
    // Install the merge operator. The delimiter is passed explicitly because
    // StringAppendOperator is constructed from a delimiter.
    // NOTE(review): StringAppendOperator does not appear to be declared by
    // <rocksdb/merge_operator.h>; confirm which header or factory (e.g.
    // MergeOperators::CreateStringAppendOperator) your RocksDB version offers.
    options.merge_operator.reset(new rocksdb::StringAppendOperator(','));
    rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
    if (!status.ok()) {
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }

    int exit_code = 0;

    // Queue the operands one by one and stop at the first failure.
    const char* const items[] = {"item1", "item2", "item3"};
    for (const char* item : items) {
        status = db->Merge(rocksdb::WriteOptions(), "list", item);
        if (!status.ok()) {
            std::cerr << "Merge failed: " << status.ToString() << std::endl;
            exit_code = 1;
            break;
        }
    }

    // Read back the merged value only if every operand was written.
    if (exit_code == 0) {
        std::string value;
        status = db->Get(rocksdb::ReadOptions(), "list", &value);
        if (status.ok()) {
            // Prints "item1,item2,item3" when "list" did not exist beforehand.
            std::cout << "Merged value: " << value << std::endl;
        } else {
            std::cerr << "Get failed: " << status.ToString() << std::endl;
            exit_code = 1;
        }
    }

    delete db;
    return exit_code;
}
二、内置Merge操作符 #
2.1 StringAppendOperator #
cpp
#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
// 字符串追加操作符
// 将多个值用分隔符连接
void UseStringAppendOperator() {
rocksdb::Options options;
options.create_if_missing = true;
// 使用逗号作为分隔符
options.merge_operator.reset(new rocksdb::StringAppendOperator(","));
rocksdb::DB* db;
rocksdb::DB::Open(options, "/tmp/testdb", &db);
// 追加字符串
db->Merge(rocksdb::WriteOptions(), "tags", "tag1");
db->Merge(rocksdb::WriteOptions(), "tags", "tag2");
db->Merge(rocksdb::WriteOptions(), "tags", "tag3");
// 结果: "tag1,tag2,tag3"
delete db;
}
2.2 UInt64AddOperator #
cpp
#include <rocksdb/db.h>
#include <rocksdb/utilities/agg_merge.h>
// 64位整数加法操作符
// 用于计数器场景
void UseUInt64AddOperator() {
rocksdb::Options options;
options.create_if_missing = true;
// 使用整数加法操作符
options.merge_operator.reset(new rocksdb::UInt64AddOperator());
rocksdb::DB* db;
rocksdb::DB::Open(options, "/tmp/testdb", &db);
// 初始化计数器
db->Put(rocksdb::WriteOptions(), "counter", rocksdb::Slice("\x00\x00\x00\x00\x00\x00\x00\x01", 8));
// 增量更新
db->Merge(rocksdb::WriteOptions(), "counter", rocksdb::Slice("\x00\x00\x00\x00\x00\x00\x00\x05", 8));
db->Merge(rocksdb::WriteOptions(), "counter", rocksdb::Slice("\x00\x00\x00\x00\x00\x00\x00\x03", 8));
// 结果: 1 + 5 + 3 = 9
delete db;
}
三、自定义Merge操作符 #
3.1 实现自定义操作符 #
cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>
#include <iostream>
// Custom counter merge operator. Values and operands are decimal text; the
// merged value is the decimal sum of the existing value (0 when absent) and
// the operand.
class CounterMergeOperator : public rocksdb::AssociativeMergeOperator {
public:
    // Adds `value` to `existing_value` and stores the decimal sum in
    // `new_value`. `existing_value` is null when the key has no value yet.
    // Returns false (merge failure) when either input is not a decimal
    // integer that fits in 64 bits, instead of letting std::stoll throw.
    bool Merge(const rocksdb::Slice& key,
               const rocksdb::Slice* existing_value,
               const rocksdb::Slice& value,
               std::string* new_value,
               rocksdb::Logger* logger) const override {
        (void)key;     // the sum does not depend on the key
        (void)logger;  // nothing is logged here

        int64_t existing = 0;
        if (existing_value && !ParseInt64(*existing_value, &existing)) {
            return false;
        }
        int64_t delta = 0;
        if (!ParseInt64(value, &delta)) {
            return false;
        }
        *new_value = FormatInt64(existing + delta);
        return true;
    }

    const char* Name() const override {
        return "CounterMergeOperator";
    }

private:
    // Parses decimal text into `*out`; returns false if it is not a number or
    // is out of range. Input is always treated as text, matching FormatInt64:
    // guessing "binary" from an 8-byte length would misread every 8-digit
    // decimal counter that this operator itself writes.
    static bool ParseInt64(const rocksdb::Slice& s, int64_t* out) {
        try {
            *out = std::stoll(s.ToString());
            return true;
        } catch (const std::exception&) {
            // std::stoll reports bad or out-of-range input by throwing.
            return false;
        }
    }

    // Serializes a counter as decimal text.
    static std::string FormatInt64(int64_t value) {
        return std::to_string(value);
    }
};
// 使用自定义操作符
int main() {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
options.merge_operator.reset(new CounterMergeOperator());
rocksdb::DB::Open(options, "/tmp/testdb", &db);
// 使用Merge操作
db->Put(rocksdb::WriteOptions(), "counter", "100");
db->Merge(rocksdb::WriteOptions(), "counter", "10");
db->Merge(rocksdb::WriteOptions(), "counter", "5");
std::string value;
db->Get(rocksdb::ReadOptions(), "counter", &value);
std::cout << "Counter: " << value << std::endl; // 输出: 115
delete db;
return 0;
}
3.2 实现列表合并操作符 #
cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>
#include <sstream>
// List-append merge operator: the merged value is the existing value followed
// by the delimiter and the operand, or just the operand when no value exists.
class ListMergeOperator : public rocksdb::AssociativeMergeOperator {
public:
    // `delimiter` is placed between the existing value and each new operand.
    explicit ListMergeOperator(const std::string& delimiter = ",")
        : delimiter_(delimiter) {}

    // Writes the joined list to `new_value`; always reports success.
    bool Merge(const rocksdb::Slice& key,
               const rocksdb::Slice* existing_value,
               const rocksdb::Slice& value,
               std::string* new_value,
               rocksdb::Logger* logger) const override {
        // First operand for this key: it becomes the whole list.
        if (existing_value == nullptr) {
            *new_value = value.ToString();
            return true;
        }
        std::string joined = existing_value->ToString();
        joined += delimiter_;
        joined += value.ToString();
        *new_value = joined;
        return true;
    }

    const char* Name() const override {
        return "ListMergeOperator";
    }

private:
    std::string delimiter_;  // separator inserted between list items
};
3.3 实现集合合并操作符 #
cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>
#include <unordered_set>
#include <sstream>
// 集合合并操作符(去重)
class SetMergeOperator : public rocksdb::AssociativeMergeOperator {
public:
explicit SetMergeOperator(const std::string& delimiter = ",")
: delimiter_(delimiter) {}
virtual bool Merge(const rocksdb::Slice& key,
const rocksdb::Slice* existing_value,
const rocksdb::Slice& value,
std::string* new_value,
rocksdb::Logger* logger) const override {
std::unordered_set<std::string> elements;
// 解析现有元素
if (existing_value) {
ParseSet(*existing_value, &elements);
}
// 添加新元素
elements.insert(value.ToString());
// 序列化结果
*new_value = SerializeSet(elements);
return true;
}
virtual const char* Name() const override {
return "SetMergeOperator";
}
private:
std::string delimiter_;
void ParseSet(const rocksdb::Slice& s,
std::unordered_set<std::string>* elements) const {
std::istringstream iss(s.ToString());
std::string item;
while (std::getline(iss, item, delimiter_[0])) {
if (!item.empty()) {
elements->insert(item);
}
}
}
std::string SerializeSet(const std::unordered_set<std::string>& elements) const {
std::string result;
bool first = true;
for (const auto& elem : elements) {
if (!first) {
result += delimiter_;
}
result += elem;
first = false;
}
return result;
}
};
四、高级Merge操作符 #
4.1 实现通用Merge操作符 #
cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>
#include <memory>
// Kinds of merge operation. Enumerators keep their implicit values 0..4 in
// the order listed.
enum class MergeOp { ADD, SUB, SET, APPEND, DELETE };
// 通用合并操作符
class GenericMergeOperator : public rocksdb::MergeOperator {
public:
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
std::string result;
// 处理现有值
if (merge_in.existing_value) {
result = merge_in.existing_value->ToString();
}
// 处理所有合并操作
for (const auto& operand : merge_in.operand_list) {
ApplyOperation(operand, &result);
}
merge_out->new_value = result;
return true;
}
virtual bool PartialMergeMulti(const rocksdb::Slice& key,
const std::vector<rocksdb::Slice>& operand_list,
std::string* new_value,
rocksdb::Logger* logger) const override {
std::string result;
for (const auto& operand : operand_list) {
ApplyOperation(operand, &result);
}
*new_value = result;
return true;
}
virtual const char* Name() const override {
return "GenericMergeOperator";
}
private:
void ApplyOperation(const rocksdb::Slice& operand, std::string* result) const {
// 解析操作类型和值
if (operand.size() < 2) return;
char op = operand[0];
std::string value = operand.ToString().substr(1);
switch (op) {
case '+': // 加法
*result = std::to_string(std::stoll(*result) + std::stoll(value));
break;
case '-': // 减法
*result = std::to_string(std::stoll(*result) - std::stoll(value));
break;
case '=': // 设置
*result = value;
break;
case 'A': // 追加
*result += value;
break;
}
}
};
4.2 JSON合并操作符 #
cpp
#include <rocksdb/merge_operator.h>
#include <rocksdb/slice.h>
#include <string>
// 简单JSON合并操作符
class JsonMergeOperator : public rocksdb::AssociativeMergeOperator {
public:
virtual bool Merge(const rocksdb::Slice& key,
const rocksdb::Slice* existing_value,
const rocksdb::Slice& value,
std::string* new_value,
rocksdb::Logger* logger) const override {
// 简化实现:合并JSON字段
// 实际应用中应使用JSON库
std::string existing = existing_value ? existing_value->ToString() : "{}";
std::string update = value.ToString();
// 简单合并逻辑(实际应使用JSON库)
if (existing == "{}") {
*new_value = update;
} else {
// 移除结尾的 }
existing.pop_back();
// 移除开头的 {
update = update.substr(1);
*new_value = existing + "," + update;
}
return true;
}
virtual const char* Name() const override {
return "JsonMergeOperator";
}
};
五、Merge与WriteBatch #
5.1 批量Merge操作 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <rocksdb/merge_operator.h>
// Queues one Merge operand for each of three counters in a single WriteBatch
// and submits the batch with one Write call. The returned statuses are not
// inspected.
void BatchMerge(rocksdb::DB* db) {
    // {key, operand} pairs, queued in this order.
    static const char* const kOperands[][2] = {
        {"counter1", "10"},
        {"counter2", "20"},
        {"counter3", "30"},
    };

    rocksdb::WriteBatch pending;
    for (const auto& entry : kOperands) {
        pending.Merge(entry[0], entry[1]);
    }

    // Submit all queued merges together.
    const rocksdb::WriteOptions write_options;
    db->Write(write_options, &pending);
}
5.2 跨列族Merge #
cpp
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>
#include <rocksdb/column_family.h>
// Queues one Merge operand in each of two column families within a single
// WriteBatch and submits it with one Write call. The returned statuses are
// not inspected.
void CrossColumnFamilyMerge(
    rocksdb::DB* db,
    rocksdb::ColumnFamilyHandle* cf1,
    rocksdb::ColumnFamilyHandle* cf2) {
    rocksdb::WriteBatch pending;

    // One operand per column family.
    pending.Merge(cf1, "key1", "value1");
    pending.Merge(cf2, "key2", "value2");

    const rocksdb::WriteOptions write_options;
    db->Write(write_options, &pending);
}
六、实际应用示例 #
6.1 计数器服务 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
#include <string>
class CounterService {
public:
CounterService(rocksdb::DB* db) : db_(db) {}
void Increment(const std::string& counter_name, int64_t delta) {
std::string delta_str = std::to_string(delta);
db_->Merge(rocksdb::WriteOptions(), counter_name, delta_str);
}
int64_t Get(const std::string& counter_name) {
std::string value;
rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), counter_name, &value);
if (status.ok()) {
return std::stoll(value);
}
return 0;
}
void Reset(const std::string& counter_name) {
db_->Put(rocksdb::WriteOptions(), counter_name, "0");
}
private:
rocksdb::DB* db_;
};
6.2 标签系统 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
#include <string>
#include <vector>
#include <sstream>
class TagService {
public:
TagService(rocksdb::DB* db) : db_(db) {}
void AddTag(const std::string& entity_id, const std::string& tag) {
db_->Merge(rocksdb::WriteOptions(), "tags:" + entity_id, tag);
}
std::vector<std::string> GetTags(const std::string& entity_id) {
std::string value;
rocksdb::Status status = db_->Get(rocksdb::ReadOptions(),
"tags:" + entity_id, &value);
std::vector<std::string> tags;
if (status.ok()) {
std::istringstream iss(value);
std::string tag;
while (std::getline(iss, tag, ',')) {
if (!tag.empty()) {
tags.push_back(tag);
}
}
}
return tags;
}
private:
rocksdb::DB* db_;
};
七、最佳实践 #
7.1 Merge使用建议 #
| 建议 | 说明 |
|---|---|
| 适合场景 | 计数器、列表追加、集合合并 |
| 避免场景 | 需要读取原值的复杂操作 |
| 性能优化 | 批量Merge减少IO |
| 错误处理 | 实现完整的错误处理逻辑 |
| 测试验证 | 充分测试合并逻辑正确性 |
7.2 性能考虑 #
text
Merge性能特点:
1. 写入性能
- 单次Merge只需一次IO
- 批量Merge更高效
2. 读取性能
- 需要执行合并逻辑
- 合并链越长,读取越慢
3. Compaction影响
- Compaction时会执行合并
- 减少合并链长度
4. 建议
- 定期触发Compaction
- 避免过长的合并链
八、总结 #
8.1 Merge API速查 #
| 操作 | 方法 | 说明 |
|---|---|---|
| 执行Merge | db->Merge(WriteOptions(), key, value) | 执行合并操作 |
| 设置操作符 | options.merge_operator.reset(new MyOperator()) | 设置自定义操作符 |
| 批量Merge | WriteBatch::Merge(key, value) | 批量合并操作 |
8.2 关键要点 #
- 减少IO:Merge操作不需要先读取原值
- 原子性:合并操作是原子的
- 自定义逻辑:可实现任意合并逻辑
- 性能考虑:注意合并链长度
- 测试验证:充分测试合并正确性
下一步,让我们学习事务处理!
最后更新:2026-03-27