Spanner数据插入 #
一、插入方式概述 #
1.1 两种插入方式 #
text
Spanner数据插入方式
├── DML语句 (INSERT)
│ ├── 标准SQL语法
│ ├── 支持事务
│ ├── 实时执行
│ └── 适合少量数据
│
└── Mutation API
├── 编程接口
├── 批量操作
├── 延迟提交
└── 适合大量数据
1.2 方式对比 #
| 特性 | DML | Mutation |
|---|---|---|
| 语法 | SQL | 编程API |
| 批量插入 | 较慢 | 快速 |
| 事务支持 | 完整 | 受限 |
| 条件插入 | 支持 | 不支持 |
| 返回值 | 影响行数 | 无 |
二、INSERT语句 #
2.1 基本语法 #
sql
INSERT INTO table_name (column1, column2, ...)
VALUES (value1, value2, ...);
2.2 示例表结构 #
sql
CREATE TABLE users (
user_id INT64 NOT NULL,
name STRING(100) NOT NULL,
email STRING(255),
age INT64,
created_at TIMESTAMP DEFAULT (CURRENT_TIMESTAMP())
) PRIMARY KEY (user_id);
2.3 基本插入 #
sql
-- 插入完整数据
INSERT INTO users (user_id, name, email, age)
VALUES (1, 'John Doe', 'john@example.com', 30);
-- 插入部分列(其他列使用默认值或NULL)
INSERT INTO users (user_id, name)
VALUES (2, 'Jane Doe');
-- 使用DEFAULT关键字
INSERT INTO users (user_id, name, created_at)
VALUES (3, 'Bob Smith', DEFAULT);
2.4 插入表达式 #
sql
-- 使用函数
INSERT INTO users (user_id, name, email, created_at)
VALUES (4, 'Alice', 'alice@example.com', CURRENT_TIMESTAMP());
-- 使用表达式
INSERT INTO products (product_id, name, price, discount_price)
VALUES (1, 'Product A', 100.0, 100.0 * 0.9);
-- 使用序列
INSERT INTO users (user_id, name)
VALUES (NEXTVAL('user_seq'), 'New User');
2.5 插入查询结果 #
sql
-- 从其他表复制数据
INSERT INTO users_backup (user_id, name, email)
SELECT user_id, name, email FROM users;
-- 带条件复制
INSERT INTO users_2024 (user_id, name, email)
SELECT user_id, name, email
FROM users
WHERE EXTRACT(YEAR FROM created_at) = 2024;
-- 聚合后插入
INSERT INTO user_stats (stat_date, user_count)
SELECT CURRENT_DATE(), COUNT(*) FROM users;
三、Mutation API #
3.1 Mutation概念 #
text
Mutation是Spanner的数据修改操作单元
├── Insert: 插入新行
├── Update: 更新现有行
├── InsertOrUpdate: 插入或更新
├── Replace: 替换整行
└── Delete: 删除行
3.2 Java Mutation #
java
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Value;
// 创建Insert Mutation
Mutation mutation = Mutation.newInsertBuilder("users")
.set("user_id").to(1L)
.set("name").to("John Doe")
.set("email").to("john@example.com")
.set("age").to(30)
.build();
// 执行Mutation
DatabaseClient client = spanner.getDatabaseClient(databaseId);
client.write(Collections.singletonList(mutation));
3.3 Python Mutation #
python
from google.cloud import spanner
# 获取客户端
spanner_client = spanner.Client(project_id)
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)
# 创建Mutation并插入
def insert_user(database, user_id, name, email, age):
with database.batch() as batch:
batch.insert(
table="users",
columns=("user_id", "name", "email", "age"),
values=[
(user_id, name, email, age)
]
)
# 调用
insert_user(database, 1, "John Doe", "john@example.com", 30)
3.4 Go Mutation #
go
import (
"context"
"cloud.google.com/go/spanner"
)
func insertUser(ctx context.Context, client *spanner.Client, userID int64, name, email string, age int64) error {
m := spanner.Insert("users",
[]string{"user_id", "name", "email", "age"},
[]interface{}{userID, name, email, age},
)
_, err := client.Apply(ctx, []*spanner.Mutation{m})
return err
}
3.5 Node.js Mutation #
javascript
const { Spanner } = require('@google-cloud/spanner');
async function insertUser(database, userId, name, email, age) {
const mutation = {
insert: {
table: 'users',
columns: ['user_id', 'name', 'email', 'age'],
values: [[userId.toString(), name, email, age.toString()]]
}
};
await database.runTransactionAsync(async (transaction) => {
await transaction.mutate(mutation);
await transaction.commit();
});
}
四、批量插入 #
4.1 DML批量插入 #
sql
-- Spanner不支持单条INSERT多行VALUES语法
-- 需要使用多条INSERT或UNION ALL
-- 方式1: 多条INSERT
INSERT INTO users (user_id, name) VALUES (1, 'User1');
INSERT INTO users (user_id, name) VALUES (2, 'User2');
INSERT INTO users (user_id, name) VALUES (3, 'User3');
-- 方式2: INSERT SELECT UNION ALL
INSERT INTO users (user_id, name)
SELECT * FROM UNNEST([
(4, 'User4'),
(5, 'User5'),
(6, 'User6')
]) AS t(user_id, name);
4.2 Mutation批量插入 #
java
// Java批量插入
List<Mutation> mutations = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
Mutation mutation = Mutation.newInsertBuilder("users")
.set("user_id").to((long) i)
.set("name").to("User" + i)
.build();
mutations.add(mutation);
}
// 批量写入
DatabaseClient client = spanner.getDatabaseClient(databaseId);
client.write(mutations);
python
# Python批量插入
def batch_insert_users(database, users):
with database.batch() as batch:
batch.insert(
table="users",
columns=("user_id", "name", "email"),
values=users # 列表 of 元组
)
# 调用
users = [
(1, "User1", "user1@example.com"),
(2, "User2", "user2@example.com"),
(3, "User3", "user3@example.com"),
]
batch_insert_users(database, users)
4.3 使用Batch API #
java
// Java BatchClient
import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.BatchWriteResult;
BatchClient batchClient = spanner.getBatchClient(databaseId);
List<Mutation> mutations = new ArrayList<>();
// ... 添加大量mutations
BatchWriteResult result = batchClient.batchWrite(mutations);
System.out.println("成功插入: " + result.getSuccessfulRowCount());
4.4 批量插入限制 #
text
批量插入限制:
├── 单次Mutation数量: 无限制
├── 单次数据大小: 100MB
├── 单行大小: 10GB
└── 建议: 每批1000-10000行
五、条件插入 #
5.1 INSERT OR IGNORE #
sql
-- 如果主键已存在则忽略
INSERT INTO users (user_id, name)
SELECT 1, 'John'
WHERE NOT EXISTS (
SELECT 1 FROM users WHERE user_id = 1
);
5.2 INSERT OR UPDATE #
java
// 使用InsertOrUpdate Mutation
Mutation mutation = Mutation.newInsertOrUpdateBuilder("users")
.set("user_id").to(1L)
.set("name").to("John Doe Updated")
.set("email").to("john.updated@example.com")
.build();
5.3 REPLACE #
java
// 使用Replace Mutation(删除旧行,插入新行)
Mutation mutation = Mutation.newReplaceBuilder("users")
.set("user_id").to(1L)
.set("name").to("John Doe Replaced")
.set("email").to("john.replaced@example.com")
.build();
六、事务中的插入 #
6.1 读写事务 #
java
// Java读写事务
DatabaseClient client = spanner.getDatabaseClient(databaseId);
TransactionRunner runner = client.readWriteTransaction();
runner.run(transaction -> {
// 插入数据
transaction.buffer(Mutation.newInsertBuilder("users")
.set("user_id").to(1L)
.set("name").to("John")
.build());
// 查询刚插入的数据
Struct row = transaction.readRow("users",
Key.of(1L), Arrays.asList("name"));
return null;
});
6.2 Python事务 #
python
# Python事务
def insert_in_transaction(database, user_id, name):
def transaction_func(transaction):
# 检查是否存在
row = transaction.read("users", ["user_id"],
spanner.KeySet([spanner.Key([user_id])]))
# 插入数据
transaction.insert(
table="users",
columns=("user_id", "name"),
values=[(user_id, name)]
)
database.run_in_transaction(transaction_func)
6.3 Go事务 #
go
// Go事务
func insertInTransaction(ctx context.Context, client *spanner.Client, userID int64, name string) error {
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
// 插入数据
m := spanner.Insert("users",
[]string{"user_id", "name"},
[]interface{}{userID, name},
)
return txn.BufferWrite([]*spanner.Mutation{m})
})
return err
}
七、插入性能优化 #
7.1 批量大小选择 #
text
批量大小建议:
├── 小批量(< 100行): 延迟低,吞吐量低
├── 中批量(100-1000行): 平衡延迟和吞吐量
├── 大批量(1000-10000行): 吞吐量高,延迟高
└── 超大批量(> 10000行): 可能超时
7.2 主键设计 #
text
主键设计对插入性能影响:
├── 单调递增: 写入热点,性能差
├── 随机分布: 写入均匀,性能好
├── 位反转: 写入均匀,性能好
└── 复合主键: 根据第一列分布
7.3 并发插入 #
java
// Java并发插入
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<Void>> futures = new ArrayList<>();
for (int batch = 0; batch < 10; batch++) {
final int batchNum = batch;
futures.add(executor.submit(() -> {
List<Mutation> mutations = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
long id = batchNum * 1000L + i;
mutations.add(Mutation.newInsertBuilder("users")
.set("user_id").to(id)
.set("name").to("User" + id)
.build());
}
client.write(mutations);
return null;
}));
}
// 等待所有完成
for (Future<Void> future : futures) {
future.get();
}
7.4 使用Commit时间戳 #
sql
-- 创建表时启用提交时间戳
CREATE TABLE logs (
log_id INT64 NOT NULL,
message STRING(MAX),
commit_ts TIMESTAMP OPTIONS (allow_commit_timestamp = true)
) PRIMARY KEY (log_id);
-- 插入时使用PENDING_COMMIT_TIMESTAMP()
INSERT INTO logs (log_id, message, commit_ts)
VALUES (1, 'Test message', PENDING_COMMIT_TIMESTAMP());
八、错误处理 #
8.1 常见错误 #
java
try {
client.write(mutations);
} catch (SpannerException e) {
switch (e.getErrorCode()) {
case ALREADY_EXISTS:
// 主键冲突
System.err.println("主键已存在: " + e.getMessage());
break;
case FAILED_PRECONDITION:
// 条件不满足
System.err.println("条件不满足: " + e.getMessage());
break;
case DEADLINE_EXCEEDED:
// 超时
System.err.println("操作超时: " + e.getMessage());
break;
default:
System.err.println("其他错误: " + e.getMessage());
}
}
8.2 重试策略 #
java
// Java重试策略
import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;
RetrySettings retrySettings = RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(100))
.setMaxRetryDelay(Duration.ofSeconds(5))
.setRetryDelayMultiplier(2.0)
.setMaxAttempts(5)
.build();
SpannerOptions options = SpannerOptions.newBuilder()
.setRetrySettings(retrySettings)
.build();
九、插入最佳实践 #
9.1 选择插入方式 #
text
选择建议:
├── 少量数据(< 100行): DML
├── 中量数据(100-10000行): Mutation
├── 大量数据(> 10000行): Batch API
└── 需要条件判断: DML
9.2 事务建议 #
text
事务使用建议:
├── 减少事务大小
├── 避免长事务
├── 合理设置超时
└── 处理冲突重试
9.3 性能建议 #
text
性能优化建议:
├── 使用批量插入
├── 合理设计主键
├── 控制并发数量
├── 使用提交时间戳
└── 监控写入延迟
十、总结 #
插入方式对比:
| 方式 | 适用场景 | 性能 |
|---|---|---|
| DML INSERT | 少量数据,需要条件 | 一般 |
| Mutation | 中量数据,批量操作 | 好 |
| Batch API | 大量数据,导入场景 | 最好 |
最佳实践:
text
1. 选择合适的插入方式
└── 根据数据量和需求选择
2. 使用批量插入
└── 减少网络往返次数
3. 合理设计主键
└── 避免写入热点
4. 处理错误和重试
└── 保证数据一致性
5. 监控插入性能
└── 及时发现瓶颈
下一步,让我们学习数据更新操作!
最后更新:2026-03-27