事务处理 #
一、事务概述 #
1.1 什么是事务 #
事务是一组操作的逻辑单元,要么全部成功,要么全部失败。
text
事务特性(ACID):
├── 原子性(Atomicity)
│ └── 事务要么全部成功,要么全部回滚
├── 一致性(Consistency)
│ └── 事务前后数据保持一致
├── 隔离性(Isolation)
│ └── 并发事务相互隔离
└── 持久性(Durability)
└── 提交后数据永久保存
1.2 Neptune事务模型 #
text
Neptune事务特性:
├── 自动提交事务
├── 快照隔离
├── 乐观并发控制
├── 支持读写事务
└── 支持只读事务
二、自动提交事务 #
2.1 Gremlin自动提交 #
gremlin
// 单个操作自动提交
g.addV('person').property('name', 'Tom')
// 多个操作在同一个遍历中
g.addV('person').property('name', 'Tom').
addV('person').property('name', 'Jerry').
addE('knows').from(V().has('name', 'Tom')).to(V().has('name', 'Jerry'))
2.2 SPARQL自动提交 #
sparql
PREFIX ex: <http://example.org/>
INSERT DATA {
ex:Tom foaf:name "Tom" .
ex:Jerry foaf:name "Jerry" .
ex:Tom foaf:knows ex:Jerry .
}
三、隔离级别 #
3.1 快照隔离 #
text
快照隔离特性:
├── 读取事务开始时的数据快照
├── 写入时不阻塞读取
├── 写入冲突检测
└── 避免脏读、不可重复读
3.2 隔离级别说明 #
text
Neptune隔离级别:
├── 读已提交:避免脏读
├── 快照隔离:避免不可重复读
└── 无幻读问题(图数据库特性)
四、并发控制 #
4.1 乐观并发控制 #
text
乐观并发控制:
├── 假设冲突很少
├── 写入时检测冲突
├── 冲突时重试或失败
└── 适合读多写少场景
4.2 冲突处理 #
gremlin
// 使用条件更新避免冲突
g.V('1').has('version', 1).
property('version', 2).
property('data', 'newData')
// 使用coalesce处理
g.V('1').as('v').
coalesce(
has('version', 1).property('version', 2),
identity()
)
4.3 重试策略 #
python
# Python重试示例
import time
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal
def execute_with_retry(query, max_retries=3):
for attempt in range(max_retries):
try:
return query.next()
except Exception as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数退避
else:
raise e
五、批量事务 #
5.1 批量操作 #
gremlin
// 批量创建
g.inject([
['name': 'Tom', 'age': 30],
['name': 'Jerry', 'age': 25],
['name': 'Mike', 'age': 35]
]).unfold().as('data').
addV('person').
property('name', select('data').select('name')).
property('age', select('data').select('age'))
// 批量更新
g.V().hasLabel('person').has('status', 'pending').
property('status', 'active')
5.2 分批处理 #
python
# Python分批处理
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal
connection = DriverRemoteConnection('wss://endpoint:8182/gremlin', 'g')
g = traversal().withRemote(connection)
batch_size = 100
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
for item in batch:
g.addV('item').property('name', item['name']).iterate()
六、事务最佳实践 #
6.1 事务设计 #
text
事务设计建议:
├── 保持事务简短
├── 避免长事务
├── 合理使用批量操作
├── 处理并发冲突
└── 实现重试机制
6.2 错误处理 #
python
# Python错误处理
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal
try:
connection = DriverRemoteConnection('wss://endpoint:8182/gremlin', 'g')
g = traversal().withRemote(connection)
# 执行事务操作
g.addV('person').property('name', 'Tom').next()
except Exception as e:
print(f"Transaction failed: {e}")
# 处理错误
finally:
connection.close()
七、实际应用示例 #
7.1 订单处理 #
gremlin
// 创建订单事务
g.addV('order').property('orderId', 'order_001').as('order').
property('status', 'pending').
property('createdAt', datetime()).
addV('orderItem').property('productId', 'prod_001').as('item').
property('quantity', 2).
addE('contains').from('order').to('item').
addE('references').from('item').to(V().has('productId', 'prod_001'))
// 更新库存(条件更新)
g.V().has('productId', 'prod_001').as('p').
has('stock', gte(2)).
property('stock', values('stock').math('_ - 2'))
7.2 用户注册 #
gremlin
// 用户注册事务
g.V().has('email', 'tom@example.com').fold().
coalesce(
unfold(),
addV('user').
property('userId', 'user_001').
property('email', 'tom@example.com').
property('name', 'Tom').
property('createdAt', datetime()).
property('status', 'active')
)
八、总结 #
事务处理要点:
| 特性 | 说明 |
|---|---|
| ACID | 原子性、一致性、隔离性、持久性 |
| 隔离级别 | 快照隔离 |
| 并发控制 | 乐观并发控制 |
| 自动提交 | 默认行为 |
最佳实践:
- 保持事务简短
- 处理并发冲突
- 实现重试机制
- 合理使用批量操作
- 监控事务性能
下一步,让我们学习批量加载!
最后更新:2026-03-27