Amazon DocumentDB 聚合管道 #
一、聚合管道概述 #
1.1 什么是聚合管道 #
text
聚合管道:
├── 数据处理管道
├── 多个阶段顺序执行
├── 每个阶段处理并传递结果
├── 支持复杂的数据转换
└── 类似Linux管道操作
1.2 基本语法 #
javascript
db.collection.aggregate([
{ stage1 },
{ stage2 },
{ stage3 }
])
1.3 常用阶段 #
text
常用聚合阶段:
├── $match - 过滤
├── $group - 分组
├── $project - 投影
├── $sort - 排序
├── $limit - 限制数量
├── $skip - 跳过
├── $unwind - 展开数组
├── $lookup - 关联查询
├── $addFields - 添加字段
└── $facet - 多管道并行
二、$match - 过滤 #
2.1 基本用法 #
javascript
// 过滤文档
db.orders.aggregate([
{
$match: {
status: "completed",
amount: { $gte: 100 }
}
}
])
// 尽早使用$match减少数据量
db.orders.aggregate([
{ $match: { createdAt: { $gte: new Date("2024-01-01") } } },
{ $group: { _id: "$status", total: { $sum: "$amount" } } }
])
2.2 与find的区别 #
text
$match vs find:
├── $match在管道中使用
├── 可以与其他阶段组合
├── 支持索引优化
└── 建议放在管道开头
三、$group - 分组 #
3.1 基本分组 #
javascript
// 按字段分组
db.orders.aggregate([
{
$group: {
_id: "$status",
count: { $sum: 1 }
}
}
])
// 输出
[
{ "_id": "pending", "count": 50 },
{ "_id": "completed", "count": 100 },
{ "_id": "cancelled", "count": 20 }
]
3.2 分组聚合函数 #
javascript
// 多种聚合函数
db.orders.aggregate([
{
$group: {
_id: "$productId",
totalSales: { $sum: "$amount" },
avgAmount: { $avg: "$amount" },
minAmount: { $min: "$amount" },
maxAmount: { $max: "$amount" },
firstOrder: { $first: "$createdAt" },
lastOrder: { $last: "$createdAt" },
orderCount: { $sum: 1 },
uniqueUsers: { $addToSet: "$userId" }
}
}
])
3.3 多字段分组 #
javascript
// 按多个字段分组
db.orders.aggregate([
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
status: "$status"
},
total: { $sum: "$amount" }
}
}
])
3.4 分组后筛选 #
javascript
// 使用$match筛选分组结果
db.orders.aggregate([
{
$group: {
_id: "$productId",
totalSales: { $sum: "$amount" }
}
},
{
$match: {
totalSales: { $gte: 10000 }
}
}
])
四、$project - 投影 #
4.1 基本投影 #
javascript
// 选择和重命名字段
db.users.aggregate([
{
$project: {
_id: 0,
name: 1,
email: 1,
username: "$name" // 重命名
}
}
])
4.2 计算字段 #
javascript
// 添加计算字段
db.orders.aggregate([
{
$project: {
orderId: "$_id",
total: { $multiply: ["$price", "$quantity"] },
discount: { $cond: {
if: { $gte: ["$quantity", 10] },
then: 0.1,
else: 0
}}
}
}
])
4.3 嵌套字段投影 #
javascript
// 投影嵌套字段
db.users.aggregate([
{
$project: {
name: 1,
city: "$address.city",
country: "$address.country"
}
}
])
五、$sort - 排序 #
5.1 基本排序 #
javascript
// 升序排序
db.orders.aggregate([
{ $sort: { createdAt: 1 } }
])
// 降序排序
db.orders.aggregate([
{ $sort: { amount: -1 } }
])
// 多字段排序
db.orders.aggregate([
{ $sort: { status: 1, createdAt: -1 } }
])
5.2 排序与内存 #
javascript
// 允许使用磁盘排序
db.orders.aggregate([
{ $sort: { amount: -1 } }
], { allowDiskUse: true })
六、$unwind - 展开数组 #
6.1 基本用法 #
javascript
// 展开数组
db.orders.aggregate([
{ $unwind: "$items" }
])
// 输入
{ "_id": 1, "items": ["A", "B", "C"] }
// 输出
{ "_id": 1, "items": "A" }
{ "_id": 1, "items": "B" }
{ "_id": 1, "items": "C" }
6.2 保留空数组 #
javascript
// 保留空数组文档
db.orders.aggregate([
{
$unwind: {
path: "$items",
preserveNullAndEmptyArrays: true
}
}
])
6.3 展开嵌套数组 #
javascript
// 展开嵌套对象数组
db.orders.aggregate([
{ $unwind: "$items" },
{
$project: {
productId: "$items.productId",
quantity: "$items.quantity",
price: "$items.price"
}
}
])
七、$lookup - 关联查询 #
7.1 基本关联 #
javascript
// 左连接
db.orders.aggregate([
{
$lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
}
}
])
7.2 管道关联 #
javascript
// 使用管道进行复杂关联
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { productId: "$productId" },
pipeline: [
{
$match: {
$expr: { $eq: ["$_id", "$$productId"] }
}
},
{ $project: { name: 1, price: 1 } }
],
as: "product"
}
}
])
7.3 多集合关联 #
javascript
// 关联多个集合
db.orders.aggregate([
{
$lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
}
},
{
$lookup: {
from: "products",
localField: "productId",
foreignField: "_id",
as: "product"
}
},
{
$unwind: "$user"
},
{
$unwind: "$product"
}
])
八、$addFields - 添加字段 #
8.1 基本用法 #
javascript
// 添加新字段
db.orders.aggregate([
{
$addFields: {
totalAmount: { $multiply: ["$price", "$quantity"] },
orderYear: { $year: "$createdAt" }
}
}
])
8.2 与$project的区别 #
text
$addFields vs $project:
├── $addFields保留所有字段
├── $project需要指定保留字段
├── $addFields可多次使用
└── 都可以添加计算字段
九、$facet - 多管道并行 #
9.1 基本用法 #
javascript
// 并行执行多个聚合管道
db.products.aggregate([
{
$facet: {
categories: [
{ $group: { _id: "$category", count: { $sum: 1 } } }
],
priceRanges: [
{
$bucket: {
groupBy: "$price",
boundaries: [0, 100, 500, 1000, 5000],
default: "other"
}
}
],
topRated: [
{ $sort: { rating: -1 } },
{ $limit: 5 }
],
totalCount: [
{ $count: "count" }
]
}
}
])
9.2 分页统计 #
javascript
// 同时获取数据和总数
db.products.aggregate([
{ $match: { status: "active" } },
{
$facet: {
data: [
{ $sort: { createdAt: -1 } },
{ $skip: 0 },
{ $limit: 10 }
],
totalCount: [
{ $count: "count" }
]
}
}
])
十、常用聚合操作符 #
10.1 算术操作符 #
javascript
// 算术运算
db.orders.aggregate([
{
$project: {
total: { $add: ["$price", "$tax"] },
discount: { $subtract: ["$price", "$discount"] },
quantity: { $multiply: ["$price", "$quantity"] },
average: { $divide: ["$total", "$count"] },
remainder: { $mod: ["$total", 100] }
}
}
])
10.2 字符串操作符 #
javascript
// 字符串操作
db.users.aggregate([
{
$project: {
upperName: { $toUpper: "$name" },
lowerEmail: { $toLower: "$email" },
fullName: { $concat: ["$firstName", " ", "$lastName"] },
substring: { $substr: ["$phone", 0, 3] },
strLength: { $strLenCP: "$name" }
}
}
])
10.3 日期操作符 #
javascript
// 日期操作
db.orders.aggregate([
{
$project: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" },
hour: { $hour: "$createdAt" },
dayOfWeek: { $dayOfWeek: "$createdAt" },
week: { $week: "$createdAt" },
formatted: {
$dateToString: {
format: "%Y-%m-%d %H:%M:%S",
date: "$createdAt"
}
}
}
}
])
10.4 条件操作符 #
javascript
// 条件表达式
db.orders.aggregate([
{
$project: {
status: {
$cond: {
if: { $gte: ["$amount", 1000] },
then: "high",
else: "normal"
}
},
level: {
$switch: {
branches: [
{ case: { $lt: ["$amount", 100] }, then: "low" },
{ case: { $lt: ["$amount", 500] }, then: "medium" },
{ case: { $gte: ["$amount", 500] }, then: "high" }
],
default: "unknown"
}
}
}
}
])
10.5 数组操作符 #
javascript
// 数组操作
db.orders.aggregate([
{
$project: {
arraySize: { $size: "$items" },
firstItem: { $arrayElemAt: ["$items", 0] },
lastItem: { $arrayElemAt: ["$items", -1] },
slice: { $slice: ["$items", 2, 3] },
concatArrays: { $concatArrays: ["$items", "$extras"] },
filterArray: {
$filter: {
input: "$items",
as: "item",
cond: { $gte: ["$$item.price", 100] }
}
}
}
}
])
十一、实际应用示例 #
11.1 销售报表 #
javascript
// 月度销售报表
db.orders.aggregate([
{
$match: {
createdAt: {
$gte: new Date("2024-01-01"),
$lt: new Date("2024-02-01")
}
}
},
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" }
},
totalSales: { $sum: "$amount" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$amount" },
maxOrder: { $max: "$amount" },
minOrder: { $min: "$amount" }
}
},
{ $sort: { "_id.day": 1 } }
])
11.2 用户行为分析 #
javascript
// 用户行为统计
db.userActions.aggregate([
{
$match: {
timestamp: {
$gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
}
}
},
{
$group: {
_id: {
userId: "$userId",
action: "$action"
},
count: { $sum: 1 },
lastAction: { $max: "$timestamp" }
}
},
{
$group: {
_id: "$_id.userId",
actions: {
$push: {
action: "$_id.action",
count: "$count",
lastAction: "$lastAction"
}
},
totalActions: { $sum: "$count" }
}
},
{ $sort: { totalActions: -1 } },
{ $limit: 100 }
])
11.3 库存分析 #
javascript
// 库存预警分析
db.products.aggregate([
{
$lookup: {
from: "inventory",
localField: "_id",
foreignField: "productId",
as: "inventory"
}
},
{ $unwind: "$inventory" },
{
$addFields: {
stockRatio: {
$divide: ["$inventory.quantity", "$inventory.threshold"]
}
}
},
{
$match: {
stockRatio: { $lt: 1.2 }
}
},
{
$project: {
name: 1,
currentStock: "$inventory.quantity",
threshold: "$inventory.threshold",
stockRatio: 1,
status: {
$cond: {
if: { $lt: ["$stockRatio", 0.5] },
then: "critical",
else: "warning"
}
}
}
},
{ $sort: { stockRatio: 1 } }
])
十二、聚合优化 #
12.1 优化建议 #
text
聚合优化:
├── 尽早使用$match减少数据量
├── 在$match后使用索引
├── 合理使用$project减少字段
├── 避免不必要的$unwind
├── 使用allowDiskUse处理大数据
└── 监控聚合性能
12.2 内存限制 #
javascript
// 允许使用磁盘
db.orders.aggregate([...], { allowDiskUse: true })
// 设置批量大小
db.orders.aggregate([...], { cursor: { batchSize: 100 } })
十三、总结 #
13.1 聚合阶段速查 #
| 阶段 | 用途 |
|---|---|
| $match | 过滤文档 |
| $group | 分组聚合 |
| $project | 投影字段 |
| $sort | 排序 |
| $limit/$skip | 分页 |
| $unwind | 展开数组 |
| $lookup | 关联查询 |
| $addFields | 添加字段 |
| $facet | 多管道并行 |
13.2 最佳实践 #
text
聚合最佳实践:
├── 理解数据流向
├── 优化管道顺序
├── 合理使用索引
├── 控制内存使用
├── 测试和监控性能
└── 处理大数据使用allowDiskUse
下一步,让我们学习索引管理!
最后更新:2026-03-27