快速开始 #

本章通过实际代码示例带你快速上手 Qdrant,掌握基本操作流程。

准备工作 #

安装依赖 #

bash
pip install qdrant-client sentence-transformers

导入库 #

python
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    VectorParams,
    PointStruct,
    Filter,
    FieldCondition,
    MatchValue,
    Range
)
from sentence_transformers import SentenceTransformer

第一个示例 #

1. 创建客户端 #

python
client = QdrantClient(":memory:")

print("Qdrant 客户端已创建")

2. 创建集合 #

python
client.create_collection(
    collection_name="hello_qdrant",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE
    )
)

print("集合创建成功")

3. 插入向量 #

python
points = [
    PointStruct(
        id=1,
        vector=[0.1, 0.2, 0.3, 0.4],
        payload={"text": "Hello World", "category": "greeting"}
    ),
    PointStruct(
        id=2,
        vector=[0.5, 0.6, 0.7, 0.8],
        payload={"text": "Goodbye", "category": "farewell"}
    )
]

client.upsert(
    collection_name="hello_qdrant",
    points=points
)

print("向量插入成功")

4. 搜索向量 #

python
results = client.search(
    collection_name="hello_qdrant",
    query_vector=[0.1, 0.2, 0.3, 0.4],
    limit=2
)

for result in results:
    print(f"ID: {result.id}, Score: {result.score}")
    print(f"Payload: {result.payload}")
    print("---")

输出:

text
ID: 1, Score: 1.0
Payload: {'text': 'Hello World', 'category': 'greeting'}
---
ID: 2, Score: 0.9899
Payload: {'text': 'Goodbye', 'category': 'farewell'}
---

文档搜索示例 #

下面是一个完整的文档语义搜索示例。

1. 准备数据 #

python
documents = [
    "机器学习是人工智能的一个分支,它使计算机能够从数据中学习。",
    "深度学习使用神经网络来模拟人脑的学习过程。",
    "自然语言处理让计算机能够理解和生成人类语言。",
    "计算机视觉使机器能够从图像和视频中提取信息。",
    "强化学习通过奖励机制训练智能体做出决策。",
    "推荐系统根据用户行为预测用户可能感兴趣的内容。",
    "时间序列分析用于处理随时间变化的数据。",
    "异常检测识别数据中的异常模式或离群点。"
]

print(f"准备了 {len(documents)} 个文档")

2. 生成向量 #

python
model = SentenceTransformer('all-MiniLM-L6-v2')

embeddings = model.encode(documents)

print(f"向量维度: {embeddings.shape[1]}")
print(f"向量数量: {embeddings.shape[0]}")

3. 创建集合并插入数据 #

python
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE
    )
)

points = [
    PointStruct(
        id=i,
        vector=embeddings[i].tolist(),
        payload={"text": documents[i], "index": i}
    )
    for i in range(len(documents))
]

client.upsert(
    collection_name="documents",
    points=points
)

print("文档向量已插入")

4. 语义搜索 #

python
def search_documents(query, limit=3):
    query_vector = model.encode(query)
    
    results = client.search(
        collection_name="documents",
        query_vector=query_vector.tolist(),
        limit=limit
    )
    
    return results

queries = [
    "什么是 AI?",
    "如何处理图像数据?",
    "推荐算法是怎么工作的?"
]

for query in queries:
    print(f"\n查询: {query}")
    print("-" * 50)
    
    results = search_documents(query)
    
    for i, result in enumerate(results, 1):
        print(f"{i}. {result.payload['text']}")
        print(f"   相似度: {result.score:.4f}")

输出:

text
查询: 什么是 AI?
--------------------------------------------------
1. 机器学习是人工智能的一个分支,它使计算机能够从数据中学习。
   相似度: 0.6523
2. 深度学习使用神经网络来模拟人脑的学习过程。
   相似度: 0.5891
3. 自然语言处理让计算机能够理解和生成人类语言。
   相似度: 0.5123

查询: 如何处理图像数据?
--------------------------------------------------
1. 计算机视觉使机器能够从图像和视频中提取信息。
   相似度: 0.7821
2. 深度学习使用神经网络来模拟人脑的学习过程。
   相似度: 0.4532
3. 机器学习是人工智能的一个分支,它使计算机能够从数据中学习。
   相似度: 0.4123

带过滤的搜索 #

1. 创建带分类的数据 #

python
products = [
    {"name": "iPhone 15", "category": "手机", "price": 7999, "brand": "Apple"},
    {"name": "MacBook Pro", "category": "电脑", "price": 14999, "brand": "Apple"},
    {"name": "Galaxy S24", "category": "手机", "price": 5999, "brand": "Samsung"},
    {"name": "ThinkPad X1", "category": "电脑", "price": 9999, "brand": "Lenovo"},
    {"name": "iPad Pro", "category": "平板", "price": 6999, "brand": "Apple"},
    {"name": "Surface Pro", "category": "平板", "price": 7999, "brand": "Microsoft"},
]

product_texts = [f"{p['name']} {p['category']} {p['brand']}" for p in products]
product_vectors = model.encode(product_texts)

client.create_collection(
    collection_name="products",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)

points = [
    PointStruct(
        id=i,
        vector=product_vectors[i].tolist(),
        payload=products[i]
    )
    for i in range(len(products))
]

client.upsert(collection_name="products", points=points)

print("产品数据已插入")

2. 创建 Payload 索引 #

python
client.create_payload_index(
    collection_name="products",
    field_name="category",
    field_schema="keyword"
)

client.create_payload_index(
    collection_name="products",
    field_name="price",
    field_schema="float"
)

client.create_payload_index(
    collection_name="products",
    field_name="brand",
    field_schema="keyword"
)

print("索引创建成功")

3. 带过滤条件的搜索 #

python
def search_products(query, category=None, max_price=None, brand=None):
    query_vector = model.encode(query)
    
    must_conditions = []
    
    if category:
        must_conditions.append(
            FieldCondition(
                key="category",
                match=MatchValue(value=category)
            )
        )
    
    if max_price:
        must_conditions.append(
            FieldCondition(
                key="price",
                range=Range(lte=max_price)
            )
        )
    
    if brand:
        must_conditions.append(
            FieldCondition(
                key="brand",
                match=MatchValue(value=brand)
            )
        )
    
    query_filter = Filter(must=must_conditions) if must_conditions else None
    
    results = client.search(
        collection_name="products",
        query_vector=query_vector.tolist(),
        query_filter=query_filter,
        limit=5
    )
    
    return results

print("搜索手机产品:")
results = search_products("手机", category="手机")
for r in results:
    print(f"  {r.payload['name']} - ¥{r.payload['price']}")

print("\n搜索 Apple 产品,价格低于 10000:")
results = search_products("Apple", brand="Apple", max_price=10000)
for r in results:
    print(f"  {r.payload['name']} - ¥{r.payload['price']}")

print("\n搜索电脑产品:")
results = search_products("电脑", category="电脑")
for r in results:
    print(f"  {r.payload['name']} - ¥{r.payload['price']}")

批量操作 #

批量插入 #

python
import numpy as np

batch_size = 100
total_points = 1000

client.create_collection(
    collection_name="batch_demo",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)

for i in range(0, total_points, batch_size):
    batch_vectors = np.random.rand(batch_size, 384).tolist()
    
    points = [
        PointStruct(
            id=i + j,
            vector=batch_vectors[j],
            payload={"batch": i // batch_size, "index": j}
        )
        for j in range(batch_size)
    ]
    
    client.upsert(
        collection_name="batch_demo",
        points=points
    )
    
    print(f"已插入 {i + batch_size} 个点")

print(f"总共插入 {total_points} 个点")

批量删除 #

python
ids_to_delete = list(range(0, 100))

client.delete(
    collection_name="batch_demo",
    points_selector=ids_to_delete
)

print(f"已删除 {len(ids_to_delete)} 个点")

批量更新 Payload #

python
from qdrant_client.models import PointIdsList

client.set_payload(
    collection_name="batch_demo",
    payload={"updated": True, "timestamp": "2024-01-01"},
    points=PointIdsList(points=list(range(100, 200)))
)

print("已更新 100 个点的 Payload")

分页查询 #

python
def paginated_search(query_vector, page=1, page_size=10):
    offset = (page - 1) * page_size
    
    results = client.search(
        collection_name="batch_demo",
        query_vector=query_vector,
        limit=page_size,
        offset=offset
    )
    
    return results

query_vec = np.random.rand(384).tolist()

print("第 1 页:")
page1 = paginated_search(query_vec, page=1)
for r in page1:
    print(f"  ID: {r.id}, Score: {r.score:.4f}")

print("\n第 2 页:")
page2 = paginated_search(query_vec, page=2)
for r in page2:
    print(f"  ID: {r.id}, Score: {r.score:.4f}")

获取集合信息 #

python
collection_info = client.get_collection("documents")

print(f"集合名称: documents")
print(f"向量数量: {collection_info.points_count}")
print(f"向量维度: {collection_info.config.params.vectors.size}")
print(f"距离度量: {collection_info.config.params.vectors.distance}")
print(f"状态: {collection_info.status}")
print(f"索引向量数: {collection_info.indexed_vectors_count}")

删除操作 #

删除单个点 #

python
client.delete(
    collection_name="documents",
    points_selector=[1, 2, 3]
)

print("已删除 ID 为 1, 2, 3 的点")

按条件删除 #

python
from qdrant_client.models import Filter, FieldCondition, MatchValue

client.delete(
    collection_name="products",
    points_selector=Filter(
        must=[
            FieldCondition(
                key="brand",
                match=MatchValue(value="Samsung")
            )
        ]
    )
)

print("已删除所有 Samsung 产品")

删除集合 #

python
client.delete_collection("batch_demo")

print("集合已删除")

完整示例 #

python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer

client = QdrantClient(":memory:")

model = SentenceTransformer('all-MiniLM-L6-v2')

documents = [
    "Python 是一种流行的编程语言",
    "JavaScript 是网页开发的核心语言",
    "Rust 注重安全和性能",
    "Go 语言适合并发编程",
]

client.create_collection(
    collection_name="programming_languages",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)

vectors = model.encode(documents)

points = [
    PointStruct(
        id=i,
        vector=vectors[i].tolist(),
        payload={"text": documents[i]}
    )
    for i in range(len(documents))
]

client.upsert(
    collection_name="programming_languages",
    points=points
)

query = "最适合系统编程的语言"
query_vector = model.encode(query)

results = client.search(
    collection_name="programming_languages",
    query_vector=query_vector.tolist(),
    limit=3
)

print(f"查询: {query}\n")
print("搜索结果:")
for i, result in enumerate(results, 1):
    print(f"{i}. {result.payload['text']}")
    print(f"   相似度: {result.score:.4f}\n")

小结 #

本章通过实际示例学习了:

  • 创建 Qdrant 客户端
  • 创建和管理 Collection
  • 插入和搜索向量
  • 使用 Payload 过滤
  • 批量操作
  • 分页查询

下一步 #

掌握了基本操作后,继续学习 Collection 管理,深入了解集合的配置和管理!

最后更新:2026-04-04