流式处理 #

一、Neptune Streams概述 #

1.1 什么是Neptune Streams #

Neptune Streams是Neptune的变更数据捕获(CDC)功能,可以实时捕获图数据库中的数据变更。

text
Neptune Streams特点:
├── 实时变更捕获
├── 完整的变更记录
├── 有序的事件流
├── 支持Gremlin和SPARQL
└── 可与Lambda集成

1.2 变更事件类型 #

text
变更事件类型:
├── ADD_VERTEX:添加顶点
├── DELETE_VERTEX:删除顶点
├── UPDATE_VERTEX:更新顶点
├── ADD_EDGE:添加边
├── DELETE_EDGE:删除边
└── UPDATE_EDGE:更新边

二、启用Streams #

2.1 使用CLI启用 #

bash
aws neptune modify-db-cluster \
  --db-cluster-identifier my-neptune-cluster \
  --cloudwatch-logs-export-configuration '{"EnableLogTypes":["neptune-stream"]}'

2.2 使用参数组 #

bash
aws neptune modify-db-cluster-parameter-group \
  --db-cluster-parameter-group-name my-params \
  --parameters "ParameterName=neptune_streams,ParameterValue=true"

三、Streams API #

3.1 获取流记录 #

bash
# 获取最早的记录
curl -X GET "https://endpoint:8182/streams?limit=100"

# 从特定序列号开始
curl -X GET "https://endpoint:8182/streams?commitNum=1&opNum=1&limit=100"

3.2 响应格式 #

json
{
  "lastEventId": {
    "commitNum": 1,
    "opNum": 5
  },
  "records": [
    {
      "eventId": {
        "commitNum": 1,
        "opNum": 1
      },
      "data": {
        "type": "ADD_VERTEX",
        "id": "vertex_001",
        "label": "person",
        "properties": {
          "name": [{"value": "Tom"}]
        }
      }
    }
  ]
}

3.3 变更事件结构 #

Gremlin事件:

json
{
  "eventId": {"commitNum": 1, "opNum": 1},
  "data": {
    "type": "ADD_VERTEX",
    "id": "vertex_001",
    "label": "person",
    "properties": {
      "name": [{"value": "Tom"}],
      "age": [{"value": 30}]
    }
  }
}

SPARQL事件:

json
{
  "eventId": {"commitNum": 1, "opNum": 1},
  "data": {
    "stmtType": "ADD",
    "s": "http://example.org/Tom",
    "p": "http://xmlns.com/foaf/0.1/name",
    "o": "\"Tom\""
  }
}

四、Lambda集成 #

4.1 创建Lambda函数 #

python
import json
import urllib.request

def lambda_handler(event, context):
    # 获取Neptune Stream记录
    endpoint = "https://your-cluster-endpoint:8182"
    
    # 处理变更记录
    for record in event.get('records', []):
        event_type = record.get('data', {}).get('type')
        
        if event_type == 'ADD_VERTEX':
            handle_add_vertex(record['data'])
        elif event_type == 'DELETE_VERTEX':
            handle_delete_vertex(record['data'])
        elif event_type == 'ADD_EDGE':
            handle_add_edge(record['data'])
        # ... 其他事件类型
    
    return {'statusCode': 200}

def handle_add_vertex(data):
    vertex_id = data.get('id')
    label = data.get('label')
    properties = data.get('properties', {})
    
    # 处理新增顶点
    print(f"New vertex: {vertex_id}, label: {label}")
    
def handle_delete_vertex(data):
    vertex_id = data.get('id')
    
    # 处理删除顶点
    print(f"Deleted vertex: {vertex_id}")

4.2 配置Lambda触发器 #

bash
# 创建EventSourceMapping
aws lambda create-event-source-mapping \
  --function-name my-neptune-processor \
  --batch-size 100 \
  --starting-position LATEST \
  --event-source-arn arn:aws:kinesis:region:account:stream/my-stream

五、使用场景 #

5.1 数据同步 #

python
# 同步到其他数据存储
def sync_to_elasticsearch(record):
    if record['data']['type'] in ['ADD_VERTEX', 'UPDATE_VERTEX']:
        # 索引到Elasticsearch
        es.index(
            index='vertices',
            id=record['data']['id'],
            body={
                'label': record['data']['label'],
                'properties': record['data']['properties']
            }
        )
    elif record['data']['type'] == 'DELETE_VERTEX':
        # 从Elasticsearch删除
        es.delete(index='vertices', id=record['data']['id'])

5.2 实时通知 #

python
# 发送实时通知
def send_notification(record):
    if record['data']['type'] == 'ADD_EDGE':
        if record['data']['label'] == 'follows':
            # 发送关注通知
            from_user = record['data']['from']
            to_user = record['data']['to']
            send_email(to_user, f"You have a new follower: {from_user}")

5.3 审计日志 #

python
# 记录审计日志
def audit_log(record):
    log_entry = {
        'timestamp': datetime.now().isoformat(),
        'event_type': record['data']['type'],
        'resource_id': record['data'].get('id'),
        'details': record['data']
    }
    
    # 存储到审计日志表
    dynamodb.put_item(TableName='audit-logs', Item=log_entry)

5.4 缓存更新 #

python
# 更新缓存
def update_cache(record):
    if record['data']['type'] in ['ADD_VERTEX', 'UPDATE_VERTEX']:
        # 更新Redis缓存
        redis.hset(
            f"vertex:{record['data']['id']}",
            mapping=record['data']['properties']
        )
    elif record['data']['type'] == 'DELETE_VERTEX':
        # 删除缓存
        redis.delete(f"vertex:{record['data']['id']}")

六、Streams最佳实践 #

6.1 处理策略 #

text
处理策略:
├── 批量处理:批量处理多条记录
├── 错误重试:处理失败时重试
├── 幂等处理:支持重复处理
├── 顺序保证:按顺序处理
└── 监控告警:监控处理状态

6.2 性能优化 #

text
性能优化建议:
├── 合理设置批量大小
├── 使用异步处理
├── 控制并发数
├── 监控延迟
└── 处理积压

6.3 错误处理 #

python
def process_records(records):
    failed_records = []
    
    for record in records:
        try:
            process_record(record)
        except Exception as e:
            failed_records.append({
                'record': record,
                'error': str(e)
            })
    
    if failed_records:
        # 记录失败记录
        log_failed_records(failed_records)
        # 可以选择重试或发送到死信队列
    
    return len(failed_records)

七、监控与告警 #

7.1 CloudWatch指标 #

text
关键指标:
├── StreamLatency:流延迟
├── StreamRecordsProcessed:处理记录数
├── StreamErrors:错误数
└── StreamLag:积压量

7.2 告警配置 #

bash
# 创建告警
aws cloudwatch put-metric-alarm \
  --alarm-name neptune-stream-lag \
  --metric-name StreamLag \
  --namespace AWS/Neptune \
  --threshold 1000 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 3

八、总结 #

Neptune Streams要点:

项目 说明
功能 变更数据捕获
事件类型 ADD/DELETE/UPDATE
API Streams REST API
集成 Lambda, Kinesis
用途 同步、通知、审计

最佳实践:

  1. 合理设置批量大小
  2. 实现幂等处理
  3. 处理错误和重试
  4. 监控处理状态
  5. 控制处理延迟

下一步,让我们学习集群管理!

最后更新:2026-03-27