MongoDB事务 #
一、事务概述 #
1.1 什么是事务 #
事务是一组数据库操作的逻辑单元,要么全部成功,要么全部失败。
text
事务执行流程
┌─────────────────────────────────────┐
│ 开始事务 │
│ ↓ │
│ 操作1 → 操作2 → 操作3 │
│ ↓ ↓ ↓ │
│ 成功? 成功? 成功? │
│ ↓ ↓ ↓ │
│ 全部成功 → 提交事务 │
│ 任一失败 → 回滚事务 │
└─────────────────────────────────────┘
1.2 ACID特性 #
| 特性 | 说明 |
|---|---|
| 原子性(Atomicity) | 事务中的操作要么全部成功,要么全部失败 |
| 一致性(Consistency) | 事务执行前后数据保持一致 |
| 隔离性(Isolation) | 并发事务之间互不影响 |
| 持久性(Durability) | 事务提交后数据永久保存 |
1.3 MongoDB事务支持 #
| 版本 | 事务支持 |
|---|---|
| MongoDB 4.0 | 支持复制集上的多文档事务 |
| MongoDB 4.2+ | 支持分片集群上的分布式事务 |
二、事务基本操作 #
2.1 启动事务 #
javascript
// 方式1:使用startSession
const session = db.getMongo().startSession()
session.startTransaction()
try {
// 执行操作
db.orders.insertOne({ ... }, { session })
db.inventory.updateOne({ ... }, { ... }, { session })
// 提交事务
session.commitTransaction()
} catch (error) {
// 回滚事务
session.abortTransaction()
} finally {
// 结束会话
session.endSession()
}
2.2 使用withTransaction #
javascript
// 方式2:使用withTransaction(推荐)
const session = db.getMongo().startSession()
try {
session.withTransaction(() => {
// 执行操作
db.orders.insertOne({ ... }, { session })
db.inventory.updateOne({ ... }, { ... }, { session })
})
} finally {
session.endSession()
}
2.3 事务选项 #
javascript
// 设置事务选项
const session = db.getMongo().startSession()
session.startTransaction({
readConcern: { level: "snapshot" },
writeConcern: { w: "majority" },
readPreference: "primary"
})
try {
db.orders.insertOne({ ... }, { session })
session.commitTransaction()
} catch (error) {
session.abortTransaction()
} finally {
session.endSession()
}
三、事务示例 #
3.1 转账示例 #
javascript
// 银行转账事务
const session = db.getMongo().startSession()
try {
session.startTransaction()
// 从账户A扣款
const fromResult = db.accounts.updateOne(
{ _id: ObjectId("..."), balance: { $gte: 100 } },
{ $inc: { balance: -100 } },
{ session }
)
if (fromResult.matchedCount === 0) {
throw new Error("余额不足")
}
// 向账户B存款
const toResult = db.accounts.updateOne(
{ _id: ObjectId("...") },
{ $inc: { balance: 100 } },
{ session }
)
if (toResult.matchedCount === 0) {
throw new Error("目标账户不存在")
}
// 记录转账日志
db.transactions.insertOne({
from: ObjectId("..."),
to: ObjectId("..."),
amount: 100,
createdAt: new Date()
}, { session })
session.commitTransaction()
print("转账成功")
} catch (error) {
session.abortTransaction()
print("转账失败: " + error.message)
} finally {
session.endSession()
}
3.2 订单创建示例 #
javascript
// 创建订单事务
const session = db.getMongo().startSession()
try {
session.withTransaction(() => {
// 1. 检查库存
const product = db.products.findOne(
{ _id: ObjectId("..."), stock: { $gte: 1 } },
{ session }
)
if (!product) {
throw new Error("库存不足")
}
// 2. 扣减库存
db.products.updateOne(
{ _id: ObjectId("...") },
{ $inc: { stock: -1 } },
{ session }
)
// 3. 创建订单
const order = {
userId: ObjectId("..."),
productId: ObjectId("..."),
quantity: 1,
amount: product.price,
status: "pending",
createdAt: new Date()
}
db.orders.insertOne(order, { session })
// 4. 更新用户订单数
db.users.updateOne(
{ _id: ObjectId("...") },
{ $inc: { orderCount: 1 } },
{ session }
)
})
print("订单创建成功")
} catch (error) {
print("订单创建失败: " + error.message)
} finally {
session.endSession()
}
3.3 多文档更新示例 #
javascript
// 更新用户信息及相关数据
const session = db.getMongo().startSession()
try {
session.withTransaction(() => {
// 更新用户基本信息
db.users.updateOne(
{ _id: ObjectId("...") },
{ $set: { name: "John Doe", email: "john.doe@example.com" } },
{ session }
)
// 更新用户订单中的用户名
db.orders.updateMany(
{ userId: ObjectId("...") },
{ $set: { userName: "John Doe" } },
{ session }
)
// 更新用户评论中的用户名
db.comments.updateMany(
{ userId: ObjectId("...") },
{ $set: { userName: "John Doe" } },
{ session }
)
})
print("用户信息更新成功")
} catch (error) {
print("更新失败: " + error.message)
} finally {
session.endSession()
}
四、事务隔离级别 #
4.1 读关注级别 #
| 级别 | 说明 |
|---|---|
| local | 读取本地最新数据(默认) |
| available | 读取所有可用数据 |
| majority | 读取大多数节点确认的数据 |
| linearizable | 线性化读取 |
| snapshot | 快照读取(事务中默认) |
4.2 写关注级别 #
| 级别 | 说明 |
|---|---|
| w: 0 | 不确认写入 |
| w: 1 | 确认主节点写入(默认) |
| w: “majority” | 确认大多数节点写入 |
| w: n | 确认n个节点写入 |
4.3 设置隔离级别 #
javascript
// 设置读关注和写关注
const session = db.getMongo().startSession()
session.startTransaction({
readConcern: { level: "snapshot" },
writeConcern: { w: "majority", j: true }
})
try {
// 执行操作
session.commitTransaction()
} catch (error) {
session.abortTransaction()
} finally {
session.endSession()
}
五、事务限制 #
5.1 事务大小限制 #
| 限制 | 说明 |
|---|---|
| 单个事务操作数 | 最多1000个操作 |
| 事务持续时间 | 默认60秒 |
| 事务大小 | 不超过16MB |
5.2 不支持的操作 #
javascript
// 以下操作不支持事务
// 1. 创建/删除集合
db.createCollection("newCollection", { session }) // 不支持
// 2. 创建/删除索引
db.users.createIndex({ name: 1 }, { session }) // 不支持
// 3. 创建/删除数据库
db.dropDatabase({ session }) // 不支持
// 4. 非集合操作
db.adminCommand({ ... }, { session }) // 不支持
5.3 事务超时 #
javascript
// 设置事务超时时间
const session = db.getMongo().startSession()
session.startTransaction({
maxTimeMS: 30000 // 30秒
})
try {
// 执行操作
session.commitTransaction()
} catch (error) {
session.abortTransaction()
} finally {
session.endSession()
}
六、事务监控 #
6.1 查看当前事务 #
javascript
// 查看当前活动事务
db.currentOp({ "transaction": { $exists: true } })
// 查看长时间运行的事务
db.currentOp({
"transaction": { $exists: true },
"secs_running": { $gt: 5 }
})
6.2 终止事务 #
javascript
// 终止事务
db.killOp(opId)
6.3 事务统计 #
javascript
// 查看服务器事务统计
db.serverStatus().transactions
// 输出示例
{
totalStarted: 100,
totalCommitted: 95,
totalAborted: 5,
currentActive: 2,
currentInactive: 0
}
七、事务最佳实践 #
7.1 事务设计原则 #
javascript
// 1. 保持事务简短
// 推荐
session.withTransaction(() => {
db.accounts.updateOne({ ... }, { ... }, { session })
db.accounts.updateOne({ ... }, { ... }, { session })
})
// 不推荐:事务中包含大量操作
session.withTransaction(() => {
// 大量操作...
})
// 2. 避免事务中执行耗时操作
// 不推荐
session.withTransaction(() => {
db.users.find().forEach(user => {
// 耗时操作
})
})
// 3. 合理设置重试
const session = db.getMongo().startSession()
let retries = 3
while (retries > 0) {
try {
session.withTransaction(() => {
// 执行操作
})
break
} catch (error) {
retries--
if (retries === 0) throw error
}
}
7.2 错误处理 #
javascript
// 完善的错误处理
const session = db.getMongo().startSession()
try {
session.withTransaction(() => {
try {
// 执行操作
db.orders.insertOne({ ... }, { session })
} catch (innerError) {
// 处理业务错误
if (innerError.code === 11000) {
throw new Error("订单已存在")
}
throw innerError
}
})
} catch (error) {
// 处理事务错误
if (error.errorLabels && error.errorLabels.includes("TransientTransactionError")) {
// 临时错误,可以重试
print("临时错误,请重试")
} else if (error.errorLabels && error.errorLabels.includes("UnknownTransactionCommitResult")) {
// 提交结果未知
print("提交结果未知,请检查数据")
} else {
print("事务失败: " + error.message)
}
} finally {
session.endSession()
}
7.3 性能优化 #
javascript
// 1. 使用合适的写关注
session.startTransaction({
writeConcern: { w: "majority" } // 生产环境推荐
})
// 2. 减少事务中的操作数
// 3. 使用索引加速事务中的查询
db.users.createIndex({ userId: 1 })
// 4. 避免跨分片事务(分片集群)
八、分布式事务 #
8.1 分片集群事务 #
javascript
// 分片集群事务(MongoDB 4.2+)
const session = db.getMongo().startSession()
try {
session.withTransaction(() => {
// 跨分片操作
db.orders.insertOne({ ... }, { session })
db.inventory.updateOne({ ... }, { ... }, { session })
})
} finally {
session.endSession()
}
8.2 分片事务注意事项 #
- 性能影响:跨分片事务性能较低
- 超时设置:需要更长的超时时间
- 重试机制:网络问题可能导致失败
- 数据建模:尽量避免跨分片事务
九、总结 #
事务操作速查表:
| 操作 | 命令 |
|---|---|
| 启动会话 | db.getMongo().startSession() |
| 开始事务 | session.startTransaction() |
| 提交事务 | session.commitTransaction() |
| 回滚事务 | session.abortTransaction() |
| 结束会话 | session.endSession() |
| 执行事务 | session.withTransaction(() => {…}) |
事务选项:
| 选项 | 说明 |
|---|---|
| readConcern | 读关注级别 |
| writeConcern | 写关注级别 |
| readPreference | 读偏好 |
| maxTimeMS | 超时时间 |
下一步,让我们学习复制集!
最后更新:2026-03-27