DynamoDB Streams #
一、Streams概述 #
1.1 什么是Streams #
DynamoDB Streams是一种数据变更捕获服务,记录表中项目的修改事件。
text
Streams特点:
├── 实时捕获数据变更
├── 有序的事件流
├── 24小时数据保留
├── 支持多种视图类型
└── 与Lambda深度集成
1.2 变更事件类型 #
| 事件类型 | 说明 |
|---|---|
| INSERT | 新项目插入 |
| MODIFY | 项目修改 |
| REMOVE | 项目删除 |
1.3 视图类型 #
| 视图类型 | 说明 | 存储成本 |
|---|---|---|
| KEYS_ONLY | 仅主键 | 最低 |
| NEW_IMAGE | 新值 | 中等 |
| OLD_IMAGE | 旧值 | 中等 |
| NEW_AND_OLD_IMAGES | 新旧值 | 最高 |
二、启用Streams #
2.1 创建表时启用 #
使用CLI:
bash
aws dynamodb create-table \
--table-name Events \
--attribute-definitions \
AttributeName=EventId,AttributeType=S \
--key-schema \
AttributeName=EventId,KeyType=HASH \
--billing-mode PAY_PER_REQUEST \
--stream-specification \
StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
使用JavaScript SDK:
javascript
// Enable a stream at table-creation time with the AWS SDK for JavaScript v3.
const { CreateTableCommand } = require('@aws-sdk/client-dynamodb');
const command = new CreateTableCommand({
TableName: 'Events',
AttributeDefinitions: [
{ AttributeName: 'EventId', AttributeType: 'S' }
],
KeySchema: [
{ AttributeName: 'EventId', KeyType: 'HASH' }
],
// On-demand capacity: no provisioned throughput to manage.
BillingMode: 'PAY_PER_REQUEST',
// Capture both old and new item images for every change.
StreamSpecification: {
StreamEnabled: true,
StreamViewType: 'NEW_AND_OLD_IMAGES'
}
});
// NOTE(review): the command is only constructed here — it must still be
// dispatched with client.send(command) to actually create the table.
2.2 向现有表启用 #
bash
aws dynamodb update-table \
--table-name Events \
--stream-specification \
StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
javascript
// Turn streams on for an existing table; same StreamSpecification shape
// as at creation time.
const { UpdateTableCommand } = require('@aws-sdk/client-dynamodb');
const command = new UpdateTableCommand({
TableName: 'Events',
StreamSpecification: {
StreamEnabled: true,
StreamViewType: 'NEW_AND_OLD_IMAGES'
}
});
// NOTE(review): construct-only snippet — send with client.send(command).
2.3 获取Stream ARN #
bash
aws dynamodb describe-table \
--table-name Events \
--query 'Table.LatestStreamArn'
三、Stream记录格式 #
3.1 记录结构 #
json
{
"Records": [
{
"eventID": "1",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"ApproximateCreationDateTime": 1704067200,
"Keys": {
"EventId": { "S": "event001" }
},
"NewImage": {
"EventId": { "S": "event001" },
"Data": { "S": "sample data" }
},
"SequenceNumber": "100000000000000000001",
"SizeBytes": 50,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/Events/stream/2024-01-01T00:00:00.000"
}
]
}
3.2 INSERT事件 #
json
{
"eventName": "INSERT",
"dynamodb": {
"Keys": {
"EventId": { "S": "event001" }
},
"NewImage": {
"EventId": { "S": "event001" },
"Data": { "S": "new data" }
}
}
}
3.3 MODIFY事件 #
json
{
"eventName": "MODIFY",
"dynamodb": {
"Keys": {
"EventId": { "S": "event001" }
},
"OldImage": {
"EventId": { "S": "event001" },
"Data": { "S": "old data" }
},
"NewImage": {
"EventId": { "S": "event001" },
"Data": { "S": "updated data" }
}
}
}
3.4 REMOVE事件 #
json
{
"eventName": "REMOVE",
"dynamodb": {
"Keys": {
"EventId": { "S": "event001" }
},
"OldImage": {
"EventId": { "S": "event001" },
"Data": { "S": "deleted data" }
}
}
}
四、Lambda集成 #
4.1 创建Lambda触发器 #
使用AWS控制台:
text
步骤:
1. 打开DynamoDB表
2. 选择"触发器"选项卡
3. 点击"创建触发器"
4. 选择Lambda函数
5. 配置批处理大小
6. 启用触发器
使用CLI:
bash
aws lambda create-event-source-mapping \
--function-name ProcessDynamoDBStream \
--batch-size 100 \
--starting-position LATEST \
--event-source-arn arn:aws:dynamodb:us-east-1:123456789012:table/Events/stream/2024-01-01T00:00:00.000
4.2 Lambda函数示例 #
javascript
exports.handler = async (event) => {
for (const record of event.Records) {
console.log('Stream record:', JSON.stringify(record, null, 2));
const eventName = record.eventName;
const oldImage = record.dynamodb.OldImage;
const newImage = record.dynamodb.NewImage;
switch (eventName) {
case 'INSERT':
console.log('New item inserted:', newImage);
await handleInsert(newImage);
break;
case 'MODIFY':
console.log('Item modified:', { oldImage, newImage });
await handleModify(oldImage, newImage);
break;
case 'REMOVE':
console.log('Item removed:', oldImage);
await handleRemove(oldImage);
break;
}
}
};
async function handleInsert(newImage) {
// 处理插入事件
}
async function handleModify(oldImage, newImage) {
// 处理修改事件
}
async function handleRemove(oldImage) {
// 处理删除事件
}
4.3 Python Lambda示例 #
python
import json


def lambda_handler(event, context):
    """Route each DynamoDB Streams record to the handler for its event type.

    Returns a 200 response once every record in the batch has been
    dispatched. Images are in low-level AttributeValue format.
    """
    for record in event['Records']:
        data = record['dynamodb']
        name = record['eventName']
        if name == 'INSERT':
            handle_insert(data['NewImage'])
        elif name == 'MODIFY':
            # OldImage is fetched defensively; it is absent unless the
            # stream view type includes old images.
            handle_modify(data.get('OldImage'), data['NewImage'])
        elif name == 'REMOVE':
            handle_remove(data['OldImage'])
    return {'statusCode': 200}


def handle_insert(new_image):
    # Process an inserted item.
    print(f"Insert: {new_image}")


def handle_modify(old_image, new_image):
    # Process a modified item.
    print(f"Modify: {old_image} -> {new_image}")


def handle_remove(old_image):
    # Process a removed item.
    print(f"Remove: {old_image}")
五、读取Stream #
5.1 使用Kinesis Client Library #
java
// KCL 2.x consumer skeleton.
// NOTE(review): these KCL classes target Kinesis Data Streams; consuming a
// DynamoDB stream through the KCL requires the DynamoDB Streams Kinesis
// Adapter — confirm before reusing this snippet.
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
// Entry point: wires the stream, the AWS clients and the processor factory
// into a Scheduler and runs it on the current thread.
public class StreamProcessor {
public static void main(String[] args) {
// streamArn, appName, kinesisClient, dynamoDBClient, cloudWatchClient and
// workerId are free variables — they must be defined by the surrounding
// application; this snippet does not compile standalone.
ConfigsBuilder configsBuilder = new ConfigsBuilder(
streamArn,
appName,
kinesisClient,
dynamoDBClient,
cloudWatchClient,
workerId,
new RecordProcessorFactory()
);
// NOTE(review): KCL's Scheduler constructor normally takes the individual
// configs (checkpointConfig(), leaseManagementConfig(), …) — verify this
// schedulerConfigsBuilder() call against the KCL version in use.
Scheduler scheduler = new Scheduler(
configsBuilder.schedulerConfigsBuilder().build()
);
// Blocks the thread: run() drives shard processing until shutdown.
scheduler.run();
}
}
// Factory the KCL invokes to create one record processor per shard.
class RecordProcessorFactory implements ShardRecordProcessorFactory {
@Override
public ShardRecordProcessor shardRecordProcessor() {
// RecordProcessor (not shown in this snippet) implements ShardRecordProcessor.
return new RecordProcessor();
}
}
5.2 使用DynamoDB Streams低级API直接读取 #
javascript
const { DynamoDBStreams } = require('@aws-sdk/client-dynamodb-streams');

const streamsClient = new DynamoDBStreams({ region: 'us-east-1' });

/**
 * Read records from every shard of a DynamoDB stream, starting at the
 * latest position of each shard.
 *
 * @param {string} streamArn - ARN of the stream to read.
 * NOTE(review): DescribeStream paginates via LastEvaluatedShardId; streams
 * with many shards need a pagination loop here — TODO confirm shard count.
 */
async function readStream(streamArn) {
  const { StreamDescription } = await streamsClient.describeStream({
    StreamArn: streamArn
  });
  for (const shard of StreamDescription.Shards) {
    const { ShardIterator } = await streamsClient.getShardIterator({
      StreamArn: streamArn,
      ShardId: shard.ShardId,
      ShardIteratorType: 'LATEST'
    });
    await processRecords(ShardIterator);
  }
}

/**
 * Poll a shard until it is closed (GetRecords stops returning a
 * NextShardIterator).
 *
 * Fixed: the original version recursed on NextShardIterator. An OPEN shard
 * always returns a next iterator, so the recursion never unwound and the
 * promise chain grew without bound; an iterative loop has the same polling
 * behavior without that growth.
 *
 * @param {string} shardIterator - Iterator returned by GetShardIterator.
 */
async function processRecords(shardIterator) {
  let iterator = shardIterator;
  while (iterator) {
    const response = await streamsClient.getRecords({
      ShardIterator: iterator
    });
    for (const record of response.Records) {
      console.log('Record:', record);
    }
    iterator = response.NextShardIterator;
  }
}
六、使用场景 #
6.1 数据复制 #
javascript
// 复制数据到另一个表
exports.handler = async (event) => {
const items = [];
for (const record of event.Records) {
if (record.eventName === 'INSERT' || record.eventName === 'MODIFY') {
items.push({
PutRequest: {
Item: record.dynamodb.NewImage
}
});
} else if (record.eventName === 'REMOVE') {
items.push({
DeleteRequest: {
Key: record.dynamodb.Keys
}
});
}
}
if (items.length > 0) {
await docClient.send(new BatchWriteCommand({
RequestItems: {
ReplicaTable: items
}
}));
}
};
6.2 搜索索引同步 #
javascript
const { OpenSearchClient } = require('@opensearch-project/opensearch');
const opensearch = new OpenSearchClient({
node: 'https://your-opensearch-domain'
});
exports.handler = async (event) => {
for (const record of event.Records) {
const id = record.dynamodb.Keys.EventId.S;
if (record.eventName === 'REMOVE') {
await opensearch.delete({
index: 'events',
id: id
});
} else {
const document = unmarshall(record.dynamodb.NewImage);
await opensearch.index({
index: 'events',
id: id,
body: document
});
}
}
};
6.3 缓存失效 #
javascript
exports.handler = async (event) => {
for (const record of event.Records) {
const key = record.dynamodb.Keys.UserId.S;
// 使缓存失效
await cacheClient.del(`user:${key}`);
console.log(`Cache invalidated for key: ${key}`);
}
};
6.4 通知发送 #
javascript
const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns');
const sns = new SNSClient({ region: 'us-east-1' });
exports.handler = async (event) => {
for (const record of event.Records) {
if (record.eventName === 'INSERT') {
const newItem = unmarshall(record.dynamodb.NewImage);
await sns.send(new PublishCommand({
TopicArn: 'arn:aws:sns:us-east-1:123456789012:NewOrder',
Message: JSON.stringify({
orderId: newItem.OrderId,
customerId: newItem.CustomerId,
totalAmount: newItem.TotalAmount
})
}));
}
}
};
七、配置选项 #
7.1 批处理大小 #
text
批处理大小:
├── 最小:1
├── 最大:10,000(DynamoDB事件源映射)
├── 默认:100
└── 建议:根据处理时间调整
7.2 起始位置 #
| 位置 | 说明 |
|---|---|
| LATEST | 从最新记录开始 |
| TRIM_HORIZON | 从最早记录开始 |
| AT_TIMESTAMP | 从指定时间开始 |
7.3 并行度 #
text
并行度:
├── 默认:基于分区数
├── 可配置并行Lambda函数数
└── 影响处理吞吐量
八、错误处理 #
8.1 批处理失败 #
javascript
exports.handler = async (event) => {
const failedItems = [];
for (const record of event.Records) {
try {
await processRecord(record);
} catch (error) {
console.error('Failed to process record:', error);
failedItems.push(record);
}
}
if (failedItems.length > 0) {
// 返回失败的项目
return {
batchItemFailures: failedItems.map(record => ({
itemIdentifier: record.dynamodb.SequenceNumber
}))
};
}
};
8.2 重试策略 #
text
Lambda重试:
├── 自动重试失败批次
├── 重试次数可配置
├── 失败记录进入DLQ
└── 监控失败率
九、最佳实践 #
9.1 视图类型选择 #
text
选择建议:
├── KEYS_ONLY
│ └── 仅需要知道变更的项目
├── NEW_IMAGE
│ └── 需要新值(如搜索索引)
├── OLD_IMAGE
│ └── 需要旧值(如审计日志)
└── NEW_AND_OLD_IMAGES
└── 需要比较新旧值
9.2 性能优化 #
text
优化建议:
├── 合理设置批处理大小
├── 控制Lambda执行时间
├── 使用异步处理
├── 避免阻塞操作
└── 监控处理延迟
十、总结 #
Streams要点:
| 特性 | 说明 |
|---|---|
| 变更捕获 | INSERT/MODIFY/REMOVE |
| 视图类型 | KEYS_ONLY/NEW_IMAGE/OLD_IMAGE/NEW_AND_OLD_IMAGES |
| 数据保留 | 24小时 |
| Lambda集成 | 自动触发处理 |
下一步,让我们学习TTL!
最后更新:2026-03-27