生态工具 #

一、生态工具概述 #

text
CockroachDB 生态工具
┌─────────────────────────────────────────────────────────────┐
│                                                             │
│   数据变更捕获 (CDC):                                       │
│   ├── Changefeed: 实时数据流                               │
│   └── Kafka集成                                            │
│                                                             │
│   数据导入导出:                                             │
│   ├── IMPORT: 数据导入                                     │
│   ├── BACKUP/RESTORE: 备份恢复                             │
│   └── MOLT: 迁移工具                                       │
│                                                             │
│   监控工具:                                                 │
│   ├── Prometheus集成                                       │
│   ├── Grafana仪表板                                        │
│   └── Web UI                                               │
│                                                             │
│   开发工具:                                                 │
│   ├── cockroach sql: SQL客户端                             │
│   ├── cockroach demo: 演示集群                             │
│   └── cockroach debug: 调试工具                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

二、CDC (Change Data Capture) #

2.1 创建Changefeed #

sql
-- 创建基本Changefeed
CREATE CHANGEFEED FOR TABLE users
INTO 'kafka://localhost:9092'
WITH updated, resolved='10s';

-- 创建多表Changefeed
CREATE CHANGEFEED FOR TABLE users, orders
INTO 'kafka://localhost:9092'
WITH updated, resolved='10s';

-- 创建带格式的Changefeed
CREATE CHANGEFEED FOR TABLE users
INTO 'kafka://localhost:9092'
WITH format=json, updated, resolved='10s';

-- 创建带过滤的Changefeed
CREATE CHANGEFEED FOR TABLE orders
INTO 'kafka://localhost:9092'
WITH cursor='2024-01-01 00:00:00.000000Z'
WHERE status = 'completed';

2.2 Changefeed配置 #

sql
-- 配置选项
CREATE CHANGEFEED FOR TABLE users
INTO 'kafka://localhost:9092'
WITH
    format=json,              -- 输出格式
    updated,                  -- 包含更新时间戳
    resolved='10s',           -- 解析时间间隔
    min_checkpoint_frequency='10s',  -- 最小检查点频率
    envelope='wrapped',       -- 信封格式
    diff,                     -- 包含差异
    topic_prefix='myapp_',    -- 主题前缀
    kafka_sink_config='{"topic.auto.create": true}';

2.3 管理Changefeed #

sql
-- 查看Changefeed
SHOW JOBS;

-- 查看特定Changefeed
SHOW JOB 123456789;

-- 暂停Changefeed
PAUSE JOB 123456789;

-- 恢复Changefeed
RESUME JOB 123456789;

-- 取消Changefeed
CANCEL JOB 123456789;

2.4 CDC使用场景 #

text
CDC使用场景
┌─────────────────────────────────────────────────────────────┐
│                                                             │
│   实时数据同步:                                             │
│   ├── 同步到数据仓库                                       │
│   ├── 同步到搜索引擎                                       │
│   └── 同步到缓存系统                                       │
│                                                             │
│   事件驱动架构:                                             │
│   ├── 发送消息到Kafka                                      │
│   ├── 触发下游处理                                         │
│   └── 实现事件溯源                                         │
│                                                             │
│   审计日志:                                                 │
│   ├── 记录数据变更                                         │
│   ├── 合规要求                                             │
│   └── 安全审计                                             │
│                                                             │
│   数据分析:                                                 │
│   ├── 实时分析                                             │
│   ├── 流处理                                               │
│   └── 数据管道                                             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

三、数据导入 #

3.1 IMPORT INTO #

sql
-- 从CSV导入
IMPORT INTO users (name, email, age)
CSV DATA (
    'nodelocal://1/users.csv'
)
WITH
    skip = '1',           -- 跳过第一行
    delimiter = ',',      -- 分隔符
    quote = '"',          -- 引号
    nullif = '';          -- 空值处理

-- 从多个文件导入
IMPORT INTO users (name, email, age)
CSV DATA (
    'nodelocal://1/users1.csv',
    'nodelocal://1/users2.csv',
    'nodelocal://1/users3.csv'
);

-- 从S3导入
IMPORT INTO users (name, email, age)
CSV DATA (
    's3://my-bucket/users.csv?AWS_ACCESS_KEY_ID=xxx&AWS_SECRET_ACCESS_KEY=xxx'
);

3.2 导入格式 #

sql
-- CSV格式
IMPORT INTO users (name, email, age)
CSV DATA ('nodelocal://1/users.csv');

-- DELIMITED格式
IMPORT INTO users (name, email, age)
DELIMITED DATA ('nodelocal://1/users.txt')
WITH delimiter = E'\t';

-- AVRO格式
IMPORT INTO users
AVRO DATA ('nodelocal://1/users.avro');

-- PGDUMP格式
IMPORT INTO users
PGDUMP DATA ('nodelocal://1/users.sql');

-- MYSQL格式
IMPORT INTO users
MYSQLDUMP DATA ('nodelocal://1/users.sql');

3.3 导入最佳实践 #

text
导入最佳实践
┌─────────────────────────────────────────────────────────────┐
│                                                             │
│   1. 准备数据                                               │
│      ├── 清理数据格式                                      │
│      ├── 确保数据类型匹配                                  │
│      └── 处理NULL值                                        │
│                                                             │
│   2. 性能优化                                               │
│      ├── 使用多个文件并行导入                              │
│      ├── 禁用索引 (导入后创建)                             │
│      └── 增加导入并发                                      │
│                                                             │
│   3. 验证数据                                               │
│      ├── 检查导入行数                                      │
│      ├── 验证数据完整性                                    │
│      └── 检查约束                                          │
│                                                             │
│   4. 监控导入                                               │
│      ├── 查看导入进度                                      │
│      ├── 处理错误                                          │
│      └── 估计剩余时间                                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

四、数据导出 #

4.1 EXPORT #

sql
-- 导出为CSV
EXPORT INTO CSV 'nodelocal://1/users_export.csv'
FROM SELECT * FROM users;

-- 导出查询结果
EXPORT INTO CSV 'nodelocal://1/active_users.csv'
FROM SELECT * FROM users WHERE status = 'active';

-- 导出带选项
EXPORT INTO CSV 'nodelocal://1/users.csv'
WITH delimiter = ',', nullas = 'NULL'
FROM SELECT * FROM users;

-- 导出到S3
EXPORT INTO CSV 's3://my-bucket/users.csv?AWS_ACCESS_KEY_ID=xxx&AWS_SECRET_ACCESS_KEY=xxx'
FROM SELECT * FROM users;

4.2 使用COPY #

sql
-- 导出数据
COPY (SELECT * FROM users) TO STDOUT WITH CSV HEADER;

-- 导出特定列
COPY (SELECT name, email FROM users) TO STDOUT WITH CSV;

-- 导出带条件
COPY (SELECT * FROM orders WHERE created_at > '2024-01-01') 
TO STDOUT WITH CSV HEADER;

五、数据迁移 #

5.1 MOLT工具 #

bash
# 安装MOLT
curl -L https://github.com/cockroachdb/molt/releases/latest/download/molt_linux_amd64 -o molt
chmod +x molt

# 从PostgreSQL迁移
./molt fetch --pgurl "postgresql://user:pass@host:5432/db"

# 从MySQL迁移
./molt fetch --mysqlurl "mysql://user:pass@host:3306/db"

# 导入到CockroachDB
./molt load --cockroachurl "postgresql://root@localhost:26257/defaultdb"

5.2 迁移步骤 #

text
数据迁移步骤
┌─────────────────────────────────────────────────────────────┐
│                                                             │
│   1. 评估迁移                                               │
│      ├── 检查兼容性                                        │
│      ├── 识别不兼容特性                                    │
│      └── 制定迁移计划                                      │
│                                                             │
│   2. Schema迁移                                             │
│      ├── 导出Schema                                        │
│      ├── 转换DDL                                           │
│      └── 在CockroachDB创建                                 │
│                                                             │
│   3. 数据迁移                                               │
│      ├── 导出数据                                          │
│      ├── 转换数据                                          │
│      └── 导入数据                                          │
│                                                             │
│   4. 验证迁移                                               │
│      ├── 检查数据完整性                                    │
│      ├── 验证应用功能                                      │
│      └── 性能测试                                          │
│                                                             │
│   5. 切换流量                                               │
│      ├── 配置CDC同步                                       │
│      ├── 验证数据一致性                                    │
│      └── 切换应用                                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘

六、命令行工具 #

6.1 cockroach sql #

bash
# 连接集群
cockroach sql --insecure --host=localhost:26257

# 执行SQL文件
cockroach sql --insecure -f script.sql

# 执行单条SQL
cockroach sql --insecure -e "SELECT * FROM users"

# 使用数据库
cockroach sql --insecure -d mydb

# 格式化输出
cockroach sql --insecure -e "SELECT * FROM users" --format=table

6.2 cockroach demo #

bash
# 启动演示集群
cockroach demo

# 启动带示例数据的演示
cockroach demo --with-load

# 指定节点数
cockroach demo --nodes=5

# 指定数据库
cockroach demo --database=bank

6.3 cockroach debug #

bash
# 创建调试包
cockroach debug zip --host=localhost:8080 debug.zip

# 分析日志
cockroach debug debug-zip debug.zip

# 查看集群状态
cockroach debug ballast --dir=/data

七、监控集成 #

7.1 Prometheus指标 #

text
关键指标
┌─────────────────────────────────────────────────────────────┐
│                                                             │
│   SQL指标:                                                  │
│   ├── sql_latency_*: 查询延迟                              │
│   ├── sql_conns: 连接数                                    │
│   └── sql_*_count: SQL操作计数                             │
│                                                             │
│   存储指标:                                                 │
│   ├── storage_*: 存储操作                                  │
│   ├── livebytes: 活跃数据量                                │
│   └── capacity: 存储容量                                   │
│                                                             │
│   集群指标:                                                 │
│   ├── replicas: 副本数                                     │
│   ├── liveness_*: 节点存活                                 │
│   └── rebalance_*: 均衡状态                                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

7.2 告警规则 #

yaml
# 关键告警规则
groups:
  - name: cockroachdb
    rules:
      - alert: HighLatency
        expr: histogram_quantile(0.99, rate(sql_latency_bucket[5m])) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High SQL latency"

      - alert: NodeDown
        expr: up{job="cockroachdb"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "CockroachDB node down"

八、总结 #

生态工具要点:

工具 说明
CDC 实时数据变更捕获
IMPORT 数据导入
EXPORT 数据导出
MOLT 数据迁移
CLI 命令行工具

恭喜你完成了 CockroachDB 完全指南的学习!

最后更新:2026-03-27