流式处理 #
一、Neptune Streams概述 #
1.1 什么是Neptune Streams #
Neptune Streams是Neptune的变更数据捕获(CDC)功能,可以实时捕获图数据库中的数据变更。
text
Neptune Streams特点:
├── 实时变更捕获
├── 完整的变更记录
├── 有序的事件流
├── 支持Gremlin和SPARQL
└── 可与Lambda集成
1.2 变更事件类型 #
text
变更事件类型:
├── ADD_VERTEX:添加顶点
├── DELETE_VERTEX:删除顶点
├── UPDATE_VERTEX:更新顶点
├── ADD_EDGE:添加边
├── DELETE_EDGE:删除边
└── UPDATE_EDGE:更新边
二、启用Streams #
2.1 使用CLI启用 #
bash
aws neptune modify-db-cluster \
--db-cluster-identifier my-neptune-cluster \
--cloudwatch-logs-export-configuration '{"EnableLogTypes":["neptune-stream"]}'
2.2 使用参数组 #
bash
aws neptune modify-db-cluster-parameter-group \
--db-cluster-parameter-group-name my-params \
--parameters "ParameterName=neptune_streams,ParameterValue=true"
三、Streams API #
3.1 获取流记录 #
bash
# 获取最早的记录
curl -X GET "https://endpoint:8182/streams?limit=100"
# 从特定序列号开始
curl -X GET "https://endpoint:8182/streams?commitNum=1&opNum=1&limit=100"
3.2 响应格式 #
json
{
"lastEventId": {
"commitNum": 1,
"opNum": 5
},
"records": [
{
"eventId": {
"commitNum": 1,
"opNum": 1
},
"data": {
"type": "ADD_VERTEX",
"id": "vertex_001",
"label": "person",
"properties": {
"name": [{"value": "Tom"}]
}
}
}
]
}
3.3 变更事件结构 #
Gremlin事件:
json
{
"eventId": {"commitNum": 1, "opNum": 1},
"data": {
"type": "ADD_VERTEX",
"id": "vertex_001",
"label": "person",
"properties": {
"name": [{"value": "Tom"}],
"age": [{"value": 30}]
}
}
}
SPARQL事件:
json
{
"eventId": {"commitNum": 1, "opNum": 1},
"data": {
"stmtType": "ADD",
"s": "http://example.org/Tom",
"p": "http://xmlns.com/foaf/0.1/name",
"o": "\"Tom\""
}
}
四、Lambda集成 #
4.1 创建Lambda函数 #
python
import json
import urllib.request
def lambda_handler(event, context):
# 获取Neptune Stream记录
endpoint = "https://your-cluster-endpoint:8182"
# 处理变更记录
for record in event.get('records', []):
event_type = record.get('data', {}).get('type')
if event_type == 'ADD_VERTEX':
handle_add_vertex(record['data'])
elif event_type == 'DELETE_VERTEX':
handle_delete_vertex(record['data'])
elif event_type == 'ADD_EDGE':
handle_add_edge(record['data'])
# ... 其他事件类型
return {'statusCode': 200}
def handle_add_vertex(data):
vertex_id = data.get('id')
label = data.get('label')
properties = data.get('properties', {})
# 处理新增顶点
print(f"New vertex: {vertex_id}, label: {label}")
def handle_delete_vertex(data):
vertex_id = data.get('id')
# 处理删除顶点
print(f"Deleted vertex: {vertex_id}")
4.2 配置Lambda触发器 #
bash
# 创建EventSourceMapping
aws lambda create-event-source-mapping \
--function-name my-neptune-processor \
--batch-size 100 \
--starting-position LATEST \
--event-source-arn arn:aws:kinesis:region:account:stream/my-stream
五、使用场景 #
5.1 数据同步 #
python
# 同步到其他数据存储
def sync_to_elasticsearch(record):
if record['data']['type'] in ['ADD_VERTEX', 'UPDATE_VERTEX']:
# 索引到Elasticsearch
es.index(
index='vertices',
id=record['data']['id'],
body={
'label': record['data']['label'],
'properties': record['data']['properties']
}
)
elif record['data']['type'] == 'DELETE_VERTEX':
# 从Elasticsearch删除
es.delete(index='vertices', id=record['data']['id'])
5.2 实时通知 #
python
# 发送实时通知
def send_notification(record):
if record['data']['type'] == 'ADD_EDGE':
if record['data']['label'] == 'follows':
# 发送关注通知
from_user = record['data']['from']
to_user = record['data']['to']
send_email(to_user, f"You have a new follower: {from_user}")
5.3 审计日志 #
python
# 记录审计日志
def audit_log(record):
log_entry = {
'timestamp': datetime.now().isoformat(),
'event_type': record['data']['type'],
'resource_id': record['data'].get('id'),
'details': record['data']
}
# 存储到审计日志表
dynamodb.put_item(TableName='audit-logs', Item=log_entry)
5.4 缓存更新 #
python
# 更新缓存
def update_cache(record):
if record['data']['type'] in ['ADD_VERTEX', 'UPDATE_VERTEX']:
# 更新Redis缓存
redis.hset(
f"vertex:{record['data']['id']}",
mapping=record['data']['properties']
)
elif record['data']['type'] == 'DELETE_VERTEX':
# 删除缓存
redis.delete(f"vertex:{record['data']['id']}")
六、Streams最佳实践 #
6.1 处理策略 #
text
处理策略:
├── 批量处理:批量处理多条记录
├── 错误重试:处理失败时重试
├── 幂等处理:支持重复处理
├── 顺序保证:按顺序处理
└── 监控告警:监控处理状态
6.2 性能优化 #
text
性能优化建议:
├── 合理设置批量大小
├── 使用异步处理
├── 控制并发数
├── 监控延迟
└── 处理积压
6.3 错误处理 #
python
def process_records(records):
failed_records = []
for record in records:
try:
process_record(record)
except Exception as e:
failed_records.append({
'record': record,
'error': str(e)
})
if failed_records:
# 记录失败记录
log_failed_records(failed_records)
# 可以选择重试或发送到死信队列
return len(failed_records)
七、监控与告警 #
7.1 CloudWatch指标 #
text
关键指标:
├── StreamLatency:流延迟
├── StreamRecordsProcessed:处理记录数
├── StreamErrors:错误数
└── StreamLag:积压量
7.2 告警配置 #
bash
# 创建告警
aws cloudwatch put-metric-alarm \
--alarm-name neptune-stream-lag \
--metric-name StreamLag \
--namespace AWS/Neptune \
--threshold 1000 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 3
八、总结 #
Neptune Streams要点:
| 项目 | 说明 |
|---|---|
| 功能 | 变更数据捕获 |
| 事件类型 | ADD/DELETE/UPDATE |
| API | Streams REST API |
| 集成 | Lambda, Kinesis |
| 用途 | 同步、通知、审计 |
最佳实践:
- 合理设置批量大小
- 实现幂等处理
- 处理错误和重试
- 监控处理状态
- 控制处理延迟
下一步,让我们学习集群管理!
最后更新:2026-03-27