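Couchbase Document Updates

1. Overview

1.1 Update Methods

Couchbase provides several ways to update a document:

Method | Description | Use case
UPDATE | N1QL update statement | Bulk updates, conditional updates
UPSERT | Insert or replace | Unsure whether the document exists
REPLACE | Replace the entire document | The document is known to exist
SUBDOC | Partial field update | Efficiently updating individual fields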

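1.2 Optimistic Locking

Couchbase implements optimistic locking with CAS (Compare And Swap):

text
Update flow:
1. Read the document and obtain its CAS value
2. Modify the document content
3. Submit the update together with the CAS value
4. The server validates the CAS value
   - Match: the update succeeds and a new CAS is returned
   - Mismatch: the update fails and must be retried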
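
The flow above can be sketched with a small in-memory store. This is an illustration only and does not use the Couchbase SDK: `InMemoryStore`, `CasMismatch`, and the integer CAS that grows by one per write are assumptions made for the example (real CAS values are opaque to the client).

```python
class CasMismatch(Exception):
    """Raised when the supplied CAS no longer matches the stored one."""


class InMemoryStore:
    """Toy key-value store that mimics a CAS-checked replace (hypothetical)."""

    def __init__(self):
        self._data = {}  # key -> (document, cas)

    def upsert(self, key, doc):
        _, cas = self._data.get(key, (None, 0))
        self._data[key] = (dict(doc), cas + 1)
        return cas + 1

    def get(self, key):
        doc, cas = self._data[key]
        return dict(doc), cas

    def replace(self, key, doc, cas):
        _, current = self._data[key]
        if cas != current:
            raise CasMismatch(key)
        self._data[key] = (dict(doc), current + 1)
        return current + 1


store = InMemoryStore()
store.upsert("user::001", {"age": 28})

doc, cas = store.get("user::001")                # step 1: read document and CAS
doc["age"] = 29                                  # step 2: modify locally
new_cas = store.replace("user::001", doc, cas)   # steps 3-4: CAS matches

try:
    store.replace("user::001", {"age": 99}, cas)  # the old CAS is now stale
    stale_rejected = False
except CasMismatch:
    stale_rejected = True
```

The second `replace` reuses the CAS from the first read, so the store rejects it, which corresponds to the "mismatch" branch of step 4.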

2. The UPDATE Statement

2.1 Basic Syntax

sql
UPDATE `keyspace`
SET field1 = value1, field2 = value2, ...
WHERE condition;

2.2 Updating a Single Field

sql
UPDATE `my-bucket`.`_default`.`_default`
SET email = 'zhangsan_new@example.com'
WHERE type = 'user' AND name = '张三';

2.3 Updating Multiple Fields

sql
UPDATE `my-bucket`.`_default`.`_default`
SET 
    email = 'zhangsan_new@example.com',
    age = 29,
    updated_at = NOW_STR()
WHERE type = 'user' AND META().id = 'user::001';

2.4 Using RETURNING

sql
UPDATE `my-bucket`.`_default`.`_default`
SET age = age + 1, updated_at = NOW_STR()
WHERE type = 'user' AND META().id = 'user::001'
RETURNING META().id, name, age;

2.5 Conditional Updates

sql
-- Mark users who have not logged in for 30 days as inactive
UPDATE `my-bucket`.`_default`.`_default`
SET status = 'inactive'
WHERE type = 'user' 
  AND last_login < DATE_ADD_STR(NOW_STR(), -30, 'day');

-- Assign a discount tier based on total spending
UPDATE `my-bucket`.`_default`.`_default`
SET discount = CASE
    WHEN total_spent > 10000 THEN 0.2
    WHEN total_spent > 5000 THEN 0.1
    ELSE 0.05
END
WHERE type = 'user';
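
The tier boundaries in the CASE expression are easy to misread, because both comparisons are strict. A minimal Python mirror of the same branches (the function name `discount_for` is made up for this sketch) makes the boundary behaviour explicit:

```python
def discount_for(total_spent: float) -> float:
    """Mirror of the CASE expression: the first matching branch wins."""
    if total_spent > 10000:
        return 0.2
    if total_spent > 5000:
        return 0.1
    return 0.05


# Exactly 10000 falls into the middle tier and exactly 5000 into the lowest,
# because the comparisons use > rather than >=.
tiers = {amount: discount_for(amount) for amount in (12000, 10000, 5000, 300)}
```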

3. Bulk Updates

3.1 Updating Multiple Documents

sql
-- Activate every pending user
UPDATE `my-bucket`.`_default`.`_default`
SET status = 'active', updated_at = NOW_STR()
WHERE type = 'user' AND status = 'pending';

-- Raise the price of all electronics products by 10%
UPDATE `my-bucket`.`_default`.`_default`
SET price = price * 1.1
WHERE type = 'product' AND category = 'electronics';

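3.2 Updating with a Subquery

sql
-- A subquery returns an array of results, so use SELECT RAW and take
-- element [0] to store the count as a number instead of an array of objects.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET order_count = (
    SELECT RAW COUNT(*) 
    FROM `my-bucket`.`_default`.`_default` AS o
    WHERE o.type = 'order' AND o.user_id = META(u).id
)[0]
WHERE u.type = 'user';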
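
To make the intent of the correlated subquery concrete, the same computation can be sketched over plain Python dictionaries. The sample documents below are invented for the illustration; only the `type`, `user_id`, and `order_count` fields come from the statement above.

```python
from collections import Counter

# Hypothetical documents keyed by document ID
docs = {
    "user::001": {"type": "user", "name": "A"},
    "user::002": {"type": "user", "name": "B"},
    "order::1": {"type": "order", "user_id": "user::001"},
    "order::2": {"type": "order", "user_id": "user::001"},
}

# Count orders per user ID (the subquery)
orders_per_user = Counter(
    d["user_id"] for d in docs.values() if d["type"] == "order"
)

# Write the count into every user document (the outer UPDATE)
for key, doc in docs.items():
    if doc["type"] == "user":
        doc["order_count"] = orders_per_user[key]  # 0 when there are no orders
```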

4. Updating Nested Documents

4.1 Updating Nested Fields

sql
UPDATE `my-bucket`.`_default`.`_default`
SET address.city = '上海',
    address.updated_at = NOW_STR()
WHERE type = 'user' AND META().id = 'user::001';

4.2 Updating Array Elements

sql
-- Append a value to the end of the array
UPDATE `my-bucket`.`_default`.`_default`
SET hobbies = ARRAY_APPEND(hobbies, '摄影')
WHERE type = 'user' AND META().id = 'user::001';

-- Prepend a value to the front of the array
UPDATE `my-bucket`.`_default`.`_default`
SET tags = ARRAY_PREPEND('premium', tags)
WHERE type = 'user' AND META().id = 'user::001';

-- Append a value only if it is not already present
UPDATE `my-bucket`.`_default`.`_default`
SET scores = ARRAY_PUT(scores, 100)
WHERE type = 'user' AND META().id = 'user::001';

4.3 Removing Array Elements

sql
-- Remove a value from one user's array
UPDATE `my-bucket`.`_default`.`_default`
SET hobbies = ARRAY_REMOVE(hobbies, '游泳')
WHERE type = 'user' AND META().id = 'user::001';

-- Remove a tag from every user document
UPDATE `my-bucket`.`_default`.`_default`
SET tags = ARRAY_REMOVE(tags, 'inactive')
WHERE type = 'user';
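
The array functions used in sections 4.2 and 4.3 each return a new array rather than modifying one in place. The Python helpers below are rough equivalents written for this sketch (they are not part of any Couchbase library) and show the differences, in particular that ARRAY_PUT skips values already present and ARRAY_REMOVE drops every occurrence:

```python
def array_append(arr, value):
    return arr + [value]


def array_prepend(value, arr):
    # Same argument order as ARRAY_PREPEND: the value comes first
    return [value] + arr


def array_put(arr, value):
    """Append only when the value is not already present."""
    return arr if value in arr else arr + [value]


def array_remove(arr, value):
    """Drop every occurrence of the value."""
    return [item for item in arr if item != value]


hobbies = ["reading", "swimming"]
appended = array_append(hobbies, "photography")
prepended = array_prepend("premium", ["vip"])
put_twice = array_put(array_put([90], 100), 100)
removed = array_remove(["a", "inactive", "b", "inactive"], "inactive")
```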

5. The MERGE Statement

5.1 Basic Syntax

sql
MERGE INTO target_keyspace
USING source
ON condition
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...

5.2 Insert or Update

sql
MERGE INTO `my-bucket`.`_default`.`_default` AS target
USING [
    {'id': 'user::001', 'name': '张三', 'email': 'zhangsan@example.com'},
    {'id': 'user::002', 'name': '李四', 'email': 'lisi@example.com'}
] AS source
ON META(target).id = source.id
WHEN MATCHED THEN
    UPDATE SET 
        target.name = source.name,
        target.email = source.email,
        target.updated_at = NOW_STR()
WHEN NOT MATCHED THEN
    INSERT (KEY source.id, VALUE source);
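
The matched and not-matched branches can be sketched in Python over a dictionary that stands in for the target keyspace. The existing `user::001` document and the `merge` helper are assumptions made for this illustration; the source rows are the ones from the statement above.

```python
from datetime import datetime, timezone

# Hypothetical target keyspace: document ID -> document
target = {"user::001": {"name": "old", "email": "old@example.com", "age": 28}}
source = [
    {"id": "user::001", "name": "张三", "email": "zhangsan@example.com"},
    {"id": "user::002", "name": "李四", "email": "lisi@example.com"},
]


def merge(target, source):
    for row in source:
        existing = target.get(row["id"])
        if existing is not None:
            # WHEN MATCHED: update only the listed fields, keep the rest
            existing["name"] = row["name"]
            existing["email"] = row["email"]
            existing["updated_at"] = datetime.now(timezone.utc).isoformat()
        else:
            # WHEN NOT MATCHED: insert the source row as a new document
            target[row["id"]] = dict(row)


merge(target, source)
```

After the call, `user::001` keeps its `age` field because the update branch only touches the fields it names, while `user::002` is created from the source row.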

5.3 Merging from a Query Result

sql
MERGE INTO `my-bucket`.`_default`.`users` AS target
USING (
    SELECT META().id, name, email, status
    FROM `my-bucket`.`_default`.`staging_users`
    WHERE processed = true
) AS source
ON META(target).id = source.id
WHEN MATCHED THEN
    UPDATE SET target.status = source.status
WHEN NOT MATCHED THEN
    INSERT (KEY source.id, VALUE source);

6. SDK Update Operations

6.1 Python SDK

python
from datetime import datetime, timezone

import couchbase.subdocument as SD
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import CasMismatchException
from couchbase.options import ClusterOptions, ReplaceOptions

cluster = Cluster(
    'couchbase://localhost',
    ClusterOptions(PasswordAuthenticator('Administrator', 'password'))
)

bucket = cluster.bucket('my-bucket')
collection = bucket.default_collection()

# Read-modify-write guarded by CAS (optimistic locking)
result = collection.get('user::001')
doc = result.content_as[dict]
doc['age'] = 29
doc['updated_at'] = '2024-01-20T15:00:00Z'

try:
    collection.replace('user::001', doc, ReplaceOptions(cas=result.cas))
except CasMismatchException:
    print('The document was modified by another operation; please retry')

# Upsert: insert the document, or overwrite it if it already exists
doc = {
    'type': 'user',
    'name': '张三',
    'email': 'zhangsan@example.com',
    'age': 29
}
collection.upsert('user::001', doc)

# Sub-Document update: change only the listed paths
collection.mutate_in('user::001', [
    SD.upsert('age', 30),
    SD.upsert('updated_at', datetime.now(timezone.utc).isoformat())
])

6.2 Node.js SDK

javascript
const couchbase = require('couchbase');

async function main() {
    const cluster = await couchbase.connect('couchbase://localhost', {
        username: 'Administrator',
        password: 'password'
    });

    const bucket = cluster.bucket('my-bucket');
    const collection = bucket.defaultCollection();

    // Read-modify-write guarded by CAS (optimistic locking)
    try {
        const result = await collection.get('user::001');
        const doc = result.content;
        doc.age = 29;
        doc.updated_at = new Date().toISOString();

        await collection.replace('user::001', doc, { cas: result.cas });
    } catch (error) {
        if (error instanceof couchbase.CasMismatchError) {
            console.log('The document was modified; please retry');
        } else {
            throw error;
        }
    }

    // Upsert: insert the document, or overwrite it if it already exists
    await collection.upsert('user::001', {
        type: 'user',
        name: '张三',
        email: 'zhangsan@example.com',
        age: 29
    });

    // Sub-Document update: change only the listed paths
    await collection.mutateIn('user::001', [
        couchbase.MutateInSpec.upsert('age', 30),
        couchbase.MutateInSpec.upsert('updated_at', new Date().toISOString())
    ]);

    await cluster.close();
}

main().catch(console.error);

6.3 Java SDK

java
import com.couchbase.client.core.error.CasMismatchException;
import com.couchbase.client.java.*;
import com.couchbase.client.java.json.*;
import com.couchbase.client.java.kv.*;
import java.time.Instant;
import java.util.Arrays;

public class UpdateExample {
    public static void main(String[] args) {
        Cluster cluster = Cluster.connect(
            "localhost",
            ClusterOptions.clusterOptions("Administrator", "password")
        );

        Bucket bucket = cluster.bucket("my-bucket");
        Collection collection = bucket.defaultCollection();

        // Read-modify-write guarded by CAS (optimistic locking)
        try {
            GetResult result = collection.get("user::001");
            JsonObject doc = result.contentAsObject();
            doc.put("age", 29);
            doc.put("updated_at", Instant.now().toString());

            collection.replace(
                "user::001",
                doc,
                ReplaceOptions.replaceOptions().cas(result.cas())
            );
        } catch (CasMismatchException e) {
            System.out.println("The document was modified; please retry");
        }

        // Upsert: insert the document, or overwrite it if it already exists
        JsonObject newDoc = JsonObject.create()
            .put("type", "user")
            .put("name", "张三")
            .put("email", "zhangsan@example.com")
            .put("age", 29);
        collection.upsert("user::001", newDoc);

        // Sub-Document update: change only the listed paths
        collection.mutateIn("user::001", Arrays.asList(
            MutateInSpec.upsert("age", 30),
            MutateInSpec.upsert("updated_at", Instant.now().toString())
        ));

        cluster.disconnect();
    }
}

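7. Sub-Document Operations

7.1 Overview

Sub-Document operations read or update only part of a document, which reduces network traffic:

text
Full-document update:
Client -> server: the entire document (may be large)
Server -> client: the entire document (when it is read before being modified)

Sub-Document update:
Client -> server: only the fields being updated
Server -> client: only the fields that were requested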
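
A rough feel for the difference can be had by comparing payload sizes in plain Python. The JSON shapes below are stand-ins invented for this sketch and are not the actual wire format; the 2000-character `bio` field simply plays the role of a large document body.

```python
import json

full_doc = {
    "type": "user",
    "name": "张三",
    "email": "zhangsan@example.com",
    "age": 29,
    "bio": "x" * 2000,  # stand-in for a large field
}

# Full-document update: the whole JSON body travels to the server
full_payload = json.dumps({**full_doc, "age": 30}).encode("utf-8")

# Sub-Document update: only the path and the new value are sent
subdoc_payload = json.dumps({"path": "age", "value": 30}).encode("utf-8")

full_bytes, subdoc_bytes = len(full_payload), len(subdoc_payload)
```

The gap grows with document size, since the Sub-Document payload depends only on the paths and values being changed.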

7.2 Updating a Single Field

python
import couchbase.subdocument as SD

# Each spec targets one path; all specs apply to the same document
collection.mutate_in('user::001', [
    SD.upsert('age', 30),
    SD.upsert('updated_at', '2024-01-20T15:00:00Z')
])

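7.3 Updating Nested Fields

python
from datetime import datetime, timezone
import couchbase.subdocument as SD

# Dotted paths address fields inside nested objects
collection.mutate_in('user::001', [
    SD.upsert('address.city', '上海'),
    SD.upsert('address.updated_at', datetime.now(timezone.utc).isoformat())
])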
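
How a dotted path resolves inside a document can be sketched with a small helper over nested dictionaries. `upsert_path` is a hypothetical function written for this example; it ignores array indexes and escaped field names, and its `create_parents` flag only approximates the SDK option of the same name.

```python
def upsert_path(doc, path, value, create_parents=False):
    """Set a nested field addressed by a dotted path such as 'address.city'."""
    *parents, leaf = path.split(".")
    node = doc
    for name in parents:
        if name not in node:
            if not create_parents:
                raise KeyError(f"parent path not found: {name}")
            node[name] = {}
        node = node[name]
    node[leaf] = value
    return doc


user = {"name": "张三", "address": {"city": "北京", "zip": "100000"}}
upsert_path(user, "address.city", "上海")  # sibling fields are untouched

try:
    upsert_path(user, "profile.nickname", "zs")  # 'profile' does not exist
    parent_required = False
except KeyError:
    parent_required = True

upsert_path(user, "profile.nickname", "zs", create_parents=True)
```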

7.4 Array Operations

python
import couchbase.subdocument as SD

collection.mutate_in('user::001', [
    SD.array_append('hobbies', '摄影'),   # add to the end of the array
    SD.array_prepend('tags', 'vip'),      # add to the front of the array
    SD.array_insert('scores[0]', 100)     # insert at index 0; the index is part of the path
])

7.5 Removing Fields

python
import couchbase.subdocument as SD

collection.mutate_in('user::001', [
    SD.remove('temp_field'),
    SD.remove('address.old_street')
])

8. Atomic Counters

8.1 Using UPDATE

sql
UPDATE `my-bucket`.`_default`.`_default`
SET view_count = view_count + 1
WHERE type = 'article' AND META().id = 'article::001';

8.2 Using the SDK

python
import couchbase.subdocument as SD

# Read the current counter value
result = collection.lookup_in('counter::view_count', [
    SD.get('value')
])

# Add 1 to the counter field on the server
collection.mutate_in('counter::view_count', [
    SD.increment('value', 1)
])

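8.3 Dedicated Counters

python
from couchbase.options import DeltaValue, IncrementOptions, SignedInt64

# If the counter document does not exist, it is created with the initial value
result = collection.binary().increment('counter::page_views',
    IncrementOptions(delta=DeltaValue(1), initial=SignedInt64(0)))
print(f'New value: {result.content}')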
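
The interplay of `delta` and `initial` can be sketched with an in-memory dictionary. This sketch assumes the memcached-style rule that a missing counter is created at `initial` without applying `delta`; the `counters` dictionary and the `increment` helper are made up for the illustration.

```python
counters = {}  # hypothetical stand-in for counter documents


def increment(key, delta=1, initial=0):
    """Add delta to an existing counter; otherwise create it at `initial`."""
    if key in counters:
        counters[key] += delta
    else:
        counters[key] = initial  # delta is not applied on creation
    return counters[key]


first = increment("counter::page_views", delta=1, initial=0)   # created
second = increment("counter::page_views", delta=1, initial=0)  # incremented
third = increment("counter::page_views", delta=5)              # incremented by 5
```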

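9. CAS in Detail

9.1 How CAS Works

text
Timeline:
T1: Client A reads the document, CAS=100
T2: Client B reads the document, CAS=100
T3: Client A updates the document (CAS=100): succeeds, new CAS=101
T4: Client B updates the document (CAS=100): fails, CAS mismatch
T5: Client B re-reads the document, CAS=101
T6: Client B updates the document (CAS=101): succeeds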

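9.2 CAS Retry Pattern

python
import time
from datetime import datetime, timezone

from couchbase.exceptions import CasMismatchException
from couchbase.options import ReplaceOptions

def update_with_retry(key, update_func, max_retries=3):
    """Read, modify and replace a document, retrying on CAS conflicts."""
    for attempt in range(max_retries):
        try:
            # Re-read on every attempt so the CAS is always fresh
            result = collection.get(key)
            doc = update_func(result.content_as[dict])

            collection.replace(key, doc, ReplaceOptions(cas=result.cas))
            return True
        except CasMismatchException:
            if attempt < max_retries - 1:
                time.sleep(0.1 * (attempt + 1))  # linear backoff
                continue
            raise  # out of retries: surface the conflict to the caller
    return False  # only reached when max_retries <= 0

def increment_age(doc):
    doc['age'] = doc.get('age', 0) + 1
    doc['updated_at'] = datetime.now(timezone.utc).isoformat()
    return doc

update_with_retry('user::001', increment_age)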
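
Why the retry loop prevents lost updates can be demonstrated without a cluster. The sketch below uses a toy thread-safe `Store` class (an assumption for this example, not the SDK) and lets eight threads increment the same field 100 times each. Unlike the bounded helper above, this version retries without a limit to keep the demonstration short.

```python
import threading


class CasMismatch(Exception):
    pass


class Store:
    """Toy single-document store; the lock makes each operation atomic."""

    def __init__(self, doc):
        self._doc, self._cas = dict(doc), 1
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return dict(self._doc), self._cas

    def replace(self, doc, cas):
        with self._lock:
            if cas != self._cas:
                raise CasMismatch()
            self._doc, self._cas = dict(doc), self._cas + 1


def increment_with_retry(store):
    while True:  # retry until our CAS-checked write wins
        doc, cas = store.get()
        doc["age"] += 1
        try:
            store.replace(doc, cas)
            return
        except CasMismatch:
            continue  # someone else wrote first: re-read and try again


def worker(store, times):
    for _ in range(times):
        increment_with_retry(store)


store = Store({"age": 0})
threads = [threading.Thread(target=worker, args=(store, 100)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

final_age = store.get()[0]["age"]
```

Every increment is applied exactly once, because a write based on a stale read is rejected and redone against the latest version.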

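9.3 Unconditional Updates

python
# No CAS is supplied, so the write always succeeds and overwrites
# any concurrent change (last write wins)
collection.upsert('user::001', doc)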

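10. Update Performance Optimization

10.1 Use Sub-Document Operations

python
import couchbase.subdocument as SD

# Preferred for small changes: send only the changed field
collection.mutate_in('user::001', [
    SD.upsert('age', 30)
])

# More expensive: sends the entire document
collection.replace('user::001', full_doc)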

10.2 Bulk Updates

python
import couchbase.subdocument as SD

updates = [
    ('user::001', {'age': 30}),
    ('user::002', {'age': 25}),
    ('user::003', {'age': 28}),
]

# One mutate_in call per key, carrying all of that document's field changes
for key, fields in updates:
    collection.mutate_in(key, [
        SD.upsert(path, value) for path, value in fields.items()
    ])

10.3 Asynchronous Updates

python
import asyncio

import couchbase.subdocument as SD
from acouchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions

async def batch_update():
    cluster = await Cluster.connect(
        'couchbase://localhost',
        ClusterOptions(PasswordAuthenticator('Administrator', 'password'))
    )
    
    bucket = cluster.bucket('my-bucket')
    await bucket.on_connect()
    collection = bucket.default_collection()
    
    updates = [
        ('user::001', {'age': 30}),
        ('user::002', {'age': 25}),
    ]
    
    # Start all mutations concurrently, then wait for every one to finish
    tasks = [
        collection.mutate_in(key, [
            SD.upsert(path, value) for path, value in fields.items()
        ])
        for key, fields in updates
    ]
    await asyncio.gather(*tasks)

asyncio.run(batch_update())
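
For larger batches it is often useful to cap the number of in-flight operations and to collect failures instead of aborting on the first one. The sketch below shows one way to do that with `asyncio.Semaphore` and `gather(..., return_exceptions=True)`. `fake_mutate` is a hypothetical stand-in for an async SDK call, and the `user::bad` key exists only to simulate a failure.

```python
import asyncio


async def fake_mutate(key, fields):
    """Stand-in for an async SDK call (hypothetical); fails for one key."""
    await asyncio.sleep(0)
    if key == "user::bad":
        raise RuntimeError("simulated failure")
    return key


async def run_batch(updates, limit=2):
    semaphore = asyncio.Semaphore(limit)  # at most `limit` calls in flight

    async def guarded(key, fields):
        async with semaphore:
            return await fake_mutate(key, fields)

    # return_exceptions=True keeps one failure from cancelling the batch
    results = await asyncio.gather(
        *(guarded(key, fields) for key, fields in updates),
        return_exceptions=True,
    )
    succeeded = [r for r in results if not isinstance(r, Exception)]
    failed = [
        key for (key, _), r in zip(updates, results) if isinstance(r, Exception)
    ]
    return succeeded, failed


updates = [
    ("user::001", {"age": 30}),
    ("user::bad", {"age": 1}),
    ("user::002", {"age": 25}),
]
succeeded, failed = asyncio.run(run_batch(updates))
```

The keys in `failed` can then be retried or logged without affecting the updates that already succeeded.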

11. Handling Common Errors

11.1 Document Not Found

python
from couchbase.exceptions import DocumentNotFoundException

try:
    collection.replace('user::999', doc)
except DocumentNotFoundException:
    # replace requires an existing document; fall back to upsert to create it
    collection.upsert('user::999', doc)

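11.2 CAS Mismatch

python
from couchbase.exceptions import CasMismatchException
from couchbase.options import ReplaceOptions

try:
    collection.replace('user::001', doc, ReplaceOptions(cas=cas_value))
except CasMismatchException:
    # Re-read the latest version, merge our changes into it, and retry once.
    # merge_changes is an application-defined helper, not part of the SDK.
    result = collection.get('user::001')
    doc = merge_changes(result.content_as[dict], doc)
    collection.replace('user::001', doc, ReplaceOptions(cas=result.cas))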
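
The snippet above leaves `merge_changes` undefined, since the right merge policy depends on the application. One possible shape, shown purely as an assumption, overlays locally changed fields onto the latest server version; the optional `base` argument (the version originally read) is an addition of this sketch that lets unchanged local fields yield to concurrent edits.

```python
def merge_changes(latest, local, base=None):
    """Overlay locally changed fields onto the latest server version.

    With `base`, only fields that differ from it count as local changes;
    without it, every field in `local` overrides the latest version.
    """
    merged = dict(latest)
    for field, value in local.items():
        if base is None or base.get(field) != value:
            merged[field] = value
    return merged


base = {"name": "张三", "age": 28, "status": "active"}
local = {**base, "age": 29}               # our change
latest = {**base, "status": "inactive"}   # someone else's change

merged = merge_changes(latest, local, base)
merged_without_base = merge_changes(latest, local)
```

With `base`, both edits survive; without it, the stale local `status` overwrites the concurrent change, which is why a two-argument merge is only safe when local fields are known to be authoritative.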

11.3 Path Not Found

python
import couchbase.subdocument as SD
from couchbase.exceptions import PathNotFoundException

try:
    # replace fails when the path does not exist in the document
    collection.mutate_in('user::001', [
        SD.replace('nonexistent_field', 'value')
    ])
except PathNotFoundException:
    # upsert creates the field when it is missing
    collection.mutate_in('user::001', [
        SD.upsert('nonexistent_field', 'value')
    ])

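12. Summary

Key points for update operations:

Operation | Description | Use case
UPDATE | N1QL bulk update | Conditional updates across multiple documents
REPLACE | Replace the entire document | When CAS control is needed
UPSERT | Insert or replace | Unsure whether the document exists
mutate_in | Partial field update | Efficiently updating a few fields

Best practices:

  1. Use CAS to implement optimistic locking
  2. Prefer Sub-Document updates
  3. Use asynchronous operations for bulk updates
  4. Handle concurrency conflicts appropriately
  5. Add an update timestamp

Next, let's learn about document delete operations!

Last updated: 2026-03-27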