Couchbase事务 #
一、事务概述 #
1.1 什么是事务 #
事务是一组操作的逻辑单元,具有ACID特性:
| 特性 | 说明 |
|---|---|
| Atomicity(原子性) | 全部成功或全部失败 |
| Consistency(一致性) | 数据保持一致状态 |
| Isolation(隔离性) | 事务之间互不影响 |
| Durability(持久性) | 提交后永久保存 |
1.2 Couchbase事务特点 #
text
特点:
- 多文档ACID事务
- 支持跨Bucket事务
- 乐观锁机制
- 自动重试
- 超时控制
1.3 事务适用场景 #
text
适用场景:
- 银行转账
- 订单创建与库存扣减
- 多文档原子更新
- 复杂业务流程
不适用场景:
- 单文档操作(使用CAS即可)
- 大批量数据操作
- 长时间运行的操作
二、N1QL事务 #
2.1 BEGIN TRANSACTION #
sql
BEGIN TRANSACTION;
SELECT * FROM `my-bucket`.`_default`.`_default`
WHERE META().id = 'user::001';
UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance - 100
WHERE META().id = 'user::001';
UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance + 100
WHERE META().id = 'user::002';
COMMIT;
2.2 ROLLBACK #
sql
BEGIN TRANSACTION;
UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance - 100
WHERE META().id = 'user::001';
ROLLBACK;
2.3 SAVEPOINT #
sql
BEGIN TRANSACTION;
UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance - 100
WHERE META().id = 'user::001';
SAVEPOINT transfer_from_done;
UPDATE `my-bucket`.`_default`.`_default`
SET balance = balance + 100
WHERE META().id = 'user::002';
ROLLBACK TO SAVEPOINT transfer_from_done;
COMMIT;
三、SDK事务 #
3.1 Python SDK #
python
from couchbase.cluster import Cluster, ClusterOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.transactions import Transactions
from couchbase.transactions.config import TransactionsConfig
from couchbase.transactions.error import TransactionFailed
cluster = Cluster(
'couchbase://localhost',
ClusterOptions(PasswordAuthenticator('Administrator', 'password'))
)
bucket = cluster.bucket('my-bucket')
collection = bucket.default_collection()
transactions = Transactions(cluster, TransactionsConfig())
def transfer_money(from_id, to_id, amount):
def txn_logic(ctx):
from_doc = ctx.get(collection, from_id)
to_doc = ctx.get(collection, to_id)
if from_doc.content['balance'] < amount:
raise Exception('余额不足')
ctx.replace(
from_doc,
{**from_doc.content, 'balance': from_doc.content['balance'] - amount}
)
ctx.replace(
to_doc,
{**to_doc.content, 'balance': to_doc.content['balance'] + amount}
)
try:
transactions.run(txn_logic)
print('转账成功')
except TransactionFailed as e:
print(f'转账失败: {e}')
transfer_money('user::001', 'user::002', 100)
3.2 Node.js SDK #
javascript
const couchbase = require('couchbase');
const cluster = new couchbase.Cluster('couchbase://localhost', {
username: 'Administrator',
password: 'password'
});
const bucket = cluster.bucket('my-bucket');
const collection = bucket.defaultCollection();
async function transferMoney(fromId, toId, amount) {
try {
await cluster.transactions().run(async (ctx) => {
const fromDoc = await ctx.get(collection, fromId);
const toDoc = await ctx.get(collection, toId);
if (fromDoc.content.balance < amount) {
throw new Error('余额不足');
}
await ctx.replace(fromDoc, {
...fromDoc.content,
balance: fromDoc.content.balance - amount
});
await ctx.replace(toDoc, {
...toDoc.content,
balance: toDoc.content.balance + amount
});
});
console.log('转账成功');
} catch (error) {
console.log('转账失败:', error);
}
}
transferMoney('user::001', 'user::002', 100);
3.3 Java SDK #
java
import com.couchbase.client.java.*;
import com.couchbase.client.java.json.*;
import com.couchbase.client.java.transactions.*;
import com.couchbase.client.java.transactions.config.*;
import com.couchbase.client.java.transactions.error.*;
Cluster cluster = Cluster.connect(
"localhost",
ClusterOptions.clusterOptions("Administrator", "password")
);
Bucket bucket = cluster.bucket("my-bucket");
Collection collection = bucket.defaultCollection();
Transactions transactions = cluster.transactions(
TransactionsConfig.transactionsConfig()
);
public void transferMoney(String fromId, String toId, int amount) {
try {
transactions.run(ctx -> {
GetResult fromDoc = ctx.get(collection, fromId);
GetResult toDoc = ctx.get(collection, toId);
JsonObject fromContent = fromDoc.contentAsObject();
JsonObject toContent = toDoc.contentAsObject();
if (fromContent.getInt("balance") < amount) {
throw new RuntimeException("余额不足");
}
ctx.replace(fromDoc, fromContent.put("balance",
fromContent.getInt("balance") - amount));
ctx.replace(toDoc, toContent.put("balance",
toContent.getInt("balance") + amount));
});
System.out.println("转账成功");
} catch (TransactionFailed e) {
System.out.println("转账失败: " + e);
}
}
四、事务操作 #
4.1 插入操作 #
python
def create_order_with_items(order_id, items):
def txn_logic(ctx):
order_doc = {
'type': 'order',
'id': order_id,
'items': [item['id'] for item in items],
'status': 'pending',
'created_at': NOW_STR()
}
ctx.insert(collection, order_id, order_doc)
for item in items:
item_doc = {
'type': 'order_item',
'id': item['id'],
'order_id': order_id,
'product_id': item['product_id'],
'quantity': item['quantity'],
'price': item['price']
}
ctx.insert(collection, item['id'], item_doc)
transactions.run(txn_logic)
4.2 更新操作 #
python
def update_order_status(order_id, new_status):
def txn_logic(ctx):
order_doc = ctx.get(collection, order_id)
ctx.replace(order_doc, {
**order_doc.content,
'status': new_status,
'updated_at': NOW_STR()
})
if new_status == 'cancelled':
items = ctx.query('''
SELECT META().id
FROM `my-bucket`.`_default`.`_default`
WHERE type = 'order_item' AND order_id = $order_id
''', {'order_id': order_id})
for item in items.rows():
ctx.remove(ctx.get(collection, item['id']))
transactions.run(txn_logic)
4.3 删除操作 #
python
def delete_user_with_data(user_id):
def txn_logic(ctx):
ctx.remove(ctx.get(collection, user_id))
orders = ctx.query('''
SELECT META().id
FROM `my-bucket`.`_default`.`_default`
WHERE type = 'order' AND user_id = $user_id
''', {'user_id': user_id})
for order in orders.rows():
ctx.remove(ctx.get(collection, order['id']))
transactions.run(txn_logic)
五、事务配置 #
5.1 超时配置 #
python
from couchbase.transactions.config import TransactionsConfig
from datetime import timedelta
config = TransactionsConfig(
timeout=timedelta(seconds=60),
durability_level='majority'
)
transactions = Transactions(cluster, config)
5.2 重试配置 #
python
config = TransactionsConfig(
timeout=timedelta(seconds=60),
num_attempts=10,
cleanup_window=timedelta(seconds=120)
)
5.3 持久化级别 #
python
from couchbase.transactions.config import DurabilityLevel
config = TransactionsConfig(
durability_level=DurabilityLevel.MAJORITY
)
持久化级别:
| 级别 | 说明 |
|---|---|
| NONE | 不等待持久化 |
| MAJORITY | 等待多数节点确认 |
| MAJORITY_AND_PERSIST_TO_ACTIVE | 多数节点确认并持久化到主节点 |
| PERSIST_TO_MAJORITY | 持久化到多数节点 |
六、事务错误处理 #
6.1 错误类型 #
python
from couchbase.transactions.error import (
TransactionFailed,
TransactionExpired,
TransactionCommitAmbiguous
)
try:
transactions.run(txn_logic)
except TransactionFailed as e:
print(f'事务失败: {e}')
except TransactionExpired as e:
print(f'事务超时: {e}')
except TransactionCommitAmbiguous as e:
print(f'事务提交状态不确定: {e}')
6.2 重试策略 #
python
import time
def run_with_retry(txn_logic, max_retries=3):
for attempt in range(max_retries):
try:
transactions.run(txn_logic)
return True
except TransactionFailed as e:
if attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1))
continue
raise
return False
6.3 幂等性设计 #
python
def transfer_money_idempotent(transaction_id, from_id, to_id, amount):
def txn_logic(ctx):
txn_record = ctx.get(collection, f'txn::{transaction_id}')
if txn_record:
return
from_doc = ctx.get(collection, from_id)
to_doc = ctx.get(collection, to_id)
ctx.replace(from_doc, {
**from_doc.content,
'balance': from_doc.content['balance'] - amount
})
ctx.replace(to_doc, {
**to_doc.content,
'balance': to_doc.content['balance'] + amount
})
ctx.insert(collection, f'txn::{transaction_id}', {
'type': 'transaction_record',
'from': from_id,
'to': to_id,
'amount': amount,
'created_at': NOW_STR()
})
transactions.run(txn_logic)
七、事务最佳实践 #
7.1 保持事务简短 #
python
def good_transaction():
def txn_logic(ctx):
doc1 = ctx.get(collection, 'doc1')
doc2 = ctx.get(collection, 'doc2')
ctx.replace(doc1, {'value': doc1.content['value'] + 1})
ctx.replace(doc2, {'value': doc2.content['value'] - 1})
transactions.run(txn_logic)
def bad_transaction():
def txn_logic(ctx):
doc1 = ctx.get(collection, 'doc1')
time.sleep(10)
doc2 = ctx.get(collection, 'doc2')
ctx.replace(doc1, {'value': doc1.content['value'] + 1})
ctx.replace(doc2, {'value': doc2.content['value'] - 1})
transactions.run(txn_logic)
7.2 避免热点文档 #
python
def distribute_hot_key(base_key, num_shards):
shard = hash(base_key) % num_shards
return f'{base_key}::shard::{shard}'
7.3 合理设置超时 #
python
config = TransactionsConfig(
timeout=timedelta(seconds=30)
)
def complex_transaction():
config = TransactionsConfig(
timeout=timedelta(seconds=120)
)
transactions = Transactions(cluster, config)
transactions.run(txn_logic)
八、事务监控 #
8.1 事务日志 #
python
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('couchbase.transactions')
def txn_logic(ctx):
logger.debug('开始事务')
doc = ctx.get(collection, 'doc1')
logger.debug(f'读取文档: {doc.content}')
ctx.replace(doc, {'value': doc.content['value'] + 1})
logger.debug('更新文档')
8.2 性能指标 #
python
import time
def measure_transaction(txn_logic):
start = time.time()
try:
transactions.run(txn_logic)
duration = time.time() - start
print(f'事务耗时: {duration:.3f}s')
except TransactionFailed as e:
duration = time.time() - start
print(f'事务失败,耗时: {duration:.3f}s,错误: {e}')
九、总结 #
事务要点:
| 操作 | 说明 |
|---|---|
| BEGIN | 开始事务 |
| COMMIT | 提交事务 |
| ROLLBACK | 回滚事务 |
| SAVEPOINT | 设置保存点 |
最佳实践:
- 保持事务简短
- 合理设置超时
- 设计幂等操作
- 处理好并发冲突
- 监控事务性能
下一步,让我们学习复制与故障转移!
最后更新:2026-03-27