RAG 应用实战 #

本章介绍如何使用 Weaviate 构建检索增强生成(RAG)系统。

RAG 概述 #

text
RAG 架构流程:

┌─────────────────────────────────────────────────────────────┐
│                      RAG 系统                                │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  用户问题                                                    │
│      │                                                       │
│      ▼                                                       │
│  ┌─────────────┐                                             │
│  │  Embedding  │                                             │
│  │   模型      │                                             │
│  └─────────────┘                                             │
│      │                                                       │
│      ▼                                                       │
│  ┌─────────────┐      ┌─────────────┐                       │
│  │  Weaviate   │  →   │  相关文档   │                       │
│  │  向量搜索   │      │  检索结果   │                       │
│  └─────────────┘      └─────────────┘                       │
│                              │                               │
│                              ▼                               │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    LLM                               │   │
│  │  问题 + 相关文档 → 生成回答                          │   │
│  └─────────────────────────────────────────────────────┘   │
│                              │                               │
│                              ▼                               │
│                         最终回答                             │
│                                                              │
└─────────────────────────────────────────────────────────────┘

环境准备 #

安装依赖 #

bash
pip install weaviate-client sentence-transformers openai

启动服务 #

yaml
version: '3.8'
services:
  weaviate:
    image: cr.weaviate.io/semitechnologies/weaviate:1.25.0
    ports:
      - "8080:8080"
      - "50051:50051"
    environment:
      QUERY_DEFAULTS_LIMIT: 25
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'none'

知识库构建 #

创建知识库 Collection #

python
import weaviate
import weaviate.classes as wvc

client = weaviate.connect_to_local()

knowledge_base = client.collections.create(
    name="KnowledgeBase",
    vectorizer_config=wvc.config.Configure.Vectorizer.none(),
    properties=[
        wvc.config.Property(name="question", data_type=wvc.config.DataType.TEXT),
        wvc.config.Property(name="answer", data_type=wvc.config.DataType.TEXT),
        wvc.config.Property(name="category", data_type=wvc.config.DataType.TEXT),
        wvc.config.Property(name="source", data_type=wvc.config.DataType.TEXT)
    ]
)

准备知识库数据 #

python
knowledge_data = [
    {
        "question": "Weaviate 是什么?",
        "answer": "Weaviate 是一个开源的云原生向量数据库,专为 AI 应用设计。它支持语义搜索、RAG、知识图谱等场景,使用 Go 语言编写,具有高性能和易扩展的特点。",
        "category": "产品介绍",
        "source": "官方文档"
    },
    {
        "question": "Weaviate 支持哪些向量化模块?",
        "answer": "Weaviate 支持多种向量化模块:text2vec-openai(OpenAI Embeddings)、text2vec-cohere(Cohere Embeddings)、text2vec-huggingface(HuggingFace 模型)、text2vec-palm(Google PaLM)、multi2vec-clip(多模态 CLIP)等。",
        "category": "功能特性",
        "source": "官方文档"
    },
    {
        "question": "如何安装 Weaviate?",
        "answer": "Weaviate 支持多种安装方式:1. Docker Compose 最简单,适合开发环境;2. Kubernetes 部署适合生产环境;3. Weaviate Cloud 是官方托管服务,无需运维;4. Embedded 模式适合快速测试。",
        "category": "安装部署",
        "source": "官方文档"
    },
    {
        "question": "Weaviate 的核心概念有哪些?",
        "answer": "Weaviate 的核心概念包括:Schema(模式定义)、Collection/Class(集合/类)、Object(对象)、Vector(向量)、Module(模块)、Property(属性)、Reference(引用)等。",
        "category": "核心概念",
        "source": "官方文档"
    },
    {
        "question": "什么是 HNSW 索引?",
        "answer": "HNSW(Hierarchical Navigable Small World)是高效的近似最近邻搜索算法。它通过构建多层图结构实现快速向量检索,时间复杂度 O(log n),是 Weaviate 的核心索引技术。",
        "category": "技术原理",
        "source": "技术博客"
    },
    {
        "question": "Weaviate 如何实现语义搜索?",
        "answer": "Weaviate 通过将文本转换为向量,然后在高维空间中计算向量相似性来实现语义搜索。支持 nearText、nearVector、nearObject 等搜索方式,还可以结合过滤条件进行精确检索。",
        "category": "功能特性",
        "source": "官方文档"
    },
    {
        "question": "什么是 RAG?",
        "answer": "RAG(Retrieval-Augmented Generation)即检索增强生成,是一种结合检索和生成的技术。它首先从知识库检索相关文档,然后将文档作为上下文提供给大语言模型,生成更准确、更有依据的回答。",
        "category": "技术概念",
        "source": "技术博客"
    },
    {
        "question": "Weaviate 支持哪些距离度量方式?",
        "answer": "Weaviate 支持三种距离度量方式:1. 余弦相似度(Cosine)- 最常用,关注向量方向;2. 点积(Dot Product)- 计算最快,适合归一化向量;3. L2 距离(欧几里得)- 计算绝对距离,适合图像特征。",
        "category": "技术原理",
        "source": "官方文档"
    },
    {
        "question": "如何优化 Weaviate 性能?",
        "answer": "Weaviate 性能优化方法:1. 调整 HNSW 参数(efConstruction、maxConnections);2. 使用向量量化(PQ、BQ)减少内存;3. 为过滤字段创建索引;4. 使用批量操作提高吞吐量;5. 合理配置硬件资源。",
        "category": "性能优化",
        "source": "官方文档"
    },
    {
        "question": "Weaviate 如何实现高可用?",
        "answer": "Weaviate 通过分片和复制实现高可用:分片将数据分布到多个节点,复制为每个分片创建多个副本。支持多节点集群部署,使用 Raft 协议保证一致性,可容忍节点故障。",
        "category": "部署架构",
        "source": "官方文档"
    }
]

导入知识库数据 #

python
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

def get_embedding(text):
    return model.encode(text).tolist()

knowledge_base = client.collections.get("KnowledgeBase")

with knowledge_base.batch.dynamic() as batch:
    for item in knowledge_data:
        text = f"{item['question']} {item['answer']}"
        vector = get_embedding(text)
        
        batch.add_object(
            properties={
                "question": item["question"],
                "answer": item["answer"],
                "category": item["category"],
                "source": item["source"]
            },
            vector=vector
        )

print(f"知识库导入完成,共 {len(knowledge_data)} 条数据")

RAG 系统实现 #

基础 RAG 类 #

python
from openai import OpenAI
from typing import List, Dict, Optional

openai_client = OpenAI()

class SimpleRAG:
    def __init__(self, collection_name: str = "KnowledgeBase"):
        self.collection = client.collections.get(collection_name)
        self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    
    def get_embedding(self, text: str) -> List[float]:
        return self.model.encode(text).tolist()
    
    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        query_vector = self.get_embedding(query)
        
        response = self.collection.query.near_vector(
            near_vector=query_vector,
            limit=top_k,
            return_properties=["question", "answer", "category", "source"]
        )
        
        return [
            {
                "question": obj.properties["question"],
                "answer": obj.properties["answer"],
                "category": obj.properties["category"],
                "source": obj.properties["source"],
                "distance": obj.metadata.distance
            }
            for obj in response.objects
        ]
    
    def build_prompt(self, query: str, contexts: List[Dict]) -> str:
        context_text = "\n\n".join([
            f"【问题】{c['question']}\n【回答】{c['answer']}"
            for c in contexts
        ])
        
        prompt = f"""基于以下知识库内容回答用户问题。如果知识库中没有相关信息,请说明。

知识库内容:
{context_text}

用户问题:{query}

请基于知识库内容给出准确、详细的回答:"""
        
        return prompt
    
    def generate(self, prompt: str) -> str:
        response = openai_client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "你是一个专业的问答助手,请基于提供的知识库内容回答问题。"},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=1000
        )
        
        return response.choices[0].message.content
    
    def query(self, question: str, top_k: int = 3) -> Dict:
        contexts = self.retrieve(question, top_k)
        
        prompt = self.build_prompt(question, contexts)
        
        answer = self.generate(prompt)
        
        return {
            "question": question,
            "answer": answer,
            "contexts": contexts
        }

rag = SimpleRAG()

使用 RAG 系统 #

python
result = rag.query("Weaviate 是什么?")

print(f"问题: {result['question']}")
print(f"\n回答: {result['answer']}")
print(f"\n参考来源:")
for ctx in result['contexts']:
    print(f"- {ctx['source']}: {ctx['question']}")

高级 RAG 功能 #

带过滤的 RAG #

python
from weaviate.classes.query import Filter

class FilteredRAG(SimpleRAG):
    def retrieve_with_filter(
        self, 
        query: str, 
        category: Optional[str] = None,
        top_k: int = 3
    ) -> List[Dict]:
        query_vector = self.get_embedding(query)
        
        filters = None
        if category:
            filters = Filter.by_property("category").equal(category)
        
        response = self.collection.query.near_vector(
            near_vector=query_vector,
            filters=filters,
            limit=top_k,
            return_properties=["question", "answer", "category", "source"]
        )
        
        return [
            {
                "question": obj.properties["question"],
                "answer": obj.properties["answer"],
                "category": obj.properties["category"],
                "source": obj.properties["source"],
                "distance": obj.metadata.distance
            }
            for obj in response.objects
        ]
    
    def query_with_filter(
        self, 
        question: str, 
        category: Optional[str] = None,
        top_k: int = 3
    ) -> Dict:
        contexts = self.retrieve_with_filter(question, category, top_k)
        prompt = self.build_prompt(question, contexts)
        answer = self.generate(prompt)
        
        return {
            "question": question,
            "answer": answer,
            "contexts": contexts
        }

filtered_rag = FilteredRAG()

result = filtered_rag.query_with_filter(
    "如何优化性能?",
    category="性能优化"
)

print(f"回答: {result['answer']}")

多轮对话 RAG #

python
class ConversationalRAG(SimpleRAG):
    def __init__(self, collection_name: str = "KnowledgeBase"):
        super().__init__(collection_name)
        self.conversation_history = []
    
    def build_conversational_prompt(
        self, 
        query: str, 
        contexts: List[Dict]
    ) -> str:
        context_text = "\n\n".join([
            f"【问题】{c['question']}\n【回答】{c['answer']}"
            for c in contexts
        ])
        
        history_text = ""
        if self.conversation_history:
            history_text = "\n\n历史对话:\n"
            for turn in self.conversation_history[-3:]:
                history_text += f"问:{turn['query']}\n答:{turn['answer'][:200]}...\n"
        
        prompt = f"""你是一个专业的问答助手。请基于提供的知识库内容和对话历史回答用户问题。

知识库内容:
{context_text}
{history_text}
当前问题:{query}

请给出准确、详细的回答:"""
        
        return prompt
    
    def chat(self, question: str, top_k: int = 3) -> Dict:
        contexts = self.retrieve(question, top_k)
        
        prompt = self.build_conversational_prompt(question, contexts)
        
        answer = self.generate(prompt)
        
        self.conversation_history.append({
            "query": question,
            "answer": answer,
            "contexts": contexts
        })
        
        return {
            "question": question,
            "answer": answer,
            "contexts": contexts
        }

conv_rag = ConversationalRAG()

result1 = conv_rag.chat("Weaviate 是什么?")
print(f"回答1: {result1['answer'][:200]}...")

result2 = conv_rag.chat("它支持哪些安装方式?")
print(f"\n回答2: {result2['answer'][:200]}...")

混合检索 RAG #

python
class HybridRAG(SimpleRAG):
    def hybrid_retrieve(
        self, 
        query: str, 
        alpha: float = 0.5,
        top_k: int = 5
    ) -> List[Dict]:
        query_vector = self.get_embedding(query)
        
        response = self.collection.query.hybrid(
            query=query,
            vector=query_vector,
            alpha=alpha,
            limit=top_k,
            query_properties=["question", "answer"],
            return_properties=["question", "answer", "category", "source"]
        )
        
        return [
            {
                "question": obj.properties["question"],
                "answer": obj.properties["answer"],
                "category": obj.properties["category"],
                "source": obj.properties["source"],
                "score": obj.metadata.score
            }
            for obj in response.objects
        ]
    
    def query_hybrid(
        self, 
        question: str, 
        alpha: float = 0.5,
        top_k: int = 5
    ) -> Dict:
        contexts = self.hybrid_retrieve(question, alpha, top_k)
        prompt = self.build_prompt(question, contexts)
        answer = self.generate(prompt)
        
        return {
            "question": question,
            "answer": answer,
            "contexts": contexts
        }

hybrid_rag = HybridRAG()

result = hybrid_rag.query_hybrid("向量数据库安装", alpha=0.7)

print(f"回答: {result['answer']}")

RAG 评估 #

检索质量评估 #

python
def evaluate_retrieval(rag: SimpleRAG, test_cases: List[Dict]) -> Dict:
    total = len(test_cases)
    hits = 0
    mrr_sum = 0
    
    for case in test_cases:
        query = case["query"]
        expected_sources = set(case["relevant_sources"])
        
        contexts = rag.retrieve(query, top_k=5)
        retrieved_sources = {c["source"] for c in contexts}
        
        if retrieved_sources & expected_sources:
            hits += 1
        
        for i, ctx in enumerate(contexts):
            if ctx["source"] in expected_sources:
                mrr_sum += 1 / (i + 1)
                break
    
    return {
        "hit_rate": hits / total,
        "mrr": mrr_sum / total
    }

test_cases = [
    {
        "query": "Weaviate 安装方式",
        "relevant_sources": ["官方文档"]
    },
    {
        "query": "向量索引原理",
        "relevant_sources": ["技术博客", "官方文档"]
    },
    {
        "query": "性能优化方法",
        "relevant_sources": ["官方文档"]
    }
]

eval_results = evaluate_retrieval(rag, test_cases)
print(f"命中率: {eval_results['hit_rate']:.2%}")
print(f"MRR: {eval_results['mrr']:.4f}")

完整示例 #

python
import weaviate
import weaviate.classes as wvc
from sentence_transformers import SentenceTransformer
from openai import OpenAI
from typing import List, Dict

client = weaviate.connect_to_local()

embedding_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
openai_client = OpenAI()

client.collections.delete("KnowledgeBase")

knowledge_base = client.collections.create(
    name="KnowledgeBase",
    vectorizer_config=wvc.config.Configure.Vectorizer.none(),
    properties=[
        wvc.config.Property(name="question", data_type=wvc.config.DataType.TEXT),
        wvc.config.Property(name="answer", data_type=wvc.config.DataType.TEXT),
        wvc.config.Property(name="category", data_type=wvc.config.DataType.TEXT)
    ]
)

knowledge_data = [
    {"question": "Weaviate 是什么?", "answer": "Weaviate 是云原生向量数据库", "category": "介绍"},
    {"question": "如何安装?", "answer": "支持 Docker、Kubernetes、云服务", "category": "安装"},
    {"question": "支持哪些功能?", "answer": "语义搜索、RAG、知识图谱", "category": "功能"}
]

with knowledge_base.batch.dynamic() as batch:
    for item in knowledge_data:
        text = f"{item['question']} {item['answer']}"
        vector = embedding_model.encode(text).tolist()
        batch.add_object(properties=item, vector=vector)

print("知识库构建完成\n")

class RAG:
    def __init__(self, collection_name: str):
        self.collection = client.collections.get(collection_name)
    
    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        vector = embedding_model.encode(query).tolist()
        response = self.collection.query.near_vector(
            near_vector=vector,
            limit=top_k,
            return_properties=["question", "answer"]
        )
        return [{"question": obj.properties["question"], 
                 "answer": obj.properties["answer"]} for obj in response.objects]
    
    def generate(self, query: str, contexts: List[Dict]) -> str:
        context_text = "\n".join([f"Q: {c['question']}\nA: {c['answer']}" for c in contexts])
        prompt = f"基于以下内容回答问题:\n{context_text}\n\n问题:{query}"
        
        response = openai_client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500
        )
        return response.choices[0].message.content
    
    def query(self, question: str) -> Dict:
        contexts = self.retrieve(question)
        answer = self.generate(question, contexts)
        return {"question": question, "answer": answer, "contexts": contexts}

rag = RAG("KnowledgeBase")

result = rag.query("Weaviate 能做什么?")

print(f"问题: {result['question']}")
print(f"\n回答: {result['answer']}")
print(f"\n参考:")
for ctx in result['contexts']:
    print(f"- {ctx['question']}")

client.close()

小结 #

本章介绍了使用 Weaviate 构建 RAG 系统:

  • 知识库构建
  • 基础 RAG 实现
  • 带过滤的 RAG
  • 多轮对话 RAG
  • 混合检索 RAG
  • RAG 评估

下一步 #

继续学习 知识图谱,了解如何构建语义知识图谱!

最后更新:2026-04-04