RocksDB事务 #

一、事务概述 #

1.1 RocksDB事务特性 #

RocksDB提供了简单的事务支持,具有以下特性:

text
事务特性:
├── 原子性(Atomicity) - 全部成功或全部失败
├── 一致性(Consistency) - 数据状态一致
├── 隔离性(Isolation) - 支持快照隔离
└── 持久性(Durability) - WAL保证持久化

1.2 事务类型 #

类型 说明 适用场景
TransactionDB 悲观锁,支持事务和并发控制,写入时即加锁并检测冲突 需要ACID保证、冲突较多的场景
OptimisticTransactionDB 乐观锁,提交时检测冲突 冲突较少的场景

1.3 基本使用 #

cpp
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>

// Minimal end-to-end example: open a TransactionDB, stage three writes in a
// single transaction, commit it, and report the outcome.
//
// Returns 0 when the transaction commits. Returns 1 when the database cannot
// be opened, when any staged write fails, or when the commit itself fails.
int main() {
    rocksdb::TransactionDB* txn_db = nullptr;
    rocksdb::Options options;
    options.create_if_missing = true;

    rocksdb::TransactionDBOptions txn_options;

    rocksdb::Status status = rocksdb::TransactionDB::Open(
        options, txn_options, "/tmp/txn_db", &txn_db);

    if (!status.ok()) {
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return 1;
    }

    // Begin a transaction.
    rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

    // Stage the writes. Every call returns a Status, so stop at the first
    // failure instead of committing a partially built transaction.
    status = txn->Put("key1", "value1");
    if (status.ok()) {
        status = txn->Put("key2", "value2");
    }
    if (status.ok()) {
        status = txn->Delete("key3");
    }

    int exit_code = 0;
    if (!status.ok()) {
        std::cerr << "Write failed: " << status.ToString() << std::endl;
        txn->Rollback();
        exit_code = 1;
    } else {
        // Commit the transaction.
        status = txn->Commit();
        if (status.ok()) {
            std::cout << "Transaction committed successfully!" << std::endl;
        } else {
            std::cerr << "Commit failed: " << status.ToString() << std::endl;
            exit_code = 1;  // surface the failure to the calling process
        }
    }

    // The transaction must be destroyed before the database that created it.
    delete txn;
    delete txn_db;
    return exit_code;
}

二、TransactionDB #

2.1 创建TransactionDB #

cpp
#include <rocksdb/db.h>
#include <rocksdb/utilities/transaction_db.h>

// Opens the TransactionDB at db_path (creating it when missing) with explicit
// lock-manager settings.
//
// Returns the opened database, which the caller owns and must delete, or
// nullptr when the database could not be opened.
rocksdb::TransactionDB* CreateTransactionDB(const std::string& db_path) {
    rocksdb::Options options;
    options.create_if_missing = true;

    // Transaction-database options.
    rocksdb::TransactionDBOptions txn_options;
    txn_options.max_num_locks = 1000000;         // upper bound on held locks
    txn_options.num_stripes = 16;                // number of lock-table stripes
    txn_options.transaction_lock_timeout = 1000; // lock wait inside a transaction (ms)
    // NOTE(review): per the RocksDB header this is the lock wait (ms) for
    // writes issued outside of a transaction - confirm for your version.
    txn_options.default_lock_timeout = 1000;

    // Initialise the out-pointer so a failed Open can never hand back garbage.
    rocksdb::TransactionDB* txn_db = nullptr;
    rocksdb::Status status = rocksdb::TransactionDB::Open(
        options, txn_options, db_path, &txn_db);

    if (!status.ok()) {
        return nullptr;
    }
    return txn_db;
}

2.2 事务基本操作 #

cpp
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>

// Demonstrates the basic transaction life cycle on txn_db: write a key, read
// it back inside the transaction, delete another key, then commit.
//
// Any failed write aborts the transaction (rollback) instead of committing a
// partial change set. Failures are reported on std::cerr.
void BasicTransaction(rocksdb::TransactionDB* txn_db) {
    // Begin the transaction.
    rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

    // Write.
    rocksdb::Status status = txn->Put("key1", "value1");
    if (!status.ok()) {
        std::cerr << "Put failed: " << status.ToString() << std::endl;
        txn->Rollback();
        delete txn;
        return;
    }

    // Read back through the transaction.
    std::string value;
    status = txn->Get(rocksdb::ReadOptions(), "key1", &value);
    if (status.ok()) {
        std::cout << "Read in transaction: " << value << std::endl;
    } else {
        // A failed read does not invalidate the staged write, so only report it.
        std::cerr << "Get failed: " << status.ToString() << std::endl;
    }

    // Delete. The result used to be overwritten by Commit() without a check.
    status = txn->Delete("key2");
    if (!status.ok()) {
        std::cerr << "Delete failed: " << status.ToString() << std::endl;
        txn->Rollback();
        delete txn;
        return;
    }

    // Commit.
    status = txn->Commit();
    if (status.ok()) {
        std::cout << "Transaction committed!" << std::endl;
    } else {
        std::cerr << "Commit failed: " << status.ToString() << std::endl;
    }

    delete txn;
}

2.3 事务回滚 #

cpp
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>

// Stages two writes on txn_db and then discards them with Rollback().
// The rollback decision is a hard-coded placeholder for a real condition.
void TransactionRollback(rocksdb::TransactionDB* txn_db) {
    rocksdb::WriteOptions write_opts;
    rocksdb::Transaction* pending = txn_db->BeginTransaction(write_opts);

    pending->Put("key1", "value1");
    pending->Put("key2", "value2");

    // Stand-in for an application-specific check.
    const bool should_rollback = true;

    if (!should_rollback) {
        pending->Commit();
    } else {
        // Drop everything staged so far.
        pending->Rollback();
        std::cout << "Transaction rolled back!" << std::endl;
    }

    delete pending;
}

三、锁机制 #

3.1 GetForUpdate #

cpp
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>

// GetForUpdate reads a key and locks it so that other transactions cannot
// modify it until this transaction finishes.
//
// Increments the integer stored under "counter" (initialising it to 1 when
// the key is absent). If the locked read or the write fails, the transaction
// is rolled back rather than committed. std::stoi throws when the stored
// value is not a valid int; the exception propagates to the caller, and the
// transaction object is still released.
void GetForUpdateExample(rocksdb::TransactionDB* txn_db) {
    rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

    // Releases the transaction on every exit path, including exceptions.
    struct TxnCleanup {
        explicit TxnCleanup(rocksdb::Transaction* t) : txn(t) {}
        ~TxnCleanup() { delete txn; }
        TxnCleanup(const TxnCleanup&) = delete;
        TxnCleanup& operator=(const TxnCleanup&) = delete;
        rocksdb::Transaction* txn;
    } cleanup(txn);

    // Read and lock.
    std::string value;
    rocksdb::Status status = txn->GetForUpdate(
        rocksdb::ReadOptions(), "counter", &value);

    if (status.ok()) {
        const int counter = std::stoi(value);
        status = txn->Put("counter", std::to_string(counter + 1));
    } else if (status.IsNotFound()) {
        // Key does not exist yet: initialise it.
        status = txn->Put("counter", "1");
    }

    // Reached with a non-OK status when the locked read failed for a reason
    // other than NotFound (for example a lock timeout) or when Put failed.
    if (!status.ok()) {
        std::cerr << "Update failed: " << status.ToString() << std::endl;
        txn->Rollback();
        return;
    }

    // Commit.
    status = txn->Commit();

    if (!status.ok()) {
        std::cerr << "Commit failed: " << status.ToString() << std::endl;
    }
}

3.2 批量锁定 #

cpp
#include <rocksdb/utilities/transaction_db.h>
#include <vector>

// Reads and locks every key in `keys` with a single MultiGetForUpdate call,
// prints the entries whose read succeeded, and commits.
void MultiGetForUpdate(rocksdb::TransactionDB* txn_db,
                       const std::vector<std::string>& keys) {
    rocksdb::WriteOptions write_opts;
    rocksdb::Transaction* batch_txn = txn_db->BeginTransaction(write_opts);

    // Slices only reference the caller's strings; `keys` outlives this call.
    std::vector<rocksdb::Slice> slices(keys.begin(), keys.end());
    std::vector<std::string> fetched(keys.size());

    // One Status per requested key, in request order.
    std::vector<rocksdb::Status> results = batch_txn->MultiGetForUpdate(
        rocksdb::ReadOptions(), slices, &fetched);

    // Report the keys that were read successfully.
    size_t idx = 0;
    for (const std::string& key : keys) {
        if (results[idx].ok()) {
            std::cout << key << " = " << fetched[idx] << std::endl;
        }
        ++idx;
    }

    // Updates to the locked keys would go here.
    // ...

    batch_txn->Commit();
    delete batch_txn;
}

3.3 设置锁超时 #

cpp
#include <rocksdb/utilities/transaction_db.h>

// Starts a transaction on txn_db, overrides its lock wait timeout, and
// commits.
void SetLockTimeout(rocksdb::TransactionDB* txn_db) {
    rocksdb::Transaction* txn =
        txn_db->BeginTransaction(rocksdb::WriteOptions());

    // Lock wait timeout for this transaction, in milliseconds (5 seconds).
    txn->SetLockTimeout(5000);

    // NOTE(review): the original snippet also hinted at a per-key timeout via
    // a commented-out `txn->SetTimeout(5000);`. I could not find such a method
    // on rocksdb::Transaction - confirm against the header before relying on it.

    // Transaction work would go here...

    txn->Commit();
    delete txn;
}

四、乐观事务 #

4.1 OptimisticTransactionDB #

cpp
#include <rocksdb/db.h>
#include <rocksdb/utilities/optimistic_transaction_db.h>
#include <iostream>

// Opens an OptimisticTransactionDB at db_path, writes two keys in one
// optimistic transaction and commits it.
//
// Optimistic transactions take no locks while they run; conflicts are
// detected at commit time. Every commit outcome is reported: success, a
// retryable conflict (Busy / TryAgain), or any other error.
void OptimisticTransactionExample(const std::string& db_path) {
    rocksdb::OptimisticTransactionDB* txn_db = nullptr;
    rocksdb::Options options;
    options.create_if_missing = true;

    rocksdb::Status status = rocksdb::OptimisticTransactionDB::Open(
        options, db_path, &txn_db);

    if (!status.ok()) {
        std::cerr << "Open failed: " << status.ToString() << std::endl;
        return;
    }

    // Begin an optimistic transaction.
    rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());

    // Stage the writes (no locks are taken here).
    status = txn->Put("key1", "value1");
    if (status.ok()) {
        status = txn->Put("key2", "value2");
    }

    if (!status.ok()) {
        std::cerr << "Put failed: " << status.ToString() << std::endl;
        txn->Rollback();
    } else {
        // Conflict detection happens during Commit().
        status = txn->Commit();

        if (status.ok()) {
            std::cout << "Optimistic transaction committed!" << std::endl;
        } else if (status.IsBusy() || status.IsTryAgain()) {
            std::cout << "Transaction conflict detected!" << std::endl;
            // The caller may retry the whole transaction.
        } else {
            // Previously dropped silently: I/O errors, corruption, etc.
            std::cerr << "Commit failed: " << status.ToString() << std::endl;
        }
    }

    // The transaction must be destroyed before the database that created it.
    delete txn;
    delete txn_db;
}

4.2 乐观事务重试 #

cpp
#include <rocksdb/utilities/optimistic_transaction_db.h>
#include <iostream>

// Runs `operations` inside a fresh optimistic transaction and commits it,
// starting over with a new transaction when the commit reports a retryable
// conflict.
//
// txn_db       database on which the transactions are started.
// operations   callback that stages reads/writes on the given transaction;
//              it is invoked once per attempt. An empty callback makes the
//              function return false without touching the database.
// max_retries  maximum number of attempts (values <= 0 mean no attempt).
//
// Returns true as soon as one attempt commits. Returns false when every
// attempt hit a conflict or when the commit failed with a non-retryable error.
// If `operations` throws, the exception propagates and the transaction of the
// current attempt is still released.
bool OptimisticTransactionWithRetry(
    rocksdb::OptimisticTransactionDB* txn_db,
    std::function<void(rocksdb::Transaction*)> operations,
    int max_retries = 3) {

    // Calling an empty std::function would throw std::bad_function_call.
    if (!operations) {
        return false;
    }

    // Owns the per-attempt transaction so it is deleted exactly once on every
    // path out of the loop body (return, continue, or exception).
    struct TxnCleanup {
        explicit TxnCleanup(rocksdb::Transaction* t) : txn(t) {}
        ~TxnCleanup() { delete txn; }
        TxnCleanup(const TxnCleanup&) = delete;
        TxnCleanup& operator=(const TxnCleanup&) = delete;
        rocksdb::Transaction* txn;
    };

    for (int i = 0; i < max_retries; i++) {
        rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());
        TxnCleanup cleanup(txn);

        // Stage the caller's work.
        operations(txn);

        // Try to commit.
        const rocksdb::Status status = txn->Commit();

        if (status.ok()) {
            return true;
        }

        // Busy signals a write conflict; TryAgain asks the caller to retry.
        // Both are resolved by redoing the work on a new transaction.
        if (status.IsBusy() || status.IsTryAgain()) {
            std::cout << "Retry " << (i + 1) << " due to conflict" << std::endl;
            continue;
        }

        // Any other error is not expected to go away by retrying.
        std::cerr << "Error: " << status.ToString() << std::endl;
        return false;
    }

    return false;
}

// Usage example: increment the integer stored under "counter" through
// OptimisticTransactionWithRetry.
//
// The value is read with GetForUpdate rather than Get so that the read takes
// part in commit-time conflict detection. With a plain Get, a concurrent
// writer could change "counter" between the read and the Put, and its update
// would be silently overwritten.
void DemoOptimisticRetry(rocksdb::OptimisticTransactionDB* txn_db) {
    bool success = OptimisticTransactionWithRetry(txn_db, [](rocksdb::Transaction* txn) {
        std::string value;
        const rocksdb::Status read_status =
            txn->GetForUpdate(rocksdb::ReadOptions(), "counter", &value);

        int counter = 0;
        if (read_status.ok()) {
            // An empty stored value is treated as zero.
            counter = value.empty() ? 0 : std::stoi(value);
        } else if (!read_status.IsNotFound()) {
            // The callback returns void and cannot report failure, so the
            // safest reaction is to stage no write at all.
            std::cerr << "Read failed: " << read_status.ToString() << std::endl;
            return;
        }

        txn->Put("counter", std::to_string(counter + 1));
    });

    if (success) {
        std::cout << "Transaction succeeded!" << std::endl;
    }
}

五、事务选项 #

5.1 TransactionOptions #

cpp
#include <rocksdb/utilities/transaction_db.h>

// Starts a transaction on txn_db with synchronous writes, a 5 second lock
// wait timeout and a 60 second expiration.
//
// Returns the new transaction; the caller owns it and must delete it.
//
// Two things changed from the earlier version of this snippet:
//   * rocksdb::TransactionOptions has no `deadline` member, so that
//     assignment was removed.
//   * `expiration` is an integer duration in milliseconds, not a
//     std::chrono time_point.
rocksdb::Transaction* CreateTransactionWithOptions(
    rocksdb::TransactionDB* txn_db) {

    rocksdb::WriteOptions write_options;
    write_options.sync = true;

    rocksdb::TransactionOptions txn_options;
    txn_options.lock_timeout = 5000;   // lock wait timeout: 5 s, in ms
    txn_options.expiration = 60000;    // transaction expires after 60 s, in ms

    return txn_db->BeginTransaction(write_options, txn_options);
}

5.2 设置快照 #

cpp
#include <rocksdb/utilities/transaction_db.h>

// Starts a transaction that sets a snapshot when it begins, reads "key1"
// through that snapshot, and commits.
void TransactionWithSnapshot(rocksdb::TransactionDB* txn_db) {
    rocksdb::TransactionOptions snapshot_opts;
    snapshot_opts.set_snapshot = true;  // request a snapshot at begin

    rocksdb::WriteOptions write_opts;
    rocksdb::Transaction* txn =
        txn_db->BeginTransaction(write_opts, snapshot_opts);

    // Point the read options at the transaction's snapshot.
    rocksdb::ReadOptions snapshot_read;
    snapshot_read.snapshot = txn->GetSnapshot();

    std::string fetched;
    txn->Get(snapshot_read, "key1", &fetched);

    // Commit and release the transaction.
    txn->Commit();
    delete txn;
}

六、实际应用示例 #

6.1 银行转账 #

cpp
#include <rocksdb/utilities/transaction_db.h>
#include <iostream>

// Moves money between accounts stored in a TransactionDB. Each account is a
// key whose value is the balance written as a decimal integer string.
class BankService {
public:
    // `db` is borrowed, not owned; it must outlive this service.
    BankService(rocksdb::TransactionDB* db) : db_(db) {}

    // Atomically moves `amount` from `from_account` to `to_account`.
    //
    // Returns true when the transfer was committed. Returns false, leaving
    // both balances untouched, when:
    //   * amount is negative (it would reverse the direction of the transfer),
    //   * both names refer to the same account (the two writes below would
    //     otherwise leave the account with balance + amount),
    //   * either account cannot be read and locked,
    //   * the source balance is insufficient,
    //   * the destination balance would overflow int64_t,
    //   * a write or the commit fails.
    //
    // std::stoll throws when a stored balance is not a valid integer; the
    // exception propagates to the caller and the transaction is still released.
    bool Transfer(const std::string& from_account,
                  const std::string& to_account,
                  int64_t amount) {
        if (amount < 0 || from_account == to_account) {
            return false;
        }

        rocksdb::Transaction* txn = db_->BeginTransaction(rocksdb::WriteOptions());
        TxnCleanup cleanup(txn);

        // Lock the two accounts in key order. Two opposite transfers (A->B and
        // B->A) then request their locks in the same order and cannot
        // deadlock each other.
        const bool from_first = from_account < to_account;
        const std::string& first_key = from_first ? from_account : to_account;
        const std::string& second_key = from_first ? to_account : from_account;

        std::string first_value, second_value;
        rocksdb::Status status = txn->GetForUpdate(
            rocksdb::ReadOptions(), first_key, &first_value);
        if (status.ok()) {
            status = txn->GetForUpdate(
                rocksdb::ReadOptions(), second_key, &second_value);
        }
        if (!status.ok()) {
            txn->Rollback();
            return false;
        }

        const int64_t from_balance =
            std::stoll(from_first ? first_value : second_value);
        const int64_t to_balance =
            std::stoll(from_first ? second_value : first_value);

        // Check funds, and make sure the credit cannot overflow (amount >= 0
        // here, so the subtraction itself is safe).
        if (from_balance < amount || to_balance > INT64_MAX - amount) {
            txn->Rollback();
            return false;
        }

        // Stage both balance updates; give up if either write is rejected.
        status = txn->Put(from_account, std::to_string(from_balance - amount));
        if (status.ok()) {
            status = txn->Put(to_account, std::to_string(to_balance + amount));
        }
        if (!status.ok()) {
            txn->Rollback();
            return false;
        }

        // Commit.
        status = txn->Commit();
        return status.ok();
    }

private:
    // Deletes the transaction when it goes out of scope, covering every
    // return path and exceptions.
    struct TxnCleanup {
        explicit TxnCleanup(rocksdb::Transaction* t) : txn(t) {}
        ~TxnCleanup() { delete txn; }
        TxnCleanup(const TxnCleanup&) = delete;
        TxnCleanup& operator=(const TxnCleanup&) = delete;
        rocksdb::Transaction* txn;
    };

    rocksdb::TransactionDB* db_;
};

6.2 库存管理 #

cpp
#include <rocksdb/utilities/transaction_db.h>
#include <string>

// Tracks per-product stock levels in a TransactionDB. The stock of product P
// is stored under the key "stock:P" as a decimal integer string.
class InventoryService {
public:
    // `db` is borrowed, not owned; it must outlive this service.
    InventoryService(rocksdb::TransactionDB* db) : db_(db) {}

    // Removes `quantity` units from the stock of `product_id`.
    //
    // Returns true when the new level was committed. Returns false, leaving
    // the stock untouched, when quantity is negative, the stock entry cannot
    // be read and locked (including when it does not exist), the stock is
    // insufficient, or the write/commit fails.
    //
    // std::stoi throws when the stored value is not a valid int; the exception
    // propagates and the transaction is still released.
    bool DeductStock(const std::string& product_id, int quantity) {
        // A negative deduction would silently add stock.
        if (quantity < 0) {
            return false;
        }

        const std::string key = "stock:" + product_id;
        rocksdb::Transaction* txn = db_->BeginTransaction(rocksdb::WriteOptions());
        TxnCleanup cleanup(txn);

        std::string stock_str;
        rocksdb::Status status = txn->GetForUpdate(
            rocksdb::ReadOptions(), key, &stock_str);

        if (!status.ok()) {
            txn->Rollback();
            return false;
        }

        const int stock = std::stoi(stock_str);

        if (stock < quantity) {
            txn->Rollback();
            return false;
        }

        status = txn->Put(key, std::to_string(stock - quantity));
        if (!status.ok()) {
            txn->Rollback();
            return false;
        }

        status = txn->Commit();
        return status.ok();
    }

    // Adds `quantity` units to the stock of `product_id`, creating the entry
    // when the product has no stock record yet.
    //
    // Returns true when the new level was committed. Returns false when
    // quantity is negative, when the current level cannot be read for any
    // reason other than "not found", or when the write/commit fails.
    // Only NotFound is treated as a stock of zero: treating every read error
    // that way could overwrite an existing stock level with `quantity`.
    //
    // std::stoi throws when the stored value is not a valid int; the exception
    // propagates and the transaction is still released.
    bool AddStock(const std::string& product_id, int quantity) {
        // A negative addition would bypass the sufficiency check in DeductStock.
        if (quantity < 0) {
            return false;
        }

        const std::string key = "stock:" + product_id;
        rocksdb::Transaction* txn = db_->BeginTransaction(rocksdb::WriteOptions());
        TxnCleanup cleanup(txn);

        std::string stock_str;
        rocksdb::Status status = txn->GetForUpdate(
            rocksdb::ReadOptions(), key, &stock_str);

        int stock = 0;
        if (status.ok()) {
            stock = std::stoi(stock_str);
        } else if (!status.IsNotFound()) {
            txn->Rollback();
            return false;
        }

        status = txn->Put(key, std::to_string(stock + quantity));
        if (!status.ok()) {
            txn->Rollback();
            return false;
        }

        status = txn->Commit();
        return status.ok();
    }

private:
    // Deletes the transaction when it goes out of scope, covering every
    // return path and exceptions.
    struct TxnCleanup {
        explicit TxnCleanup(rocksdb::Transaction* t) : txn(t) {}
        ~TxnCleanup() { delete txn; }
        TxnCleanup(const TxnCleanup&) = delete;
        TxnCleanup& operator=(const TxnCleanup&) = delete;
        rocksdb::Transaction* txn;
    };

    rocksdb::TransactionDB* db_;
};

七、事务最佳实践 #

7.1 事务设计建议 #

建议 说明
保持短事务 减少锁持有时间
按固定顺序访问键 所有事务以相同顺序加锁,避免死锁
设置超时 防止无限等待
处理冲突 实现重试机制
合理选择类型 根据冲突率选择事务类型

7.2 死锁预防 #

cpp
// Deadlock-prevention strategies:
//   1. Access keys in a fixed order.
//   2. Set a lock timeout.
//   3. Use optimistic transactions (no locks are held while they run).
//
// Skeleton of a transfer between account1 and account2 that applies the first
// two strategies: both accounts are locked in lexicographic key order, with a
// 5 second lock timeout. If either account cannot be read and locked, the
// transaction is rolled back and nothing is committed. The function returns
// void, so callers are not told about the outcome. `amount` is reserved for
// the transfer logic, which is left as a placeholder.
void DeadlockFreeTransfer(rocksdb::TransactionDB* db,
                          const std::string& account1,
                          const std::string& account2,
                          int64_t amount) {

    rocksdb::Transaction* txn = db->BeginTransaction(rocksdb::WriteOptions());
    txn->SetLockTimeout(5000);  // 5 second lock timeout (ms)

    // Lock in lexicographic order. Every transaction touching the same pair
    // then requests the locks in the same sequence, whichever way the caller
    // passed the accounts. Plain comparisons need no extra header.
    const std::string& first = (account2 < account1) ? account2 : account1;
    const std::string& second = (account1 < account2) ? account2 : account1;

    // One buffer per account so the second read does not overwrite the first.
    std::string first_value;
    std::string second_value;
    rocksdb::Status status =
        txn->GetForUpdate(rocksdb::ReadOptions(), first, &first_value);
    if (status.ok()) {
        status = txn->GetForUpdate(rocksdb::ReadOptions(), second, &second_value);
    }

    // Without both locks, running the transfer logic would be unsafe.
    if (!status.ok()) {
        txn->Rollback();
        delete txn;
        return;
    }

    // Transfer logic goes here...

    txn->Commit();
    delete txn;
}

八、总结 #

8.1 事务API速查 #

操作 方法 说明
开始事务 BeginTransaction(WriteOptions()) 创建新事务
写入 txn->Put(key, value) 事务内写入
读取 txn->Get(ReadOptions(), key, &value) 事务内读取
锁定读取 txn->GetForUpdate(ReadOptions(), key, &value) 读取并锁定
删除 txn->Delete(key) 事务内删除
提交 txn->Commit() 提交事务
回滚 txn->Rollback() 回滚事务

8.2 关键要点 #

  1. 选择事务类型:根据冲突率选择TransactionDB或OptimisticTransactionDB
  2. 使用GetForUpdate:读取需要更新的数据时锁定
  3. 设置超时:避免无限等待
  4. 保持短事务:减少锁持有时间
  5. 处理冲突:实现重试机制

下一步,让我们学习备份恢复!

最后更新:2026-03-27