向量操作 #
本章详细介绍 Qdrant 中向量的增删改查操作。
向量操作概览 #
text
向量操作流程:
┌─────────────────────────────────────────────────────────────┐
│                        向量生命周期                         │
│                                                             │
│   创建 ──→ 插入 ──→ 更新 ──→ 查询 ──→ 删除                  │
│    │        │        │        │        │                    │
│    ↓        ↓        ↓        ↓        ↓                    │
│  Point    Upsert  Payload   Search   Delete                 │
│  Struct           Update    Filter                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘
插入向量 #
单个向量插入 #
python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
# Create the client used by every snippet in this chapter (":memory:" mode).
client = QdrantClient(":memory:")

# The demo collection stores 4-dimensional vectors compared by cosine distance.
demo_vector_params = VectorParams(size=4, distance=Distance.COSINE)
client.create_collection(
    collection_name="vectors_demo",
    vectors_config=demo_vector_params,
)

# A point bundles an ID, the vector itself and an arbitrary payload.
point = PointStruct(
    id=1,
    vector=[0.1, 0.2, 0.3, 0.4],
    payload={"title": "示例文档", "category": "demo", "score": 0.95},
)

# upsert() takes a list, even for a single point.
client.upsert(collection_name="vectors_demo", points=[point])
print("单个向量插入成功")
批量向量插入 #
python
import numpy as np
# Generate all random 4-dimensional vectors up front, then wrap each in a point.
batch_size = 100
vectors = np.random.rand(batch_size, 4).tolist()

points = [
    PointStruct(
        id=idx,
        vector=vec,
        payload={
            "batch_index": idx,
            "category": "batch",
            "value": np.random.rand(),
        },
    )
    for idx, vec in enumerate(vectors)
]

# A single upsert call sends the whole batch.
client.upsert(collection_name="vectors_demo", points=points)
print(f"批量插入 {batch_size} 个向量成功")
使用 UUID #
python
from uuid import uuid4
# Point IDs may be UUID strings instead of integers.
points = []
for _ in range(10):
    points.append(
        PointStruct(
            id=str(uuid4()),
            vector=np.random.rand(4).tolist(),
            payload={"type": "uuid"},
        )
    )

client.upsert(collection_name="vectors_demo", points=points)
print("UUID 向量插入成功")
多向量插入 #
python
# Each named vector space declares its own size and distance metric.
named_vector_spaces = {
    "text": VectorParams(size=4, distance=Distance.COSINE),
    "image": VectorParams(size=4, distance=Distance.EUCLID),
}
client.create_collection(
    collection_name="multi_vectors",
    vectors_config=named_vector_spaces,
)

from qdrant_client.models import PointStruct

# One point carries a vector for every named space declared above.
point = PointStruct(
    id=1,
    vector={
        "text": [0.1, 0.2, 0.3, 0.4],
        "image": [0.5, 0.6, 0.7, 0.8],
    },
    payload={"title": "多模态文档"},
)

client.upsert(collection_name="multi_vectors", points=[point])
print("多向量插入成功")
稀疏向量插入 #
python
from qdrant_client.models import SparseVector, SparseVectorParams

# A hybrid collection: one named dense vector space plus one named sparse
# vector space. Sparse spaces are declared with SparseVectorParams and have no
# fixed size.
client.create_collection(
    collection_name="sparse_vectors",
    vectors_config={
        "text-dense": VectorParams(size=4, distance=Distance.COSINE),
    },
    sparse_vectors_config={"text-sparse": SparseVectorParams()},
)

# PointStruct has no separate `sparse_vectors` field: dense and sparse vectors
# are both passed through `vector`, keyed by the names declared on the
# collection.
point = PointStruct(
    id=1,
    vector={
        "text-dense": [0.1, 0.2, 0.3, 0.4],
        "text-sparse": SparseVector(
            indices=[1, 5, 10, 20],
            values=[0.5, 0.8, 0.3, 0.9],
        ),
    },
    payload={"type": "hybrid"},
)

client.upsert(
    collection_name="sparse_vectors",
    points=[point],
)
print("稀疏向量插入成功")
查询向量 #
通过 ID 获取 #
python
# Look up several points by ID in one call. retrieve() returns a list, so the
# result is named in the plural.
points = client.retrieve(
    collection_name="vectors_demo",
    ids=[1, 2, 3],
)
for p in points:
    print(f"ID: {p.id}")
    print(f"Payload: {p.payload}")
    print("---")
获取向量值 #
python
# Ask explicitly for both the stored vector and the payload of point 1.
points = client.retrieve(
    collection_name="vectors_demo",
    ids=[1],
    with_vectors=True,
    with_payload=True,
)
for p in points:
    print(f"ID: {p.id}")
    print(f"Vector: {p.vector}")
    print(f"Payload: {p.payload}")
批量获取 #
python
# Fetch IDs 1..10 in a single request, payload only (vectors are skipped).
ids = [*range(1, 11)]
points = client.retrieve(
    collection_name="vectors_demo",
    ids=ids,
    with_payload=True,
    with_vectors=False,
)
print(f"获取了 {len(points)} 个点")
更新向量 #
完全更新 #
python
# Upserting an existing ID replaces both its vector and its payload.
replacement_payload = {
    "title": "更新后的文档",
    "category": "updated",
    "version": 2,
}
point = PointStruct(
    id=1,
    vector=[0.9, 0.8, 0.7, 0.6],
    payload=replacement_payload,
)

client.upsert(collection_name="vectors_demo", points=[point])
print("向量已完全更新")
仅更新向量 #
python
from qdrant_client.models import PointVectors
# Send only a new vector for point 1 via update_vectors(); no payload is
# passed in this call.
new_vector_for_point_1 = PointVectors(id=1, vector=[0.2, 0.3, 0.4, 0.5])
client.update_vectors(
    collection_name="vectors_demo",
    points=[new_vector_for_point_1],
)
print("向量值已更新")
仅更新 Payload #
python
# Set two payload fields on points 1-3 via set_payload().
payload_patch = {"updated_at": "2024-01-01", "status": "active"}
client.set_payload(
    collection_name="vectors_demo",
    payload=payload_patch,
    points=[1, 2, 3],
)
print("Payload 已更新")
删除 Payload 字段 #
python
# Remove only the listed payload keys from points 1-3.
obsolete_keys = ["temp_field", "old_field"]
client.delete_payload(
    collection_name="vectors_demo",
    keys=obsolete_keys,
    points=[1, 2, 3],
)
print("指定 Payload 字段已删除")
清空 Payload #
python
# Clear the payload of points 1-3 via clear_payload().
target_point_ids = [1, 2, 3]
client.clear_payload(
    collection_name="vectors_demo",
    points_selector=target_point_ids,
)
print("Payload 已清空")
删除向量 #
通过 ID 删除 #
python
# Delete points by listing their IDs explicitly.
doomed_point_ids = [1, 2, 3]
client.delete(
    collection_name="vectors_demo",
    points_selector=doomed_point_ids,
)
print("指定 ID 的向量已删除")
通过条件删除 #
python
from qdrant_client.models import Filter, FieldCondition, MatchValue
# Delete every point whose payload field "category" equals "old".
category_is_old = FieldCondition(
    key="category",
    match=MatchValue(value="old"),
)
client.delete(
    collection_name="vectors_demo",
    points_selector=Filter(must=[category_is_old]),
)
print("符合条件的向量已删除")
删除所有向量 #
python
from qdrant_client.models import Filter
# A filter with no conditions is used here as the selector for "all points".
match_everything = Filter()
client.delete(
    collection_name="vectors_demo",
    points_selector=match_everything,
)
print("所有向量已删除")
批量操作 #
使用迭代器批量插入 #
python
import numpy as np
from tqdm import tqdm
def batch_insert(collection_name, total_points, batch_size=100, vector_size=4):
    """Insert ``total_points`` random vectors in sequential batches.

    Point IDs run from 0 to ``total_points - 1``. The final batch is
    truncated so that exactly ``total_points`` points are written.

    Args:
        collection_name: Target collection.
        total_points: Total number of points to insert.
        batch_size: Maximum number of points sent per upsert call.
        vector_size: Dimensionality of the generated vectors; must match the
            collection configuration.
    """
    for start in tqdm(range(0, total_points, batch_size)):
        # Clamp the end so the last batch does not run past total_points.
        end = min(start + batch_size, total_points)
        vectors = np.random.rand(end - start, vector_size).tolist()
        points = [
            PointStruct(
                id=start + i,
                vector=vector,
                payload={
                    "batch": start // batch_size,
                    "index": i,
                },
            )
            for i, vector in enumerate(vectors)
        ]
        client.upsert(
            collection_name=collection_name,
            points=points,
        )
# Insert 1000 random points into the demo collection, 100 per request.
batch_insert(collection_name="vectors_demo", total_points=1000, batch_size=100)
print("批量插入完成")
并行批量插入 #
python
from concurrent.futures import ThreadPoolExecutor
import numpy as np
def insert_batch(collection_name, start_id, batch_size, vector_size, batch_index=None):
    """Insert one batch of random vectors and return how many were written.

    Args:
        collection_name: Target collection.
        start_id: ID of the first point in this batch; IDs are consecutive.
        batch_size: Number of points to generate and upsert.
        vector_size: Dimensionality of the generated vectors.
        batch_index: Value stored in the ``"batch"`` payload field. Defaults
            to ``start_id // batch_size``, which is only correct when every
            batch has the same size.

    Returns:
        The number of points in this batch.
    """
    if batch_index is None:
        batch_index = start_id // batch_size
    vectors = np.random.rand(batch_size, vector_size).tolist()
    points = [
        PointStruct(
            id=start_id + i,
            vector=vector,
            payload={"batch": batch_index},
        )
        for i, vector in enumerate(vectors)
    ]
    client.upsert(collection_name=collection_name, points=points)
    return batch_size


def parallel_insert(collection_name, total_points, batch_size=100, workers=4):
    """Insert ``total_points`` random vectors using a thread pool.

    Args:
        collection_name: Target collection.
        total_points: Total number of points to insert.
        batch_size: Maximum number of points per upsert call.
        workers: Number of worker threads.

    Returns:
        The total number of points inserted.
    """
    vector_size = 4
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(
                insert_batch,
                collection_name,
                start,
                # Truncate the final batch so that no more than total_points
                # points are written when total_points is not a multiple of
                # batch_size.
                min(batch_size, total_points - start),
                vector_size,
                # Pass the index explicitly: a truncated last batch would
                # otherwise derive a wrong index from its smaller size.
                start // batch_size,
            )
            for start in range(0, total_points, batch_size)
        ]
        # result() re-raises any exception that occurred in a worker.
        total_inserted = sum(f.result() for f in futures)
    return total_inserted
# Insert 1000 points using 4 worker threads, 100 points per request.
total = parallel_insert(
    collection_name="vectors_demo",
    total_points=1000,
    batch_size=100,
    workers=4,
)
print(f"并行插入 {total} 个向量完成")
批量更新 #
python
from qdrant_client.models import PointIdsList
# Set {"processed": True} on IDs 0..999, one request per block of 100 IDs.
batch_size = 100
total_points = 1000
for start in range(0, total_points, batch_size):
    # Clamp the upper bound so the last block never exceeds total_points.
    ids = list(range(start, min(start + batch_size, total_points)))
    client.set_payload(
        collection_name="vectors_demo",
        payload={"processed": True},
        points=PointIdsList(points=ids),
    )
print("批量更新 Payload 完成")
批量删除 #
python
# Delete IDs 0..99 with a single request.
ids_to_delete = list(range(100))
client.delete(
    points_selector=ids_to_delete,
    collection_name="vectors_demo",
)
print(f"批量删除 {len(ids_to_delete)} 个向量完成")
滚动查询 #
滚动查询(Scroll)按页遍历集合中的点:每次调用返回一页结果和下一页的 offset,把该 offset 传入下一次调用,直到返回的 offset 为 None 为止。它适合遍历无法一次性取回的大量数据。
基础滚动查询 #
python
from qdrant_client.models import ScrollResult
# Walk the whole collection page by page.
offset = None
page_size = 100
all_points = []
while True:
    # scroll() yields a (points, next_offset) pair, so it is unpacked directly
    # rather than annotated as a single ScrollResult object.
    points, next_offset = client.scroll(
        collection_name="vectors_demo",
        limit=page_size,
        offset=offset,
        with_payload=True,
        with_vectors=False,
    )
    all_points.extend(points)
    # A next_offset of None ends the loop: there is no further page to fetch.
    if next_offset is None:
        break
    offset = next_offset
print(f"总共获取 {len(all_points)} 个点")
带过滤的滚动查询 #
python
from qdrant_client.models import Filter, FieldCondition, MatchValue
# Walk only the points whose payload field "category" equals "batch".
offset = None
filtered_points = []
# The filter does not change between pages, so build it once.
batch_category_filter = Filter(
    must=[
        FieldCondition(
            key="category",
            match=MatchValue(value="batch"),
        )
    ]
)
while True:
    result = client.scroll(
        collection_name="vectors_demo",
        scroll_filter=batch_category_filter,
        limit=100,
        offset=offset,
    )
    points, next_offset = result
    filtered_points.extend(points)
    # A next_offset of None ends the loop: there is no further page to fetch.
    if next_offset is None:
        break
    offset = next_offset
print(f"过滤后获取 {len(filtered_points)} 个点")
向量存在性检查 #
检查单个向量 #
python
def point_exists(collection_name, point_id):
    """Return True if a point with ``point_id`` exists in the collection.

    Any error raised by the lookup is treated as "does not exist" and
    reported as False.

    Args:
        collection_name: Collection to look in.
        point_id: ID of the point to check.

    Returns:
        True when the lookup returns at least one point, False otherwise.
    """
    try:
        points = client.retrieve(
            collection_name=collection_name,
            ids=[point_id],
            # Only presence matters here, so skip fetching the payload.
            with_payload=False,
        )
        return len(points) > 0
    except Exception:
        # Best-effort check: NOTE(review) this also hides real failures from
        # the lookup call - confirm that is the intended behaviour.
        return False
# Report whether point 1 is present in the demo collection.
if point_exists("vectors_demo", 1):
    print("向量存在")
else:
    print("向量不存在")
批量检查 #
python
# Check several IDs with one request instead of one lookup per ID.
ids_to_check = [1, 2, 3, 999, 1000]
existing_points = client.retrieve(
    collection_name="vectors_demo",
    ids=ids_to_check,
)
# Collect the IDs that came back; an ID missing from this set is reported as
# not existing.
existing_ids = {p.id for p in existing_points}
# `point_id` rather than `id`, to avoid shadowing the builtin.
for point_id in ids_to_check:
    status = "存在" if point_id in existing_ids else "不存在"
    print(f"ID {point_id}: {status}")
向量计数 #
获取向量数量 #
python
# Collection metadata exposes the point count and the indexed-vector count.
info = client.get_collection(collection_name="vectors_demo")
print(f"总向量数: {info.points_count}")
print(f"已索引向量数: {info.indexed_vectors_count}")
按条件计数 #
python
from qdrant_client.models import CountResult
# Count the points whose payload field "category" equals "batch", passing
# exact=True to count().
category_is_batch = FieldCondition(
    key="category",
    match=MatchValue(value="batch"),
)
result: CountResult = client.count(
    collection_name="vectors_demo",
    count_filter=Filter(must=[category_is_batch]),
    exact=True,
)
print(f"符合条件的向量数: {result.count}")
最佳实践 #
批量大小选择 #
text
批量大小建议:
小向量(< 100 维):
├── 批量大小:500-1000
└── 网络开销为主
中等向量(100-512 维):
├── 批量大小:100-500
└── 平衡网络和内存
大向量(> 512 维):
├── 批量大小:50-100
└── 内存开销为主
ID 管理策略 #
python
import hashlib
def generate_deterministic_id(content: str) -> int:
    """Derive a stable integer ID from ``content``.

    The same text always maps to the same ID. MD5 is used here only as a
    fingerprint, not for security.

    Args:
        content: Text to derive the ID from.

    Returns:
        A non-negative integer below 2**64, taken from the first 16 hex
        digits (64 bits) of the MD5 digest of the UTF-8 encoded text.
    """
    content_hash = hashlib.md5(content.encode("utf-8")).hexdigest()
    # Keep only the leading 64 bits of the digest.
    return int(content_hash[:16], 16)
# The ID is derived from the text itself.
content = "这是一段文档内容"
point_id = generate_deterministic_id(content=content)
print(f"确定性 ID: {point_id}")
错误处理 #
python
from qdrant_client.http.exceptions import UnexpectedResponse
def safe_upsert(collection_name, points, max_retries=3):
    """Upsert ``points``, retrying when the call raises UnexpectedResponse.

    Args:
        collection_name: Target collection.
        points: Points to upsert.
        max_retries: Maximum number of attempts.

    Returns:
        True once an attempt succeeds. False only when ``max_retries`` is
        zero or negative, in which case no attempt is made.

    Raises:
        UnexpectedResponse: Re-raised when the last attempt also fails.
    """
    for attempt in range(max_retries):
        try:
            client.upsert(
                collection_name=collection_name,
                points=points,
                wait=True,
            )
        except UnexpectedResponse as e:
            print(f"尝试 {attempt + 1} 失败: {e}")
            # Out of attempts: let the caller see the original error.
            if attempt == max_retries - 1:
                raise
        else:
            return True
    return False
# Upsert the most recently built point using the retrying helper.
safe_upsert(collection_name="vectors_demo", points=[point])
小结 #
本章详细介绍了向量操作:
- 插入向量(单个、批量、UUID、多向量、稀疏向量)
- 查询向量
- 更新向量和 Payload
- 删除向量
- 批量操作
- 滚动查询
- 向量存在性检查与计数
- 最佳实践(批量大小、ID 管理、错误处理)
下一步 #
掌握向量操作后,继续学习 搜索查询,了解如何高效地进行向量搜索!
最后更新:2026-04-04