Spanner迁移 #
一、迁移概述 #
1.1 迁移流程 #
text
Spanner迁移流程
├── 评估阶段
│ ├── 数据量评估
│ ├── Schema评估
│ ├── 应用评估
│ └── 成本评估
│
├── 设计阶段
│ ├── Schema设计
│ ├── 数据映射
│ ├── 应用改造
│ └── 迁移方案
│
├── 迁移阶段
│ ├── Schema迁移
│ ├── 数据迁移
│ ├── 验证测试
│ └── 切换上线
│
└── 优化阶段
├── 性能优化
├── 监控配置
└── 持续改进
1.2 迁移工具 #
text
迁移工具:
├── Spanner迁移工具
│ ├── Spanner Migration Tool (SMT)
│ ├── Dataflow
│ └── Datastream
│
├── 自定义迁移
│ ├── 客户端库
│ ├── 批量导入
│ └── 增量同步
│
└── 第三方工具
├── Striim
├── Qlik
└── 其他ETL工具
二、MySQL迁移 #
2.1 Schema映射 #
text
MySQL到Spanner类型映射:
├── INT, INTEGER → INT64
├── BIGINT → INT64
├── FLOAT, DOUBLE → FLOAT64
├── VARCHAR, TEXT → STRING
├── BLOB → BYTES
├── DATE → DATE
├── DATETIME, TIMESTAMP → TIMESTAMP
├── JSON → JSON
└── ENUM → STRING
2.2 Schema转换 #
sql
-- MySQL Schema
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(255),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- Spanner Schema
CREATE TABLE users (
id INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE user_seq)),
name STRING(100) NOT NULL,
email STRING(255),
created_at TIMESTAMP DEFAULT (CURRENT_TIMESTAMP())
) PRIMARY KEY (id);
-- 创建序列
CREATE SEQUENCE user_seq;
2.3 使用Spanner Migration Tool #
bash
# 安装Spanner Migration Tool
go install github.com/cloudspannerecosystem/spanner-migration-tool@latest
# 评估MySQL数据库
spanner-migration-tool assess \
--source=mysql \
--source-profile="host=localhost,user=root,passwd=password,database=mydb" \
--target-profile="instance=my-instance"
# 生成Schema
spanner-migration-tool schema \
--source=mysql \
--source-profile="host=localhost,user=root,passwd=password,database=mydb" \
--target-profile="instance=my-instance" \
--output=schema.sql
# 迁移数据
spanner-migration-tool data \
--source=mysql \
--source-profile="host=localhost,user=root,passwd=password,database=mydb" \
--target-profile="instance=my-instance,database=my-database"
2.4 使用Dataflow迁移 #
java
// Dataflow MySQL到Spanner
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
Pipeline pipeline = Pipeline.create();
pipeline.apply(
JdbcIO.<Row>read()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver",
"jdbc:mysql://localhost:3306/mydb"
).withUsername("root").withPassword("password")
)
.withQuery("SELECT * FROM users")
.withRowMapper(new JdbcIO.RowMapper<Row>() { ... })
)
.apply(
SpannerIO.write()
.withProjectId("my-project")
.withInstanceId("my-instance")
.withDatabaseId("my-database")
);
pipeline.run();
三、PostgreSQL迁移 #
3.1 Schema映射 #
text
PostgreSQL到Spanner类型映射:
├── smallint, integer, bigint → INT64
├── real, double precision → FLOAT64
├── varchar, text → STRING
├── bytea → BYTES
├── date → DATE
├── timestamp, timestamptz → TIMESTAMP
├── jsonb → JSON
├── boolean → BOOL
└── serial → SEQUENCE
3.2 Schema转换 #
sql
-- PostgreSQL Schema
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
total DECIMAL(10,2),
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT NOW()
);
-- Spanner Schema (使用PostgreSQL方言)
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
total NUMERIC(10,2),
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 或使用Google标准SQL
CREATE TABLE orders (
id INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE order_seq)),
user_id INT64 NOT NULL,
total FLOAT64,
status STRING(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT (CURRENT_TIMESTAMP())
) PRIMARY KEY (id);
3.3 使用Spanner Migration Tool #
bash
# 评估PostgreSQL数据库
spanner-migration-tool assess \
--source=postgresql \
--source-profile="host=localhost,port=5432,user=postgres,passwd=password,database=mydb" \
--target-profile="instance=my-instance"
# 迁移数据
spanner-migration-tool data \
--source=postgresql \
--source-profile="host=localhost,port=5432,user=postgres,passwd=password,database=mydb" \
--target-profile="instance=my-instance,database=my-database"
四、数据迁移策略 #
4.1 全量迁移 #
python
# Python全量迁移
from google.cloud import spanner
import mysql.connector
def migrate_all_data():
# 连接MySQL
mysql_conn = mysql.connector.connect(
host='localhost',
user='root',
password='password',
database='mydb'
)
# 连接Spanner
spanner_client = spanner.Client(project='my-project')
instance = spanner_client.instance('my-instance')
database = instance.database('my-database')
# 读取MySQL数据
cursor = mysql_conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users")
# 批量写入Spanner
batch_size = 1000
batch = []
for row in cursor:
batch.append((row['id'], row['name'], row['email']))
if len(batch) >= batch_size:
with database.batch() as db_batch:
db_batch.insert(
table='users',
columns=('user_id', 'name', 'email'),
values=batch
)
batch = []
# 写入剩余数据
if batch:
with database.batch() as db_batch:
db_batch.insert(
table='users',
columns=('user_id', 'name', 'email'),
values=batch
)
cursor.close()
mysql_conn.close()
4.2 增量同步 #
python
# 使用时间戳增量同步
def incremental_sync(last_sync_time):
mysql_conn = mysql.connector.connect(...)
cursor = mysql_conn.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM users WHERE updated_at > %s",
(last_sync_time,)
)
# 同步到Spanner
for row in cursor:
# 使用InsertOrUpdate
pass
4.3 使用Datastream #
bash
# 创建Datastream连接
gcloud datastream connection-profiles create mysql-source \
--type=mysql \
--mysql-host=localhost \
--mysql-port=3306 \
--mysql-user=root \
--mysql-password=password
# 创建Spanner目标
gcloud datastream connection-profiles create spanner-target \
--type=spanner \
--spanner-instance=my-instance
# 创建流
gcloud datastream streams create mysql-to-spanner \
--source=mysql-source \
--destination=spanner-target
五、应用迁移 #
5.1 SQL兼容性 #
text
SQL差异注意:
├── 自增ID: 使用SEQUENCE替代
├── AUTO_INCREMENT: 使用DEFAULT + SEQUENCE
├── LIMIT/OFFSET: 支持但建议游标分页
├── JOIN: 支持但注意交错表优化
├── 存储过程: 不支持,需应用层实现
├── 触发器: 不支持,使用变更流替代
└── 外键: 支持,但注意性能影响
5.2 代码改造 #
java
// MySQL代码
String sql = "INSERT INTO users (name, email) VALUES (?, ?)";
PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
stmt.setString(1, "John");
stmt.setString(2, "john@example.com");
stmt.executeUpdate();
ResultSet rs = stmt.getGeneratedKeys();
// Spanner代码
Mutation mutation = Mutation.newInsertBuilder("users")
.set("user_id").to(getNextId()) // 使用序列
.set("name").to("John")
.set("email").to("john@example.com")
.build();
client.write(Collections.singletonList(mutation));
5.3 事务处理 #
java
// MySQL事务
Connection conn = dataSource.getConnection();
conn.setAutoCommit(false);
try {
// 操作
conn.commit();
} catch (Exception e) {
conn.rollback();
}
// Spanner事务
client.readWriteTransaction().run(transaction -> {
// 操作
return null;
});
六、迁移验证 #
6.1 数据验证 #
python
# 数据验证
def validate_data():
# 比较行数
mysql_count = get_mysql_count()
spanner_count = get_spanner_count()
assert mysql_count == spanner_count, "行数不一致"
# 抽样验证
sample_ids = get_sample_ids()
for id in sample_ids:
mysql_row = get_mysql_row(id)
spanner_row = get_spanner_row(id)
assert mysql_row == spanner_row, f"数据不一致: {id}"
6.2 性能验证 #
python
# 性能验证
def validate_performance():
# 测试查询延迟
start = time.time()
execute_query()
latency = time.time() - start
assert latency < 0.1, f"延迟过高: {latency}s"
七、迁移最佳实践 #
7.1 迁移计划 #
text
迁移计划建议:
├── 制定详细迁移计划
├── 准备回滚方案
├── 安排维护窗口
├── 通知相关人员
└── 准备应急预案
7.2 测试验证 #
text
测试验证建议:
├── 在测试环境验证
├── 进行压力测试
├── 验证数据一致性
├── 测试应用功能
└── 验证性能指标
7.3 切换上线 #
text
切换上线建议:
├── 选择低峰期
├── 停止源库写入
├── 完成最后同步
├── 验证数据一致性
├── 切换应用连接
└── 监控运行状态
八、总结 #
迁移要点:
| 阶段 | 关键点 |
|---|---|
| 评估 | 数据量、Schema、成本 |
| 设计 | Schema映射、应用改造 |
| 迁移 | 数据迁移、验证测试 |
| 切换 | 停写、同步、切换 |
最佳实践:
text
1. 充分评估
└── 数据量、Schema、成本
2. 使用迁移工具
└── SMT、Dataflow、Datastream
3. 测试验证
└── 数据一致性、性能
4. 制定回滚方案
└── 准备应急预案
5. 监控迁移过程
└── 及时发现问题
下一步,让我们学习最佳实践!
最后更新:2026-03-27