高级配置 #

持久化存储 #

内存模式 vs 持久化模式 #

text
┌─────────────────────────────────────────────────────────────┐
│                    存储模式对比                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  内存模式 (Ephemeral)                                       │
│  ├── 数据存储在内存中                                       │
│  ├── 程序结束数据丢失                                       │
│  ├── 适合测试和原型                                         │
│  └── 速度最快                                               │
│                                                             │
│  持久化模式 (Persistent)                                    │
│  ├── 数据存储在磁盘                                         │
│  ├── 程序重启数据保留                                       │
│  ├── 适合生产环境                                           │
│  └── 速度略慢于内存模式                                     │
│                                                             │
│  服务器模式 (Server)                                        │
│  ├── 独立服务器进程                                         │
│  ├── 支持多客户端                                           │
│  ├── 适合分布式部署                                         │
│  └── 需要额外配置                                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

持久化客户端 #

python
import chromadb

# Persistent client: data is written to ./chroma_db on disk and survives
# process restarts (unlike the purely in-memory chromadb.Client()).
client = chromadb.PersistentClient(path="./chroma_db")

# get_or_create_collection is idempotent: it reuses the collection if it
# already exists in the persisted database.
collection = client.get_or_create_collection(name="documents")

collection.add(
    documents=["持久化存储的文档"],
    ids=["doc1"]
)

print("数据已保存到 ./chroma_db 目录")

持久化路径配置 #

python
import chromadb
import os

# Expand ~ so the database path works across platforms and users.
db_path = os.path.expanduser("~/data/chroma_db")

# Create the directory tree up front; exist_ok avoids an error on reruns.
os.makedirs(db_path, exist_ok=True)

client = chromadb.PersistentClient(path=db_path)

print(f"数据库路径: {db_path}")

持久化数据管理 #

python
import chromadb
import shutil
import os

def backup_database(source_path: str, backup_path: str):
    """Snapshot the database directory at *source_path* into *backup_path*.

    Any existing backup at *backup_path* is removed first so the result is
    an exact copy of the source tree.

    Raises:
        FileNotFoundError: if *source_path* does not exist or is not a
            directory (the original silently deleted the old backup before
            copytree failed, destroying the only backup on a typo'd path).
    """
    if not os.path.isdir(source_path):
        raise FileNotFoundError(f"source directory not found: {source_path}")

    if os.path.exists(backup_path):
        shutil.rmtree(backup_path)

    shutil.copytree(source_path, backup_path)
    print(f"备份完成: {backup_path}")

def restore_database(backup_path: str, target_path: str):
    """Replace *target_path* with the snapshot stored at *backup_path*."""
    # copytree refuses to write into an existing directory, so drop any
    # stale target tree first.
    stale_target = os.path.exists(target_path)
    if stale_target:
        shutil.rmtree(target_path)

    shutil.copytree(backup_path, target_path)
    print(f"恢复完成: {target_path}")

def clear_database(db_path: str):
    """Delete the on-disk database at *db_path*; a missing path is a no-op."""
    if not os.path.exists(db_path):
        return
    shutil.rmtree(db_path)
    print(f"数据库已清空: {db_path}")

# Usage example: back up the live database directory.
# NOTE(review): copying while a PersistentClient is open may snapshot an
# in-flight write — consider quiescing the client before backing up.
client = chromadb.PersistentClient(path="./chroma_db")
backup_database("./chroma_db", "./chroma_backup")

服务器模式 #

启动 Chroma 服务器 #

bash
# 基本启动
chroma run --host localhost --port 8000 --path ./chroma_db

# 后台运行
chroma run --host 0.0.0.0 --port 8000 --path ./chroma_db &

# Docker 启动
docker run -p 8000:8000 chromadb/chroma

连接服务器 #

python
import chromadb

# HttpClient talks to a running Chroma server over the network instead of
# embedding the database in this process; multiple clients can share it.
client = chromadb.HttpClient(
    host="localhost",
    port=8000
)

collection = client.get_or_create_collection(name="server_docs")

collection.add(
    documents=["服务器模式文档"],
    ids=["server_doc1"]
)

print("连接服务器成功")

服务器配置 #

python
import chromadb

# Connect with basic-auth credentials.
# NOTE(review): chromadb.BasicAuthCredentials is not exposed by every
# chromadb release — verify against the installed version's auth API
# (newer versions configure auth via Settings / request headers).
client = chromadb.HttpClient(
    host="localhost",
    port=8000,
    credentials=chromadb.BasicAuthCredentials(
        username="admin",
        password="password123"
    )
)

异步客户端 #

python
import chromadb
import asyncio

async def main():
    """Demonstrate the async HTTP client: every call is awaited,
    including client construction itself."""
    client = await chromadb.AsyncHttpClient(
        host="localhost",
        port=8000
    )

    collection = await client.get_or_create_collection(name="async_docs")

    await collection.add(
        documents=["异步操作文档"],
        ids=["async_doc1"]
    )

    results = await collection.query(
        query_texts=["异步查询"],
        n_results=1
    )

    print(results)

asyncio.run(main())

性能优化 #

HNSW 索引优化 #

python
import chromadb

client = chromadb.Client()

# Recall-oriented index: a larger construction candidate list (ef) and
# more links per node (M) improve recall at the cost of build time and
# memory.
high_recall_collection = client.create_collection(
    name="high_recall",
    metadata={
        "hnsw:space": "cosine",
        "hnsw:construction_ef": 400,
        "hnsw:M": 64
    }
)

# Ingest-oriented index: smaller ef/M build faster and use less memory,
# trading away some recall.
fast_insert_collection = client.create_collection(
    name="fast_insert",
    metadata={
        "hnsw:space": "cosine",
        "hnsw:construction_ef": 50,
        "hnsw:M": 8
    }
)

HNSW 参数详解 #

text
┌─────────────────────────────────────────────────────────────┐
│                    HNSW 参数优化                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  construction_ef (构建时候选列表大小)                       │
│  ├── 默认: 100                                              │
│  ├── 高召回率: 200-400                                      │
│  ├── 快速构建: 50-100                                       │
│  └── 影响: 构建时间和索引质量                               │
│                                                             │
│  M (每个节点的连接数)                                        │
│  ├── 默认: 16                                               │
│  ├── 高召回率: 32-64                                        │
│  ├── 低内存: 8-16                                           │
│  └── 影响: 召回率和内存占用                                 │
│                                                             │
│  batch_size (批量添加大小)                                   │
│  ├── 默认: 100                                              │
│  ├── 大数据集: 1000-5000                                    │
│  └── 影响: 添加速度                                         │
│                                                             │
│  sync_threshold (同步阈值)                                   │
│  ├── 默认: 1000                                             │
│  └── 影响: 持久化频率                                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

批量操作优化 #

python
import chromadb
from typing import List, Dict, Any

def optimized_batch_add(
    collection,
    documents: List[str],
    ids: List[str],
    metadatas: List[Dict[str, Any]] = None,
    batch_size: int = 1000
):
    """Add documents to *collection* in fixed-size batches.

    Args:
        collection: target collection exposing ``add(documents, ids, metadatas)``.
        documents: document texts, aligned index-by-index with *ids*.
        ids: unique document ids.
        metadatas: optional per-document metadata, aligned with *documents*.
        batch_size: number of documents per ``add`` call.

    Raises:
        ValueError: on mismatched list lengths or non-positive batch_size
            (the original silently produced misaligned batches).
    """
    if len(documents) != len(ids):
        raise ValueError("documents and ids must have the same length")
    if metadatas is not None and len(metadatas) != len(documents):
        raise ValueError("metadatas must match documents in length")
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    total = len(documents)
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)

        collection.add(
            documents=documents[start:end],
            ids=ids[start:end],
            metadatas=metadatas[start:end] if metadatas is not None else None
        )

        # Report progress every 10k documents and once at completion.
        if end % 10000 == 0 or end == total:
            print(f"进度: {end}/{total}")

# Usage example: bulk-load 100k synthetic documents in 5k-doc batches.
# NOTE: generating 100k documents takes noticeable time/memory; shrink
# the range for a quick trial.
client = chromadb.Client()
collection = client.create_collection(name="optimized")

documents = [f"文档内容 {i}" for i in range(100000)]
ids = [f"doc{i}" for i in range(100000)]
metadatas = [{"batch": i // 1000} for i in range(100000)]

optimized_batch_add(collection, documents, ids, metadatas, batch_size=5000)

查询优化 #

python
def optimized_query(collection, query_texts: List[str], n_results: int = 10):
    """Run a query that returns only documents and distances.

    Restricting ``include`` to the fields actually needed keeps the
    response payload small (embeddings and metadatas are omitted).
    """
    return collection.query(
        query_texts=query_texts,
        n_results=n_results,
        include=["documents", "distances"]
    )

def cached_query(collection, query_cache: dict, query_text: str, n_results: int = 5):
    """Query with memoization.

    Identical ``(query_text, n_results)`` pairs are served from
    *query_cache* instead of hitting the collection again. The cache is
    caller-owned and unbounded.
    """
    cache_key = f"{query_text}:{n_results}"

    if cache_key not in query_cache:
        query_cache[cache_key] = collection.query(
            query_texts=[query_text],
            n_results=n_results
        )

    return query_cache[cache_key]

内存管理 #

内存监控 #

python
import psutil
import os

def get_memory_usage():
    """Return this process's resident set size (RSS) in megabytes."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / (1024 * 1024)

def monitor_collection_memory(collection):
    """Print the collection's document count, current process memory, and
    the per-document average (a blank line when the collection is empty)."""
    count = collection.count()
    memory = get_memory_usage()

    print(f"文档数量: {count}")
    print(f"内存使用: {memory:.2f} MB")
    # Preserve original behavior: an empty collection prints a blank line.
    if count > 0:
        print(f"平均每文档: {memory/count:.4f} MB")
    else:
        print("")

内存优化策略 #

python
import chromadb

def create_memory_efficient_collection(client, name: str):
    """Create a collection tuned for low memory use: fewer HNSW links per
    node (M=8) and a smaller construction candidate list (ef=50)."""
    low_memory_hnsw = {
        "hnsw:M": 8,
        "hnsw:construction_ef": 50
    }
    return client.create_collection(name=name, metadata=low_memory_hnsw)

def clear_collection_memory(collection):
    """Delete every document in *collection*, removing ids in batches of
    1000 per delete call."""
    all_ids = collection.get()['ids']
    if not all_ids:
        return

    batch_size = 1000
    for start in range(0, len(all_ids), batch_size):
        collection.delete(ids=all_ids[start:start + batch_size])

并发处理 #

多线程操作 #

python
import chromadb
from concurrent.futures import ThreadPoolExecutor
import threading

# Server-mode client and collection shared by all worker threads.
client = chromadb.HttpClient(host="localhost", port=8000)
collection = client.get_or_create_collection(name="concurrent_docs")

# Guards collection.add calls across threads.
lock = threading.Lock()

def add_documents_batch(batch_id: int, documents: list, ids: list):
    # Serialize writes through the module-level lock.
    # NOTE(review): holding the lock around collection.add serializes every
    # batch, so the thread pool adds no real parallelism here — confirm
    # whether the lock is actually required for HttpClient before keeping it.
    with lock:
        collection.add(
            documents=documents,
            ids=ids
        )
    print(f"批次 {batch_id} 完成")

def parallel_add(total_docs: int = 10000, batch_size: int = 1000, workers: int = 4):
    """Add *total_docs* synthetic documents using *workers* threads.

    Args:
        total_docs: number of documents to generate and add.
        batch_size: documents per worker task.
        workers: thread-pool size.

    Uses submit() + result() rather than ``executor.map`` so that an
    exception raised in a worker thread propagates to the caller — the
    original never consumed the map iterator's results, so worker errors
    were silently dropped.
    """
    all_docs = [f"文档 {i}" for i in range(total_docs)]
    all_ids = [f"doc{i}" for i in range(total_docs)]

    batches = []
    for i in range(0, total_docs, batch_size):
        batch_id = i // batch_size
        batches.append((batch_id, all_docs[i:i + batch_size], all_ids[i:i + batch_size]))

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(add_documents_batch, *b) for b in batches]
        for future in futures:
            # Re-raise any worker exception here instead of swallowing it.
            future.result()

# Kick off the demo: 10k documents, 1k per batch, 4 worker threads.
parallel_add(total_docs=10000, batch_size=1000, workers=4)

异步批量操作 #

python
import asyncio
import chromadb

async def async_batch_add(collection, documents, ids, batch_size=100):
    """Sequentially add documents to an async collection in batches.

    Args:
        collection: async collection exposing awaitable ``add``.
        documents: document texts aligned with *ids*.
        ids: unique document ids.
        batch_size: documents per ``add`` call.

    Progress is reported every 1000 documents and once at the end. The
    reported count is clamped to len(documents) — the original printed
    ``i + batch_size``, which overshot the total on a final partial batch
    and never reported completion unless the total was a multiple of 1000.
    """
    total = len(documents)
    for i in range(0, total, batch_size):
        end = min(i + batch_size, total)

        await collection.add(
            documents=documents[i:end],
            ids=ids[i:end]
        )

        if end % 1000 == 0 or end == total:
            print(f"已添加 {end} 个文档")

async def main():
    """Connect to a Chroma server asynchronously and bulk-load 5000
    synthetic documents via async_batch_add."""
    client = await chromadb.AsyncHttpClient(host="localhost", port=8000)
    collection = await client.get_or_create_collection(name="async_batch")

    documents = [f"异步文档 {i}" for i in range(5000)]
    ids = [f"async_doc{i}" for i in range(5000)]

    await async_batch_add(collection, documents, ids)

asyncio.run(main())

数据迁移 #

导出数据 #

python
import json
import chromadb

def export_collection_to_json(collection, output_file: str):
    """Dump a collection (ids, documents, embeddings, metadatas) to a
    UTF-8 JSON file.

    Args:
        collection: collection exposing ``get``, ``name`` and ``metadata``.
        output_file: path of the JSON file to write.

    Embeddings are coerced with ``list()`` before serialization because
    ``collection.get`` may return numpy arrays, which ``json`` cannot
    encode and whose truth value is ambiguous — the original's
    ``if data['embeddings']`` raised on such arrays. ``is not None`` tests
    replace truthiness tests for the same reason.
    """
    data = collection.get(include=["documents", "metadatas", "embeddings"])

    ids = data['ids']
    documents = data.get('documents')
    embeddings = data.get('embeddings')
    metadatas = data.get('metadatas')

    export_data = {
        "name": collection.name,
        "metadata": collection.metadata,
        "count": len(ids),
        "documents": []
    }

    for i, doc_id in enumerate(ids):
        embedding = embeddings[i] if embeddings is not None else None
        export_data["documents"].append({
            "id": doc_id,
            "document": documents[i] if documents is not None else None,
            "embedding": list(embedding) if embedding is not None else None,
            "metadata": metadatas[i] if metadatas is not None else None
        })

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(export_data, f, ensure_ascii=False, indent=2)

    print(f"导出完成: {output_file}")
    print(f"文档数量: {export_data['count']}")

# Usage example: export an existing collection to export.json.
client = chromadb.Client()
collection = client.get_collection("documents")
export_collection_to_json(collection, "export.json")

导入数据 #

python
import json
import chromadb

def import_collection_from_json(client, input_file: str):
    """Recreate a collection from a JSON export produced by
    export_collection_to_json.

    Args:
        client: chroma client exposing ``get_or_create_collection``.
        input_file: path of the JSON export to read.

    Returns:
        The populated collection.

    Per-record fields are kept aligned with ids. The original filtered
    falsy entries with ``[... for d in batch if d["document"]]``, which
    shifted documents/embeddings/metadatas out of alignment with ids and
    broke ``collection.add`` whenever any record had a missing field.
    """
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    collection = client.get_or_create_collection(
        name=data["name"],
        metadata=data.get("metadata", {})
    )

    records = data["documents"]
    if records:
        batch_size = 1000
        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]

            kwargs = {"ids": [d["id"] for d in batch]}
            # Pass a field only if at least one record has it, but keep the
            # list aligned with ids (None placeholders for missing entries).
            for field, plural in (("document", "documents"),
                                  ("embedding", "embeddings"),
                                  ("metadata", "metadatas")):
                values = [d.get(field) for d in batch]
                if any(v is not None for v in values):
                    kwargs[plural] = values

            collection.add(**kwargs)

    print(f"导入完成: {data['name']}")
    print(f"文档数量: {len(records)}")

    return collection

# Usage example: rebuild the collection from the JSON export.
client = chromadb.Client()
collection = import_collection_from_json(client, "export.json")

监控与日志 #

日志配置 #

python
import logging
import chromadb

# Log to both a file and the console. encoding="utf-8" keeps the Chinese
# log messages readable regardless of the platform's default encoding —
# the original FileHandler used the locale default, which garbles CJK
# text on some Windows setups.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('chroma.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('chroma')

class LoggingCollection:
    """Proxy that logs add/query/delete calls on a wrapped collection.

    Only these three methods are proxied; extend as needed. Uses lazy
    %-style logging arguments so messages are only formatted when the
    log level is enabled (cheaper than f-strings on hot paths); the
    rendered messages are identical to the original f-string output.
    """

    def __init__(self, collection):
        # The wrapped, real collection.
        self.collection = collection

    def add(self, **kwargs):
        """Log the number of ids being added, then delegate."""
        logger.info("添加文档: %s 个", len(kwargs.get('ids', [])))
        result = self.collection.add(**kwargs)
        logger.info("添加完成")
        return result

    def query(self, **kwargs):
        """Log the query texts and the result count, then delegate."""
        logger.info("查询: %s", kwargs.get('query_texts', []))
        result = self.collection.query(**kwargs)
        # result['ids'][0] is the hit list for the first query text.
        logger.info("查询结果: %s 个", len(result['ids'][0]))
        return result

    def delete(self, **kwargs):
        """Log the delete arguments, then delegate."""
        logger.info("删除文档: %s", kwargs)
        result = self.collection.delete(**kwargs)
        logger.info("删除完成")
        return result

# Wrap a real collection so every add/query/delete call is logged.
client = chromadb.Client()
base_collection = client.create_collection(name="logged_docs")
collection = LoggingCollection(base_collection)

性能监控 #

python
import time
from functools import wraps

def timing_decorator(func):
    """Decorator that prints the wall-clock duration of each successful
    call to *func* (nothing is printed if the call raises)."""
    @wraps(func)
    def timed(*args, **kwargs):
        started = time.time()
        outcome = func(*args, **kwargs)
        elapsed = time.time() - started

        print(f"{func.__name__} 耗时: {elapsed:.4f} 秒")
        return outcome
    return timed

class MonitoredCollection:
    """Wrapper that tracks call counts and cumulative timings for a
    collection's add/query operations (delete_count is reserved but not
    updated by any proxied method here)."""

    def __init__(self, collection):
        self.collection = collection
        # add_count accumulates document ids added; query_count counts calls.
        self.stats = {
            'add_count': 0,
            'query_count': 0,
            'delete_count': 0,
            'total_add_time': 0,
            'total_query_time': 0
        }

    @timing_decorator
    def add(self, **kwargs):
        """Delegate to the wrapped add, accumulating count and duration."""
        began = time.time()
        outcome = self.collection.add(**kwargs)
        self.stats['add_count'] += len(kwargs.get('ids', []))
        self.stats['total_add_time'] += time.time() - began
        return outcome

    @timing_decorator
    def query(self, **kwargs):
        """Delegate to the wrapped query, accumulating count and duration."""
        began = time.time()
        outcome = self.collection.query(**kwargs)
        self.stats['query_count'] += 1
        self.stats['total_query_time'] += time.time() - began
        return outcome

    def get_stats(self):
        """Return a copy of the raw stats plus derived averages.

        NOTE(review): avg_add_time is per *document* (add_count counts
        ids) while avg_query_time is per *call* — confirm this asymmetry
        is intended.
        """
        summary = dict(self.stats)
        summary['avg_add_time'] = self.stats['total_add_time'] / max(1, self.stats['add_count'])
        summary['avg_query_time'] = self.stats['total_query_time'] / max(1, self.stats['query_count'])
        return summary

下一步 #

现在你已经掌握了高级配置,接下来学习 集成与扩展,了解如何将 Chroma 集成到主流 AI 框架中!

最后更新:2026-04-04