Spanner变更流 #

一、变更流概述 #

1.1 什么是变更流 #

变更流(Change Streams)是Spanner的数据变更捕获(CDC)功能,可以实时捕获表中的数据变更。

text
变更流特点:
├── 实时捕获数据变更
├── 支持INSERT/UPDATE/DELETE
├── 有序的变更记录
├── 可配置保留时间
└── 与BigQuery等集成

1.2 应用场景 #

text
变更流应用场景:
├── 数据同步: 同步到其他系统
├── 实时分析: 流式数据处理
├── 审计日志: 记录数据变更
├── 缓存更新: 自动更新缓存
└── 事件驱动: 触发业务逻辑

二、创建变更流 #

2.1 基本语法 #

sql
CREATE CHANGE STREAM stream_name
FOR table_name
[OPTIONS (retention_period = 'duration')];

2.2 创建示例 #

sql
-- 示例表
CREATE TABLE users (
    user_id INT64 NOT NULL,
    name STRING(100) NOT NULL,
    email STRING(255),
    status STRING(20)
) PRIMARY KEY (user_id);

-- 创建变更流
CREATE CHANGE STREAM users_stream
FOR users;

-- 创建带保留时间的变更流
CREATE CHANGE STREAM orders_stream
FOR orders
OPTIONS (retention_period = '7d');

2.3 监控多表 #

sql
-- 监控单个表
CREATE CHANGE STREAM users_stream FOR users;

-- 监控多个表
CREATE CHANGE STREAM all_changes_stream
FOR users, orders, products;

-- 监控所有表
CREATE CHANGE STREAM all_tables_stream FOR ALL;

2.4 监控特定列 #

sql
-- 只监控特定列的变更
CREATE CHANGE STREAM users_email_stream
FOR users(email, status);

三、读取变更流 #

3.1 使用SQL读取 #

sql
-- 读取变更流
SELECT * FROM READ_users_stream (
    start_timestamp => TIMESTAMP '2024-03-27T00:00:00Z',
    end_timestamp => CURRENT_TIMESTAMP(),
    partition_token => 'token',
    heartbeat_milliseconds => 10000
);

3.2 变更记录结构 #

sql
-- 变更记录包含以下字段:
-- change_record:
-- ├── change_type: 'INSERT', 'UPDATE', 'DELETE'
-- ├── commit_timestamp: 变更时间戳
-- ├── table_name: 表名
-- ├── column_name: 列名
-- └── value: 变更后的值

SELECT 
    change_type,
    commit_timestamp,
    table_name,
    column_name,
    value
FROM READ_users_stream (...);

3.3 使用客户端库读取 #

java
// Java读取变更流
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ChangeStreamResultSet;

DatabaseClient client = spanner.getDatabaseClient(databaseId);

try (ChangeStreamResultSet resultSet = client.readChangeStream(
        "users_stream",
        Timestamp.ofTimeMicroseconds(0),
        Timestamp.now(),
        Duration.ofSeconds(10))) {
    
    for (ChangeStreamRecord record : resultSet) {
        System.out.println("Change type: " + record.getChangeType());
        System.out.println("Commit timestamp: " + record.getCommitTimestamp());
        // 处理变更记录
    }
}
python
# Python读取变更流
from google.cloud import spanner

def read_change_stream(database):
    with database.snapshot() as snapshot:
        results = snapshot.read_change_stream(
            "users_stream",
            start_timestamp=datetime.utcnow() - timedelta(hours=1),
            end_timestamp=datetime.utcnow()
        )
        
        for record in results:
            print(f"Change type: {record.change_type}")
            print(f"Commit timestamp: {record.commit_timestamp}")

四、变更流集成 #

4.1 BigQuery集成 #

bash
# 创建BigQuery数据集
bq mk my_dataset

# 创建变更流到BigQuery的连接
gcloud spanner change-streams create-connection \
    --instance=my-instance \
    --database=my-database \
    --change-stream=users_stream \
    --destination=bigquery \
    --bigquery-dataset=my_dataset \
    --bigquery-table=user_changes

4.2 Dataflow集成 #

java
// Dataflow读取变更流
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;

Pipeline pipeline = Pipeline.create();

pipeline.apply(
    SpannerIO.readChangeStream()
        .withProjectId("my-project")
        .withInstanceId("my-instance")
        .withDatabaseId("my-database")
        .withChangeStreamName("users_stream")
        .withInclusiveStartTimestamp(Timestamp.ofTimeMicroseconds(0))
)
.apply(...)  // 处理变更记录
;

4.3 Pub/Sub集成 #

bash
# 创建Pub/Sub主题
gcloud pubsub topics create spanner-changes

# 配置变更流发布到Pub/Sub
gcloud spanner change-streams create-subscription \
    --instance=my-instance \
    --database=my-database \
    --change-stream=users_stream \
    --destination=pubsub \
    --pubsub-topic=spanner-changes

五、变更流管理 #

5.1 查看变更流 #

sql
-- 查看所有变更流
SELECT * FROM INFORMATION_SCHEMA.CHANGE_STREAMS;

-- 查看变更流详情
SELECT 
    change_stream_name,
    retention_period,
    creation_time
FROM INFORMATION_SCHEMA.CHANGE_STREAMS
WHERE change_stream_name = 'users_stream';

5.2 修改变更流 #

sql
-- 修改保留时间
ALTER CHANGE STREAM users_stream
SET OPTIONS (retention_period = '14d');

-- 添加监控的表
ALTER CHANGE STREAM users_stream
ADD TABLE orders;

-- 移除监控的表
ALTER CHANGE STREAM users_stream
DROP TABLE orders;

5.3 删除变更流 #

sql
-- 删除变更流
DROP CHANGE STREAM users_stream;

-- 如果存在则删除
DROP CHANGE STREAM IF EXISTS users_stream;

六、变更流性能 #

6.1 性能影响 #

text
变更流性能影响:
├── 写入延迟: 轻微增加
├── 存储成本: 变更记录存储
├── 读取开销: 取决于变更频率
└── 需要合理规划保留时间

6.2 性能优化 #

text
变更流性能优化:
├── 合理设置保留时间
├── 只监控需要的表和列
├── 批量处理变更记录
├── 使用分区并行读取
└── 监控变更流延迟

6.3 监控指标 #

text
变更流监控指标:
├── change_stream_latency: 变更流延迟
├── change_stream_throughput: 吞吐量
├── change_stream_bytes: 数据量
└── change_stream_partitions: 分区数

七、变更流最佳实践 #

7.1 设计建议 #

text
变更流设计建议:
├── 只监控需要的表和列
├── 合理设置保留时间
├── 考虑变更频率
├── 规划下游处理能力
└── 设计错误处理机制

7.2 使用建议 #

text
变更流使用建议:
├── 使用分区并行读取
├── 批量处理变更记录
├── 实现幂等处理
├── 处理心跳记录
└── 监控处理延迟

7.3 错误处理 #

text
变更流错误处理:
├── 实现重试机制
├── 记录处理失败
├── 设置死信队列
├── 监控处理状态
└── 保证至少一次处理

八、总结 #

变更流优势:

优势 说明
实时捕获 实时获取数据变更
有序可靠 按时间戳有序
灵活配置 可配置监控范围
集成简单 与GCP服务集成

最佳实践:

text
1. 合理规划监控范围
   └── 只监控需要的表和列

2. 设置合适的保留时间
   └── 平衡成本和需求

3. 实现可靠的处理逻辑
   └── 幂等处理和错误处理

4. 监控变更流状态
   └── 及时发现问题

5. 与下游系统集成
   └── 使用GCP集成服务

下一步,让我们学习序列!

最后更新:2026-03-27