文档操作 #

文档结构 #

在 Chroma 中,每个文档由以下部分组成:

text
┌─────────────────────────────────────────────────────────────┐
│                    文档结构                                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  id: 唯一标识符 (必需)                                      │
│  ├── 类型: str                                              │
│  └── 用途: 标识和引用文档                                   │
│                                                             │
│  document: 文本内容 (可选)                                   │
│  ├── 类型: str                                              │
│  └── 用途: 存储原始文本,自动生成嵌入                       │
│                                                             │
│  embedding: 向量嵌入 (可选)                                  │
│  ├── 类型: List[float]                                      │
│  └── 用途: 预计算的向量,跳过嵌入生成                       │
│                                                             │
│  metadata: 元数据 (可选)                                     │
│  ├── 类型: Dict[str, str/int/float/bool]                    │
│  └── 用途: 过滤、分类、存储额外信息                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

添加文档 #

基本添加 #

python
import chromadb

# Create a client and a collection named "documents"; the later snippets in
# this guide keep using the `collection` variable defined here.
client = chromadb.Client()
collection = client.create_collection(name="documents")

# Add two documents, supplying only the raw text and one id per document.
collection.add(
    documents=["这是第一个文档", "这是第二个文档"],
    ids=["doc1", "doc2"]
)

# Report how many documents the collection holds after the call.
print(f"添加了 {collection.count()} 个文档")

带元数据添加 #

python
# Add two documents together with one metadata dict each; the three lists are
# parallel, so position N in `metadatas` describes position N in `documents`
# and `ids`.
collection.add(
    documents=[
        "Python 是编程语言",
        "JavaScript 用于网页开发"
    ],
    metadatas=[
        {"category": "programming", "language": "Python"},
        {"category": "programming", "language": "JavaScript"}
    ],
    ids=["doc3", "doc4"]
)

带嵌入添加 #

python
# Pre-computed vectors are stored as given, so they must have the same
# dimensionality as every other embedding in the target collection.  The
# "documents" collection used so far already holds vectors generated from the
# document text, so these 4-dimensional demo vectors go into a dedicated
# collection instead of being mixed into it.
vector_collection = client.get_or_create_collection(name="precomputed_embeddings")

vector_collection.add(
    embeddings=[
        [0.1, 0.2, 0.3, 0.4],
        [0.5, 0.6, 0.7, 0.8]
    ],
    documents=["预计算嵌入的文档1", "预计算嵌入的文档2"],
    ids=["doc5", "doc6"]
)

批量添加 #

python
# Build 1000 sample records as three parallel lists (text, id, metadata).
sample_indices = range(1000)
documents = [f"文档内容 {n}" for n in sample_indices]
ids = [f"doc{n}" for n in sample_indices]
metadatas = [{"batch": n // 100} for n in sample_indices]

# Insert the records in fixed-size slices rather than in a single call.
batch_size = 100
for start in range(0, len(documents), batch_size):
    stop = start + batch_size
    collection.add(
        documents=documents[start:stop],
        ids=ids[start:stop],
        metadatas=metadatas[start:stop]
    )

    # Print a progress line each time another 500 records have been sent.
    if stop % 500 == 0:
        print(f"已添加 {stop} 个文档")

print(f"总共添加了 {collection.count()} 个文档")

添加最佳实践 #

python
def add_documents_safe(collection, documents, ids, metadatas=None, batch_size=100):
    """Add only those documents whose ids are not yet present in ``collection``.

    Args:
        collection: Object exposing Chroma-style ``get`` and ``add`` methods.
        documents: Document texts, parallel to ``ids``.
        ids: Candidate ids. Ids already stored, and repeated occurrences of
            an id inside this list, are skipped (the first occurrence wins).
        metadatas: Optional metadata dicts, parallel to ``ids``.
        batch_size: Maximum number of ids per lookup and documents per ``add``.

    Returns:
        None. A summary line is printed instead.
    """
    if not ids:
        print("没有新文档需要添加")
        return

    # Ask only about the candidate ids, in chunks, and request no payload
    # fields: loading every stored document just to learn which ids exist is
    # wasted work on a large collection.
    known_ids = set()
    for start in range(0, len(ids), batch_size):
        lookup = collection.get(ids=ids[start:start + batch_size], include=[])
        known_ids.update(lookup['ids'])

    new_docs = []
    new_ids = []
    new_metas = []

    for position, doc_id in enumerate(ids):
        if doc_id in known_ids:
            continue
        # Remember ids accepted in this call so a repeated id in the input is
        # not sent to add() twice.
        known_ids.add(doc_id)
        new_docs.append(documents[position])
        new_ids.append(doc_id)
        if metadatas:
            new_metas.append(metadatas[position])

    if not new_docs:
        print("没有新文档需要添加")
        return

    for start in range(0, len(new_docs), batch_size):
        stop = start + batch_size
        collection.add(
            documents=new_docs[start:stop],
            ids=new_ids[start:stop],
            metadatas=new_metas[start:stop] if new_metas else None
        )

    print(f"添加了 {len(new_docs)} 个新文档")

获取文档 #

获取所有文档 #

python
# Call get() with no ids and no filter, then walk the parallel 'ids' and
# 'documents' lists of the result together.
results = collection.get()

print(f"文档数量: {len(results['ids'])}")
for doc_id, doc in zip(results['ids'], results['documents']):
    print(f"{doc_id}: {doc}")

按 ID 获取 #

python
# Restrict the lookup to two specific ids.
results = collection.get(
    ids=["doc1", "doc2"]
)

# Print each returned id next to its document text.
for doc_id, doc in zip(results['ids'], results['documents']):
    print(f"{doc_id}: {doc}")

获取特定字段 #

python
# `include` names the fields to return alongside the ids; here the document
# text and the metadata are requested.
results = collection.get(
    ids=["doc1", "doc2"],
    include=["documents", "metadatas"]
)

print(results['documents'])
print(results['metadatas'])

包含嵌入 #

python
# Embeddings are requested explicitly by listing "embeddings" in `include`.
results = collection.get(
    ids=["doc1"],
    include=["documents", "embeddings", "metadatas"]
)

# The length of the first returned vector is the embedding dimensionality.
print(f"嵌入维度: {len(results['embeddings'][0])}")

按条件获取 #

python
# Select documents by metadata instead of by id.
results = collection.get(
    where={"category": "programming"},
    include=["documents", "metadatas"]
)

# meta['language'] assumes every matching document carries a "language" key,
# as the two "programming" documents added earlier in this guide do.
for doc, meta in zip(results['documents'], results['metadatas']):
    print(f"{meta['language']}: {doc}")

更新文档 #

更新文档内容 #

python
# Replace the text of the document stored under "doc1".
collection.update(
    ids=["doc1"],
    documents=["更新后的文档内容"]
)

# Read the document back to show the new text.
results = collection.get(ids=["doc1"])
print(results['documents'][0])

更新元数据 #

python
# Send new metadata for "doc1" without touching its document text.
collection.update(
    ids=["doc1"],
    metadatas=[{"category": "updated", "version": "2.0"}]
)

# Read the record back and print its metadata.
results = collection.get(ids=["doc1"])
print(results['metadatas'][0])

更新嵌入 #

python
# A replacement vector must have the same dimensionality as the embeddings
# already stored in the collection, so read the current length back first
# instead of hard-coding one.
current = collection.get(ids=["doc1"], include=["embeddings"])
dimension = len(current['embeddings'][0])

collection.update(
    ids=["doc1"],
    embeddings=[[0.1] * dimension]
)

批量更新 #

python
# Parallel lists: position N of each list describes the same document.
ids = ["doc1", "doc2", "doc3"]
documents = ["更新文档1", "更新文档2", "更新文档3"]
metadatas = [
    {"updated": True, "version": "2.0"},
    {"updated": True, "version": "2.0"},
    {"updated": True, "version": "2.0"}
]

# Update text and metadata for all three ids in a single call.
collection.update(
    ids=ids,
    documents=documents,
    metadatas=metadatas
)

Upsert 操作 #

python
# upsert() is called with one id used earlier in this guide ("doc1") and one
# id not used before ("doc_new"); the metadata values label the intended
# outcome for each.
# NOTE(review): presumably existing ids are updated and unknown ids are
# created; confirm against the Chroma documentation.
collection.upsert(
    ids=["doc1", "doc_new"],
    documents=["更新文档1", "新文档"],
    metadatas=[
        {"type": "update"},
        {"type": "new"}
    ]
)

print("Upsert 完成")

更新最佳实践 #

python
def update_document_safe(collection, doc_id, document=None, metadata=None):
    """Update ``doc_id`` in ``collection``, creating it when it does not exist.

    Args:
        collection: Object exposing Chroma-style ``get``, ``add`` and
            ``update`` methods.
        doc_id: Id of the document to update or create.
        document: New text, or ``None`` to leave the text untouched. An empty
            string is a real value and is written as such.
        metadata: Keys to merge into the stored metadata. ``None`` or an
            empty dict leaves the metadata untouched.

    Returns:
        None. Progress and failures are reported with ``print``; any exception
        raised by the collection is caught and printed rather than propagated.
    """
    try:
        # Only the metadata is needed for the merge below, so the stored
        # document text is not requested.
        existing = collection.get(ids=[doc_id], include=["metadatas"])

        if not existing['ids']:
            print(f"文档 {doc_id} 不存在,将创建新文档")
            collection.add(
                ids=[doc_id],
                documents=[document] if document is not None else None,
                metadatas=[metadata] if metadata else None
            )
            return

        update_data = {"ids": [doc_id]}

        # Compare with None rather than relying on truthiness so that an
        # explicit empty string is not silently dropped.
        if document is not None:
            update_data["documents"] = [document]

        if metadata:
            # Merge into a copy so the dict handed back by get() is not
            # modified as a side effect.
            merged = dict(existing['metadatas'][0] or {})
            merged.update(metadata)
            update_data["metadatas"] = [merged]

        collection.update(**update_data)
        print(f"文档 {doc_id} 更新成功")

    except Exception as e:
        # Deliberately best-effort: report the failure and carry on.
        print(f"更新失败: {e}")

删除文档 #

按 ID 删除 #

python
# Remove a single document by id, then show how many documents remain.
collection.delete(ids=["doc1"])

print(f"剩余文档数: {collection.count()}")

批量删除 #

python
# Several ids can be removed with one delete() call.
ids_to_delete = ["doc2", "doc3", "doc4"]

collection.delete(ids=ids_to_delete)

# The printed number is the length of the requested id list, not a count
# reported back by delete().
print(f"删除了 {len(ids_to_delete)} 个文档")

按条件删除 #

python
# Select the documents to delete with a metadata filter instead of ids.
collection.delete(
    where={"category": "old"}
)

print("已删除所有 category 为 old 的文档")

删除所有文档 #

python
# Only the ids are needed to empty the collection, so request no payload
# fields (include=[]) instead of loading every document and metadata dict.
all_ids = collection.get(include=[])['ids']

# Skip the delete call entirely when there is nothing to remove.
if all_ids:
    collection.delete(ids=all_ids)
    print(f"删除了 {len(all_ids)} 个文档")
else:
    print("集合为空")

删除最佳实践 #

python
def delete_documents_safe(collection, ids):
    """Delete the subset of ``ids`` that is actually stored in ``collection``.

    Args:
        collection: Object exposing Chroma-style ``get`` and ``delete`` methods.
        ids: Ids to remove. Unknown ids are ignored and repeated ids are
            counted once.

    Returns:
        None. A summary line is printed instead.
    """
    if not ids:
        print("没有文档需要删除")
        return

    # Drop repeated ids while keeping the caller's order.
    requested = list(dict.fromkeys(ids))

    # Ask only about the requested ids and request no payload fields, rather
    # than loading the whole collection to test membership.
    stored = set(collection.get(ids=requested, include=[])['ids'])

    ids_to_delete = [doc_id for doc_id in requested if doc_id in stored]

    if not ids_to_delete:
        print("没有文档需要删除")
        return

    collection.delete(ids=ids_to_delete)
    print(f"删除了 {len(ids_to_delete)} 个文档")

def delete_by_metadata(collection, metadata_filter):
    """Delete every document in ``collection`` matching ``metadata_filter``.

    The matching ids are looked up first (with no payload fields requested)
    so that the number of affected documents can be reported.

    Args:
        collection: Object exposing Chroma-style ``get`` and ``delete`` methods.
        metadata_filter: Filter passed to ``get`` as its ``where`` argument.

    Returns:
        None. The outcome is printed.
    """
    matched_ids = collection.get(where=metadata_filter, include=[])['ids']

    # Nothing matched: report and stop before touching the collection.
    if not matched_ids:
        print("没有匹配的文档")
        return

    collection.delete(ids=matched_ids)
    print(f"删除了 {len(matched_ids)} 个匹配的文档")

文档操作示例 #

示例 1:文档版本管理 #

python
import chromadb
from datetime import datetime

# Create a separate collection for the versioning example.
client = chromadb.Client()
docs = client.create_collection(name="versioned_docs")

def add_version(collection, doc_id, content, version=None):
    """Store ``content`` as one version of the logical document ``doc_id``.

    Each version becomes its own record whose id is ``"<doc_id>_v<version>"``
    and whose metadata carries the logical id, the version label and a
    creation timestamp.

    Args:
        collection: Object exposing a Chroma-style ``add`` method.
        doc_id: Logical document id shared by all versions.
        content: Text of this version.
        version: Version label. When ``None``, the current local time
            formatted as ``YYYYmmdd_HHMMSS`` is used.

    Returns:
        The id under which this version was stored.
    """
    version = (
        version
        if version is not None
        else datetime.now().strftime("%Y%m%d_%H%M%S")
    )
    versioned_id = f"{doc_id}_v{version}"

    version_metadata = {
        "doc_id": doc_id,
        "version": version,
        "created_at": datetime.now().isoformat(),
    }
    collection.add(
        documents=[content],
        ids=[versioned_id],
        metadatas=[version_metadata],
    )

    return versioned_id

def get_latest_version(collection, doc_id):
    """Return the newest stored version of the logical document ``doc_id``.

    Versions are compared by the raw ``"version"`` metadata value, so string
    labels are ordered lexicographically (date stamps such as ``"20240103"``
    order correctly; ``"10"`` would sort before ``"9"``).

    Args:
        collection: Object exposing a Chroma-style ``get`` method.
        doc_id: Logical document id stored under the ``"doc_id"`` metadata key.

    Returns:
        A ``(metadata, document)`` tuple for the highest version, or ``None``
        when no record matches.
    """
    results = collection.get(
        where={"doc_id": doc_id},
        include=["documents", "metadatas"]
    )

    if not results['ids']:
        return None

    # Only the single largest entry is needed, so a linear max() replaces
    # sorting the whole list. Like the stable descending sort it replaces,
    # max() returns the first of several entries sharing the top version.
    return max(
        zip(results['metadatas'], results['documents']),
        key=lambda pair: pair[0]['version']
    )

# Store three versions of the same logical document, labelled by date.
add_version(docs, "report", "第一版报告内容", "20240101")
add_version(docs, "report", "第二版报告内容", "20240102")
add_version(docs, "report", "第三版报告内容", "20240103")

# get_latest_version returns a (metadata, document) pair for the highest
# version label.
latest = get_latest_version(docs, "report")
print(f"最新版本: {latest[0]['version']}")
print(f"内容: {latest[1]}")

示例 2:文档标签系统 #

python
import chromadb

# Create a separate collection for the tagging example.
client = chromadb.Client()
tagged_docs = client.create_collection(name="tagged_docs")

def add_tagged_document(collection, doc_id, content, tags):
    """Add ``content`` under ``doc_id`` with its tags recorded in metadata.

    The tags are stored as one comma-joined string under ``"tags"`` together
    with their number under ``"tag_count"``.

    Args:
        collection: Object exposing a Chroma-style ``add`` method.
        doc_id: Id for the new document.
        content: Document text.
        tags: Sequence of tag strings.
    """
    tag_metadata = {
        "tags": ",".join(tags),
        "tag_count": len(tags),
    }
    collection.add(
        documents=[content],
        ids=[doc_id],
        metadatas=[tag_metadata],
    )

def get_by_tag(collection, tag):
    """Return the documents whose stored tag list contains exactly ``tag``.

    Tags are stored as one comma-joined string, so the match is done on the
    client: the string is split and ``tag`` is compared against whole tags.
    The previous metadata filter used ``$contains``, which Chroma documents
    as a document-content (``where_document``) operator, and a substring test
    on the joined string would also report ``"python"`` for a document tagged
    only ``"python3"``.

    Every document's metadata is read, so this is meant for small
    collections.

    Args:
        collection: Object exposing a Chroma-style ``get`` method.
        tag: Tag to look for.

    Returns:
        A dict with parallel ``"ids"``, ``"documents"`` and ``"metadatas"``
        lists holding the matching records.
    """
    everything = collection.get(include=["documents", "metadatas"])

    matches = {"ids": [], "documents": [], "metadatas": []}
    rows = zip(everything['ids'], everything['documents'], everything['metadatas'])
    for doc_id, document, metadata in rows:
        # A document may have no metadata or no "tags" key at all.
        joined = (metadata or {}).get("tags", "")
        stored_tags = [t for t in joined.split(",") if t]
        if tag in stored_tags:
            matches["ids"].append(doc_id)
            matches["documents"].append(document)
            matches["metadatas"].append(metadata)

    return matches

def add_tags(collection, doc_id, new_tags):
    """Merge ``new_tags`` into the tags stored for ``doc_id``.

    Existing tags keep their position and new tags are appended in the order
    given, with duplicates dropped, so the stored string is the same on every
    run. Nothing happens when ``doc_id`` is not found.

    Args:
        collection: Object exposing Chroma-style ``get`` and ``update`` methods.
        doc_id: Id of the document to tag.
        new_tags: Iterable of tag strings to add.
    """
    results = collection.get(ids=[doc_id], include=["metadatas"])

    if not results['metadatas']:
        return

    # A stored document may have no metadata at all; treat that as "no tags".
    stored = results['metadatas'][0] or {}
    current_tags = [t for t in stored.get("tags", "").split(",") if t]

    # dict.fromkeys de-duplicates while preserving first-seen order, unlike a
    # set, whose iteration order for strings varies between runs.
    updated_tags = list(dict.fromkeys(current_tags + list(new_tags)))

    collection.update(
        ids=[doc_id],
        metadatas=[{
            "tags": ",".join(updated_tags),
            "tag_count": len(updated_tags)
        }]
    )

# Add two documents, each with its own list of tags.
add_tagged_document(tagged_docs, "doc1", "Python 教程", ["python", "programming"])
add_tagged_document(tagged_docs, "doc2", "JavaScript 指南", ["javascript", "web"])

# Extend doc1's tags after it has been stored.
add_tags(tagged_docs, "doc1", ["tutorial", "beginner"])

# Look up documents by tag and report how many were found.
python_docs = get_by_tag(tagged_docs, "python")
print(f"Python 相关文档: {len(python_docs['ids'])}")

示例 3:文档增量更新 #

python
import chromadb
import hashlib

# Create a separate collection for the incremental-update example.
client = chromadb.Client()
incremental_docs = client.create_collection(name="incremental_docs")

def compute_hash(content):
    """Return the hexadecimal MD5 digest of ``content``.

    The text is encoded with ``str.encode()``'s default encoding (UTF-8)
    before hashing. The digest serves as a change-detection fingerprint.
    """
    digest = hashlib.md5()
    digest.update(content.encode())
    return digest.hexdigest()

def add_or_update(collection, doc_id, content, metadata=None):
    """Upsert ``content`` under ``doc_id`` unless the stored copy is identical.

    A hash of the content is kept in the document's metadata under
    ``"content_hash"``; when the stored hash equals the hash of ``content``
    the write is skipped.

    Args:
        collection: Object exposing Chroma-style ``get`` and ``upsert`` methods.
        doc_id: Id of the document.
        content: Document text.
        metadata: Optional extra metadata to store. The caller's dict is not
            modified.

    Returns:
        ``True`` when the document was written, ``False`` when it was skipped
        because the content has not changed.
    """
    content_hash = compute_hash(content)

    existing = collection.get(ids=[doc_id], include=["metadatas"])

    if existing['ids']:
        # A stored document may have no metadata at all; treat that as
        # "no hash recorded" instead of failing on None.
        stored_metadata = existing['metadatas'][0] or {}
        old_hash = stored_metadata.get("content_hash")

        if old_hash == content_hash:
            print(f"文档 {doc_id} 内容未变化,跳过更新")
            return False

    # Build a new dict so the hash is not written into the caller's metadata.
    full_metadata = {**(metadata or {}), "content_hash": content_hash}

    collection.upsert(
        ids=[doc_id],
        documents=[content],
        metadatas=[full_metadata]
    )

    print(f"文档 {doc_id} 已更新")
    return True

# First call: no document is stored under "doc1" yet, so it is written.
add_or_update(incremental_docs, "doc1", "初始内容")
# Second call: identical content, so the stored hash matches and the write is
# skipped (add_or_update returns False).
add_or_update(incremental_docs, "doc1", "初始内容")
# Third call: the content differs, so the document is written again.
add_or_update(incremental_docs, "doc1", "更新后的内容")

性能优化 #

批量操作优化 #

python
def batch_add_optimized(collection, documents, ids, metadatas=None, batch_size=100):
    """Add ``documents`` to ``collection`` in slices of at most ``batch_size``.

    A progress line is printed whenever the running total reaches a multiple
    of 1000 and once more when the last slice has been sent.

    Args:
        collection: Object exposing a Chroma-style ``add`` method.
        documents: Document texts.
        ids: Ids, parallel to ``documents``.
        metadatas: Optional metadata dicts, parallel to ``documents``.
        batch_size: Maximum number of documents per ``add`` call.
    """
    total = len(documents)

    for start in range(0, total, batch_size):
        # Clamp the final slice to the end of the data.
        end = min(start + batch_size, total)
        window = slice(start, end)

        collection.add(
            documents=documents[window],
            ids=ids[window],
            metadatas=metadatas[window] if metadatas else None
        )

        finished = end == total
        if finished or end % 1000 == 0:
            print(f"进度: {end}/{total}")

内存优化 #

python
def add_large_dataset(collection, data_generator, batch_size=100):
    """Stream records from ``data_generator`` into ``collection`` in batches.

    At most one batch of records is buffered at a time, so the full dataset
    is never held in a list here.

    Args:
        collection: Object exposing a Chroma-style ``add`` method.
        data_generator: Iterable yielding ``(doc_id, document, metadata)``
            triples.
        batch_size: Number of buffered records that triggers an ``add`` call.
    """
    def send(rows):
        """Send the buffered triples as one add() call and return their count."""
        collection.add(
            documents=[row[1] for row in rows],
            ids=[row[0] for row in rows],
            metadatas=[row[2] for row in rows]
        )
        return len(rows)

    pending = []
    count = 0

    for doc_id, doc, meta in data_generator:
        pending.append((doc_id, doc, meta))

        if len(pending) >= batch_size:
            count += send(pending)
            # Start a fresh buffer for the next batch.
            pending = []

    # Send whatever is left over after the generator is exhausted.
    if pending:
        count += send(pending)

    print(f"总共添加了 {count} 个文档")

下一步 #

现在你已经掌握了文档操作,接下来请学习「向量嵌入」一章,了解如何高效地生成和管理向量嵌入!

最后更新:2026-04-04