Spanner数据插入 #

一、插入方式概述 #

1.1 两种插入方式 #

text
Spanner数据插入方式
├── DML语句 (INSERT)
│   ├── 标准SQL语法
│   ├── 支持事务
│   ├── 实时执行
│   └── 适合少量数据
│
└── Mutation API
    ├── 编程接口
    ├── 批量操作
    ├── 延迟提交
    └── 适合大量数据

1.2 方式对比 #

特性 DML Mutation
语法 SQL 编程API
批量插入 较慢 快速
事务支持 完整 受限
条件插入 支持 不支持
返回值 影响行数

二、INSERT语句 #

2.1 基本语法 #

sql
INSERT INTO table_name (column1, column2, ...)
VALUES (value1, value2, ...);

2.2 示例表结构 #

sql
CREATE TABLE users (
    user_id INT64 NOT NULL,
    name STRING(100) NOT NULL,
    email STRING(255),
    age INT64,
    created_at TIMESTAMP DEFAULT (CURRENT_TIMESTAMP())
) PRIMARY KEY (user_id);

2.3 基本插入 #

sql
-- 插入完整数据
INSERT INTO users (user_id, name, email, age)
VALUES (1, 'John Doe', 'john@example.com', 30);

-- 插入部分列(其他列使用默认值或NULL)
INSERT INTO users (user_id, name)
VALUES (2, 'Jane Doe');

-- 使用DEFAULT关键字
INSERT INTO users (user_id, name, created_at)
VALUES (3, 'Bob Smith', DEFAULT);

2.4 插入表达式 #

sql
-- 使用函数
INSERT INTO users (user_id, name, email, created_at)
VALUES (4, 'Alice', 'alice@example.com', CURRENT_TIMESTAMP());

-- 使用表达式
INSERT INTO products (product_id, name, price, discount_price)
VALUES (1, 'Product A', 100.0, 100.0 * 0.9);

-- 使用序列
INSERT INTO users (user_id, name)
VALUES (NEXTVAL('user_seq'), 'New User');

2.5 插入查询结果 #

sql
-- 从其他表复制数据
INSERT INTO users_backup (user_id, name, email)
SELECT user_id, name, email FROM users;

-- 带条件复制
INSERT INTO users_2024 (user_id, name, email)
SELECT user_id, name, email 
FROM users 
WHERE EXTRACT(YEAR FROM created_at) = 2024;

-- 聚合后插入
INSERT INTO user_stats (stat_date, user_count)
SELECT CURRENT_DATE(), COUNT(*) FROM users;

三、Mutation API #

3.1 Mutation概念 #

text
Mutation是Spanner的数据修改操作单元
├── Insert: 插入新行
├── Update: 更新现有行
├── InsertOrUpdate: 插入或更新
├── Replace: 替换整行
└── Delete: 删除行

3.2 Java Mutation #

java
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Value;

// 创建Insert Mutation
Mutation mutation = Mutation.newInsertBuilder("users")
    .set("user_id").to(1L)
    .set("name").to("John Doe")
    .set("email").to("john@example.com")
    .set("age").to(30)
    .build();

// 执行Mutation
DatabaseClient client = spanner.getDatabaseClient(databaseId);
client.write(Collections.singletonList(mutation));

3.3 Python Mutation #

python
from google.cloud import spanner

# 获取客户端
spanner_client = spanner.Client(project_id)
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

# 创建Mutation并插入
def insert_user(database, user_id, name, email, age):
    with database.batch() as batch:
        batch.insert(
            table="users",
            columns=("user_id", "name", "email", "age"),
            values=[
                (user_id, name, email, age)
            ]
        )

# 调用
insert_user(database, 1, "John Doe", "john@example.com", 30)

3.4 Go Mutation #

go
import (
    "context"
    "cloud.google.com/go/spanner"
)

func insertUser(ctx context.Context, client *spanner.Client, userID int64, name, email string, age int64) error {
    m := spanner.Insert("users",
        []string{"user_id", "name", "email", "age"},
        []interface{}{userID, name, email, age},
    )
    
    _, err := client.Apply(ctx, []*spanner.Mutation{m})
    return err
}

3.5 Node.js Mutation #

javascript
const { Spanner } = require('@google-cloud/spanner');

async function insertUser(database, userId, name, email, age) {
    const mutation = {
        insert: {
            table: 'users',
            columns: ['user_id', 'name', 'email', 'age'],
            values: [[userId.toString(), name, email, age.toString()]]
        }
    };
    
    await database.runTransactionAsync(async (transaction) => {
        await transaction.mutate(mutation);
        await transaction.commit();
    });
}

四、批量插入 #

4.1 DML批量插入 #

sql
-- Spanner不支持单条INSERT多行VALUES语法
-- 需要使用多条INSERT或UNION ALL

-- 方式1: 多条INSERT
INSERT INTO users (user_id, name) VALUES (1, 'User1');
INSERT INTO users (user_id, name) VALUES (2, 'User2');
INSERT INTO users (user_id, name) VALUES (3, 'User3');

-- 方式2: INSERT SELECT UNION ALL
INSERT INTO users (user_id, name)
SELECT * FROM UNNEST([
    (4, 'User4'),
    (5, 'User5'),
    (6, 'User6')
]) AS t(user_id, name);

4.2 Mutation批量插入 #

java
// Java批量插入
List<Mutation> mutations = new ArrayList<>();

for (int i = 0; i < 1000; i++) {
    Mutation mutation = Mutation.newInsertBuilder("users")
        .set("user_id").to((long) i)
        .set("name").to("User" + i)
        .build();
    mutations.add(mutation);
}

// 批量写入
DatabaseClient client = spanner.getDatabaseClient(databaseId);
client.write(mutations);
python
# Python批量插入
def batch_insert_users(database, users):
    with database.batch() as batch:
        batch.insert(
            table="users",
            columns=("user_id", "name", "email"),
            values=users  # 列表 of 元组
        )

# 调用
users = [
    (1, "User1", "user1@example.com"),
    (2, "User2", "user2@example.com"),
    (3, "User3", "user3@example.com"),
]
batch_insert_users(database, users)

4.3 使用Batch API #

java
// Java BatchClient
import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.BatchWriteResult;

BatchClient batchClient = spanner.getBatchClient(databaseId);

List<Mutation> mutations = new ArrayList<>();
// ... 添加大量mutations

BatchWriteResult result = batchClient.batchWrite(mutations);
System.out.println("成功插入: " + result.getSuccessfulRowCount());

4.4 批量插入限制 #

text
批量插入限制:
├── 单次Mutation数量: 无限制
├── 单次数据大小: 100MB
├── 单行大小: 10GB
└── 建议: 每批1000-10000行

五、条件插入 #

5.1 INSERT OR IGNORE #

sql
-- 如果主键已存在则忽略
INSERT INTO users (user_id, name)
SELECT 1, 'John'
WHERE NOT EXISTS (
    SELECT 1 FROM users WHERE user_id = 1
);

5.2 INSERT OR UPDATE #

java
// 使用InsertOrUpdate Mutation
Mutation mutation = Mutation.newInsertOrUpdateBuilder("users")
    .set("user_id").to(1L)
    .set("name").to("John Doe Updated")
    .set("email").to("john.updated@example.com")
    .build();

5.3 REPLACE #

java
// 使用Replace Mutation(删除旧行,插入新行)
Mutation mutation = Mutation.newReplaceBuilder("users")
    .set("user_id").to(1L)
    .set("name").to("John Doe Replaced")
    .set("email").to("john.replaced@example.com")
    .build();

六、事务中的插入 #

6.1 读写事务 #

java
// Java读写事务
DatabaseClient client = spanner.getDatabaseClient(databaseId);

TransactionRunner runner = client.readWriteTransaction();
runner.run(transaction -> {
    // 插入数据
    transaction.buffer(Mutation.newInsertBuilder("users")
        .set("user_id").to(1L)
        .set("name").to("John")
        .build());
    
    // 查询刚插入的数据
    Struct row = transaction.readRow("users", 
        Key.of(1L), Arrays.asList("name"));
    
    return null;
});

6.2 Python事务 #

python
# Python事务
def insert_in_transaction(database, user_id, name):
    def transaction_func(transaction):
        # 检查是否存在
        row = transaction.read("users", ["user_id"], 
                              spanner.KeySet([spanner.Key([user_id])]))
        
        # 插入数据
        transaction.insert(
            table="users",
            columns=("user_id", "name"),
            values=[(user_id, name)]
        )
    
    database.run_in_transaction(transaction_func)

6.3 Go事务 #

go
// Go事务
func insertInTransaction(ctx context.Context, client *spanner.Client, userID int64, name string) error {
    _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
        // 插入数据
        m := spanner.Insert("users",
            []string{"user_id", "name"},
            []interface{}{userID, name},
        )
        return txn.BufferWrite([]*spanner.Mutation{m})
    })
    return err
}

七、插入性能优化 #

7.1 批量大小选择 #

text
批量大小建议:
├── 小批量(< 100行): 延迟低,吞吐量低
├── 中批量(100-1000行): 平衡延迟和吞吐量
├── 大批量(1000-10000行): 吞吐量高,延迟高
└── 超大批量(> 10000行): 可能超时

7.2 主键设计 #

text
主键设计对插入性能影响:
├── 单调递增: 写入热点,性能差
├── 随机分布: 写入均匀,性能好
├── 位反转: 写入均匀,性能好
└── 复合主键: 根据第一列分布

7.3 并发插入 #

java
// Java并发插入
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<Void>> futures = new ArrayList<>();

for (int batch = 0; batch < 10; batch++) {
    final int batchNum = batch;
    futures.add(executor.submit(() -> {
        List<Mutation> mutations = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            long id = batchNum * 1000L + i;
            mutations.add(Mutation.newInsertBuilder("users")
                .set("user_id").to(id)
                .set("name").to("User" + id)
                .build());
        }
        client.write(mutations);
        return null;
    }));
}

// 等待所有完成
for (Future<Void> future : futures) {
    future.get();
}

7.4 使用Commit时间戳 #

sql
-- 创建表时启用提交时间戳
CREATE TABLE logs (
    log_id INT64 NOT NULL,
    message STRING(MAX),
    commit_ts TIMESTAMP OPTIONS (allow_commit_timestamp = true)
) PRIMARY KEY (log_id);

-- 插入时使用PENDING_COMMIT_TIMESTAMP()
INSERT INTO logs (log_id, message, commit_ts)
VALUES (1, 'Test message', PENDING_COMMIT_TIMESTAMP());

八、错误处理 #

8.1 常见错误 #

java
try {
    client.write(mutations);
} catch (SpannerException e) {
    switch (e.getErrorCode()) {
        case ALREADY_EXISTS:
            // 主键冲突
            System.err.println("主键已存在: " + e.getMessage());
            break;
        case FAILED_PRECONDITION:
            // 条件不满足
            System.err.println("条件不满足: " + e.getMessage());
            break;
        case DEADLINE_EXCEEDED:
            // 超时
            System.err.println("操作超时: " + e.getMessage());
            break;
        default:
            System.err.println("其他错误: " + e.getMessage());
    }
}

8.2 重试策略 #

java
// Java重试策略
import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;

RetrySettings retrySettings = RetrySettings.newBuilder()
    .setInitialRetryDelay(Duration.ofMillis(100))
    .setMaxRetryDelay(Duration.ofSeconds(5))
    .setRetryDelayMultiplier(2.0)
    .setMaxAttempts(5)
    .build();

SpannerOptions options = SpannerOptions.newBuilder()
    .setRetrySettings(retrySettings)
    .build();

九、插入最佳实践 #

9.1 选择插入方式 #

text
选择建议:
├── 少量数据(< 100行): DML
├── 中量数据(100-10000行): Mutation
├── 大量数据(> 10000行): Batch API
└── 需要条件判断: DML

9.2 事务建议 #

text
事务使用建议:
├── 减少事务大小
├── 避免长事务
├── 合理设置超时
└── 处理冲突重试

9.3 性能建议 #

text
性能优化建议:
├── 使用批量插入
├── 合理设计主键
├── 控制并发数量
├── 使用提交时间戳
└── 监控写入延迟

十、总结 #

插入方式对比:

方式 适用场景 性能
DML INSERT 少量数据,需要条件 一般
Mutation 中量数据,批量操作
Batch API 大量数据,导入场景 最好

最佳实践:

text
1. 选择合适的插入方式
   └── 根据数据量和需求选择

2. 使用批量插入
   └── 减少网络往返次数

3. 合理设计主键
   └── 避免写入热点

4. 处理错误和重试
   └── 保证数据一致性

5. 监控插入性能
   └── 及时发现瓶颈

下一步,让我们学习数据更新操作!

最后更新:2026-03-27