RocksDB事务 #
一、事务概述 #
1.1 RocksDB事务特性 #
RocksDB提供了简单的事务支持,具有以下特性:
text
事务特性:
├── 原子性(Atomicity) - 全部成功或全部失败
├── 一致性(Consistency) - 数据状态一致
├── 隔离性(Isolation) - 支持快照隔离
└── 持久性(Durability) - WAL保证持久化
1.2 事务类型 #
| 类型 | 说明 | 适用场景 |
|---|---|---|
| TransactionDB | 悲观锁,写入时即加锁并检测冲突 | 冲突较多、希望尽早发现冲突的场景 |
| OptimisticTransactionDB | 乐观锁,提交时检测冲突 | 冲突较少的场景 |
1.3 基本使用 #
cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>
// Opens (or creates) a TransactionDB at /tmp/txn_db, stages two puts and one
// delete inside a single transaction, and commits them together.
// Returns 0 on success and 1 if the open, a staged write, or the commit fails.
int main() {
  rocksdb::TransactionDB* txn_db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::TransactionDBOptions txn_options;
  rocksdb::Status status = rocksdb::TransactionDB::Open(
      options, txn_options, "/tmp/txn_db", &txn_db);
  if (!status.ok()) {
    std::cerr << "Open failed: " << status.ToString() << std::endl;
    return 1;
  }

  // Begin a transaction.
  rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

  // Stage the writes. Every write returns a Status; stop at the first failure
  // so a partially staged transaction is never committed.
  status = txn->Put("key1", "value1");
  if (status.ok()) {
    status = txn->Put("key2", "value2");
  }
  if (status.ok()) {
    status = txn->Delete("key3");
  }

  int exit_code = 0;
  if (!status.ok()) {
    std::cerr << "Write failed: " << status.ToString() << std::endl;
    txn->Rollback();
    exit_code = 1;
  } else {
    // Commit the transaction.
    status = txn->Commit();
    if (status.ok()) {
      std::cout << "Transaction committed successfully!" << std::endl;
    } else {
      std::cerr << "Commit failed: " << status.ToString() << std::endl;
      exit_code = 1;
    }
  }

  // The transaction must be destroyed before the database that created it.
  delete txn;
  delete txn_db;
  return exit_code;
}
二、TransactionDB #
2.1 创建TransactionDB #
cpp
#include <rocksdb/db.h>
#include <rocksdb/utilities/transaction_db.h>
// Opens (creating it if necessary) a TransactionDB at `db_path` with explicit
// lock-manager settings.
// Returns the opened database, which the caller must delete, or nullptr if
// the open fails.
rocksdb::TransactionDB* CreateTransactionDB(const std::string& db_path) {
  rocksdb::Options options;
  options.create_if_missing = true;

  // Transaction database options.
  rocksdb::TransactionDBOptions txn_options;
  txn_options.max_num_locks = 1000000;          // Upper bound on locked keys.
  txn_options.num_stripes = 16;                 // Number of lock-table stripes.
  txn_options.transaction_lock_timeout = 1000;  // Lock wait for writes made inside a transaction (ms).
  txn_options.default_lock_timeout = 1000;      // Lock wait for writes made outside a transaction (ms).

  // Initialised so that a failed Open can never hand back an indeterminate
  // pointer.
  rocksdb::TransactionDB* txn_db = nullptr;
  rocksdb::Status status = rocksdb::TransactionDB::Open(
      options, txn_options, db_path, &txn_db);
  if (!status.ok()) {
    return nullptr;
  }
  return txn_db;
}
2.2 事务基本操作 #
cpp
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>
// Demonstrates the basic write / read / delete / commit cycle on one
// transaction. Any failed step rolls the transaction back instead of
// committing a partial result.
void BasicTransaction(rocksdb::TransactionDB* txn_db) {
  // Begin a transaction.
  rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

  // Write.
  rocksdb::Status status = txn->Put("key1", "value1");
  if (!status.ok()) {
    std::cerr << "Put failed: " << status.ToString() << std::endl;
    txn->Rollback();
    delete txn;
    return;
  }

  // Read back through the transaction, which sees its own uncommitted write.
  std::string value;
  status = txn->Get(rocksdb::ReadOptions(), "key1", &value);
  if (status.ok()) {
    std::cout << "Read in transaction: " << value << std::endl;
  } else if (!status.IsNotFound()) {
    std::cerr << "Get failed: " << status.ToString() << std::endl;
  }

  // Delete. The status is checked before committing rather than being
  // overwritten by the Commit result.
  status = txn->Delete("key2");
  if (!status.ok()) {
    std::cerr << "Delete failed: " << status.ToString() << std::endl;
    txn->Rollback();
    delete txn;
    return;
  }

  // Commit.
  status = txn->Commit();
  if (status.ok()) {
    std::cout << "Transaction committed!" << std::endl;
  } else {
    std::cerr << "Commit failed: " << status.ToString() << std::endl;
  }
  delete txn;
}
2.3 事务回滚 #
cpp
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>
// Stages two writes and then either discards them with Rollback() or
// persists them with Commit(), depending on a condition.
void TransactionRollback(rocksdb::TransactionDB* txn_db) {
  rocksdb::Transaction* const pending =
      txn_db->BeginTransaction(rocksdb::WriteOptions());

  pending->Put("key1", "value1");
  pending->Put("key2", "value2");

  // Stand-in for a real application-level check.
  const bool should_rollback = true;

  if (!should_rollback) {
    pending->Commit();
  } else {
    // Discard everything staged so far.
    pending->Rollback();
    std::cout << "Transaction rolled back!" << std::endl;
  }

  delete pending;
}
三、锁机制 #
3.1 GetForUpdate #
cpp
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>
// GetForUpdate reads a key and locks it so that other transactions cannot
// modify it until this transaction finishes.
//
// Increments the integer stored under "counter", creating it with the value 1
// when it does not exist yet. If the key cannot be read and locked, or the
// write cannot be staged, the transaction is rolled back instead of committed.
//
// NOTE: std::stoi throws if the stored value is not a decimal integer; the
// value is assumed to have been written by this function.
void GetForUpdateExample(rocksdb::TransactionDB* txn_db) {
  rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

  // Read and lock.
  std::string value;
  rocksdb::Status status = txn->GetForUpdate(
      rocksdb::ReadOptions(), "counter", &value);
  if (status.ok()) {
    const int counter = std::stoi(value) + 1;
    // Stage the new value.
    status = txn->Put("counter", std::to_string(counter));
  } else if (status.IsNotFound()) {
    // Key does not exist yet: initialise it.
    status = txn->Put("counter", "1");
  }

  // Any status still failing here means the read/lock or the write failed
  // (for example a lock timeout); committing would apply nothing useful.
  if (!status.ok()) {
    std::cerr << "Update failed: " << status.ToString() << std::endl;
    txn->Rollback();
    delete txn;
    return;
  }

  // Commit.
  status = txn->Commit();
  if (!status.ok()) {
    std::cerr << "Commit failed: " << status.ToString() << std::endl;
  }
  delete txn;
}
3.2 批量锁定 #
cpp
#include <rocksdb/utilities/transaction_db.h>
#include <vector>
// Reads and locks every key in `keys` within one transaction, prints the
// values that were found, and commits. If any key fails for a reason other
// than "not found", the transaction is rolled back because not every key is
// known to be locked.
void MultiGetForUpdate(rocksdb::TransactionDB* txn_db,
                       const std::vector<std::string>& keys) {
  rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

  // Build the slice view of the keys; the slices point into `keys`, which
  // outlives this call.
  std::vector<rocksdb::Slice> key_slices;
  key_slices.reserve(keys.size());
  for (const auto& key : keys) {
    key_slices.push_back(key);
  }

  // Batch read-and-lock.
  std::vector<std::string> values(keys.size());
  const std::vector<rocksdb::Status> statuses = txn->MultiGetForUpdate(
      rocksdb::ReadOptions(), key_slices, &values);

  // Process the results.
  bool all_usable = true;
  for (size_t i = 0; i < keys.size(); i++) {
    if (statuses[i].ok()) {
      std::cout << keys[i] << " = " << values[i] << std::endl;
    } else if (!statuses[i].IsNotFound()) {
      std::cerr << keys[i] << ": " << statuses[i].ToString() << std::endl;
      all_usable = false;
    }
  }
  if (!all_usable) {
    txn->Rollback();
    delete txn;
    return;
  }

  // Perform updates here.
  // ...

  txn->Commit();
  delete txn;
}
3.3 设置锁超时 #
cpp
#include <rocksdb/utilities/transaction_db.h>
// Begins a transaction with default write options, overrides how long it
// waits for key locks, and commits.
void SetLockTimeout(rocksdb::TransactionDB* txn_db) {
  rocksdb::Transaction* txn =
      txn_db->BeginTransaction(rocksdb::WriteOptions());

  // Lock wait timeout for this transaction, in milliseconds (5 seconds).
  txn->SetLockTimeout(5000);

  // The timeout can alternatively be supplied when the transaction is
  // created, through TransactionOptions::lock_timeout.

  // Perform operations here...

  txn->Commit();
  delete txn;
}
四、乐观事务 #
4.1 OptimisticTransactionDB #
cpp
#include <rocksdb/db.h>
#include <rocksdb/utilities/optimistic_transaction_db.h>
#include <iostream>
// Opens an OptimisticTransactionDB at `db_path`, stages two writes in an
// optimistic transaction, and commits. Conflicts are detected at commit time
// and reported as a Busy status; every other failure is reported as well.
void OptimisticTransactionExample(const std::string& db_path) {
  rocksdb::OptimisticTransactionDB* txn_db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status status = rocksdb::OptimisticTransactionDB::Open(
      options, db_path, &txn_db);
  if (!status.ok()) {
    std::cerr << "Open failed: " << status.ToString() << std::endl;
    return;
  }

  // Begin an optimistic transaction.
  rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

  // Stage the writes; no locks are taken at this point.
  status = txn->Put("key1", "value1");
  if (status.ok()) {
    status = txn->Put("key2", "value2");
  }

  if (!status.ok()) {
    std::cerr << "Write failed: " << status.ToString() << std::endl;
    txn->Rollback();
  } else {
    // Conflict detection happens during Commit.
    status = txn->Commit();
    if (status.ok()) {
      std::cout << "Optimistic transaction committed!" << std::endl;
    } else if (status.IsBusy()) {
      std::cout << "Transaction conflict detected!" << std::endl;
      // The caller may retry.
    } else {
      // Previously dropped silently: any failure that is not a conflict.
      std::cerr << "Commit failed: " << status.ToString() << std::endl;
    }
  }

  delete txn;
  delete txn_db;
}
4.2 乐观事务重试 #
cpp
#include <rocksdb/utilities/optimistic_transaction_db.h>
#include <iostream>
// Runs `operations` inside a fresh optimistic transaction and commits it,
// starting over with a new transaction when the commit reports a conflict
// (Busy). Makes at most `max_retries` attempts.
// Returns true once a commit succeeds; false on a non-conflict error or when
// every attempt ended in a conflict.
bool OptimisticTransactionWithRetry(
    rocksdb::OptimisticTransactionDB* txn_db,
    std::function<void(rocksdb::Transaction*)> operations,
    int max_retries = 3) {
  for (int attempt = 1; attempt <= max_retries; ++attempt) {
    rocksdb::Transaction* txn =
        txn_db->BeginTransaction(rocksdb::WriteOptions());

    // Let the caller stage its reads and writes, then try to commit.
    operations(txn);
    const rocksdb::Status outcome = txn->Commit();

    const bool committed = outcome.ok();
    const bool conflicted = !committed && outcome.IsBusy();

    if (conflicted) {
      std::cout << "Retry " << attempt << " due to conflict" << std::endl;
    } else if (!committed) {
      // Any other error is not retried.
      std::cerr << "Error: " << outcome.ToString() << std::endl;
    }

    // Single release point for every outcome.
    delete txn;

    if (committed) {
      return true;
    }
    if (!conflicted) {
      return false;
    }
  }
  return false;
}
// Usage example: increments the integer stored under "counter" through
// OptimisticTransactionWithRetry.
void DemoOptimisticRetry(rocksdb::OptimisticTransactionDB* txn_db) {
  bool success = OptimisticTransactionWithRetry(txn_db, [](rocksdb::Transaction* txn) {
    std::string value;
    // GetForUpdate rather than Get: in an optimistic transaction this makes
    // the read part of commit-time conflict checking, so a concurrent change
    // to "counter" between this read and the commit triggers a retry instead
    // of silently losing an increment.
    rocksdb::Status status =
        txn->GetForUpdate(rocksdb::ReadOptions(), "counter", &value);
    if (!status.ok() && !status.IsNotFound()) {
      // Do not derive a new value from a read that failed.
      std::cerr << "Read failed: " << status.ToString() << std::endl;
      return;
    }
    // A missing or empty value counts as 0.
    int counter = (status.IsNotFound() || value.empty()) ? 0 : std::stoi(value);
    txn->Put("counter", std::to_string(counter + 1));
  });
  if (success) {
    std::cout << "Transaction succeeded!" << std::endl;
  }
}
五、事务选项 #
5.1 TransactionOptions #
cpp
#include <rocksdb/utilities/transaction_db.h>
// Begins a transaction with synchronous writes, a 5-second lock wait timeout
// and a 60-second expiration. The caller owns the returned transaction and
// must delete it.
rocksdb::Transaction* CreateTransactionWithOptions(
    rocksdb::TransactionDB* txn_db) {
  rocksdb::WriteOptions write_options;
  write_options.sync = true;

  rocksdb::TransactionOptions txn_options;
  // Both settings are plain integer durations in milliseconds, not time
  // points. TransactionOptions has no `deadline` member; `expiration` is the
  // setting that bounds how long the transaction may live before commit.
  txn_options.lock_timeout = 5000;   // Wait up to 5 seconds for a key lock.
  txn_options.expiration = 60000;    // Expire after 60 seconds.

  return txn_db->BeginTransaction(write_options, txn_options);
}
5.2 设置快照 #
cpp
#include <rocksdb/utilities/transaction_db.h>
// Begins a transaction that takes a snapshot at creation, performs one read
// pinned to that snapshot, and commits.
void TransactionWithSnapshot(rocksdb::TransactionDB* txn_db) {
  // Ask for a snapshot to be taken when the transaction begins.
  rocksdb::TransactionOptions snapshot_txn_options;
  snapshot_txn_options.set_snapshot = true;

  rocksdb::Transaction* txn = txn_db->BeginTransaction(
      rocksdb::WriteOptions(), snapshot_txn_options);

  // Route the read through the transaction's snapshot.
  rocksdb::ReadOptions snapshot_reads;
  snapshot_reads.snapshot = txn->GetSnapshot();

  std::string key1_value;
  txn->Get(snapshot_reads, "key1", &key1_value);

  // Commit.
  txn->Commit();
  delete txn;
}
六、实际应用示例 #
6.1 银行转账 #
cpp
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>
class BankService {
public:
BankService(rocksdb::TransactionDB* db) : db_(db) {}
bool Transfer(const std::string& from_account,
const std::string& to_account,
int64_t amount) {
rocksdb::Transaction* txn = db_->BeginTransaction(rocksdb::WriteOptions());
// 锁定并读取两个账户
std::string from_balance_str, to_balance_str;
rocksdb::Status status;
status = txn->GetForUpdate(rocksdb::ReadOptions(),
from_account, &from_balance_str);
if (!status.ok()) {
txn->Rollback();
delete txn;
return false;
}
status = txn->GetForUpdate(rocksdb::ReadOptions(),
to_account, &to_balance_str);
if (!status.ok()) {
txn->Rollback();
delete txn;
return false;
}
int64_t from_balance = std::stoll(from_balance_str);
int64_t to_balance = std::stoll(to_balance_str);
// 检查余额
if (from_balance < amount) {
txn->Rollback();
delete txn;
return false;
}
// 更新余额
txn->Put(from_account, std::to_string(from_balance - amount));
txn->Put(to_account, std::to_string(to_balance + amount));
// 提交事务
status = txn->Commit();
delete txn;
return status.ok();
}
private:
rocksdb::TransactionDB* db_;
};
6.2 库存管理 #
cpp
#include <rocksdb/utilities/transaction_db.h>
#include <string>
class InventoryService {
public:
InventoryService(rocksdb::TransactionDB* db) : db_(db) {}
bool DeductStock(const std::string& product_id, int quantity) {
rocksdb::Transaction* txn = db_->BeginTransaction(rocksdb::WriteOptions());
std::string stock_str;
rocksdb::Status status = txn->GetForUpdate(
rocksdb::ReadOptions(), "stock:" + product_id, &stock_str);
if (!status.ok()) {
txn->Rollback();
delete txn;
return false;
}
int stock = std::stoi(stock_str);
if (stock < quantity) {
txn->Rollback();
delete txn;
return false;
}
txn->Put("stock:" + product_id, std::to_string(stock - quantity));
status = txn->Commit();
delete txn;
return status.ok();
}
bool AddStock(const std::string& product_id, int quantity) {
rocksdb::Transaction* txn = db_->BeginTransaction(rocksdb::WriteOptions());
std::string stock_str;
rocksdb::Status status = txn->GetForUpdate(
rocksdb::ReadOptions(), "stock:" + product_id, &stock_str);
int stock = status.ok() ? std::stoi(stock_str) : 0;
txn->Put("stock:" + product_id, std::to_string(stock + quantity));
status = txn->Commit();
delete txn;
return status.ok();
}
private:
rocksdb::TransactionDB* db_;
};
七、事务最佳实践 #
7.1 事务设计建议 #
| 建议 | 说明 |
|---|---|
| 保持短事务 | 减少锁持有时间 |
| 按顺序访问键 | 避免死锁 |
| 设置超时 | 防止无限等待 |
| 处理冲突 | 实现重试机制 |
| 合理选择类型 | 根据冲突率选择事务类型 |
7.2 死锁预防 #
cpp
// Deadlock prevention strategies:
// 1. Access keys in a fixed order.
// 2. Set a lock timeout.
// 3. Use optimistic transactions, which take no locks while staging writes.
//
// Skeleton of a transfer that applies strategies 1 and 2: both accounts are
// locked in lexicographic key order with a 5-second lock timeout. If either
// lock cannot be obtained the transaction is rolled back instead of
// committed. The transfer logic itself (and the use of `amount`) is left as a
// placeholder.
void DeadlockFreeTransfer(rocksdb::TransactionDB* db,
                          const std::string& account1,
                          const std::string& account2,
                          int64_t amount) {
  rocksdb::Transaction* txn = db->BeginTransaction(rocksdb::WriteOptions());
  txn->SetLockTimeout(5000);  // 5 seconds, in milliseconds.

  // Pick the lock order by comparing the keys directly, so every caller locks
  // the same pair of accounts in the same sequence.
  const bool already_ordered = !(account2 < account1);
  const std::string& first = already_ordered ? account1 : account2;
  const std::string& second = already_ordered ? account2 : account1;

  std::string first_value;
  std::string second_value;
  rocksdb::Status status =
      txn->GetForUpdate(rocksdb::ReadOptions(), first, &first_value);
  if (status.ok()) {
    status = txn->GetForUpdate(rocksdb::ReadOptions(), second, &second_value);
  }
  if (!status.ok()) {
    // Missing account, lock timeout, or another error: nothing may be applied.
    txn->Rollback();
    delete txn;
    return;
  }

  // Perform the transfer logic here...

  txn->Commit();
  delete txn;
}
八、总结 #
8.1 事务API速查 #
| 操作 | 方法 | 说明 |
|---|---|---|
| 开始事务 | BeginTransaction(WriteOptions()) | 创建新事务 |
| 写入 | txn->Put(key, value) | 事务内写入 |
| 读取 | txn->Get(ReadOptions(), key, &value) | 事务内读取 |
| 锁定读取 | txn->GetForUpdate(ReadOptions(), key, &value) | 读取并锁定 |
| 删除 | txn->Delete(key) | 事务内删除 |
| 提交 | txn->Commit() | 提交事务 |
| 回滚 | txn->Rollback() | 回滚事务 |
8.2 关键要点 #
- 选择事务类型:根据冲突率选择TransactionDB或OptimisticTransactionDB
- 使用GetForUpdate:读取需要更新的数据时锁定
- 设置超时:避免无限等待
- 保持短事务:减少锁持有时间
- 处理冲突:实现重试机制
下一步,让我们学习备份恢复!
最后更新:2026-03-27