MongoDB聚合管道 #
一、聚合概述 #
1.1 什么是聚合管道 #
聚合管道(Aggregation Pipeline)是MongoDB强大的数据处理框架,通过一系列阶段(Stage)处理文档。
text
聚合管道流程
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ 文档 │ -> │ Stage 1 │ -> │ Stage 2 │ -> │ 结果 │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
1.2 基本语法 #
javascript
db.collection.aggregate([
{ stage1 },
{ stage2 },
{ stage3 }
])
1.3 常用阶段 #
| 阶段 | 说明 |
|---|---|
| $match | 过滤文档 |
| $group | 分组聚合 |
| $project | 投影字段 |
| $sort | 排序 |
| $limit | 限制数量 |
| $skip | 跳过文档 |
| $unwind | 展开数组 |
| $lookup | 关联查询 |
| $addFields | 添加字段 |
二、$match阶段 #
2.1 基本过滤 #
javascript
// 过滤文档
db.orders.aggregate([
{ $match: { status: "completed" } }
])
// 多条件过滤
db.orders.aggregate([
{
$match: {
status: "completed",
amount: { $gt: 100 }
}
}
])
2.2 日期过滤 #
javascript
// 日期范围过滤
db.orders.aggregate([
{
$match: {
createdAt: {
$gte: ISODate("2024-01-01"),
$lt: ISODate("2025-01-01")
}
}
}
])
2.3 使用索引 #
javascript
// $match放在管道开头可以使用索引
db.orders.aggregate([
{ $match: { status: "completed" } }, // 可以使用索引
{ $group: { _id: "$userId", total: { $sum: "$amount" } } }
])
三、$group阶段 #
3.1 基本分组 #
javascript
// 按字段分组
db.orders.aggregate([
{
$group: {
_id: "$status",
count: { $sum: 1 }
}
}
])
// 输出
[
{ "_id": "completed", "count": 100 },
{ "_id": "pending", "count": 50 },
{ "_id": "cancelled", "count": 20 }
]
3.2 分组聚合 #
javascript
// 多种聚合操作
db.orders.aggregate([
{
$group: {
_id: "$userId",
totalOrders: { $sum: 1 },
totalAmount: { $sum: "$amount" },
avgAmount: { $avg: "$amount" },
maxAmount: { $max: "$amount" },
minAmount: { $min: "$amount" },
firstOrder: { $first: "$createdAt" },
lastOrder: { $last: "$createdAt" }
}
}
])
3.3 分组字段 #
javascript
// 按多个字段分组
db.orders.aggregate([
{
$group: {
_id: {
userId: "$userId",
status: "$status"
},
count: { $sum: 1 }
}
}
])
// 按日期分组
db.orders.aggregate([
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" }
},
count: { $sum: 1 }
}
}
])
3.4 聚合操作符 #
$sum - 求和
javascript
// 计数
{ $sum: 1 }
// 求和
{ $sum: "$amount" }
// 条件求和
{ $sum: { $cond: [{ $eq: ["$status", "completed"] }, "$amount", 0] } }
$avg - 平均值
javascript
// 平均值
{ $avg: "$amount" }
$max/$min - 最大/最小值
javascript
// 最大值
{ $max: "$amount" }
// 最小值
{ $min: "$amount" }
$first/$last - 第一个/最后一个
javascript
// 第一个
{ $first: "$createdAt" }
// 最后一个
{ $last: "$createdAt" }
$push - 添加到数组
javascript
// 添加到数组
{
$group: {
_id: "$userId",
orders: { $push: "$orderId" }
}
}
$addToSet - 添加到集合(去重)
javascript
// 添加到集合
{
$group: {
_id: "$userId",
products: { $addToSet: "$productId" }
}
}
四、$project阶段 #
4.1 基本投影 #
javascript
// 包含字段
db.users.aggregate([
{
$project: {
name: 1,
email: 1
}
}
])
// 排除字段
db.users.aggregate([
{
$project: {
password: 0,
_id: 0
}
}
])
4.2 重命名字段 #
javascript
// 重命名字段
db.users.aggregate([
{
$project: {
userName: "$name",
userEmail: "$email"
}
}
])
4.3 计算字段 #
javascript
// 计算字段
db.orders.aggregate([
{
$project: {
orderId: 1,
totalAmount: { $multiply: ["$quantity", "$price"] },
discount: { $cond: [{ $gte: ["$quantity", 10] }, 0.1, 0] }
}
}
])
4.4 嵌套字段 #
javascript
// 嵌套字段投影
db.users.aggregate([
{
$project: {
name: 1,
"address.city": 1,
"address.street": 1
}
}
])
五、$sort阶段 #
5.1 基本排序 #
javascript
// 升序
db.orders.aggregate([
{ $sort: { createdAt: 1 } }
])
// 降序
db.orders.aggregate([
{ $sort: { amount: -1 } }
])
// 多字段排序
db.orders.aggregate([
{ $sort: { status: 1, amount: -1 } }
])
5.2 内存限制 #
javascript
// 排序内存限制(默认100MB)
// 超过限制需要使用allowDiskUse
db.orders.aggregate([
{ $sort: { createdAt: -1 } }
], { allowDiskUse: true })
六、$limit和$skip阶段 #
6.1 分页 #
javascript
// 分页
db.orders.aggregate([
{ $sort: { createdAt: -1 } },
{ $skip: 10 },
{ $limit: 10 }
])
6.2 性能优化 #
javascript
// $match + $limit优化
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $limit: 100 }, // 尽早限制
{ $sort: { amount: -1 } }
])
七、$unwind阶段 #
7.1 展开数组 #
javascript
// 展开数组
db.orders.aggregate([
{ $unwind: "$items" }
])
// 输出
// 原文档:{ _id: 1, items: [1, 2, 3] }
// 展开后:
// { _id: 1, items: 1 }
// { _id: 1, items: 2 }
// { _id: 1, items: 3 }
7.2 展开选项 #
javascript
// 保留空数组
db.orders.aggregate([
{
$unwind: {
path: "$items",
preserveNullAndEmptyArrays: true
}
}
])
// 包含索引
db.orders.aggregate([
{
$unwind: {
path: "$items",
includeArrayIndex: "index"
}
}
])
7.3 展开嵌套数组 #
javascript
// 展开嵌套数组
db.orders.aggregate([
{ $unwind: "$items" },
{ $unwind: "$items.tags" }
])
八、$lookup阶段 #
8.1 基本关联 #
javascript
// 关联查询
db.orders.aggregate([
{
$lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
}
}
])
// 输出
{
_id: ObjectId("..."),
userId: ObjectId("..."),
amount: 100,
user: [
{ _id: ObjectId("..."), name: "John", email: "john@example.com" }
]
}
8.2 关联后处理 #
javascript
// 展开关联结果
db.orders.aggregate([
{
$lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
}
},
{ $unwind: "$user" },
{
$project: {
orderId: 1,
amount: 1,
userName: "$user.name"
}
}
])
8.3 复杂关联 #
javascript
// 使用pipeline进行复杂关联
db.orders.aggregate([
{
$lookup: {
from: "users",
let: { userId: "$userId", status: "$status" },
pipeline: [
{
$match: {
$expr: {
$and: [
{ $eq: ["$_id", "$$userId"] },
{ $eq: ["$$status", "completed"] }
]
}
}
},
{ $project: { name: 1, email: 1 } }
],
as: "user"
}
}
])
九、$addFields阶段 #
9.1 添加字段 #
javascript
// 添加字段
db.orders.aggregate([
{
$addFields: {
totalAmount: { $multiply: ["$quantity", "$price"] },
year: { $year: "$createdAt" }
}
}
])
9.2 修改字段 #
javascript
// 修改字段
db.orders.aggregate([
{
$addFields: {
status: { $toUpper: "$status" }
}
}
])
十、其他阶段 #
10.1 $replaceRoot #
javascript
// 替换根文档
db.orders.aggregate([
{
$replaceRoot: { newRoot: "$user" }
}
])
10.2 $count #
javascript
// 统计数量
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $count: "total" }
])
10.3 $facet #
javascript
// 多管道处理
db.orders.aggregate([
{
$facet: {
byStatus: [
{ $group: { _id: "$status", count: { $sum: 1 } } }
],
byYear: [
{ $group: { _id: { $year: "$createdAt" }, count: { $sum: 1 } } }
],
topOrders: [
{ $sort: { amount: -1 } },
{ $limit: 10 }
]
}
}
])
10.4 $bucket #
javascript
// 分桶
db.users.aggregate([
{
$bucket: {
groupBy: "$age",
boundaries: [0, 18, 30, 50, 100],
default: "other",
output: {
count: { $sum: 1 },
names: { $push: "$name" }
}
}
}
])
10.5 $out #
javascript
// 输出到集合
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $out: "completed_orders" }
])
十一、聚合表达式 #
11.1 算术表达式 #
javascript
// 算术运算
{ $add: ["$price", "$tax"] }
{ $subtract: ["$price", "$discount"] }
{ $multiply: ["$quantity", "$price"] }
{ $divide: ["$total", "$count"] }
{ $mod: ["$value", 10] }
11.2 字符串表达式 #
javascript
// 字符串操作
{ $concat: ["$firstName", " ", "$lastName"] }
{ $toUpper: "$name" }
{ $toLower: "$name" }
{ $substr: ["$name", 0, 5] }
{ $split: ["$tags", ","] }
{ $trim: { input: "$name" } }
{ $strLenCP: "$name" }
11.3 日期表达式 #
javascript
// 日期操作
{ $year: "$createdAt" }
{ $month: "$createdAt" }
{ $dayOfMonth: "$createdAt" }
{ $hour: "$createdAt" }
{ $minute: "$createdAt" }
{ $second: "$createdAt" }
{ $dayOfWeek: "$createdAt" }
{ $dayOfYear: "$createdAt" }
{ $week: "$createdAt" }
11.4 条件表达式 #
javascript
// 条件表达式
{
$cond: {
if: { $gte: ["$age", 18] },
then: "adult",
else: "minor"
}
}
// switch表达式
{
$switch: {
branches: [
{ case: { $eq: ["$status", "pending"] }, then: "P" },
{ case: { $eq: ["$status", "completed"] }, then: "C" }
],
default: "U"
}
}
// ifNull
{ $ifNull: ["$nickname", "$name"] }
11.5 数组表达式 #
javascript
// 数组操作
{ $size: "$tags" }
{ $arrayElemAt: ["$tags", 0] }
{ $first: "$tags" }
{ $last: "$tags" }
{ $reverseArray: "$tags" }
{ $sortArray: { input: "$scores", sortBy: 1 } }
{ $slice: ["$tags", 0, 5] }
{ $concatArrays: ["$tags1", "$tags2"] }
{ $setUnion: ["$tags1", "$tags2"] }
{ $setIntersection: ["$tags1", "$tags2"] }
{ $setDifference: ["$tags1", "$tags2"] }
十二、聚合最佳实践 #
12.1 管道顺序优化 #
javascript
// 推荐:先过滤再处理
db.orders.aggregate([
{ $match: { status: "completed" } }, // 尽早过滤
{ $group: { _id: "$userId", total: { $sum: "$amount" } } }
])
// 不推荐:先处理再过滤
db.orders.aggregate([
{ $group: { _id: "$userId", total: { $sum: "$amount" } } },
{ $match: { total: { $gt: 100 } } }
])
12.2 使用索引 #
javascript
// $match使用索引
db.orders.createIndex({ status: 1 })
db.orders.aggregate([
{ $match: { status: "completed" } }, // 使用索引
{ $group: { _id: "$userId", count: { $sum: 1 } } }
])
12.3 内存管理 #
javascript
// 使用allowDiskUse处理大数据
db.orders.aggregate([
{ $sort: { createdAt: -1 } }
], { allowDiskUse: true })
// 使用$limit限制内存
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $limit: 10000 }, // 限制处理文档数
{ $group: { _id: "$userId", count: { $sum: 1 } } }
])
十三、总结 #
聚合阶段速查表:
| 阶段 | 说明 |
|---|---|
| $match | 过滤文档 |
| $group | 分组聚合 |
| $project | 投影字段 |
| $sort | 排序 |
| $limit | 限制数量 |
| $skip | 跳过文档 |
| $unwind | 展开数组 |
| $lookup | 关联查询 |
| $addFields | 添加字段 |
| $replaceRoot | 替换根文档 |
| $count | 统计数量 |
| $facet | 多管道处理 |
| $bucket | 分桶 |
| $out | 输出到集合 |
下一步,让我们学习索引管理!
最后更新:2026-03-27