Amazon DocumentDB 事务 #

一、事务概述 #

1.1 什么是事务 #

text
事务特性(ACID):
├── Atomicity(原子性)- 全部成功或全部失败
├── Consistency(一致性)- 数据保持一致状态
├── Isolation(隔离性)- 事务间互不干扰
└── Durability(持久性)- 提交后永久保存

1.2 DocumentDB事务支持 #

text
事务支持:
├── 版本要求:4.0及以上
├── 支持多文档事务
├── 支持多集合事务
├── 支持跨分片事务(有限制)
└── 支持可重试写入

1.3 事务限制 #

text
事务限制:
├── 单事务操作数:最多1000个
├── 单事务大小:最大16MB
├── 事务超时:默认60秒
├── 不支持跨数据库事务
└── 需要副本集配置

二、基本事务操作 #

2.1 启动会话 #

javascript
// 启动会话
const session = client.startSession();

try {
  // 事务操作
} finally {
  // 结束会话
  session.endSession();
}

2.2 开始事务 #

javascript
// 开始事务
session.startTransaction();

try {
  // 执行操作
  await collection.insertOne({ ... }, { session });
  await collection.updateOne({ ... }, { session });
  
  // 提交事务
  await session.commitTransaction();
} catch (error) {
  // 回滚事务
  await session.abortTransaction();
  throw error;
}

2.3 完整事务示例 #

javascript
// 完整的事务处理
async function transferFunds(fromAccountId, toAccountId, amount) {
  const session = client.startSession();
  
  try {
    session.startTransaction();
    
    const accounts = client.db('bank').collection('accounts');
    
    // 检查余额
    const fromAccount = await accounts.findOne(
      { _id: fromAccountId },
      { session }
    );
    
    if (fromAccount.balance < amount) {
      throw new Error('余额不足');
    }
    
    // 扣款
    await accounts.updateOne(
      { _id: fromAccountId },
      { $inc: { balance: -amount } },
      { session }
    );
    
    // 入账
    await accounts.updateOne(
      { _id: toAccountId },
      { $inc: { balance: amount } },
      { session }
    );
    
    // 记录交易
    await client.db('bank').collection('transactions').insertOne(
      {
        from: fromAccountId,
        to: toAccountId,
        amount: amount,
        createdAt: new Date()
      },
      { session }
    );
    
    await session.commitTransaction();
    console.log('转账成功');
    
  } catch (error) {
    await session.abortTransaction();
    console.error('转账失败:', error.message);
    throw error;
  } finally {
    session.endSession();
  }
}

三、事务选项 #

3.1 读关注 #

javascript
// 设置读关注
session.startTransaction({
  readConcern: { level: 'snapshot' }
});

// 读关注级别
// local - 读取最新数据
// majority - 读取大多数节点确认的数据
// snapshot - 读取事务开始时的快照

3.2 写关注 #

javascript
// 设置写关注
session.startTransaction({
  writeConcern: { w: 'majority' }
});

// 写关注级别
// w: 1 - 主节点确认
// w: 'majority' - 大多数节点确认
// j: true - 写入日志

3.3 读偏好 #

javascript
// 设置读偏好
session.startTransaction({
  readPreference: 'primary'
});

// 读偏好选项
// primary - 只从主节点读取
// primaryPreferred - 优先主节点
// secondary - 只从副本读取
// secondaryPreferred - 优先副本
// nearest - 延迟最低的节点

3.4 完整选项 #

javascript
// 完整的事务选项
session.startTransaction({
  readConcern: { level: 'snapshot' },
  writeConcern: { w: 'majority', j: true },
  readPreference: 'primary',
  maxCommitTimeMS: 30000
});

四、事务与操作 #

4.1 CRUD操作 #

javascript
// 插入
await collection.insertOne(
  { name: "张三", age: 30 },
  { session }
);

// 更新
await collection.updateOne(
  { _id: ObjectId("...") },
  { $set: { status: "active" } },
  { session }
);

// 删除
await collection.deleteOne(
  { _id: ObjectId("...") },
  { session }
);

// 查找
const doc = await collection.findOne(
  { _id: ObjectId("...") },
  { session }
);

4.2 批量操作 #

javascript
// 批量插入
await collection.insertMany(
  [
    { name: "用户1" },
    { name: "用户2" }
  ],
  { session }
);

// 批量更新
await collection.updateMany(
  { status: "pending" },
  { $set: { status: "active" } },
  { session }
);

4.3 聚合操作 #

javascript
// 事务中的聚合
const results = await collection.aggregate(
  [
    { $match: { status: "active" } },
    { $group: { _id: "$category", count: { $sum: 1 } } }
  ],
  { session }
).toArray();

五、事务重试 #

5.1 可重试写入 #

javascript
// 启用可重试写入
const client = new MongoClient(uri, {
  retryWrites: true,
  w: 'majority'
});

5.2 手动重试 #

javascript
// 手动重试事务
async function withRetry(operation, maxRetries = 3) {
  let retries = 0;
  
  while (retries < maxRetries) {
    const session = client.startSession();
    
    try {
      session.startTransaction();
      const result = await operation(session);
      await session.commitTransaction();
      return result;
      
    } catch (error) {
      await session.abortTransaction();
      
      if (error.hasErrorLabel('TransientTransactionError')) {
        retries++;
        console.log(`重试事务 (${retries}/${maxRetries})`);
        continue;
      }
      
      throw error;
    } finally {
      session.endSession();
    }
  }
  
  throw new Error('事务重试次数超过限制');
}

// 使用示例
await withRetry(async (session) => {
  await db.orders.insertOne({ ... }, { session });
  await db.inventory.updateOne({ ... }, { session });
});

5.3 提交重试 #

javascript
// 提交重试
async function commitWithRetry(session) {
  while (true) {
    try {
      await session.commitTransaction();
      console.log('事务提交成功');
      break;
    } catch (error) {
      if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
        console.log('提交失败,重试中...');
        continue;
      }
      throw error;
    }
  }
}

六、实际应用场景 #

6.1 订单处理 #

javascript
// 订单处理事务
async function processOrder(orderData) {
  const session = client.startSession();
  
  try {
    session.startTransaction();
    
    const orders = client.db('shop').collection('orders');
    const products = client.db('shop').collection('products');
    const inventory = client.db('shop').collection('inventory');
    
    // 创建订单
    const orderResult = await orders.insertOne(
      {
        ...orderData,
        status: 'pending',
        createdAt: new Date()
      },
      { session }
    );
    const orderId = orderResult.insertedId;
    
    // 检查并扣减库存
    for (const item of orderData.items) {
      const inv = await inventory.findOne(
        { productId: item.productId },
        { session }
      );
      
      if (!inv || inv.quantity < item.quantity) {
        throw new Error(`商品 ${item.productId} 库存不足`);
      }
      
      await inventory.updateOne(
        { productId: item.productId },
        { $inc: { quantity: -item.quantity } },
        { session }
      );
    }
    
    // 更新订单状态
    await orders.updateOne(
      { _id: orderId },
      { $set: { status: 'confirmed' } },
      { session }
    );
    
    await session.commitTransaction();
    return orderId;
    
  } catch (error) {
    await session.abortTransaction();
    throw error;
  } finally {
    session.endSession();
  }
}

6.2 用户注册 #

javascript
// 用户注册事务
async function registerUser(userData) {
  const session = client.startSession();
  
  try {
    session.startTransaction();
    
    const users = client.db('app').collection('users');
    const profiles = client.db('app').collection('profiles');
    const settings = client.db('app').collection('settings');
    
    // 创建用户
    const userResult = await users.insertOne(
      {
        email: userData.email,
        password: hashPassword(userData.password),
        createdAt: new Date()
      },
      { session }
    );
    const userId = userResult.insertedId;
    
    // 创建用户资料
    await profiles.insertOne(
      {
        userId: userId,
        name: userData.name,
        avatar: null
      },
      { session }
    );
    
    // 创建默认设置
    await settings.insertOne(
      {
        userId: userId,
        theme: 'light',
        language: 'zh-CN',
        notifications: true
      },
      { session }
    );
    
    await session.commitTransaction();
    return userId;
    
  } catch (error) {
    await session.abortTransaction();
    throw error;
  } finally {
    session.endSession();
  }
}

6.3 数据迁移 #

javascript
// 数据迁移事务
async function migrateUserData(userId) {
  const session = client.startSession();
  
  try {
    session.startTransaction();
    
    const sourceDb = client.db('old_db');
    const targetDb = client.db('new_db');
    
    // 读取源数据
    const user = await sourceDb.collection('users').findOne(
      { _id: userId },
      { session }
    );
    
    const orders = await sourceDb.collection('orders').find(
      { userId: userId },
      { session }
    ).toArray();
    
    // 写入目标数据库
    await targetDb.collection('users').insertOne(user, { session });
    
    if (orders.length > 0) {
      await targetDb.collection('orders').insertMany(orders, { session });
    }
    
    // 删除源数据
    await sourceDb.collection('users').deleteOne({ _id: userId }, { session });
    await sourceDb.collection('orders').deleteMany({ userId: userId }, { session });
    
    await session.commitTransaction();
    console.log('迁移成功');
    
  } catch (error) {
    await session.abortTransaction();
    console.error('迁移失败:', error.message);
    throw error;
  } finally {
    session.endSession();
  }
}

七、事务监控 #

7.1 查看活跃事务 #

javascript
// 查看当前活跃事务
db.currentOp({ "transaction": { $exists: true } })

7.2 终止事务 #

javascript
// 终止长时间运行的事务
db.killOp(opId)

7.3 事务指标 #

text
监控指标:
├── transactions.totalStarted
├── transactions.totalCommitted
├── transactions.totalAborted
├── transactions.currentActive
└── transactions.totalDurationMillis

八、最佳实践 #

8.1 事务设计原则 #

text
设计原则:
├── 保持事务简短
├── 减少事务中的操作数
├── 避免在事务中执行耗时操作
├── 合理设置超时时间
└── 实现重试机制

8.2 性能优化 #

text
性能建议:
├── 使用适当的索引
├── 减少事务持续时间
├── 避免热点数据竞争
├── 批量操作代替循环
└── 监控事务性能

8.3 错误处理 #

text
错误处理:
├── 捕获并处理事务错误
├── 实现重试逻辑
├── 记录事务日志
├── 处理死锁情况
└── 提供用户友好提示

九、常见问题 #

9.1 事务超时 #

javascript
// 问题:事务执行时间过长
// 解决:优化事务逻辑,减少操作数

// 设置更长的超时时间
session.startTransaction({
  maxTimeMS: 120000  // 2分钟
});

9.2 写冲突 #

javascript
// 问题:多个事务修改同一文档
// 解决:实现重试机制

try {
  // 事务操作
} catch (error) {
  if (error.code === 112) {  // WriteConflict
    // 重试事务
  }
}

9.3 内存限制 #

text
问题:事务操作超过16MB限制
解决:
├── 分批处理数据
├── 减少单次操作数量
├── 使用多个小事务
└── 优化数据结构

十、总结 #

10.1 事务要点 #

要点 说明
ACID 原子性、一致性、隔离性、持久性
启动 startSession + startTransaction
提交 commitTransaction
回滚 abortTransaction
重试 处理TransientTransactionError

10.2 最佳实践总结 #

text
事务最佳实践:
├── 保持事务简短
├── 使用适当的隔离级别
├── 实现重试机制
├── 处理所有错误情况
└── 监控事务性能

下一步,让我们学习集群架构!

最后更新:2026-03-27