Spanner分布式事务 #
一、事务概述 #
1.1 ACID特性 #
text
Spanner事务ACID保证
├── Atomicity (原子性)
│ └── 事务要么全部成功,要么全部失败
│
├── Consistency (一致性)
│ └── 事务执行前后数据保持一致
│
├── Isolation (隔离性)
│ └── 并发事务互不干扰
│
└── Durability (持久性)
└── 提交后数据永久保存
1.2 事务类型 #
text
Spanner事务类型
├── 读写事务 (Read-Write Transaction)
│ ├── 支持读写操作
│ ├── 两阶段提交
│ ├── 强一致性
│ └── 可串行化隔离
│
├── 只读事务 (Read-Only Transaction)
│ ├── 只支持读取
│ ├── 快照隔离
│ ├── 可从任意副本读取
│ └── 无锁读取
│
└── 快照读 (Snapshot Read)
├── 单次读取
├── 指定时间戳
├── 无锁
└── 最佳性能
1.3 隔离级别 #
| 隔离级别 | 说明 |
|---|---|
| 可串行化 | 最高隔离级别,完全隔离 |
| 外部一致性 | 跨事务的全局顺序保证 |
二、读写事务 #
2.1 基本概念 #
text
读写事务特点:
├── 支持读写操作
├── 使用两阶段提交
├── 强一致性保证
├── 可串行化隔离
└── 自动重试冲突
2.2 Java读写事务 #
java
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Key;
DatabaseClient client = spanner.getDatabaseClient(databaseId);
// 读写事务
TransactionRunner runner = client.readWriteTransaction();
runner.run(transaction -> {
// 读取数据
Struct row = transaction.readRow("users",
Key.of(1L), Arrays.asList("balance"));
long balance = row.getLong(0);
// 更新数据
transaction.buffer(Mutation.newUpdateBuilder("accounts")
.set("user_id").to(1L)
.set("balance").to(balance - 100)
.build());
transaction.buffer(Mutation.newUpdateBuilder("accounts")
.set("user_id").to(2L)
.set("balance").to(balance + 100)
.build());
return null;
});
2.3 Python读写事务 #
python
from google.cloud import spanner
def transfer_funds(database, from_user, to_user, amount):
def transaction_func(transaction):
# 读取余额
from_row = list(transaction.read(
"accounts", ["balance"],
spanner.KeySet([spanner.Key([from_user])])
))[0]
to_row = list(transaction.read(
"accounts", ["balance"],
spanner.KeySet([spanner.Key([to_user])])
))[0]
from_balance = from_row[0]
to_balance = to_row[0]
# 检查余额
if from_balance < amount:
raise Exception("Insufficient balance")
# 更新余额
transaction.update(
table="accounts",
columns=("user_id", "balance"),
values=[
(from_user, from_balance - amount),
(to_user, to_balance + amount)
]
)
database.run_in_transaction(transaction_func)
2.4 Go读写事务 #
go
import (
"context"
"cloud.google.com/go/spanner"
)
func transferFunds(ctx context.Context, client *spanner.Client, fromUser, toUser int64, amount float64) error {
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
// 读取余额
var fromBalance, toBalance float64
fromRow, err := txn.ReadRow(ctx, "accounts", spanner.Key{fromUser}, []string{"balance"})
if err != nil {
return err
}
fromRow.Columns(&fromBalance)
toRow, err := txn.ReadRow(ctx, "accounts", spanner.Key{toUser}, []string{"balance"})
if err != nil {
return err
}
toRow.Columns(&toBalance)
// 检查余额
if fromBalance < amount {
return fmt.Errorf("insufficient balance")
}
// 更新余额
mutations := []*spanner.Mutation{
spanner.Update("accounts", []string{"user_id", "balance"},
[]interface{}{fromUser, fromBalance - amount}),
spanner.Update("accounts", []string{"user_id", "balance"},
[]interface{}{toUser, toBalance + amount}),
}
return txn.BufferWrite(mutations)
})
return err
}
2.5 事务限制 #
text
读写事务限制:
├── 最大持续时间: 10分钟
├── 最大行数: 100,000行
├── 最大数据量: 100MB
├── 并发事务: 受节点数限制
└── 冲突时自动重试
三、只读事务 #
3.1 基本概念 #
text
只读事务特点:
├── 只支持读取操作
├── 快照隔离
├── 无锁读取
├── 可从任意副本读取
└── 性能优于读写事务
3.2 Java只读事务 #
java
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;
// 只读事务
try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) {
// 多次读取看到一致的数据快照
ResultSet rs1 = transaction.executeQuery(
Statement.of("SELECT * FROM users WHERE user_id = 1"));
ResultSet rs2 = transaction.executeQuery(
Statement.of("SELECT * FROM orders WHERE user_id = 1"));
// 处理结果...
}
3.3 Python只读事务 #
python
# 只读事务
with database.snapshot() as snapshot:
# 多次读取看到一致的数据快照
users = list(snapshot.execute_sql(
"SELECT * FROM users WHERE user_id = 1"
))
orders = list(snapshot.execute_sql(
"SELECT * FROM orders WHERE user_id = 1"
))
3.4 Go只读事务 #
go
import (
"context"
"cloud.google.com/go/spanner"
)
func readOnlyTransaction(ctx context.Context, client *spanner.Client) error {
ro := client.ReadOnlyTransaction()
defer ro.Close()
// 多次读取看到一致的数据快照
iter1 := ro.Query(ctx, spanner.Statement{
SQL: "SELECT * FROM users WHERE user_id = @id",
Params: map[string]interface{}{"id": 1},
})
defer iter1.Stop()
iter2 := ro.Query(ctx, spanner.Statement{
SQL: "SELECT * FROM orders WHERE user_id = @id",
Params: map[string]interface{}{"id": 1},
})
defer iter2.Stop()
return nil
}
3.5 强读vs过期读 #
java
// 强读: 从Leader读取最新数据
try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) {
// 默认是强读
}
// 过期读: 从任意副本读取,可能不是最新
try (ReadOnlyTransaction transaction =
client.readOnlyTransaction(TimestampBound.ofMaxStaleness(10, TimeUnit.SECONDS))) {
// 允许最多10秒的过期
}
四、快照读 #
4.1 基本概念 #
text
快照读特点:
├── 单次读取操作
├── 指定时间戳或范围
├── 无锁
├── 最佳性能
└── 适合读取历史数据
4.2 时间戳读取 #
java
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.SpannerException;
// 读取特定时间点的数据
com.google.cloud.Timestamp timestamp =
com.google.cloud.Timestamp.ofTimeMicroseconds(1234567890);
try (ResultSet rs = client.singleUse(
TimestampBound.ofReadTimestamp(timestamp))
.executeQuery(Statement.of("SELECT * FROM users"))) {
// 处理结果
}
4.3 SQL时间戳读取 #
sql
-- 读取特定时间点的数据
SELECT * FROM users
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-03-27T10:00:00Z';
-- 读取时间范围内的数据
SELECT * FROM users
FOR SYSTEM_TIME BETWEEN
TIMESTAMP '2024-03-27T09:00:00Z' AND
TIMESTAMP '2024-03-27T10:00:00Z';
4.4 Python快照读 #
python
from google.cloud.spanner import TimestampBound
from datetime import datetime, timedelta
# 读取10分钟前的数据
stale_time = datetime.utcnow() - timedelta(minutes=10)
snapshot = database.snapshot(
read_timestamp_bound=TimestampBound.read_timestamp(stale_time)
)
with snapshot:
results = snapshot.execute_sql("SELECT * FROM users")
五、事务隔离 #
5.1 可串行化隔离 #
text
可串行化隔离保证:
├── 事务完全隔离
├── 等同于串行执行
├── 无脏读、不可重复读、幻读
└── 最强隔离级别
5.2 外部一致性 #
text
外部一致性保证:
├── 事务T1在T2之前提交
├── T1的时间戳小于T2
├── 所有观察者看到相同顺序
└── 跨区域全局一致
5.3 并发控制 #
java
// Spanner使用乐观并发控制
// 冲突时自动重试
TransactionRunner runner = client.readWriteTransaction();
runner.run(transaction -> {
// 读取数据
Struct row = transaction.readRow("products",
Key.of(1L), Arrays.asList("stock"));
long stock = row.getLong(0);
// 如果其他事务同时修改,会自动重试
transaction.buffer(Mutation.newUpdateBuilder("products")
.set("product_id").to(1L)
.set("stock").to(stock - 1)
.build());
return null;
});
六、事务最佳实践 #
6.1 事务设计原则 #
text
事务设计原则:
├── 保持事务简短
├── 减少事务大小
├── 避免热点主键
├── 合理设置超时
└── 处理冲突重试
6.2 性能优化 #
java
// 批量操作减少事务次数
List<Mutation> mutations = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
mutations.add(Mutation.newInsertBuilder("logs")
.set("log_id").to((long) i)
.set("message").to("Log " + i)
.build());
}
client.write(mutations); // 单次写入
6.3 错误处理 #
java
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.ErrorCode;
try {
runner.run(transaction -> {
// 事务操作
return null;
});
} catch (SpannerException e) {
switch (e.getErrorCode()) {
case ABORTED:
// 事务被中止,需要重试
// Spanner客户端通常会自动重试
break;
case DEADLINE_EXCEEDED:
// 超时
break;
default:
// 其他错误
}
}
6.4 重试策略 #
java
// 设置重试策略
TransactionOptions options = TransactionOptions.newBuilder()
.setCommitRetrySettings(
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(100))
.setMaxRetryDelay(Duration.ofSeconds(5))
.setRetryDelayMultiplier(2.0)
.setMaxAttempts(5)
.build())
.build();
TransactionRunner runner = client.readWriteTransaction(options);
七、分布式事务场景 #
7.1 银行转账 #
java
public void transfer(DatabaseClient client, long fromId, long toId, long amount) {
client.readWriteTransaction().run(transaction -> {
// 读取两个账户余额
Struct fromAccount = transaction.readRow("accounts",
Key.of(fromId), Arrays.asList("balance"));
Struct toAccount = transaction.readRow("accounts",
Key.of(toId), Arrays.asList("balance"));
long fromBalance = fromAccount.getLong(0);
long toBalance = toAccount.getLong(0);
// 检查余额
if (fromBalance < amount) {
throw new RuntimeException("Insufficient balance");
}
// 更新余额
transaction.buffer(Mutation.newUpdateBuilder("accounts")
.set("account_id").to(fromId)
.set("balance").to(fromBalance - amount)
.build());
transaction.buffer(Mutation.newUpdateBuilder("accounts")
.set("account_id").to(toId)
.set("balance").to(toBalance + amount)
.build());
// 记录转账日志
transaction.buffer(Mutation.newInsertBuilder("transfer_logs")
.set("log_id").to(UUID.randomUUID().toString())
.set("from_account").to(fromId)
.set("to_account").to(toId)
.set("amount").to(amount)
.set("created_at").to(Value.timestamp(Timestamp.now()))
.build());
return null;
});
}
7.2 库存扣减 #
java
public boolean deductStock(DatabaseClient client, long productId, int quantity) {
return client.readWriteTransaction().run(transaction -> {
// 读取库存
Struct product = transaction.readRow("products",
Key.of(productId), Arrays.asList("stock", "version"));
long stock = product.getLong(0);
long version = product.getLong(1);
// 检查库存
if (stock < quantity) {
return false;
}
// 更新库存(乐观锁)
long newVersion = version + 1;
transaction.buffer(Mutation.newUpdateBuilder("products")
.set("product_id").to(productId)
.set("stock").to(stock - quantity)
.set("version").to(newVersion)
.build());
return true;
});
}
八、总结 #
事务类型对比:
| 类型 | 读 | 写 | 性能 | 使用场景 |
|---|---|---|---|---|
| 读写事务 | ✓ | ✓ | 一般 | 需要原子性写入 |
| 只读事务 | ✓ | ✗ | 好 | 一致性读取 |
| 快照读 | ✓ | ✗ | 最好 | 历史数据读取 |
最佳实践:
text
1. 选择合适的事务类型
└── 根据需求选择
2. 保持事务简短
└── 减少冲突概率
3. 处理冲突和重试
└── 保证最终一致性
4. 使用只读事务
└── 提高读取性能
5. 监控事务性能
└── 及时发现问题
下一步,让我们学习数据复制!
最后更新:2026-03-27