DynamoDB Streams #

1. Streams Overview #

1.1 What Are Streams? #

DynamoDB Streams is a change data capture service that records item-level modification events on a table.

text
Stream characteristics:
├── Captures data changes in near real time
├── Ordered event stream
├── 24-hour data retention
├── Multiple view types
└── Deep integration with Lambda

1.2 Change Event Types #

| Event Type | Description |
| --- | --- |
| INSERT | A new item was added |
| MODIFY | An existing item was modified |
| REMOVE | An item was deleted |

1.3 View Types #

| View Type | Description | Storage Cost |
| --- | --- | --- |
| KEYS_ONLY | Key attributes only | Lowest |
| NEW_IMAGE | The item as it appears after the change | Medium |
| OLD_IMAGE | The item as it appeared before the change | Medium |
| NEW_AND_OLD_IMAGES | Both the new and the old image | Highest |

2. Enabling Streams #

2.1 Enable at Table Creation #

Using the CLI:

bash
aws dynamodb create-table \
  --table-name Events \
  --attribute-definitions \
    AttributeName=EventId,AttributeType=S \
  --key-schema \
    AttributeName=EventId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST \
  --stream-specification \
    StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES

Using the JavaScript SDK:

javascript
const { CreateTableCommand } = require('@aws-sdk/client-dynamodb');

const command = new CreateTableCommand({
  TableName: 'Events',
  AttributeDefinitions: [
    { AttributeName: 'EventId', AttributeType: 'S' }
  ],
  KeySchema: [
    { AttributeName: 'EventId', KeyType: 'HASH' }
  ],
  BillingMode: 'PAY_PER_REQUEST',
  StreamSpecification: {
    StreamEnabled: true,
    StreamViewType: 'NEW_AND_OLD_IMAGES'
  }
});

2.2 Enable on an Existing Table #

bash
aws dynamodb update-table \
  --table-name Events \
  --stream-specification \
    StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES

javascript
const { UpdateTableCommand } = require('@aws-sdk/client-dynamodb');

const command = new UpdateTableCommand({
  TableName: 'Events',
  StreamSpecification: {
    StreamEnabled: true,
    StreamViewType: 'NEW_AND_OLD_IMAGES'
  }
});

2.3 Getting the Stream ARN #

bash
aws dynamodb describe-table \
  --table-name Events \
  --query 'Table.LatestStreamArn'

3. Stream Record Format #

3.1 Record Structure #

json
{
  "Records": [
    {
      "eventID": "1",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1704067200,
        "Keys": {
          "EventId": { "S": "event001" }
        },
        "NewImage": {
          "EventId": { "S": "event001" },
          "Data": { "S": "sample data" }
        },
        "SequenceNumber": "100000000000000000001",
        "SizeBytes": 50,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/Events/stream/2024-01-01T00:00:00.000"
    }
  ]
}
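Most handlers only need a few of these fields. A minimal helper that pulls them out of a record shaped like the one above (the field names are real; the helper itself is illustrative):

```javascript
// Pull out the fields most stream handlers care about from one record.
function summarizeRecord(record) {
  const d = record.dynamodb || {};
  return {
    eventName: record.eventName,        // INSERT | MODIFY | REMOVE
    keys: d.Keys,                       // always present
    sequenceNumber: d.SequenceNumber,   // ordering within a shard
    hasNewImage: Boolean(d.NewImage),   // presence depends on StreamViewType
    hasOldImage: Boolean(d.OldImage)
  };
}

// Example with the INSERT record shown above:
const sample = {
  eventName: 'INSERT',
  dynamodb: {
    Keys: { EventId: { S: 'event001' } },
    NewImage: { EventId: { S: 'event001' }, Data: { S: 'sample data' } },
    SequenceNumber: '100000000000000000001'
  }
};
console.log(summarizeRecord(sample));
```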

3.2 INSERT Events #

json
{
  "eventName": "INSERT",
  "dynamodb": {
    "Keys": {
      "EventId": { "S": "event001" }
    },
    "NewImage": {
      "EventId": { "S": "event001" },
      "Data": { "S": "new data" }
    }
  }
}

3.3 MODIFY Events #

json
{
  "eventName": "MODIFY",
  "dynamodb": {
    "Keys": {
      "EventId": { "S": "event001" }
    },
    "OldImage": {
      "EventId": { "S": "event001" },
      "Data": { "S": "old data" }
    },
    "NewImage": {
      "EventId": { "S": "event001" },
      "Data": { "S": "updated data" }
    }
  }
}
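For MODIFY events, comparing the two images tells you what actually changed. A small sketch that structurally compares top-level attributes in the DynamoDB-JSON images (attribute names match the example above):

```javascript
// Return the names of top-level attributes that differ between the images.
// Comparison is structural via JSON.stringify, which is fine for the
// single-key attribute values ({ S: ... }, { N: ... }) found in stream images.
function changedAttributes(oldImage, newImage) {
  const names = new Set([...Object.keys(oldImage), ...Object.keys(newImage)]);
  const changed = [];
  for (const name of names) {
    if (JSON.stringify(oldImage[name]) !== JSON.stringify(newImage[name])) {
      changed.push(name);
    }
  }
  return changed.sort();
}

// Using the MODIFY event above: only "Data" changed.
const oldImage = { EventId: { S: 'event001' }, Data: { S: 'old data' } };
const newImage = { EventId: { S: 'event001' }, Data: { S: 'updated data' } };
console.log(changedAttributes(oldImage, newImage)); // [ 'Data' ]
```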

3.4 REMOVE Events #

json
{
  "eventName": "REMOVE",
  "dynamodb": {
    "Keys": {
      "EventId": { "S": "event001" }
    },
    "OldImage": {
      "EventId": { "S": "event001" },
      "Data": { "S": "deleted data" }
    }
  }
}

4. Lambda Integration #

4.1 Creating a Lambda Trigger #

Using the AWS Console:

text
Steps:
1. Open the DynamoDB table
2. Go to the "Triggers" tab
3. Click "Create trigger"
4. Select the Lambda function
5. Configure the batch size
6. Enable the trigger

Using the CLI:

bash
aws lambda create-event-source-mapping \
  --function-name ProcessDynamoDBStream \
  --batch-size 100 \
  --starting-position LATEST \
  --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:table/Events/stream/2024-01-01T00:00:00.000

4.2 Lambda Function Example #

javascript
exports.handler = async (event) => {
  for (const record of event.Records) {
    console.log('Stream record:', JSON.stringify(record, null, 2));
    
    const eventName = record.eventName;
    const oldImage = record.dynamodb.OldImage;
    const newImage = record.dynamodb.NewImage;
    
    switch (eventName) {
      case 'INSERT':
        console.log('New item inserted:', newImage);
        await handleInsert(newImage);
        break;
      case 'MODIFY':
        console.log('Item modified:', { oldImage, newImage });
        await handleModify(oldImage, newImage);
        break;
      case 'REMOVE':
        console.log('Item removed:', oldImage);
        await handleRemove(oldImage);
        break;
    }
  }
};

async function handleInsert(newImage) {
  // Handle the insert event
}

async function handleModify(oldImage, newImage) {
  // Handle the modify event
}

async function handleRemove(oldImage) {
  // Handle the remove event
}

4.3 Python Lambda Example #

python
import json

def lambda_handler(event, context):
    for record in event['Records']:
        event_name = record['eventName']
        
        if event_name == 'INSERT':
            new_image = record['dynamodb']['NewImage']
            handle_insert(new_image)
        elif event_name == 'MODIFY':
            old_image = record['dynamodb'].get('OldImage')
            new_image = record['dynamodb']['NewImage']
            handle_modify(old_image, new_image)
        elif event_name == 'REMOVE':
            old_image = record['dynamodb']['OldImage']
            handle_remove(old_image)
    
    return {'statusCode': 200}

def handle_insert(new_image):
    print(f"Insert: {new_image}")

def handle_modify(old_image, new_image):
    print(f"Modify: {old_image} -> {new_image}")

def handle_remove(old_image):
    print(f"Remove: {old_image}")

5. Reading a Stream #

5.1 Using the Kinesis Client Library #

The KCL consumes DynamoDB Streams through the DynamoDB Streams Kinesis Adapter, which exposes the stream through the Kinesis interface. An outline of the KCL v2-style wiring (the AWS clients, streamArn, appName, and workerId are assumed to be created elsewhere):

java
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamProcessor {
    public static void main(String[] args) {
        // streamArn, appName, kinesisClient, dynamoDBClient,
        // cloudWatchClient, and workerId are initialized elsewhere.
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamArn,
            appName,
            kinesisClient,
            dynamoDBClient,
            cloudWatchClient,
            workerId,
            new RecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        scheduler.run(); // blocks; run on its own thread in real code
    }
}

class RecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        // RecordProcessor (not shown) implements ShardRecordProcessor.
        return new RecordProcessor();
    }
}

5.2 Using the Low-Level DynamoDB Streams API #

javascript
const { DynamoDBStreams } = require('@aws-sdk/client-dynamodb-streams');

const streamsClient = new DynamoDBStreams({ region: 'us-east-1' });

async function readStream(streamArn) {
  const description = await streamsClient.describeStream({
    StreamArn: streamArn
  });

  const shards = description.StreamDescription.Shards;

  for (const shard of shards) {
    const iterator = await streamsClient.getShardIterator({
      StreamArn: streamArn,
      ShardId: shard.ShardId,
      ShardIteratorType: 'LATEST'
    });

    await processRecords(iterator.ShardIterator);
  }
}

async function processRecords(shardIterator) {
  const response = await streamsClient.getRecords({
    ShardIterator: shardIterator
  });

  for (const record of response.Records) {
    console.log('Record:', record);
  }

  // An open shard keeps returning NextShardIterator even when no records
  // remain, so a real poller should pause between calls; this sketch
  // stops once a call comes back empty.
  if (response.NextShardIterator && response.Records.length > 0) {
    await processRecords(response.NextShardIterator);
  }
}
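GetRecords returns items in DynamoDB's attribute-value encoding. In real code, prefer unmarshall() from @aws-sdk/util-dynamodb; the minimal converter below covers only the common types and is meant to show what that encoding looks like:

```javascript
// Minimal converter from DynamoDB attribute-value JSON to plain JS values.
// Covers common types only; use unmarshall() from @aws-sdk/util-dynamodb
// in production code.
function fromAttributeValue(av) {
  if ('S' in av) return av.S;
  if ('N' in av) return Number(av.N);
  if ('BOOL' in av) return av.BOOL;
  if ('NULL' in av) return null;
  if ('L' in av) return av.L.map(fromAttributeValue);
  if ('M' in av) return fromImage(av.M);
  throw new Error('Unsupported type: ' + Object.keys(av)[0]);
}

function fromImage(image) {
  const out = {};
  for (const [name, av] of Object.entries(image)) {
    out[name] = fromAttributeValue(av);
  }
  return out;
}

const plain = fromImage({
  EventId: { S: 'event001' },
  Count: { N: '42' },
  Active: { BOOL: true }
});
console.log(plain); // { EventId: 'event001', Count: 42, Active: true }
```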

6. Use Cases #

6.1 Data Replication #

javascript
// Replicate changes to another table.
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const { DynamoDBDocumentClient, BatchWriteCommand } = require('@aws-sdk/lib-dynamodb');
const { unmarshall } = require('@aws-sdk/util-dynamodb');

const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

exports.handler = async (event) => {
  const items = [];

  for (const record of event.Records) {
    if (record.eventName === 'INSERT' || record.eventName === 'MODIFY') {
      items.push({
        PutRequest: {
          // Stream images are DynamoDB JSON; the document client
          // expects plain objects, so unmarshall first.
          Item: unmarshall(record.dynamodb.NewImage)
        }
      });
    } else if (record.eventName === 'REMOVE') {
      items.push({
        DeleteRequest: {
          Key: unmarshall(record.dynamodb.Keys)
        }
      });
    }
  }

  if (items.length > 0) {
    await docClient.send(new BatchWriteCommand({
      RequestItems: {
        ReplicaTable: items
      }
    }));
  }
};
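One caveat a replication handler must deal with: BatchWriteItem accepts at most 25 put/delete requests per call, while a stream batch can contain many more records, so the collected items need to be chunked. A small sketch:

```javascript
// BatchWriteItem accepts at most 25 put/delete requests per call, so a
// larger list of collected requests must be split into chunks of 25.
function chunk(items, size = 25) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// e.g. 60 collected requests become batches of 25, 25, and 10:
const batches = chunk(Array.from({ length: 60 }, (_, i) => i));
console.log(batches.map(b => b.length)); // [ 25, 25, 10 ]
```

Each chunk then goes into its own BatchWriteCommand call; any UnprocessedItems in the response should be retried as well.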

6.2 Search Index Synchronization #

javascript
const { Client } = require('@opensearch-project/opensearch');
const { unmarshall } = require('@aws-sdk/util-dynamodb');

const opensearch = new Client({
  node: 'https://your-opensearch-domain'
});

exports.handler = async (event) => {
  for (const record of event.Records) {
    const id = record.dynamodb.Keys.EventId.S;

    if (record.eventName === 'REMOVE') {
      await opensearch.delete({
        index: 'events',
        id: id
      });
    } else {
      const document = unmarshall(record.dynamodb.NewImage);
      await opensearch.index({
        index: 'events',
        id: id,
        body: document
      });
    }
  }
};

6.3 Cache Invalidation #

javascript
// cacheClient is assumed to be an existing Redis-style client (e.g. ioredis).
exports.handler = async (event) => {
  for (const record of event.Records) {
    const key = record.dynamodb.Keys.UserId.S;

    // Invalidate the cached entry for this user
    await cacheClient.del(`user:${key}`);

    console.log(`Cache invalidated for key: ${key}`);
  }
};
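The invalidation logic itself is easy to exercise without a real cache; here a Map-backed stand-in takes the place of the cache client (the record shape matches the handler above):

```javascript
// Exercise the invalidation logic against a Map-backed stand-in cache;
// in the handler above, the client would be a real one such as ioredis.
const cache = new Map([
  ['user:u1', { name: 'Alice' }],
  ['user:u2', { name: 'Bob' }]
]);

function invalidateForRecords(records, cacheMap) {
  for (const record of records) {
    const key = record.dynamodb.Keys.UserId.S;
    cacheMap.delete(`user:${key}`);
  }
}

invalidateForRecords(
  [{ dynamodb: { Keys: { UserId: { S: 'u1' } } } }],
  cache
);
console.log([...cache.keys()]); // [ 'user:u2' ]
```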

6.4 Sending Notifications #

javascript
const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns');
const { unmarshall } = require('@aws-sdk/util-dynamodb');

const sns = new SNSClient({ region: 'us-east-1' });

exports.handler = async (event) => {
  for (const record of event.Records) {
    if (record.eventName === 'INSERT') {
      const newItem = unmarshall(record.dynamodb.NewImage);

      await sns.send(new PublishCommand({
        TopicArn: 'arn:aws:sns:us-east-1:123456789012:NewOrder',
        Message: JSON.stringify({
          orderId: newItem.OrderId,
          customerId: newItem.CustomerId,
          totalAmount: newItem.TotalAmount
        })
      }));
    }
  }
};

7. Configuration Options #

7.1 Batch Size #

text
Batch size:
├── Minimum: 1
├── Maximum: 10,000 (subject to the 6 MB invocation payload limit)
├── Default: 100
└── Recommendation: tune based on per-record processing time

7.2 Starting Position #

| Position | Description |
| --- | --- |
| LATEST | Start from the newest record |
| TRIM_HORIZON | Start from the oldest available record |
| AT_TIMESTAMP | Start from a given time (Kinesis event sources only; not supported for DynamoDB Streams) |

7.3 Parallelism #

text
Parallelism:
├── Default: one concurrent Lambda invocation per shard
├── ParallelizationFactor can raise this (up to 10 concurrent batches per shard)
└── Affects processing throughput

8. Error Handling #

8.1 Partial Batch Failures #

javascript
// Requires ReportBatchItemFailures to be enabled in the event source
// mapping's FunctionResponseTypes.
exports.handler = async (event) => {
  const failedItems = [];

  for (const record of event.Records) {
    try {
      await processRecord(record);
    } catch (error) {
      console.error('Failed to process record:', error);
      failedItems.push(record);
    }
  }

  // Report the sequence numbers of the failed records so that only
  // they are retried (an empty list means the whole batch succeeded).
  return {
    batchItemFailures: failedItems.map(record => ({
      itemIdentifier: record.dynamodb.SequenceNumber
    }))
  };
};

8.2 Retry Strategy #

text
Lambda retries:
├── Failed batches are retried automatically
├── The maximum number of retry attempts is configurable
├── Records that keep failing can be sent to a DLQ (on-failure destination)
└── Monitor the failure rate

9. Best Practices #

9.1 Choosing a View Type #

text
Selection guide:
├── KEYS_ONLY
│   └── You only need to know which items changed
├── NEW_IMAGE
│   └── You need the new values (e.g. search indexing)
├── OLD_IMAGE
│   └── You need the old values (e.g. audit logs)
└── NEW_AND_OLD_IMAGES
    └── You need to compare old and new values

9.2 Performance Tuning #

text
Tuning tips:
├── Set a sensible batch size
├── Keep Lambda execution time short
├── Use asynchronous processing
├── Avoid blocking operations
└── Monitor processing latency

10. Summary #

Key points about Streams:

| Feature | Description |
| --- | --- |
| Change capture | INSERT / MODIFY / REMOVE events |
| View types | KEYS_ONLY / NEW_IMAGE / OLD_IMAGE / NEW_AND_OLD_IMAGES |
| Data retention | 24 hours |
| Lambda integration | Automatic trigger-based processing |

Next up, let's learn about TTL!

Last updated: 2026-03-27