RocksDB键值操作 #
一、写入操作 #
1.1 基本写入 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>
int main() {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
if (!status.ok()) {
std::cerr << "Open failed: " << status.ToString() << std::endl;
return 1;
}
// 基本写入
status = db->Put(rocksdb::WriteOptions(), "key1", "value1");
if (status.ok()) {
std::cout << "Write successful!" << std::endl;
} else {
std::cerr << "Write failed: " << status.ToString() << std::endl;
}
delete db;
return 0;
}
1.2 WriteOptions配置 #
cpp
rocksdb::WriteOptions write_options;
// Synchronous write: flush the write-ahead log to disk before the call returns.
write_options.sync = true;
// Keep the WAL enabled (the default). Setting this to true skips the WAL,
// which is faster but loses un-flushed writes on a crash.
write_options.disableWAL = false;
// Normal priority (the default). Setting this to true marks the write as
// low priority so it can be throttled when compaction falls behind.
write_options.low_pri = false;
// Wait when the write has to stall (the default). Setting this to true makes
// the write fail immediately with Status::Incomplete() instead of waiting.
// (WriteOptions has no timeout field in current RocksDB releases.)
write_options.no_slowdown = false;
// Issue a write with these options
db->Put(write_options, "key", "value");
1.3 同步vs异步写入 #
cpp
// 异步写入(默认)- 高性能
rocksdb::WriteOptions async_options;
async_options.sync = false; // 数据可能丢失
db->Put(async_options, "key1", "value1");
// 同步写入 - 高可靠性
rocksdb::WriteOptions sync_options;
sync_options.sync = true; // 确保持久化
db->Put(sync_options, "key2", "value2");
// 禁用WAL - 最高性能
rocksdb::WriteOptions no_wal_options;
no_wal_options.disableWAL = true;
db->Put(no_wal_options, "key3", "value3");
1.4 写入性能对比 #
| 写入模式 | 性能 | 可靠性 | 适用场景 |
|---|---|---|---|
| sync=false | 极高 | 低 | 缓存数据 |
| sync=true | 中等 | 高 | 关键数据 |
| disableWAL=true | 最高 | 最低 | 临时数据 |
二、读取操作 #
2.1 基本读取 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>
int main() {
rocksdb::DB* db;
rocksdb::Options options;
rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
if (!status.ok()) {
std::cerr << "Open failed: " << status.ToString() << std::endl;
return 1;
}
// 读取数据
std::string value;
status = db->Get(rocksdb::ReadOptions(), "key1", &value);
if (status.ok()) {
std::cout << "Value: " << value << std::endl;
} else if (status.IsNotFound()) {
std::cout << "Key not found!" << std::endl;
} else {
std::cerr << "Read error: " << status.ToString() << std::endl;
}
delete db;
return 0;
}
2.2 ReadOptions配置 #
cpp
rocksdb::ReadOptions read_options;
// Verify checksums of the data read from storage.
read_options.verify_checksums = true;
// Insert the blocks read by this operation into the block cache.
read_options.fill_cache = true;
// nullptr means no explicit snapshot: the read sees the state as of the call.
// Assign a handle obtained from db->GetSnapshot() to read a fixed point in time.
read_options.snapshot = nullptr;
// I/O timeout for the read; zero means no timeout.
// (ReadOptions has no `timeout_us` member; `io_timeout` and `deadline` are
// the std::chrono::microseconds fields that bound a read.)
read_options.io_timeout = std::chrono::microseconds::zero();
// Asynchronous I/O
read_options.async_io = false;
// Readahead size in bytes used when scanning; 0 leaves it to RocksDB.
// This is a prefetch hint, not a cap on how much is read.
read_options.readahead_size = 0;
// Issue a read with these options
std::string value;
db->Get(read_options, "key", &value);
2.3 检查键是否存在 #
cpp
// Returns true when a Get for `key` succeeds. Any non-OK status (including
// NotFound) yields false. The fetched value itself is discarded.
bool KeyExists(rocksdb::DB* db, const std::string& key) {
    std::string discarded;
    return db->Get(rocksdb::ReadOptions(), key, &discarded).ok();
}
// 使用示例
if (KeyExists(db, "user:1001")) {
std::cout << "Key exists!" << std::endl;
}
2.4 获取值或默认值 #
cpp
// Looks up `key` and returns its value; returns `default_value` whenever the
// Get does not succeed (missing key or any other non-OK status).
std::string GetOrDefault(rocksdb::DB* db,
                         const std::string& key,
                         const std::string& default_value) {
    std::string found;
    if (db->Get(rocksdb::ReadOptions(), key, &found).ok()) {
        return found;
    }
    return default_value;
}
// Usage example: fall back to "30" when the key is not stored
const std::string timeout_setting = GetOrDefault(db, "config:timeout", "30");
std::cout << "Timeout: " << timeout_setting << std::endl;
三、删除操作 #
3.1 基本删除 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>
// Demonstrates Put -> Delete -> Get on the database at /tmp/testdb.
// Returns 1 when the database cannot be opened, 0 otherwise.
int main() {
    rocksdb::DB* db = nullptr;
    rocksdb::Options options;
    // Create the database on first run so the example works on its own.
    options.create_if_missing = true;
    rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
    if (!status.ok()) {
        // db is not usable when Open fails; stop instead of dereferencing it.
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }

    // Write a value so there is something to delete
    status = db->Put(rocksdb::WriteOptions(), "key1", "value1");
    if (!status.ok()) {
        std::cerr << "Write failed: " << status.ToString() << std::endl;
    }

    // Delete the value
    status = db->Delete(rocksdb::WriteOptions(), "key1");
    if (status.ok()) {
        std::cout << "Delete successful!" << std::endl;
    } else {
        std::cerr << "Delete failed: " << status.ToString() << std::endl;
    }

    // Verify the delete: the key must no longer be found
    std::string value;
    status = db->Get(rocksdb::ReadOptions(), "key1", &value);
    if (status.IsNotFound()) {
        std::cout << "Key has been deleted!" << std::endl;
    }

    delete db;
    return 0;
}
3.2 SingleDelete #
SingleDelete是一种优化的删除操作,适用于每个键只写入一次的场景。
cpp
// 适用于:每个key只会被Put一次,然后被删除
// 不适用于:同一个key被多次Put
// 正确使用
db->Put(rocksdb::WriteOptions(), "key1", "value1");
db->SingleDelete(rocksdb::WriteOptions(), "key1"); // 高效删除
// 错误使用
db->Put(rocksdb::WriteOptions(), "key2", "value1");
db->Put(rocksdb::WriteOptions(), "key2", "value2");
db->SingleDelete(rocksdb::WriteOptions(), "key2"); // 可能导致数据不一致
3.3 DeleteRange #
删除指定范围内的所有键。
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>
// Writes key_000..key_099, removes the half-open range [key_020, key_040)
// with DeleteRange, and checks one key inside and one key outside the range.
// Returns 1 when the database cannot be opened, 0 otherwise.
int main() {
    rocksdb::DB* db = nullptr;
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
    if (!status.ok()) {
        // db is not usable when Open fails; stop instead of dereferencing it.
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }

    // Write test data
    for (int i = 0; i < 100; i++) {
        char key[20];
        // Bounded formatting: never writes past the end of `key`.
        snprintf(key, sizeof(key), "key_%03d", i);
        db->Put(rocksdb::WriteOptions(), key, std::to_string(i));
    }

    // Delete the range [key_020, key_040). The column family is passed
    // explicitly because that overload exists in every RocksDB release.
    status = db->DeleteRange(
        rocksdb::WriteOptions(),
        db->DefaultColumnFamily(),
        "key_020",  // start key (inclusive)
        "key_040"   // end key (exclusive)
    );
    if (status.ok()) {
        std::cout << "DeleteRange successful!" << std::endl;
    } else {
        std::cerr << "DeleteRange failed: " << status.ToString() << std::endl;
    }

    // Verify: a key inside the range is gone ...
    std::string value;
    status = db->Get(rocksdb::ReadOptions(), "key_025", &value);
    if (status.IsNotFound()) {
        std::cout << "key_025 has been deleted!" << std::endl;
    }
    // ... and a key outside the range is still present.
    status = db->Get(rocksdb::ReadOptions(), "key_050", &value);
    if (status.ok()) {
        std::cout << "key_050 still exists: " << value << std::endl;
    }

    delete db;
    return 0;
}
四、批量读取 #
4.1 MultiGet #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>
#include <vector>
int main() {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::DB::Open(options, "/tmp/testdb", &db);
// 写入测试数据
db->Put(rocksdb::WriteOptions(), "key1", "value1");
db->Put(rocksdb::WriteOptions(), "key2", "value2");
db->Put(rocksdb::WriteOptions(), "key3", "value3");
db->Put(rocksdb::WriteOptions(), "key4", "value4");
// 准备键列表
std::vector<std::string> keys = {"key1", "key2", "key3", "key5"};
std::vector<rocksdb::Slice> key_slices;
for (const auto& key : keys) {
key_slices.push_back(key);
}
// 批量读取
std::vector<std::string> values;
std::vector<rocksdb::Status> statuses;
statuses = db->MultiGet(rocksdb::ReadOptions(), key_slices, &values);
// 处理结果
for (size_t i = 0; i < keys.size(); i++) {
if (statuses[i].ok()) {
std::cout << keys[i] << " = " << values[i] << std::endl;
} else {
std::cout << keys[i] << " not found" << std::endl;
}
}
delete db;
return 0;
}
4.2 MultiGet性能优化 #
cpp
// Use PinnableSlice to reduce memory copies
std::vector<rocksdb::PinnableSlice> values(keys.size());
std::vector<rocksdb::Status> statuses(keys.size());
// This overload takes a ColumnFamilyHandle pointer. ColumnFamilyHandle is an
// abstract class, so the handle has to come from the DB itself.
db->MultiGet(rocksdb::ReadOptions(),
             db->DefaultColumnFamily(),
             keys.size(),
             key_slices.data(),
             values.data(),
             statuses.data());
for (size_t i = 0; i < keys.size(); i++) {
    if (statuses[i].ok()) {
        std::cout << keys[i] << " = " << values[i].ToString() << std::endl;
    }
    values[i].Reset();  // release the memory held by this slice
}
五、键值编码 #
5.1 字符串键 #
cpp
// 简单字符串键
db->Put(rocksdb::WriteOptions(), "user:1001:name", "Alice");
db->Put(rocksdb::WriteOptions(), "user:1001:email", "alice@example.com");
// Prefixed key: builds "user:<user_id>:<field>".
std::string MakeUserKey(int user_id, const std::string& field) {
    std::string composed = "user:";
    composed += std::to_string(user_id);
    composed += ':';
    composed += field;
    return composed;
}
db->Put(rocksdb::WriteOptions(), MakeUserKey(1001, "name"), "Alice");
db->Put(rocksdb::WriteOptions(), MakeUserKey(1001, "age"), "25");
5.2 二进制键 #
cpp
#include <rocksdb/slice.h>
// Stores `size` raw bytes starting at `data` under `key`, using a Slice so
// embedded NUL bytes are kept.
void PutBinary(rocksdb::DB* db, const std::string& key, const void* data, size_t size) {
    const char* bytes = static_cast<const char*>(data);
    db->Put(rocksdb::WriteOptions(), key, rocksdb::Slice(bytes, size));
}
// Read binary data back: the std::string owns the bytes, and the
// pointer/length pair below refers into it.
std::string raw_bytes;
db->Get(rocksdb::ReadOptions(), "binary_key", &raw_bytes);
size_t payload_size = raw_bytes.size();
const void* payload = raw_bytes.data();
5.3 复合键编码 #
cpp
#include <rocksdb/db.h>
#include <sstream>
#include <iomanip>
// Method 1: string concatenation.
// Produces "<prefix>:<id>:<timestamp>" with id zero-padded to width 10 and
// timestamp zero-padded to width 15.
std::string MakeCompositeKey(const std::string& prefix, int id, int timestamp) {
    std::ostringstream out;
    out.fill('0');  // the fill character persists for the whole stream
    out << prefix << ":";
    out.width(10);  // width applies to the next insertion only
    out << id << ":";
    out.width(15);
    out << timestamp;
    return out.str();
}
// Usage
const std::string event_key = MakeCompositeKey("event", 12345, 1699900000);
db->Put(rocksdb::WriteOptions(), event_key, "event_data");
// Method 2: fixed-length encoding.
// Returns an 8-byte key: user_id followed by item_id, each written as four
// big-endian bytes (most significant byte first).
std::string EncodeKey(uint32_t user_id, uint32_t item_id) {
    const uint32_t parts[2] = {user_id, item_id};
    std::string encoded(8, '\0');
    int pos = 0;
    for (uint32_t part : parts) {
        // Emit the bytes from the highest-order one down to the lowest.
        for (int shift = 24; shift >= 0; shift -= 8) {
            encoded[pos++] = (part >> shift) & 0xFF;
        }
    }
    return encoded;
}
六、值编码 #
6.1 字符串值 #
cpp
// Plain string value
const std::string app_name_key = "config:app_name";
db->Put(rocksdb::WriteOptions(), app_name_key, "MyApplication");
// JSON格式
#include <sstream>
// Escapes a string for use inside a JSON string literal: quotes, backslashes
// and control characters below 0x20 are replaced by their escape sequences.
static std::string EscapeJsonString(const std::string& raw) {
    static const char kHexDigits[] = "0123456789abcdef";
    std::string escaped;
    escaped.reserve(raw.size());
    for (char c : raw) {
        switch (c) {
            case '"':  escaped += "\\\""; break;
            case '\\': escaped += "\\\\"; break;
            case '\b': escaped += "\\b"; break;
            case '\f': escaped += "\\f"; break;
            case '\n': escaped += "\\n"; break;
            case '\r': escaped += "\\r"; break;
            case '\t': escaped += "\\t"; break;
            default: {
                const unsigned char byte = static_cast<unsigned char>(c);
                if (byte < 0x20) {
                    // Remaining control characters use the \u00XX form.
                    escaped += "\\u00";
                    escaped += kHexDigits[byte >> 4];
                    escaped += kHexDigits[byte & 0x0F];
                } else {
                    escaped += c;
                }
            }
        }
    }
    return escaped;
}

// Builds {"name":"...","age":N,"email":"..."}.
// name and email are escaped so that quotes or backslashes in the input
// cannot break out of the string literal and produce invalid JSON.
std::string MakeUserJson(const std::string& name, int age, const std::string& email) {
    std::ostringstream oss;
    oss << "{\"name\":\"" << EscapeJsonString(name) << "\","
        << "\"age\":" << age << ","
        << "\"email\":\"" << EscapeJsonString(email) << "\"}";
    return oss.str();
}
db->Put(rocksdb::WriteOptions(), "user:1001", MakeUserJson("Alice", 25, "alice@example.com"));
6.2 二进制值序列化 #
cpp
#include <rocksdb/db.h>
#include <cstring>
// Simple struct serialization.
// Fixed-size record whose raw bytes are stored as the value by PutUser and
// copied back by GetUser, so the field order and sizes define the stored format.
// NOTE(review): raw struct bytes depend on the platform's byte order and
// padding - confirm writers and readers share the same ABI.
struct UserInfo {
uint32_t id;     // user identifier; PutUser also uses it to build the key
uint32_t age;
char name[32];   // fixed 32-byte buffer
char email[64];  // fixed 64-byte buffer
};
// Stores the raw bytes of `user` under the key "user:<id>".
void PutUser(rocksdb::DB* db, const UserInfo& user) {
    const char* raw = reinterpret_cast<const char*>(&user);
    const std::string record_key = "user:" + std::to_string(user.id);
    db->Put(rocksdb::WriteOptions(), record_key, rocksdb::Slice(raw, sizeof(user)));
}
// Loads the record stored under "user:<user_id>" into `*user`.
// Returns false when the read fails or the stored value is not exactly
// sizeof(UserInfo) bytes; `*user` is left untouched in that case.
bool GetUser(rocksdb::DB* db, uint32_t user_id, UserInfo* user) {
    const std::string record_key = "user:" + std::to_string(user_id);
    std::string blob;
    const rocksdb::Status lookup = db->Get(rocksdb::ReadOptions(), record_key, &blob);
    if (!lookup.ok()) {
        return false;
    }
    if (blob.size() != sizeof(UserInfo)) {
        return false;
    }
    std::memcpy(user, blob.data(), sizeof(UserInfo));
    return true;
}
6.3 使用Protocol Buffers #
cpp
// user.proto
// message User {
// int32 id = 1;
// string name = 2;
// int32 age = 3;
// string email = 4;
// }
#include "user.pb.h"
// Serializes `user` with Protocol Buffers and stores it under "user:<id>".
// Nothing is written when serialization fails, so a partial or empty
// payload never replaces a previously stored record.
void PutUserPB(rocksdb::DB* db, const User& user) {
    std::string value;
    if (!user.SerializeToString(&value)) {
        return;  // serialization failed; there is no valid payload to store
    }
    const std::string key = "user:" + std::to_string(user.id());
    db->Put(rocksdb::WriteOptions(), key, value);
}
// Reads the value stored under "user:<user_id>" and parses it into `*user`.
// Returns false when the read does not succeed; otherwise returns the result
// of ParseFromString.
bool GetUserPB(rocksdb::DB* db, uint32_t user_id, User* user) {
    const std::string record_key = "user:" + std::to_string(user_id);
    std::string payload;
    const rocksdb::Status lookup = db->Get(rocksdb::ReadOptions(), record_key, &payload);
    if (!lookup.ok()) {
        return false;
    }
    return user->ParseFromString(payload);
}
七、条件操作 #
7.1 GetForUpdate #
cpp
#include <iostream>
#include <string>
#include <rocksdb/db.h>
#include <rocksdb/transaction.h>
#include <rocksdb/utilities/transaction_db.h>
// Increments the integer stored under "counter" inside a transaction, using
// GetForUpdate so the key is locked between the read and the write.
// Returns 1 when the database cannot be opened or the locked read fails,
// 0 otherwise.
int main() {
    rocksdb::TransactionDB* txn_db = nullptr;
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::TransactionDBOptions txn_options;
    rocksdb::Status status = rocksdb::TransactionDB::Open(
        options, txn_options, "/tmp/txn_db", &txn_db);
    if (!status.ok()) {
        // txn_db is not usable when Open fails; stop instead of dereferencing it.
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }

    // Begin a transaction
    rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

    // Read the key and lock it for this transaction
    std::string value;
    status = txn->GetForUpdate(rocksdb::ReadOptions(), "counter", &value);
    if (status.ok()) {
        // NOTE: std::stoi throws if the stored value is not a decimal integer.
        int counter = std::stoi(value);
        counter++;
        txn->Put("counter", std::to_string(counter));
    } else if (status.IsNotFound()) {
        // First use: start the counter at 1
        txn->Put("counter", "1");
    } else {
        // Lock conflict, timeout or I/O error: abandon the transaction
        // rather than committing without having written anything.
        std::cerr << "GetForUpdate failed: " << status.ToString() << std::endl;
        txn->Rollback();
        delete txn;
        delete txn_db;
        return 1;
    }

    // Commit the transaction
    status = txn->Commit();
    if (!status.ok()) {
        std::cerr << "Commit failed: " << status.ToString() << std::endl;
    }

    delete txn;
    delete txn_db;
    return 0;
}
八、统计操作 #
8.1 获取键数量 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <iostream>
// Returns RocksDB's estimate of the number of keys in the default column
// family, or 0 when the property is unavailable.
// The "rocksdb.estimate-num-keys" property is used because
// GetApproximateSizes reports sizes in bytes, not a key count.
uint64_t GetApproximateKeys(rocksdb::DB* db) {
    uint64_t count = 0;
    if (!db->GetIntProperty("rocksdb.estimate-num-keys", &count)) {
        return 0;
    }
    return count;
}
// Exact count (slower): walks every entry with an iterator and counts them.
uint64_t GetExactKeyCount(rocksdb::DB* db) {
    rocksdb::Iterator* cursor = db->NewIterator(rocksdb::ReadOptions());
    uint64_t total = 0;
    cursor->SeekToFirst();
    while (cursor->Valid()) {
        ++total;
        cursor->Next();
    }
    delete cursor;  // the iterator is owned by the caller and must be released
    return total;
}
8.2 获取数据库大小 #
cpp
#include <rocksdb/db.h>
#include <iostream>
// Prints the number of SST files at each LSM level, followed by the
// estimated key count and the total SST file size in bytes.
void PrintDatabaseStats(rocksdb::DB* db) {
    // Per-level file counts. "rocksdb.num-files-at-level<N>" is a string
    // property, so it is read with GetProperty; GetIntProperty does not serve it.
    const int num_levels = db->NumberLevels();
    for (int level = 0; level < num_levels; level++) {
        std::string files_at_level;
        if (!db->GetProperty("rocksdb.num-files-at-level" + std::to_string(level),
                             &files_at_level)) {
            files_at_level = "0";  // property unavailable for this level
        }
        std::cout << "Level " << level << " files: " << files_at_level << std::endl;
    }

    // Totals
    uint64_t num_keys = 0;
    uint64_t total_size = 0;
    db->GetIntProperty("rocksdb.estimate-num-keys", &num_keys);
    db->GetIntProperty("rocksdb.total-sst-files-size", &total_size);
    std::cout << "Total keys: " << num_keys << std::endl;
    std::cout << "Total size: " << total_size << " bytes" << std::endl;
}
九、原子操作封装 #
9.1 原子计数器 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/utilities/transaction_db.h>
#include <string>
// Integer counter stored under a single key of a TransactionDB.
// Each increment runs in its own transaction and locks the key with
// GetForUpdate, so concurrent increments do not overwrite each other.
class AtomicCounter {
public:
    // `db` is borrowed, not owned; it must outlive this object.
    AtomicCounter(rocksdb::TransactionDB* db, const std::string& key)
        : db_(db), key_(key) {}

    // Adds `delta` to the stored value (a missing key counts as 0) and
    // returns the new value, or -1 when the read, write or commit fails.
    // NOTE: -1 is also a legal counter value; callers that use negative
    // counters cannot tell the two apart.
    // Throws whatever std::stoll throws if the stored value is not numeric.
    int64_t Increment(int64_t delta = 1) {
        rocksdb::Transaction* txn = db_->BeginTransaction(rocksdb::WriteOptions());
        std::string value;
        int64_t counter = 0;
        rocksdb::Status status = txn->GetForUpdate(rocksdb::ReadOptions(), key_, &value);
        if (status.ok()) {
            try {
                counter = std::stoll(value);
            } catch (...) {
                // Release the transaction before letting the exception escape.
                txn->Rollback();
                delete txn;
                throw;
            }
        } else if (!status.IsNotFound()) {
            // Lock conflict, timeout or I/O error: do not overwrite the
            // stored value with `delta`.
            txn->Rollback();
            delete txn;
            return -1;
        }
        counter += delta;
        status = txn->Put(key_, std::to_string(counter));
        if (status.ok()) {
            status = txn->Commit();
        } else {
            txn->Rollback();
        }
        delete txn;
        return status.ok() ? counter : -1;
    }

    // Returns the current value, or 0 when the key cannot be read.
    int64_t Get() {
        std::string value;
        rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), key_, &value);
        return status.ok() ? std::stoll(value) : 0;
    }

private:
    rocksdb::TransactionDB* db_;  // not owned
    std::string key_;             // key that holds the counter value
};
十、总结 #
10.1 操作速查表 #
| 操作 | 方法 | 说明 |
|---|---|---|
| 写入 | Put(WriteOptions(), key, value) | 写入单个键值 |
| 读取 | Get(ReadOptions(), key, &value) | 读取单个值 |
| 删除 | Delete(WriteOptions(), key) | 删除键 |
| 范围删除 | DeleteRange(WriteOptions(), start, end) | 删除范围内键 |
| 批量读取 | MultiGet(ReadOptions(), keys, &values) | 批量读取 |
| 单删除 | SingleDelete(WriteOptions(), key) | 优化删除 |
10.2 最佳实践 #
- 选择合适的写入模式:根据可靠性需求选择sync选项
- 使用批量操作:减少IO次数,提高性能
- 合理编码键值:设计高效的键值编码方案
- 使用快照读取:实现一致性读
- 处理所有错误状态:检查Status返回值
下一步,让我们学习批量写入操作!
最后更新:2026-03-27