MongoDB事务 #

一、事务概述 #

1.1 什么是事务 #

事务是一组数据库操作的逻辑单元,要么全部成功,要么全部失败。

text
事务执行流程
┌─────────────────────────────────────┐
│  开始事务                           │
│    ↓                                │
│  操作1 → 操作2 → 操作3              │
│    ↓       ↓       ↓                │
│  成功?   成功?   成功?            │
│    ↓       ↓       ↓                │
│  全部成功 → 提交事务                │
│  任一失败 → 回滚事务                │
└─────────────────────────────────────┘

1.2 ACID特性 #

特性 说明
原子性(Atomicity) 事务中的操作要么全部成功,要么全部失败
一致性(Consistency) 事务执行前后数据保持一致
隔离性(Isolation) 并发事务之间互不影响
持久性(Durability) 事务提交后数据永久保存

1.3 MongoDB事务支持 #

版本 事务支持
MongoDB 4.0 支持复制集上的多文档事务
MongoDB 4.2+ 支持分片集群上的分布式事务

二、事务基本操作 #

2.1 启动事务 #

javascript
// 方式1:使用startSession
const session = db.getMongo().startSession()
session.startTransaction()

try {
    // 执行操作
    db.orders.insertOne({ ... }, { session })
    db.inventory.updateOne({ ... }, { ... }, { session })
    
    // 提交事务
    session.commitTransaction()
} catch (error) {
    // 回滚事务
    session.abortTransaction()
} finally {
    // 结束会话
    session.endSession()
}

2.2 使用withTransaction #

javascript
// 方式2:使用withTransaction(推荐)
const session = db.getMongo().startSession()

try {
    session.withTransaction(() => {
        // 执行操作
        db.orders.insertOne({ ... }, { session })
        db.inventory.updateOne({ ... }, { ... }, { session })
    })
} finally {
    session.endSession()
}

2.3 事务选项 #

javascript
// 设置事务选项
const session = db.getMongo().startSession()

session.startTransaction({
    readConcern: { level: "snapshot" },
    writeConcern: { w: "majority" },
    readPreference: "primary"
})

try {
    db.orders.insertOne({ ... }, { session })
    session.commitTransaction()
} catch (error) {
    session.abortTransaction()
} finally {
    session.endSession()
}

三、事务示例 #

3.1 转账示例 #

javascript
// 银行转账事务
const session = db.getMongo().startSession()

try {
    session.startTransaction()
    
    // 从账户A扣款
    const fromResult = db.accounts.updateOne(
        { _id: ObjectId("..."), balance: { $gte: 100 } },
        { $inc: { balance: -100 } },
        { session }
    )
    
    if (fromResult.matchedCount === 0) {
        throw new Error("余额不足")
    }
    
    // 向账户B存款
    const toResult = db.accounts.updateOne(
        { _id: ObjectId("...") },
        { $inc: { balance: 100 } },
        { session }
    )
    
    if (toResult.matchedCount === 0) {
        throw new Error("目标账户不存在")
    }
    
    // 记录转账日志
    db.transactions.insertOne({
        from: ObjectId("..."),
        to: ObjectId("..."),
        amount: 100,
        createdAt: new Date()
    }, { session })
    
    session.commitTransaction()
    print("转账成功")
} catch (error) {
    session.abortTransaction()
    print("转账失败: " + error.message)
} finally {
    session.endSession()
}

3.2 订单创建示例 #

javascript
// 创建订单事务
const session = db.getMongo().startSession()

try {
    session.withTransaction(() => {
        // 1. 检查库存
        const product = db.products.findOne(
            { _id: ObjectId("..."), stock: { $gte: 1 } },
            { session }
        )
        
        if (!product) {
            throw new Error("库存不足")
        }
        
        // 2. 扣减库存
        db.products.updateOne(
            { _id: ObjectId("...") },
            { $inc: { stock: -1 } },
            { session }
        )
        
        // 3. 创建订单
        const order = {
            userId: ObjectId("..."),
            productId: ObjectId("..."),
            quantity: 1,
            amount: product.price,
            status: "pending",
            createdAt: new Date()
        }
        
        db.orders.insertOne(order, { session })
        
        // 4. 更新用户订单数
        db.users.updateOne(
            { _id: ObjectId("...") },
            { $inc: { orderCount: 1 } },
            { session }
        )
    })
    
    print("订单创建成功")
} catch (error) {
    print("订单创建失败: " + error.message)
} finally {
    session.endSession()
}

3.3 多文档更新示例 #

javascript
// 更新用户信息及相关数据
const session = db.getMongo().startSession()

try {
    session.withTransaction(() => {
        // 更新用户基本信息
        db.users.updateOne(
            { _id: ObjectId("...") },
            { $set: { name: "John Doe", email: "john.doe@example.com" } },
            { session }
        )
        
        // 更新用户订单中的用户名
        db.orders.updateMany(
            { userId: ObjectId("...") },
            { $set: { userName: "John Doe" } },
            { session }
        )
        
        // 更新用户评论中的用户名
        db.comments.updateMany(
            { userId: ObjectId("...") },
            { $set: { userName: "John Doe" } },
            { session }
        )
    })
    
    print("用户信息更新成功")
} catch (error) {
    print("更新失败: " + error.message)
} finally {
    session.endSession()
}

四、事务隔离级别 #

4.1 读关注级别 #

级别 说明
local 读取本地最新数据(默认)
available 读取所有可用数据
majority 读取大多数节点确认的数据
linearizable 线性化读取
snapshot 快照读取(事务中默认)

4.2 写关注级别 #

级别 说明
w: 0 不确认写入
w: 1 确认主节点写入(默认)
w: “majority” 确认大多数节点写入
w: n 确认n个节点写入

4.3 设置隔离级别 #

javascript
// 设置读关注和写关注
const session = db.getMongo().startSession()

session.startTransaction({
    readConcern: { level: "snapshot" },
    writeConcern: { w: "majority", j: true }
})

try {
    // 执行操作
    session.commitTransaction()
} catch (error) {
    session.abortTransaction()
} finally {
    session.endSession()
}

五、事务限制 #

5.1 事务大小限制 #

限制 说明
单个事务操作数 最多1000个操作
事务持续时间 默认60秒
事务大小 不超过16MB

5.2 不支持的操作 #

javascript
// 以下操作不支持事务
// 1. 创建/删除集合
db.createCollection("newCollection", { session })  // 不支持

// 2. 创建/删除索引
db.users.createIndex({ name: 1 }, { session })  // 不支持

// 3. 创建/删除数据库
db.dropDatabase({ session })  // 不支持

// 4. 非集合操作
db.adminCommand({ ... }, { session })  // 不支持

5.3 事务超时 #

javascript
// 设置事务超时时间
const session = db.getMongo().startSession()

session.startTransaction({
    maxTimeMS: 30000  // 30秒
})

try {
    // 执行操作
    session.commitTransaction()
} catch (error) {
    session.abortTransaction()
} finally {
    session.endSession()
}

六、事务监控 #

6.1 查看当前事务 #

javascript
// 查看当前活动事务
db.currentOp({ "transaction": { $exists: true } })

// 查看长时间运行的事务
db.currentOp({
    "transaction": { $exists: true },
    "secs_running": { $gt: 5 }
})

6.2 终止事务 #

javascript
// 终止事务
db.killOp(opId)

6.3 事务统计 #

javascript
// 查看服务器事务统计
db.serverStatus().transactions

// 输出示例
{
    totalStarted: 100,
    totalCommitted: 95,
    totalAborted: 5,
    currentActive: 2,
    currentInactive: 0
}

七、事务最佳实践 #

7.1 事务设计原则 #

javascript
// 1. 保持事务简短
// 推荐
session.withTransaction(() => {
    db.accounts.updateOne({ ... }, { ... }, { session })
    db.accounts.updateOne({ ... }, { ... }, { session })
})

// 不推荐:事务中包含大量操作
session.withTransaction(() => {
    // 大量操作...
})

// 2. 避免事务中执行耗时操作
// 不推荐
session.withTransaction(() => {
    db.users.find().forEach(user => {
        // 耗时操作
    })
})

// 3. 合理设置重试
const session = db.getMongo().startSession()
let retries = 3

while (retries > 0) {
    try {
        session.withTransaction(() => {
            // 执行操作
        })
        break
    } catch (error) {
        retries--
        if (retries === 0) throw error
    }
}

7.2 错误处理 #

javascript
// 完善的错误处理
const session = db.getMongo().startSession()

try {
    session.withTransaction(() => {
        try {
            // 执行操作
            db.orders.insertOne({ ... }, { session })
        } catch (innerError) {
            // 处理业务错误
            if (innerError.code === 11000) {
                throw new Error("订单已存在")
            }
            throw innerError
        }
    })
} catch (error) {
    // 处理事务错误
    if (error.errorLabels && error.errorLabels.includes("TransientTransactionError")) {
        // 临时错误,可以重试
        print("临时错误,请重试")
    } else if (error.errorLabels && error.errorLabels.includes("UnknownTransactionCommitResult")) {
        // 提交结果未知
        print("提交结果未知,请检查数据")
    } else {
        print("事务失败: " + error.message)
    }
} finally {
    session.endSession()
}

7.3 性能优化 #

javascript
// 1. 使用合适的写关注
session.startTransaction({
    writeConcern: { w: "majority" }  // 生产环境推荐
})

// 2. 减少事务中的操作数
// 3. 使用索引加速事务中的查询
db.users.createIndex({ userId: 1 })

// 4. 避免跨分片事务(分片集群)

八、分布式事务 #

8.1 分片集群事务 #

javascript
// 分片集群事务(MongoDB 4.2+)
const session = db.getMongo().startSession()

try {
    session.withTransaction(() => {
        // 跨分片操作
        db.orders.insertOne({ ... }, { session })
        db.inventory.updateOne({ ... }, { ... }, { session })
    })
} finally {
    session.endSession()
}

8.2 分片事务注意事项 #

  1. 性能影响:跨分片事务性能较低
  2. 超时设置:需要更长的超时时间
  3. 重试机制:网络问题可能导致失败
  4. 数据建模:尽量避免跨分片事务

九、总结 #

事务操作速查表:

操作 命令
启动会话 db.getMongo().startSession()
开始事务 session.startTransaction()
提交事务 session.commitTransaction()
回滚事务 session.abortTransaction()
结束会话 session.endSession()
执行事务 session.withTransaction(() => {…})

事务选项:

选项 说明
readConcern 读关注级别
writeConcern 写关注级别
readPreference 读偏好
maxTimeMS 超时时间

下一步,让我们学习复制集!

最后更新:2026-03-27