# Integration and Extensions

## LangChain Integration

### Installing Dependencies

```bash
pip install langchain langchain-community chromadb
```
### Basic Integration

```python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.document_loaders import TextLoader

# Load and split the source document
loader = TextLoader("./documents/example.txt")
documents = loader.load()

text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(documents)

# Build a persistent Chroma store from the chunks
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
    documents=texts,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

# Similarity search
results = vectorstore.similarity_search("your query", k=3)
for doc in results:
    print(doc.page_content)
```
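If you also want the raw relevance scores alongside the documents, the LangChain wrapper exposes `similarity_search_with_score`:

```python
# Each result is a (Document, score) pair; with the default distance
# metric, lower scores mean closer matches.
results = vectorstore.similarity_search_with_score("your query", k=3)
for doc, score in results:
    print(f"{score:.4f}  {doc.page_content[:80]}")
```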
### Using as a Retriever

```python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain_community.llms import OpenAI

# Reopen the persisted store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings
)

retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}
)

qa_chain = RetrievalQA.from_chain_type(
    llm=OpenAI(),
    chain_type="stuff",
    retriever=retriever
)

# Note: `run` is deprecated in newer LangChain releases in favor of `invoke`
answer = qa_chain.run("What is machine learning?")
print(answer)
```
### RAG Application Example

```python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

template = """You are a helpful assistant. Answer the question based on the following context:

Context:
{context}

Question: {question}

Answer:"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model_name="gpt-3.5-turbo")

# Build the LCEL chain: retrieve -> prompt -> LLM -> string output
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

answer = rag_chain.invoke("What is a vector database?")
print(answer)
```
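As written, the retriever's list of `Document` objects is stringified wholesale before it reaches the prompt. A common refinement is to join just the page contents into the context string:

```python
def format_docs(docs):
    # Keep only the text of each retrieved chunk
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
```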
### Conversation with Memory

```python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings
)

# The memory object tracks the chat history for us, so we must not also
# pass a manual `chat_history` input alongside it
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

qa = ConversationalRetrievalChain.from_llm(
    llm=ChatOpenAI(),
    retriever=vectorstore.as_retriever(),
    memory=memory
)

result = qa({"question": "What is Chroma?"})
print(result["answer"])

# Follow-up questions automatically see earlier turns via memory
result = qa({"question": "What are its features?"})
print(result["answer"])
```
## LlamaIndex Integration

### Installing Dependencies

```bash
pip install llama-index llama-index-vector-stores-chroma chromadb
```

### Basic Integration

```python
import chromadb
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core.embeddings import resolve_embed_model

# Load documents from a directory
documents = SimpleDirectoryReader("./documents").load_data()

# Create (or reuse) a persistent Chroma collection
chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("llamaindex_docs")

# Use a local embedding model
embed_model = resolve_embed_model("local:BAAI/bge-small-en-v1.5")

vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model
)

query_engine = index.as_query_engine()
response = query_engine.query("What is a vector database?")
print(response)
```
### Persistent Index

```python
import chromadb
from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core.embeddings import resolve_embed_model

embed_model = resolve_embed_model("local:BAAI/bge-small-en-v1.5")

chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("persistent_docs")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

# Chroma itself holds the embeddings, so an existing collection can be
# reopened directly with from_vector_store; only an empty collection
# needs to be built from the source documents.
if chroma_collection.count() > 0:
    index = VectorStoreIndex.from_vector_store(
        vector_store,
        embed_model=embed_model
    )
    print("Loaded existing index")
else:
    from llama_index.core import SimpleDirectoryReader
    documents = SimpleDirectoryReader("./documents").load_data()
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
        embed_model=embed_model
    )
    print("Created new index")

query_engine = index.as_query_engine()
```
### Advanced Queries

```python
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor

# Retrieve more candidates, then drop anything below the similarity cutoff
retriever = VectorIndexRetriever(
    index=index,
    similarity_top_k=5
)

query_engine = RetrieverQueryEngine.from_args(
    retriever=retriever,
    node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=0.7)]
)

response = query_engine.query("your query")
print(response)
```
## Other Framework Integrations

### Haystack Integration

```python
# Requires the chroma-haystack integration package (Haystack 2.x):
#   pip install chroma-haystack
from haystack_integrations.document_stores.chroma import ChromaDocumentStore
from haystack_integrations.components.retrievers.chroma import ChromaQueryTextRetriever

document_store = ChromaDocumentStore(
    persist_path="./chroma_haystack",
    collection_name="haystack_docs"
)

retriever = ChromaQueryTextRetriever(document_store=document_store)

result = retriever.run(query="your query", top_k=5)
for doc in result["documents"]:
    print(doc.content)
```
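Querying assumes the store has already been populated; a minimal sketch of writing documents with Haystack's `Document` type:

```python
from haystack import Document

# Chroma embeds these on write using the store's embedding function
document_store.write_documents([
    Document(content="Chroma is an open-source vector database"),
    Document(content="Haystack is an LLM orchestration framework"),
])
```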
### Semantic Kernel Integration

```python
# Note: this mirrors the pre-1.0 semantic-kernel Python API;
# the 1.x releases restructured these interfaces.
import asyncio
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.connectors.memory.chroma import ChromaMemoryStore

async def main():
    kernel = sk.Kernel()
    kernel.add_chat_service(
        "chat",
        OpenAIChatCompletion("gpt-3.5-turbo", api_key="your-api-key")
    )
    kernel.register_memory_store(
        memory_store=ChromaMemoryStore("./chroma_kernel")
    )

    # The memory calls are coroutines, so they must be awaited
    await kernel.memory.save_information_async(
        collection="docs",
        text="Chroma is a vector database",
        id="doc1"
    )

    result = await kernel.memory.search_async(
        collection="docs",
        query="What is Chroma?",
        limit=3
    )
    for item in result:
        print(item.text)

asyncio.run(main())
```
### DSPy Integration

```python
import dspy
import chromadb
from chromadb.utils import embedding_functions

# Attach the embedding function to the collection so documents and
# queries are embedded consistently
ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)
chroma_client = chromadb.Client()
collection = chroma_client.get_or_create_collection(
    "dspy_docs",
    embedding_function=ef
)

class ChromaRM(dspy.Retrieve):
    """A minimal retrieval module backed by a Chroma collection."""
    def __init__(self, collection, k=3):
        super().__init__(k=k)
        self.collection = collection

    def forward(self, query):
        results = self.collection.query(
            query_texts=[query],
            n_results=self.k
        )
        # DSPy retrieval modules conventionally return plain text passages
        passages = results['documents'][0]
        return dspy.Prediction(passages=passages)

rm = ChromaRM(collection, k=3)

class RAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = rm
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        context = self.retrieve(question).passages
        return self.generate(context=context, question=question)

rag = RAG()
```
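To actually run the module, DSPy still needs a language model configured. A minimal sketch, assuming an OpenAI key is set up (the model name here is just an example):

```python
# Recent DSPy versions configure models via dspy.LM
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

prediction = rag("What is Chroma?")
print(prediction.answer)
```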
## Custom Extensions

### Custom Embedding Function Wrapper

```python
from chromadb.api.types import EmbeddingFunction, Documents, Embeddings
from typing import List

class CustomEmbeddingWrapper(EmbeddingFunction):
    """Template for wrapping your own embedding model."""

    def __init__(self, model_name: str = "custom-model"):
        self.model_name = model_name
        self._model = None

    def _load_model(self):
        # Lazy-load the model on first use
        if self._model is None:
            pass  # load your model here
        return self._model

    def __call__(self, input: Documents) -> Embeddings:
        model = self._load_model()
        embeddings = []
        for text in input:
            embedding = self._compute(text)
            embeddings.append(embedding)
        return embeddings

    def _compute(self, text: str) -> List[float]:
        # Placeholder: replace with a real model call
        return [0.0] * 384
```
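To make the template concrete, here is one way to fill in the placeholders with sentence-transformers; a sketch, assuming the package is installed:

```python
from sentence_transformers import SentenceTransformer
from chromadb.api.types import EmbeddingFunction, Documents, Embeddings

class SBERTEmbeddingFunction(EmbeddingFunction):
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model_name = model_name
        self._model = None

    def __call__(self, input: Documents) -> Embeddings:
        if self._model is None:
            # Lazy-load so import stays cheap
            self._model = SentenceTransformer(self.model_name)
        # encode returns a numpy array; Chroma expects plain lists
        return self._model.encode(list(input)).tolist()
```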
### Custom Retriever

```python
from typing import List, Dict, Any

class CustomRetriever:
    """Over-fetch from Chroma, optionally rerank, then truncate."""

    def __init__(self, collection, rerank_model=None):
        self.collection = collection
        self.rerank_model = rerank_model

    def retrieve(
        self,
        query: str,
        n_results: int = 5,
        filters: Dict[str, Any] = None
    ) -> List[Dict]:
        # Fetch twice as many candidates to give the reranker room to work
        results = self.collection.query(
            query_texts=[query],
            where=filters,
            n_results=n_results * 2
        )

        documents = []
        for i in range(len(results['ids'][0])):
            documents.append({
                'id': results['ids'][0][i],
                'content': results['documents'][0][i],
                'metadata': results['metadatas'][0][i],
                'score': results['distances'][0][i]
            })

        if self.rerank_model:
            documents = self._rerank(query, documents)

        return documents[:n_results]

    def _rerank(self, query: str, documents: List[Dict]) -> List[Dict]:
        # Placeholder: 'score' holds a distance, so smaller is better
        return sorted(documents, key=lambda x: x['score'])
```
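Usage is a plain method call; a quick sketch against an existing store (the collection name here is illustrative):

```python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection("docs")

retriever = CustomRetriever(collection)
for doc in retriever.retrieve("vector databases", n_results=3):
    print(doc['score'], doc['content'][:60])
```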
### Custom Storage Backend

```python
# A conceptual sketch: Chroma selects its internals through Settings,
# so a custom backend mostly means pointing the *_impl options at your
# own implementations.
import chromadb
from chromadb.config import Settings

class CustomStorageBackend:
    """Skeleton for a custom persistence layer."""
    def __init__(self, config: dict):
        self.config = config

    def save(self, data):
        pass  # write data to your storage

    def load(self):
        pass  # read data back

def create_custom_client(backend: CustomStorageBackend):
    settings = Settings(
        chroma_api_impl="chromadb.api.segment.SegmentAPI",
        chroma_sysdb_impl="chromadb.db.impl.sqlite.SqliteDB",
        persist_directory="./custom_db"
    )
    return chromadb.Client(settings)
```
## Complete RAG Application Example

```python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

class RAGApplication:
    def __init__(self, docs_path: str, persist_directory: str = "./chroma_rag"):
        self.docs_path = docs_path
        self.persist_directory = persist_directory
        self.embeddings = OpenAIEmbeddings()
        self.llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
        self._setup_vectorstore()
        self._setup_chain()

    def _setup_vectorstore(self):
        self.vectorstore = Chroma(
            persist_directory=self.persist_directory,
            embedding_function=self.embeddings
        )
        # _collection is a private attribute, but it is the simplest way
        # to check whether the store has already been populated
        if self.vectorstore._collection.count() == 0:
            self._load_documents()

    def _load_documents(self):
        loader = DirectoryLoader(
            self.docs_path,
            glob="**/*.txt",
            loader_cls=TextLoader
        )
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        texts = text_splitter.split_documents(documents)
        self.vectorstore.add_documents(texts)
        print(f"Loaded {len(texts)} document chunks")

    def _setup_chain(self):
        template = """You are a professional assistant. Answer the question based on the context below.
If the context contains no relevant information, say "I don't know".

Context:
{context}

Question: {question}

Answer in detail:"""
        prompt = ChatPromptTemplate.from_template(template)
        self.chain = (
            {
                "context": self.vectorstore.as_retriever(search_kwargs={"k": 3}),
                "question": RunnablePassthrough()
            }
            | prompt
            | self.llm
            | StrOutputParser()
        )

    def query(self, question: str) -> str:
        return self.chain.invoke(question)

    def add_documents(self, texts: list, metadatas: list = None):
        from langchain.schema import Document
        documents = [
            Document(page_content=text, metadata=meta or {})
            for text, meta in zip(texts, metadatas or [{}] * len(texts))
        ]
        self.vectorstore.add_documents(documents)
        print(f"Added {len(documents)} documents")

# Build the app and ask a question
rag = RAGApplication("./documents")
answer = rag.query("What is a vector database?")
print(answer)

# Incrementally add new documents
rag.add_documents(
    texts=["New document content 1", "New document content 2"],
    metadatas=[{"source": "manual"}, {"source": "manual"}]
)
```
## Next Steps

Now that you have integrations and extensions covered, continue with Best Practices to learn recommended configurations and optimization techniques for production environments.

Last updated: 2026-04-04