DynamoDB事务 #

一、事务概述 #

1.1 什么是事务 #

DynamoDB事务提供ACID保证,支持跨多个项目的原子性操作。

text
ACID特性:
├── Atomicity(原子性)
│   └── 全部成功或全部失败
├── Consistency(一致性)
│   └── 数据保持一致状态
├── Isolation(隔离性)
│   └── 事务间相互隔离
└── Durability(持久性)
    └── 提交后永久保存

1.2 事务类型 #

操作 说明 最大项目数
TransactGetItems 批量读取 100个
TransactWriteItems 批量写入 100个

1.3 事务限制 #

text
事务限制:
├── 最多100个项目
├── 最多4MB请求大小
├── 不支持跨表事务(同一表内)
├── 不支持跨区域事务
└── 消耗更多容量单位

二、TransactGetItems #

2.1 基本用法 #

javascript
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const { DynamoDBDocumentClient, TransactGetCommand } = require('@aws-sdk/lib-dynamodb');

const client = new DynamoDBClient({ region: 'us-east-1' });
const docClient = DynamoDBDocumentClient.from(client);

const response = await docClient.send(new TransactGetCommand({
  TransactItems: [
    {
      Get: {
        TableName: 'Users',
        Key: { UserId: 'user1' }
      }
    },
    {
      Get: {
        TableName: 'Users',
        Key: { UserId: 'user2' }
      }
    }
  ]
}));

console.log(response.Responses);

2.2 从多个表获取 #

javascript
const response = await docClient.send(new TransactGetCommand({
  TransactItems: [
    {
      Get: {
        TableName: 'Users',
        Key: { UserId: 'user123' }
      }
    },
    {
      Get: {
        TableName: 'Products',
        Key: { ProductId: 'prod001' }
      }
    }
  ]
}));

const user = response.Responses[0].Item;
const product = response.Responses[1].Item;

2.3 事务读取特性 #

text
特性:
├── 原子性读取
├── 强一致性
├── 所有项目同时读取
└── 任一失败则全部失败

三、TransactWriteItems #

3.1 支持的操作 #

text
支持的操作:
├── Put - 写入项目
├── Update - 更新项目
├── Delete - 删除项目
└── ConditionCheck - 条件检查

3.2 基本写入 #

javascript
const { TransactWriteCommand } = require('@aws-sdk/lib-dynamodb');

await docClient.send(new TransactWriteCommand({
  TransactItems: [
    {
      Put: {
        TableName: 'Orders',
        Item: {
          OrderId: 'order001',
          CustomerId: 'customer001',
          Status: 'PENDING',
          TotalAmount: 100
        }
      }
    },
    {
      Update: {
        TableName: 'Customers',
        Key: { CustomerId: 'customer001' },
        UpdateExpression: 'SET OrderCount = OrderCount + :inc',
        ExpressionAttributeValues: {
          ':inc': 1
        }
      }
    }
  ]
}));

3.3 条件写入 #

javascript
await docClient.send(new TransactWriteCommand({
  TransactItems: [
    {
      Put: {
        TableName: 'Orders',
        Item: {
          OrderId: 'order001',
          CustomerId: 'customer001',
          Status: 'PENDING'
        },
        ConditionExpression: 'attribute_not_exists(OrderId)'
      }
    },
    {
      Update: {
        TableName: 'Products',
        Key: { ProductId: 'prod001' },
        UpdateExpression: 'SET Stock = Stock - :quantity',
        ConditionExpression: 'Stock >= :quantity',
        ExpressionAttributeValues: {
          ':quantity': 1
        }
      }
    }
  ]
}));

3.4 ConditionCheck #

javascript
await docClient.send(new TransactWriteCommand({
  TransactItems: [
    {
      ConditionCheck: {
        TableName: 'Users',
        Key: { UserId: 'user123' },
        ConditionExpression: 'IsActive = :active',
        ExpressionAttributeValues: {
          ':active': true
        }
      }
    },
    {
      Update: {
        TableName: 'Orders',
        Key: { OrderId: 'order001' },
        UpdateExpression: 'SET Status = :status',
        ExpressionAttributeValues: {
          ':status': 'CONFIRMED'
        }
      }
    }
  ]
}));

四、事务示例 #

4.1 银行转账 #

javascript
async function transferMoney(fromAccount, toAccount, amount) {
  try {
    await docClient.send(new TransactWriteCommand({
      TransactItems: [
        {
          Update: {
            TableName: 'Accounts',
            Key: { AccountId: fromAccount },
            UpdateExpression: 'SET Balance = Balance - :amount',
            ConditionExpression: 'Balance >= :amount',
            ExpressionAttributeValues: {
              ':amount': amount
            }
          }
        },
        {
          Update: {
            TableName: 'Accounts',
            Key: { AccountId: toAccount },
            UpdateExpression: 'SET Balance = Balance + :amount',
            ExpressionAttributeValues: {
              ':amount': amount
            }
          }
        },
        {
          Put: {
            TableName: 'Transactions',
            Item: {
              TransactionId: generateId(),
              FromAccount: fromAccount,
              ToAccount: toAccount,
              Amount: amount,
              Timestamp: new Date().toISOString()
            }
          }
        }
      ]
    }));
    
    return { success: true };
  } catch (error) {
    if (error.name === 'TransactionCanceledException') {
      return { success: false, error: 'Insufficient balance or account not found' };
    }
    throw error;
  }
}

4.2 订单创建 #

javascript
async function createOrder(orderData) {
  const { orderId, customerId, items } = orderData;
  
  try {
    const transactItems = [
      {
        Put: {
          TableName: 'Orders',
          Item: {
            OrderId: orderId,
            CustomerId: customerId,
            Status: 'PENDING',
            Items: items,
            CreatedAt: new Date().toISOString()
          },
          ConditionExpression: 'attribute_not_exists(OrderId)'
        }
      }
    ];
    
    for (const item of items) {
      transactItems.push({
        Update: {
          TableName: 'Products',
          Key: { ProductId: item.productId },
          UpdateExpression: 'SET Stock = Stock - :quantity',
          ConditionExpression: 'Stock >= :quantity',
          ExpressionAttributeValues: {
            ':quantity': item.quantity
          }
        }
      });
    }
    
    await docClient.send(new TransactWriteCommand({
      TransactItems: transactItems
    }));
    
    return { success: true, orderId };
  } catch (error) {
    if (error.name === 'TransactionCanceledException') {
      const reasons = error.CancellationReasons || [];
      const stockError = reasons.find((r, i) => i > 0 && r.Code === 'ConditionalCheckFailed');
      if (stockError) {
        return { success: false, error: 'Insufficient stock' };
      }
      return { success: false, error: 'Order already exists' };
    }
    throw error;
  }
}

4.3 库存预留 #

javascript
async function reserveInventory(orderId, items) {
  const transactItems = [
    {
      Put: {
        TableName: 'Reservations',
        Item: {
          ReservationId: orderId,
          Items: items,
          Status: 'RESERVED',
          ExpiresAt: Math.floor(Date.now() / 1000) + 1800,  // 30分钟
          CreatedAt: new Date().toISOString()
        },
        ConditionExpression: 'attribute_not_exists(ReservationId)'
      }
    }
  ];
  
  for (const item of items) {
    transactItems.push({
      Update: {
        TableName: 'Inventory',
        Key: { ProductId: item.productId },
        UpdateExpression: 'SET Available = Available - :quantity, Reserved = Reserved + :quantity',
        ConditionExpression: 'Available >= :quantity',
        ExpressionAttributeValues: {
          ':quantity': item.quantity
        }
      }
    });
  }
  
  await docClient.send(new TransactWriteCommand({
    TransactItems: transactItems
  }));
}

五、错误处理 #

5.1 TransactionCanceledException #

javascript
try {
  await docClient.send(new TransactWriteCommand({
    TransactItems: [/* ... */]
  }));
} catch (error) {
  if (error.name === 'TransactionCanceledException') {
    console.log('Transaction canceled');
    console.log('Cancellation reasons:', error.CancellationReasons);
    
    for (const reason of error.CancellationReasons) {
      if (reason.Code === 'ConditionalCheckFailed') {
        console.log('Condition check failed');
      } else if (reason.Code === 'TransactionConflict') {
        console.log('Transaction conflict');
      }
    }
  }
}

5.2 常见错误类型 #

text
错误类型:
├── ConditionalCheckFailed
│   └── 条件检查失败
├── TransactionConflict
│   └── 事务冲突
├── ProvisionedThroughputExceeded
│   └── 超过预置吞吐量
├── ResourceNotFound
│   └── 资源不存在
└── ValidationError
    └── 参数验证失败

5.3 重试策略 #

javascript
async function transactWithRetry(params, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await docClient.send(new TransactWriteCommand(params));
    } catch (error) {
      if (error.name === 'TransactionConflict' && i < maxRetries - 1) {
        await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 50));
        continue;
      }
      throw error;
    }
  }
}

六、事务成本 #

6.1 容量消耗 #

text
容量消耗:
├── TransactGetItems
│   └── 2x RCU(强一致性)
├── TransactWriteItems
│   └── 2x WCU
└── 示例:
    ├── 读取10个项目 = 20 RCU
    └── 写入10个项目 = 20 WCU

6.2 成本优化 #

text
优化建议:
├── 仅在必要时使用事务
├── 减少事务中的项目数量
├── 使用条件检查避免不必要的事务
└── 考虑使用单项目操作

七、事务限制 #

7.1 项目数量限制 #

text
限制:
├── TransactGetItems: 最多100个项目
├── TransactWriteItems: 最多100个项目
├── 请求大小: 最多4MB
└── 同一项目不能重复

7.2 并发限制 #

text
并发限制:
├── 同一项目同时只能有一个事务
├── 事务冲突会导致失败
└── 需要实现重试机制

八、最佳实践 #

8.1 使用场景 #

text
适用场景:
├── 需要原子性操作
├── 跨项目数据一致性
├── 金融交易
├── 库存管理
└── 订单处理

8.2 不适用场景 #

text
不适用场景:
├── 简单CRUD操作
├── 单项目操作
├── 大批量操作
├── 对延迟敏感
└── 高并发场景

8.3 设计建议 #

text
设计建议:
├── 保持事务简短
├── 减少事务中的项目数量
├── 实现重试机制
├── 处理所有可能的错误
└── 监控事务失败率

九、总结 #

事务要点:

特性 说明
ACID保证 原子性、一致性、隔离性、持久性
TransactGet 批量原子读取
TransactWrite 批量原子写入
项目限制 最多100个项目
成本 2倍容量消耗

下一步,让我们学习DynamoDB Streams!

最后更新:2026-03-27