向量操作 #

本章详细介绍 Qdrant 中向量的增删改查操作。

向量操作概览 #

text
向量操作流程:

┌─────────────────────────────────────────────────────────────┐
│                      向量生命周期                            │
│                                                              │
│   创建 ──→ 插入 ──→ 更新 ──→ 查询 ──→ 删除                  │
│     │        │        │        │        │                   │
│     ↓        ↓        ↓        ↓        ↓                   │
│   Point   Upsert   Payload   Search  Delete                 │
│   Struct           Update            Filter                 │
│                                                              │
└─────────────────────────────────────────────────────────────┘

插入向量 #

单个向量插入 #

python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# Client bound to the ":memory:" location; every later snippet reuses it.
client = QdrantClient(":memory:")

# Collection of 4-dimensional vectors compared with cosine distance.
client.create_collection(
    collection_name="vectors_demo",
    vectors_config=VectorParams(size=4, distance=Distance.COSINE)
)

# A point bundles an id, a vector of the configured size and a payload dict.
point = PointStruct(
    id=1,
    vector=[0.1, 0.2, 0.3, 0.4],
    payload={
        "title": "示例文档",
        "category": "demo",
        "score": 0.95
    }
)

# upsert takes a list of points, so the single point is wrapped in one.
client.upsert(
    collection_name="vectors_demo",
    points=[point]
)

print("单个向量插入成功")

批量向量插入 #

python
import numpy as np

# One NumPy call produces every demo vector; each row becomes one point.
batch_size = 100
vectors = np.random.rand(batch_size, 4).tolist()

# The row position is used both as the point id and as "batch_index".
points = [
    PointStruct(
        id=row,
        vector=vector,
        payload={
            "batch_index": row,
            "category": "batch",
            "value": np.random.rand(),
        },
    )
    for row, vector in enumerate(vectors)
]

# Send the whole list in a single upsert request.
client.upsert(collection_name="vectors_demo", points=points)

print(f"批量插入 {batch_size} 个向量成功")

使用 UUID #

python
from uuid import uuid4

# Ten points whose ids are random UUID strings instead of integers.
# `np` comes from the `import numpy as np` in the batch-insert snippet.
points = [
    PointStruct(
        id=str(uuid4()),
        vector=np.random.rand(4).tolist(),
        payload={"type": "uuid"}
    )
    for _ in range(10)
]

client.upsert(
    collection_name="vectors_demo",
    points=points
)

print("UUID 向量插入成功")

多向量插入 #

python
# Collection with two named vectors per point: "text" (cosine) and
# "image" (euclidean), both of size 4.
client.create_collection(
    collection_name="multi_vectors",
    vectors_config={
        "text": VectorParams(size=4, distance=Distance.COSINE),
        "image": VectorParams(size=4, distance=Distance.EUCLID)
    }
)

# PointStruct is already imported by the first snippet on this page; this
# repeated import is redundant but harmless.
from qdrant_client.models import PointStruct

# With named vectors, `vector` is a dict keyed by the configured names.
point = PointStruct(
    id=1,
    vector={
        "text": [0.1, 0.2, 0.3, 0.4],
        "image": [0.5, 0.6, 0.7, 0.8]
    },
    payload={"title": "多模态文档"}
)

client.upsert(
    collection_name="multi_vectors",
    points=[point]
)

print("多向量插入成功")

稀疏向量插入 #

python
from qdrant_client.models import SparseVector, SparseVectorParams

# Hybrid collection: one named dense vector plus one named sparse vector.
# The dense vector is named so that both can share the point's `vector`
# mapping below. The sparse side is configured with SparseVectorParams
# rather than a bare dict.
client.create_collection(
    collection_name="sparse_vectors",
    vectors_config={
        "text-dense": VectorParams(size=4, distance=Distance.COSINE)
    },
    sparse_vectors_config={"text-sparse": SparseVectorParams()}
)

# PointStruct has no `sparse_vectors` field: sparse vectors are passed
# through `vector`, keyed by the name declared in sparse_vectors_config.
point = PointStruct(
    id=1,
    vector={
        "text-dense": [0.1, 0.2, 0.3, 0.4],
        # indices and values are parallel lists: values[k] belongs to
        # dimension indices[k].
        "text-sparse": SparseVector(
            indices=[1, 5, 10, 20],
            values=[0.5, 0.8, 0.3, 0.9]
        )
    },
    payload={"type": "hybrid"}
)

client.upsert(
    collection_name="sparse_vectors",
    points=[point]
)

print("稀疏向量插入成功")

查询向量 #

通过 ID 获取 #

python
# retrieve returns a list of records (one per id that was found), so the
# result is named in the plural like the other retrieval snippets.
points = client.retrieve(
    collection_name="vectors_demo",
    ids=[1, 2, 3]
)

for p in points:
    print(f"ID: {p.id}")
    print(f"Payload: {p.payload}")
    print("---")

获取向量值 #

python
# Ask for the stored vector in addition to the payload of point 1.
points = client.retrieve(
    collection_name="vectors_demo",
    ids=[1],
    with_vectors=True,
    with_payload=True
)

for p in points:
    print(f"ID: {p.id}")
    print(f"Vector: {p.vector}")
    print(f"Payload: {p.payload}")

批量获取 #

python
# Ids 1 through 10 inclusive.
ids = list(range(1, 11))

# Payload only: vectors are explicitly left out of the response.
points = client.retrieve(
    collection_name="vectors_demo",
    ids=ids,
    with_vectors=False,
    with_payload=True
)

print(f"获取了 {len(points)} 个点")

更新向量 #

完全更新 #

python
# Full update: upsert id 1 again with a new vector and a new payload.
point = PointStruct(
    id=1,
    vector=[0.9, 0.8, 0.7, 0.6],
    payload={
        "title": "更新后的文档",
        "category": "updated",
        "version": 2
    }
)

client.upsert(
    collection_name="vectors_demo",
    points=[point]
)

print("向量已完全更新")

仅更新向量 #

python
from qdrant_client.models import PointVectors

# Vector-only update: PointVectors carries just an id and the new vector,
# no payload.
client.update_vectors(
    collection_name="vectors_demo",
    points=[
        PointVectors(
            id=1,
            vector=[0.2, 0.3, 0.4, 0.5]
        )
    ]
)

print("向量值已更新")

仅更新 Payload #

python
# Set the "updated_at" and "status" payload keys on points 1, 2 and 3.
client.set_payload(
    collection_name="vectors_demo",
    payload={
        "updated_at": "2024-01-01",
        "status": "active"
    },
    points=[1, 2, 3]
)

print("Payload 已更新")

删除 Payload 字段 #

python
# Remove only the listed payload keys from points 1, 2 and 3.
client.delete_payload(
    collection_name="vectors_demo",
    keys=["temp_field", "old_field"],
    points=[1, 2, 3]
)

print("指定 Payload 字段已删除")

清空 Payload #

python
# Clear the payload of points 1, 2 and 3. Note the keyword here is
# `points_selector`, not `points` as in set_payload / delete_payload.
client.clear_payload(
    collection_name="vectors_demo",
    points_selector=[1, 2, 3]
)

print("Payload 已清空")

删除向量 #

通过 ID 删除 #

python
# Delete the points with ids 1, 2 and 3; the selector is a plain id list.
client.delete(
    collection_name="vectors_demo",
    points_selector=[1, 2, 3]
)

print("指定 ID 的向量已删除")

通过条件删除 #

python
from qdrant_client.models import Filter, FieldCondition, MatchValue

# Delete by condition: the selector is a Filter whose single `must` clause
# requires payload key "category" to equal "old".
client.delete(
    collection_name="vectors_demo",
    points_selector=Filter(
        must=[
            FieldCondition(
                key="category",
                match=MatchValue(value="old")
            )
        ]
    )
)

print("符合条件的向量已删除")

删除所有向量 #

python
from qdrant_client.models import Filter

# A Filter with no conditions is used as the selector.
# NOTE(review): presumably an empty filter matches every point, as the
# printed message claims; confirm against the Qdrant filtering docs.
client.delete(
    collection_name="vectors_demo",
    points_selector=Filter()
)

print("所有向量已删除")

批量操作 #

使用迭代器批量插入 #

python
import numpy as np
from tqdm import tqdm

def batch_insert(collection_name, total_points, batch_size=100, vector_size=4):
    """Insert ``total_points`` random points, one upsert call per batch.

    Point ids run from 0 to ``total_points - 1``. Each payload records the
    batch number (``"batch"``) and the position inside that batch
    (``"index"``). Progress is shown with ``tqdm``.

    Args:
        collection_name: Target collection; must already exist.
        total_points: Total number of points to generate and insert.
        batch_size: Maximum number of points sent per upsert call.
        vector_size: Dimension of the generated vectors; must match the
            collection's configured size (defaults to 4, the size used by
            the demo collection).
    """
    for start in tqdm(range(0, total_points, batch_size)):
        # The final batch may be shorter than batch_size.
        count = min(batch_size, total_points - start)

        vectors = np.random.rand(count, vector_size).tolist()

        points = [
            PointStruct(
                id=start + i,
                vector=vector,
                payload={
                    "batch": start // batch_size,
                    "index": i
                }
            )
            for i, vector in enumerate(vectors)
        ]

        client.upsert(
            collection_name=collection_name,
            points=points
        )

# 1000 points in batches of 100, i.e. ten upsert calls.
batch_insert("vectors_demo", 1000, batch_size=100)
print("批量插入完成")

并行批量插入 #

python
from concurrent.futures import ThreadPoolExecutor
import numpy as np

def insert_batch(collection_name, start_id, batch_size, vector_size,
                 batch_index=None):
    """Generate and upsert one batch of random points.

    Args:
        collection_name: Target collection; must already exist.
        start_id: Id of the first point; ids are consecutive from here.
        batch_size: Number of points in this batch.
        vector_size: Dimension of the generated vectors.
        batch_index: Value stored under the ``"batch"`` payload key. When
            omitted it is derived as ``start_id // batch_size``, which is
            only correct if every batch has the same size.

    Returns:
        The number of points submitted (``batch_size``).
    """
    if batch_index is None:
        batch_index = start_id // batch_size

    vectors = np.random.rand(batch_size, vector_size).tolist()

    points = [
        PointStruct(
            id=start_id + i,
            vector=vector,
            payload={"batch": batch_index}
        )
        for i, vector in enumerate(vectors)
    ]

    client.upsert(collection_name=collection_name, points=points)
    return batch_size


def parallel_insert(collection_name, total_points, batch_size=100, workers=4):
    """Insert ``total_points`` random points using a thread pool.

    The id range ``[0, total_points)`` is cut into batches of at most
    ``batch_size`` points and each batch is submitted to ``insert_batch``.

    NOTE(review): all workers share the module-level ``client``; confirm
    it may be used from several threads for the backend in use.

    Args:
        collection_name: Target collection; must already exist.
        total_points: Total number of points to insert.
        batch_size: Maximum number of points per batch.
        workers: Size of the thread pool.

    Returns:
        The sum of the batch sizes reported by the workers.
    """
    vector_size = 4

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(
                insert_batch,
                collection_name,
                start,
                # Clamp the last batch so no ids >= total_points are
                # written when total_points is not a multiple of batch_size.
                min(batch_size, total_points - start),
                vector_size,
                # Pass the batch number explicitly: a short final batch
                # would otherwise compute it from its own, smaller size.
                start // batch_size,
            )
            for start in range(0, total_points, batch_size)
        ]

        # result() re-raises any exception from the worker thread.
        total_inserted = sum(f.result() for f in futures)

    return total_inserted

# 1000 points, 100 per batch, spread over 4 worker threads.
total = parallel_insert("vectors_demo", 1000, batch_size=100, workers=4)
print(f"并行插入 {total} 个向量完成")

批量更新 #

python
from qdrant_client.models import PointIdsList

batch_size = 100
total_points = 1000

# Walk ids 0..999 in windows of batch_size; min() clamps the last window.
for start in range(0, total_points, batch_size):
    ids = list(range(start, min(start + batch_size, total_points)))
    
    # Set "processed" on the whole window with one call; the ids are
    # wrapped in an explicit PointIdsList selector.
    client.set_payload(
        collection_name="vectors_demo",
        payload={"processed": True},
        points=PointIdsList(points=ids)
    )

print("批量更新 Payload 完成")

批量删除 #

python
# Ids 0 through 99 inclusive, removed with a single delete call.
ids_to_delete = list(range(0, 100))

client.delete(
    collection_name="vectors_demo",
    points_selector=ids_to_delete
)

print(f"批量删除 {len(ids_to_delete)} 个向量完成")

滚动查询 #

滚动查询(scroll)以分页方式遍历集合中的大量数据:每次调用返回一页结果和下一页的 offset,当返回的 offset 为 None 时表示遍历结束。

基础滚动查询 #

python
from qdrant_client.models import ScrollResult

# Page through the whole collection. `offset` is the cursor: None asks for
# the first page, afterwards it is the value returned by the previous call.
offset = None
page_size = 100
all_points = []

while True:
    # client.scroll returns a (points, next_offset) pair, so it is unpacked
    # directly rather than annotated as a ScrollResult model.
    points, next_offset = client.scroll(
        collection_name="vectors_demo",
        limit=page_size,
        offset=offset,
        with_payload=True,
        with_vectors=False
    )

    all_points.extend(points)

    # A None cursor means there are no further pages.
    if next_offset is None:
        break

    offset = next_offset

print(f"总共获取 {len(all_points)} 个点")

带过滤的滚动查询 #

python
from qdrant_client.models import Filter, FieldCondition, MatchValue

# The filter never changes between pages, so it is built once up front:
# keep only points whose "category" payload equals "batch".
category_is_batch = Filter(
    must=[FieldCondition(key="category", match=MatchValue(value="batch"))]
)

filtered_points = []
offset = None

while True:
    # Each call yields one page plus the cursor for the following page.
    points, next_offset = client.scroll(
        collection_name="vectors_demo",
        scroll_filter=category_is_batch,
        limit=100,
        offset=offset,
    )
    filtered_points += points

    if next_offset is not None:
        # More pages remain: advance the cursor and fetch again.
        offset = next_offset
        continue
    break

print(f"过滤后获取 {len(filtered_points)} 个点")

向量存在性检查 #

检查单个向量 #

python
def point_exists(collection_name, point_id):
    """Return True when a point with ``point_id`` is found in the collection.

    Only presence matters, so neither payload nor vector is requested.

    The lookup stays best-effort: any exception is reported as ``False``,
    as before. The failure is now logged, so that an error is not silently
    indistinguishable from a point that is simply missing.

    Args:
        collection_name: Collection to look in.
        point_id: Id of the point to check.

    Returns:
        True if the lookup returned at least one record, otherwise False.
    """
    import logging

    try:
        points = client.retrieve(
            collection_name=collection_name,
            ids=[point_id],
            with_payload=False,
            with_vectors=False,
        )
    except Exception:
        logging.getLogger(__name__).warning(
            "point_exists(%r, %r) failed; treating the point as missing",
            collection_name,
            point_id,
            exc_info=True,
        )
        return False
    return len(points) > 0

# Check point 1 of the demo collection and report the outcome.
if point_exists("vectors_demo", 1):
    print("向量存在")
else:
    print("向量不存在")

批量检查 #

python
ids_to_check = [1, 2, 3, 999, 1000]

# One retrieve call for all candidates. Neither payload nor vector is read
# below, only the ids of the returned records.
existing_points = client.retrieve(
    collection_name="vectors_demo",
    ids=ids_to_check
)

# Set for O(1) membership tests in the loop below.
existing_ids = {p.id for p in existing_points}

# The loop variable is `point_id` so it does not shadow the builtin `id`.
for point_id in ids_to_check:
    status = "存在" if point_id in existing_ids else "不存在"
    print(f"ID {point_id}: {status}")

向量计数 #

获取向量数量 #

python
# Collection-level statistics: total points and the indexed-vector count.
info = client.get_collection("vectors_demo")

print(f"总向量数: {info.points_count}")
print(f"已索引向量数: {info.indexed_vectors_count}")

按条件计数 #

python
from qdrant_client.models import CountResult

# Count only points whose "category" payload equals "batch".
# Filter, FieldCondition and MatchValue come from imports in earlier
# snippets. exact=True is passed to request an exact count.
result: CountResult = client.count(
    collection_name="vectors_demo",
    count_filter=Filter(
        must=[
            FieldCondition(
                key="category",
                match=MatchValue(value="batch")
            )
        ]
    ),
    exact=True
)

print(f"符合条件的向量数: {result.count}")

最佳实践 #

批量大小选择 #

text
批量大小建议:

小向量(< 100 维):
├── 批量大小:500-1000
└── 网络开销为主

中等向量(100-512 维):
├── 批量大小:100-500
└── 平衡网络和内存

大向量(> 512 维):
├── 批量大小:50-100
└── 内存开销为主

ID 管理策略 #

python
import hashlib

def generate_deterministic_id(content):
    """Derive a stable integer id from a piece of text.

    The UTF-8 bytes of ``content`` are hashed with MD5 and the leading
    8 bytes of the digest (its first 16 hex characters) are read as a
    big-endian unsigned integer. Equal text therefore always maps to the
    same id, and the result is always below 2**64.
    """
    digest = hashlib.md5(content.encode()).digest()
    return int.from_bytes(digest[:8], "big")

# Same content always produces the same id, so re-running yields one value.
content = "这是一段文档内容"
point_id = generate_deterministic_id(content)

print(f"确定性 ID: {point_id}")

错误处理 #

python
from qdrant_client.http.exceptions import UnexpectedResponse

def safe_upsert(collection_name, points, max_retries=3, retry_delay=0.5):
    """Upsert ``points``, retrying on ``UnexpectedResponse``.

    Failed attempts are separated by an exponentially growing pause
    (``retry_delay``, then twice that, and so on) instead of being retried
    back-to-back.

    Args:
        collection_name: Target collection.
        points: Points to upsert.
        max_retries: Maximum number of attempts.
        retry_delay: Pause in seconds after the first failed attempt; it is
            doubled after each further failure. Use 0 to retry immediately.

    Returns:
        True once an attempt succeeds; False only when ``max_retries`` is
        not positive, so no attempt was made.

    Raises:
        UnexpectedResponse: Re-raised from the final failed attempt.
    """
    import time

    for attempt in range(max_retries):
        try:
            client.upsert(
                collection_name=collection_name,
                points=points,
                wait=True
            )
        except UnexpectedResponse as e:
            print(f"尝试 {attempt + 1} 失败: {e}")
            if attempt == max_retries - 1:
                # Out of attempts: surface the last error to the caller.
                raise
            time.sleep(retry_delay * (2 ** attempt))
        else:
            return True
    return False

# `point` is the PointStruct left over from an earlier snippet on this page.
safe_upsert("vectors_demo", [point])

小结 #

本章详细介绍了向量操作:

  • 插入向量(单个、批量、多向量、稀疏向量)
  • 查询向量
  • 更新向量和 Payload
  • 删除向量
  • 批量操作
  • 滚动查询
  • 向量存在性检查与计数
  • 最佳实践(批量大小、ID 管理、错误处理)

下一步 #

掌握向量操作后,继续学习 搜索查询,了解如何高效地进行向量搜索!

最后更新:2026-04-04