RAG 应用实战 #

本章介绍如何使用 Qdrant 构建检索增强生成(RAG)系统。

RAG 概述 #

text
RAG 架构流程:

┌─────────────────────────────────────────────────────────────┐
│                      RAG 系统                                │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  用户问题                                                    │
│      │                                                       │
│      ↓                                                       │
│  ┌─────────────┐                                             │
│  │  Embedding  │                                             │
│  └─────────────┘                                             │
│      │                                                       │
│      ↓                                                       │
│  ┌─────────────┐      ┌─────────────┐                       │
│  │   Qdrant    │  →   │  相关文档   │                       │
│  │  向量搜索   │      │  检索结果   │                       │
│  └─────────────┘      └─────────────┘                       │
│                              │                               │
│                              ↓                               │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    LLM                               │   │
│  │  问题 + 相关文档 → 生成回答                          │   │
│  └─────────────────────────────────────────────────────┘   │
│                              │                               │
│                              ↓                               │
│                         最终回答                             │
│                                                              │
└─────────────────────────────────────────────────────────────┘

环境准备 #

安装依赖 #

bash
pip install qdrant-client sentence-transformers langchain langchain-openai

导入库 #

python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Optional
import os

知识库构建 #

文档数据 #

python
knowledge_base = [
    {
        "id": "kb_001",
        "title": "Qdrant 简介",
        "content": "Qdrant 是一个高性能的开源向量数据库,使用 Rust 编写。它支持高维向量的存储、索引和相似性搜索,是构建 AI 应用的核心基础设施。Qdrant 提供 REST API 和 gRPC 接口,支持 Python、JavaScript、Go 等多种语言的 SDK。",
        "category": "产品介绍"
    },
    {
        "id": "kb_002",
        "title": "Qdrant 安装方式",
        "content": "Qdrant 支持多种安装方式:1. Docker 安装最简单,使用 docker run 命令即可启动;2. Kubernetes 部署适合生产环境;3. Qdrant Cloud 是官方托管的云服务,无需运维;4. Python 内存模式适合快速测试。",
        "category": "安装部署"
    },
    {
        "id": "kb_003",
        "title": "Collection 管理",
        "content": "Collection 是 Qdrant 中最顶层的容器,类似于关系数据库中的表。创建 Collection 时需要指定向量维度和距离度量方式。Qdrant 支持余弦相似度、欧几里得距离和点积三种距离度量。",
        "category": "核心概念"
    },
    {
        "id": "kb_004",
        "title": "向量搜索原理",
        "content": "Qdrant 使用 HNSW(Hierarchical Navigable Small World)算法进行向量索引。HNSW 是一种高效的近似最近邻搜索算法,通过构建多层图结构实现快速的向量检索。搜索时从顶层开始,逐层向下细化,最终返回最相似的结果。",
        "category": "技术原理"
    },
    {
        "id": "kb_005",
        "title": "Payload 过滤",
        "content": "Qdrant 支持丰富的 Payload 过滤功能。可以为向量附加元数据,如文本、数值、标签等,并在搜索时根据这些元数据进行过滤。支持的过滤条件包括精确匹配、范围查询、地理位置过滤等。",
        "category": "核心功能"
    },
    {
        "id": "kb_006",
        "title": "分布式部署",
        "content": "Qdrant 支持分布式集群部署,通过分片和复制实现水平扩展和高可用。分片将数据分布到多个节点,复制为每个分片创建多个副本。集群使用 Raft 协议保证一致性。",
        "category": "部署架构"
    },
    {
        "id": "kb_007",
        "title": "性能优化建议",
        "content": "Qdrant 性能优化建议:1. 调整 HNSW 参数 m 和 ef_construct;2. 使用向量量化减少内存占用;3. 为 Payload 字段创建索引;4. 使用批量操作提高吞吐量;5. 合理配置搜索参数 hnsw_ef。",
        "category": "性能优化"
    },
    {
        "id": "kb_008",
        "title": "RAG 应用场景",
        "content": "RAG(检索增强生成)是 Qdrant 的典型应用场景。RAG 系统首先从知识库中检索相关文档,然后将文档作为上下文提供给大语言模型,生成更准确、更有依据的回答。这种方式解决了 LLM 的知识时效性和幻觉问题。",
        "category": "应用场景"
    }
]

创建知识库索引 #

python
class KnowledgeBase:
    def __init__(self, collection_name: str = "rag_knowledge"):
        self.client = QdrantClient(":memory:")
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.collection_name = collection_name
    
    def build_index(self, documents: List[Dict]):
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(size=384, distance=Distance.COSINE)
        )
        
        texts = [doc["content"] for doc in documents]
        embeddings = self.model.encode(texts)
        
        points = [
            PointStruct(
                id=i,
                vector=embeddings[i].tolist(),
                payload={
                    "id": documents[i]["id"],
                    "title": documents[i]["title"],
                    "content": documents[i]["content"],
                    "category": documents[i]["category"]
                }
            )
            for i in range(len(documents))
        ]
        
        self.client.upsert(self.collection_name, points)
        print(f"知识库索引完成,共 {len(points)} 条文档")
    
    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        query_embedding = self.model.encode(query)
        
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding.tolist(),
            limit=top_k
        )
        
        return [
            {
                "id": r.payload["id"],
                "title": r.payload["title"],
                "content": r.payload["content"],
                "category": r.payload["category"],
                "score": r.score
            }
            for r in results
        ]

kb = KnowledgeBase()
kb.build_index(knowledge_base)

RAG 系统实现 #

简单 RAG 实现 #

python
class SimpleRAG:
    def __init__(self, knowledge_base: KnowledgeBase):
        self.kb = knowledge_base
    
    def build_prompt(self, query: str, contexts: List[Dict]) -> str:
        context_text = "\n\n".join([
            f"【{c['title']}】\n{c['content']}"
            for c in contexts
        ])
        
        prompt = f"""基于以下知识库内容回答问题。如果知识库中没有相关信息,请说明。

知识库内容:
{context_text}

问题:{query}

请基于知识库内容给出准确、详细的回答:"""
        
        return prompt
    
    def answer(self, query: str, top_k: int = 3) -> Dict:
        contexts = self.kb.retrieve(query, top_k)
        
        prompt = self.build_prompt(query, contexts)
        
        return {
            "query": query,
            "contexts": contexts,
            "prompt": prompt
        }

rag = SimpleRAG(kb)
result = rag.answer("如何安装 Qdrant?")

print("检索到的相关文档:")
for ctx in result["contexts"]:
    print(f"  - {ctx['title']} (相关度: {ctx['score']:.4f})")

print("\n生成的提示词:")
print(result["prompt"][:500] + "...")

LangChain 集成 #

python
from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStoreRetriever
from langchain_core.language_models import BaseLLM
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

class QdrantVectorStore:
    def __init__(self, knowledge_base: KnowledgeBase):
        self.kb = knowledge_base
    
    def similarity_search(self, query: str, k: int = 3) -> List[Document]:
        results = self.kb.retrieve(query, k)
        
        return [
            Document(
                page_content=r["content"],
                metadata={
                    "id": r["id"],
                    "title": r["title"],
                    "category": r["category"],
                    "score": r["score"]
                }
            )
            for r in results
        ]
    
    def as_retriever(self, search_kwargs: Optional[Dict] = None):
        search_kwargs = search_kwargs or {"k": 3}
        
        class Retriever:
            def __init__(self, store, kwargs):
                self.store = store
                self.kwargs = kwargs
            
            def invoke(self, query: str) -> List[Document]:
                return self.store.similarity_search(query, k=self.kwargs.get("k", 3))
        
        return Retriever(self, search_kwargs)

vector_store = QdrantVectorStore(kb)
retriever = vector_store.as_retriever(search_kwargs={"k": 3})

docs = retriever.invoke("Qdrant 是什么?")
print(f"检索到 {len(docs)} 个文档")
for doc in docs:
    print(f"  - {doc.metadata['title']}")

RAG 提示词模板 #

python
RAG_PROMPT_TEMPLATE = """你是一个专业的问答助手。请基于提供的上下文信息回答用户问题。

要求:
1. 只使用上下文中的信息回答
2. 如果上下文中没有相关信息,请明确说明
3. 回答要准确、详细、有条理
4. 可以引用具体的上下文来源

上下文信息:
{context}

用户问题:{question}

请给出回答:"""

def format_docs(docs: List[Document]) -> str:
    return "\n\n".join([
        f"【{doc.metadata.get('title', '未知')}】\n{doc.page_content}"
        for doc in docs
    ])

def build_rag_chain(retriever, llm=None):
    def rag_pipeline(question: str) -> str:
        docs = retriever.invoke(question)
        context = format_docs(docs)
        
        prompt = RAG_PROMPT_TEMPLATE.format(
            context=context,
            question=question
        )
        
        return prompt
    
    return rag_pipeline

rag_chain = build_rag_chain(retriever)

prompt = rag_chain("Qdrant 支持哪些距离度量方式?")
print(prompt)

高级 RAG 功能 #

多轮对话支持 #

python
class ConversationalRAG:
    def __init__(self, knowledge_base: KnowledgeBase):
        self.kb = knowledge_base
        self.conversation_history = []
    
    def build_conversational_prompt(self, query: str, contexts: List[Dict]) -> str:
        history_text = ""
        if self.conversation_history:
            history_text = "\n\n历史对话:\n"
            for turn in self.conversation_history[-3:]:
                history_text += f"问:{turn['query']}\n答:{turn['answer'][:200]}...\n"
        
        context_text = "\n\n".join([
            f"【{c['title']}】\n{c['content']}"
            for c in contexts
        ])
        
        prompt = f"""你是一个专业的问答助手。请基于提供的上下文信息和对话历史回答用户问题。

上下文信息:
{context_text}
{history_text}
当前问题:{query}

请给出回答:"""
        
        return prompt
    
    def chat(self, query: str) -> Dict:
        contexts = self.kb.retrieve(query)
        
        prompt = self.build_conversational_prompt(query, contexts)
        
        self.conversation_history.append({
            "query": query,
            "contexts": contexts
        })
        
        return {
            "query": query,
            "contexts": contexts,
            "prompt": prompt
        }

conv_rag = ConversationalRAG(kb)

result1 = conv_rag.chat("Qdrant 是什么?")
print("第一轮对话提示词已生成")

result2 = conv_rag.chat("它支持哪些安装方式?")
print("第二轮对话提示词已生成(包含历史上下文)")

混合检索 #

python
class HybridRAG:
    def __init__(self, knowledge_base: KnowledgeBase):
        self.kb = knowledge_base
    
    def keyword_search(self, query: str, documents: List[Dict]) -> List[Dict]:
        query_words = set(query.lower().split())
        
        scored = []
        for doc in documents:
            content_words = set(doc["content"].lower().split())
            overlap = len(query_words & content_words)
            scored.append((doc, overlap))
        
        scored.sort(key=lambda x: x[1], reverse=True)
        return [s[0] for s in scored[:3]]
    
    def hybrid_retrieve(self, query: str, alpha: float = 0.5) -> List[Dict]:
        semantic_results = self.kb.retrieve(query, top_k=5)
        
        keyword_results = self.keyword_search(query, knowledge_base)
        
        combined = {}
        
        for i, result in enumerate(semantic_results):
            doc_id = result["id"]
            score = result["score"] * alpha
            combined[doc_id] = {
                **result,
                "semantic_score": result["score"],
                "keyword_score": 0,
                "final_score": score
            }
        
        for i, result in enumerate(keyword_results):
            doc_id = result["id"]
            keyword_score = (len(keyword_results) - i) / len(keyword_results) * (1 - alpha)
            
            if doc_id in combined:
                combined[doc_id]["keyword_score"] = keyword_score
                combined[doc_id]["final_score"] += keyword_score
            else:
                combined[doc_id] = {
                    **result,
                    "semantic_score": 0,
                    "keyword_score": keyword_score,
                    "final_score": keyword_score
                }
        
        results = sorted(combined.values(), key=lambda x: x["final_score"], reverse=True)
        
        return results[:3]

hybrid_rag = HybridRAG(kb)
results = hybrid_rag.hybrid_retrieve("Qdrant 安装部署")

print("混合检索结果:")
for r in results:
    print(f"  - {r['title']}")
    print(f"    语义分数: {r['semantic_score']:.4f}, 关键词分数: {r['keyword_score']:.4f}")
    print(f"    最终分数: {r['final_score']:.4f}")

重排序 #

python
class RerankedRAG:
    def __init__(self, knowledge_base: KnowledgeBase):
        self.kb = knowledge_base
    
    def rerank(self, query: str, documents: List[Dict]) -> List[Dict]:
        query_embedding = self.kb.model.encode(query)
        
        reranked = []
        for doc in documents:
            doc_embedding = self.kb.model.encode(doc["content"])
            
            similarity = np.dot(query_embedding, doc_embedding) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(doc_embedding)
            )
            
            reranked.append({
                **doc,
                "rerank_score": float(similarity)
            })
        
        reranked.sort(key=lambda x: x["rerank_score"], reverse=True)
        
        return reranked
    
    def retrieve_with_rerank(self, query: str, top_k: int = 5, final_k: int = 3) -> List[Dict]:
        initial_results = self.kb.retrieve(query, top_k=top_k)
        
        reranked = self.rerank(query, initial_results)
        
        return reranked[:final_k]

reranked_rag = RerankedRAG(kb)
results = reranked_rag.retrieve_with_rerank("Qdrant 性能优化")

print("重排序后的结果:")
for r in results:
    print(f"  - {r['title']} (重排序分数: {r['rerank_score']:.4f})")

评估与优化 #

检索质量评估 #

python
def evaluate_retrieval(kb: KnowledgeBase, test_cases: List[Dict]) -> Dict:
    total = len(test_cases)
    hits = 0
    mrr_sum = 0
    
    for case in test_cases:
        query = case["query"]
        expected_ids = set(case["relevant_docs"])
        
        results = kb.retrieve(query, top_k=5)
        retrieved_ids = {r["id"] for r in results}
        
        if retrieved_ids & expected_ids:
            hits += 1
        
        for i, result in enumerate(results):
            if result["id"] in expected_ids:
                mrr_sum += 1 / (i + 1)
                break
    
    return {
        "hit_rate": hits / total,
        "mrr": mrr_sum / total
    }

test_cases = [
    {
        "query": "Qdrant 如何安装",
        "relevant_docs": ["kb_002"]
    },
    {
        "query": "向量搜索的原理",
        "relevant_docs": ["kb_004"]
    },
    {
        "query": "Qdrant 性能优化",
        "relevant_docs": ["kb_007"]
    }
]

eval_results = evaluate_retrieval(kb, test_cases)
print(f"命中率: {eval_results['hit_rate']:.2%}")
print(f"MRR: {eval_results['mrr']:.4f}")

完整 RAG 示例 #

python
class ProductionRAG:
    def __init__(self):
        self.kb = KnowledgeBase("production_rag")
        self.kb.build_index(knowledge_base)
        self.conversation_history = []
    
    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        return self.kb.retrieve(query, top_k)
    
    def build_prompt(self, query: str, contexts: List[Dict]) -> str:
        context_text = "\n\n".join([
            f"【{c['title']}】\n{c['content']}"
            for c in contexts
        ])
        
        history_text = ""
        if self.conversation_history:
            history_text = "\n\n历史对话:\n"
            for turn in self.conversation_history[-2:]:
                history_text += f"问:{turn['query']}\n答:{turn['answer'][:100]}...\n"
        
        return f"""基于以下知识库内容回答问题。

知识库内容:
{context_text}
{history_text}
当前问题:{query}

请给出准确、详细的回答:"""
    
    def query(self, question: str) -> Dict:
        contexts = self.retrieve(question)
        
        prompt = self.build_prompt(question, contexts)
        
        return {
            "question": question,
            "contexts": contexts,
            "prompt": prompt,
            "sources": [c["title"] for c in contexts]
        }

prod_rag = ProductionRAG()

result = prod_rag.query("如何在生产环境部署 Qdrant?")

print("问题:", result["question"])
print("\n相关文档:")
for ctx in result["contexts"]:
    print(f"  - {ctx['title']} (相关度: {ctx['score']:.4f})")

print("\n生成的提示词(前 500 字符):")
print(result["prompt"][:500] + "...")

小结 #

本章实现了完整的 RAG 系统:

  • 知识库构建和索引
  • 基础 RAG 实现
  • LangChain 集成
  • 多轮对话支持
  • 混合检索和重排序
  • 检索质量评估

下一步 #

继续学习 推荐系统,了解如何构建个性化推荐应用!

最后更新:2026-04-04