Integrations and Extensions #

LangChain Integration #

Installing Dependencies #

bash
pip install langchain langchain-community chromadb

Basic Integration #

python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.llms import OpenAI
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.document_loaders import TextLoader

# Load and chunk the source document
loader = TextLoader("./documents/example.txt")
documents = loader.load()

text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(documents)

embeddings = OpenAIEmbeddings()

# Build a persistent Chroma store from the chunks
vectorstore = Chroma.from_documents(
    documents=texts,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

results = vectorstore.similarity_search("query text", k=3)

for doc in results:
    print(doc.page_content)
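
If you also need the raw distance for each hit, the LangChain wrapper exposes similarity_search_with_score:

python
results = vectorstore.similarity_search_with_score("query text", k=3)
for doc, score in results:
    print(score, doc.page_content[:80])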

Using as a Retriever #

python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain_community.llms import OpenAI

embeddings = OpenAIEmbeddings()

# Reopen the store persisted in the previous example
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings
)

retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}
)

qa_chain = RetrievalQA.from_chain_type(
    llm=OpenAI(),
    chain_type="stuff",
    retriever=retriever
)

answer = qa_chain.run("What is machine learning?")
print(answer)
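
To surface the retrieved chunks alongside the answer, RetrievalQA also accepts return_source_documents=True; the chain is then called with a dict and returns one:

python
qa_chain = RetrievalQA.from_chain_type(
    llm=OpenAI(),
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True
)

result = qa_chain({"query": "What is machine learning?"})
print(result["result"])
for doc in result["source_documents"]:
    print(doc.metadata)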

RAG Application Example #

python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

template = """You are a helpful assistant. Answer the question based on the following context:

Context:
{context}

Question: {question}

Answer in Chinese:"""

prompt = ChatPromptTemplate.from_template(template)

llm = ChatOpenAI(model_name="gpt-3.5-turbo")

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

answer = rag_chain.invoke("What is a vector database?")
print(answer)
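
One refinement worth noting: as written, the retriever's list of Document objects is stringified wholesale into the prompt. The usual LCEL pattern joins just the page contents first:

python
def format_docs(docs):
    # Keep only the text of each retrieved chunk
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)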

Conversation with Memory #

python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings
)

memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

qa = ConversationalRetrievalChain.from_llm(
    llm=ChatOpenAI(),
    retriever=vectorstore.as_retriever(),
    memory=memory
)

# With a memory object attached, the chain tracks chat history itself,
# so each turn only needs the new question
result = qa({"question": "What is Chroma?"})
print(result["answer"])

result = qa({"question": "What are its features?"})
print(result["answer"])

LlamaIndex Integration #

Installing Dependencies #

bash
pip install llama-index llama-index-vector-stores-chroma chromadb

Basic Integration #

python
import chromadb
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import StorageContext
from llama_index.core.embeddings import resolve_embed_model

# Load documents and point LlamaIndex at a persistent Chroma collection
documents = SimpleDirectoryReader("./documents").load_data()

chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("llamaindex_docs")

embed_model = resolve_embed_model("local:BAAI/bge-small-en-v1.5")

vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model
)

query_engine = index.as_query_engine()
response = query_engine.query("What is a vector database?")
print(response)
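
The response object also carries the retrieved source nodes, which is useful for spot-checking what the answer was grounded in:

python
for node in response.source_nodes:
    print(node.score, node.node.get_content()[:80])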

Persistent Index #

python
import chromadb
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core.embeddings import resolve_embed_model

embed_model = resolve_embed_model("local:BAAI/bge-small-en-v1.5")

chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("persistent_docs")

vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

if chroma_collection.count() > 0:
    # Vectors are already persisted in Chroma; rebuild the index
    # straight from the store instead of re-embedding
    index = VectorStoreIndex.from_vector_store(vector_store, embed_model=embed_model)
    print("Loaded existing index")
else:
    documents = SimpleDirectoryReader("./documents").load_data()
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
        embed_model=embed_model
    )
    print("Created new index")

query_engine = index.as_query_engine()

Advanced Queries #

python
from llama_index.core import VectorStoreIndex
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor

# Fetch more candidates, then drop anything below a similarity threshold
retriever = VectorIndexRetriever(
    index=index,
    similarity_top_k=5
)

query_engine = RetrieverQueryEngine.from_args(
    retriever=retriever,
    node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=0.7)]
)

response = query_engine.query("query text")
print(response)

Other Framework Integrations #

Haystack Integration #

python
# Haystack 2.x with the chroma-haystack integration
# (pip install chroma-haystack)
from haystack import Pipeline
from haystack_integrations.document_stores.chroma import ChromaDocumentStore
from haystack_integrations.components.retrievers.chroma import ChromaQueryTextRetriever

document_store = ChromaDocumentStore(
    persist_path="./chroma_haystack",
    collection_name="haystack_docs"
)

retriever = ChromaQueryTextRetriever(document_store=document_store)

pipeline = Pipeline()
pipeline.add_component("retriever", retriever)

result = pipeline.run({"retriever": {"query": "query text", "top_k": 5}})

for doc in result["retriever"]["documents"]:
    print(doc.content)
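
Indexing goes through the standard document-store interface; a minimal sketch (the document contents here are illustrative):

python
from haystack import Document

document_store.write_documents([
    Document(content="Chroma is an open-source vector database"),
    Document(content="Haystack is an LLM application framework"),
])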

Semantic Kernel Integration #

python
import asyncio
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.connectors.memory.chroma import ChromaMemoryStore

# Note: this targets the pre-1.0 semantic-kernel Python API,
# which shipped the Chroma memory connector
kernel = sk.Kernel()

kernel.add_chat_service(
    "chat",
    OpenAIChatCompletion("gpt-3.5-turbo", api_key="your-api-key")
)

kernel.register_memory_store(
    memory_store=ChromaMemoryStore("./chroma_kernel")
)

# The memory calls are coroutines, so they must run inside an event loop
async def main():
    await kernel.memory.save_information_async(
        collection="docs",
        text="Chroma is a vector database",
        id="doc1"
    )

    results = await kernel.memory.search_async(
        collection="docs",
        query="What is Chroma?",
        limit=3
    )

    for item in results:
        print(item.text)

asyncio.run(main())

DSPy Integration #

python
import dspy
from chromadb import Client
from chromadb.utils import embedding_functions

chroma_client = Client()

ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)

# Attach the embedding function so query_texts are embedded consistently
collection = chroma_client.get_or_create_collection(
    "dspy_docs",
    embedding_function=ef
)

class ChromaRM(dspy.Retrieve):
    def __init__(self, collection, k=3):
        super().__init__(k=k)
        self.collection = collection
    
    def forward(self, query):
        results = self.collection.query(
            query_texts=[query],
            n_results=self.k
        )
        
        # DSPy retrievers conventionally return a Prediction
        # whose passages are plain strings
        passages = results['documents'][0]
        return dspy.Prediction(passages=passages)

rm = ChromaRM(collection, k=3)

class RAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = rm
        self.generate = dspy.ChainOfThought("context, question -> answer")
    
    def forward(self, question):
        context = self.retrieve(question).passages
        return self.generate(context=context, question=question)

rag = RAG()
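
A usage sketch, assuming the classic dspy.OpenAI client (LM constructor names vary across DSPy releases):

python
# Configure a language model before running the module;
# dspy.OpenAI is the older client, newer releases use dspy.LM(...)
dspy.settings.configure(lm=dspy.OpenAI(model="gpt-3.5-turbo"))

result = rag("What is Chroma?")
print(result.answer)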

Custom Extensions #

Custom Embedding Function Wrapper #

python
from chromadb.api.types import EmbeddingFunction, Documents, Embeddings
from typing import List

class CustomEmbeddingWrapper(EmbeddingFunction):
    def __init__(self, model_name: str = "custom-model"):
        self.model_name = model_name
        self._model = None
    
    def _load_model(self):
        # Lazy-load the underlying model on first use
        if self._model is None:
            pass  # load your model here
        return self._model
    
    def __call__(self, input: Documents) -> Embeddings:
        model = self._load_model()
        embeddings = []
        
        for text in input:
            embedding = self._compute(text)
            embeddings.append(embedding)
        
        return embeddings
    
    def _compute(self, text: str) -> List[float]:
        # Placeholder: replace with a real model call; the vector
        # length must match your model's output dimensionality
        return [0.0] * 384
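
Wiring the wrapper into a collection is then a single argument (the collection name here is illustrative):

python
import chromadb

client = chromadb.Client()
collection = client.create_collection(
    name="custom_embeddings",
    embedding_function=CustomEmbeddingWrapper()
)
collection.add(ids=["doc1"], documents=["hello world"])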

Custom Retriever #

python
from typing import Any, Dict, List, Optional

class CustomRetriever:
    def __init__(self, collection, rerank_model=None):
        self.collection = collection
        self.rerank_model = rerank_model
    
    def retrieve(
        self,
        query: str,
        n_results: int = 5,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Dict]:
        # Over-fetch so the reranker has candidates to work with
        results = self.collection.query(
            query_texts=[query],
            where=filters,
            n_results=n_results * 2
        )
        
        documents = []
        for i in range(len(results['ids'][0])):
            documents.append({
                'id': results['ids'][0][i],
                'content': results['documents'][0][i],
                'metadata': results['metadatas'][0][i],
                'score': results['distances'][0][i]
            })
        
        if self.rerank_model:
            documents = self._rerank(query, documents)
        
        return documents[:n_results]
    
    def _rerank(self, query: str, documents: List[Dict]) -> List[Dict]:
        # Placeholder rerank: 'score' holds a Chroma distance,
        # so lower means more similar; sort ascending
        return sorted(documents, key=lambda x: x['score'])
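
A quick usage sketch against an existing collection:

python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection("docs")

retriever = CustomRetriever(collection)
for doc in retriever.retrieve("vector database", n_results=3):
    print(doc['id'], doc['score'])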

Custom Storage Backend #

python
from chromadb.config import Settings
import chromadb

# Conceptual sketch only: Chroma does not expose a public plugin API
# for swapping storage backends, so this class is illustrative
class CustomStorageBackend:
    def __init__(self, config: dict):
        self.config = config
    
    def save(self, data):
        pass  # write data to your backend
    
    def load(self):
        pass  # read data back

def create_custom_client(backend: CustomStorageBackend):
    # These Settings select Chroma's built-in implementations;
    # the backend object is not actually wired in
    settings = Settings(
        chroma_api_impl="chromadb.api.segment.SegmentAPI",
        chroma_sysdb_impl="chromadb.db.impl.sqlite.SqliteDB",
        persist_directory="./custom_db"
    )
    
    return chromadb.Client(settings)

Complete RAG Application Example #

python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

class RAGApplication:
    def __init__(self, docs_path: str, persist_directory: str = "./chroma_rag"):
        self.docs_path = docs_path
        self.persist_directory = persist_directory
        
        self.embeddings = OpenAIEmbeddings()
        self.llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
        
        self._setup_vectorstore()
        self._setup_chain()
    
    def _setup_vectorstore(self):
        self.vectorstore = Chroma(
            persist_directory=self.persist_directory,
            embedding_function=self.embeddings
        )
        
        # _collection is a private attribute, but it is the simplest
        # way to check whether the store is already populated
        if self.vectorstore._collection.count() == 0:
            self._load_documents()
    
    def _load_documents(self):
        loader = DirectoryLoader(
            self.docs_path,
            glob="**/*.txt",
            loader_cls=TextLoader
        )
        documents = loader.load()
        
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        texts = text_splitter.split_documents(documents)
        
        self.vectorstore.add_documents(texts)
        print(f"加载了 {len(texts)} 个文档块")
    
    def _setup_chain(self):
        template = """你是一个专业的助手。请根据以下上下文回答问题。
如果上下文中没有相关信息,请说"我不知道"。

上下文:
{context}

问题:{question}

请用中文详细回答:"""
        
        prompt = ChatPromptTemplate.from_template(template)
        
        self.chain = (
            {
                "context": self.vectorstore.as_retriever(search_kwargs={"k": 3}),
                "question": RunnablePassthrough()
            }
            | prompt
            | self.llm
            | StrOutputParser()
        )
    
    def query(self, question: str) -> str:
        return self.chain.invoke(question)
    
    def add_documents(self, texts: list, metadatas: list = None):
        from langchain.schema import Document
        
        documents = [
            Document(page_content=text, metadata=meta or {})
            for text, meta in zip(texts, metadatas or [{}] * len(texts))
        ]
        
        self.vectorstore.add_documents(documents)
        print(f"添加了 {len(documents)} 个文档")

rag = RAGApplication("./documents")

answer = rag.query("What is a vector database?")
print(answer)

rag.add_documents(
    texts=["New document content 1", "New document content 2"],
    metadatas=[{"source": "manual"}, {"source": "manual"}]
)

Next Steps #

Now that you have integrations and extensions down, move on to Best Practices to learn about production-grade configuration and optimization techniques!

Last updated: 2026-04-04