Amazon DocumentDB 变更流 #
一、变更流概述 #
1.1 什么是变更流 #
text
变更流(Change Streams):
├── 实时数据变更捕获
├── 类似数据库触发器
├── 异步事件处理
├── 支持过滤和投影
└── 可恢复的监听
1.2 变更流用途 #
text
应用场景:
├── 实时数据同步
├── 审计日志
├── 缓存更新
├── 消息通知
├── 数据分析
└── 搜索索引更新
1.3 变更流特点 #
text
变更流特点:
├── 实时性高
├── 有序性保证
├── 可恢复性
├── 支持过滤
├── 支持投影
└── 低延迟
二、启用变更流 #
2.1 检查变更流支持 #
javascript
// 检查是否启用变更流
db.adminCommand({
getParameter: 1,
changeStreamOptions: 1
})
2.2 启用变更流 #
javascript
// DocumentDB默认启用变更流
// 无需额外配置
// 检查集合变更流
db.users.watch()
三、基本使用 #
3.1 监听集合变更 #
javascript
// 监听集合的所有变更
const changeStream = db.users.watch();
changeStream.on('change', (change) => {
console.log('变更事件:', change);
});
3.2 监听数据库变更 #
javascript
// 监听整个数据库的变更
const changeStream = db.watch();
changeStream.on('change', (change) => {
console.log('数据库变更:', change);
});
3.3 监听所有变更 #
javascript
// 监听所有数据库的变更
const changeStream = client.watch();
changeStream.on('change', (change) => {
console.log('全局变更:', change);
});
四、变更事件结构 #
4.1 插入事件 #
javascript
{
"_id": { ... }, // Resume Token
"operationType": "insert",
"fullDocument": {
"_id": ObjectId("..."),
"name": "张三",
"email": "zhangsan@example.com"
},
"ns": {
"db": "mydb",
"coll": "users"
},
"documentKey": {
"_id": ObjectId("...")
}
}
4.2 更新事件 #
javascript
{
"_id": { ... },
"operationType": "update",
"fullDocument": {
"_id": ObjectId("..."),
"name": "张三",
"age": 31
},
"ns": {
"db": "mydb",
"coll": "users"
},
"documentKey": {
"_id": ObjectId("...")
},
"updateDescription": {
"updatedFields": {
"age": 31
},
"removedFields": []
}
}
4.3 删除事件 #
javascript
{
"_id": { ... },
"operationType": "delete",
"ns": {
"db": "mydb",
"coll": "users"
},
"documentKey": {
"_id": ObjectId("...")
}
}
4.4 替换事件 #
javascript
{
"_id": { ... },
"operationType": "replace",
"fullDocument": {
"_id": ObjectId("..."),
"name": "张三",
"age": 30,
"city": "北京"
},
"ns": {
"db": "mydb",
"coll": "users"
},
"documentKey": {
"_id": ObjectId("...")
}
}
4.5 操作类型 #
text
操作类型:
├── insert - 插入文档
├── update - 更新文档
├── replace - 替换文档
├── delete - 删除文档
├── invalidate - 流失效
├── drop - 删除集合
├── rename - 重命名集合
└── dropDatabase - 删除数据库
五、过滤变更流 #
5.1 按操作类型过滤 #
javascript
// 只监听插入和更新
const changeStream = db.users.watch([
{
$match: {
operationType: { $in: ['insert', 'update'] }
}
}
]);
5.2 按字段值过滤 #
javascript
// 只监听特定字段的变更
const changeStream = db.orders.watch([
{
$match: {
'fullDocument.status': 'completed'
}
}
]);
5.3 按更新字段过滤 #
javascript
// 只监听特定字段的更新
const changeStream = db.users.watch([
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}
}
]);
5.4 复杂过滤 #
javascript
// 复杂过滤条件
const changeStream = db.orders.watch([
{
$match: {
$or: [
{ operationType: 'insert' },
{
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}
]
}
}
]);
六、投影 #
6.1 投影文档字段 #
javascript
// 只返回特定字段
const changeStream = db.users.watch(
[],
{
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable',
projection: { 'fullDocument.name': 1, 'fullDocument.email': 1 }
}
);
6.2 fullDocument选项 #
javascript
// fullDocument选项
const changeStream = db.users.watch([], {
fullDocument: 'updateLookup' // 更新时返回完整文档
});
// 选项值:
// default - 插入和替换时返回
// updateLookup - 更新时也返回完整文档
// whenAvailable - 可用时返回
// required - 必须返回,否则报错
6.3 fullDocumentBeforeChange选项 #
javascript
// 返回变更前的文档
const changeStream = db.users.watch([], {
fullDocumentBeforeChange: 'whenAvailable'
});
七、恢复变更流 #
7.1 Resume Token #
javascript
// 获取Resume Token
const changeStream = db.users.watch();
let resumeToken;
changeStream.on('change', (change) => {
resumeToken = change._id;
console.log('Resume Token:', resumeToken);
});
7.2 从Token恢复 #
javascript
// 从Resume Token恢复
const changeStream = db.users.watch([], {
resumeAfter: resumeToken
});
// 或者从时间戳开始
const changeStream = db.users.watch([], {
startAtOperationTime: timestamp
});
7.3 持久化Resume Token #
javascript
// 保存Resume Token到文件
const fs = require('fs');
async function watchWithResume() {
let resumeToken;
// 尝试读取保存的Token
try {
const data = fs.readFileSync('resume_token.json', 'utf8');
resumeToken = JSON.parse(data);
} catch (e) {
// 文件不存在,从头开始
}
const options = resumeToken
? { resumeAfter: resumeToken }
: {};
const changeStream = db.users.watch([], options);
changeStream.on('change', (change) => {
// 处理变更
handleChange(change);
// 保存Token
fs.writeFileSync(
'resume_token.json',
JSON.stringify(change._id)
);
});
}
八、实际应用示例 #
8.1 实时数据同步 #
javascript
// 同步数据到Elasticsearch
const { Client } = require('@elastic/elasticsearch');
const esClient = new Client({ node: 'http://localhost:9200' });
async function syncToElasticsearch() {
const changeStream = db.products.watch([], {
fullDocument: 'updateLookup'
});
changeStream.on('change', async (change) => {
const doc = change.fullDocument;
switch (change.operationType) {
case 'insert':
case 'update':
case 'replace':
await esClient.index({
index: 'products',
id: doc._id.toString(),
body: doc
});
console.log('同步文档:', doc._id);
break;
case 'delete':
await esClient.delete({
index: 'products',
id: change.documentKey._id.toString()
});
console.log('删除文档:', change.documentKey._id);
break;
}
});
}
8.2 审计日志 #
javascript
// 记录审计日志
async function auditLog() {
const changeStream = db.users.watch([], {
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable'
});
changeStream.on('change', async (change) => {
await db.auditLogs.insertOne({
collection: change.ns.coll,
operation: change.operationType,
documentId: change.documentKey._id,
oldDocument: change.fullDocumentBeforeChange,
newDocument: change.fullDocument,
timestamp: new Date(),
changedFields: change.updateDescription?.updatedFields
});
});
}
8.3 缓存更新 #
javascript
// 更新Redis缓存
const redis = require('redis');
const redisClient = redis.createClient();
async function updateCache() {
const changeStream = db.users.watch([], {
fullDocument: 'updateLookup'
});
changeStream.on('change', async (change) => {
const userId = change.documentKey._id.toString();
const cacheKey = `user:${userId}`;
switch (change.operationType) {
case 'insert':
case 'update':
case 'replace':
await redisClient.set(
cacheKey,
JSON.stringify(change.fullDocument)
);
break;
case 'delete':
await redisClient.del(cacheKey);
break;
}
});
}
8.4 消息通知 #
javascript
// 发送WebSocket通知
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
async function notifyClients() {
const changeStream = db.notifications.watch([
{ $match: { operationType: 'insert' } }
]);
changeStream.on('change', (change) => {
const notification = change.fullDocument;
// 广播给所有客户端
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({
type: 'notification',
data: notification
}));
}
});
});
}
九、错误处理 #
9.1 连接断开 #
javascript
// 处理连接断开
async function watchWithRetry() {
let resumeToken;
while (true) {
try {
const options = resumeToken
? { resumeAfter: resumeToken }
: {};
const changeStream = db.users.watch([], options);
changeStream.on('change', (change) => {
resumeToken = change._id;
handleChange(change);
});
changeStream.on('error', (error) => {
console.error('变更流错误:', error);
// 将在下一个循环重试
});
// 等待流结束
await new Promise((resolve) => {
changeStream.on('close', resolve);
});
} catch (error) {
console.error('连接错误:', error);
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
9.2 流失效 #
javascript
// 处理invalidate事件
changeStream.on('change', (change) => {
if (change.operationType === 'invalidate') {
console.log('变更流失效,重新启动');
// 重新启动变更流
restartChangeStream();
}
});
十、性能考虑 #
10.1 过滤优化 #
text
性能建议:
├── 尽早过滤减少处理量
├── 使用投影减少数据传输
├── 合理设置batchSize
├── 避免复杂过滤条件
└── 监控变更流延迟
10.2 资源使用 #
text
资源考虑:
├── 变更流消耗网络带宽
├── 需要足够的内存缓存
├── 影响主实例性能
├── 建议使用副本读取
└── 监控资源使用
10.3 批量处理 #
javascript
// 批量处理变更
async function batchProcess() {
const changeStream = db.orders.watch();
const batch = [];
const BATCH_SIZE = 100;
changeStream.on('change', (change) => {
batch.push(change);
if (batch.length >= BATCH_SIZE) {
processBatch([...batch]);
batch.length = 0;
}
});
// 定期处理剩余
setInterval(() => {
if (batch.length > 0) {
processBatch([...batch]);
batch.length = 0;
}
}, 5000);
}
十一、最佳实践 #
11.1 设计原则 #
text
设计原则:
├── 使用过滤减少不必要的事件
├── 实现错误处理和重试
├── 持久化Resume Token
├── 监控变更流健康状态
└── 合理处理背压
11.2 错误处理 #
text
错误处理:
├── 捕获所有错误
├── 实现重连机制
├── 保存Resume Token
├── 处理invalidate事件
└── 记录错误日志
11.3 监控 #
text
监控指标:
├── 变更流延迟
├── 处理速率
├── 错误率
├── Resume Token位置
└── 资源使用
十二、总结 #
12.1 变更流要点 #
| 要点 | 说明 |
|---|---|
| 监听 | watch()方法 |
| 过滤 | $match阶段 |
| 投影 | projection选项 |
| 恢复 | Resume Token |
| 错误处理 | 重连和重试 |
12.2 最佳实践总结 #
text
变更流最佳实践:
├── 使用过滤减少事件量
├── 实现可靠的错误处理
├── 持久化Resume Token
├── 监控变更流性能
└── 合理处理背压
下一步,让我们学习集群管理!
最后更新:2026-03-27