最佳实践 #

生产部署架构 #

架构设计 #

text
┌─────────────────────────────────────────────────────────────┐
│                    生产部署架构                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐  │
│  │   客户端    │     │   客户端    │     │   客户端    │  │
│  └──────┬──────┘     └──────┬──────┘     └──────┬──────┘  │
│         │                   │                   │          │
│         └───────────────────┼───────────────────┘          │
│                             │                              │
│                    ┌────────▼────────┐                     │
│                    │   负载均衡器    │                     │
│                    └────────┬────────┘                     │
│                             │                              │
│         ┌───────────────────┼───────────────────┐          │
│         │                   │                   │          │
│  ┌──────▼──────┐     ┌──────▼──────┐     ┌──────▼──────┐  │
│  │ Chroma 节点 │     │ Chroma 节点 │     │ Chroma 节点 │  │
│  │    (主)     │     │    (从)     │     │    (从)     │  │
│  └──────┬──────┘     └──────┬──────┘     └──────┬──────┘  │
│         │                   │                   │          │
│         └───────────────────┼───────────────────┘          │
│                             │                              │
│                    ┌────────▼────────┐                     │
│                    │   共享存储      │                     │
│                    │  (持久化数据)   │                     │
│                    └─────────────────┘                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

部署清单 #

text
┌─────────────────────────────────────────────────────────────┐
│                    部署前检查清单                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  硬件资源                                                   │
│  □ CPU: 4核以上                                             │
│  □ 内存: 16GB以上                                           │
│  □ 存储: SSD,预留3倍数据大小                               │
│  □ 网络: 低延迟,高带宽                                     │
│                                                             │
│  软件环境                                                   │
│  □ Python 3.10+                                             │
│  □ 足够的文件描述符限制                                     │
│  □ 防火墙配置                                               │
│  □ SSL 证书                                                 │
│                                                             │
│  安全配置                                                   │
│  □ 访问控制                                                 │
│  □ 数据加密                                                 │
│  □ 网络隔离                                                 │
│  □ 审计日志                                                 │
│                                                             │
│  监控告警                                                   │
│  □ 性能监控                                                 │
│  □ 资源监控                                                 │
│  □ 错误告警                                                 │
│  □ 日志收集                                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

性能优化 #

索引优化 #

python
import chromadb

def create_optimized_collection(client, name: str, use_case: str = "balanced"):
    """Create a collection whose HNSW metadata is chosen by a named profile.

    Args:
        client: Object exposing ``create_collection(name=..., metadata=...)``.
        name: Name of the collection to create.
        use_case: One of ``"high_recall"``, ``"balanced"``, ``"fast_insert"``
            or ``"memory_efficient"``. Any other value falls back to
            ``"balanced"``.

    Returns:
        Whatever ``client.create_collection`` returns.
    """
    # (hnsw:construction_ef, hnsw:M) per profile; every profile uses cosine.
    hnsw_profiles = {
        "high_recall": (400, 64),
        "balanced": (200, 32),
        "fast_insert": (50, 8),
        "memory_efficient": (100, 8),
    }

    if use_case not in hnsw_profiles:
        use_case = "balanced"
    construction_ef, graph_m = hnsw_profiles[use_case]

    index_metadata = {
        "hnsw:space": "cosine",
        "hnsw:construction_ef": construction_ef,
        "hnsw:M": graph_m,
    }
    return client.create_collection(name=name, metadata=index_metadata)

批量操作最佳实践 #

python
import chromadb
from typing import List, Dict, Any

class BatchProcessor:
    """Add documents to a collection in fixed-size batches, retrying failures."""

    def __init__(self, collection, batch_size: int = 1000):
        """Store the target collection and the batch size.

        Args:
            collection: Object exposing ``add(documents=, ids=, metadatas=)``.
            batch_size: Maximum number of records sent per ``add`` call.

        Raises:
            ValueError: If ``batch_size`` is not positive. A negative value
                would otherwise make ``add_documents`` silently add nothing.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        self.collection = collection
        self.batch_size = batch_size

    def add_documents(
        self,
        documents: List[str],
        ids: List[str],
        metadatas: List[Dict[str, Any]] = None
    ):
        """Add ``documents`` batch by batch, printing progress.

        Args:
            documents: Texts to add.
            ids: One id per document, in the same order.
            metadatas: Optional per-document metadata; an empty or missing
                list means "no metadata".

        Raises:
            ValueError: If ``ids`` or ``metadatas`` do not match ``documents``
                in length. Checked up front so that a mismatch cannot leave
                the collection partially written.
            Exception: Whatever ``collection.add`` raised, once a failed batch
                has exhausted its retries.
        """
        total = len(documents)

        if not metadatas:
            # Keep the original behaviour: a falsy value means "no metadata".
            metadatas = None

        if len(ids) != total:
            raise ValueError(
                f"ids length ({len(ids)}) does not match documents length ({total})"
            )
        if metadatas is not None and len(metadatas) != total:
            raise ValueError(
                f"metadatas length ({len(metadatas)}) does not match "
                f"documents length ({total})"
            )

        for i in range(0, total, self.batch_size):
            end = min(i + self.batch_size, total)

            batch_docs = documents[i:end]
            batch_ids = ids[i:end]
            batch_metas = metadatas[i:end] if metadatas is not None else None

            try:
                self.collection.add(
                    documents=batch_docs,
                    ids=batch_ids,
                    metadatas=batch_metas
                )
            except Exception as e:
                print(f"批次 {i//self.batch_size} 失败: {e}")
                self._retry_batch(batch_docs, batch_ids, batch_metas)

            if end % 10000 == 0 or end == total:
                print(f"进度: {end}/{total}")

    def _retry_batch(self, docs, ids, metadatas, max_retries=3):
        """Re-send one failed batch up to ``max_retries`` times.

        The exception from the final attempt is re-raised; earlier failures
        only print a retry notice.
        """
        for attempt in range(max_retries):
            try:
                self.collection.add(
                    documents=docs,
                    ids=ids,
                    metadatas=metadatas
                )
                return
            except Exception:
                if attempt == max_retries - 1:
                    raise
                print(f"重试 {attempt + 1}/{max_retries}")

查询优化 #

python
from functools import lru_cache
import hashlib

class OptimizedQuery:
    """Wrap a collection's ``query`` with a small bounded in-memory cache."""

    def __init__(self, collection, cache_size: int = 1000):
        """Store the collection and the maximum number of cached results.

        Args:
            collection: Object exposing ``query(query_texts=, n_results=, where=)``.
            cache_size: Maximum number of cached entries. Zero or a negative
                value disables caching.
        """
        self.collection = collection
        self.cache_size = cache_size
        self._cache = {}

    def query(
        self,
        query_text: str,
        n_results: int = 5,
        where: dict = None,
        use_cache: bool = True
    ):
        """Run a single-text query, serving repeated requests from the cache.

        Args:
            query_text: Text to search for.
            n_results: Number of results requested.
            where: Optional metadata filter forwarded unchanged.
            use_cache: When False, the cache is neither read nor written.

        Returns:
            The object returned by ``collection.query``. A cache hit returns
            the same object that was stored earlier, not a copy.
        """
        cache_key = self._make_cache_key(query_text, n_results, where)

        if use_cache and cache_key in self._cache:
            return self._cache[cache_key]

        results = self.collection.query(
            query_texts=[query_text],
            n_results=n_results,
            where=where
        )

        if use_cache:
            self._update_cache(cache_key, results)

        return results

    @staticmethod
    def _canonical(value) -> str:
        """Render ``value`` as text that ignores dict insertion order."""
        if isinstance(value, dict):
            items = sorted(value.items(), key=lambda kv: repr(kv[0]))
            body = ",".join(
                f"{k!r}:{OptimizedQuery._canonical(v)}" for k, v in items
            )
            return "{" + body + "}"
        if isinstance(value, (list, tuple)):
            # Sequence order is kept: it may be significant to the filter.
            return "[" + ",".join(OptimizedQuery._canonical(v) for v in value) + "]"
        return repr(value)

    def _make_cache_key(self, query_text: str, n_results: int, where: dict) -> str:
        """Build a hex digest identifying one (text, n_results, where) request.

        Equivalent ``where`` filters written with different key order map to
        the same key. MD5 is used only as a compact fingerprint here.
        """
        key_data = f"{query_text}:{n_results}:{self._canonical(where)}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def _update_cache(self, key: str, results: dict):
        """Store ``results`` under ``key``, evicting oldest entries when full."""
        if self.cache_size <= 0:
            # Nothing may be stored; evicting from an empty dict would raise.
            return

        if key not in self._cache:
            # dicts iterate in insertion order, so the first key is the oldest.
            while len(self._cache) >= self.cache_size:
                self._cache.pop(next(iter(self._cache)))

        self._cache[key] = results

    def clear_cache(self):
        """Drop every cached result."""
        self._cache.clear()

安全配置 #

访问控制 #

python
import chromadb
from functools import wraps
from typing import Callable

def require_auth(func: Callable) -> Callable:
    """Decorate a method so it only runs when ``self.is_authenticated`` is truthy.

    Raises:
        PermissionError: From the wrapped call when the instance is not
            authenticated.
    """
    @wraps(func)
    def guarded(self, *args, **kwargs):
        if self.is_authenticated:
            return func(self, *args, **kwargs)
        raise PermissionError("未授权访问")

    return guarded

class SecureChromaClient:
    """HTTP Chroma client wrapper whose collection methods are auth-guarded."""

    def __init__(self, host: str, port: int, username: str, password: str):
        """Build the underlying HTTP client with username/password credentials.

        ``is_authenticated`` is set to True as soon as the client object is
        constructed; no explicit credential check is made here.
        """
        # NOTE(review): confirm that `chromadb.BasicAuthCredentials` and the
        # `credentials=` keyword exist in the installed chromadb version.
        basic_credentials = chromadb.BasicAuthCredentials(
            username=username,
            password=password,
        )
        self.client = chromadb.HttpClient(
            host=host,
            port=port,
            credentials=basic_credentials,
        )
        self.is_authenticated = True

    @require_auth
    def get_collection(self, name: str):
        """Return the collection called ``name`` from the wrapped client."""
        found = self.client.get_collection(name)
        return found

    @require_auth
    def create_collection(self, name: str, **kwargs):
        """Create collection ``name``, forwarding extra keyword arguments."""
        created = self.client.create_collection(name, **kwargs)
        return created

    @require_auth
    def delete_collection(self, name: str):
        """Delete collection ``name`` and return the wrapped client's result."""
        outcome = self.client.delete_collection(name)
        return outcome

数据加密 #

python
from cryptography.fernet import Fernet
import base64
import json

class EncryptedMetadata:
    """Encrypt and decrypt documents and metadata dicts with a Fernet cipher.

    Ciphertexts are returned as URL-safe base64 text wrapped around the token
    produced by the cipher.
    """

    def __init__(self, key: bytes = None):
        """Use ``key`` when given (and non-empty), otherwise generate one."""
        self.key = key if key else Fernet.generate_key()
        self.cipher = Fernet(self.key)

    def encrypt_metadata(self, metadata: dict) -> str:
        """Serialize ``metadata`` to JSON and encrypt it as a document."""
        return self.encrypt_document(json.dumps(metadata))

    def decrypt_metadata(self, encrypted_data: str) -> dict:
        """Decrypt ``encrypted_data`` and parse the JSON it contains."""
        return json.loads(self.decrypt_document(encrypted_data))

    def encrypt_document(self, document: str) -> str:
        """Encrypt ``document`` and return the result as base64 text."""
        token = self.cipher.encrypt(document.encode())
        wrapped = base64.urlsafe_b64encode(token)
        return wrapped.decode()

    def decrypt_document(self, encrypted_data: str) -> str:
        """Reverse ``encrypt_document`` and return the original text."""
        token = base64.urlsafe_b64decode(encrypted_data.encode())
        plaintext = self.cipher.decrypt(token)
        return plaintext.decode()

# No key is passed, so the constructor generates a fresh one. Keep
# `encryptor.key` if these values must be decrypted by another instance later.
encryptor = EncryptedMetadata()

# Both helpers return the ciphertext as a text string.
encrypted_meta = encryptor.encrypt_metadata({"secret": "data"})
encrypted_doc = encryptor.encrypt_document("敏感文档内容")

网络安全 #

python
import chromadb
from typing import Optional

class SecureClientFactory:
    """Factory for Chroma clients configured through a ``Settings`` object."""

    @staticmethod
    def create_client(
        host: str,
        port: int,
        ssl: bool = True,
        verify_ssl: bool = True,
        username: Optional[str] = None,
        password: Optional[str] = None
    ):
        """Build a client for ``host:port`` with SSL and optional credentials.

        Args:
            host: Server host, passed as ``chroma_server_host``.
            port: Server port, passed as ``chroma_server_http_port``.
            ssl: Passed as ``chroma_server_ssl_enabled``.
            verify_ssl: Passed as ``chroma_server_ssl_verify``.
            username: Optional user name; used only together with ``password``.
            password: Optional password; used only together with ``username``.

        Returns:
            The object returned by ``chromadb.Client(settings)``.
        """
        # NOTE(review): confirm these Settings field names and the
        # `chroma_api_impl` path against the installed chromadb version.
        settings = chromadb.config.Settings(
            chroma_api_impl="chromadb.api.fastapi.FastAPI",
            chroma_server_host=host,
            chroma_server_http_port=port,
            chroma_server_ssl_enabled=ssl,
            chroma_server_ssl_verify=verify_ssl
        )
        
        # Credentials are attached only when both parts are non-empty.
        if username and password:
            # NOTE(review): this assigns a dict to a `chroma_server_*` field
            # after construction — verify that the client actually reads this
            # field and accepts a dict (client-side auth may use different
            # settings).
            settings.chroma_server_auth_credentials = {
                "username": username,
                "password": password
            }
        
        return chromadb.Client(settings)

监控与运维 #

健康检查 #

python
import chromadb
import time
from typing import Dict, Any

class HealthChecker:
    """Run connection, collection-listing and read probes against a client."""

    def __init__(self, client):
        """Store the client to probe.

        Args:
            client: Object exposing ``heartbeat()`` and ``list_collections()``
                (and ``get_collection(name)`` when collections are listed by
                name).
        """
        self.client = client

    def check_health(self) -> Dict[str, Any]:
        """Run every probe and return a combined report.

        Returns:
            A dict with ``status``, ``timestamp`` and ``checks``. ``status``
            is ``"unhealthy"`` as soon as any probe raises; the failing probe's
            entry carries the error text.
        """
        health = {
            "status": "healthy",
            "timestamp": time.time(),
            "checks": {}
        }

        probes = (
            ("connection", self._check_connection),
            ("collections", self._check_collections),
            ("query", self._check_query),
        )

        for check_name, probe in probes:
            try:
                health["checks"][check_name] = probe()
            except Exception as e:
                health["checks"][check_name] = {"status": "unhealthy", "error": str(e)}
                # Any failed probe makes the overall report unhealthy.
                health["status"] = "unhealthy"

        return health

    def _check_connection(self) -> Dict[str, Any]:
        """Time one ``heartbeat()`` call."""
        # perf_counter is monotonic, so the latency cannot be skewed by
        # wall-clock adjustments.
        start = time.perf_counter()
        self.client.heartbeat()
        latency = time.perf_counter() - start

        return {
            "status": "healthy",
            "latency_ms": latency * 1000
        }

    def _check_collections(self) -> Dict[str, Any]:
        """Count the collections reported by the client."""
        collections = self.client.list_collections()
        return {
            "status": "healthy",
            "count": len(collections)
        }

    def _check_query(self) -> Dict[str, Any]:
        """Time a one-record ``get`` on the first listed collection."""
        collections = self.client.list_collections()

        if not collections:
            return {"status": "healthy", "message": "No collections to test"}

        test_collection = collections[0]
        if isinstance(test_collection, str):
            # Some client versions list collection names instead of objects.
            test_collection = self.client.get_collection(test_collection)

        start = time.perf_counter()
        test_collection.get(limit=1)
        latency = time.perf_counter() - start

        return {
            "status": "healthy",
            "latency_ms": latency * 1000
        }

性能监控 #

python
import time
import psutil
import os
from dataclasses import dataclass
from typing import List
import json

@dataclass
class Metric:
    """One recorded measurement."""

    # Metric identifier, e.g. "chroma_<operation>_duration".
    name: str
    # Measured value; the unit depends on the metric.
    value: float
    # Time the metric was recorded, as returned by time.time().
    timestamp: float
    # Free-form labels attached to the metric (may be empty).
    tags: dict

class PerformanceMonitor:
    """Collect timing, memory and size metrics in memory and summarise them."""

    def __init__(self, collection):
        """Bind the monitor to ``collection`` and to the current process."""
        self.collection = collection
        self.metrics: List[Metric] = []
        self.process = psutil.Process(os.getpid())

    def record_operation(self, operation: str, duration: float, success: bool = True):
        """Record how long ``operation`` took.

        Args:
            operation: Operation name; stored as ``chroma_<operation>_duration``.
            duration: Elapsed time (``get_summary`` treats it as seconds).
            success: Whether the operation succeeded.
        """
        metric = Metric(
            name=f"chroma_{operation}_duration",
            value=duration,
            timestamp=time.time(),
            tags={"success": success}
        )
        self.metrics.append(metric)

    def record_memory(self):
        """Record the current process's RSS and VMS memory in MiB."""
        memory_info = self.process.memory_info()

        self.metrics.append(Metric(
            name="chroma_memory_rss_mb",
            value=memory_info.rss / 1024 / 1024,
            timestamp=time.time(),
            tags={}
        ))

        self.metrics.append(Metric(
            name="chroma_memory_vms_mb",
            value=memory_info.vms / 1024 / 1024,
            timestamp=time.time(),
            tags={}
        ))

    def record_collection_stats(self):
        """Record the collection's current record count."""
        count = self.collection.count()

        self.metrics.append(Metric(
            name="chroma_document_count",
            value=count,
            timestamp=time.time(),
            tags={"collection": self.collection.name}
        ))

    def get_summary(self) -> dict:
        """Aggregate duration metrics per operation.

        Returns:
            A dict mapping operation name to ``count``, ``avg_time_ms`` and
            ``error_rate``. Only metrics whose name ends in ``_duration``
            contribute.
        """
        prefix = "chroma_"
        suffix = "_duration"
        operations = {}

        for metric in self.metrics:
            if not metric.name.endswith(suffix):
                continue

            # Strip exactly one leading prefix and one trailing suffix.
            # str.replace would also remove occurrences inside the operation
            # name itself (e.g. an operation called "chroma_sync").
            op = metric.name[:-len(suffix)]
            if op.startswith(prefix):
                op = op[len(prefix):]

            stats = operations.setdefault(
                op, {"count": 0, "total_time": 0, "errors": 0}
            )
            stats["count"] += 1
            stats["total_time"] += metric.value

            if not metric.tags.get("success", True):
                stats["errors"] += 1

        summary = {}
        for op, data in operations.items():
            summary[op] = {
                "count": data["count"],
                "avg_time_ms": (data["total_time"] / data["count"]) * 1000,
                "error_rate": data["errors"] / data["count"]
            }

        return summary

    def export_metrics(self, filepath: str):
        """Write every recorded metric to ``filepath`` as a JSON array."""
        data = [
            {
                "name": m.name,
                "value": m.value,
                "timestamp": m.timestamp,
                "tags": m.tags
            }
            for m in self.metrics
        ]

        # Explicit encoding keeps the file independent of the platform locale.
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

日志管理 #

python
import logging
import json
from datetime import datetime
from typing import Any, Dict

class StructuredLogger:
    """Emit one JSON object per log line to the console and, optionally, a file."""

    def __init__(self, name: str, log_file: str = None):
        """Configure the named logger, attaching each handler at most once.

        Args:
            name: Name passed to ``logging.getLogger``.
            log_file: Optional path of a file that also receives the log lines.
        """
        import os  # local import: only needed to normalise the log file path

        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        formatter = logging.Formatter('%(message)s')

        # logging.getLogger returns the same object for the same name, so a
        # second StructuredLogger must not stack another handler on it:
        # that would emit every line twice.
        has_console = any(
            isinstance(h, logging.StreamHandler)
            and not isinstance(h, logging.FileHandler)
            for h in self.logger.handlers
        )
        if not has_console:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

        if log_file:
            # FileHandler stores the absolute path in `baseFilename`.
            target = os.path.abspath(log_file)
            has_file = any(
                isinstance(h, logging.FileHandler) and h.baseFilename == target
                for h in self.logger.handlers
            )
            if not has_file:
                file_handler = logging.FileHandler(log_file)
                file_handler.setFormatter(formatter)
                self.logger.addHandler(file_handler)

    def log_operation(
        self,
        operation: str,
        collection: str = None,
        duration: float = None,
        success: bool = True,
        **kwargs
    ):
        """Log one operation as a JSON line.

        Args:
            operation: Operation name.
            collection: Optional collection name.
            duration: Optional duration in seconds; logged as ``duration_ms``.
            success: Logged at INFO when True, at ERROR otherwise.
            **kwargs: Extra JSON-serialisable fields merged into the entry.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "operation": operation,
            "collection": collection,
            # `is not None` so that a genuine 0.0 duration is not logged as null.
            "duration_ms": duration * 1000 if duration is not None else None,
            "success": success,
            **kwargs
        }

        if success:
            self.logger.info(json.dumps(log_entry))
        else:
            self.logger.error(json.dumps(log_entry))

    def log_error(self, operation: str, error: Exception, **kwargs):
        """Log a failed operation together with the exception type and message."""
        self.log_operation(
            operation=operation,
            success=False,
            error_type=type(error).__name__,
            error_message=str(error),
            **kwargs
        )

# Logger named "chroma" that writes JSON lines to the console and to
# chroma.log (the path is relative, so it resolves against the working
# directory).
logger = StructuredLogger("chroma", "chroma.log")

# `duration` is given in seconds and logged as `duration_ms`; extra keyword
# arguments such as `document_count` become fields of the JSON entry.
logger.log_operation(
    operation="add_documents",
    collection="documents",
    duration=0.5,
    document_count=100
)

故障排查 #

常见问题诊断 #

python
class Diagnostics:
    """Run connection, listing, storage and timing probes against a client."""

    def __init__(self, client):
        """Store the client to diagnose."""
        self.client = client

    def diagnose(self) -> dict:
        """Run every probe and return the results keyed by probe name.

        Each probe catches its own exceptions and reports them as
        ``{"status": "error", "message": ...}``.
        """
        results = {
            "connection": self._test_connection(),
            "collections": self._test_collections(),
            "storage": self._test_storage(),
            "performance": self._test_performance()
        }

        return results

    def _test_connection(self) -> dict:
        """Check that ``heartbeat()`` succeeds."""
        try:
            self.client.heartbeat()
            return {"status": "ok", "message": "连接正常"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def _test_collections(self) -> dict:
        """List the collections and report their count and names."""
        try:
            collections = self.client.list_collections()
            return {
                "status": "ok",
                "count": len(collections),
                # Entries may be collection objects or, in some client
                # versions, plain names.
                "names": [getattr(c, "name", c) for c in collections]
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def _test_storage(self) -> dict:
        """Write one record, read it back, and delete it again.

        The probe uses the ``diagnostic_test`` collection, which is created if
        missing and is not removed afterwards.
        """
        probe_id = "test_id"
        try:
            test_collection = self.client.get_or_create_collection("diagnostic_test")

            test_collection.add(
                documents=["test"],
                ids=[probe_id]
            )

            try:
                result = test_collection.get(ids=[probe_id])
            finally:
                # Remove the probe record even when the read raised.
                test_collection.delete(ids=[probe_id])

            # A read that succeeds but does not return the record is a failure.
            if probe_id not in result["ids"]:
                return {"status": "error", "message": "写入的测试记录未能读回"}

            return {"status": "ok", "message": "存储读写正常"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def _test_performance(self) -> dict:
        """Time adding 100 small documents and running one query.

        The probe uses the ``perf_test`` collection, which is created if
        missing and is not removed afterwards.
        """
        import time

        try:
            test_collection = self.client.get_or_create_collection("perf_test")

            docs = [f"test document {i}" for i in range(100)]
            ids = [f"test_{i}" for i in range(100)]

            start = time.perf_counter()
            test_collection.add(documents=docs, ids=ids)
            add_time = time.perf_counter() - start

            try:
                start = time.perf_counter()
                test_collection.query(query_texts=["test"], n_results=10)
                query_time = time.perf_counter() - start
            finally:
                # Remove the probe records even when the query raised.
                test_collection.delete(ids=ids)

            return {
                "status": "ok",
                "add_time_ms": add_time * 1000,
                "query_time_ms": query_time * 1000
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

错误处理最佳实践 #

python
import chromadb
from typing import Optional, List, Dict, Any
import time

class RobustChromaClient:
    """Wrap a Chroma client with retrying, non-raising add and query helpers."""

    def __init__(self, client, max_retries: int = 3, retry_delay: float = 1.0):
        """Store the client and the retry policy.

        Args:
            client: The underlying Chroma client.
            max_retries: Maximum number of attempts per operation.
            retry_delay: Seconds to sleep between failed attempts.
        """
        self.client = client
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def safe_add(
        self,
        collection_name: str,
        documents: List[str],
        ids: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None
    ) -> bool:
        """Add documents, creating the collection if it does not exist.

        On a duplicate-id error the batch is upserted instead. Other failures
        are retried up to ``max_retries`` times.

        Returns:
            True when the documents were added or upserted, False otherwise.
        """
        for attempt in range(self.max_retries):
            try:
                # get_or_create avoids the "create, then retry" round trip,
                # which used up an attempt and, with max_retries=1, created
                # the collection without ever adding the documents.
                collection = self.client.get_or_create_collection(collection_name)
                collection.add(
                    documents=documents,
                    ids=ids,
                    metadatas=metadatas
                )
                return True

            except chromadb.errors.DuplicateIDError:
                print("ID 重复,尝试更新")
                try:
                    collection = self.client.get_or_create_collection(collection_name)
                    collection.upsert(
                        documents=documents,
                        ids=ids,
                        metadatas=metadatas
                    )
                    return True
                except Exception as e:
                    print(f"更新失败: {e}")
                    return False

            except Exception as e:
                print(f"添加失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)

        return False

    def safe_query(
        self,
        collection_name: str,
        query_texts: List[str],
        n_results: int = 5,
        where: Optional[Dict] = None
    ) -> Optional[Dict]:
        """Query a collection, retrying transient failures.

        Returns:
            The query result, or None when the collection does not exist or
            every attempt failed.
        """
        for attempt in range(self.max_retries):
            try:
                collection = self.client.get_collection(collection_name)
                return collection.query(
                    query_texts=query_texts,
                    n_results=n_results,
                    where=where
                )

            # NOTE(review): confirm that the installed chromadb version still
            # exposes `InvalidCollectionException` for a missing collection.
            except chromadb.errors.InvalidCollectionException:
                print(f"集合不存在: {collection_name}")
                return None

            except Exception as e:
                print(f"查询失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)

        return None

备份与恢复 #

python
import chromadb
import json
import shutil
import os
from datetime import datetime
from typing import Optional

class BackupManager:
    """Export collections to JSON files and restore them from those files."""

    def __init__(self, client, backup_dir: str = "./backups"):
        """Store the client and make sure ``backup_dir`` exists."""
        self.client = client
        self.backup_dir = backup_dir
        os.makedirs(backup_dir, exist_ok=True)

    def create_backup(self, backup_name: Optional[str] = None) -> str:
        """Back up every collection into ``<backup_dir>/<backup_name>``.

        Args:
            backup_name: Directory name for the backup; defaults to a
                timestamped ``backup_YYYYmmdd_HHMMSS`` name.

        Returns:
            The path of the backup directory.
        """
        if backup_name is None:
            backup_name = datetime.now().strftime("backup_%Y%m%d_%H%M%S")

        backup_path = os.path.join(self.backup_dir, backup_name)
        os.makedirs(backup_path, exist_ok=True)

        collections = self.client.list_collections()

        for collection in collections:
            self._backup_collection(collection, backup_path)

        manifest = {
            "backup_name": backup_name,
            "created_at": datetime.now().isoformat(),
            "collections": [c.name for c in collections]
        }

        manifest_path = os.path.join(backup_path, "manifest.json")
        with open(manifest_path, 'w', encoding='utf-8') as f:
            json.dump(manifest, f, indent=2)

        print(f"备份完成: {backup_path}")
        return backup_path

    @staticmethod
    def _jsonable_embedding(embedding):
        """Convert one embedding to a plain list so that ``json`` can write it."""
        if embedding is None:
            return None
        if hasattr(embedding, "tolist"):
            # Array-like embeddings (e.g. numpy) are not JSON serialisable.
            return embedding.tolist()
        return list(embedding)

    @staticmethod
    def _column(batch: list, field: str):
        """Return ``field`` for every record in ``batch``, aligned with the ids.

        Returns None when no record has a value, so the argument is omitted
        rather than sent as a list of the wrong length.
        """
        values = [record.get(field) for record in batch]
        if all(value is None for value in values):
            return None
        return values

    def _backup_collection(self, collection, backup_path: str):
        """Write one collection to ``<backup_path>/<collection.name>.json``."""
        data = collection.get(include=["documents", "metadatas", "embeddings"])

        # Compare with None instead of truthiness: array-like values have no
        # unambiguous truth value.
        documents = data['documents']
        embeddings = data['embeddings']
        metadatas = data['metadatas']

        collection_data = {
            "name": collection.name,
            "metadata": collection.metadata,
            "documents": []
        }

        for i, record_id in enumerate(data['ids']):
            embedding = embeddings[i] if embeddings is not None else None
            collection_data["documents"].append({
                "id": record_id,
                "document": documents[i] if documents is not None else None,
                "embedding": self._jsonable_embedding(embedding),
                "metadata": metadatas[i] if metadatas is not None else None
            })

        filename = f"{collection.name}.json"
        with open(os.path.join(backup_path, filename), 'w', encoding='utf-8') as f:
            json.dump(collection_data, f, ensure_ascii=False, indent=2)

    def restore_backup(self, backup_name: str) -> bool:
        """Restore every collection listed in a backup's manifest.

        Returns:
            True on success; False when the backup directory or its manifest
            does not exist.
        """
        backup_path = os.path.join(self.backup_dir, backup_name)

        if not os.path.exists(backup_path):
            print(f"备份不存在: {backup_path}")
            return False

        manifest_path = os.path.join(backup_path, "manifest.json")
        if not os.path.exists(manifest_path):
            print(f"备份清单缺失: {manifest_path}")
            return False

        with open(manifest_path, 'r', encoding='utf-8') as f:
            manifest = json.load(f)

        for collection_name in manifest["collections"]:
            self._restore_collection(collection_name, backup_path)

        print(f"恢复完成: {backup_name}")
        return True

    def _restore_collection(self, collection_name: str, backup_path: str):
        """Recreate one collection from its JSON file, adding records in batches."""
        filename = f"{collection_name}.json"
        with open(os.path.join(backup_path, filename), 'r', encoding='utf-8') as f:
            data = json.load(f)

        collection = self.client.get_or_create_collection(
            name=data["name"],
            # Pass None rather than an empty dict when no metadata was saved.
            metadata=data.get("metadata") or None
        )

        docs = data["documents"]
        batch_size = 1000

        for start in range(0, len(docs), batch_size):
            batch = docs[start:start + batch_size]

            # Every column keeps one entry per id. Filtering out missing
            # values would shift the remaining ones onto the wrong ids.
            collection.add(
                ids=[d["id"] for d in batch],
                documents=self._column(batch, "document"),
                embeddings=self._column(batch, "embedding"),
                metadatas=self._column(batch, "metadata")
            )

    def list_backups(self) -> list:
        """Return the manifests of all backups, newest first."""
        backups = []

        for name in os.listdir(self.backup_dir):
            manifest_path = os.path.join(self.backup_dir, name, "manifest.json")

            if os.path.exists(manifest_path):
                with open(manifest_path, 'r', encoding='utf-8') as f:
                    manifest = json.load(f)
                backups.append(manifest)

        return sorted(backups, key=lambda x: x["created_at"], reverse=True)

总结 #

通过本指南,你已经掌握了 Chroma 的完整知识体系:

  1. 基础入门:理解向量数据库概念,完成环境搭建
  2. 核心功能:掌握集合管理、文档操作、向量嵌入、查询检索
  3. 进阶应用:学会高级配置、框架集成、生产部署
  4. 最佳实践:掌握性能优化、安全配置、监控运维

继续实践和探索,你将能够构建出高效、可靠的向量数据库应用!

最后更新:2026-04-04