语义搜索实战 #

本章通过一个完整的语义搜索项目,展示 Qdrant 的实际应用。

项目概述 #

text
语义搜索系统架构:

┌─────────────────────────────────────────────────────────────┐
│                    语义搜索系统                              │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  数据准备              向量索引              搜索服务        │
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐ │
│  │ 文档收集    │      │ Embedding   │      │ 查询处理    │ │
│  │ 文本清洗    │  →   │ 向量存储    │  →   │ 相似性搜索  │ │
│  │ 分块处理    │      │ Qdrant      │      │ 结果排序    │ │
│  └─────────────┘      └─────────────┘      └─────────────┘ │
│                                                              │
└─────────────────────────────────────────────────────────────┘

环境准备 #

安装依赖 #

bash
pip install qdrant-client sentence-transformers numpy pandas tqdm

导入库 #

python
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    VectorParams,
    PointStruct,
    Filter,
    FieldCondition,
    MatchValue
)
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict, Optional

文档处理 #

文档数据结构 #

python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Document:
    id: str
    title: str
    content: str
    category: str
    tags: List[str]
    created_at: datetime
    metadata: Dict

sample_documents = [
    Document(
        id="doc_001",
        title="机器学习入门指南",
        content="机器学习是人工智能的核心技术之一,它使计算机能够从数据中学习模式并做出预测。本文介绍机器学习的基本概念、常用算法和实践方法。",
        category="AI",
        tags=["机器学习", "AI", "教程"],
        created_at=datetime(2024, 1, 1),
        metadata={"author": "张三", "views": 1000}
    ),
    Document(
        id="doc_002",
        title="深度学习与神经网络",
        content="深度学习使用多层神经网络来学习数据的层次表示。卷积神经网络适合处理图像,循环神经网络适合处理序列数据。",
        category="AI",
        tags=["深度学习", "神经网络", "CNN"],
        created_at=datetime(2024, 1, 2),
        metadata={"author": "李四", "views": 800}
    ),
    Document(
        id="doc_003",
        title="Python 编程最佳实践",
        content="Python 是一门简洁优雅的编程语言。本文介绍 Python 的代码风格、性能优化技巧和常见陷阱。",
        category="编程",
        tags=["Python", "编程", "最佳实践"],
        created_at=datetime(2024, 1, 3),
        metadata={"author": "王五", "views": 1500}
    ),
    Document(
        id="doc_004",
        title="向量数据库原理与应用",
        content="向量数据库专门用于存储和检索高维向量数据。Qdrant 是一个高性能的开源向量数据库,支持相似性搜索和过滤查询。",
        category="数据库",
        tags=["向量数据库", "Qdrant", "搜索"],
        created_at=datetime(2024, 1, 4),
        metadata={"author": "赵六", "views": 600}
    ),
    Document(
        id="doc_005",
        title="自然语言处理技术概览",
        content="自然语言处理(NLP)让计算机能够理解和生成人类语言。主要技术包括分词、命名实体识别、情感分析等。",
        category="AI",
        tags=["NLP", "自然语言处理", "文本分析"],
        created_at=datetime(2024, 1, 5),
        metadata={"author": "张三", "views": 900}
    )
]

文本分块 #

python
from typing import List

def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> List[str]:
    words = text.split()
    chunks = []
    
    for i in range(0, len(words), chunk_size - overlap):
        chunk = " ".join(words[i:i + chunk_size])
        chunks.append(chunk)
    
    return chunks

def process_document(document: Document) -> List[Dict]:
    chunks = chunk_text(document.content)
    
    processed = []
    for i, chunk in enumerate(chunks):
        processed.append({
            "id": f"{document.id}_chunk_{i}",
            "document_id": document.id,
            "title": document.title,
            "content": chunk,
            "category": document.category,
            "tags": document.tags,
            "chunk_index": i,
            "total_chunks": len(chunks),
            "created_at": document.created_at.isoformat(),
            "metadata": document.metadata
        })
    
    return processed

processed_docs = []
for doc in sample_documents:
    processed_docs.extend(process_document(doc))

print(f"处理了 {len(sample_documents)} 个文档,生成 {len(processed_docs)} 个块")

向量索引 #

创建 Collection #

python
client = QdrantClient(":memory:")

client.create_collection(
    collection_name="semantic_search",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE
    )
)

print("Collection 创建成功")

生成 Embedding #

python
model = SentenceTransformer('all-MiniLM-L6-v2')

def generate_embeddings(texts: List[str]) -> np.ndarray:
    embeddings = model.encode(texts, show_progress_bar=True)
    return embeddings

texts = [doc["content"] for doc in processed_docs]
embeddings = generate_embeddings(texts)

print(f"生成了 {len(embeddings)} 个向量,维度: {embeddings.shape[1]}")

索引文档 #

python
def index_documents(client: QdrantClient, collection_name: str, documents: List[Dict], embeddings: np.ndarray):
    points = []
    
    for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
        point = PointStruct(
            id=i,
            vector=embedding.tolist(),
            payload=doc
        )
        points.append(point)
    
    client.upsert(
        collection_name=collection_name,
        points=points
    )
    
    print(f"索引了 {len(points)} 个文档块")

index_documents(client, "semantic_search", processed_docs, embeddings)

搜索功能 #

基础搜索 #

python
def search(query: str, limit: int = 5) -> List[Dict]:
    query_embedding = model.encode(query)
    
    results = client.search(
        collection_name="semantic_search",
        query_vector=query_embedding.tolist(),
        limit=limit
    )
    
    return [
        {
            "id": result.id,
            "score": result.score,
            "title": result.payload["title"],
            "content": result.payload["content"],
            "category": result.payload["category"],
            "tags": result.payload["tags"]
        }
        for result in results
    ]

results = search("如何学习人工智能")
for i, result in enumerate(results, 1):
    print(f"\n{i}. {result['title']} (分数: {result['score']:.4f})")
    print(f"   内容: {result['content'][:100]}...")
    print(f"   标签: {', '.join(result['tags'])}")

带过滤的搜索 #

python
def search_with_filter(query: str, category: Optional[str] = None, limit: int = 5) -> List[Dict]:
    query_embedding = model.encode(query)
    
    query_filter = None
    if category:
        query_filter = Filter(
            must=[
                FieldCondition(
                    key="category",
                    match=MatchValue(value=category)
                )
            ]
        )
    
    results = client.search(
        collection_name="semantic_search",
        query_vector=query_embedding.tolist(),
        query_filter=query_filter,
        limit=limit
    )
    
    return [
        {
            "id": result.id,
            "score": result.score,
            "title": result.payload["title"],
            "content": result.payload["content"],
            "category": result.payload["category"]
        }
        for result in results
    ]

ai_results = search_with_filter("编程技术", category="AI")
print(f"AI 分类下的搜索结果: {len(ai_results)} 条")

混合搜索 #

python
def hybrid_search(query: str, keywords: List[str], limit: int = 5) -> List[Dict]:
    query_embedding = model.encode(query)
    
    results = client.search(
        collection_name="semantic_search",
        query_vector=query_embedding.tolist(),
        limit=limit * 2
    )
    
    scored_results = []
    for result in results:
        score = result.score
        
        keyword_bonus = 0
        content_lower = result.payload["content"].lower()
        for keyword in keywords:
            if keyword.lower() in content_lower:
                keyword_bonus += 0.1
        
        final_score = score + keyword_bonus
        
        scored_results.append({
            "id": result.id,
            "score": final_score,
            "semantic_score": result.score,
            "keyword_bonus": keyword_bonus,
            "title": result.payload["title"],
            "content": result.payload["content"]
        })
    
    scored_results.sort(key=lambda x: x["score"], reverse=True)
    
    return scored_results[:limit]

hybrid_results = hybrid_search("机器学习算法", keywords=["机器学习", "算法"])
for result in hybrid_results:
    print(f"{result['title']}: {result['score']:.4f} (语义: {result['semantic_score']:.4f}, 关键词: +{result['keyword_bonus']:.2f})")

搜索结果处理 #

结果去重 #

python
def deduplicate_results(results: List[Dict]) -> List[Dict]:
    seen_documents = set()
    unique_results = []
    
    for result in results:
        doc_id = result.get("document_id", result["id"])
        
        if doc_id not in seen_documents:
            seen_documents.add(doc_id)
            unique_results.append(result)
    
    return unique_results

all_results = search("人工智能", limit=10)
unique_results = deduplicate_results(all_results)
print(f"去重前: {len(all_results)}, 去重后: {len(unique_results)}")

结果高亮 #

python
def highlight_text(text: str, query: str, max_length: int = 200) -> str:
    query_words = query.lower().split()
    
    text_lower = text.lower()
    
    highlighted = text
    for word in query_words:
        if len(word) > 1:
            import re
            pattern = re.compile(re.escape(word), re.IGNORECASE)
            highlighted = pattern.sub(f"**{word}**", highlighted)
    
    if len(highlighted) > max_length:
        highlighted = highlighted[:max_length] + "..."
    
    return highlighted

query = "机器学习"
results = search(query, limit=3)

for result in results:
    highlighted = highlight_text(result["content"], query)
    print(f"\n{result['title']}")
    print(f"  {highlighted}")

分页支持 #

python
def paginated_search(query: str, page: int = 1, page_size: int = 10) -> Dict:
    query_embedding = model.encode(query)
    
    offset = (page - 1) * page_size
    
    results = client.search(
        collection_name="semantic_search",
        query_vector=query_embedding.tolist(),
        limit=page_size,
        offset=offset
    )
    
    total = client.get_collection("semantic_search").points_count
    
    return {
        "results": [
            {
                "id": r.id,
                "score": r.score,
                "title": r.payload["title"],
                "content": r.payload["content"]
            }
            for r in results
        ],
        "page": page,
        "page_size": page_size,
        "total": total,
        "total_pages": (total + page_size - 1) // page_size
    }

page1 = paginated_search("人工智能", page=1, page_size=3)
print(f"第 1 页,共 {page1['total_pages']} 页")
for result in page1["results"]:
    print(f"  - {result['title']}")

搜索建议 #

查询扩展 #

python
def expand_query(query: str) -> List[str]:
    expansions = {
        "AI": ["人工智能", "机器学习", "深度学习"],
        "机器学习": ["ML", "人工智能", "深度学习"],
        "Python": ["编程", "开发", "脚本"],
        "数据库": ["DB", "存储", "SQL"]
    }
    
    expanded = [query]
    
    for key, synonyms in expansions.items():
        if key.lower() in query.lower():
            expanded.extend(synonyms)
    
    return expanded

expanded = expand_query("AI 技术")
print(f"扩展查询: {expanded}")

相关搜索 #

python
def get_related_searches(query: str, limit: int = 5) -> List[str]:
    results = search(query, limit=limit)
    
    related = set()
    for result in results:
        related.update(result["tags"])
    
    return list(related)[:limit]

related = get_related_searches("机器学习")
print(f"相关搜索: {related}")

性能优化 #

批量搜索 #

python
from concurrent.futures import ThreadPoolExecutor

def batch_search(queries: List[str], limit: int = 5, workers: int = 4) -> Dict[str, List[Dict]]:
    def search_one(query: str) -> tuple:
        results = search(query, limit)
        return (query, results)
    
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(search_one, queries))
    
    return dict(results)

queries = ["人工智能", "Python", "数据库"]
batch_results = batch_search(queries)

for query, results in batch_results.items():
    print(f"\n查询: {query}")
    for r in results[:2]:
        print(f"  - {r['title']}")

缓存机制 #

python
from functools import lru_cache
import hashlib

@lru_cache(maxsize=1000)
def cached_search(query: str, limit: int = 5) -> List[Dict]:
    return search(query, limit)

def search_with_cache(query: str, limit: int = 5) -> List[Dict]:
    cache_key = hashlib.md5(f"{query}_{limit}".encode()).hexdigest()
    return cached_search(query, limit)

import time

start = time.time()
results1 = search_with_cache("机器学习", 5)
first_time = time.time() - start

start = time.time()
results2 = search_with_cache("机器学习", 5)
cached_time = time.time() - start

print(f"首次查询: {first_time*1000:.2f}ms, 缓存查询: {cached_time*1000:.2f}ms")

完整示例 #

python
class SemanticSearchEngine:
    def __init__(self, collection_name: str = "semantic_search"):
        self.client = QdrantClient(":memory:")
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.collection_name = collection_name
    
    def create_index(self, documents: List[Document]):
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(size=384, distance=Distance.COSINE)
        )
        
        processed = []
        for doc in documents:
            processed.extend(process_document(doc))
        
        texts = [p["content"] for p in processed]
        embeddings = self.model.encode(texts)
        
        points = [
            PointStruct(id=i, vector=embeddings[i].tolist(), payload=processed[i])
            for i in range(len(processed))
        ]
        
        self.client.upsert(self.collection_name, points)
        print(f"索引了 {len(points)} 个文档块")
    
    def search(self, query: str, category: Optional[str] = None, limit: int = 5) -> List[Dict]:
        query_embedding = self.model.encode(query)
        
        query_filter = None
        if category:
            query_filter = Filter(
                must=[FieldCondition(key="category", match=MatchValue(value=category))]
            )
        
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding.tolist(),
            query_filter=query_filter,
            limit=limit
        )
        
        return [
            {
                "score": r.score,
                "title": r.payload["title"],
                "content": r.payload["content"],
                "category": r.payload["category"],
                "tags": r.payload["tags"]
            }
            for r in results
        ]

engine = SemanticSearchEngine()
engine.create_index(sample_documents)

results = engine.search("如何学习人工智能")
print("\n搜索结果:")
for i, r in enumerate(results, 1):
    print(f"{i}. {r['title']} (分数: {r['score']:.4f})")

小结 #

本章实现了一个完整的语义搜索系统:

  • 文档处理和分块
  • 向量索引和 Embedding
  • 多种搜索模式
  • 结果处理和优化
  • 性能优化技巧

下一步 #

继续学习 RAG 应用,了解如何构建检索增强生成系统!

最后更新:2026-04-04