最佳实践 #
生产部署架构 #
架构设计 #
text
┌─────────────────────────────────────────────────────────────┐
│ 生产部署架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 客户端 │ │ 客户端 │ │ 客户端 │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ 负载均衡器 │ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ Chroma 节点 │ │ Chroma 节点 │ │ Chroma 节点 │ │
│ │ (主) │ │ (从) │ │ (从) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ 共享存储 │ │
│ │ (持久化数据) │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
部署清单 #
text
┌─────────────────────────────────────────────────────────────┐
│ 部署前检查清单 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 硬件资源 │
│ □ CPU: 4核以上 │
│ □ 内存: 16GB以上 │
│ □ 存储: SSD,预留3倍数据大小 │
│ □ 网络: 低延迟,高带宽 │
│ │
│ 软件环境 │
│ □ Python 3.10+ │
│ □ 足够的文件描述符限制 │
│ □ 防火墙配置 │
│ □ SSL 证书 │
│ │
│ 安全配置 │
│ □ 访问控制 │
│ □ 数据加密 │
│ □ 网络隔离 │
│ □ 审计日志 │
│ │
│ 监控告警 │
│ □ 性能监控 │
│ □ 资源监控 │
│ □ 错误告警 │
│ □ 日志收集 │
│ │
└─────────────────────────────────────────────────────────────┘
性能优化 #
索引优化 #
python
import chromadb
def create_optimized_collection(client, name: str, use_case: str = "balanced"):
    """Create a collection whose HNSW parameters are tuned for a use case.

    Args:
        client: a chromadb client exposing ``create_collection``.
        name: name of the collection to create.
        use_case: one of ``"high_recall"``, ``"balanced"``, ``"fast_insert"``,
            ``"memory_efficient"``; any other value falls back to "balanced".

    Returns:
        The newly created collection.
    """
    # Higher construction_ef / M => better recall, slower inserts, more memory.
    profiles = {
        "high_recall": {
            "hnsw:space": "cosine",
            "hnsw:construction_ef": 400,
            "hnsw:M": 64
        },
        "balanced": {
            "hnsw:space": "cosine",
            "hnsw:construction_ef": 200,
            "hnsw:M": 32
        },
        "fast_insert": {
            "hnsw:space": "cosine",
            "hnsw:construction_ef": 50,
            "hnsw:M": 8
        },
        "memory_efficient": {
            "hnsw:space": "cosine",
            "hnsw:construction_ef": 100,
            "hnsw:M": 8
        }
    }
    chosen = profiles.get(use_case, profiles["balanced"])
    return client.create_collection(name=name, metadata=chosen)
批量操作最佳实践 #
python
import chromadb
from typing import List, Dict, Any
class BatchProcessor:
    """Insert documents into a Chroma collection in fixed-size batches.

    Batching keeps individual requests small and lets a single failed batch
    be retried without redoing the whole load.
    """

    def __init__(self, collection, batch_size: int = 1000):
        # collection: any object exposing Chroma's `add(...)` interface.
        self.collection = collection
        self.batch_size = batch_size

    def add_documents(
        self,
        documents: List[str],
        ids: List[str],
        metadatas: List[Dict[str, Any]] = None
    ):
        """Add `documents`/`ids` (and optional aligned `metadatas`) in batches.

        A failed batch is retried up to 3 times; a batch that still fails is
        skipped and reported so one bad batch does not abort the whole load
        (previously the retry exception propagated, losing every remaining
        batch). Returns the list of id-batches that could not be inserted
        (empty on full success).
        """
        failed_batches = []
        total = len(documents)
        for i in range(0, total, self.batch_size):
            end = min(i + self.batch_size, total)
            batch_docs = documents[i:end]
            batch_ids = ids[i:end]
            batch_metas = metadatas[i:end] if metadatas else None
            try:
                self.collection.add(
                    documents=batch_docs,
                    ids=batch_ids,
                    metadatas=batch_metas
                )
            except Exception as e:
                print(f"批次 {i//self.batch_size} 失败: {e}")
                try:
                    self._retry_batch(batch_docs, batch_ids, batch_metas)
                except Exception as retry_error:
                    # Record the failure and keep going with later batches.
                    print(f"批次 {i//self.batch_size} 重试失败: {retry_error}")
                    failed_batches.append(batch_ids)
            # Report progress after every batch. The original condition
            # (`end % 10000 == 0`) only fired when a batch boundary happened
            # to land exactly on a multiple of 10000.
            print(f"进度: {end}/{total}")
        return failed_batches

    def _retry_batch(self, docs, ids, metadatas, max_retries=3):
        """Retry one failed batch; re-raises after `max_retries` failures."""
        for attempt in range(max_retries):
            try:
                self.collection.add(
                    documents=docs,
                    ids=ids,
                    metadatas=metadatas
                )
                return
            except Exception:
                if attempt == max_retries - 1:
                    raise
                print(f"重试 {attempt + 1}/{max_retries}")
查询优化 #
python
from functools import lru_cache
import hashlib
class OptimizedQuery:
    """Query wrapper with a bounded FIFO result cache keyed on query params."""

    def __init__(self, collection, cache_size: int = 1000):
        self.collection = collection
        self.cache_size = cache_size
        self._cache = {}

    def query(
        self,
        query_text: str,
        n_results: int = 5,
        where: dict = None,
        use_cache: bool = True
    ):
        """Run a query; identical repeated queries are served from the cache."""
        key = self._make_cache_key(query_text, n_results, where)
        if use_cache and key in self._cache:
            return self._cache[key]
        outcome = self.collection.query(
            query_texts=[query_text],
            n_results=n_results,
            where=where
        )
        if use_cache:
            self._update_cache(key, outcome)
        return outcome

    def _make_cache_key(self, query_text: str, n_results: int, where: dict) -> str:
        """Derive a fixed-length key from the query parameters."""
        raw = f"{query_text}:{n_results}:{where}".encode()
        return hashlib.md5(raw).hexdigest()

    def _update_cache(self, key: str, results: dict):
        # FIFO eviction: drop the oldest entry (dicts preserve insertion order).
        if len(self._cache) >= self.cache_size:
            oldest = next(iter(self._cache))
            del self._cache[oldest]
        self._cache[key] = results

    def clear_cache(self):
        """Discard every cached result."""
        self._cache.clear()
安全配置 #
访问控制 #
python
import chromadb
from functools import wraps
from typing import Callable
def require_auth(func: Callable) -> Callable:
    """Method decorator: reject the call unless `self.is_authenticated` is truthy."""
    @wraps(func)
    def guarded(self, *args, **kwargs):
        if not self.is_authenticated:
            raise PermissionError("未授权访问")
        return func(self, *args, **kwargs)
    return guarded


class SecureChromaClient:
    """Wrapper around chromadb.HttpClient that gates every operation on auth."""

    def __init__(self, host: str, port: int, username: str, password: str):
        self.client = chromadb.HttpClient(
            host=host,
            port=port,
            credentials=chromadb.BasicAuthCredentials(
                username=username,
                password=password
            )
        )
        # NOTE(review): set unconditionally after construction — presumably
        # HttpClient raises on bad credentials; confirm against the installed
        # chromadb version.
        self.is_authenticated = True

    @require_auth
    def get_collection(self, name: str):
        """Fetch an existing collection (requires authentication)."""
        return self.client.get_collection(name)

    @require_auth
    def create_collection(self, name: str, **kwargs):
        """Create a collection (requires authentication)."""
        return self.client.create_collection(name, **kwargs)

    @require_auth
    def delete_collection(self, name: str):
        """Delete a collection (requires authentication)."""
        return self.client.delete_collection(name)
数据加密 #
python
from cryptography.fernet import Fernet
import base64
import json
class EncryptedMetadata:
    """Symmetric (Fernet) encryption helpers for metadata dicts and documents.

    Ciphertext is additionally base64url-encoded so it can be stored as a
    plain string (e.g. inside a Chroma metadata field).
    """

    def __init__(self, key: bytes = None):
        # A fresh key is generated when none is supplied; persist it, or data
        # encrypted with it cannot be recovered later.
        self.key = key or Fernet.generate_key()
        self.cipher = Fernet(self.key)

    def encrypt_metadata(self, metadata: dict) -> str:
        """Serialize a dict to JSON, encrypt it, and return a base64url string."""
        token = self.cipher.encrypt(json.dumps(metadata).encode())
        return base64.urlsafe_b64encode(token).decode()

    def decrypt_metadata(self, encrypted_data: str) -> dict:
        """Inverse of encrypt_metadata: decode, decrypt, and parse the JSON."""
        token = base64.urlsafe_b64decode(encrypted_data.encode())
        return json.loads(self.cipher.decrypt(token).decode())

    def encrypt_document(self, document: str) -> str:
        """Encrypt a text document and return a base64url string."""
        token = self.cipher.encrypt(document.encode())
        return base64.urlsafe_b64encode(token).decode()

    def decrypt_document(self, encrypted_data: str) -> str:
        """Inverse of encrypt_document: decode and decrypt back to text."""
        token = base64.urlsafe_b64decode(encrypted_data.encode())
        return self.cipher.decrypt(token).decode()
# Example: encrypt a metadata dict and a document with a freshly generated key.
encryptor = EncryptedMetadata()
encrypted_meta = encryptor.encrypt_metadata({"secret": "data"})
encrypted_doc = encryptor.encrypt_document("敏感文档内容")
网络安全 #
python
import chromadb
from typing import Optional
class SecureClientFactory:
    """Builds chromadb clients with TLS (and optionally basic auth) enabled."""

    @staticmethod
    def create_client(
        host: str,
        port: int,
        ssl: bool = True,
        verify_ssl: bool = True,
        username: Optional[str] = None,
        password: Optional[str] = None
    ):
        """Return a chromadb.Client configured for an HTTPS server.

        Credentials are attached only when both username and password are set.
        """
        config_kwargs = {
            "chroma_api_impl": "chromadb.api.fastapi.FastAPI",
            "chroma_server_host": host,
            "chroma_server_http_port": port,
            "chroma_server_ssl_enabled": ssl,
            "chroma_server_ssl_verify": verify_ssl,
        }
        settings = chromadb.config.Settings(**config_kwargs)
        if username and password:
            # NOTE(review): assumes Settings tolerates this attribute and a
            # dict-shaped credential value — confirm against the installed
            # chromadb version's auth configuration.
            settings.chroma_server_auth_credentials = {
                "username": username,
                "password": password
            }
        return chromadb.Client(settings)
监控与运维 #
健康检查 #
python
import chromadb
import time
from typing import Dict, Any
class HealthChecker:
    """Runs a set of lightweight probes against a Chroma client."""

    def __init__(self, client):
        self.client = client

    def check_health(self) -> Dict[str, Any]:
        """Run every probe and aggregate the results into one report.

        Only a failed connection probe marks the overall status "unhealthy";
        failures of the other probes are recorded but do not flip it.
        """
        report = {
            "status": "healthy",
            "timestamp": time.time(),
            "checks": {}
        }
        probes = (
            ("connection", self._check_connection, True),
            ("collections", self._check_collections, False),
            ("query", self._check_query, False),
        )
        for label, probe, critical in probes:
            try:
                report["checks"][label] = probe()
            except Exception as exc:
                report["checks"][label] = {"status": "unhealthy", "error": str(exc)}
                if critical:
                    report["status"] = "unhealthy"
        return report

    def _check_connection(self) -> Dict[str, Any]:
        """Measure heartbeat round-trip latency."""
        began = time.time()
        self.client.heartbeat()
        return {
            "status": "healthy",
            "latency_ms": (time.time() - began) * 1000
        }

    def _check_collections(self) -> Dict[str, Any]:
        """Verify collections can be listed."""
        return {
            "status": "healthy",
            "count": len(self.client.list_collections())
        }

    def _check_query(self) -> Dict[str, Any]:
        """Time a tiny read against the first collection, if one exists."""
        existing = self.client.list_collections()
        if not existing:
            return {"status": "healthy", "message": "No collections to test"}
        began = time.time()
        existing[0].get(limit=1)
        return {
            "status": "healthy",
            "latency_ms": (time.time() - began) * 1000
        }
性能监控 #
python
import time
import psutil
import os
from dataclasses import dataclass
from typing import List
import json
@dataclass
class Metric:
    """A single named measurement sample."""

    name: str         # metric identifier, e.g. "chroma_query_duration"
    value: float      # measured value (units depend on the metric)
    timestamp: float  # epoch seconds when the sample was captured
    tags: dict        # free-form labels, e.g. {"success": True}


class PerformanceMonitor:
    """Collects in-process metrics about a Chroma collection's usage."""

    def __init__(self, collection):
        self.collection = collection
        self.metrics: List[Metric] = []
        self.process = psutil.Process(os.getpid())

    def record_operation(self, operation: str, duration: float, success: bool = True):
        """Record how long one operation took and whether it succeeded."""
        self.metrics.append(Metric(
            name=f"chroma_{operation}_duration",
            value=duration,
            timestamp=time.time(),
            tags={"success": success},
        ))

    def record_memory(self):
        """Snapshot this process's RSS and VMS memory usage (in MB)."""
        info = self.process.memory_info()
        self.metrics.append(Metric(
            name="chroma_memory_rss_mb",
            value=info.rss / 1024 / 1024,
            timestamp=time.time(),
            tags={},
        ))
        self.metrics.append(Metric(
            name="chroma_memory_vms_mb",
            value=info.vms / 1024 / 1024,
            timestamp=time.time(),
            tags={},
        ))

    def record_collection_stats(self):
        """Record the collection's current document count."""
        self.metrics.append(Metric(
            name="chroma_document_count",
            value=self.collection.count(),
            timestamp=time.time(),
            tags={"collection": self.collection.name},
        ))

    def get_summary(self) -> dict:
        """Aggregate duration samples into per-operation count / avg / error rate."""
        buckets = {}
        for sample in self.metrics:
            if not sample.name.endswith("_duration"):
                continue
            op = sample.name.replace("chroma_", "").replace("_duration", "")
            bucket = buckets.setdefault(op, {"count": 0, "total_time": 0, "errors": 0})
            bucket["count"] += 1
            bucket["total_time"] += sample.value
            if not sample.tags.get("success", True):
                bucket["errors"] += 1
        return {
            op: {
                "count": b["count"],
                "avg_time_ms": (b["total_time"] / b["count"]) * 1000,
                "error_rate": b["errors"] / b["count"],
            }
            for op, b in buckets.items()
        }

    def export_metrics(self, filepath: str):
        """Dump every recorded sample to a JSON file."""
        payload = [
            {
                "name": m.name,
                "value": m.value,
                "timestamp": m.timestamp,
                "tags": m.tags,
            }
            for m in self.metrics
        ]
        with open(filepath, 'w') as f:
            json.dump(payload, f, indent=2)
日志管理 #
python
import logging
import json
from datetime import datetime
from typing import Any, Dict
class StructuredLogger:
    """Emit one JSON object per log line (structured logging).

    JSON lines are easy to ship to log collectors and to filter by field.
    """

    def __init__(self, name: str, log_file: str = None):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # `logging.getLogger(name)` returns a shared instance: attach
        # handlers only once, otherwise constructing StructuredLogger with
        # the same name twice would emit every log line twice.
        if not self.logger.handlers:
            formatter = logging.Formatter('%(message)s')
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)
            if log_file:
                file_handler = logging.FileHandler(log_file)
                file_handler.setFormatter(formatter)
                self.logger.addHandler(file_handler)

    def log_operation(
        self,
        operation: str,
        collection: str = None,
        duration: float = None,
        success: bool = True,
        **kwargs
    ):
        """Log one operation as a JSON line; extra kwargs are merged into the entry."""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "operation": operation,
            "collection": collection,
            # `is not None`, not truthiness: a (valid) duration of exactly
            # 0 seconds was silently dropped by the original check.
            "duration_ms": duration * 1000 if duration is not None else None,
            "success": success,
            **kwargs
        }
        if success:
            self.logger.info(json.dumps(log_entry))
        else:
            self.logger.error(json.dumps(log_entry))

    def log_error(self, operation: str, error: Exception, **kwargs):
        """Convenience wrapper: log a failed operation with the error's type and message."""
        self.log_operation(
            operation=operation,
            success=False,
            error_type=type(error).__name__,
            error_message=str(error),
            **kwargs
        )
# Example: create a structured logger (console + chroma.log file) and record
# one successful add_documents call with a custom document_count field.
logger = StructuredLogger("chroma", "chroma.log")
logger.log_operation(
    operation="add_documents",
    collection="documents",
    duration=0.5,
    document_count=100
)
故障排查 #
常见问题诊断 #
python
class Diagnostics:
    """Runs a battery of self-tests against a Chroma client."""

    def __init__(self, client):
        self.client = client

    def diagnose(self) -> dict:
        """Run all checks and return their results keyed by check name."""
        return {
            "connection": self._test_connection(),
            "collections": self._test_collections(),
            "storage": self._test_storage(),
            "performance": self._test_performance(),
        }

    def _test_connection(self) -> dict:
        """Check the server answers a heartbeat."""
        try:
            self.client.heartbeat()
            return {"status": "ok", "message": "连接正常"}
        except Exception as exc:
            return {"status": "error", "message": str(exc)}

    def _test_collections(self) -> dict:
        """Check collections can be listed, reporting how many exist."""
        try:
            found = self.client.list_collections()
            return {
                "status": "ok",
                "count": len(found),
                "names": [c.name for c in found],
            }
        except Exception as exc:
            return {"status": "error", "message": str(exc)}

    def _test_storage(self) -> dict:
        """Round-trip a single document through a throwaway collection."""
        try:
            probe = self.client.get_or_create_collection("diagnostic_test")
            probe.add(
                documents=["test"],
                ids=["test_id"]
            )
            probe.get(ids=["test_id"])
            probe.delete(ids=["test_id"])
            return {"status": "ok", "message": "存储读写正常"}
        except Exception as exc:
            return {"status": "error", "message": str(exc)}

    def _test_performance(self) -> dict:
        """Time a 100-document insert and a small query, then clean up."""
        import time
        try:
            probe = self.client.get_or_create_collection("perf_test")
            docs = [f"test document {i}" for i in range(100)]
            doc_ids = [f"test_{i}" for i in range(100)]
            t0 = time.time()
            probe.add(documents=docs, ids=doc_ids)
            add_elapsed = time.time() - t0
            t0 = time.time()
            probe.query(query_texts=["test"], n_results=10)
            query_elapsed = time.time() - t0
            probe.delete(ids=doc_ids)
            return {
                "status": "ok",
                "add_time_ms": add_elapsed * 1000,
                "query_time_ms": query_elapsed * 1000,
            }
        except Exception as exc:
            return {"status": "error", "message": str(exc)}
错误处理最佳实践 #
python
import chromadb
from typing import Optional, List, Dict, Any
import time
class RobustChromaClient:
    """Wraps a chromadb client with retry and fallback logic.

    NOTE(review): relies on `chromadb.errors.InvalidCollectionException` and
    `chromadb.errors.DuplicateIDError` — confirm these exception classes
    exist in the installed chromadb version.
    """

    def __init__(self, client, max_retries: int = 3, retry_delay: float = 1.0):
        self.client = client
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def safe_add(
        self,
        collection_name: str,
        documents: List[str],
        ids: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None
    ) -> bool:
        """Add documents, creating the collection or upserting duplicates as needed.

        Transient failures are retried up to max_retries with a fixed delay.
        Returns True on success, False otherwise.
        """
        for attempt in range(self.max_retries):
            try:
                target = self.client.get_collection(collection_name)
                target.add(documents=documents, ids=ids, metadatas=metadatas)
                return True
            except chromadb.errors.InvalidCollectionException:
                # Collection missing: create it, then retry the add.
                try:
                    self.client.create_collection(collection_name)
                except Exception as e:
                    print(f"创建集合失败: {e}")
                    return False
                continue
            except chromadb.errors.DuplicateIDError:
                print(f"ID 重复,尝试更新")
                try:
                    target = self.client.get_collection(collection_name)
                    target.upsert(documents=documents, ids=ids, metadatas=metadatas)
                    return True
                except Exception as e:
                    print(f"更新失败: {e}")
                    return False
            except Exception as e:
                print(f"添加失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
        return False

    def safe_query(
        self,
        collection_name: str,
        query_texts: List[str],
        n_results: int = 5,
        where: Optional[Dict] = None
    ) -> Optional[Dict]:
        """Query with retries.

        Returns the query result, or None when the collection is missing or
        every attempt fails.
        """
        for attempt in range(self.max_retries):
            try:
                target = self.client.get_collection(collection_name)
                return target.query(
                    query_texts=query_texts,
                    n_results=n_results,
                    where=where
                )
            except chromadb.errors.InvalidCollectionException:
                print(f"集合不存在: {collection_name}")
                return None
            except Exception as e:
                print(f"查询失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
        return None
备份与恢复 #
python
import chromadb
import json
import shutil
import os
from datetime import datetime
from typing import Optional
class BackupManager:
    """Export Chroma collections to per-collection JSON files and restore them."""

    def __init__(self, client, backup_dir: str = "./backups"):
        self.client = client
        self.backup_dir = backup_dir
        os.makedirs(backup_dir, exist_ok=True)

    def create_backup(self, backup_name: Optional[str] = None) -> str:
        """Dump every collection to <backup_dir>/<backup_name>/ plus a manifest.

        Returns the backup directory path. A timestamp-based name is used
        when none is given.
        """
        if backup_name is None:
            backup_name = datetime.now().strftime("backup_%Y%m%d_%H%M%S")
        backup_path = os.path.join(self.backup_dir, backup_name)
        os.makedirs(backup_path, exist_ok=True)
        collections = self.client.list_collections()
        for collection in collections:
            self._backup_collection(collection, backup_path)
        manifest = {
            "backup_name": backup_name,
            "created_at": datetime.now().isoformat(),
            "collections": [c.name for c in collections]
        }
        with open(os.path.join(backup_path, "manifest.json"), 'w') as f:
            json.dump(manifest, f, indent=2)
        print(f"备份完成: {backup_path}")
        return backup_path

    def _backup_collection(self, collection, backup_path: str):
        """Serialize one collection (ids, documents, embeddings, metadatas) to JSON."""
        data = collection.get(include=["documents", "metadatas", "embeddings"])
        collection_data = {
            "name": collection.name,
            "metadata": collection.metadata,
            "documents": []
        }
        # NOTE(review): newer chromadb versions may return embeddings as numpy
        # arrays, which json.dump cannot serialize directly — confirm against
        # the installed version.
        for i in range(len(data['ids'])):
            collection_data["documents"].append({
                "id": data['ids'][i],
                "document": data['documents'][i] if data['documents'] else None,
                "embedding": data['embeddings'][i] if data['embeddings'] else None,
                "metadata": data['metadatas'][i] if data['metadatas'] else None
            })
        filename = f"{collection.name}.json"
        with open(os.path.join(backup_path, filename), 'w', encoding='utf-8') as f:
            json.dump(collection_data, f, ensure_ascii=False, indent=2)

    def restore_backup(self, backup_name: str) -> bool:
        """Restore every collection listed in the backup's manifest.

        Returns False when the backup directory does not exist.
        """
        backup_path = os.path.join(self.backup_dir, backup_name)
        if not os.path.exists(backup_path):
            print(f"备份不存在: {backup_path}")
            return False
        with open(os.path.join(backup_path, "manifest.json"), 'r') as f:
            manifest = json.load(f)
        for collection_name in manifest["collections"]:
            self._restore_collection(collection_name, backup_path)
        print(f"恢复完成: {backup_name}")
        return True

    def _restore_collection(self, collection_name: str, backup_path: str):
        """Recreate one collection from its JSON dump.

        Bug fix vs. the original: documents/embeddings/metadatas lists are
        kept aligned with ids. The original filtered out None entries
        per-field, which shifted values onto the wrong ids whenever any
        record lacked a field (e.g. one document without metadata).
        """
        filename = f"{collection_name}.json"
        with open(os.path.join(backup_path, filename), 'r', encoding='utf-8') as f:
            data = json.load(f)
        collection = self.client.get_or_create_collection(
            name=data["name"],
            metadata=data.get("metadata", {})
        )
        records = data["documents"]
        if not records:
            return
        batch_size = 1000
        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]
            kwargs = {"ids": [r["id"] for r in batch]}
            for arg_name, field in (("documents", "document"),
                                    ("embeddings", "embedding"),
                                    ("metadatas", "metadata")):
                values = [r.get(field) for r in batch]
                # Pass a field only when it holds at least one value, and
                # keep the full id-aligned list (None placeholders included).
                if any(v is not None for v in values):
                    kwargs[arg_name] = values
            collection.add(**kwargs)

    def list_backups(self) -> list:
        """Return every backup's manifest, newest first."""
        backups = []
        for name in os.listdir(self.backup_dir):
            manifest_path = os.path.join(self.backup_dir, name, "manifest.json")
            if os.path.exists(manifest_path):
                with open(manifest_path, 'r') as f:
                    backups.append(json.load(f))
        return sorted(backups, key=lambda x: x["created_at"], reverse=True)
总结 #
通过本指南,你已经掌握了 Chroma 的完整知识体系:
- 基础入门:理解向量数据库概念,完成环境搭建
- 核心功能:掌握集合管理、文档操作、向量嵌入、查询检索
- 进阶应用:学会高级配置、框架集成、生产部署
- 最佳实践:掌握性能优化、安全配置、监控运维
继续实践和探索,你将能够构建出高效、可靠的向量数据库应用!
最后更新:2026-04-04