MongoDB聚合管道 #

一、聚合概述 #

1.1 什么是聚合管道 #

聚合管道(Aggregation Pipeline)是MongoDB强大的数据处理框架,通过一系列阶段(Stage)处理文档。

text
聚合管道流程
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│  文档   │ -> │ Stage 1 │ -> │ Stage 2 │ -> │  结果   │
└─────────┘    └─────────┘    └─────────┘    └─────────┘

1.2 基本语法 #

javascript
db.collection.aggregate([
    { stage1 },
    { stage2 },
    { stage3 }
])

1.3 常用阶段 #

阶段 说明
$match 过滤文档
$group 分组聚合
$project 投影字段
$sort 排序
$limit 限制数量
$skip 跳过文档
$unwind 展开数组
$lookup 关联查询
$addFields 添加字段

二、$match阶段 #

2.1 基本过滤 #

javascript
// 过滤文档
db.orders.aggregate([
    { $match: { status: "completed" } }
])

// 多条件过滤
db.orders.aggregate([
    {
        $match: {
            status: "completed",
            amount: { $gt: 100 }
        }
    }
])

2.2 日期过滤 #

javascript
// 日期范围过滤
db.orders.aggregate([
    {
        $match: {
            createdAt: {
                $gte: ISODate("2024-01-01"),
                $lt: ISODate("2025-01-01")
            }
        }
    }
])

2.3 使用索引 #

javascript
// $match放在管道开头可以使用索引
db.orders.aggregate([
    { $match: { status: "completed" } },  // 可以使用索引
    { $group: { _id: "$userId", total: { $sum: "$amount" } } }
])

三、$group阶段 #

3.1 基本分组 #

javascript
// 按字段分组
db.orders.aggregate([
    {
        $group: {
            _id: "$status",
            count: { $sum: 1 }
        }
    }
])

// 输出
[
    { "_id": "completed", "count": 100 },
    { "_id": "pending", "count": 50 },
    { "_id": "cancelled", "count": 20 }
]

3.2 分组聚合 #

javascript
// 多种聚合操作
db.orders.aggregate([
    {
        $group: {
            _id: "$userId",
            totalOrders: { $sum: 1 },
            totalAmount: { $sum: "$amount" },
            avgAmount: { $avg: "$amount" },
            maxAmount: { $max: "$amount" },
            minAmount: { $min: "$amount" },
            firstOrder: { $first: "$createdAt" },
            lastOrder: { $last: "$createdAt" }
        }
    }
])

3.3 分组字段 #

javascript
// 按多个字段分组
db.orders.aggregate([
    {
        $group: {
            _id: {
                userId: "$userId",
                status: "$status"
            },
            count: { $sum: 1 }
        }
    }
])

// 按日期分组
db.orders.aggregate([
    {
        $group: {
            _id: {
                year: { $year: "$createdAt" },
                month: { $month: "$createdAt" },
                day: { $dayOfMonth: "$createdAt" }
            },
            count: { $sum: 1 }
        }
    }
])

3.4 聚合操作符 #

$sum - 求和

javascript
// 计数
{ $sum: 1 }

// 求和
{ $sum: "$amount" }

// 条件求和
{ $sum: { $cond: [{ $eq: ["$status", "completed"] }, "$amount", 0] } }

$avg - 平均值

javascript
// 平均值
{ $avg: "$amount" }

$max/$min - 最大/最小值

javascript
// 最大值
{ $max: "$amount" }

// 最小值
{ $min: "$amount" }

$first/$last - 第一个/最后一个

javascript
// 第一个
{ $first: "$createdAt" }

// 最后一个
{ $last: "$createdAt" }

$push - 添加到数组

javascript
// 添加到数组
{
    $group: {
        _id: "$userId",
        orders: { $push: "$orderId" }
    }
}

$addToSet - 添加到集合(去重)

javascript
// 添加到集合
{
    $group: {
        _id: "$userId",
        products: { $addToSet: "$productId" }
    }
}

四、$project阶段 #

4.1 基本投影 #

javascript
// 包含字段
db.users.aggregate([
    {
        $project: {
            name: 1,
            email: 1
        }
    }
])

// 排除字段
db.users.aggregate([
    {
        $project: {
            password: 0,
            _id: 0
        }
    }
])

4.2 重命名字段 #

javascript
// 重命名字段
db.users.aggregate([
    {
        $project: {
            userName: "$name",
            userEmail: "$email"
        }
    }
])

4.3 计算字段 #

javascript
// 计算字段
db.orders.aggregate([
    {
        $project: {
            orderId: 1,
            totalAmount: { $multiply: ["$quantity", "$price"] },
            discount: { $cond: [{ $gte: ["$quantity", 10] }, 0.1, 0] }
        }
    }
])

4.4 嵌套字段 #

javascript
// 嵌套字段投影
db.users.aggregate([
    {
        $project: {
            name: 1,
            "address.city": 1,
            "address.street": 1
        }
    }
])

五、$sort阶段 #

5.1 基本排序 #

javascript
// 升序
db.orders.aggregate([
    { $sort: { createdAt: 1 } }
])

// 降序
db.orders.aggregate([
    { $sort: { amount: -1 } }
])

// 多字段排序
db.orders.aggregate([
    { $sort: { status: 1, amount: -1 } }
])

5.2 内存限制 #

javascript
// 排序内存限制(默认100MB)
// 超过限制需要使用allowDiskUse
db.orders.aggregate([
    { $sort: { createdAt: -1 } }
], { allowDiskUse: true })

六、$limit和$skip阶段 #

6.1 分页 #

javascript
// 分页
db.orders.aggregate([
    { $sort: { createdAt: -1 } },
    { $skip: 10 },
    { $limit: 10 }
])

6.2 性能优化 #

javascript
// $match + $limit优化
db.orders.aggregate([
    { $match: { status: "completed" } },
    { $limit: 100 },  // 尽早限制
    { $sort: { amount: -1 } }
])

七、$unwind阶段 #

7.1 展开数组 #

javascript
// 展开数组
db.orders.aggregate([
    { $unwind: "$items" }
])

// 输出
// 原文档:{ _id: 1, items: [1, 2, 3] }
// 展开后:
// { _id: 1, items: 1 }
// { _id: 1, items: 2 }
// { _id: 1, items: 3 }

7.2 展开选项 #

javascript
// 保留空数组
db.orders.aggregate([
    {
        $unwind: {
            path: "$items",
            preserveNullAndEmptyArrays: true
        }
    }
])

// 包含索引
db.orders.aggregate([
    {
        $unwind: {
            path: "$items",
            includeArrayIndex: "index"
        }
    }
])

7.3 展开嵌套数组 #

javascript
// 展开嵌套数组
db.orders.aggregate([
    { $unwind: "$items" },
    { $unwind: "$items.tags" }
])

八、$lookup阶段 #

8.1 基本关联 #

javascript
// 关联查询
db.orders.aggregate([
    {
        $lookup: {
            from: "users",
            localField: "userId",
            foreignField: "_id",
            as: "user"
        }
    }
])

// 输出
{
    _id: ObjectId("..."),
    userId: ObjectId("..."),
    amount: 100,
    user: [
        { _id: ObjectId("..."), name: "John", email: "john@example.com" }
    ]
}

8.2 关联后处理 #

javascript
// 展开关联结果
db.orders.aggregate([
    {
        $lookup: {
            from: "users",
            localField: "userId",
            foreignField: "_id",
            as: "user"
        }
    },
    { $unwind: "$user" },
    {
        $project: {
            orderId: 1,
            amount: 1,
            userName: "$user.name"
        }
    }
])

8.3 复杂关联 #

javascript
// 使用pipeline进行复杂关联
db.orders.aggregate([
    {
        $lookup: {
            from: "users",
            let: { userId: "$userId", status: "$status" },
            pipeline: [
                {
                    $match: {
                        $expr: {
                            $and: [
                                { $eq: ["$_id", "$$userId"] },
                                { $eq: ["$$status", "completed"] }
                            ]
                        }
                    }
                },
                { $project: { name: 1, email: 1 } }
            ],
            as: "user"
        }
    }
])

九、$addFields阶段 #

9.1 添加字段 #

javascript
// 添加字段
db.orders.aggregate([
    {
        $addFields: {
            totalAmount: { $multiply: ["$quantity", "$price"] },
            year: { $year: "$createdAt" }
        }
    }
])

9.2 修改字段 #

javascript
// 修改字段
db.orders.aggregate([
    {
        $addFields: {
            status: { $toUpper: "$status" }
        }
    }
])

十、其他阶段 #

10.1 $replaceRoot #

javascript
// 替换根文档
db.orders.aggregate([
    {
        $replaceRoot: { newRoot: "$user" }
    }
])

10.2 $count #

javascript
// 统计数量
db.orders.aggregate([
    { $match: { status: "completed" } },
    { $count: "total" }
])

10.3 $facet #

javascript
// 多管道处理
db.orders.aggregate([
    {
        $facet: {
            byStatus: [
                { $group: { _id: "$status", count: { $sum: 1 } } }
            ],
            byYear: [
                { $group: { _id: { $year: "$createdAt" }, count: { $sum: 1 } } }
            ],
            topOrders: [
                { $sort: { amount: -1 } },
                { $limit: 10 }
            ]
        }
    }
])

10.4 $bucket #

javascript
// 分桶
db.users.aggregate([
    {
        $bucket: {
            groupBy: "$age",
            boundaries: [0, 18, 30, 50, 100],
            default: "other",
            output: {
                count: { $sum: 1 },
                names: { $push: "$name" }
            }
        }
    }
])

10.5 $out #

javascript
// 输出到集合
db.orders.aggregate([
    { $match: { status: "completed" } },
    { $out: "completed_orders" }
])

十一、聚合表达式 #

11.1 算术表达式 #

javascript
// 算术运算
{ $add: ["$price", "$tax"] }
{ $subtract: ["$price", "$discount"] }
{ $multiply: ["$quantity", "$price"] }
{ $divide: ["$total", "$count"] }
{ $mod: ["$value", 10] }

11.2 字符串表达式 #

javascript
// 字符串操作
{ $concat: ["$firstName", " ", "$lastName"] }
{ $toUpper: "$name" }
{ $toLower: "$name" }
{ $substr: ["$name", 0, 5] }
{ $split: ["$tags", ","] }
{ $trim: { input: "$name" } }
{ $strLenCP: "$name" }

11.3 日期表达式 #

javascript
// 日期操作
{ $year: "$createdAt" }
{ $month: "$createdAt" }
{ $dayOfMonth: "$createdAt" }
{ $hour: "$createdAt" }
{ $minute: "$createdAt" }
{ $second: "$createdAt" }
{ $dayOfWeek: "$createdAt" }
{ $dayOfYear: "$createdAt" }
{ $week: "$createdAt" }

11.4 条件表达式 #

javascript
// 条件表达式
{
    $cond: {
        if: { $gte: ["$age", 18] },
        then: "adult",
        else: "minor"
    }
}

// switch表达式
{
    $switch: {
        branches: [
            { case: { $eq: ["$status", "pending"] }, then: "P" },
            { case: { $eq: ["$status", "completed"] }, then: "C" }
        ],
        default: "U"
    }
}

// ifNull
{ $ifNull: ["$nickname", "$name"] }

11.5 数组表达式 #

javascript
// 数组操作
{ $size: "$tags" }
{ $arrayElemAt: ["$tags", 0] }
{ $first: "$tags" }
{ $last: "$tags" }
{ $reverseArray: "$tags" }
{ $sortArray: { input: "$scores", sortBy: 1 } }
{ $slice: ["$tags", 0, 5] }
{ $concatArrays: ["$tags1", "$tags2"] }
{ $setUnion: ["$tags1", "$tags2"] }
{ $setIntersection: ["$tags1", "$tags2"] }
{ $setDifference: ["$tags1", "$tags2"] }

十二、聚合最佳实践 #

12.1 管道顺序优化 #

javascript
// 推荐:先过滤再处理
db.orders.aggregate([
    { $match: { status: "completed" } },  // 尽早过滤
    { $group: { _id: "$userId", total: { $sum: "$amount" } } }
])

// 不推荐:先处理再过滤
db.orders.aggregate([
    { $group: { _id: "$userId", total: { $sum: "$amount" } } },
    { $match: { total: { $gt: 100 } } }
])

12.2 使用索引 #

javascript
// $match使用索引
db.orders.createIndex({ status: 1 })
db.orders.aggregate([
    { $match: { status: "completed" } },  // 使用索引
    { $group: { _id: "$userId", count: { $sum: 1 } } }
])

12.3 内存管理 #

javascript
// 使用allowDiskUse处理大数据
db.orders.aggregate([
    { $sort: { createdAt: -1 } }
], { allowDiskUse: true })

// 使用$limit限制内存
db.orders.aggregate([
    { $match: { status: "completed" } },
    { $limit: 10000 },  // 限制处理文档数
    { $group: { _id: "$userId", count: { $sum: 1 } } }
])

十三、总结 #

聚合阶段速查表:

阶段 说明
$match 过滤文档
$group 分组聚合
$project 投影字段
$sort 排序
$limit 限制数量
$skip 跳过文档
$unwind 展开数组
$lookup 关联查询
$addFields 添加字段
$replaceRoot 替换根文档
$count 统计数量
$facet 多管道处理
$bucket 分桶
$out 输出到集合

下一步,让我们学习索引管理!

最后更新:2026-03-27