Couchbase文档插入 #

一、概述 #

1.1 文档结构 #

Couchbase存储的是JSON文档,每个文档由以下部分组成:

text
┌─────────────────────────────────────────┐
│              Document                    │
├─────────────────────────────────────────┤
│  Key(文档键)- 唯一标识                 │
├─────────────────────────────────────────┤
│  Value(文档值)- JSON数据               │
├─────────────────────────────────────────┤
│  Metadata(元数据)                      │
│  ├── CAS(比较交换值)                   │
│  ├── Expiration(过期时间)              │
│  └── Flags(标志位)                     │
└─────────────────────────────────────────┘

1.2 插入方式对比 #

方式 说明 Key存在时行为
INSERT 插入新文档 报错
UPSERT 插入或更新 更新
INSERT IGNORE 忽略错误插入 忽略

二、INSERT语句 #

2.1 基本语法 #

sql
INSERT INTO `keyspace` (KEY, VALUE)
VALUES ('key', {document});

2.2 插入单个文档 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES (
    'user::001',
    {
        'type': 'user',
        'name': '张三',
        'email': 'zhangsan@example.com',
        'age': 28,
        'city': '北京',
        'created_at': '2024-01-15T10:30:00Z'
    }
);

2.3 使用RETURNING子句 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES (
    'user::002',
    {
        'type': 'user',
        'name': '李四',
        'email': 'lisi@example.com'
    }
)
RETURNING META().id AS doc_id, *;

2.4 插入时设置过期时间 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE, OPTIONS)
VALUES (
    'session::abc123',
    {
        'user_id': 'user::001',
        'token': 'abc123',
        'created_at': NOW_STR()
    },
    {'expiration': 3600}
);

三、批量插入 #

3.1 多值插入 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES
    ('user::001', {'type': 'user', 'name': '张三', 'age': 28}),
    ('user::002', {'type': 'user', 'name': '李四', 'age': 32}),
    ('user::003', {'type': 'user', 'name': '王五', 'age': 25}),
    ('user::004', {'type': 'user', 'name': '赵六', 'age': 30});

3.2 从查询结果插入 #

sql
INSERT INTO `my-bucket`.`_default`.`users_backup` (KEY, VALUE)
SELECT META().id, _default
FROM `my-bucket`.`_default`.`_default`
WHERE type = 'user';

3.3 带条件的批量插入 #

sql
INSERT INTO `my-bucket`.`_default`.`active_users` (KEY, VALUE)
SELECT 'active::' || META().id, 
       {'name': name, 'email': email, 'status': 'active'}
FROM `my-bucket`.`_default`.`_default`
WHERE type = 'user' AND status = 'active';

四、UPSERT语句 #

4.1 基本语法 #

sql
UPSERT INTO `keyspace` (KEY, VALUE)
VALUES ('key', {document});

4.2 插入或更新 #

sql
UPSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES (
    'user::001',
    {
        'type': 'user',
        'name': '张三',
        'email': 'zhangsan_new@example.com',
        'age': 29,
        'updated_at': NOW_STR()
    }
);

4.3 UPSERT vs INSERT #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES ('user::001', {'name': '张三'});

INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES ('user::001', {'name': '李四'});

UPSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES ('user::001', {'name': '王五'});

4.4 批量UPSERT #

sql
UPSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES
    ('user::001', {'type': 'user', 'name': '张三', 'version': 2}),
    ('user::002', {'type': 'user', 'name': '李四', 'version': 2}),
    ('user::003', {'type': 'user', 'name': '王五', 'version': 1});

五、INSERT IGNORE #

5.1 忽略重复错误 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES ('user::001', {'name': '张三'});

INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES ('user::001', {'name': '李四'})
ON CONFLICT DO NOTHING;

5.2 批量插入忽略错误 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES
    ('user::001', {'name': '张三'}),
    ('user::002', {'name': '李四'}),
    ('user::003', {'name': '王五'})
ON CONFLICT DO NOTHING;

六、SDK插入操作 #

6.1 Python SDK #

python
from couchbase.cluster import Cluster, ClusterOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.options import UpsertOptions, InsertOptions
from couchbase.exceptions import DocumentExistsException

cluster = Cluster(
    'couchbase://localhost',
    ClusterOptions(PasswordAuthenticator('Administrator', 'password'))
)

bucket = cluster.bucket('my-bucket')
collection = bucket.default_collection()

doc = {
    'type': 'user',
    'name': '张三',
    'email': 'zhangsan@example.com',
    'age': 28
}

try:
    collection.insert('user::001', doc)
except DocumentExistsException:
    print('文档已存在')

collection.upsert('user::001', doc)

collection.upsert('session::abc123', 
    {'user_id': 'user::001', 'token': 'abc123'},
    UpsertOptions(timeout=timedelta(seconds=5), 
                  expiry=timedelta(hours=1))
)

6.2 Node.js SDK #

javascript
const couchbase = require('couchbase');

const cluster = new couchbase.Cluster('couchbase://localhost', {
    username: 'Administrator',
    password: 'password'
});

const bucket = cluster.bucket('my-bucket');
const collection = bucket.defaultCollection();

const doc = {
    type: 'user',
    name: '张三',
    email: 'zhangsan@example.com',
    age: 28
};

try {
    await collection.insert('user::001', doc);
} catch (error) {
    if (error instanceof couchbase.DocumentExistsError) {
        console.log('文档已存在');
    }
}

await collection.upsert('user::001', doc);

await collection.insert('session::abc123', 
    { user_id: 'user::001', token: 'abc123' },
    { timeout: 5000, expiry: 3600 }
);

6.3 Java SDK #

java
import com.couchbase.client.java.*;
import com.couchbase.client.java.json.*;
import com.couchbase.client.java.kv.*;
import java.time.Duration;

Cluster cluster = Cluster.connect(
    "localhost",
    ClusterOptions.clusterOptions("Administrator", "password")
);

Bucket bucket = cluster.bucket("my-bucket");
Collection collection = bucket.defaultCollection();

JsonObject doc = JsonObject.create()
    .put("type", "user")
    .put("name", "张三")
    .put("email", "zhangsan@example.com")
    .put("age", 28);

try {
    collection.insert("user::001", doc);
} catch (DocumentExistsException e) {
    System.out.println("文档已存在");
}

collection.upsert("user::001", doc);

collection.insert(
    "session::abc123",
    JsonObject.create()
        .put("user_id", "user::001")
        .put("token", "abc123"),
    InsertOptions.insertOptions()
        .timeout(Duration.ofSeconds(5))
        .expiry(Duration.ofHours(1))
);

七、文档Key设计 #

7.1 Key命名规范 #

text
推荐格式:
[类型]::[唯一标识]

示例:
user::001
user::002
product::SKU123
order::20240115001
session::abc123
cache::user_preferences::001

7.2 Key设计原则 #

text
原则:
1. 唯一性 - 每个Key在Collection中唯一
2. 可读性 - 便于理解和调试
3. 稳定性 - 不随数据变化而改变
4. 简洁性 - 避免过长的Key

避免:
- 使用特殊字符(空格、换行等)
- 过长的Key(建议<250字符)
- 包含敏感信息

7.3 复合Key设计 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES (
    'order_item::order_001::item_001',
    {
        'order_id': 'order_001',
        'item_id': 'item_001',
        'product_id': 'prod_001',
        'quantity': 2,
        'price': 99.99
    }
);

INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES (
    'user_activity::user_001::2024-01-15',
    {
        'user_id': 'user_001',
        'date': '2024-01-15',
        'activities': ['login', 'view_product', 'add_to_cart']
    }
);

八、文档设计最佳实践 #

8.1 添加类型字段 #

json
{
    "type": "user",
    "id": "user_001",
    "name": "张三",
    "email": "zhangsan@example.com"
}
sql
SELECT * FROM `my-bucket`.`_default`.`_default`
WHERE type = 'user';

8.2 添加时间戳 #

json
{
    "type": "user",
    "name": "张三",
    "created_at": "2024-01-15T10:30:00Z",
    "updated_at": "2024-01-20T15:45:00Z",
    "version": 1
}

8.3 嵌套文档设计 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES (
    'user::001',
    {
        'type': 'user',
        'name': '张三',
        'contact': {
            'email': 'zhangsan@example.com',
            'phone': '13800138000'
        },
        'address': {
            'city': '北京',
            'street': '朝阳路100号',
            'zipcode': '100020'
        },
        'preferences': {
            'language': 'zh-CN',
            'timezone': 'Asia/Shanghai',
            'notifications': true
        }
    }
);

8.4 数组字段设计 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE)
VALUES (
    'user::001',
    {
        'type': 'user',
        'name': '张三',
        'hobbies': ['阅读', '游泳', '编程'],
        'tags': ['vip', 'premium'],
        'order_history': [
            {'order_id': 'order_001', 'date': '2024-01-10'},
            {'order_id': 'order_002', 'date': '2024-01-15'}
        ]
    }
);

九、性能优化 #

9.1 批量插入优化 #

python
from couchbase.options import BatchOptions

docs = [
    ('user::001', {'name': '张三'}),
    ('user::002', {'name': '李四'}),
    ('user::003', {'name': '王五'}),
]

for key, value in docs:
    collection.upsert(key, value)

9.2 使用异步操作 #

python
import asyncio
from acouchbase.cluster import Cluster, ClusterOptions
from acouchbase.auth import PasswordAuthenticator

async def batch_insert():
    cluster = Cluster(
        'couchbase://localhost',
        ClusterOptions(PasswordAuthenticator('Administrator', 'password'))
    )
    
    bucket = cluster.bucket('my-bucket')
    collection = bucket.default_collection()
    
    docs = [
        ('user::001', {'name': '张三'}),
        ('user::002', {'name': '李四'}),
        ('user::003', {'name': '王五'}),
    ]
    
    tasks = [collection.upsert(key, value) for key, value in docs]
    await asyncio.gather(*tasks)

asyncio.run(batch_insert())

9.3 合理设置过期时间 #

sql
INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE, OPTIONS)
VALUES (
    'cache::popular_products',
    {'products': [...]},
    {'expiration': 300}
);

INSERT INTO `my-bucket`.`_default`.`_default` (KEY, VALUE, OPTIONS)
VALUES (
    'session::token_abc123',
    {'user_id': 'user_001', 'token': 'abc123'},
    {'expiration': 86400}
);

十、常见错误处理 #

10.1 文档已存在 #

python
from couchbase.exceptions import DocumentExistsException

try:
    collection.insert('user::001', doc)
except DocumentExistsException:
    collection.upsert('user::001', doc)

10.2 Key格式错误 #

python
from couchbase.exceptions import InvalidArgumentException

try:
    collection.insert('', doc)
except InvalidArgumentException as e:
    print(f'Key格式错误: {e}')

10.3 文档过大 #

python
from couchbase.exceptions import ValueTooLargeException

try:
    collection.insert('large_doc', large_doc)
except ValueTooLargeException:
    print('文档大小超过限制(20MB)')

十一、总结 #

插入操作要点:

操作 说明 使用场景
INSERT 插入新文档 确保Key不存在
UPSERT 插入或更新 不确定Key是否存在
INSERT IGNORE 忽略重复 批量导入数据

最佳实践:

  1. 设计合理的文档Key
  2. 添加type字段区分文档类型
  3. 使用批量操作提高性能
  4. 合理设置过期时间
  5. 处理好并发冲突

下一步,让我们学习文档更新操作!

最后更新:2026-03-27