Amazon DocumentDB 变更流 #

一、变更流概述 #

1.1 什么是变更流 #

text
变更流(Change Streams):
├── 实时数据变更捕获
├── 类似数据库触发器
├── 异步事件处理
├── 支持过滤和投影
└── 可恢复的监听

1.2 变更流用途 #

text
应用场景:
├── 实时数据同步
├── 审计日志
├── 缓存更新
├── 消息通知
├── 数据分析
└── 搜索索引更新

1.3 变更流特点 #

text
变更流特点:
├── 实时性高
├── 有序性保证
├── 可恢复性
├── 支持过滤
├── 支持投影
└── 低延迟

二、启用变更流 #

2.1 检查变更流支持 #

javascript
// 检查是否启用变更流
db.adminCommand({
  getParameter: 1,
  changeStreamOptions: 1
})

2.2 启用变更流 #

javascript
// DocumentDB默认启用变更流
// 无需额外配置

// 检查集合变更流
db.users.watch()

三、基本使用 #

3.1 监听集合变更 #

javascript
// 监听集合的所有变更
const changeStream = db.users.watch();

changeStream.on('change', (change) => {
  console.log('变更事件:', change);
});

3.2 监听数据库变更 #

javascript
// 监听整个数据库的变更
const changeStream = db.watch();

changeStream.on('change', (change) => {
  console.log('数据库变更:', change);
});

3.3 监听所有变更 #

javascript
// 监听所有数据库的变更
const changeStream = client.watch();

changeStream.on('change', (change) => {
  console.log('全局变更:', change);
});

四、变更事件结构 #

4.1 插入事件 #

javascript
{
  "_id": { ... },  // Resume Token
  "operationType": "insert",
  "fullDocument": {
    "_id": ObjectId("..."),
    "name": "张三",
    "email": "zhangsan@example.com"
  },
  "ns": {
    "db": "mydb",
    "coll": "users"
  },
  "documentKey": {
    "_id": ObjectId("...")
  }
}

4.2 更新事件 #

javascript
{
  "_id": { ... },
  "operationType": "update",
  "fullDocument": {
    "_id": ObjectId("..."),
    "name": "张三",
    "age": 31
  },
  "ns": {
    "db": "mydb",
    "coll": "users"
  },
  "documentKey": {
    "_id": ObjectId("...")
  },
  "updateDescription": {
    "updatedFields": {
      "age": 31
    },
    "removedFields": []
  }
}

4.3 删除事件 #

javascript
{
  "_id": { ... },
  "operationType": "delete",
  "ns": {
    "db": "mydb",
    "coll": "users"
  },
  "documentKey": {
    "_id": ObjectId("...")
  }
}

4.4 替换事件 #

javascript
{
  "_id": { ... },
  "operationType": "replace",
  "fullDocument": {
    "_id": ObjectId("..."),
    "name": "张三",
    "age": 30,
    "city": "北京"
  },
  "ns": {
    "db": "mydb",
    "coll": "users"
  },
  "documentKey": {
    "_id": ObjectId("...")
  }
}

4.5 操作类型 #

text
操作类型:
├── insert - 插入文档
├── update - 更新文档
├── replace - 替换文档
├── delete - 删除文档
├── invalidate - 流失效
├── drop - 删除集合
├── rename - 重命名集合
└── dropDatabase - 删除数据库

五、过滤变更流 #

5.1 按操作类型过滤 #

javascript
// 只监听插入和更新
const changeStream = db.users.watch([
  {
    $match: {
      operationType: { $in: ['insert', 'update'] }
    }
  }
]);

5.2 按字段值过滤 #

javascript
// 只监听特定字段的变更
const changeStream = db.orders.watch([
  {
    $match: {
      'fullDocument.status': 'completed'
    }
  }
]);

5.3 按更新字段过滤 #

javascript
// 只监听特定字段的更新
const changeStream = db.users.watch([
  {
    $match: {
      operationType: 'update',
      'updateDescription.updatedFields.status': { $exists: true }
    }
  }
]);

5.4 复杂过滤 #

javascript
// 复杂过滤条件
const changeStream = db.orders.watch([
  {
    $match: {
      $or: [
        { operationType: 'insert' },
        {
          operationType: 'update',
          'updateDescription.updatedFields.status': { $exists: true }
        }
      ]
    }
  }
]);

六、投影 #

6.1 投影文档字段 #

javascript
// 只返回特定字段
const changeStream = db.users.watch(
  [],
  {
    fullDocument: 'updateLookup',
    fullDocumentBeforeChange: 'whenAvailable',
    projection: { 'fullDocument.name': 1, 'fullDocument.email': 1 }
  }
);

6.2 fullDocument选项 #

javascript
// fullDocument选项
const changeStream = db.users.watch([], {
  fullDocument: 'updateLookup'  // 更新时返回完整文档
});

// 选项值:
// default - 插入和替换时返回
// updateLookup - 更新时也返回完整文档
// whenAvailable - 可用时返回
// required - 必须返回,否则报错

6.3 fullDocumentBeforeChange选项 #

javascript
// 返回变更前的文档
const changeStream = db.users.watch([], {
  fullDocumentBeforeChange: 'whenAvailable'
});

七、恢复变更流 #

7.1 Resume Token #

javascript
// 获取Resume Token
const changeStream = db.users.watch();

let resumeToken;
changeStream.on('change', (change) => {
  resumeToken = change._id;
  console.log('Resume Token:', resumeToken);
});

7.2 从Token恢复 #

javascript
// 从Resume Token恢复
const changeStream = db.users.watch([], {
  resumeAfter: resumeToken
});

// 或者从时间戳开始
const changeStream = db.users.watch([], {
  startAtOperationTime: timestamp
});

7.3 持久化Resume Token #

javascript
// 保存Resume Token到文件
const fs = require('fs');

async function watchWithResume() {
  let resumeToken;
  
  // 尝试读取保存的Token
  try {
    const data = fs.readFileSync('resume_token.json', 'utf8');
    resumeToken = JSON.parse(data);
  } catch (e) {
    // 文件不存在,从头开始
  }
  
  const options = resumeToken 
    ? { resumeAfter: resumeToken }
    : {};
  
  const changeStream = db.users.watch([], options);
  
  changeStream.on('change', (change) => {
    // 处理变更
    handleChange(change);
    
    // 保存Token
    fs.writeFileSync(
      'resume_token.json',
      JSON.stringify(change._id)
    );
  });
}

八、实际应用示例 #

8.1 实时数据同步 #

javascript
// 同步数据到Elasticsearch
const { Client } = require('@elastic/elasticsearch');
const esClient = new Client({ node: 'http://localhost:9200' });

async function syncToElasticsearch() {
  const changeStream = db.products.watch([], {
    fullDocument: 'updateLookup'
  });
  
  changeStream.on('change', async (change) => {
    const doc = change.fullDocument;
    
    switch (change.operationType) {
      case 'insert':
      case 'update':
      case 'replace':
        await esClient.index({
          index: 'products',
          id: doc._id.toString(),
          body: doc
        });
        console.log('同步文档:', doc._id);
        break;
        
      case 'delete':
        await esClient.delete({
          index: 'products',
          id: change.documentKey._id.toString()
        });
        console.log('删除文档:', change.documentKey._id);
        break;
    }
  });
}

8.2 审计日志 #

javascript
// 记录审计日志
async function auditLog() {
  const changeStream = db.users.watch([], {
    fullDocument: 'updateLookup',
    fullDocumentBeforeChange: 'whenAvailable'
  });
  
  changeStream.on('change', async (change) => {
    await db.auditLogs.insertOne({
      collection: change.ns.coll,
      operation: change.operationType,
      documentId: change.documentKey._id,
      oldDocument: change.fullDocumentBeforeChange,
      newDocument: change.fullDocument,
      timestamp: new Date(),
      changedFields: change.updateDescription?.updatedFields
    });
  });
}

8.3 缓存更新 #

javascript
// 更新Redis缓存
const redis = require('redis');
const redisClient = redis.createClient();

async function updateCache() {
  const changeStream = db.users.watch([], {
    fullDocument: 'updateLookup'
  });
  
  changeStream.on('change', async (change) => {
    const userId = change.documentKey._id.toString();
    const cacheKey = `user:${userId}`;
    
    switch (change.operationType) {
      case 'insert':
      case 'update':
      case 'replace':
        await redisClient.set(
          cacheKey,
          JSON.stringify(change.fullDocument)
        );
        break;
        
      case 'delete':
        await redisClient.del(cacheKey);
        break;
    }
  });
}

8.4 消息通知 #

javascript
// 发送WebSocket通知
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

async function notifyClients() {
  const changeStream = db.notifications.watch([
    { $match: { operationType: 'insert' } }
  ]);
  
  changeStream.on('change', (change) => {
    const notification = change.fullDocument;
    
    // 广播给所有客户端
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify({
          type: 'notification',
          data: notification
        }));
      }
    });
  });
}

九、错误处理 #

9.1 连接断开 #

javascript
// 处理连接断开
async function watchWithRetry() {
  let resumeToken;
  
  while (true) {
    try {
      const options = resumeToken 
        ? { resumeAfter: resumeToken }
        : {};
      
      const changeStream = db.users.watch([], options);
      
      changeStream.on('change', (change) => {
        resumeToken = change._id;
        handleChange(change);
      });
      
      changeStream.on('error', (error) => {
        console.error('变更流错误:', error);
        // 将在下一个循环重试
      });
      
      // 等待流结束
      await new Promise((resolve) => {
        changeStream.on('close', resolve);
      });
      
    } catch (error) {
      console.error('连接错误:', error);
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}

9.2 流失效 #

javascript
// 处理invalidate事件
changeStream.on('change', (change) => {
  if (change.operationType === 'invalidate') {
    console.log('变更流失效,重新启动');
    // 重新启动变更流
    restartChangeStream();
  }
});

十、性能考虑 #

10.1 过滤优化 #

text
性能建议:
├── 尽早过滤减少处理量
├── 使用投影减少数据传输
├── 合理设置batchSize
├── 避免复杂过滤条件
└── 监控变更流延迟

10.2 资源使用 #

text
资源考虑:
├── 变更流消耗网络带宽
├── 需要足够的内存缓存
├── 影响主实例性能
├── 建议使用副本读取
└── 监控资源使用

10.3 批量处理 #

javascript
// 批量处理变更
async function batchProcess() {
  const changeStream = db.orders.watch();
  const batch = [];
  const BATCH_SIZE = 100;
  
  changeStream.on('change', (change) => {
    batch.push(change);
    
    if (batch.length >= BATCH_SIZE) {
      processBatch([...batch]);
      batch.length = 0;
    }
  });
  
  // 定期处理剩余
  setInterval(() => {
    if (batch.length > 0) {
      processBatch([...batch]);
      batch.length = 0;
    }
  }, 5000);
}

十一、最佳实践 #

11.1 设计原则 #

text
设计原则:
├── 使用过滤减少不必要的事件
├── 实现错误处理和重试
├── 持久化Resume Token
├── 监控变更流健康状态
└── 合理处理背压

11.2 错误处理 #

text
错误处理:
├── 捕获所有错误
├── 实现重连机制
├── 保存Resume Token
├── 处理invalidate事件
└── 记录错误日志

11.3 监控 #

text
监控指标:
├── 变更流延迟
├── 处理速率
├── 错误率
├── Resume Token位置
└── 资源使用

十二、总结 #

12.1 变更流要点 #

要点 说明
监听 watch()方法
过滤 $match阶段
投影 projection选项
恢复 Resume Token
错误处理 重连和重试

12.2 最佳实践总结 #

text
变更流最佳实践:
├── 使用过滤减少事件量
├── 实现可靠的错误处理
├── 持久化Resume Token
├── 监控变更流性能
└── 合理处理背压

下一步,让我们学习集群管理!

最后更新:2026-03-27