Spanner变更流 #
一、变更流概述 #
1.1 什么是变更流 #
变更流(Change Streams)是Spanner的数据变更捕获(CDC)功能,可以实时捕获表中的数据变更。
text
变更流特点:
├── 实时捕获数据变更
├── 支持INSERT/UPDATE/DELETE
├── 有序的变更记录
├── 可配置保留时间
└── 与BigQuery等集成
1.2 应用场景 #
text
变更流应用场景:
├── 数据同步: 同步到其他系统
├── 实时分析: 流式数据处理
├── 审计日志: 记录数据变更
├── 缓存更新: 自动更新缓存
└── 事件驱动: 触发业务逻辑
二、创建变更流 #
2.1 基本语法 #
sql
CREATE CHANGE STREAM stream_name
FOR table_name
[OPTIONS (retention_period = 'duration')];
2.2 创建示例 #
sql
-- 示例表
CREATE TABLE users (
user_id INT64 NOT NULL,
name STRING(100) NOT NULL,
email STRING(255),
status STRING(20)
) PRIMARY KEY (user_id);
-- 创建变更流
CREATE CHANGE STREAM users_stream
FOR users;
-- 创建带保留时间的变更流
CREATE CHANGE STREAM orders_stream
FOR orders
OPTIONS (retention_period = '7d');
2.3 监控多表 #
sql
-- 监控单个表
CREATE CHANGE STREAM users_stream FOR users;
-- 监控多个表
CREATE CHANGE STREAM all_changes_stream
FOR users, orders, products;
-- 监控所有表
CREATE CHANGE STREAM all_tables_stream FOR ALL;
2.4 监控特定列 #
sql
-- 只监控特定列的变更
CREATE CHANGE STREAM users_email_stream
FOR users(email, status);
三、读取变更流 #
3.1 使用SQL读取 #
sql
-- 读取变更流
SELECT * FROM READ_users_stream (
start_timestamp => TIMESTAMP '2024-03-27T00:00:00Z',
end_timestamp => CURRENT_TIMESTAMP(),
partition_token => 'token',
heartbeat_milliseconds => 10000
);
3.2 变更记录结构 #
sql
-- 变更记录包含以下字段:
-- change_record:
-- ├── change_type: 'INSERT', 'UPDATE', 'DELETE'
-- ├── commit_timestamp: 变更时间戳
-- ├── table_name: 表名
-- ├── column_name: 列名
-- └── value: 变更后的值
SELECT
change_type,
commit_timestamp,
table_name,
column_name,
value
FROM READ_users_stream (...);
3.3 使用客户端库读取 #
java
// Java读取变更流
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ChangeStreamResultSet;
DatabaseClient client = spanner.getDatabaseClient(databaseId);
try (ChangeStreamResultSet resultSet = client.readChangeStream(
"users_stream",
Timestamp.ofTimeMicroseconds(0),
Timestamp.now(),
Duration.ofSeconds(10))) {
for (ChangeStreamRecord record : resultSet) {
System.out.println("Change type: " + record.getChangeType());
System.out.println("Commit timestamp: " + record.getCommitTimestamp());
// 处理变更记录
}
}
python
# Python读取变更流
from google.cloud import spanner
def read_change_stream(database):
with database.snapshot() as snapshot:
results = snapshot.read_change_stream(
"users_stream",
start_timestamp=datetime.utcnow() - timedelta(hours=1),
end_timestamp=datetime.utcnow()
)
for record in results:
print(f"Change type: {record.change_type}")
print(f"Commit timestamp: {record.commit_timestamp}")
四、变更流集成 #
4.1 BigQuery集成 #
bash
# 创建BigQuery数据集
bq mk my_dataset
# 创建变更流到BigQuery的连接
gcloud spanner change-streams create-connection \
--instance=my-instance \
--database=my-database \
--change-stream=users_stream \
--destination=bigquery \
--bigquery-dataset=my_dataset \
--bigquery-table=user_changes
4.2 Dataflow集成 #
java
// Dataflow读取变更流
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
Pipeline pipeline = Pipeline.create();
pipeline.apply(
SpannerIO.readChangeStream()
.withProjectId("my-project")
.withInstanceId("my-instance")
.withDatabaseId("my-database")
.withChangeStreamName("users_stream")
.withInclusiveStartTimestamp(Timestamp.ofTimeMicroseconds(0))
)
.apply(...) // 处理变更记录
;
4.3 Pub/Sub集成 #
bash
# 创建Pub/Sub主题
gcloud pubsub topics create spanner-changes
# 配置变更流发布到Pub/Sub
gcloud spanner change-streams create-subscription \
--instance=my-instance \
--database=my-database \
--change-stream=users_stream \
--destination=pubsub \
--pubsub-topic=spanner-changes
五、变更流管理 #
5.1 查看变更流 #
sql
-- 查看所有变更流
SELECT * FROM INFORMATION_SCHEMA.CHANGE_STREAMS;
-- 查看变更流详情
SELECT
change_stream_name,
retention_period,
creation_time
FROM INFORMATION_SCHEMA.CHANGE_STREAMS
WHERE change_stream_name = 'users_stream';
5.2 修改变更流 #
sql
-- 修改保留时间
ALTER CHANGE STREAM users_stream
SET OPTIONS (retention_period = '14d');
-- 添加监控的表
ALTER CHANGE STREAM users_stream
ADD TABLE orders;
-- 移除监控的表
ALTER CHANGE STREAM users_stream
DROP TABLE orders;
5.3 删除变更流 #
sql
-- 删除变更流
DROP CHANGE STREAM users_stream;
-- 如果存在则删除
DROP CHANGE STREAM IF EXISTS users_stream;
六、变更流性能 #
6.1 性能影响 #
text
变更流性能影响:
├── 写入延迟: 轻微增加
├── 存储成本: 变更记录存储
├── 读取开销: 取决于变更频率
└── 需要合理规划保留时间
6.2 性能优化 #
text
变更流性能优化:
├── 合理设置保留时间
├── 只监控需要的表和列
├── 批量处理变更记录
├── 使用分区并行读取
└── 监控变更流延迟
6.3 监控指标 #
text
变更流监控指标:
├── change_stream_latency: 变更流延迟
├── change_stream_throughput: 吞吐量
├── change_stream_bytes: 数据量
└── change_stream_partitions: 分区数
七、变更流最佳实践 #
7.1 设计建议 #
text
变更流设计建议:
├── 只监控需要的表和列
├── 合理设置保留时间
├── 考虑变更频率
├── 规划下游处理能力
└── 设计错误处理机制
7.2 使用建议 #
text
变更流使用建议:
├── 使用分区并行读取
├── 批量处理变更记录
├── 实现幂等处理
├── 处理心跳记录
└── 监控处理延迟
7.3 错误处理 #
text
变更流错误处理:
├── 实现重试机制
├── 记录处理失败
├── 设置死信队列
├── 监控处理状态
└── 保证至少一次处理
八、总结 #
变更流优势:
| 优势 | 说明 |
|---|---|
| 实时捕获 | 实时获取数据变更 |
| 有序可靠 | 按时间戳有序 |
| 灵活配置 | 可配置监控范围 |
| 集成简单 | 与GCP服务集成 |
最佳实践:
text
1. 合理规划监控范围
└── 只监控需要的表和列
2. 设置合适的保留时间
└── 平衡成本和需求
3. 实现可靠的处理逻辑
└── 幂等处理和错误处理
4. 监控变更流状态
└── 及时发现问题
5. 与下游系统集成
└── 使用GCP集成服务
下一步,让我们学习序列!
最后更新:2026-03-27