Spanner分布式事务 #

一、事务概述 #

1.1 ACID特性 #

text
Spanner事务ACID保证
├── Atomicity (原子性)
│   └── 事务要么全部成功,要么全部失败
│
├── Consistency (一致性)
│   └── 事务执行前后数据保持一致
│
├── Isolation (隔离性)
│   └── 并发事务互不干扰
│
└── Durability (持久性)
    └── 提交后数据永久保存

1.2 事务类型 #

text
Spanner事务类型
├── 读写事务 (Read-Write Transaction)
│   ├── 支持读写操作
│   ├── 两阶段提交
│   ├── 强一致性
│   └── 可串行化隔离
│
├── 只读事务 (Read-Only Transaction)
│   ├── 只支持读取
│   ├── 快照隔离
│   ├── 可从任意副本读取
│   └── 无锁读取
│
└── 快照读 (Snapshot Read)
    ├── 单次读取
    ├── 指定时间戳
    ├── 无锁
    └── 最佳性能

1.3 隔离级别 #

隔离级别 说明
可串行化 最高隔离级别,完全隔离
外部一致性 跨事务的全局顺序保证

二、读写事务 #

2.1 基本概念 #

text
读写事务特点:
├── 支持读写操作
├── 使用两阶段提交
├── 强一致性保证
├── 可串行化隔离
└── 自动重试冲突

2.2 Java读写事务 #

java
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Key;

DatabaseClient client = spanner.getDatabaseClient(databaseId);

// 读写事务
TransactionRunner runner = client.readWriteTransaction();
runner.run(transaction -> {
    // 读取数据
    Struct row = transaction.readRow("users", 
        Key.of(1L), Arrays.asList("balance"));
    
    long balance = row.getLong(0);
    
    // 更新数据
    transaction.buffer(Mutation.newUpdateBuilder("accounts")
        .set("user_id").to(1L)
        .set("balance").to(balance - 100)
        .build());
    
    transaction.buffer(Mutation.newUpdateBuilder("accounts")
        .set("user_id").to(2L)
        .set("balance").to(balance + 100)
        .build());
    
    return null;
});

2.3 Python读写事务 #

python
from google.cloud import spanner

def transfer_funds(database, from_user, to_user, amount):
    def transaction_func(transaction):
        # 读取余额
        from_row = list(transaction.read(
            "accounts", ["balance"],
            spanner.KeySet([spanner.Key([from_user])])
        ))[0]
        
        to_row = list(transaction.read(
            "accounts", ["balance"],
            spanner.KeySet([spanner.Key([to_user])])
        ))[0]
        
        from_balance = from_row[0]
        to_balance = to_row[0]
        
        # 检查余额
        if from_balance < amount:
            raise Exception("Insufficient balance")
        
        # 更新余额
        transaction.update(
            table="accounts",
            columns=("user_id", "balance"),
            values=[
                (from_user, from_balance - amount),
                (to_user, to_balance + amount)
            ]
        )
    
    database.run_in_transaction(transaction_func)

2.4 Go读写事务 #

go
import (
    "context"
    "cloud.google.com/go/spanner"
)

func transferFunds(ctx context.Context, client *spanner.Client, fromUser, toUser int64, amount float64) error {
    _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
        // 读取余额
        var fromBalance, toBalance float64
        
        fromRow, err := txn.ReadRow(ctx, "accounts", spanner.Key{fromUser}, []string{"balance"})
        if err != nil {
            return err
        }
        fromRow.Columns(&fromBalance)
        
        toRow, err := txn.ReadRow(ctx, "accounts", spanner.Key{toUser}, []string{"balance"})
        if err != nil {
            return err
        }
        toRow.Columns(&toBalance)
        
        // 检查余额
        if fromBalance < amount {
            return fmt.Errorf("insufficient balance")
        }
        
        // 更新余额
        mutations := []*spanner.Mutation{
            spanner.Update("accounts", []string{"user_id", "balance"}, 
                []interface{}{fromUser, fromBalance - amount}),
            spanner.Update("accounts", []string{"user_id", "balance"}, 
                []interface{}{toUser, toBalance + amount}),
        }
        
        return txn.BufferWrite(mutations)
    })
    return err
}

2.5 事务限制 #

text
读写事务限制:
├── 最大持续时间: 10分钟
├── 最大行数: 100,000行
├── 最大数据量: 100MB
├── 并发事务: 受节点数限制
└── 冲突时自动重试

三、只读事务 #

3.1 基本概念 #

text
只读事务特点:
├── 只支持读取操作
├── 快照隔离
├── 无锁读取
├── 可从任意副本读取
└── 性能优于读写事务

3.2 Java只读事务 #

java
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;

// 只读事务
try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) {
    // 多次读取看到一致的数据快照
    ResultSet rs1 = transaction.executeQuery(
        Statement.of("SELECT * FROM users WHERE user_id = 1"));
    
    ResultSet rs2 = transaction.executeQuery(
        Statement.of("SELECT * FROM orders WHERE user_id = 1"));
    
    // 处理结果...
}

3.3 Python只读事务 #

python
# 只读事务
with database.snapshot() as snapshot:
    # 多次读取看到一致的数据快照
    users = list(snapshot.execute_sql(
        "SELECT * FROM users WHERE user_id = 1"
    ))
    
    orders = list(snapshot.execute_sql(
        "SELECT * FROM orders WHERE user_id = 1"
    ))

3.4 Go只读事务 #

go
import (
    "context"
    "cloud.google.com/go/spanner"
)

func readOnlyTransaction(ctx context.Context, client *spanner.Client) error {
    ro := client.ReadOnlyTransaction()
    defer ro.Close()
    
    // 多次读取看到一致的数据快照
    iter1 := ro.Query(ctx, spanner.Statement{
        SQL: "SELECT * FROM users WHERE user_id = @id",
        Params: map[string]interface{}{"id": 1},
    })
    defer iter1.Stop()
    
    iter2 := ro.Query(ctx, spanner.Statement{
        SQL: "SELECT * FROM orders WHERE user_id = @id",
        Params: map[string]interface{}{"id": 1},
    })
    defer iter2.Stop()
    
    return nil
}

3.5 强读vs过期读 #

java
// 强读: 从Leader读取最新数据
try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) {
    // 默认是强读
}

// 过期读: 从任意副本读取,可能不是最新
try (ReadOnlyTransaction transaction = 
        client.readOnlyTransaction(TimestampBound.ofMaxStaleness(10, TimeUnit.SECONDS))) {
    // 允许最多10秒的过期
}

四、快照读 #

4.1 基本概念 #

text
快照读特点:
├── 单次读取操作
├── 指定时间戳或范围
├── 无锁
├── 最佳性能
└── 适合读取历史数据

4.2 时间戳读取 #

java
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.SpannerException;

// 读取特定时间点的数据
com.google.cloud.Timestamp timestamp = 
    com.google.cloud.Timestamp.ofTimeMicroseconds(1234567890);

try (ResultSet rs = client.singleUse(
        TimestampBound.ofReadTimestamp(timestamp))
    .executeQuery(Statement.of("SELECT * FROM users"))) {
    // 处理结果
}

4.3 SQL时间戳读取 #

sql
-- 读取特定时间点的数据
SELECT * FROM users
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-03-27T10:00:00Z';

-- 读取时间范围内的数据
SELECT * FROM users
FOR SYSTEM_TIME BETWEEN 
    TIMESTAMP '2024-03-27T09:00:00Z' AND 
    TIMESTAMP '2024-03-27T10:00:00Z';

4.4 Python快照读 #

python
from google.cloud.spanner import TimestampBound
from datetime import datetime, timedelta

# 读取10分钟前的数据
stale_time = datetime.utcnow() - timedelta(minutes=10)
snapshot = database.snapshot(
    read_timestamp_bound=TimestampBound.read_timestamp(stale_time)
)

with snapshot:
    results = snapshot.execute_sql("SELECT * FROM users")

五、事务隔离 #

5.1 可串行化隔离 #

text
可串行化隔离保证:
├── 事务完全隔离
├── 等同于串行执行
├── 无脏读、不可重复读、幻读
└── 最强隔离级别

5.2 外部一致性 #

text
外部一致性保证:
├── 事务T1在T2之前提交
├── T1的时间戳小于T2
├── 所有观察者看到相同顺序
└── 跨区域全局一致

5.3 并发控制 #

java
// Spanner使用乐观并发控制
// 冲突时自动重试

TransactionRunner runner = client.readWriteTransaction();
runner.run(transaction -> {
    // 读取数据
    Struct row = transaction.readRow("products", 
        Key.of(1L), Arrays.asList("stock"));
    
    long stock = row.getLong(0);
    
    // 如果其他事务同时修改,会自动重试
    transaction.buffer(Mutation.newUpdateBuilder("products")
        .set("product_id").to(1L)
        .set("stock").to(stock - 1)
        .build());
    
    return null;
});

六、事务最佳实践 #

6.1 事务设计原则 #

text
事务设计原则:
├── 保持事务简短
├── 减少事务大小
├── 避免热点主键
├── 合理设置超时
└── 处理冲突重试

6.2 性能优化 #

java
// 批量操作减少事务次数
List<Mutation> mutations = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
    mutations.add(Mutation.newInsertBuilder("logs")
        .set("log_id").to((long) i)
        .set("message").to("Log " + i)
        .build());
}
client.write(mutations);  // 单次写入

6.3 错误处理 #

java
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.ErrorCode;

try {
    runner.run(transaction -> {
        // 事务操作
        return null;
    });
} catch (SpannerException e) {
    switch (e.getErrorCode()) {
        case ABORTED:
            // 事务被中止,需要重试
            // Spanner客户端通常会自动重试
            break;
        case DEADLINE_EXCEEDED:
            // 超时
            break;
        default:
            // 其他错误
    }
}

6.4 重试策略 #

java
// 设置重试策略
TransactionOptions options = TransactionOptions.newBuilder()
    .setCommitRetrySettings(
        RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(100))
            .setMaxRetryDelay(Duration.ofSeconds(5))
            .setRetryDelayMultiplier(2.0)
            .setMaxAttempts(5)
            .build())
    .build();

TransactionRunner runner = client.readWriteTransaction(options);

七、分布式事务场景 #

7.1 银行转账 #

java
public void transfer(DatabaseClient client, long fromId, long toId, long amount) {
    client.readWriteTransaction().run(transaction -> {
        // 读取两个账户余额
        Struct fromAccount = transaction.readRow("accounts", 
            Key.of(fromId), Arrays.asList("balance"));
        Struct toAccount = transaction.readRow("accounts", 
            Key.of(toId), Arrays.asList("balance"));
        
        long fromBalance = fromAccount.getLong(0);
        long toBalance = toAccount.getLong(0);
        
        // 检查余额
        if (fromBalance < amount) {
            throw new RuntimeException("Insufficient balance");
        }
        
        // 更新余额
        transaction.buffer(Mutation.newUpdateBuilder("accounts")
            .set("account_id").to(fromId)
            .set("balance").to(fromBalance - amount)
            .build());
        
        transaction.buffer(Mutation.newUpdateBuilder("accounts")
            .set("account_id").to(toId)
            .set("balance").to(toBalance + amount)
            .build());
        
        // 记录转账日志
        transaction.buffer(Mutation.newInsertBuilder("transfer_logs")
            .set("log_id").to(UUID.randomUUID().toString())
            .set("from_account").to(fromId)
            .set("to_account").to(toId)
            .set("amount").to(amount)
            .set("created_at").to(Value.timestamp(Timestamp.now()))
            .build());
        
        return null;
    });
}

7.2 库存扣减 #

java
public boolean deductStock(DatabaseClient client, long productId, int quantity) {
    return client.readWriteTransaction().run(transaction -> {
        // 读取库存
        Struct product = transaction.readRow("products", 
            Key.of(productId), Arrays.asList("stock", "version"));
        
        long stock = product.getLong(0);
        long version = product.getLong(1);
        
        // 检查库存
        if (stock < quantity) {
            return false;
        }
        
        // 更新库存(乐观锁)
        long newVersion = version + 1;
        transaction.buffer(Mutation.newUpdateBuilder("products")
            .set("product_id").to(productId)
            .set("stock").to(stock - quantity)
            .set("version").to(newVersion)
            .build());
        
        return true;
    });
}

八、总结 #

事务类型对比:

类型 性能 使用场景
读写事务 一般 需要原子性写入
只读事务 一致性读取
快照读 最好 历史数据读取

最佳实践:

text
1. 选择合适的事务类型
   └── 根据需求选择

2. 保持事务简短
   └── 减少冲突概率

3. 处理冲突和重试
   └── 保证最终一致性

4. 使用只读事务
   └── 提高读取性能

5. 监控事务性能
   └── 及时发现问题

下一步,让我们学习数据复制!

最后更新:2026-03-27