事务处理 #

一、事务概述 #

1.1 什么是事务 #

事务是一组操作的逻辑单元,要么全部成功,要么全部失败。

text
事务特性(ACID):
├── 原子性(Atomicity)
│   └── 事务要么全部成功,要么全部回滚
├── 一致性(Consistency)
│   └── 事务前后数据保持一致
├── 隔离性(Isolation)
│   └── 并发事务相互隔离
└── 持久性(Durability)
    └── 提交后数据永久保存

1.2 Neptune事务模型 #

text
Neptune事务特性:
├── 自动提交事务
├── 快照隔离
├── 乐观并发控制
├── 支持读写事务
└── 支持只读事务

二、自动提交事务 #

2.1 Gremlin自动提交 #

gremlin
// 单个操作自动提交
g.addV('person').property('name', 'Tom')

// 多个操作在同一个遍历中
g.addV('person').property('name', 'Tom').
  addV('person').property('name', 'Jerry').
  addE('knows').from(V().has('name', 'Tom')).to(V().has('name', 'Jerry'))

2.2 SPARQL自动提交 #

sparql
PREFIX ex: <http://example.org/>

INSERT DATA {
  ex:Tom foaf:name "Tom" .
  ex:Jerry foaf:name "Jerry" .
  ex:Tom foaf:knows ex:Jerry .
}

三、隔离级别 #

3.1 快照隔离 #

text
快照隔离特性:
├── 读取事务开始时的数据快照
├── 写入时不阻塞读取
├── 写入冲突检测
└── 避免脏读、不可重复读

3.2 隔离级别说明 #

text
Neptune隔离级别:
├── 读已提交:避免脏读
├── 快照隔离:避免不可重复读
└── 无幻读问题(图数据库特性)

四、并发控制 #

4.1 乐观并发控制 #

text
乐观并发控制:
├── 假设冲突很少
├── 写入时检测冲突
├── 冲突时重试或失败
└── 适合读多写少场景

4.2 冲突处理 #

gremlin
// 使用条件更新避免冲突
g.V('1').has('version', 1).
  property('version', 2).
  property('data', 'newData')

// 使用coalesce处理
g.V('1').as('v').
  coalesce(
    has('version', 1).property('version', 2),
    identity()
  )

4.3 重试策略 #

python
# Python重试示例
import time
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal

def execute_with_retry(query, max_retries=3):
    for attempt in range(max_retries):
        try:
            return query.next()
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # 指数退避
            else:
                raise e

五、批量事务 #

5.1 批量操作 #

gremlin
// 批量创建
g.inject([
  ['name': 'Tom', 'age': 30],
  ['name': 'Jerry', 'age': 25],
  ['name': 'Mike', 'age': 35]
]).unfold().as('data').
  addV('person').
  property('name', select('data').select('name')).
  property('age', select('data').select('age'))

// 批量更新
g.V().hasLabel('person').has('status', 'pending').
  property('status', 'active')

5.2 分批处理 #

python
# Python分批处理
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal

connection = DriverRemoteConnection('wss://endpoint:8182/gremlin', 'g')
g = traversal().withRemote(connection)

batch_size = 100
for i in range(0, len(data), batch_size):
    batch = data[i:i+batch_size]
    for item in batch:
        g.addV('item').property('name', item['name']).iterate()

六、事务最佳实践 #

6.1 事务设计 #

text
事务设计建议:
├── 保持事务简短
├── 避免长事务
├── 合理使用批量操作
├── 处理并发冲突
└── 实现重试机制

6.2 错误处理 #

python
# Python错误处理
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal

try:
    connection = DriverRemoteConnection('wss://endpoint:8182/gremlin', 'g')
    g = traversal().withRemote(connection)
    
    # 执行事务操作
    g.addV('person').property('name', 'Tom').next()
    
except Exception as e:
    print(f"Transaction failed: {e}")
    # 处理错误
finally:
    connection.close()

七、实际应用示例 #

7.1 订单处理 #

gremlin
// 创建订单事务
g.addV('order').property('orderId', 'order_001').as('order').
  property('status', 'pending').
  property('createdAt', datetime()).
  addV('orderItem').property('productId', 'prod_001').as('item').
  property('quantity', 2).
  addE('contains').from('order').to('item').
  addE('references').from('item').to(V().has('productId', 'prod_001'))

// 更新库存(条件更新)
g.V().has('productId', 'prod_001').as('p').
  has('stock', gte(2)).
  property('stock', values('stock').math('_ - 2'))

7.2 用户注册 #

gremlin
// 用户注册事务
g.V().has('email', 'tom@example.com').fold().
  coalesce(
    unfold(),
    addV('user').
      property('userId', 'user_001').
      property('email', 'tom@example.com').
      property('name', 'Tom').
      property('createdAt', datetime()).
      property('status', 'active')
  )

八、总结 #

事务处理要点:

特性 说明
ACID 原子性、一致性、隔离性、持久性
隔离级别 快照隔离
并发控制 乐观并发控制
自动提交 默认行为

最佳实践:

  1. 保持事务简短
  2. 处理并发冲突
  3. 实现重试机制
  4. 合理使用批量操作
  5. 监控事务性能

下一步,让我们学习批量加载!

最后更新:2026-03-27