高级配置 #
持久化存储 #
内存模式 vs 持久化模式 #
text
┌─────────────────────────────────────────────────────────────┐
│ 存储模式对比 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 内存模式 (Ephemeral) │
│ ├── 数据存储在内存中 │
│ ├── 程序结束数据丢失 │
│ ├── 适合测试和原型 │
│ └── 速度最快 │
│ │
│ 持久化模式 (Persistent) │
│ ├── 数据存储在磁盘 │
│ ├── 程序重启数据保留 │
│ ├── 适合生产环境 │
│ └── 速度略慢于内存模式 │
│ │
│ 服务器模式 (Server) │
│ ├── 独立服务器进程 │
│ ├── 支持多客户端 │
│ ├── 适合分布式部署 │
│ └── 需要额外配置 │
│ │
└─────────────────────────────────────────────────────────────┘
持久化客户端 #
python
import chromadb

# PersistentClient writes data to the given directory so it survives restarts
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection(name="documents")
collection.add(
    documents=["持久化存储的文档"],
    ids=["doc1"]
)
print("数据已保存到 ./chroma_db 目录")
持久化路径配置 #
python
import chromadb
import os

# Expand ~ so the database lands under the user's home directory
db_path = os.path.expanduser("~/data/chroma_db")
# Create parent directories if missing; no error if the path already exists
os.makedirs(db_path, exist_ok=True)
client = chromadb.PersistentClient(path=db_path)
print(f"数据库路径: {db_path}")
持久化数据管理 #
python
import chromadb
import shutil
import os
def backup_database(source_path: str, backup_path: str):
    """Snapshot the database directory at source_path into backup_path.

    Any previous backup at backup_path is removed first, because
    shutil.copytree refuses to write into an existing directory.
    """
    backup_exists = os.path.exists(backup_path)
    if backup_exists:
        shutil.rmtree(backup_path)
    shutil.copytree(source_path, backup_path)
    print(f"备份完成: {backup_path}")
def restore_database(backup_path: str, target_path: str):
    """Replace the directory at target_path with the backup's contents.

    The existing target (if any) is deleted before copying, since
    shutil.copytree requires a non-existent destination.
    """
    target_exists = os.path.exists(target_path)
    if target_exists:
        shutil.rmtree(target_path)
    shutil.copytree(backup_path, target_path)
    print(f"恢复完成: {target_path}")
def clear_database(db_path: str):
    """Delete the database directory entirely.

    A missing path is a no-op (the completion message is still printed).
    """
    if os.path.exists(db_path):
        shutil.rmtree(db_path)
    print(f"数据库已清空: {db_path}")
# Example: open/create the on-disk database, then snapshot its directory
client = chromadb.PersistentClient(path="./chroma_db")
backup_database("./chroma_db", "./chroma_backup")
服务器模式 #
启动 Chroma 服务器 #
bash
# Basic startup
chroma run --host localhost --port 8000 --path ./chroma_db
# Run in the background (0.0.0.0 binds all network interfaces)
chroma run --host 0.0.0.0 --port 8000 --path ./chroma_db &
# Start via Docker
docker run -p 8000:8000 chromadb/chroma
连接服务器 #
python
import chromadb

# Connect to a running Chroma server over HTTP
client = chromadb.HttpClient(
    host="localhost",
    port=8000
)
collection = client.get_or_create_collection(name="server_docs")
collection.add(
    documents=["服务器模式文档"],
    ids=["server_doc1"]
)
print("连接服务器成功")
服务器配置 #
python
import chromadb
from chromadb.config import Settings

# NOTE(review): chromadb.HttpClient has no `credentials` keyword and
# `chromadb.BasicAuthCredentials` is not part of the public API. Basic
# authentication is configured through client Settings instead — verify
# the provider path against the installed Chroma version's auth docs.
client = chromadb.HttpClient(
    host="localhost",
    port=8000,
    settings=Settings(
        chroma_client_auth_provider="chromadb.auth.basic_authn.BasicAuthClientProvider",
        chroma_client_auth_credentials="admin:password123"
    )
)
异步客户端 #
python
import chromadb
import asyncio

async def main():
    # AsyncHttpClient must be awaited to establish the connection
    client = await chromadb.AsyncHttpClient(
        host="localhost",
        port=8000
    )
    collection = await client.get_or_create_collection(name="async_docs")
    await collection.add(
        documents=["异步操作文档"],
        ids=["async_doc1"]
    )
    results = await collection.query(
        query_texts=["异步查询"],
        n_results=1
    )
    print(results)

asyncio.run(main())
性能优化 #
HNSW 索引优化 #
python
import chromadb

client = chromadb.Client()

# Tuned for recall: larger construction_ef and M build a denser HNSW graph
# (slower to build, more memory)
high_recall_collection = client.create_collection(
    name="high_recall",
    metadata={
        "hnsw:space": "cosine",
        "hnsw:construction_ef": 400,
        "hnsw:M": 64
    }
)

# Tuned for fast inserts: smaller values build quicker at some recall cost
fast_insert_collection = client.create_collection(
    name="fast_insert",
    metadata={
        "hnsw:space": "cosine",
        "hnsw:construction_ef": 50,
        "hnsw:M": 8
    }
)
HNSW 参数详解 #
text
┌─────────────────────────────────────────────────────────────┐
│ HNSW 参数优化 │
├─────────────────────────────────────────────────────────────┤
│ │
│ construction_ef (构建时候选列表大小) │
│ ├── 默认: 100 │
│ ├── 高召回率: 200-400 │
│ ├── 快速构建: 50-100 │
│ └── 影响: 构建时间和索引质量 │
│ │
│ M (每个节点的连接数) │
│ ├── 默认: 16 │
│ ├── 高召回率: 32-64 │
│ ├── 低内存: 8-16 │
│ └── 影响: 召回率和内存占用 │
│ │
│ batch_size (批量添加大小) │
│ ├── 默认: 100 │
│ ├── 大数据集: 1000-5000 │
│ └── 影响: 添加速度 │
│ │
│ sync_threshold (同步阈值) │
│ ├── 默认: 1000 │
│ └── 影响: 持久化频率 │
│ │
└─────────────────────────────────────────────────────────────┘
批量操作优化 #
python
import chromadb
from typing import List, Dict, Any
def optimized_batch_add(
    collection,
    documents: List[str],
    ids: List[str],
    metadatas: List[Dict[str, Any]] = None,
    batch_size: int = 1000
):
    """Add documents to a collection in fixed-size batches.

    Args:
        collection: Target collection (anything exposing an .add method).
        documents: Document texts, aligned with ids.
        ids: Unique ids, same length as documents.
        metadatas: Optional per-document metadata, aligned with ids.
        batch_size: Number of records per .add call.
    """
    doc_total = len(documents)
    for start in range(0, doc_total, batch_size):
        stop = min(start + batch_size, doc_total)
        collection.add(
            documents=documents[start:stop],
            ids=ids[start:stop],
            metadatas=metadatas[start:stop] if metadatas else None
        )
        # Report progress every 10k records and once more at the very end
        if stop % 10000 == 0 or stop == doc_total:
            print(f"进度: {stop}/{doc_total}")
# Example: insert 100k generated documents in batches of 5000
client = chromadb.Client()
collection = client.create_collection(name="optimized")
documents = [f"文档内容 {i}" for i in range(100000)]
ids = [f"doc{i}" for i in range(100000)]
metadatas = [{"batch": i // 1000} for i in range(100000)]
optimized_batch_add(collection, documents, ids, metadatas, batch_size=5000)
查询优化 #
python
def optimized_query(collection, query_texts: List[str], n_results: int = 10):
    """Run a query requesting only documents and distances.

    Limiting `include` avoids transferring embeddings/metadata that the
    caller does not need.
    """
    return collection.query(
        query_texts=query_texts,
        n_results=n_results,
        include=["documents", "distances"]
    )
def cached_query(collection, query_cache: dict, query_text: str, n_results: int = 5):
    """Memoized single-text query, keyed on (query_text, n_results).

    Results are stored in the caller-supplied query_cache dict; repeated
    identical queries return the cached result object without hitting
    the collection again.
    """
    key = f"{query_text}:{n_results}"
    if key not in query_cache:
        query_cache[key] = collection.query(
            query_texts=[query_text],
            n_results=n_results
        )
    return query_cache[key]
内存管理 #
内存监控 #
python
import psutil
import os
def get_memory_usage():
    """Return the current process's resident set size (RSS) in megabytes."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / (1024 * 1024)
def monitor_collection_memory(collection):
    """Print the collection's document count, total process memory, and a
    rough per-document memory figure.

    Relies on get_memory_usage() defined above. The memory number is for
    the whole process, so "per document" is only an approximation.
    """
    count = collection.count()
    memory = get_memory_usage()
    print(f"文档数量: {count}")
    print(f"内存使用: {memory:.2f} MB")
    # Guard the division explicitly: the previous conditional expression
    # printed a stray blank line whenever count == 0.
    if count > 0:
        print(f"平均每文档: {memory/count:.4f} MB")
内存优化策略 #
python
import chromadb
def create_memory_efficient_collection(client, name: str):
    """Create a collection with low-memory HNSW settings.

    Small M and construction_ef shrink the index's footprint at the cost
    of some recall (see the HNSW parameter table above).
    """
    low_memory_settings = {
        "hnsw:M": 8,
        "hnsw:construction_ef": 50
    }
    return client.create_collection(name=name, metadata=low_memory_settings)
def clear_collection_memory(collection):
    """Delete every document from the collection, 1000 ids at a time."""
    all_ids = collection.get()['ids']
    if not all_ids:
        return
    batch_size = 1000
    for start in range(0, len(all_ids), batch_size):
        collection.delete(ids=all_ids[start:start + batch_size])
并发处理 #
多线程操作 #
python
import chromadb
from concurrent.futures import ThreadPoolExecutor
import threading
# One shared HTTP client/collection used by every worker thread
client = chromadb.HttpClient(host="localhost", port=8000)
collection = client.get_or_create_collection(name="concurrent_docs")
lock = threading.Lock()

def add_documents_batch(batch_id: int, documents: list, ids: list):
    """Add one batch to the shared collection under the module-level lock."""
    # NOTE(review): holding the lock around every add() serializes all
    # writes, so the thread pool gains little real parallelism here —
    # confirm whether the lock is actually required by the client before
    # relying on this pattern.
    with lock:
        collection.add(
            documents=documents,
            ids=ids
        )
        print(f"批次 {batch_id} 完成")

def parallel_add(total_docs: int = 10000, batch_size: int = 1000, workers: int = 4):
    """Generate total_docs documents, split into batches, and add them
    concurrently with a thread pool of `workers` threads."""
    all_docs = [f"文档 {i}" for i in range(total_docs)]
    all_ids = [f"doc{i}" for i in range(total_docs)]
    batches = []
    for i in range(0, total_docs, batch_size):
        batch_id = i // batch_size
        batch_docs = all_docs[i:i+batch_size]
        batch_ids = all_ids[i:i+batch_size]
        batches.append((batch_id, batch_docs, batch_ids))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # NOTE(review): the result of executor.map is never consumed, so
        # exceptions raised inside workers are silently dropped.
        executor.map(lambda b: add_documents_batch(*b), batches)

parallel_add(total_docs=10000, batch_size=1000, workers=4)
异步批量操作 #
python
import asyncio
import chromadb
async def async_batch_add(collection, documents, ids, batch_size=100):
    """Asynchronously add documents/ids to a collection in batches.

    Awaits one collection.add per batch and prints progress every 1000
    records.
    """
    for start in range(0, len(documents), batch_size):
        await collection.add(
            documents=documents[start:start + batch_size],
            ids=ids[start:start + batch_size]
        )
        done = start + batch_size
        if done % 1000 == 0:
            print(f"已添加 {done} 个文档")
async def main():
    # Example: push 5000 generated documents through the async batch helper
    client = await chromadb.AsyncHttpClient(host="localhost", port=8000)
    collection = await client.get_or_create_collection(name="async_batch")
    documents = [f"异步文档 {i}" for i in range(5000)]
    ids = [f"async_doc{i}" for i in range(5000)]
    await async_batch_add(collection, documents, ids)

asyncio.run(main())
数据迁移 #
导出数据 #
python
import json
import chromadb
def export_collection_to_json(collection, output_file: str):
    """Dump an entire collection (ids, documents, embeddings, metadatas)
    to a UTF-8 JSON file.

    Args:
        collection: Source collection; must expose .get, .name, .metadata.
        output_file: Path of the JSON file to write.
    """
    data = collection.get(include=["documents", "metadatas", "embeddings"])
    export_data = {
        "name": collection.name,
        "metadata": collection.metadata,
        "count": len(data['ids']),
        "documents": []
    }
    # Newer Chroma clients return embeddings as a NumPy array, whose truth
    # value is ambiguous — test with `is not None`/len instead of truthiness,
    # and convert each row to a plain list so json.dump can serialize it.
    embeddings = data['embeddings']
    has_embeddings = embeddings is not None and len(embeddings) > 0
    for i in range(len(data['ids'])):
        doc = {
            "id": data['ids'][i],
            "document": data['documents'][i] if data['documents'] else None,
            "embedding": list(embeddings[i]) if has_embeddings else None,
            "metadata": data['metadatas'][i] if data['metadatas'] else None
        }
        export_data["documents"].append(doc)
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(export_data, f, ensure_ascii=False, indent=2)
    print(f"导出完成: {output_file}")
    print(f"文档数量: {export_data['count']}")
# Example: export an existing collection to export.json
client = chromadb.Client()
collection = client.get_collection("documents")
export_collection_to_json(collection, "export.json")
导入数据 #
python
import json
import chromadb
def import_collection_from_json(client, input_file: str):
    """Recreate a collection from a JSON export produced by
    export_collection_to_json.

    Args:
        client: Chroma client used to create/open the target collection.
        input_file: Path of the JSON export file.

    Returns:
        The populated collection.
    """
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    collection = client.get_or_create_collection(
        name=data["name"],
        metadata=data.get("metadata", {})
    )
    if data["documents"]:
        batch_size = 1000
        docs = data["documents"]
        for i in range(0, len(docs), batch_size):
            batch = docs[i:i + batch_size]
            # Keep every per-field list the same length as `ids`: the previous
            # `... for d in batch if d["document"]` filtering silently
            # misaligned documents/embeddings/metadatas with their ids
            # whenever any entry was None. Pass None for a field only when
            # no record in the batch has a value for it.
            documents = [d["document"] for d in batch]
            embeddings = [d["embedding"] for d in batch]
            metadatas = [d["metadata"] for d in batch]
            collection.add(
                ids=[d["id"] for d in batch],
                documents=documents if any(v is not None for v in documents) else None,
                embeddings=embeddings if any(v is not None for v in embeddings) else None,
                metadatas=metadatas if any(v is not None for v in metadatas) else None
            )
    print(f"导入完成: {data['name']}")
    print(f"文档数量: {len(data['documents'])}")
    return collection
# Example: rebuild the collection from the JSON export
client = chromadb.Client()
collection = import_collection_from_json(client, "export.json")
监控与日志 #
日志配置 #
python
import logging
import chromadb
# Log to both chroma.log and the console with a timestamped format
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('chroma.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('chroma')

class LoggingCollection:
    """Thin proxy that logs every add/query/delete call before and after
    delegating to the wrapped collection."""

    def __init__(self, collection):
        # Underlying collection that performs the real work
        self.collection = collection

    def add(self, **kwargs):
        logger.info(f"添加文档: {len(kwargs.get('ids', []))} 个")
        outcome = self.collection.add(**kwargs)
        logger.info("添加完成")
        return outcome

    def query(self, **kwargs):
        logger.info(f"查询: {kwargs.get('query_texts', [])}")
        outcome = self.collection.query(**kwargs)
        logger.info(f"查询结果: {len(outcome['ids'][0])} 个")
        return outcome

    def delete(self, **kwargs):
        logger.info(f"删除文档: {kwargs}")
        outcome = self.collection.delete(**kwargs)
        logger.info("删除完成")
        return outcome
# Example: wrap a real collection so its operations are logged
client = chromadb.Client()
base_collection = client.create_collection(name="logged_docs")
collection = LoggingCollection(base_collection)
性能监控 #
python
import time
from functools import wraps

def timing_decorator(func):
    """Decorator that prints the wall-clock duration of each call."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.time()
        outcome = func(*args, **kwargs)
        t1 = time.time()
        print(f"{func.__name__} 耗时: {t1 - t0:.4f} 秒")
        return outcome
    return wrapper

class MonitoredCollection:
    """Wrapper that records call counts and cumulative timings for the
    add/query operations of the underlying collection."""

    def __init__(self, collection):
        self.collection = collection
        # Running totals; get_stats() derives per-call averages from these
        self.stats = {
            'add_count': 0,
            'query_count': 0,
            'delete_count': 0,
            'total_add_time': 0,
            'total_query_time': 0
        }

    @timing_decorator
    def add(self, **kwargs):
        began = time.time()
        outcome = self.collection.add(**kwargs)
        self.stats['add_count'] += len(kwargs.get('ids', []))
        self.stats['total_add_time'] += time.time() - began
        return outcome

    @timing_decorator
    def query(self, **kwargs):
        began = time.time()
        outcome = self.collection.query(**kwargs)
        self.stats['query_count'] += 1
        self.stats['total_query_time'] += time.time() - began
        return outcome

    def get_stats(self):
        """Return raw counters plus per-call averages (division guarded
        against zero counts)."""
        adds = max(1, self.stats['add_count'])
        queries = max(1, self.stats['query_count'])
        return {
            **self.stats,
            'avg_add_time': self.stats['total_add_time'] / adds,
            'avg_query_time': self.stats['total_query_time'] / queries
        }
下一步 #
现在你已经掌握了高级配置,接下来学习 集成与扩展,了解如何将 Chroma 集成到主流 AI 框架中!
最后更新:2026-04-04