DynamoDB事务 #
一、事务概述 #
1.1 什么是事务 #
DynamoDB事务提供ACID保证,支持跨多个项目的原子性操作。
text
ACID特性:
├── Atomicity(原子性)
│ └── 全部成功或全部失败
├── Consistency(一致性)
│ └── 数据保持一致状态
├── Isolation(隔离性)
│ └── 事务间相互隔离
└── Durability(持久性)
└── 提交后永久保存
1.2 事务类型 #
| 操作 | 说明 | 最大项目数 |
|---|---|---|
| TransactGetItems | 批量读取 | 100个 |
| TransactWriteItems | 批量写入 | 100个 |
1.3 事务限制 #
text
事务限制:
├── 最多100个项目
├── 最多4MB请求大小
├── 不支持跨表事务(同一表内)
├── 不支持跨区域事务
└── 消耗更多容量单位
二、TransactGetItems #
2.1 基本用法 #
javascript
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const { DynamoDBDocumentClient, TransactGetCommand } = require('@aws-sdk/lib-dynamodb');
const client = new DynamoDBClient({ region: 'us-east-1' });
const docClient = DynamoDBDocumentClient.from(client);
const response = await docClient.send(new TransactGetCommand({
TransactItems: [
{
Get: {
TableName: 'Users',
Key: { UserId: 'user1' }
}
},
{
Get: {
TableName: 'Users',
Key: { UserId: 'user2' }
}
}
]
}));
console.log(response.Responses);
2.2 从多个表获取 #
javascript
const response = await docClient.send(new TransactGetCommand({
TransactItems: [
{
Get: {
TableName: 'Users',
Key: { UserId: 'user123' }
}
},
{
Get: {
TableName: 'Products',
Key: { ProductId: 'prod001' }
}
}
]
}));
const user = response.Responses[0].Item;
const product = response.Responses[1].Item;
2.3 事务读取特性 #
text
特性:
├── 原子性读取
├── 强一致性
├── 所有项目同时读取
└── 任一失败则全部失败
三、TransactWriteItems #
3.1 支持的操作 #
text
支持的操作:
├── Put - 写入项目
├── Update - 更新项目
├── Delete - 删除项目
└── ConditionCheck - 条件检查
3.2 基本写入 #
javascript
const { TransactWriteCommand } = require('@aws-sdk/lib-dynamodb');
await docClient.send(new TransactWriteCommand({
TransactItems: [
{
Put: {
TableName: 'Orders',
Item: {
OrderId: 'order001',
CustomerId: 'customer001',
Status: 'PENDING',
TotalAmount: 100
}
}
},
{
Update: {
TableName: 'Customers',
Key: { CustomerId: 'customer001' },
UpdateExpression: 'SET OrderCount = OrderCount + :inc',
ExpressionAttributeValues: {
':inc': 1
}
}
}
]
}));
3.3 条件写入 #
javascript
await docClient.send(new TransactWriteCommand({
TransactItems: [
{
Put: {
TableName: 'Orders',
Item: {
OrderId: 'order001',
CustomerId: 'customer001',
Status: 'PENDING'
},
ConditionExpression: 'attribute_not_exists(OrderId)'
}
},
{
Update: {
TableName: 'Products',
Key: { ProductId: 'prod001' },
UpdateExpression: 'SET Stock = Stock - :quantity',
ConditionExpression: 'Stock >= :quantity',
ExpressionAttributeValues: {
':quantity': 1
}
}
}
]
}));
3.4 ConditionCheck #
javascript
await docClient.send(new TransactWriteCommand({
TransactItems: [
{
ConditionCheck: {
TableName: 'Users',
Key: { UserId: 'user123' },
ConditionExpression: 'IsActive = :active',
ExpressionAttributeValues: {
':active': true
}
}
},
{
Update: {
TableName: 'Orders',
Key: { OrderId: 'order001' },
UpdateExpression: 'SET Status = :status',
ExpressionAttributeValues: {
':status': 'CONFIRMED'
}
}
}
]
}));
四、事务示例 #
4.1 银行转账 #
javascript
async function transferMoney(fromAccount, toAccount, amount) {
try {
await docClient.send(new TransactWriteCommand({
TransactItems: [
{
Update: {
TableName: 'Accounts',
Key: { AccountId: fromAccount },
UpdateExpression: 'SET Balance = Balance - :amount',
ConditionExpression: 'Balance >= :amount',
ExpressionAttributeValues: {
':amount': amount
}
}
},
{
Update: {
TableName: 'Accounts',
Key: { AccountId: toAccount },
UpdateExpression: 'SET Balance = Balance + :amount',
ExpressionAttributeValues: {
':amount': amount
}
}
},
{
Put: {
TableName: 'Transactions',
Item: {
TransactionId: generateId(),
FromAccount: fromAccount,
ToAccount: toAccount,
Amount: amount,
Timestamp: new Date().toISOString()
}
}
}
]
}));
return { success: true };
} catch (error) {
if (error.name === 'TransactionCanceledException') {
return { success: false, error: 'Insufficient balance or account not found' };
}
throw error;
}
}
4.2 订单创建 #
javascript
async function createOrder(orderData) {
const { orderId, customerId, items } = orderData;
try {
const transactItems = [
{
Put: {
TableName: 'Orders',
Item: {
OrderId: orderId,
CustomerId: customerId,
Status: 'PENDING',
Items: items,
CreatedAt: new Date().toISOString()
},
ConditionExpression: 'attribute_not_exists(OrderId)'
}
}
];
for (const item of items) {
transactItems.push({
Update: {
TableName: 'Products',
Key: { ProductId: item.productId },
UpdateExpression: 'SET Stock = Stock - :quantity',
ConditionExpression: 'Stock >= :quantity',
ExpressionAttributeValues: {
':quantity': item.quantity
}
}
});
}
await docClient.send(new TransactWriteCommand({
TransactItems: transactItems
}));
return { success: true, orderId };
} catch (error) {
if (error.name === 'TransactionCanceledException') {
const reasons = error.CancellationReasons || [];
const stockError = reasons.find((r, i) => i > 0 && r.Code === 'ConditionalCheckFailed');
if (stockError) {
return { success: false, error: 'Insufficient stock' };
}
return { success: false, error: 'Order already exists' };
}
throw error;
}
}
4.3 库存预留 #
javascript
async function reserveInventory(orderId, items) {
const transactItems = [
{
Put: {
TableName: 'Reservations',
Item: {
ReservationId: orderId,
Items: items,
Status: 'RESERVED',
ExpiresAt: Math.floor(Date.now() / 1000) + 1800, // 30分钟
CreatedAt: new Date().toISOString()
},
ConditionExpression: 'attribute_not_exists(ReservationId)'
}
}
];
for (const item of items) {
transactItems.push({
Update: {
TableName: 'Inventory',
Key: { ProductId: item.productId },
UpdateExpression: 'SET Available = Available - :quantity, Reserved = Reserved + :quantity',
ConditionExpression: 'Available >= :quantity',
ExpressionAttributeValues: {
':quantity': item.quantity
}
}
});
}
await docClient.send(new TransactWriteCommand({
TransactItems: transactItems
}));
}
五、错误处理 #
5.1 TransactionCanceledException #
javascript
try {
await docClient.send(new TransactWriteCommand({
TransactItems: [/* ... */]
}));
} catch (error) {
if (error.name === 'TransactionCanceledException') {
console.log('Transaction canceled');
console.log('Cancellation reasons:', error.CancellationReasons);
for (const reason of error.CancellationReasons) {
if (reason.Code === 'ConditionalCheckFailed') {
console.log('Condition check failed');
} else if (reason.Code === 'TransactionConflict') {
console.log('Transaction conflict');
}
}
}
}
5.2 常见错误类型 #
text
错误类型:
├── ConditionalCheckFailed
│ └── 条件检查失败
├── TransactionConflict
│ └── 事务冲突
├── ProvisionedThroughputExceeded
│ └── 超过预置吞吐量
├── ResourceNotFound
│ └── 资源不存在
└── ValidationError
└── 参数验证失败
5.3 重试策略 #
javascript
async function transactWithRetry(params, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
return await docClient.send(new TransactWriteCommand(params));
} catch (error) {
if (error.name === 'TransactionConflict' && i < maxRetries - 1) {
await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 50));
continue;
}
throw error;
}
}
}
六、事务成本 #
6.1 容量消耗 #
text
容量消耗:
├── TransactGetItems
│ └── 2x RCU(强一致性)
├── TransactWriteItems
│ └── 2x WCU
└── 示例:
├── 读取10个项目 = 20 RCU
└── 写入10个项目 = 20 WCU
6.2 成本优化 #
text
优化建议:
├── 仅在必要时使用事务
├── 减少事务中的项目数量
├── 使用条件检查避免不必要的事务
└── 考虑使用单项目操作
七、事务限制 #
7.1 项目数量限制 #
text
限制:
├── TransactGetItems: 最多100个项目
├── TransactWriteItems: 最多100个项目
├── 请求大小: 最多4MB
└── 同一项目不能重复
7.2 并发限制 #
text
并发限制:
├── 同一项目同时只能有一个事务
├── 事务冲突会导致失败
└── 需要实现重试机制
八、最佳实践 #
8.1 使用场景 #
text
适用场景:
├── 需要原子性操作
├── 跨项目数据一致性
├── 金融交易
├── 库存管理
└── 订单处理
8.2 不适用场景 #
text
不适用场景:
├── 简单CRUD操作
├── 单项目操作
├── 大批量操作
├── 对延迟敏感
└── 高并发场景
8.3 设计建议 #
text
设计建议:
├── 保持事务简短
├── 减少事务中的项目数量
├── 实现重试机制
├── 处理所有可能的错误
└── 监控事务失败率
九、总结 #
事务要点:
| 特性 | 说明 |
|---|---|
| ACID保证 | 原子性、一致性、隔离性、持久性 |
| TransactGet | 批量原子读取 |
| TransactWrite | 批量原子写入 |
| 项目限制 | 最多100个项目 |
| 成本 | 2倍容量消耗 |
下一步,让我们学习DynamoDB Streams!
最后更新:2026-03-27