DynamoDB批量操作 #

一、批量操作概述 #

1.1 批量操作类型 #

text
批量操作类型:
├── BatchGetItem
│   └── 批量读取多个项目
└── BatchWriteItem
    ├── 批量写入项目(PutRequest,会整项覆盖同键项目)
    ├── 批量删除项目(DeleteRequest)
    └── 不支持部分更新(需要UpdateItem逐项调用)

1.2 批量操作优势 #

text
优势:
├── 减少网络往返
├── 提高吞吐效率
├── 降低延迟
└── 简化代码逻辑

1.3 批量操作限制 #

| 操作 | 限制 |
| --- | --- |
| BatchGetItem | 每次最多100个项目,响应最大16MB |
| BatchWriteItem | 每次最多25个Put/Delete请求,请求最大16MB(单个项目最大400KB) |

二、BatchGetItem #

2.1 基本用法 #

使用CLI:

bash
# Read three items from the Users table by their UserId key.
# The low-level CLI expects typed attribute values: {"S": "..."} marks a string.
aws dynamodb batch-get-item \
  --request-items '{
    "Users": {
      "Keys": [
        {"UserId": {"S": "user1"}},
        {"UserId": {"S": "user2"}},
        {"UserId": {"S": "user3"}}
      ]
    }
  }'

使用JavaScript SDK:

javascript
// Loaded with `import` rather than `require`: the top-level `await` below is
// only valid in an ES module, and `require` is not defined in ES modules.
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, BatchGetCommand } from '@aws-sdk/lib-dynamodb';

const client = new DynamoDBClient({ region: 'us-east-1' });
// The document client accepts plain JS values such as 'user1' instead of
// typed attribute values such as { S: 'user1' }.
const docClient = DynamoDBDocumentClient.from(client);

// Fetch three items from the Users table by their UserId key.
const response = await docClient.send(new BatchGetCommand({
  RequestItems: {
    Users: {
      Keys: [
        { UserId: 'user1' },
        { UserId: 'user2' },
        { UserId: 'user3' }
      ]
    }
  }
}));

// Items are grouped by table name. Keys DynamoDB could not serve in this call
// are returned in response.UnprocessedKeys and must be re-requested.
console.log(response.Responses.Users);

使用Python SDK:

python
import boto3

# The resource-level API takes and returns plain Python values
# (no {'S': ...} type descriptors).
dynamodb = boto3.resource('dynamodb')

user_keys = [{'UserId': user_id} for user_id in ('user1', 'user2', 'user3')]

response = dynamodb.batch_get_item(
    RequestItems={'Users': {'Keys': user_keys}}
)

print(response['Responses']['Users'])

2.2 从多个表获取 #

javascript
// RequestItems is keyed by table name; each table lists keys that match its
// own key schema, so two tables are read in one round trip.
const multiTableRequest = {
  Users: {
    Keys: ['user1', 'user2'].map((UserId) => ({ UserId }))
  },
  Products: {
    Keys: ['prod1', 'prod2'].map((ProductId) => ({ ProductId }))
  }
};

const response = await docClient.send(
  new BatchGetCommand({ RequestItems: multiTableRequest })
);

// Responses is keyed by table name as well.
const { Users: users, Products: products } = response.Responses;
console.log('Users:', users);
console.log('Products:', products);

2.3 指定投影属性 #

javascript
// ProjectionExpression limits which attributes are returned for each item.
// `Name` is a DynamoDB reserved word and may not appear literally in an
// expression, so it is referenced through the `#name` placeholder.
const response = await docClient.send(new BatchGetCommand({
  RequestItems: {
    Users: {
      Keys: [
        { UserId: 'user1' },
        { UserId: 'user2' }
      ],
      ProjectionExpression: 'UserId, #name, Email',
      ExpressionAttributeNames: { '#name': 'Name' }
    }
  }
}));

2.4 一致性读取 #

javascript
const userKeys = [{ UserId: 'user1' }, { UserId: 'user2' }];

// ConsistentRead: true requests strongly consistent reads for this table's
// keys instead of the default eventually consistent reads.
const response = await docClient.send(new BatchGetCommand({
  RequestItems: {
    Users: { Keys: userKeys, ConsistentRead: true }
  }
}));

2.5 处理未处理的项目 #

javascript
/**
 * Run a BatchGetItem request to completion, re-sending UnprocessedKeys until
 * every key has been served, and merge the items per table.
 *
 * Retries use capped exponential backoff with jitter (the strategy AWS
 * recommends for unprocessed batch keys). If keys are still unprocessed after
 * `maxRetries` retry rounds, an Error is thrown instead of looping forever.
 *
 * @param {Object} requestItems - BatchGetCommand `RequestItems`
 *   (table name -> { Keys, ProjectionExpression?, ... }).
 * @param {Object} [options]
 * @param {number} [options.maxRetries=10] - Max retry rounds for unprocessed keys.
 * @param {number} [options.baseDelayMs=100] - Backoff delay before the first retry.
 * @param {number} [options.maxDelayMs=5000] - Upper bound for a single backoff delay.
 * @returns {Promise<Object>} Items grouped by table: { [tableName]: item[] }.
 * @throws {Error} When keys remain unprocessed after `maxRetries` retries;
 *   the remaining keys are attached as `error.unprocessedKeys`.
 */
async function batchGetAll(requestItems, options = {}) {
  const { maxRetries = 10, baseDelayMs = 100, maxDelayMs = 5000 } = options;
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  const allResponses = {};
  let unprocessed = requestItems || {};
  let retries = 0;

  while (Object.keys(unprocessed).length > 0) {
    const response = await docClient.send(new BatchGetCommand({
      RequestItems: unprocessed
    }));

    // Merge this round's items into the per-table result lists.
    for (const [tableName, items] of Object.entries(response.Responses || {})) {
      if (!allResponses[tableName]) {
        allResponses[tableName] = [];
      }
      allResponses[tableName].push(...items);
    }

    unprocessed = response.UnprocessedKeys || {};
    if (Object.keys(unprocessed).length === 0) {
      break;
    }

    if (retries >= maxRetries) {
      const error = new Error(
        `BatchGetItem: keys still unprocessed after ${maxRetries} retries`
      );
      error.unprocessedKeys = unprocessed;
      throw error;
    }

    const delay = Math.min(maxDelayMs, baseDelayMs * 2 ** retries);
    // Jitter keeps concurrent callers from retrying in lockstep.
    await sleep(delay / 2 + Math.random() * (delay / 2));
    retries += 1;
  }

  return allResponses;
}

2.6 复合主键批量获取 #

javascript
// With a composite primary key, every key object must carry both key
// attributes (here UserId and OrderId).
const orderKeys = [
  ['user1', 'order1'],
  ['user1', 'order2'],
  ['user2', 'order1']
].map(([UserId, OrderId]) => ({ UserId, OrderId }));

const response = await docClient.send(new BatchGetCommand({
  RequestItems: { Orders: { Keys: orderKeys } }
}));

三、BatchWriteItem #

3.1 基本写入 #

javascript
// Loaded with `import` rather than `require`: the top-level `await` below is
// only valid in an ES module, and `require` is not defined in ES modules.
import { BatchWriteCommand } from '@aws-sdk/lib-dynamodb';

// Each PutRequest creates the item, or fully replaces an existing item that
// has the same primary key.
await docClient.send(new BatchWriteCommand({
  RequestItems: {
    Users: [
      {
        PutRequest: {
          Item: {
            UserId: 'user1',
            Name: 'User One',
            Email: 'user1@example.com'
          }
        }
      },
      {
        PutRequest: {
          Item: {
            UserId: 'user2',
            Name: 'User Two',
            Email: 'user2@example.com'
          }
        }
      }
    ]
  }
}));

3.2 基本删除 #

javascript
// A DeleteRequest carries only the primary key of the item to remove.
const deleteRequests = ['user1', 'user2'].map((userId) => ({
  DeleteRequest: { Key: { UserId: userId } }
}));

await docClient.send(new BatchWriteCommand({
  RequestItems: { Users: deleteRequests }
}));

3.3 混合写入和删除 #

javascript
// One BatchWriteCommand may mix PutRequest and DeleteRequest entries for the
// same table.
const putUser = (UserId, Name) => ({ PutRequest: { Item: { UserId, Name } } });
const deleteUser = (UserId) => ({ DeleteRequest: { Key: { UserId } } });

await docClient.send(new BatchWriteCommand({
  RequestItems: {
    Users: [
      putUser('user1', 'User One'),
      putUser('user2', 'User Two'),
      deleteUser('old_user')
    ]
  }
}));

3.4 多表批量写入 #

javascript
// RequestItems is keyed by table name, so a single call can write to several tables.
const userPuts = [
  { PutRequest: { Item: { UserId: 'user1', Name: 'User One' } } }
];
const productPuts = [
  { PutRequest: { Item: { ProductId: 'prod1', Name: 'Product One' } } }
];

await docClient.send(new BatchWriteCommand({
  RequestItems: {
    Users: userPuts,
    Products: productPuts
  }
}));

3.5 处理未处理的项目 #

javascript
/**
 * Run a BatchWriteItem request to completion, re-sending UnprocessedItems
 * until none remain.
 *
 * Retries use capped exponential backoff with jitter (the strategy AWS
 * recommends for unprocessed batch items). If items are still unprocessed
 * after `maxRetries` retry rounds, an Error is thrown instead of looping forever.
 *
 * @param {Object} requestItems - BatchWriteCommand `RequestItems`
 *   (table name -> array of PutRequest/DeleteRequest entries).
 * @param {Object} [options]
 * @param {number} [options.maxRetries=10] - Max retry rounds for unprocessed items.
 * @param {number} [options.baseDelayMs=100] - Backoff delay before the first retry.
 * @param {number} [options.maxDelayMs=5000] - Upper bound for a single backoff delay.
 * @returns {Promise<void>}
 * @throws {Error} When items remain unprocessed after `maxRetries` retries;
 *   the remaining requests are attached as `error.unprocessedItems`.
 */
async function batchWriteAll(requestItems, options = {}) {
  const { maxRetries = 10, baseDelayMs = 100, maxDelayMs = 5000 } = options;
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  let unprocessed = requestItems || {};
  let retries = 0;

  while (Object.keys(unprocessed).length > 0) {
    const response = await docClient.send(new BatchWriteCommand({
      RequestItems: unprocessed
    }));

    unprocessed = response.UnprocessedItems || {};
    if (Object.keys(unprocessed).length === 0) {
      return;
    }

    if (retries >= maxRetries) {
      const error = new Error(
        `BatchWriteItem: items still unprocessed after ${maxRetries} retries`
      );
      error.unprocessedItems = unprocessed;
      throw error;
    }

    const delay = Math.min(maxDelayMs, baseDelayMs * 2 ** retries);
    // Jitter keeps concurrent callers from retrying in lockstep.
    await sleep(delay / 2 + Math.random() * (delay / 2));
    retries += 1;
  }
}

四、批量操作工具函数 #

4.1 批量获取工具 #

javascript
/**
 * Fetch items from one table by primary key. `keys` is split into
 * BatchGetItem calls of at most 100 keys, and UnprocessedKeys are retried
 * with capped exponential backoff and jitter.
 *
 * @param {string} tableName - Table to read from.
 * @param {Object[]} keys - Primary-key objects, e.g. { UserId: 'user1' }.
 * @param {Object} [options]
 * @param {string} [options.projectionExpression] - Attributes to return.
 * @param {Object} [options.expressionAttributeNames] - Placeholder map used by
 *   projectionExpression; required for reserved words such as `Name`.
 * @param {boolean} [options.consistentRead=false] - Request strongly consistent reads.
 * @param {number} [options.maxRetries=10] - Max retry rounds per 100-key batch.
 * @param {number} [options.baseDelayMs=50] - Backoff delay before the first retry.
 * @param {number} [options.maxDelayMs=5000] - Upper bound for a single backoff delay.
 * @returns {Promise<Object[]>} The items found. Keys with no matching item are
 *   simply absent.
 * @throws {Error} When a batch still has unprocessed keys after `maxRetries`
 *   retries; they are attached as `error.unprocessedKeys`.
 */
async function batchGetItems(tableName, keys, options = {}) {
  const {
    projectionExpression,
    expressionAttributeNames,
    consistentRead = false,
    maxRetries = 10,
    baseDelayMs = 50,
    maxDelayMs = 5000
  } = options;
  const batchSize = 100; // BatchGetItem accepts at most 100 keys per call
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  const results = [];

  for (let i = 0; i < keys.length; i += batchSize) {
    const tableRequest = { Keys: keys.slice(i, i + batchSize) };

    if (projectionExpression) {
      tableRequest.ProjectionExpression = projectionExpression;
    }
    if (expressionAttributeNames) {
      tableRequest.ExpressionAttributeNames = expressionAttributeNames;
    }
    if (consistentRead) {
      tableRequest.ConsistentRead = true;
    }

    // UnprocessedKeys is re-sent as the next request unchanged.
    let unprocessed = { [tableName]: tableRequest };
    let retries = 0;

    while (Object.keys(unprocessed).length > 0) {
      const response = await docClient.send(new BatchGetCommand({
        RequestItems: unprocessed
      }));

      if (response.Responses && response.Responses[tableName]) {
        results.push(...response.Responses[tableName]);
      }

      unprocessed = response.UnprocessedKeys || {};
      if (Object.keys(unprocessed).length === 0) {
        break;
      }

      if (retries >= maxRetries) {
        const error = new Error(
          `BatchGetItem on ${tableName}: keys still unprocessed after ${maxRetries} retries`
        );
        error.unprocessedKeys = unprocessed;
        throw error;
      }

      const delay = Math.min(maxDelayMs, baseDelayMs * 2 ** retries);
      // Jitter keeps concurrent callers from retrying in lockstep.
      await sleep(delay / 2 + Math.random() * (delay / 2));
      retries += 1;
    }
  }

  return results;
}

// Usage example. `Name` is a DynamoDB reserved word, so the projection refers
// to it through the `#name` placeholder.
const users = await batchGetItems('Users', [
  { UserId: 'user1' },
  { UserId: 'user2' },
  { UserId: 'user3' }
], {
  projectionExpression: 'UserId, #name, Email',
  expressionAttributeNames: { '#name': 'Name' }
});

4.2 批量写入工具 #

javascript
/**
 * Put every item in `items` into `tableName`. Items are sent in BatchWriteItem
 * calls of at most 25 PutRequests, and UnprocessedItems are retried with capped
 * exponential backoff and jitter.
 *
 * Note: a PutRequest replaces any existing item with the same primary key.
 *
 * @param {string} tableName - Target table.
 * @param {Object[]} items - Complete items to write.
 * @param {Object} [options]
 * @param {number} [options.maxRetries=10] - Max retry rounds per 25-item batch.
 * @param {number} [options.baseDelayMs=50] - Backoff delay before the first retry.
 * @param {number} [options.maxDelayMs=5000] - Upper bound for a single backoff delay.
 * @returns {Promise<void>}
 * @throws {Error} When a batch still has unprocessed items after `maxRetries`
 *   retries; they are attached as `error.unprocessedItems`.
 */
async function batchWriteItems(tableName, items, options = {}) {
  const { maxRetries = 10, baseDelayMs = 50, maxDelayMs = 5000 } = options;
  const batchSize = 25; // BatchWriteItem accepts at most 25 requests per call
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  for (let i = 0; i < items.length; i += batchSize) {
    let unprocessed = {
      [tableName]: items.slice(i, i + batchSize).map((item) => ({
        PutRequest: { Item: item }
      }))
    };
    let retries = 0;

    while (Object.keys(unprocessed).length > 0) {
      const response = await docClient.send(new BatchWriteCommand({
        RequestItems: unprocessed
      }));

      unprocessed = response.UnprocessedItems || {};
      if (Object.keys(unprocessed).length === 0) {
        break;
      }

      if (retries >= maxRetries) {
        const error = new Error(
          `BatchWriteItem on ${tableName}: items still unprocessed after ${maxRetries} retries`
        );
        error.unprocessedItems = unprocessed;
        throw error;
      }

      const delay = Math.min(maxDelayMs, baseDelayMs * 2 ** retries);
      // Jitter keeps concurrent callers from retrying in lockstep.
      await sleep(delay / 2 + Math.random() * (delay / 2));
      retries += 1;
    }
  }
}

// Usage example
const newUsers = [
  ['user1', 'User One'],
  ['user2', 'User Two'],
  ['user3', 'User Three']
].map(([UserId, Name]) => ({ UserId, Name }));

await batchWriteItems('Users', newUsers);

4.3 批量删除工具 #

javascript
/**
 * Delete the items identified by `keys` from `tableName`. Deletes are sent in
 * BatchWriteItem calls of at most 25 DeleteRequests, and UnprocessedItems are
 * retried with capped exponential backoff and jitter.
 *
 * @param {string} tableName - Target table.
 * @param {Object[]} keys - Primary-key objects, e.g. { UserId: 'user1' }.
 * @param {Object} [options]
 * @param {number} [options.maxRetries=10] - Max retry rounds per 25-key batch.
 * @param {number} [options.baseDelayMs=50] - Backoff delay before the first retry.
 * @param {number} [options.maxDelayMs=5000] - Upper bound for a single backoff delay.
 * @returns {Promise<void>}
 * @throws {Error} When a batch still has unprocessed deletes after
 *   `maxRetries` retries; they are attached as `error.unprocessedItems`.
 */
async function batchDeleteItems(tableName, keys, options = {}) {
  const { maxRetries = 10, baseDelayMs = 50, maxDelayMs = 5000 } = options;
  const batchSize = 25; // BatchWriteItem accepts at most 25 requests per call
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  for (let i = 0; i < keys.length; i += batchSize) {
    let unprocessed = {
      [tableName]: keys.slice(i, i + batchSize).map((key) => ({
        DeleteRequest: { Key: key }
      }))
    };
    let retries = 0;

    while (Object.keys(unprocessed).length > 0) {
      const response = await docClient.send(new BatchWriteCommand({
        RequestItems: unprocessed
      }));

      unprocessed = response.UnprocessedItems || {};
      if (Object.keys(unprocessed).length === 0) {
        break;
      }

      if (retries >= maxRetries) {
        const error = new Error(
          `BatchWriteItem on ${tableName}: deletes still unprocessed after ${maxRetries} retries`
        );
        error.unprocessedItems = unprocessed;
        throw error;
      }

      const delay = Math.min(maxDelayMs, baseDelayMs * 2 ** retries);
      // Jitter keeps concurrent callers from retrying in lockstep.
      await sleep(delay / 2 + Math.random() * (delay / 2));
      retries += 1;
    }
  }
}

// Usage example
const obsoleteUserKeys = ['user1', 'user2', 'user3'].map((UserId) => ({ UserId }));

await batchDeleteItems('Users', obsoleteUserKeys);

五、批量操作限制处理 #

5.1 自动分批 #

javascript
/**
 * Write a large list of items in chunks of 25, logging progress after each chunk.
 * Each chunk is passed to batchWriteItems.
 *
 * @param {string} tableName - Target table.
 * @param {Object[]} items - Items to put.
 * @returns {Promise<void>}
 */
async function batchWriteLargeDataset(tableName, items) {
  const chunkSize = 25;
  const total = items.length;

  for (let start = 0; start < total; start += chunkSize) {
    const chunk = items.slice(start, start + chunkSize);
    await batchWriteItems(tableName, chunk);

    const done = start + chunk.length;
    console.log(`Progress: ${done}/${total}`);
  }
}

5.2 并行批量操作 #

javascript
/**
 * Write `items` as 25-item batches with at most `concurrency` batches in flight.
 *
 * Uses a small worker pool: as soon as one batch finishes, that worker starts
 * the next, rather than waiting for a whole group of `concurrency` batches.
 * After any batch fails, no new batches are started and the first error is
 * rethrown. Batches already in flight may still complete.
 *
 * @param {string} tableName - Target table.
 * @param {Object[]} items - Items to put.
 * @param {number} [concurrency=5] - Positive integer; max batches in flight.
 * @returns {Promise<Array>} What each batchWriteItems call resolved to, in batch order.
 * @throws {RangeError} If `concurrency` is not a positive integer.
 */
async function parallelBatchWrite(tableName, items, concurrency = 5) {
  // A non-positive step would never advance; a fractional one would overlap slices.
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError(`concurrency must be a positive integer, got ${concurrency}`);
  }

  const batchSize = 25;
  const batches = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }

  const results = new Array(batches.length);
  let nextIndex = 0;
  let failed = false;

  const worker = async () => {
    while (!failed && nextIndex < batches.length) {
      const index = nextIndex;
      nextIndex += 1;
      try {
        results[index] = await batchWriteItems(tableName, batches[index]);
      } catch (error) {
        failed = true; // stop other workers from picking up new batches
        throw error;
      }
    }
  };

  const workerCount = Math.min(concurrency, batches.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));

  return results;
}

5.3 大小限制处理 #

javascript
/**
 * Rough size of an item in bytes, used to keep batches under the request limit.
 *
 * DynamoDB measures sizes in UTF-8 bytes, so the JSON text is measured in UTF-8
 * bytes rather than with `.length`. `.length` counts UTF-16 code units and
 * undercounts non-ASCII text; for example, each Chinese character is 3 bytes
 * in UTF-8. The JSON encoding is only an approximation of DynamoDB's own
 * item-size accounting.
 *
 * @param {Object} item - Item to measure.
 * @returns {number} UTF-8 byte length of the item's JSON representation.
 */
function estimateItemSize(item) {
  return new TextEncoder().encode(JSON.stringify(item)).length;
}

/**
 * Write `items` in groups bounded by both an item count and an estimated
 * request size. Each group is flushed through batchWriteItems.
 *
 * @param {string} tableName - Target table.
 * @param {Object[]} items - Items to put.
 * @param {Object} [options]
 * @param {number} [options.maxBatchBytes=16 * 1024 * 1024] - Byte budget per
 *   group (BatchWriteItem's 16MB request limit).
 * @param {number} [options.maxItems=25] - Items per group (BatchWriteItem's
 *   25-request limit).
 * @returns {Promise<void>}
 */
async function batchWriteWithSizeLimit(tableName, items, options = {}) {
  const { maxBatchBytes = 16 * 1024 * 1024, maxItems = 25 } = options;

  let currentBatch = [];
  let currentSize = 0;

  for (const item of items) {
    const itemSize = estimateItemSize(item);
    const wouldOverflow =
      currentBatch.length >= maxItems || currentSize + itemSize > maxBatchBytes;

    // Only flush a non-empty group. An item that alone exceeds the byte budget
    // starts its own group instead of triggering an empty write.
    if (wouldOverflow && currentBatch.length > 0) {
      await batchWriteItems(tableName, currentBatch);
      currentBatch = [];
      currentSize = 0;
    }

    currentBatch.push(item);
    currentSize += itemSize;
  }

  if (currentBatch.length > 0) {
    await batchWriteItems(tableName, currentBatch);
  }
}

六、性能优化 #

6.1 批量大小优化 #

text
优化建议:
├── BatchGetItem
│   ├── 推荐每批50-100个项目
│   └── 避免超过16MB响应限制
└── BatchWriteItem
    ├── 推荐每批25个项目
    └── 避免超过16MB请求限制

6.2 并行处理 #

javascript
/**
 * Run batchGetItems for several tables concurrently.
 *
 * @param {{tableName: string, keys: Object[]}[]} tablesAndKeys - One entry per table.
 * @returns {Promise<Object[][]>} One item array per input entry, in input order.
 */
async function parallelBatchGet(tablesAndKeys) {
  const lookups = [];
  for (const { tableName, keys } of tablesAndKeys) {
    lookups.push(batchGetItems(tableName, keys));
  }
  return Promise.all(lookups);
}

6.3 错误处理 #

javascript
/**
 * Write `items` in 25-item batches, continuing past batches that throw.
 * Each failure's message is logged with console.error.
 *
 * @param {string} tableName - Target table.
 * @param {Object[]} items - Items to put.
 * @returns {Promise<{success: Object[], failed: Object[]}>} Items from batches
 *   that completed and from batches that threw, respectively.
 */
async function batchWriteWithErrorHandling(tableName, items) {
  const success = [];
  const failed = [];
  const chunkSize = 25;

  let offset = 0;
  while (offset < items.length) {
    const chunk = items.slice(offset, offset + chunkSize);
    offset += chunkSize;

    let chunkSucceeded = true;
    try {
      await batchWriteItems(tableName, chunk);
    } catch (error) {
      chunkSucceeded = false;
      console.error(`Batch failed: ${error.message}`);
    }

    if (chunkSucceeded) {
      success.push(...chunk);
    } else {
      failed.push(...chunk);
    }
  }

  return { success, failed };
}

七、实际应用示例 #

7.1 批量导入数据 #

javascript
/**
 * Parse CSV text into user items and write them to the Users table.
 *
 * Presumably parseCSV yields row objects with id, name and email fields;
 * confirm against its implementation.
 *
 * @param {string} csvData - Raw CSV text.
 * @returns {Promise<{imported: number}>} Number of items written.
 */
async function importUsersFromCSV(csvData) {
  const toUserItem = ({ id, name, email }) => ({
    UserId: id,
    Name: name,
    Email: email,
    CreatedAt: new Date().toISOString()
  });

  const userItems = parseCSV(csvData).map(toUserItem);
  await batchWriteItems('Users', userItems);

  return { imported: userItems.length };
}

7.2 批量更新用户状态 #

javascript
import { UpdateCommand } from '@aws-sdk/lib-dynamodb';

/**
 * Set Status and UpdatedAt on each given user, leaving all other attributes
 * untouched.
 *
 * BatchWriteItem supports only PutRequest and DeleteRequest. A PutRequest
 * replaces the whole item, so writing { UserId, Status, UpdatedAt } through it
 * would erase Name, Email and every other attribute. Partial updates need
 * UpdateItem, which has no batch form, so the updates are sent individually,
 * at most `concurrency` at a time.
 *
 * Note: UpdateItem, like the PutRequest it replaces, creates the item when the
 * key does not exist. Add `ConditionExpression: 'attribute_exists(UserId)'` to
 * reject unknown users instead.
 *
 * @param {string[]} userIds - UserId values to update.
 * @param {string} newStatus - Value to store in Status.
 * @param {Object} [options]
 * @param {number} [options.concurrency=10] - Max UpdateItem calls in flight.
 * @returns {Promise<void>}
 * @throws {RangeError} If `concurrency` is not a positive integer.
 */
async function batchUpdateUserStatus(userIds, newStatus, options = {}) {
  const { concurrency = 10 } = options;
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError(`concurrency must be a positive integer, got ${concurrency}`);
  }

  // One timestamp for the whole operation.
  const updatedAt = new Date().toISOString();

  const updateOne = (userId) => docClient.send(new UpdateCommand({
    TableName: 'Users',
    Key: { UserId: userId },
    // `Status` is a DynamoDB reserved word, hence the #status placeholder.
    UpdateExpression: 'SET #status = :status, UpdatedAt = :updatedAt',
    ExpressionAttributeNames: { '#status': 'Status' },
    ExpressionAttributeValues: { ':status': newStatus, ':updatedAt': updatedAt }
  }));

  for (let i = 0; i < userIds.length; i += concurrency) {
    await Promise.all(userIds.slice(i, i + concurrency).map((userId) => updateOne(userId)));
  }
}

7.3 批量获取用户详情 #

javascript
/**
 * Fetch users and each user's 'LATEST' order, then attach the order to its user.
 *
 * Assumes the Orders table stores each user's latest order under
 * OrderId 'LATEST'; TODO confirm against the data model.
 *
 * @param {string[]} userIds - UserId values to look up.
 * @returns {Promise<Object[]>} The users found, each with a LatestOrder field
 *   (undefined when no matching order exists).
 */
async function getUsersWithOrders(userIds) {
  const userKeys = userIds.map((userId) => ({ UserId: userId }));
  const orderKeys = userIds.map((userId) => ({ UserId: userId, OrderId: 'LATEST' }));

  // The two lookups do not depend on each other, so run them concurrently.
  const [users, orders] = await Promise.all([
    batchGetItems('Users', userKeys),
    batchGetItems('Orders', orderKeys)
  ]);

  // Index orders by UserId once instead of scanning the list for every user.
  // The first order seen for a user wins, matching Array.prototype.find.
  const orderByUserId = new Map();
  for (const order of orders) {
    if (!orderByUserId.has(order.UserId)) {
      orderByUserId.set(order.UserId, order);
    }
  }

  return users.map((user) => ({
    ...user,
    LatestOrder: orderByUserId.get(user.UserId)
  }));
}

八、最佳实践 #

8.1 批量操作建议 #

text
最佳实践:
├── 合理设置批量大小
├── 处理未处理的项目
├── 对未处理的项目使用指数退避重试
├── 监控批量操作性能
└── 避免热点分区

8.2 错误处理建议 #

text
错误处理:
├── 捕获并记录错误
├── 实现重试逻辑
├── 处理部分失败
├── 批量操作不是原子的:部分成功时需自行补偿或回滚
└── 需要全部成功或全部失败时改用TransactWriteItems

8.3 性能建议 #

text
性能优化:
├── 使用并行处理
├── 控制并发数量
├── 监控吞吐量
└── 调整批量大小

九、总结 #

批量操作要点:

| 操作 | 限制 | 用途 |
| --- | --- | --- |
| BatchGetItem | 100项目/16MB | 批量读取 |
| BatchWriteItem | 25操作/16MB | 批量写入/删除 |

下一步,让我们学习Query查询操作!

最后更新:2026-03-27