Spanner迁移 #

一、迁移概述 #

1.1 迁移流程 #

text
Spanner迁移流程
├── 评估阶段
│   ├── 数据量评估
│   ├── Schema评估
│   ├── 应用评估
│   └── 成本评估
│
├── 设计阶段
│   ├── Schema设计
│   ├── 数据映射
│   ├── 应用改造
│   └── 迁移方案
│
├── 迁移阶段
│   ├── Schema迁移
│   ├── 数据迁移
│   ├── 验证测试
│   └── 切换上线
│
└── 优化阶段
    ├── 性能优化
    ├── 监控配置
    └── 持续改进

1.2 迁移工具 #

text
迁移工具:
├── Spanner迁移工具
│   ├── Spanner Migration Tool (SMT)
│   ├── Dataflow
│   └── Datastream
│
├── 自定义迁移
│   ├── 客户端库
│   ├── 批量导入
│   └── 增量同步
│
└── 第三方工具
    ├── Striim
    ├── Qlik
    └── 其他ETL工具

二、MySQL迁移 #

2.1 Schema映射 #

text
MySQL到Spanner类型映射:
├── INT, INTEGER → INT64
├── BIGINT → INT64
├── FLOAT, DOUBLE → FLOAT64
├── VARCHAR, TEXT → STRING
├── BLOB → BYTES
├── DATE → DATE
├── DATETIME, TIMESTAMP → TIMESTAMP
├── JSON → JSON
└── ENUM → STRING

2.2 Schema转换 #

sql
-- MySQL Schema
CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(255),
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- Spanner Schema
CREATE TABLE users (
    id INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE user_seq)),
    name STRING(100) NOT NULL,
    email STRING(255),
    created_at TIMESTAMP DEFAULT (CURRENT_TIMESTAMP())
) PRIMARY KEY (id);

-- 创建序列
CREATE SEQUENCE user_seq;

2.3 使用Spanner Migration Tool #

bash
# 安装Spanner Migration Tool
go install github.com/cloudspannerecosystem/spanner-migration-tool@latest

# 评估MySQL数据库
spanner-migration-tool assess \
    --source=mysql \
    --source-profile="host=localhost,user=root,passwd=password,database=mydb" \
    --target-profile="instance=my-instance"

# 生成Schema
spanner-migration-tool schema \
    --source=mysql \
    --source-profile="host=localhost,user=root,passwd=password,database=mydb" \
    --target-profile="instance=my-instance" \
    --output=schema.sql

# 迁移数据
spanner-migration-tool data \
    --source=mysql \
    --source-profile="host=localhost,user=root,passwd=password,database=mydb" \
    --target-profile="instance=my-instance,database=my-database"

2.4 使用Dataflow迁移 #

java
// Dataflow MySQL到Spanner
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;

Pipeline pipeline = Pipeline.create();

pipeline.apply(
    JdbcIO.<Row>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/mydb"
            ).withUsername("root").withPassword("password")
        )
        .withQuery("SELECT * FROM users")
        .withRowMapper(new JdbcIO.RowMapper<Row>() { ... })
)
.apply(
    SpannerIO.write()
        .withProjectId("my-project")
        .withInstanceId("my-instance")
        .withDatabaseId("my-database")
);

pipeline.run();

三、PostgreSQL迁移 #

3.1 Schema映射 #

text
PostgreSQL到Spanner类型映射:
├── smallint, integer, bigint → INT64
├── real, double precision → FLOAT64
├── varchar, text → STRING
├── bytea → BYTES
├── date → DATE
├── timestamp, timestamptz → TIMESTAMP
├── jsonb → JSON
├── boolean → BOOL
└── serial → SEQUENCE

3.2 Schema转换 #

sql
-- PostgreSQL Schema
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL,
    total DECIMAL(10,2),
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT NOW()
);

-- Spanner Schema (使用PostgreSQL方言)
CREATE TABLE orders (
    id BIGSERIAL PRIMARY KEY,
    user_id BIGINT NOT NULL,
    total NUMERIC(10,2),
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 或使用Google标准SQL
CREATE TABLE orders (
    id INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE order_seq)),
    user_id INT64 NOT NULL,
    total FLOAT64,
    status STRING(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT (CURRENT_TIMESTAMP())
) PRIMARY KEY (id);

3.3 使用Spanner Migration Tool #

bash
# 评估PostgreSQL数据库
spanner-migration-tool assess \
    --source=postgresql \
    --source-profile="host=localhost,port=5432,user=postgres,passwd=password,database=mydb" \
    --target-profile="instance=my-instance"

# 迁移数据
spanner-migration-tool data \
    --source=postgresql \
    --source-profile="host=localhost,port=5432,user=postgres,passwd=password,database=mydb" \
    --target-profile="instance=my-instance,database=my-database"

四、数据迁移策略 #

4.1 全量迁移 #

python
# Python全量迁移
from google.cloud import spanner
import mysql.connector

def migrate_all_data():
    # 连接MySQL
    mysql_conn = mysql.connector.connect(
        host='localhost',
        user='root',
        password='password',
        database='mydb'
    )
    
    # 连接Spanner
    spanner_client = spanner.Client(project='my-project')
    instance = spanner_client.instance('my-instance')
    database = instance.database('my-database')
    
    # 读取MySQL数据
    cursor = mysql_conn.cursor(dictionary=True)
    cursor.execute("SELECT * FROM users")
    
    # 批量写入Spanner
    batch_size = 1000
    batch = []
    
    for row in cursor:
        batch.append((row['id'], row['name'], row['email']))
        
        if len(batch) >= batch_size:
            with database.batch() as db_batch:
                db_batch.insert(
                    table='users',
                    columns=('user_id', 'name', 'email'),
                    values=batch
                )
            batch = []
    
    # 写入剩余数据
    if batch:
        with database.batch() as db_batch:
            db_batch.insert(
                table='users',
                columns=('user_id', 'name', 'email'),
                values=batch
            )
    
    cursor.close()
    mysql_conn.close()

4.2 增量同步 #

python
# 使用时间戳增量同步
def incremental_sync(last_sync_time):
    mysql_conn = mysql.connector.connect(...)
    
    cursor = mysql_conn.cursor(dictionary=True)
    cursor.execute(
        "SELECT * FROM users WHERE updated_at > %s",
        (last_sync_time,)
    )
    
    # 同步到Spanner
    for row in cursor:
        # 使用InsertOrUpdate
        pass

4.3 使用Datastream #

bash
# 创建Datastream连接
gcloud datastream connection-profiles create mysql-source \
    --type=mysql \
    --mysql-host=localhost \
    --mysql-port=3306 \
    --mysql-user=root \
    --mysql-password=password

# 创建Spanner目标
gcloud datastream connection-profiles create spanner-target \
    --type=spanner \
    --spanner-instance=my-instance

# 创建流
gcloud datastream streams create mysql-to-spanner \
    --source=mysql-source \
    --destination=spanner-target

五、应用迁移 #

5.1 SQL兼容性 #

text
SQL差异注意:
├── 自增ID: 使用SEQUENCE替代
├── AUTO_INCREMENT: 使用DEFAULT + SEQUENCE
├── LIMIT/OFFSET: 支持但建议游标分页
├── JOIN: 支持但注意交错表优化
├── 存储过程: 不支持,需应用层实现
├── 触发器: 不支持,使用变更流替代
└── 外键: 支持,但注意性能影响

5.2 代码改造 #

java
// MySQL代码
String sql = "INSERT INTO users (name, email) VALUES (?, ?)";
PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
stmt.setString(1, "John");
stmt.setString(2, "john@example.com");
stmt.executeUpdate();
ResultSet rs = stmt.getGeneratedKeys();

// Spanner代码
Mutation mutation = Mutation.newInsertBuilder("users")
    .set("user_id").to(getNextId())  // 使用序列
    .set("name").to("John")
    .set("email").to("john@example.com")
    .build();
client.write(Collections.singletonList(mutation));

5.3 事务处理 #

java
// MySQL事务
Connection conn = dataSource.getConnection();
conn.setAutoCommit(false);
try {
    // 操作
    conn.commit();
} catch (Exception e) {
    conn.rollback();
}

// Spanner事务
client.readWriteTransaction().run(transaction -> {
    // 操作
    return null;
});

六、迁移验证 #

6.1 数据验证 #

python
# 数据验证
def validate_data():
    # 比较行数
    mysql_count = get_mysql_count()
    spanner_count = get_spanner_count()
    
    assert mysql_count == spanner_count, "行数不一致"
    
    # 抽样验证
    sample_ids = get_sample_ids()
    for id in sample_ids:
        mysql_row = get_mysql_row(id)
        spanner_row = get_spanner_row(id)
        assert mysql_row == spanner_row, f"数据不一致: {id}"

6.2 性能验证 #

python
# 性能验证
def validate_performance():
    # 测试查询延迟
    start = time.time()
    execute_query()
    latency = time.time() - start
    
    assert latency < 0.1, f"延迟过高: {latency}s"

七、迁移最佳实践 #

7.1 迁移计划 #

text
迁移计划建议:
├── 制定详细迁移计划
├── 准备回滚方案
├── 安排维护窗口
├── 通知相关人员
└── 准备应急预案

7.2 测试验证 #

text
测试验证建议:
├── 在测试环境验证
├── 进行压力测试
├── 验证数据一致性
├── 测试应用功能
└── 验证性能指标

7.3 切换上线 #

text
切换上线建议:
├── 选择低峰期
├── 停止源库写入
├── 完成最后同步
├── 验证数据一致性
├── 切换应用连接
└── 监控运行状态

八、总结 #

迁移要点:

阶段 关键点
评估 数据量、Schema、成本
设计 Schema映射、应用改造
迁移 数据迁移、验证测试
切换 停写、同步、切换

最佳实践:

text
1. 充分评估
   └── 数据量、Schema、成本

2. 使用迁移工具
   └── SMT、Dataflow、Datastream

3. 测试验证
   └── 数据一致性、性能

4. 制定回滚方案
   └── 准备应急预案

5. 监控迁移过程
   └── 及时发现问题

下一步,让我们学习最佳实践!

最后更新:2026-03-27