Couchbase文档更新 #
一、概述 #
1.1 更新方式 #
Couchbase提供多种文档更新方式:
| 方式 | 说明 | 使用场景 |
|---|---|---|
| UPDATE | N1QL更新语句 | 批量更新、条件更新 |
| UPSERT | 插入或替换 | 不确定文档是否存在 |
| REPLACE | 替换整个文档 | 确定文档存在时 |
| SUBDOC | 部分字段更新 | 高效更新单个字段 |
1.2 乐观锁机制 #
Couchbase使用CAS(Compare And Swap)实现乐观锁:
text
更新流程:
1. 读取文档,获取CAS值
2. 修改文档内容
3. 提交更新时携带CAS值
4. 服务端验证CAS值
- 匹配:更新成功,返回新CAS
- 不匹配:更新失败,需要重试
二、UPDATE语句 #
2.1 基本语法 #
sql
UPDATE `keyspace`
SET field1 = value1, field2 = value2, ...
WHERE condition;
2.2 更新单个字段 #
sql
-- Point every user document named 张三 at a new email address.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.email = 'zhangsan_new@example.com'
WHERE u.name = '张三'
  AND u.type = 'user';
2.3 更新多个字段 #
sql
-- Change several fields of one user in a single statement; the document is
-- addressed by its key through META().id.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.email = 'zhangsan_new@example.com',
    u.age = 29,
    u.updated_at = NOW_STR()
WHERE META(u).id = 'user::001'
  AND u.type = 'user';
2.4 使用RETURNING #
sql
-- Increment the age and hand back the updated values in the same round trip.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.age = u.age + 1,
    u.updated_at = NOW_STR()
WHERE META(u).id = 'user::001'
  AND u.type = 'user'
RETURNING META(u).id, u.name, u.age;
2.5 条件更新 #
sql
-- Deactivate users whose last login is more than 30 days in the past.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.status = 'inactive'
WHERE u.type = 'user'
  AND u.last_login < DATE_ADD_STR(NOW_STR(), -30, 'day');

-- Derive the discount tier from the accumulated spending; the first matching
-- WHEN branch wins, so the thresholds are listed from highest to lowest.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.discount = CASE
        WHEN u.total_spent > 10000 THEN 0.2
        WHEN u.total_spent > 5000 THEN 0.1
        ELSE 0.05
    END
WHERE u.type = 'user';
三、批量更新 #
3.1 更新多个文档 #
sql
-- Activate every user that is still pending.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.status = 'active',
    u.updated_at = NOW_STR()
WHERE u.status = 'pending'
  AND u.type = 'user';

-- Raise the price of all electronics products by 10%.
UPDATE `my-bucket`.`_default`.`_default` AS p
SET p.price = p.price * 1.1
WHERE p.category = 'electronics'
  AND p.type = 'product';
3.2 使用子查询更新 #
sql
-- Store the number of orders of each user on the user document.
-- A subquery always evaluates to an array, and without RAW each element is an
-- object such as {"$1": 3}. SELECT RAW yields the bare number and [0] takes
-- the single aggregate row, so order_count becomes a plain number.
-- NOTE(review): older server versions require USE KEYS for correlated
-- subqueries on a keyspace - confirm against the target cluster version.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.order_count = (
        SELECT RAW COUNT(*)
        FROM `my-bucket`.`_default`.`_default` AS o
        WHERE o.type = 'order' AND o.user_id = META(u).id
    )[0]
WHERE u.type = 'user';
四、嵌套文档更新 #
4.1 更新嵌套字段 #
sql
-- Nested fields are addressed with dot notation inside SET.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.address.city = '上海',
    u.address.updated_at = NOW_STR()
WHERE META(u).id = 'user::001'
  AND u.type = 'user';
4.2 更新数组元素 #
sql
-- ARRAY_APPEND: add a value at the end of the array.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.hobbies = ARRAY_APPEND(u.hobbies, '摄影')
WHERE META(u).id = 'user::001'
  AND u.type = 'user';

-- ARRAY_PREPEND: add a value at the front (note: value first, array second).
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.tags = ARRAY_PREPEND('premium', u.tags)
WHERE META(u).id = 'user::001'
  AND u.type = 'user';

-- ARRAY_PUT: append the value only when it is not already in the array.
UPDATE `my-bucket`.`_default`.`_default` AS u
SET u.scores = ARRAY_PUT(u.scores, 100)
WHERE META(u).id = 'user::001'
  AND u.type = 'user';
4.3 删除数组元素 #
sql
-- Remove one hobby from a single user.
UPDATE `my-bucket`.`_default`.`_default`
SET hobbies = ARRAY_REMOVE(hobbies, '游泳')
WHERE type = 'user' AND META().id = 'user::001';

-- Remove the 'inactive' tag from all users that carry it.
-- The ARRAY_CONTAINS predicate restricts the mutation to documents that
-- actually have the tag, instead of rewriting every user document (including
-- those without a tags array).
UPDATE `my-bucket`.`_default`.`_default`
SET tags = ARRAY_REMOVE(tags, 'inactive')
WHERE type = 'user'
  AND ARRAY_CONTAINS(tags, 'inactive');
五、MERGE语句 #
5.1 基本语法 #
sql
MERGE INTO target_keyspace
USING source
ON condition
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
5.2 插入或更新 #
sql
-- Upsert a literal list of users: update the ones whose key already exists,
-- insert the rest using source.id as the document key.
MERGE INTO `my-bucket`.`_default`.`_default` AS t
USING [
    {'id': 'user::001', 'name': '张三', 'email': 'zhangsan@example.com'},
    {'id': 'user::002', 'name': '李四', 'email': 'lisi@example.com'}
] AS s
ON META(t).id = s.id
WHEN MATCHED THEN
    UPDATE SET t.name = s.name,
               t.email = s.email,
               t.updated_at = NOW_STR()
WHEN NOT MATCHED THEN
    INSERT (KEY s.id, VALUE s);
5.3 从查询结果合并 #
sql
-- Merge processed staging rows into the users collection: existing users get
-- their status refreshed, unknown ones are inserted under the staging key.
MERGE INTO `my-bucket`.`_default`.`users` AS t
USING (
    SELECT META(st).id, st.name, st.email, st.status
    FROM `my-bucket`.`_default`.`staging_users` AS st
    WHERE st.processed = true
) AS s
ON META(t).id = s.id
WHEN MATCHED THEN
    UPDATE SET t.status = s.status
WHEN NOT MATCHED THEN
    INSERT (KEY s.id, VALUE s);
六、SDK更新操作 #
6.1 Python SDK #
python
from couchbase.cluster import Cluster, ClusterOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ReplaceOptions, UpsertOptions
from couchbase.exceptions import CasMismatchException, DocumentNotFoundException
import couchbase.subdocument as SD
from datetime import datetime, timezone

# Connect and open the default collection of the bucket.
cluster = Cluster(
    'couchbase://localhost',
    ClusterOptions(PasswordAuthenticator('Administrator', 'password'))
)
bucket = cluster.bucket('my-bucket')
collection = bucket.default_collection()

# Optimistic locking: read the document, modify it locally, then replace it
# while passing the CAS value obtained by the read.
result = collection.get('user::001')
doc = result.content
doc['age'] = 29
doc['updated_at'] = '2024-01-20T15:00:00Z'
try:
    collection.replace('user::001', doc, ReplaceOptions(cas=result.cas))
except CasMismatchException:
    print('文档已被其他操作修改,请重试')

# Upsert: insert the document, or overwrite it if the key already exists.
doc = {
    'type': 'user',
    'name': '张三',
    'email': 'zhangsan@example.com',
    'age': 29
}
collection.upsert('user::001', doc)

# Sub-document update: mutate_in() takes a list of sub-document specs, not
# plain tuples. NOW_STR() is a N1QL function and does not exist in Python, so
# the timestamp is generated on the client instead.
collection.mutate_in('user::001', [
    SD.upsert('age', 30),
    SD.upsert('updated_at', datetime.now(timezone.utc).isoformat())
])
6.2 Node.js SDK #
javascript
const couchbase = require('couchbase');

// `await` is not allowed at the top level of a CommonJS module, so the whole
// example runs inside an async function.
async function main() {
  // couchbase.connect() is the documented way to obtain a Cluster.
  const cluster = await couchbase.connect('couchbase://localhost', {
    username: 'Administrator',
    password: 'password'
  });
  const bucket = cluster.bucket('my-bucket');
  const collection = bucket.defaultCollection();

  // Optimistic locking: replace only if the CAS from the read still matches.
  try {
    const result = await collection.get('user::001');
    const doc = result.content;
    doc.age = 29;
    doc.updated_at = new Date().toISOString();
    await collection.replace('user::001', doc, { cas: result.cas });
  } catch (error) {
    if (error instanceof couchbase.CasMismatchError) {
      console.log('文档已被修改,请重试');
    } else {
      // Do not silently swallow unrelated failures.
      throw error;
    }
  }

  // Upsert: insert the document or overwrite an existing one.
  await collection.upsert('user::001', {
    type: 'user',
    name: '张三',
    email: 'zhangsan@example.com',
    age: 29
  });

  // Sub-document update: only the listed paths are sent to the server.
  await collection.mutateIn('user::001', [
    couchbase.MutateInSpec.upsert('age', 30),
    couchbase.MutateInSpec.upsert('updated_at', new Date().toISOString())
  ]);

  await cluster.close();
}

main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
6.3 Java SDK #
java
import com.couchbase.client.core.error.CasMismatchException;
import com.couchbase.client.java.*;
import com.couchbase.client.java.json.*;
import com.couchbase.client.java.kv.*;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;

// Connect and open the default collection of the bucket.
Cluster cluster = Cluster.connect(
    "localhost",
    ClusterOptions.clusterOptions("Administrator", "password")
);
Bucket bucket = cluster.bucket("my-bucket");
Collection collection = bucket.defaultCollection();

// Optimistic locking: replace only if the CAS from the read still matches.
try {
    GetResult result = collection.get("user::001");
    JsonObject doc = result.contentAsObject();
    doc.put("age", 29);
    doc.put("updated_at", Instant.now().toString());
    collection.replace(
        "user::001",
        doc,
        ReplaceOptions.replaceOptions().cas(result.cas())
    );
} catch (CasMismatchException e) {
    System.out.println("文档已被修改,请重试");
}

// Upsert: insert the document or overwrite an existing one.
JsonObject doc = JsonObject.create()
    .put("type", "user")
    .put("name", "张三")
    .put("email", "zhangsan@example.com")
    .put("age", 29);
collection.upsert("user::001", doc);

// Sub-document update: only the listed paths are sent to the server.
collection.mutateIn("user::001", Arrays.asList(
    MutateInSpec.upsert("age", 30),
    MutateInSpec.upsert("updated_at", Instant.now().toString())
));
七、Subdocument操作 #
7.1 概述 #
Subdocument操作允许只更新文档的部分字段,减少网络传输:
text
完整文档更新(读取-修改-写回):
服务端 -> 客户端:读取整个文档(可能很大)
客户端 -> 服务端:写回整个文档
Subdocument更新:
客户端 -> 服务端:只发送要更新的字段路径和值
服务端 -> 客户端:只返回操作结果(查询时只返回需要的字段)
7.2 更新单个字段 #
python
from couchbase.options import MutateInOptions
import couchbase.subdocument as SD

# Only the two listed paths are sent to the server; the rest of the document
# is left untouched. (The original used an undefined name, MutateSpec.)
collection.mutate_in('user::001', [
    SD.upsert('age', 30),
    SD.upsert('updated_at', '2024-01-20T15:00:00Z')
])
7.3 嵌套字段更新 #
python
import couchbase.subdocument as SD
from datetime import datetime, timezone

# Nested fields are addressed with a dotted path.
# NOW_STR() is a N1QL function and does not exist in Python, so the timestamp
# is produced on the client.
collection.mutate_in('user::001', [
    SD.upsert('address.city', '上海'),
    SD.upsert('address.updated_at', datetime.now(timezone.utc).isoformat())
])
7.4 数组操作 #
python
import couchbase.subdocument as SD

collection.mutate_in('user::001', [
    # Append to the end of the array.
    SD.array_append('hobbies', '摄影'),
    # Insert at the front of the array.
    SD.array_prepend('tags', 'vip'),
    # The insert position is part of the path: put 100 at index 0 of scores.
    SD.array_insert('scores[0]', 100)
])
7.5 删除字段 #
python
import couchbase.subdocument as SD

# Delete a top-level field and a nested field in one request.
collection.mutate_in('user::001', [
    SD.remove('temp_field'),
    SD.remove('address.old_street')
])
八、原子计数器 #
8.1 使用UPDATE #
sql
-- Bump the view counter of a single article.
UPDATE `my-bucket`.`_default`.`_default` AS a
SET a.view_count = a.view_count + 1
WHERE META(a).id = 'article::001'
  AND a.type = 'article';
8.2 使用SDK #
python
import couchbase.subdocument as SD

# Read only the counter field instead of fetching the whole document.
result = collection.lookup_in('counter::view_count', [
    SD.get('value')
])

# Increment the numeric field on the server side by 1.
collection.mutate_in('counter::view_count', [
    SD.increment('value', 1)
])
8.3 专用计数器 #
python
from couchbase.options import DeltaValue, IncrementOptions, SignedInt64

# Dedicated counter document: delta and initial must be wrapped in
# DeltaValue / SignedInt64; `initial` is used when the key does not exist yet.
result = collection.binary().increment(
    'counter::page_views',
    IncrementOptions(delta=DeltaValue(1), initial=SignedInt64(0))
)
print(f'新值: {result.content}')
九、CAS详解 #
9.1 CAS工作原理 #
text
时间线:
T1: 客户端A读取文档,CAS=100
T2: 客户端B读取文档,CAS=100
T3: 客户端A更新文档(CAS=100),成功,新CAS=101
T4: 客户端B更新文档(CAS=100),失败,CAS不匹配
T5: 客户端B重新读取文档,CAS=101
T6: 客户端B更新文档(CAS=101),成功
9.2 CAS重试模式 #
python
from couchbase.exceptions import CasMismatchException
import time
def update_with_retry(key, update_func, max_retries=3):
    """Apply ``update_func`` to a document under CAS protection, with retries.

    Each attempt reads the document, passes its content to ``update_func`` and
    replaces the document using the CAS value from that read.

    Args:
        key: Key of the document to update.
        update_func: Callable that receives the document content and returns
            the content to store.
        max_retries: Maximum number of attempts.

    Returns:
        True once a replace succeeds; False if no attempt was made at all
        (``max_retries`` <= 0).

    Raises:
        CasMismatchException: If the last attempt still hits a CAS conflict.
    """
    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            current = collection.get(key)
            new_doc = update_func(current.content)
            collection.replace(key, new_doc, ReplaceOptions(cas=current.cas))
        except CasMismatchException:
            if attempt >= final_attempt:
                raise
            # Wait a little longer after each conflict before re-reading.
            time.sleep(0.1 * (attempt + 1))
        else:
            return True
    return False
def increment_age(doc):
    """Increment ``doc['age']`` and refresh ``doc['updated_at']``.

    Args:
        doc: Document content as a dict; a missing ``age`` counts as 0.

    Returns:
        The same dict, modified in place.
    """
    from datetime import datetime, timezone

    doc['age'] = doc.get('age', 0) + 1
    # NOW_STR() is a N1QL function and is undefined in Python; build the
    # ISO-8601 timestamp on the client instead.
    doc['updated_at'] = datetime.now(timezone.utc).isoformat()
    return doc
update_with_retry('user::001', increment_age)
9.3 无条件更新 #
python
# No CAS value is passed here, so this write is not guarded by the
# optimistic-lock check shown in the replace() examples.
collection.upsert('user::001', doc)
十、更新性能优化 #
10.1 使用Subdocument #
python
import couchbase.subdocument as SD

# Preferred: send only the changed field.
collection.mutate_in('user::001', [
    SD.upsert('age', 30)
])

# For comparison: replace() sends the complete document even when a single
# field changed.
collection.replace('user::001', full_doc)
10.2 批量更新 #
python
from couchbase.options import BatchOptions
import couchbase.subdocument as SD

# (document key, {field path: new value}) pairs to apply.
updates = [
    ('user::001', {'age': 30}),
    ('user::002', {'age': 25}),
    ('user::003', {'age': 28}),
]

# Use a distinct loop variable so the `updates` list is not overwritten by
# the per-document dict while iterating.
for key, fields in updates:
    collection.mutate_in(key, [
        SD.upsert(path, value) for path, value in fields.items()
    ])
10.3 异步更新 #
python
import asyncio
from acouchbase.cluster import Cluster
async def batch_update():
    """Apply several sub-document updates concurrently with the async SDK.

    Connects to the cluster, builds one ``mutate_in`` call per document and
    awaits all of them together with ``asyncio.gather``.
    """
    import couchbase.subdocument as SD

    cluster = await Cluster.connect(
        'couchbase://localhost',
        ClusterOptions(PasswordAuthenticator('Administrator', 'password'))
    )
    bucket = cluster.bucket('my-bucket')
    collection = bucket.default_collection()

    # (document key, {field path: new value}) pairs to apply.
    updates = [
        ('user::001', {'age': 30}),
        ('user::002', {'age': 25}),
    ]

    # Distinct names for the list and the per-document dict avoid the
    # shadowing of `updates` inside the comprehension.
    tasks = [
        collection.mutate_in(key, [
            SD.upsert(path, value) for path, value in fields.items()
        ])
        for key, fields in updates
    ]
    await asyncio.gather(*tasks)
asyncio.run(batch_update())
十一、常见错误处理 #
11.1 文档不存在 #
python
from couchbase.exceptions import DocumentNotFoundException
# replace() requires the key to exist; fall back to upsert() when it does not.
try:
    collection.replace('user::999', doc)
except DocumentNotFoundException:
    collection.upsert('user::999', doc)
11.2 CAS不匹配 #
python
from couchbase.exceptions import CasMismatchException
# On a CAS conflict, re-read the document, merge the local changes into the
# latest content and retry once with the fresh CAS value.
try:
    collection.replace('user::001', doc, ReplaceOptions(cas=cas_value))
except CasMismatchException:
    latest = collection.get('user::001')
    doc = merge_changes(latest.content, doc)
    collection.replace('user::001', doc, ReplaceOptions(cas=latest.cas))
11.3 路径不存在 #
python
from couchbase.exceptions import PathNotFoundException
import couchbase.subdocument as SD

# replace() on a sub-document path fails when the path is missing; upsert()
# creates the field instead. (The original used an undefined name, MutateSpec.)
try:
    collection.mutate_in('user::001', [
        SD.replace('nonexistent_field', 'value')
    ])
except PathNotFoundException:
    collection.mutate_in('user::001', [
        SD.upsert('nonexistent_field', 'value')
    ])
十二、总结 #
更新操作要点:
| 操作 | 说明 | 使用场景 |
|---|---|---|
| UPDATE | N1QL批量更新 | 条件更新多个文档 |
| REPLACE | 替换整个文档 | 需要CAS控制 |
| UPSERT | 插入或替换 | 不确定文档是否存在 |
| mutate_in | 部分字段更新 | 高效更新少量字段 |
最佳实践:
- 使用CAS实现乐观锁
- 优先使用Subdocument更新
- 批量更新使用异步操作
- 合理处理并发冲突
- 添加更新时间戳
下一步,让我们学习文档删除操作!
最后更新:2026-03-27