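Couchbase Transactions #

1. Transaction Overview #

1.1 What Is a Transaction #

A transaction is a logical unit of work made up of a group of operations. It has the ACID properties:

Property      Description
Atomicity     Either every operation succeeds or none of them takes effect
Consistency   The data moves from one valid state to another
Isolation     Concurrent transactions do not interfere with one another (Couchbase provides Read Committed isolation)
Durability    Once committed, changes are stored permanently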
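
To make atomicity concrete, here is a minimal in-memory sketch. It is not Couchbase code: the `store` dictionary and the `atomic_update`, `debit`, and `credit` helpers are hypothetical names chosen for illustration. Changes are applied to a staged copy and published only if every step succeeds.

```python
import copy

def atomic_update(store, operations):
    """Apply every operation to a staged copy; publish only if all succeed."""
    staged = copy.deepcopy(store)
    for op in operations:
        op(staged)            # an exception here leaves `store` untouched
    store.clear()
    store.update(staged)      # "commit": all changes become visible together

def debit(doc_id, amount):
    def op(data):
        if data[doc_id]['balance'] < amount:
            raise ValueError('insufficient balance')
        data[doc_id]['balance'] -= amount
    return op

def credit(doc_id, amount):
    def op(data):
        data[doc_id]['balance'] += amount
    return op

store = {'user::001': {'balance': 50}, 'user::002': {'balance': 0}}

# The credit is staged first, then the debit fails: neither change is published.
try:
    atomic_update(store, [credit('user::002', 100), debit('user::001', 100)])
except ValueError:
    pass
print(store)
```

A real transaction engine stages changes per document rather than copying the whole data set, but the all-or-nothing outcome is the same idea.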

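1.2 Couchbase Transaction Features #

text
Features:
- Multi-document ACID transactions
- Transactions can span buckets
- Optimistic concurrency: conflicts are detected at write time rather than prevented with up-front locks
- Automatic retry of attempts that hit a conflict
- Timeout control: a transaction that cannot finish in time expires and is rolled back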
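
The last three features work together: a write is rejected if the document changed since it was read, the attempt is retried, and retries stop once a deadline passes. The following toy sketch shows that loop under stated assumptions. `Store`, `CasMismatch`, and `run_with_retries` are hypothetical stand-ins, not SDK classes.

```python
import time

class CasMismatch(Exception):
    """Raised when a document changed after it was read."""

class Store:
    """Toy document store that keeps a CAS (version) value per document."""
    def __init__(self):
        self._docs = {}

    def upsert(self, key, value):
        _, cas = self._docs.get(key, (None, 0))
        self._docs[key] = (value, cas + 1)

    def get(self, key):
        return self._docs[key]            # (value, cas)

    def replace(self, key, value, cas):
        if self._docs[key][1] != cas:     # another writer got there first
            raise CasMismatch(key)
        self._docs[key] = (value, cas + 1)

def run_with_retries(attempt, timeout_s=1.0):
    """Re-run `attempt` on conflicts until it succeeds or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            return attempt()
        except CasMismatch:
            if time.monotonic() >= deadline:
                raise TimeoutError('transaction expired')

store = Store()
store.upsert('counter', 0)
interfered = []

def increment():
    value, cas = store.get('counter')
    if not interfered:                    # simulate one concurrent writer
        interfered.append(True)
        store.upsert('counter', value + 10)
    store.replace('counter', value + 1, cas)

run_with_retries(increment)
print(store.get('counter')[0])
```

The first attempt loses the race and is rejected; the second attempt re-reads the document and succeeds, so no update is lost.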

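1.3 When to Use Transactions #

text
Good fits:
- Bank transfers
- Creating an order while decrementing inventory
- Atomic updates across multiple documents
- Multi-step business workflows that must succeed or fail as a unit

Poor fits:
- Single-document operations (a CAS check is sufficient)
- Bulk data operations
- Long-running operations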

2. N1QL Transactions #

2.1 BEGIN TRANSACTION #

sql
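BEGIN TRANSACTION;

/* Read the sender's document inside the transaction. */
SELECT * FROM `my-bucket`.`_default`.`_default` 
WHERE META().id = 'user::001';

/* Debit the sender. */
UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance - 100
WHERE META().id = 'user::001';

/* Credit the recipient. */
UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance + 100
WHERE META().id = 'user::002';

/* Both updates become visible together. */
COMMIT;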

2.2 ROLLBACK #

sql
BEGIN TRANSACTION;

UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance - 100
WHERE META().id = 'user::001';

ROLLBACK;

2.3 SAVEPOINT #

sql
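BEGIN TRANSACTION;

/* Debit the sender. */
UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance - 100
WHERE META().id = 'user::001';

/* Mark the point to return to if a later step has to be undone. */
SAVEPOINT transfer_from_done;

/* Credit the recipient. */
UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance + 100
WHERE META().id = 'user::002';

/* Undo only the credit; the debit made before the savepoint is kept. */
ROLLBACK TO SAVEPOINT transfer_from_done;

/* Commits the debit alone. A real transfer would redo the credit before committing. */
COMMIT;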
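
The net effect of the savepoint example can be checked with a small in-memory simulation. This is only a sketch of the semantics: `ToyTransaction` is a hypothetical class, and the starting balances are made up for illustration.

```python
import copy

class ToyTransaction:
    """In-memory stand-in for a transaction with named savepoints."""
    def __init__(self, committed):
        self._committed = committed
        self._working = copy.deepcopy(committed)
        self._savepoints = {}

    def add(self, doc_id, delta):
        self._working[doc_id] += delta

    def savepoint(self, name):
        self._savepoints[name] = copy.deepcopy(self._working)

    def rollback_to(self, name):
        # Discard work done after the savepoint; earlier work is kept.
        self._working = copy.deepcopy(self._savepoints[name])

    def commit(self):
        self._committed.clear()
        self._committed.update(self._working)

balances = {'user::001': 500, 'user::002': 200}
txn = ToyTransaction(balances)
txn.add('user::001', -100)                # debit
txn.savepoint('transfer_from_done')
txn.add('user::002', +100)                # credit
txn.rollback_to('transfer_from_done')     # undoes the credit only
txn.commit()
print(balances)
```

After the commit the sender has been debited while the recipient is unchanged, which is why a partial rollback is normally followed by a corrective step before committing.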

3. SDK Transactions #

3.1 Python SDK #

python
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import TransactionFailed
from couchbase.options import ClusterOptions

# Written against Python SDK 4.x, where transactions are reached through the cluster object.
cluster = Cluster(
    'couchbase://localhost',
    ClusterOptions(PasswordAuthenticator('Administrator', 'password'))
)

bucket = cluster.bucket('my-bucket')
collection = bucket.default_collection()

transactions = cluster.transactions

def transfer_money(from_id, to_id, amount):
    def txn_logic(ctx):
        # The SDK may call this function more than once, so keep it free of side effects.
        from_doc = ctx.get(collection, from_id)
        to_doc = ctx.get(collection, to_id)
        from_content = from_doc.content_as[dict]
        to_content = to_doc.content_as[dict]

        if from_content['balance'] < amount:
            # Raising inside the transaction logic rolls the transaction back.
            raise ValueError('Insufficient balance')

        ctx.replace(
            from_doc,
            {**from_content, 'balance': from_content['balance'] - amount}
        )

        ctx.replace(
            to_doc,
            {**to_content, 'balance': to_content['balance'] + amount}
        )

    try:
        transactions.run(txn_logic)
        print('Transfer succeeded')
    except TransactionFailed as e:
        print(f'Transfer failed: {e}')

transfer_money('user::001', 'user::002', 100)
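
Because transaction logic can be executed more than once, it helps to keep the balance arithmetic in a pure function that can be unit-tested without a cluster. The sketch below assumes the same document shape as the example above; `apply_transfer` is a hypothetical helper, not part of the SDK.

```python
def apply_transfer(from_content, to_content, amount):
    """Return updated copies of both documents; the inputs are never mutated."""
    if amount <= 0:
        raise ValueError('amount must be positive')
    if from_content['balance'] < amount:
        raise ValueError('insufficient balance')
    new_from = {**from_content, 'balance': from_content['balance'] - amount}
    new_to = {**to_content, 'balance': to_content['balance'] + amount}
    return new_from, new_to

alice = {'name': 'Alice', 'balance': 300}
bob = {'name': 'Bob', 'balance': 50}
new_alice, new_bob = apply_transfer(alice, bob, 100)
print(new_alice, new_bob)
```

Inside the transaction logic, the two returned dictionaries would be passed to `ctx.replace`, so a retried attempt always recomputes from freshly read content.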

3.2 Node.js SDK #

javascript
const couchbase = require('couchbase');

async function transferMoney(cluster, collection, fromId, toId, amount) {
    try {
        await cluster.transactions().run(async (ctx) => {
            const fromDoc = await ctx.get(collection, fromId);
            const toDoc = await ctx.get(collection, toId);

            if (fromDoc.content.balance < amount) {
                // Throwing inside the transaction logic rolls the transaction back.
                throw new Error('Insufficient balance');
            }

            await ctx.replace(fromDoc, {
                ...fromDoc.content,
                balance: fromDoc.content.balance - amount
            });

            await ctx.replace(toDoc, {
                ...toDoc.content,
                balance: toDoc.content.balance + amount
            });
        });

        console.log('Transfer succeeded');
    } catch (error) {
        console.log('Transfer failed:', error);
    }
}

async function main() {
    // couchbase.connect() is asynchronous and resolves to a connected Cluster.
    const cluster = await couchbase.connect('couchbase://localhost', {
        username: 'Administrator',
        password: 'password'
    });
    const collection = cluster.bucket('my-bucket').defaultCollection();

    await transferMoney(cluster, collection, 'user::001', 'user::002', 100);
    await cluster.close();
}

main().catch(console.error);

3.3 Java SDK #

java
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ClusterOptions;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.transactions.TransactionGetResult;
import com.couchbase.client.java.transactions.error.TransactionFailedException;

public class TransferExample {
    private final Cluster cluster = Cluster.connect(
        "localhost",
        ClusterOptions.clusterOptions("Administrator", "password")
    );

    private final Collection collection =
        cluster.bucket("my-bucket").defaultCollection();

    public void transferMoney(String fromId, String toId, int amount) {
        try {
            // Written against Java SDK 3.3+, where transactions are reached via cluster.transactions().
            cluster.transactions().run(ctx -> {
                TransactionGetResult fromDoc = ctx.get(collection, fromId);
                TransactionGetResult toDoc = ctx.get(collection, toId);

                JsonObject fromContent = fromDoc.contentAsObject();
                JsonObject toContent = toDoc.contentAsObject();

                if (fromContent.getInt("balance") < amount) {
                    // Throwing inside the lambda rolls the transaction back.
                    throw new RuntimeException("Insufficient balance");
                }

                ctx.replace(fromDoc, fromContent.put("balance",
                    fromContent.getInt("balance") - amount));

                ctx.replace(toDoc, toContent.put("balance",
                    toContent.getInt("balance") + amount));
            });

            System.out.println("Transfer succeeded");
        } catch (TransactionFailedException e) {
            System.out.println("Transfer failed: " + e);
        }
    }
}

4. Transaction Operations #

4.1 Inserts #

python
from datetime import datetime, timezone

def create_order_with_items(order_id, items):
    def txn_logic(ctx):
        # The order and all of its items are inserted atomically.
        order_doc = {
            'type': 'order',
            'id': order_id,
            'items': [item['id'] for item in items],
            'status': 'pending',
            'created_at': datetime.now(timezone.utc).isoformat()
        }
        ctx.insert(collection, order_id, order_doc)

        for item in items:
            item_doc = {
                'type': 'order_item',
                'id': item['id'],
                'order_id': order_id,
                'product_id': item['product_id'],
                'quantity': item['quantity'],
                'price': item['price']
            }
            ctx.insert(collection, item['id'], item_doc)

    transactions.run(txn_logic)

4.2 Updates #

python
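from datetime import datetime, timezone
from couchbase.options import TransactionQueryOptions

def update_order_status(order_id, new_status):
    def txn_logic(ctx):
        order_doc = ctx.get(collection, order_id)

        ctx.replace(order_doc, {
            **order_doc.content_as[dict],
            'status': new_status,
            'updated_at': datetime.now(timezone.utc).isoformat()
        })

        if new_status == 'cancelled':
            # Queries issued through ctx run inside the same transaction.
            items = ctx.query('''
                SELECT META().id 
                FROM `my-bucket`.`_default`.`_default`
                WHERE type = 'order_item' AND order_id = $order_id
            ''', TransactionQueryOptions(named_parameters={'order_id': order_id}))

            # Remove every item that belongs to the cancelled order.
            for item in items.rows():
                ctx.remove(ctx.get(collection, item['id']))

    transactions.run(txn_logic)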

4.3 Deletes #

python
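from couchbase.options import TransactionQueryOptions

def delete_user_with_data(user_id):
    def txn_logic(ctx):
        # Remove the user document first.
        ctx.remove(ctx.get(collection, user_id))

        # Then find and remove every order that belongs to the user.
        orders = ctx.query('''
            SELECT META().id 
            FROM `my-bucket`.`_default`.`_default`
            WHERE type = 'order' AND user_id = $user_id
        ''', TransactionQueryOptions(named_parameters={'user_id': user_id}))

        for order in orders.rows():
            ctx.remove(ctx.get(collection, order['id']))

    transactions.run(txn_logic)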

5. Transaction Configuration #

5.1 Timeout #

python
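from datetime import timedelta
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.durability import DurabilityLevel, ServerDurability
from couchbase.options import ClusterOptions, TransactionConfig

# Python SDK 4.x: transaction settings are passed in through ClusterOptions.
# Older 4.x releases name the timeout argument expiration_time.
config = TransactionConfig(
    timeout=timedelta(seconds=60),
    durability=ServerDurability(DurabilityLevel.MAJORITY)
)

cluster = Cluster('couchbase://localhost', ClusterOptions(
    PasswordAuthenticator('Administrator', 'password'),
    transaction_config=config
))
transactions = cluster.transactions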

5.2 Retries #

python
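from datetime import timedelta
from couchbase.options import TransactionConfig

# The SDK retries failed attempts automatically until the timeout expires,
# so the timeout is the setting that bounds retries.
config = TransactionConfig(
    timeout=timedelta(seconds=60),
    cleanup_window=timedelta(seconds=120)
)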

5.3 Durability Level #

python
from couchbase.durability import DurabilityLevel, ServerDurability
from couchbase.options import TransactionConfig

config = TransactionConfig(
    durability=ServerDurability(DurabilityLevel.MAJORITY)
)

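Durability levels:

Level                            Description
NONE                             Does not wait for replication or persistence (not recommended for transactions)
MAJORITY                         Waits until a majority of nodes acknowledge the write
MAJORITY_AND_PERSIST_TO_ACTIVE   Majority acknowledgement, plus persistence to disk on the active node
PERSIST_TO_MAJORITY              Waits until a majority of nodes have persisted the write to disk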
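
As a worked example of what "majority" means in the table above, the sketch below counts the copies that must acknowledge a write, assuming a majority is more than half of all copies of a document (the active copy plus its replicas). `majority_copies` is a hypothetical helper written for this illustration.

```python
def majority_copies(num_replicas):
    """Copies (active + replicas) that must acknowledge a 'majority' write."""
    total_copies = num_replicas + 1
    return total_copies // 2 + 1

for replicas in range(3):
    print(f'{replicas} replica(s): {majority_copies(replicas)} of {replicas + 1} copies')
```

Under this assumption, a bucket with one replica needs both copies to acknowledge every durable write, so losing a single node blocks such writes until failover completes.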

6. Transaction Error Handling #

6.1 Error Types #

python
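from couchbase.exceptions import (
    TransactionCommitAmbiguous,
    TransactionExpired,
    TransactionFailed
)

# Catch the more specific errors before the general failure.
try:
    transactions.run(txn_logic)
except TransactionExpired as e:
    # The transaction ran out of time before it could finish.
    print(f'Transaction timed out: {e}')
except TransactionCommitAmbiguous as e:
    # The commit may or may not have succeeded; verify before retrying.
    print(f'Transaction commit state is ambiguous: {e}')
except TransactionFailed as e:
    # The transaction failed and was rolled back.
    print(f'Transaction failed: {e}')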

6.2 Retry Strategy #

python
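import time

def run_with_retry(txn_logic, max_retries=3):
    # The SDK already retries conflicts inside run(); this outer loop only
    # covers failures that surface after those retries are exhausted.
    for attempt in range(max_retries):
        try:
            transactions.run(txn_logic)
            return True
        except TransactionFailed:
            if attempt < max_retries - 1:
                # Linear backoff: 0.1 s, 0.2 s, ...
                time.sleep(0.1 * (attempt + 1))
                continue
            raise
    return False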
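
When many clients retry at the same moment, fixed or linear delays can make them collide again. A common alternative is exponential backoff with random jitter. The sketch below is one way to write it, not the SDK's own retry logic; `backoff_delays` and `run_with_backoff` are hypothetical helpers, and the `sleep` parameter exists only so the example can run without real waiting.

```python
import random
import time

def backoff_delays(max_retries, base=0.1, cap=2.0, rng=random.random):
    """Yield one 'full jitter' delay per retry, drawn from [0, min(cap, base * 2**n))."""
    for attempt in range(max_retries):
        yield rng() * min(cap, base * 2 ** attempt)

def run_with_backoff(operation, retryable, max_retries=3, sleep=time.sleep):
    """Call operation(); on a retryable error wait a jittered delay and try again."""
    delays = backoff_delays(max_retries)
    while True:
        try:
            return operation()
        except retryable:
            delay = next(delays, None)
            if delay is None:
                raise  # retries exhausted: surface the last error
            sleep(delay)

calls = []

def flaky():
    """Fails twice with a transient error, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError('transient')
    return 'ok'

waits = []
result = run_with_backoff(flaky, ConnectionError, sleep=waits.append)
print(result, len(calls), len(waits))
```

With a transaction, `operation` would wrap the call to `transactions.run` and `retryable` would be the exception type the application considers safe to retry.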

6.3 Idempotent Design #

python
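from datetime import datetime, timezone
from couchbase.exceptions import DocumentNotFoundException

def transfer_money_idempotent(transaction_id, from_id, to_id, amount):
    record_id = f'txn::{transaction_id}'

    def txn_logic(ctx):
        # Assumption: ctx.get raises DocumentNotFoundException for a missing document.
        try:
            ctx.get(collection, record_id)
            return  # The record exists, so this transfer was already applied.
        except DocumentNotFoundException:
            pass

        from_doc = ctx.get(collection, from_id)
        to_doc = ctx.get(collection, to_id)
        from_content = from_doc.content_as[dict]
        to_content = to_doc.content_as[dict]

        ctx.replace(from_doc, {
            **from_content,
            'balance': from_content['balance'] - amount
        })

        ctx.replace(to_doc, {
            **to_content,
            'balance': to_content['balance'] + amount
        })

        # Writing the record in the same transaction makes a repeat call a no-op.
        ctx.insert(collection, record_id, {
            'type': 'transaction_record',
            'from': from_id,
            'to': to_id,
            'amount': amount,
            'created_at': datetime.now(timezone.utc).isoformat()
        })

    transactions.run(txn_logic)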
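
The idea behind the transaction record can be shown without a cluster. In this in-memory sketch, a plain dictionary stands in for the collection and `transfer_once` is a hypothetical helper: the second call with the same `transaction_id` finds the record and changes nothing.

```python
def transfer_once(store, transaction_id, from_id, to_id, amount):
    """Apply a transfer at most once per transaction_id; return True if applied."""
    record_key = f'txn::{transaction_id}'
    if record_key in store:
        return False                      # already processed: do nothing
    store[from_id]['balance'] -= amount
    store[to_id]['balance'] += amount
    store[record_key] = {'from': from_id, 'to': to_id, 'amount': amount}
    return True

store = {'user::001': {'balance': 500}, 'user::002': {'balance': 0}}
first = transfer_once(store, 'abc', 'user::001', 'user::002', 100)
second = transfer_once(store, 'abc', 'user::001', 'user::002', 100)  # client retry
print(first, second, store['user::001']['balance'], store['user::002']['balance'])
```

The dictionary version is not atomic by itself; in the real design the record insert and both balance updates are protected by the surrounding transaction.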

7. Transaction Best Practices #

7.1 Keep Transactions Short #

python
import time

def good_transaction():
    # Good: only reads and writes, so the transaction finishes quickly.
    def txn_logic(ctx):
        doc1 = ctx.get(collection, 'doc1')
        doc2 = ctx.get(collection, 'doc2')
        ctx.replace(doc1, {'value': doc1.content_as[dict]['value'] + 1})
        ctx.replace(doc2, {'value': doc2.content_as[dict]['value'] - 1})

    transactions.run(txn_logic)

def bad_transaction():
    # Bad: slow work inside the transaction widens the window for conflicts
    # and can push the transaction past its timeout.
    def txn_logic(ctx):
        doc1 = ctx.get(collection, 'doc1')
        time.sleep(10)
        doc2 = ctx.get(collection, 'doc2')
        ctx.replace(doc1, {'value': doc1.content_as[dict]['value'] + 1})
        ctx.replace(doc2, {'value': doc2.content_as[dict]['value'] - 1})

    transactions.run(txn_logic)

7.2 Avoid Hot Documents #

python
import random

def distribute_hot_key(base_key, num_shards):
    # Random shard per write spreads contention; readers combine every shard.
    shard = random.randrange(num_shards)
    return f'{base_key}::shard::{shard}'
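
Sharding a hot document only helps if writes spread across shards and reads combine them. Here is a minimal in-memory sketch of a sharded counter under that assumption; the dictionary stands in for a collection, and `shard_key`, `increment`, and `total` are hypothetical helpers.

```python
import random

def shard_key(base_key, shard):
    return f'{base_key}::shard::{shard}'

def increment(store, base_key, num_shards, amount=1, rng=random):
    """Add to one randomly chosen shard so concurrent writers rarely collide."""
    key = shard_key(base_key, rng.randrange(num_shards))
    store[key] = store.get(key, 0) + amount

def total(store, base_key, num_shards):
    """Read the logical value by summing every shard."""
    return sum(store.get(shard_key(base_key, s), 0) for s in range(num_shards))

store = {}
for _ in range(1000):
    increment(store, 'page_views', num_shards=8)
print(total(store, 'page_views', 8))
```

The trade-off is that reads become more expensive, since every shard has to be fetched to compute the logical value.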

7.3 Set Timeouts Sensibly #

python
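from datetime import timedelta
from couchbase.options import TransactionConfig, TransactionOptions

# Default timeout for ordinary transactions.
config = TransactionConfig(
    timeout=timedelta(seconds=30)
)

def complex_transaction():
    # Override the default for this one transaction only.
    # Older 4.x releases name this argument expiration_time.
    transactions.run(txn_logic, TransactionOptions(timeout=timedelta(seconds=120)))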

8. Transaction Monitoring #

8.1 Transaction Logging #

python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('couchbase.transactions')

def txn_logic(ctx):
    # These messages repeat on every attempt, because the SDK may re-run this function.
    logger.debug('Transaction started')
    doc = ctx.get(collection, 'doc1')
    content = doc.content_as[dict]
    logger.debug(f'Read document: {content}')
    ctx.replace(doc, {'value': content['value'] + 1})
    logger.debug('Document updated')

8.2 Performance Metrics #

python
import time

def measure_transaction(txn_logic):
    # perf_counter is a monotonic clock, which suits measuring elapsed time.
    start = time.perf_counter()
    try:
        transactions.run(txn_logic)
        duration = time.perf_counter() - start
        print(f'Transaction took {duration:.3f}s')
    except TransactionFailed as e:
        duration = time.perf_counter() - start
        print(f'Transaction failed after {duration:.3f}s, error: {e}')
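
Printing one duration at a time is hard to act on. A small recorder that keeps durations and failure counts makes it possible to report a mean and a tail latency. The sketch below is one possible shape: `LatencyRecorder` is a hypothetical class, and the injectable `clock` exists only so the example can run with fixed timestamps.

```python
import math
import statistics
import time

class LatencyRecorder:
    """Collects per-call durations and failure counts in memory."""
    def __init__(self, clock=time.perf_counter):
        self._clock = clock
        self.durations = []
        self.failures = 0

    def measure(self, operation):
        start = self._clock()
        try:
            return operation()
        except Exception:
            self.failures += 1
            raise
        finally:
            self.durations.append(self._clock() - start)

    def summary(self):
        """Mean and nearest-rank 95th percentile; needs at least one sample."""
        ordered = sorted(self.durations)
        p95_index = math.ceil(0.95 * len(ordered)) - 1
        return {
            'count': len(ordered),
            'failures': self.failures,
            'mean_s': statistics.fmean(ordered),
            'p95_s': ordered[p95_index],
        }

# Fixed timestamps stand in for the clock: one 0.1 s success, one 0.3 s failure.
ticks = iter([0.0, 0.1, 1.0, 1.3])
recorder = LatencyRecorder(clock=lambda: next(ticks))
recorder.measure(lambda: 'ok')
try:
    recorder.measure(lambda: 1 / 0)
except ZeroDivisionError:
    pass
print(recorder.summary())
```

In an application, `operation` would be a function that calls `transactions.run`, and the summary would be exported to whatever metrics system is already in use.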

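9. Summary #

Key points:

Statement    Purpose
BEGIN        Start a transaction
COMMIT       Commit a transaction
ROLLBACK     Roll back a transaction
SAVEPOINT    Set a savepoint

Best practices:

  1. Keep transactions short
  2. Set timeouts sensibly
  3. Design operations to be idempotent
  4. Handle concurrency conflicts properly
  5. Monitor transaction performance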

Next, let's learn about replication and failover!

Last updated: 2026-03-27