DynamoDB批量操作 #
一、批量操作概述 #
1.1 批量操作类型 #
text
批量操作类型:
├── BatchGetItem
│ └── 批量读取多个项目
└── BatchWriteItem
├── 批量写入项目
└── 批量删除项目
1.2 批量操作优势 #
text
优势:
├── 减少网络往返
├── 提高吞吐效率
├── 降低延迟
└── 简化代码逻辑
1.3 批量操作限制 #
| 操作 | 限制 |
|---|---|
| BatchGetItem | 单次请求最多100个项目,响应总大小不超过16MB |
| BatchWriteItem | 单次请求最多25个写/删操作,请求总大小不超过16MB;单个项目仍受400KB上限约束 |
二、BatchGetItem #
2.1 基本用法 #
使用CLI:
bash
# Read three items from the Users table in a single BatchGetItem request.
# --request-items maps table name -> keys; the CLI uses the low-level
# attribute-value format, so each value is wrapped in a type tag
# ({"S": ...} = string).
aws dynamodb batch-get-item \
--request-items '{
"Users": {
"Keys": [
{"UserId": {"S": "user1"}},
{"UserId": {"S": "user2"}},
{"UserId": {"S": "user3"}}
]
}
}'
使用JavaScript SDK:
javascript
// Batch-read three users in one network round trip using the Document client,
// which accepts and returns plain JavaScript values (no {"S": ...} wrappers).
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const { DynamoDBDocumentClient, BatchGetCommand } = require('@aws-sdk/lib-dynamodb');

const client = new DynamoDBClient({ region: 'us-east-1' });
const docClient = DynamoDBDocumentClient.from(client);

// Primary keys of the items we want to read.
const userKeys = ['user1', 'user2', 'user3'].map((id) => ({ UserId: id }));

const response = await docClient.send(
  new BatchGetCommand({
    RequestItems: {
      Users: { Keys: userKeys },
    },
  })
);

// Returned items are grouped by table name.
console.log(response.Responses.Users);
使用Python SDK:
python
import boto3

# Batch-read three users in a single request via the high-level resource API.
dynamodb = boto3.resource('dynamodb')

# Build the per-table key list once, then hand it to batch_get_item.
request_items = {
    'Users': {
        'Keys': [{'UserId': user_id} for user_id in ('user1', 'user2', 'user3')],
    },
}

response = dynamodb.batch_get_item(RequestItems=request_items)

# Results come back grouped by table name.
print(response['Responses']['Users'])
2.2 从多个表获取 #
javascript
// A single BatchGetItem request may read from several tables at once;
// the 100-item / 16MB limits apply to the request as a whole.
const requestItems = {
  Users: {
    Keys: ['user1', 'user2'].map((id) => ({ UserId: id })),
  },
  Products: {
    Keys: ['prod1', 'prod2'].map((id) => ({ ProductId: id })),
  },
};

const response = await docClient.send(
  new BatchGetCommand({ RequestItems: requestItems })
);

// Each table's items arrive under its own key in Responses.
console.log('Users:', response.Responses.Users);
console.log('Products:', response.Responses.Products);
2.3 指定投影属性 #
javascript
// ProjectionExpression restricts which attributes are returned for each
// item, reducing response size and consumed bandwidth.
const response = await docClient.send(
  new BatchGetCommand({
    RequestItems: {
      Users: {
        Keys: ['user1', 'user2'].map((id) => ({ UserId: id })),
        // Only these three attributes are included in each returned item.
        ProjectionExpression: 'UserId, Name, Email',
      },
    },
  })
);
2.4 一致性读取 #
javascript
// ConsistentRead requests strongly consistent reads for this table
// (consumes twice the read capacity of eventually consistent reads).
const response = await docClient.send(
  new BatchGetCommand({
    RequestItems: {
      Users: {
        Keys: ['user1', 'user2'].map((id) => ({ UserId: id })),
        ConsistentRead: true,
      },
    },
  })
);
2.5 处理未处理的项目 #
javascript
/**
 * Repeatedly issues BatchGetItem until every requested key has been read.
 *
 * DynamoDB may return part of a batch in `UnprocessedKeys` (throttling or
 * the 16MB response cap). AWS guidance is to retry those keys with
 * exponential backoff, not a fixed delay, so the wait doubles on each
 * consecutive retry.
 *
 * @param {Object} requestItems - BatchGetItem RequestItems map
 *   (table name -> { Keys, ... }).
 * @returns {Promise<Object>} Map of table name -> array of items read.
 */
async function batchGetAll(requestItems) {
  const allResponses = {};
  let unprocessed = requestItems;
  let attempt = 0;
  while (Object.keys(unprocessed).length > 0) {
    const response = await docClient.send(new BatchGetCommand({
      RequestItems: unprocessed
    }));
    // Merge this page of results into the accumulator, per table.
    for (const tableName in response.Responses) {
      if (!allResponses[tableName]) {
        allResponses[tableName] = [];
      }
      allResponses[tableName].push(...response.Responses[tableName]);
    }
    // Retry whatever DynamoDB could not process, backing off exponentially:
    // 100ms, 200ms, 400ms, ... capped at 3.2s.
    unprocessed = response.UnprocessedKeys || {};
    if (Object.keys(unprocessed).length > 0) {
      const delay = Math.min(100 * 2 ** attempt, 3200);
      attempt += 1;
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  return allResponses;
}
2.6 复合主键批量获取 #
javascript
// Tables with a composite primary key require BOTH the partition key and
// the sort key in every entry of Keys.
const orderKeys = [
  ['user1', 'order1'],
  ['user1', 'order2'],
  ['user2', 'order1'],
].map(([userId, orderId]) => ({ UserId: userId, OrderId: orderId }));

const response = await docClient.send(
  new BatchGetCommand({
    RequestItems: {
      Orders: { Keys: orderKeys },
    },
  })
);
三、BatchWriteItem #
3.1 基本写入 #
javascript
const { BatchWriteCommand } = require('@aws-sdk/lib-dynamodb');

// Create (or fully replace) two users in a single BatchWriteItem call.
// Each entry is a PutRequest; Put always writes the entire item.
const newUsers = [
  { UserId: 'user1', Name: 'User One', Email: 'user1@example.com' },
  { UserId: 'user2', Name: 'User Two', Email: 'user2@example.com' },
];

await docClient.send(
  new BatchWriteCommand({
    RequestItems: {
      Users: newUsers.map((item) => ({ PutRequest: { Item: item } })),
    },
  })
);
3.2 基本删除 #
javascript
// Delete two users in one BatchWriteItem round trip.
// A DeleteRequest takes the full primary key of each item to remove.
const keysToDelete = ['user1', 'user2'].map((id) => ({ UserId: id }));

await docClient.send(
  new BatchWriteCommand({
    RequestItems: {
      Users: keysToDelete.map((key) => ({ DeleteRequest: { Key: key } })),
    },
  })
);
3.3 混合写入和删除 #
javascript
// A single batch may mix PutRequest and DeleteRequest entries for one table.
const putItems = [
  { UserId: 'user1', Name: 'User One' },
  { UserId: 'user2', Name: 'User Two' },
];

const writes = putItems.map((item) => ({ PutRequest: { Item: item } }));
// ...and remove an obsolete record in the same request.
writes.push({ DeleteRequest: { Key: { UserId: 'old_user' } } });

await docClient.send(
  new BatchWriteCommand({
    RequestItems: { Users: writes },
  })
);
3.4 多表批量写入 #
javascript
// BatchWriteItem can target several tables in one request; the 25-operation
// limit applies to the request as a whole, not per table.
await docClient.send(
  new BatchWriteCommand({
    RequestItems: {
      Users: [
        { PutRequest: { Item: { UserId: 'user1', Name: 'User One' } } },
      ],
      Products: [
        { PutRequest: { Item: { ProductId: 'prod1', Name: 'Product One' } } },
      ],
    },
  })
);
3.5 处理未处理的项目 #
javascript
/**
 * Issues BatchWriteItem until DynamoDB accepts every operation.
 *
 * Throttled or oversized batches come back in `UnprocessedItems`; per AWS
 * guidance those are retried with exponential backoff rather than a fixed
 * 100ms pause.
 *
 * @param {Object} requestItems - BatchWriteItem RequestItems map
 *   (table name -> array of PutRequest/DeleteRequest entries).
 * @returns {Promise<void>}
 */
async function batchWriteAll(requestItems) {
  let unprocessed = requestItems;
  let attempt = 0;
  while (Object.keys(unprocessed).length > 0) {
    const response = await docClient.send(new BatchWriteCommand({
      RequestItems: unprocessed
    }));
    unprocessed = response.UnprocessedItems || {};
    if (Object.keys(unprocessed).length > 0) {
      // Exponential backoff: 100ms, 200ms, 400ms, ... capped at 3.2s.
      const delay = Math.min(100 * 2 ** attempt, 3200);
      attempt += 1;
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}
四、批量操作工具函数 #
4.1 批量获取工具 #
javascript
/**
 * Reads an arbitrary number of keys from one table, transparently splitting
 * them into BatchGetItem pages of at most 100 keys and retrying any
 * UnprocessedKeys with exponential backoff (AWS-recommended practice).
 *
 * @param {string} tableName - Table to read from.
 * @param {Array<Object>} keys - Primary-key objects to fetch.
 * @param {Object} [options]
 * @param {string} [options.projectionExpression] - Attributes to return.
 * @param {boolean} [options.consistentRead=false] - Strongly consistent reads.
 * @returns {Promise<Array<Object>>} All items found (order not guaranteed;
 *   missing keys are simply absent from the result).
 */
async function batchGetItems(tableName, keys, options = {}) {
  const { projectionExpression, consistentRead = false } = options;
  const batchSize = 100; // hard BatchGetItem limit
  const results = [];
  for (let i = 0; i < keys.length; i += batchSize) {
    const batch = keys.slice(i, i + batchSize);
    const requestItems = {
      [tableName]: {
        Keys: batch
      }
    };
    if (projectionExpression) {
      requestItems[tableName].ProjectionExpression = projectionExpression;
    }
    if (consistentRead) {
      requestItems[tableName].ConsistentRead = true;
    }
    let unprocessed = requestItems;
    let attempt = 0;
    while (Object.keys(unprocessed).length > 0) {
      const response = await docClient.send(new BatchGetCommand({
        RequestItems: unprocessed
      }));
      if (response.Responses && response.Responses[tableName]) {
        results.push(...response.Responses[tableName]);
      }
      unprocessed = response.UnprocessedKeys || {};
      if (Object.keys(unprocessed).length > 0) {
        // Exponential backoff: 50ms, 100ms, 200ms, ... capped at 1.6s.
        const delay = Math.min(50 * 2 ** attempt, 1600);
        attempt += 1;
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }
  return results;
}
// Usage example
const users = await batchGetItems('Users', [
  { UserId: 'user1' },
  { UserId: 'user2' },
  { UserId: 'user3' }
], {
  projectionExpression: 'UserId, Name, Email'
});
4.2 批量写入工具 #
javascript
/**
 * Puts an arbitrary number of items into one table, splitting them into
 * BatchWriteItem requests of at most 25 operations and retrying any
 * UnprocessedItems with exponential backoff (AWS-recommended practice).
 *
 * @param {string} tableName - Target table.
 * @param {Array<Object>} items - Items to put; each Put replaces any
 *   existing item with the same primary key.
 * @returns {Promise<void>}
 */
async function batchWriteItems(tableName, items) {
  const batchSize = 25; // hard BatchWriteItem limit
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    const requestItems = {
      [tableName]: batch.map(item => ({
        PutRequest: { Item: item }
      }))
    };
    let unprocessed = requestItems;
    let attempt = 0;
    while (Object.keys(unprocessed).length > 0) {
      const response = await docClient.send(new BatchWriteCommand({
        RequestItems: unprocessed
      }));
      unprocessed = response.UnprocessedItems || {};
      if (Object.keys(unprocessed).length > 0) {
        // Exponential backoff: 50ms, 100ms, 200ms, ... capped at 1.6s.
        const delay = Math.min(50 * 2 ** attempt, 1600);
        attempt += 1;
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }
}
// Usage example
await batchWriteItems('Users', [
  { UserId: 'user1', Name: 'User One' },
  { UserId: 'user2', Name: 'User Two' },
  { UserId: 'user3', Name: 'User Three' }
]);
4.3 批量删除工具 #
javascript
/**
 * Deletes an arbitrary number of keys from one table, splitting them into
 * BatchWriteItem requests of at most 25 operations and retrying any
 * UnprocessedItems with exponential backoff (AWS-recommended practice).
 *
 * @param {string} tableName - Target table.
 * @param {Array<Object>} keys - Full primary keys of the items to delete.
 * @returns {Promise<void>}
 */
async function batchDeleteItems(tableName, keys) {
  const batchSize = 25; // hard BatchWriteItem limit
  for (let i = 0; i < keys.length; i += batchSize) {
    const batch = keys.slice(i, i + batchSize);
    const requestItems = {
      [tableName]: batch.map(key => ({
        DeleteRequest: { Key: key }
      }))
    };
    let unprocessed = requestItems;
    let attempt = 0;
    while (Object.keys(unprocessed).length > 0) {
      const response = await docClient.send(new BatchWriteCommand({
        RequestItems: unprocessed
      }));
      unprocessed = response.UnprocessedItems || {};
      if (Object.keys(unprocessed).length > 0) {
        // Exponential backoff: 50ms, 100ms, 200ms, ... capped at 1.6s.
        const delay = Math.min(50 * 2 ** attempt, 1600);
        attempt += 1;
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }
}
// Usage example
await batchDeleteItems('Users', [
  { UserId: 'user1' },
  { UserId: 'user2' },
  { UserId: 'user3' }
]);
五、批量操作限制处理 #
5.1 自动分批 #
javascript
/**
 * Imports a large item set, logging progress after every chunk.
 * batchWriteItems already enforces the 25-operation limit and retries;
 * the chunking here exists only to drive the progress report.
 */
async function batchWriteLargeDataset(tableName, items) {
  const chunkSize = 25;
  let processed = 0;
  for (let start = 0; start < items.length; start += chunkSize) {
    const chunk = items.slice(start, start + chunkSize);
    await batchWriteItems(tableName, chunk);
    processed += chunk.length;
    console.log(`Progress: ${processed}/${items.length}`);
  }
}
5.2 并行批量操作 #
javascript
/**
 * Writes items in chunks of 25, running up to `concurrency` chunks at a
 * time. Each wave of chunks completes before the next wave starts.
 * Resolves to one entry per chunk written.
 */
async function parallelBatchWrite(tableName, items, concurrency = 5) {
  const chunkSize = 25;

  // Pre-split the input into 25-item chunks.
  const chunks = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    chunks.push(items.slice(start, start + chunkSize));
  }

  // Process the chunks in waves of `concurrency`.
  const results = [];
  for (let start = 0; start < chunks.length; start += concurrency) {
    const wave = chunks.slice(start, start + concurrency);
    const waveResults = await Promise.all(
      wave.map((chunk) => batchWriteItems(tableName, chunk))
    );
    results.push(...waveResults);
  }
  return results;
}
5.3 大小限制处理 #
javascript
/**
 * Approximates an item's size in bytes for DynamoDB's 16MB request limit.
 *
 * Fix: `JSON.stringify(item).length` counts UTF-16 code units, not bytes,
 * so any non-ASCII content (names, addresses, CJK text, emoji...) was
 * under-counted against a byte-based limit. Measure the UTF-8 byte length
 * instead (TextEncoder always encodes to UTF-8).
 *
 * Note this remains an estimate: DynamoDB's own accounting is based on
 * attribute-name and attribute-value sizes, not the JSON encoding.
 *
 * @param {Object} item - Plain item object as passed to the Document client.
 * @returns {number} Approximate size in UTF-8 bytes.
 */
function estimateItemSize(item) {
  return new TextEncoder().encode(JSON.stringify(item)).length;
}
/**
 * Writes items in batches bounded by both DynamoDB limits: at most 25
 * operations per BatchWriteItem request and at most 16MB total payload.
 *
 * Fix: never flush an empty batch. Previously a first item whose estimated
 * size alone exceeded the limit would trigger a write of an empty batch
 * before the item was ever added.
 *
 * NOTE(review): each individual item must also be under DynamoDB's 400KB
 * item cap; this function does not validate that per-item limit.
 *
 * @param {string} tableName - Target table.
 * @param {Array<Object>} items - Items to put.
 * @returns {Promise<void>}
 */
async function batchWriteWithSizeLimit(tableName, items) {
  const maxBatchBytes = 16 * 1024 * 1024; // 16MB request cap
  const maxItems = 25;                    // operations-per-request cap
  let currentBatch = [];
  let currentSize = 0;
  for (const item of items) {
    const itemSize = estimateItemSize(item);
    // Flush the pending batch before this item would overflow either limit.
    const wouldOverflow =
      currentBatch.length >= maxItems || currentSize + itemSize > maxBatchBytes;
    if (wouldOverflow && currentBatch.length > 0) {
      await batchWriteItems(tableName, currentBatch);
      currentBatch = [];
      currentSize = 0;
    }
    currentBatch.push(item);
    currentSize += itemSize;
  }
  // Write whatever remains after the loop.
  if (currentBatch.length > 0) {
    await batchWriteItems(tableName, currentBatch);
  }
}
六、性能优化 #
6.1 批量大小优化 #
text
优化建议:
├── BatchGetItem
│ ├── 推荐每批50-100个项目
│ └── 避免超过16MB响应限制
└── BatchWriteItem
├── 推荐每批25个项目
└── 避免超过16MB请求限制
6.2 并行处理 #
javascript
/**
 * Fetches several tables' keys concurrently; resolves to one result array
 * per input entry, in the same order as the input.
 */
async function parallelBatchGet(tablesAndKeys) {
  const lookups = [];
  for (const { tableName, keys } of tablesAndKeys) {
    lookups.push(batchGetItems(tableName, keys));
  }
  return Promise.all(lookups);
}
6.3 错误处理 #
javascript
/**
 * Writes items in chunks of 25, recording which items succeeded and which
 * failed instead of aborting on the first error.
 *
 * @returns {Promise<{success: Array<Object>, failed: Array<Object>}>}
 */
async function batchWriteWithErrorHandling(tableName, items) {
  const results = { success: [], failed: [] };
  const chunkSize = 25;
  for (let start = 0; start < items.length; start += chunkSize) {
    const chunk = items.slice(start, start + chunkSize);
    try {
      await batchWriteItems(tableName, chunk);
      results.success.push(...chunk);
    } catch (error) {
      // A failed chunk is logged and collected; remaining chunks still run.
      console.error(`Batch failed: ${error.message}`);
      results.failed.push(...chunk);
    }
  }
  return results;
}
七、实际应用示例 #
7.1 批量导入数据 #
javascript
/**
 * Parses CSV rows into user items and bulk-loads them into the Users table.
 * NOTE(review): assumes parseCSV yields rows with id/name/email fields —
 * confirm against the parser's actual output shape.
 *
 * @returns {Promise<{imported: number}>} Count of items written.
 */
async function importUsersFromCSV(csvData) {
  const rows = parseCSV(csvData);
  const processedUsers = rows.map(({ id, name, email }) => ({
    UserId: id,
    Name: name,
    Email: email,
    CreatedAt: new Date().toISOString()
  }));
  await batchWriteItems('Users', processedUsers);
  return { imported: processedUsers.length };
}
7.2 批量更新用户状态 #
javascript
const { UpdateCommand } = require('@aws-sdk/lib-dynamodb');
/**
 * Sets Status/UpdatedAt on a set of users.
 *
 * Fix: BatchWriteItem has no update operation — the previous PutRequest
 * version REPLACED each whole item, silently destroying every other
 * attribute (Name, Email, ...). UpdateItem modifies only the listed
 * attributes, so we issue one UpdateCommand per key, in parallel.
 *
 * @param {Array<string>} userIds - Partition keys of the users to update.
 * @param {string} newStatus - Value to store in the Status attribute.
 * @returns {Promise<void>}
 */
async function batchUpdateUserStatus(userIds, newStatus) {
  const updatedAt = new Date().toISOString();
  await Promise.all(userIds.map(userId =>
    docClient.send(new UpdateCommand({
      TableName: 'Users',
      Key: { UserId: userId },
      // "Status" is a DynamoDB reserved word, hence the name placeholder.
      UpdateExpression: 'SET #s = :s, UpdatedAt = :u',
      ExpressionAttributeNames: { '#s': 'Status' },
      ExpressionAttributeValues: { ':s': newStatus, ':u': updatedAt }
    }))
  ));
}
7.3 批量获取用户详情 #
javascript
/**
 * Loads each user's record together with their 'LATEST' order.
 *
 * Improvements: the two batch reads are independent, so they now run in
 * parallel instead of sequentially, and the per-user order lookup uses a
 * Map (O(n)) instead of `orders.find` inside `map` (O(n²)).
 *
 * @param {Array<string>} userIds - Users to load.
 * @returns {Promise<Array<Object>>} User items, each with a LatestOrder
 *   property (undefined when no 'LATEST' order exists for that user).
 */
async function getUsersWithOrders(userIds) {
  const userKeys = userIds.map(id => ({ UserId: id }));
  // NOTE(review): assumes each user's newest order is stored under the
  // sentinel sort key 'LATEST' — confirm against the Orders table design.
  const orderKeys = userIds.map(userId => ({ UserId: userId, OrderId: 'LATEST' }));

  // Both tables can be fetched concurrently.
  const [users, orders] = await Promise.all([
    batchGetItems('Users', userKeys),
    batchGetItems('Orders', orderKeys)
  ]);

  // Index orders by user for constant-time lookup while stitching.
  const ordersByUser = new Map(orders.map(o => [o.UserId, o]));
  return users.map(user => ({
    ...user,
    LatestOrder: ordersByUser.get(user.UserId)
  }));
}
八、最佳实践 #
8.1 批量操作建议 #
text
最佳实践:
├── 合理设置批量大小
├── 处理未处理的项目
├── 实现重试机制
├── 监控批量操作性能
└── 避免热点分区
8.2 错误处理建议 #
text
错误处理:
├── 捕获并记录错误
├── 实现重试逻辑
├── 处理部分失败
└── 提供回滚机制
8.3 性能建议 #
text
性能优化:
├── 使用并行处理
├── 控制并发数量
├── 监控吞吐量
└── 调整批量大小
九、总结 #
批量操作要点:
| 操作 | 限制 | 用途 |
|---|---|---|
| BatchGetItem | 100项目/16MB | 批量读取 |
| BatchWriteItem | 25操作/16MB | 批量写入/删除 |
下一步,让我们学习Query查询操作!
最后更新:2026-03-27