Amazon DocumentDB 聚合管道 #

一、聚合管道概述 #

1.1 什么是聚合管道 #

text
聚合管道:
├── 数据处理管道
├── 多个阶段顺序执行
├── 每个阶段处理并传递结果
├── 支持复杂的数据转换
└── 类似Linux管道操作

1.2 基本语法 #

javascript
db.collection.aggregate([
  { stage1 },
  { stage2 },
  { stage3 }
])

1.3 常用阶段 #

text
常用聚合阶段:
├── $match - 过滤
├── $group - 分组
├── $project - 投影
├── $sort - 排序
├── $limit - 限制数量
├── $skip - 跳过
├── $unwind - 展开数组
├── $lookup - 关联查询
├── $addFields - 添加字段
└── $facet - 多管道并行

二、$match - 过滤 #

2.1 基本用法 #

javascript
// 过滤文档
db.orders.aggregate([
  {
    $match: {
      status: "completed",
      amount: { $gte: 100 }
    }
  }
])

// 尽早使用$match减少数据量
db.orders.aggregate([
  { $match: { createdAt: { $gte: new Date("2024-01-01") } } },
  { $group: { _id: "$status", total: { $sum: "$amount" } } }
])

2.2 与find的区别 #

text
$match vs find:
├── $match在管道中使用
├── 可以与其他阶段组合
├── 支持索引优化
└── 建议放在管道开头

三、$group - 分组 #

3.1 基本分组 #

javascript
// 按字段分组
db.orders.aggregate([
  {
    $group: {
      _id: "$status",
      count: { $sum: 1 }
    }
  }
])

// 输出
[
  { "_id": "pending", "count": 50 },
  { "_id": "completed", "count": 100 },
  { "_id": "cancelled", "count": 20 }
]

3.2 分组聚合函数 #

javascript
// 多种聚合函数
db.orders.aggregate([
  {
    $group: {
      _id: "$productId",
      totalSales: { $sum: "$amount" },
      avgAmount: { $avg: "$amount" },
      minAmount: { $min: "$amount" },
      maxAmount: { $max: "$amount" },
      firstOrder: { $first: "$createdAt" },
      lastOrder: { $last: "$createdAt" },
      orderCount: { $sum: 1 },
      uniqueUsers: { $addToSet: "$userId" }
    }
  }
])

3.3 多字段分组 #

javascript
// 按多个字段分组
db.orders.aggregate([
  {
    $group: {
      _id: {
        year: { $year: "$createdAt" },
        month: { $month: "$createdAt" },
        status: "$status"
      },
      total: { $sum: "$amount" }
    }
  }
])

3.4 分组后筛选 #

javascript
// 使用$match筛选分组结果
db.orders.aggregate([
  {
    $group: {
      _id: "$productId",
      totalSales: { $sum: "$amount" }
    }
  },
  {
    $match: {
      totalSales: { $gte: 10000 }
    }
  }
])

四、$project - 投影 #

4.1 基本投影 #

javascript
// 选择和重命名字段
db.users.aggregate([
  {
    $project: {
      _id: 0,
      name: 1,
      email: 1,
      username: "$name"  // 重命名
    }
  }
])

4.2 计算字段 #

javascript
// 添加计算字段
db.orders.aggregate([
  {
    $project: {
      orderId: "$_id",
      total: { $multiply: ["$price", "$quantity"] },
      discount: { $cond: {
        if: { $gte: ["$quantity", 10] },
        then: 0.1,
        else: 0
      }}
    }
  }
])

4.3 嵌套字段投影 #

javascript
// 投影嵌套字段
db.users.aggregate([
  {
    $project: {
      name: 1,
      city: "$address.city",
      country: "$address.country"
    }
  }
])

五、$sort - 排序 #

5.1 基本排序 #

javascript
// 升序排序
db.orders.aggregate([
  { $sort: { createdAt: 1 } }
])

// 降序排序
db.orders.aggregate([
  { $sort: { amount: -1 } }
])

// 多字段排序
db.orders.aggregate([
  { $sort: { status: 1, createdAt: -1 } }
])

5.2 排序与内存 #

javascript
// 允许使用磁盘排序
db.orders.aggregate([
  { $sort: { amount: -1 } }
], { allowDiskUse: true })

六、$unwind - 展开数组 #

6.1 基本用法 #

javascript
// 展开数组
db.orders.aggregate([
  { $unwind: "$items" }
])

// 输入
{ "_id": 1, "items": ["A", "B", "C"] }

// 输出
{ "_id": 1, "items": "A" }
{ "_id": 1, "items": "B" }
{ "_id": 1, "items": "C" }

6.2 保留空数组 #

javascript
// 保留空数组文档
db.orders.aggregate([
  {
    $unwind: {
      path: "$items",
      preserveNullAndEmptyArrays: true
    }
  }
])

6.3 展开嵌套数组 #

javascript
// 展开嵌套对象数组
db.orders.aggregate([
  { $unwind: "$items" },
  {
    $project: {
      productId: "$items.productId",
      quantity: "$items.quantity",
      price: "$items.price"
    }
  }
])

七、$lookup - 关联查询 #

7.1 基本关联 #

javascript
// 左连接
db.orders.aggregate([
  {
    $lookup: {
      from: "users",
      localField: "userId",
      foreignField: "_id",
      as: "user"
    }
  }
])

7.2 管道关联 #

javascript
// 使用管道进行复杂关联
db.orders.aggregate([
  {
    $lookup: {
      from: "products",
      let: { productId: "$productId" },
      pipeline: [
        {
          $match: {
            $expr: { $eq: ["$_id", "$$productId"] }
          }
        },
        { $project: { name: 1, price: 1 } }
      ],
      as: "product"
    }
  }
])

7.3 多集合关联 #

javascript
// 关联多个集合
db.orders.aggregate([
  {
    $lookup: {
      from: "users",
      localField: "userId",
      foreignField: "_id",
      as: "user"
    }
  },
  {
    $lookup: {
      from: "products",
      localField: "productId",
      foreignField: "_id",
      as: "product"
    }
  },
  {
    $unwind: "$user"
  },
  {
    $unwind: "$product"
  }
])

八、$addFields - 添加字段 #

8.1 基本用法 #

javascript
// 添加新字段
db.orders.aggregate([
  {
    $addFields: {
      totalAmount: { $multiply: ["$price", "$quantity"] },
      orderYear: { $year: "$createdAt" }
    }
  }
])

8.2 与$project的区别 #

text
$addFields vs $project:
├── $addFields保留所有字段
├── $project需要指定保留字段
├── $addFields可多次使用
└── 都可以添加计算字段

九、$facet - 多管道并行 #

9.1 基本用法 #

javascript
// 并行执行多个聚合管道
db.products.aggregate([
  {
    $facet: {
      categories: [
        { $group: { _id: "$category", count: { $sum: 1 } } }
      ],
      priceRanges: [
        {
          $bucket: {
            groupBy: "$price",
            boundaries: [0, 100, 500, 1000, 5000],
            default: "other"
          }
        }
      ],
      topRated: [
        { $sort: { rating: -1 } },
        { $limit: 5 }
      ],
      totalCount: [
        { $count: "count" }
      ]
    }
  }
])

9.2 分页统计 #

javascript
// 同时获取数据和总数
db.products.aggregate([
  { $match: { status: "active" } },
  {
    $facet: {
      data: [
        { $sort: { createdAt: -1 } },
        { $skip: 0 },
        { $limit: 10 }
      ],
      totalCount: [
        { $count: "count" }
      ]
    }
  }
])

十、常用聚合操作符 #

10.1 算术操作符 #

javascript
// 算术运算
db.orders.aggregate([
  {
    $project: {
      total: { $add: ["$price", "$tax"] },
      discount: { $subtract: ["$price", "$discount"] },
      quantity: { $multiply: ["$price", "$quantity"] },
      average: { $divide: ["$total", "$count"] },
      remainder: { $mod: ["$total", 100] }
    }
  }
])

10.2 字符串操作符 #

javascript
// 字符串操作
db.users.aggregate([
  {
    $project: {
      upperName: { $toUpper: "$name" },
      lowerEmail: { $toLower: "$email" },
      fullName: { $concat: ["$firstName", " ", "$lastName"] },
      substring: { $substr: ["$phone", 0, 3] },
      strLength: { $strLenCP: "$name" }
    }
  }
])

10.3 日期操作符 #

javascript
// 日期操作
db.orders.aggregate([
  {
    $project: {
      year: { $year: "$createdAt" },
      month: { $month: "$createdAt" },
      day: { $dayOfMonth: "$createdAt" },
      hour: { $hour: "$createdAt" },
      dayOfWeek: { $dayOfWeek: "$createdAt" },
      week: { $week: "$createdAt" },
      formatted: {
        $dateToString: {
          format: "%Y-%m-%d %H:%M:%S",
          date: "$createdAt"
        }
      }
    }
  }
])

10.4 条件操作符 #

javascript
// 条件表达式
db.orders.aggregate([
  {
    $project: {
      status: {
        $cond: {
          if: { $gte: ["$amount", 1000] },
          then: "high",
          else: "normal"
        }
      },
      level: {
        $switch: {
          branches: [
            { case: { $lt: ["$amount", 100] }, then: "low" },
            { case: { $lt: ["$amount", 500] }, then: "medium" },
            { case: { $gte: ["$amount", 500] }, then: "high" }
          ],
          default: "unknown"
        }
      }
    }
  }
])

10.5 数组操作符 #

javascript
// 数组操作
db.orders.aggregate([
  {
    $project: {
      arraySize: { $size: "$items" },
      firstItem: { $arrayElemAt: ["$items", 0] },
      lastItem: { $arrayElemAt: ["$items", -1] },
      slice: { $slice: ["$items", 2, 3] },
      concatArrays: { $concatArrays: ["$items", "$extras"] },
      filterArray: {
        $filter: {
          input: "$items",
          as: "item",
          cond: { $gte: ["$$item.price", 100] }
        }
      }
    }
  }
])

十一、实际应用示例 #

11.1 销售报表 #

javascript
// 月度销售报表
db.orders.aggregate([
  {
    $match: {
      createdAt: {
        $gte: new Date("2024-01-01"),
        $lt: new Date("2024-02-01")
      }
    }
  },
  {
    $group: {
      _id: {
        year: { $year: "$createdAt" },
        month: { $month: "$createdAt" },
        day: { $dayOfMonth: "$createdAt" }
      },
      totalSales: { $sum: "$amount" },
      orderCount: { $sum: 1 },
      avgOrderValue: { $avg: "$amount" },
      maxOrder: { $max: "$amount" },
      minOrder: { $min: "$amount" }
    }
  },
  { $sort: { "_id.day": 1 } }
])

11.2 用户行为分析 #

javascript
// 用户行为统计
db.userActions.aggregate([
  {
    $match: {
      timestamp: {
        $gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
      }
    }
  },
  {
    $group: {
      _id: {
        userId: "$userId",
        action: "$action"
      },
      count: { $sum: 1 },
      lastAction: { $max: "$timestamp" }
    }
  },
  {
    $group: {
      _id: "$_id.userId",
      actions: {
        $push: {
          action: "$_id.action",
          count: "$count",
          lastAction: "$lastAction"
        }
      },
      totalActions: { $sum: "$count" }
    }
  },
  { $sort: { totalActions: -1 } },
  { $limit: 100 }
])

11.3 库存分析 #

javascript
// 库存预警分析
db.products.aggregate([
  {
    $lookup: {
      from: "inventory",
      localField: "_id",
      foreignField: "productId",
      as: "inventory"
    }
  },
  { $unwind: "$inventory" },
  {
    $addFields: {
      stockRatio: {
        $divide: ["$inventory.quantity", "$inventory.threshold"]
      }
    }
  },
  {
    $match: {
      stockRatio: { $lt: 1.2 }
    }
  },
  {
    $project: {
      name: 1,
      currentStock: "$inventory.quantity",
      threshold: "$inventory.threshold",
      stockRatio: 1,
      status: {
        $cond: {
          if: { $lt: ["$stockRatio", 0.5] },
          then: "critical",
          else: "warning"
        }
      }
    }
  },
  { $sort: { stockRatio: 1 } }
])

十二、聚合优化 #

12.1 优化建议 #

text
聚合优化:
├── 尽早使用$match减少数据量
├── 在$match后使用索引
├── 合理使用$project减少字段
├── 避免不必要的$unwind
├── 使用allowDiskUse处理大数据
└── 监控聚合性能

12.2 内存限制 #

javascript
// 允许使用磁盘
db.orders.aggregate([...], { allowDiskUse: true })

// 设置批量大小
db.orders.aggregate([...], { cursor: { batchSize: 100 } })

十三、总结 #

13.1 聚合阶段速查 #

阶段 用途
$match 过滤文档
$group 分组聚合
$project 投影字段
$sort 排序
$limit/$skip 分页
$unwind 展开数组
$lookup 关联查询
$addFields 添加字段
$facet 多管道并行

13.2 最佳实践 #

text
聚合最佳实践:
├── 理解数据流向
├── 优化管道顺序
├── 合理使用索引
├── 控制内存使用
├── 测试和监控性能
└── 处理大数据使用allowDiskUse

下一步,让我们学习索引管理!

最后更新:2026-03-27