Amazon DocumentDB 事务 #
一、事务概述 #
1.1 什么是事务 #
text
事务特性(ACID):
├── Atomicity(原子性)- 全部成功或全部失败
├── Consistency(一致性)- 数据保持一致状态
├── Isolation(隔离性)- 事务间互不干扰
└── Durability(持久性)- 提交后永久保存
1.2 DocumentDB事务支持 #
text
事务支持:
├── 版本要求:4.0及以上
├── 支持多文档事务
├── 支持多集合事务
├── 支持跨分片事务(有限制)
└── 支持可重试写入
1.3 事务限制 #
text
事务限制:
├── 单事务操作数:最多1000个
├── 单事务大小:最大16MB
├── 事务超时:默认60秒
├── 不支持跨数据库事务
└── 需要副本集配置
二、基本事务操作 #
2.1 启动会话 #
javascript
// 启动会话
const session = client.startSession();
try {
// 事务操作
} finally {
// 结束会话
session.endSession();
}
2.2 开始事务 #
javascript
// 开始事务
session.startTransaction();
try {
// 执行操作
await collection.insertOne({ ... }, { session });
await collection.updateOne({ ... }, { session });
// 提交事务
await session.commitTransaction();
} catch (error) {
// 回滚事务
await session.abortTransaction();
throw error;
}
2.3 完整事务示例 #
javascript
// 完整的事务处理
async function transferFunds(fromAccountId, toAccountId, amount) {
const session = client.startSession();
try {
session.startTransaction();
const accounts = client.db('bank').collection('accounts');
// 检查余额
const fromAccount = await accounts.findOne(
{ _id: fromAccountId },
{ session }
);
if (fromAccount.balance < amount) {
throw new Error('余额不足');
}
// 扣款
await accounts.updateOne(
{ _id: fromAccountId },
{ $inc: { balance: -amount } },
{ session }
);
// 入账
await accounts.updateOne(
{ _id: toAccountId },
{ $inc: { balance: amount } },
{ session }
);
// 记录交易
await client.db('bank').collection('transactions').insertOne(
{
from: fromAccountId,
to: toAccountId,
amount: amount,
createdAt: new Date()
},
{ session }
);
await session.commitTransaction();
console.log('转账成功');
} catch (error) {
await session.abortTransaction();
console.error('转账失败:', error.message);
throw error;
} finally {
session.endSession();
}
}
三、事务选项 #
3.1 读关注 #
javascript
// 设置读关注
session.startTransaction({
readConcern: { level: 'snapshot' }
});
// 读关注级别
// local - 读取最新数据
// majority - 读取大多数节点确认的数据
// snapshot - 读取事务开始时的快照
3.2 写关注 #
javascript
// 设置写关注
session.startTransaction({
writeConcern: { w: 'majority' }
});
// 写关注级别
// w: 1 - 主节点确认
// w: 'majority' - 大多数节点确认
// j: true - 写入日志
3.3 读偏好 #
javascript
// 设置读偏好
session.startTransaction({
readPreference: 'primary'
});
// 读偏好选项
// primary - 只从主节点读取
// primaryPreferred - 优先主节点
// secondary - 只从副本读取
// secondaryPreferred - 优先副本
// nearest - 延迟最低的节点
3.4 完整选项 #
javascript
// 完整的事务选项
session.startTransaction({
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority', j: true },
readPreference: 'primary',
maxCommitTimeMS: 30000
});
四、事务与操作 #
4.1 CRUD操作 #
javascript
// 插入
await collection.insertOne(
{ name: "张三", age: 30 },
{ session }
);
// 更新
await collection.updateOne(
{ _id: ObjectId("...") },
{ $set: { status: "active" } },
{ session }
);
// 删除
await collection.deleteOne(
{ _id: ObjectId("...") },
{ session }
);
// 查找
const doc = await collection.findOne(
{ _id: ObjectId("...") },
{ session }
);
4.2 批量操作 #
javascript
// 批量插入
await collection.insertMany(
[
{ name: "用户1" },
{ name: "用户2" }
],
{ session }
);
// 批量更新
await collection.updateMany(
{ status: "pending" },
{ $set: { status: "active" } },
{ session }
);
4.3 聚合操作 #
javascript
// 事务中的聚合
const results = await collection.aggregate(
[
{ $match: { status: "active" } },
{ $group: { _id: "$category", count: { $sum: 1 } } }
],
{ session }
).toArray();
五、事务重试 #
5.1 可重试写入 #
javascript
// 启用可重试写入
const client = new MongoClient(uri, {
retryWrites: true,
w: 'majority'
});
5.2 手动重试 #
javascript
// 手动重试事务
async function withRetry(operation, maxRetries = 3) {
let retries = 0;
while (retries < maxRetries) {
const session = client.startSession();
try {
session.startTransaction();
const result = await operation(session);
await session.commitTransaction();
return result;
} catch (error) {
await session.abortTransaction();
if (error.hasErrorLabel('TransientTransactionError')) {
retries++;
console.log(`重试事务 (${retries}/${maxRetries})`);
continue;
}
throw error;
} finally {
session.endSession();
}
}
throw new Error('事务重试次数超过限制');
}
// 使用示例
await withRetry(async (session) => {
await db.orders.insertOne({ ... }, { session });
await db.inventory.updateOne({ ... }, { session });
});
5.3 提交重试 #
javascript
// 提交重试
async function commitWithRetry(session) {
while (true) {
try {
await session.commitTransaction();
console.log('事务提交成功');
break;
} catch (error) {
if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
console.log('提交失败,重试中...');
continue;
}
throw error;
}
}
}
六、实际应用场景 #
6.1 订单处理 #
javascript
// 订单处理事务
async function processOrder(orderData) {
const session = client.startSession();
try {
session.startTransaction();
const orders = client.db('shop').collection('orders');
const products = client.db('shop').collection('products');
const inventory = client.db('shop').collection('inventory');
// 创建订单
const orderResult = await orders.insertOne(
{
...orderData,
status: 'pending',
createdAt: new Date()
},
{ session }
);
const orderId = orderResult.insertedId;
// 检查并扣减库存
for (const item of orderData.items) {
const inv = await inventory.findOne(
{ productId: item.productId },
{ session }
);
if (!inv || inv.quantity < item.quantity) {
throw new Error(`商品 ${item.productId} 库存不足`);
}
await inventory.updateOne(
{ productId: item.productId },
{ $inc: { quantity: -item.quantity } },
{ session }
);
}
// 更新订单状态
await orders.updateOne(
{ _id: orderId },
{ $set: { status: 'confirmed' } },
{ session }
);
await session.commitTransaction();
return orderId;
} catch (error) {
await session.abortTransaction();
throw error;
} finally {
session.endSession();
}
}
6.2 用户注册 #
javascript
// 用户注册事务
async function registerUser(userData) {
const session = client.startSession();
try {
session.startTransaction();
const users = client.db('app').collection('users');
const profiles = client.db('app').collection('profiles');
const settings = client.db('app').collection('settings');
// 创建用户
const userResult = await users.insertOne(
{
email: userData.email,
password: hashPassword(userData.password),
createdAt: new Date()
},
{ session }
);
const userId = userResult.insertedId;
// 创建用户资料
await profiles.insertOne(
{
userId: userId,
name: userData.name,
avatar: null
},
{ session }
);
// 创建默认设置
await settings.insertOne(
{
userId: userId,
theme: 'light',
language: 'zh-CN',
notifications: true
},
{ session }
);
await session.commitTransaction();
return userId;
} catch (error) {
await session.abortTransaction();
throw error;
} finally {
session.endSession();
}
}
6.3 数据迁移 #
javascript
// 数据迁移事务
async function migrateUserData(userId) {
const session = client.startSession();
try {
session.startTransaction();
const sourceDb = client.db('old_db');
const targetDb = client.db('new_db');
// 读取源数据
const user = await sourceDb.collection('users').findOne(
{ _id: userId },
{ session }
);
const orders = await sourceDb.collection('orders').find(
{ userId: userId },
{ session }
).toArray();
// 写入目标数据库
await targetDb.collection('users').insertOne(user, { session });
if (orders.length > 0) {
await targetDb.collection('orders').insertMany(orders, { session });
}
// 删除源数据
await sourceDb.collection('users').deleteOne({ _id: userId }, { session });
await sourceDb.collection('orders').deleteMany({ userId: userId }, { session });
await session.commitTransaction();
console.log('迁移成功');
} catch (error) {
await session.abortTransaction();
console.error('迁移失败:', error.message);
throw error;
} finally {
session.endSession();
}
}
七、事务监控 #
7.1 查看活跃事务 #
javascript
// 查看当前活跃事务
db.currentOp({ "transaction": { $exists: true } })
7.2 终止事务 #
javascript
// 终止长时间运行的事务
db.killOp(opId)
7.3 事务指标 #
text
监控指标:
├── transactions.totalStarted
├── transactions.totalCommitted
├── transactions.totalAborted
├── transactions.currentActive
└── transactions.totalDurationMillis
八、最佳实践 #
8.1 事务设计原则 #
text
设计原则:
├── 保持事务简短
├── 减少事务中的操作数
├── 避免在事务中执行耗时操作
├── 合理设置超时时间
└── 实现重试机制
8.2 性能优化 #
text
性能建议:
├── 使用适当的索引
├── 减少事务持续时间
├── 避免热点数据竞争
├── 批量操作代替循环
└── 监控事务性能
8.3 错误处理 #
text
错误处理:
├── 捕获并处理事务错误
├── 实现重试逻辑
├── 记录事务日志
├── 处理死锁情况
└── 提供用户友好提示
九、常见问题 #
9.1 事务超时 #
javascript
// 问题:事务执行时间过长
// 解决:优化事务逻辑,减少操作数
// 设置更长的超时时间
session.startTransaction({
maxTimeMS: 120000 // 2分钟
});
9.2 写冲突 #
javascript
// 问题:多个事务修改同一文档
// 解决:实现重试机制
try {
// 事务操作
} catch (error) {
if (error.code === 112) { // WriteConflict
// 重试事务
}
}
9.3 内存限制 #
text
问题:事务操作超过16MB限制
解决:
├── 分批处理数据
├── 减少单次操作数量
├── 使用多个小事务
└── 优化数据结构
十、总结 #
10.1 事务要点 #
| 要点 | 说明 |
|---|---|
| ACID | 原子性、一致性、隔离性、持久性 |
| 启动 | startSession + startTransaction |
| 提交 | commitTransaction |
| 回滚 | abortTransaction |
| 重试 | 处理TransientTransactionError |
10.2 最佳实践总结 #
text
事务最佳实践:
├── 保持事务简短
├── 使用适当的隔离级别
├── 实现重试机制
├── 处理所有错误情况
└── 监控事务性能
下一步,让我们学习集群架构!
最后更新:2026-03-27