文档操作 #
文档结构 #
在 Chroma 中,每个文档由以下部分组成:
text
┌─────────────────────────────────────────────────────────────┐
│ 文档结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ id: 唯一标识符 (必需) │
│ ├── 类型: str │
│ └── 用途: 标识和引用文档 │
│ │
│ document: 文本内容 (可选) │
│ ├── 类型: str │
│ └── 用途: 存储原始文本,自动生成嵌入 │
│ │
│ embedding: 向量嵌入 (可选) │
│ ├── 类型: List[float] │
│ └── 用途: 预计算的向量,跳过嵌入生成 │
│ │
│ metadata: 元数据 (可选) │
│ ├── 类型: Dict[str, str/int/float/bool] │
│ └── 用途: 过滤、分类、存储额外信息 │
│ │
└─────────────────────────────────────────────────────────────┘
添加文档 #
基本添加 #
python
# Basic add: create an in-memory client and a collection, then insert
# two documents. Chroma embeds the raw text automatically.
import chromadb

client = chromadb.Client()
collection = client.create_collection(name="documents")

collection.add(
    ids=["doc1", "doc2"],
    documents=["这是第一个文档", "这是第二个文档"],
)

print(f"添加了 {collection.count()} 个文档")
带元数据添加 #
python
# Add documents together with per-document metadata for later filtering.
# All three lists are index-aligned.
texts = [
    "Python 是编程语言",
    "JavaScript 用于网页开发",
]
metas = [
    {"category": "programming", "language": "Python"},
    {"category": "programming", "language": "JavaScript"},
]
collection.add(ids=["doc3", "doc4"], documents=texts, metadatas=metas)
带嵌入添加 #
python
# Supply precomputed embeddings so Chroma skips its own embedding step.
vectors = [
    [0.1, 0.2, 0.3, 0.4],
    [0.5, 0.6, 0.7, 0.8],
]
collection.add(
    ids=["doc5", "doc6"],
    documents=["预计算嵌入的文档1", "预计算嵌入的文档2"],
    embeddings=vectors,
)
批量添加 #
python
# Insert 1000 generated documents in fixed-size batches rather than
# one huge add() call.
documents = [f"文档内容 {i}" for i in range(1000)]
ids = [f"doc{i}" for i in range(1000)]
metadatas = [{"batch": i // 100} for i in range(1000)]

batch_size = 100
for start in range(0, len(documents), batch_size):
    stop = start + batch_size
    collection.add(
        documents=documents[start:stop],
        ids=ids[start:stop],
        metadatas=metadatas[start:stop],
    )
    # Progress line every 500 documents.
    if stop % 500 == 0:
        print(f"已添加 {stop} 个文档")

print(f"总共添加了 {collection.count()} 个文档")
添加最佳实践 #
python
def add_documents_safe(collection, documents, ids, metadatas=None, batch_size=100):
    """Add only the documents whose ids are not already in the collection.

    Args:
        collection: Chroma collection to add into.
        documents: list of document texts, index-aligned with *ids*.
        ids: list of unique document ids.
        metadatas: optional list of metadata dicts, index-aligned with *ids*.
        batch_size: number of documents sent per add() call.
    """
    # Query only the candidate ids (include=[] skips documents/embeddings)
    # instead of downloading the entire collection; Chroma returns just
    # the ids that already exist.
    existing_ids = set(collection.get(ids=list(ids), include=[])['ids'])

    new_docs, new_ids, new_metas = [], [], []
    for i, doc_id in enumerate(ids):
        if doc_id not in existing_ids:
            new_docs.append(documents[i])
            new_ids.append(doc_id)
            if metadatas:
                new_metas.append(metadatas[i])

    if not new_docs:
        print("没有新文档需要添加")
        return

    for i in range(0, len(new_docs), batch_size):
        collection.add(
            documents=new_docs[i:i + batch_size],
            ids=new_ids[i:i + batch_size],
            # None (not an empty list) when the caller gave no metadata.
            metadatas=new_metas[i:i + batch_size] if new_metas else None,
        )
    print(f"添加了 {len(new_docs)} 个新文档")
获取文档 #
获取所有文档 #
python
# get() with no arguments returns every record in the collection.
results = collection.get()
print(f"文档数量: {len(results['ids'])}")
# ids and documents are index-aligned lists.
for doc_id, doc in zip(results['ids'], results['documents']):
    print(f"{doc_id}: {doc}")
按 ID 获取 #
python
# Pass explicit ids to get() to fetch just those records.
wanted = ["doc1", "doc2"]
results = collection.get(ids=wanted)
for doc_id, doc in zip(results['ids'], results['documents']):
    print(f"{doc_id}: {doc}")
获取特定字段 #
python
# Restrict the returned fields with include; ids always come back.
results = collection.get(
    ids=["doc1", "doc2"],
    include=["documents", "metadatas"],
)
print(results['documents'])
print(results['metadatas'])
包含嵌入 #
python
# Embeddings are excluded by default; request them explicitly.
results = collection.get(
    ids=["doc1"],
    include=["documents", "embeddings", "metadatas"],
)
print(f"嵌入维度: {len(results['embeddings'][0])}")
按条件获取 #
python
# Filter by metadata equality via a where clause.
results = collection.get(
    where={"category": "programming"},
    include=["documents", "metadatas"],
)
for doc, meta in zip(results['documents'], results['metadatas']):
    print(f"{meta['language']}: {doc}")
更新文档 #
更新文档内容 #
python
# update() replaces fields of an existing record in place.
collection.update(ids=["doc1"], documents=["更新后的文档内容"])

# Read it back to confirm the new content.
results = collection.get(ids=["doc1"])
print(results['documents'][0])
更新元数据 #
python
# Updating metadata replaces the whole metadata dict for that id.
collection.update(
    ids=["doc1"],
    metadatas=[{"category": "updated", "version": "2.0"}],
)

results = collection.get(ids=["doc1"])
print(results['metadatas'][0])
更新嵌入 #
python
# Swap in a new precomputed embedding for an existing record.
new_vector = [0.1, 0.2, 0.3, 0.4, 0.5]
collection.update(ids=["doc1"], embeddings=[new_vector])
批量更新 #
python
# Update several records in one call; all lists are index-aligned.
ids = ["doc1", "doc2", "doc3"]
documents = ["更新文档1", "更新文档2", "更新文档3"]
# One fresh metadata dict per record.
metadatas = [{"updated": True, "version": "2.0"} for _ in ids]

collection.update(ids=ids, documents=documents, metadatas=metadatas)
Upsert 操作 #
python
# upsert() updates existing ids and inserts unknown ones in one call.
collection.upsert(
    ids=["doc1", "doc_new"],
    documents=["更新文档1", "新文档"],
    metadatas=[{"type": "update"}, {"type": "new"}],
)
print("Upsert 完成")
更新最佳实践 #
python
def update_document_safe(collection, doc_id, document=None, metadata=None):
    """Update a document, creating it first if the id does not exist.

    New metadata is merged into the existing metadata rather than
    replacing it wholesale.

    Args:
        collection: Chroma collection to operate on.
        doc_id: id of the document to update or create.
        document: new text content; None means "leave unchanged".
        metadata: dict merged into the stored metadata; None means none.
    """
    try:
        existing = collection.get(ids=[doc_id])
        if not existing['ids']:
            print(f"文档 {doc_id} 不存在,将创建新文档")
            collection.add(
                ids=[doc_id],
                # `is not None` so an empty string / empty dict still counts
                # as an explicit value (truthiness would silently drop them).
                documents=[document] if document is not None else None,
                metadatas=[metadata] if metadata is not None else None,
            )
        else:
            update_data = {"ids": [doc_id]}
            if document is not None:
                update_data["documents"] = [document]
            if metadata is not None:
                # Merge on top of what is already stored; `or {}` guards
                # against records that have no metadata (None).
                existing_meta = existing['metadatas'][0] or {}
                existing_meta.update(metadata)
                update_data["metadatas"] = [existing_meta]
            collection.update(**update_data)
            print(f"文档 {doc_id} 更新成功")
    except Exception as e:  # demo helper: report rather than propagate
        print(f"更新失败: {e}")
删除文档 #
按 ID 删除 #
python
# delete() by id removes the record permanently.
collection.delete(ids=["doc1"])
print(f"剩余文档数: {collection.count()}")
批量删除 #
python
# A single delete() call accepts any number of ids.
ids_to_delete = ["doc2", "doc3", "doc4"]
collection.delete(ids=ids_to_delete)
print(f"删除了 {len(ids_to_delete)} 个文档")
按条件删除 #
python
# A where filter deletes every record whose metadata matches.
collection.delete(where={"category": "old"})
print("已删除所有 category 为 old 的文档")
删除所有文档 #
python
# Chroma has no "delete everything" call, so gather all ids first.
all_ids = collection.get()['ids']
if not all_ids:
    print("集合为空")
else:
    collection.delete(ids=all_ids)
    print(f"删除了 {len(all_ids)} 个文档")
删除最佳实践 #
python
def delete_documents_safe(collection, ids):
    """Delete only the ids that actually exist in the collection.

    Args:
        collection: Chroma collection to delete from.
        ids: candidate document ids; missing ones are silently skipped.
    """
    # Query just the candidate ids (include=[] skips documents/embeddings)
    # instead of fetching the whole collection; Chroma returns only the
    # ids that exist.
    existing_ids = set(collection.get(ids=list(ids), include=[])['ids'])
    # Keep caller order; `doc_id` avoids shadowing the builtin `id`.
    ids_to_delete = [doc_id for doc_id in ids if doc_id in existing_ids]
    if not ids_to_delete:
        print("没有文档需要删除")
        return
    collection.delete(ids=ids_to_delete)
    print(f"删除了 {len(ids_to_delete)} 个文档")
def delete_by_metadata(collection, metadata_filter):
    """Delete every document whose metadata matches *metadata_filter*."""
    # include=[] fetches ids only, which is all that delete() needs.
    matched = collection.get(where=metadata_filter, include=[])['ids']
    if not matched:
        print("没有匹配的文档")
        return
    collection.delete(ids=matched)
    print(f"删除了 {len(matched)} 个匹配的文档")
文档操作示例 #
示例 1:文档版本管理 #
python
# Example 1: simple document versioning on top of one collection.
# Each version is stored as its own record; metadata links it back to
# the logical document id.
import chromadb
from datetime import datetime

client = chromadb.Client()
docs = client.create_collection(name="versioned_docs")


def add_version(collection, doc_id, content, version=None):
    """Store *content* as a new version of *doc_id*; return the record id."""
    if version is None:
        # Timestamp versions sort chronologically as plain strings.
        version = datetime.now().strftime("%Y%m%d_%H%M%S")
    versioned_id = f"{doc_id}_v{version}"
    meta = {
        "doc_id": doc_id,
        "version": version,
        "created_at": datetime.now().isoformat(),
    }
    collection.add(documents=[content], ids=[versioned_id], metadatas=[meta])
    return versioned_id


def get_latest_version(collection, doc_id):
    """Return (metadata, document) of the newest version, or None."""
    results = collection.get(
        where={"doc_id": doc_id},
        include=["documents", "metadatas"],
    )
    if not results['ids']:
        return None
    # Highest version string wins; zero-padded timestamps compare
    # correctly as text.
    pairs = zip(results['metadatas'], results['documents'])
    return max(pairs, key=lambda pair: pair[0]['version'])


add_version(docs, "report", "第一版报告内容", "20240101")
add_version(docs, "report", "第二版报告内容", "20240102")
add_version(docs, "report", "第三版报告内容", "20240103")

latest = get_latest_version(docs, "report")
print(f"最新版本: {latest[0]['version']}")
print(f"内容: {latest[1]}")
示例 2:文档标签系统 #
python
# Example 2: a simple tag system stored in document metadata.
# Chroma metadata values must be scalars, so tags are kept as one
# comma-separated string plus a tag count.
import chromadb

client = chromadb.Client()
tagged_docs = client.create_collection(name="tagged_docs")


def add_tagged_document(collection, doc_id, content, tags):
    """Add *content* with its tags flattened into metadata."""
    collection.add(
        documents=[content],
        ids=[doc_id],
        metadatas=[{
            "tags": ",".join(tags),
            "tag_count": len(tags)
        }]
    )


def get_by_tag(collection, tag):
    """Return all records carrying exactly *tag*.

    NOTE: Chroma's metadata `where` clause has no substring operator
    ($contains belongs to where_document full-text filtering), and a
    substring match would also false-positive on e.g. "py" in "python".
    So fetch metadata and filter the tag list client-side.
    """
    results = collection.get(include=["documents", "metadatas"])
    keep = [
        i for i, meta in enumerate(results['metadatas'])
        if tag in (meta.get("tags") or "").split(",")
    ]
    return {
        "ids": [results['ids'][i] for i in keep],
        "documents": [results['documents'][i] for i in keep],
        "metadatas": [results['metadatas'][i] for i in keep],
    }


def add_tags(collection, doc_id, new_tags):
    """Merge *new_tags* into the document's existing tag set."""
    results = collection.get(ids=[doc_id], include=["metadatas"])
    if results['metadatas']:
        current_tags = results['metadatas'][0].get("tags", "").split(",")
        current_tags = [t for t in current_tags if t]  # drop empty entries
        updated_tags = list(set(current_tags + new_tags))
        collection.update(
            ids=[doc_id],
            metadatas=[{
                "tags": ",".join(updated_tags),
                "tag_count": len(updated_tags)
            }]
        )


add_tagged_document(tagged_docs, "doc1", "Python 教程", ["python", "programming"])
add_tagged_document(tagged_docs, "doc2", "JavaScript 指南", ["javascript", "web"])
add_tags(tagged_docs, "doc1", ["tutorial", "beginner"])

python_docs = get_by_tag(tagged_docs, "python")
print(f"Python 相关文档: {len(python_docs['ids'])}")
示例 3:文档增量更新 #
python
# Example 3: incremental updates — skip writes when content is unchanged.
# A content hash stored in metadata detects whether the text differs.
import chromadb
import hashlib

client = chromadb.Client()
incremental_docs = client.create_collection(name="incremental_docs")


def compute_hash(content):
    """Return a hex MD5 digest of *content* (change detection, not security)."""
    return hashlib.md5(content.encode()).hexdigest()


def add_or_update(collection, doc_id, content, metadata=None):
    """Upsert *doc_id* only when *content* actually changed.

    Returns:
        True when a write happened, False when it was skipped.
    """
    content_hash = compute_hash(content)
    existing = collection.get(ids=[doc_id], include=["metadatas"])
    if existing['ids']:
        old_hash = existing['metadatas'][0].get("content_hash")
        if old_hash == content_hash:
            print(f"文档 {doc_id} 内容未变化,跳过更新")
            return False
    # Copy so the caller's metadata dict is not mutated by the hash key.
    full_metadata = dict(metadata) if metadata else {}
    full_metadata["content_hash"] = content_hash
    collection.upsert(
        ids=[doc_id],
        documents=[content],
        metadatas=[full_metadata]
    )
    print(f"文档 {doc_id} 已更新")
    return True


add_or_update(incremental_docs, "doc1", "初始内容")
add_or_update(incremental_docs, "doc1", "初始内容")
add_or_update(incremental_docs, "doc1", "更新后的内容")
性能优化 #
批量操作优化 #
python
def batch_add_optimized(collection, documents, ids, metadatas=None, batch_size=100):
    """Add index-aligned documents/ids/metadatas in slices of *batch_size*."""
    total = len(documents)
    for start in range(0, total, batch_size):
        stop = min(start + batch_size, total)
        collection.add(
            documents=documents[start:stop],
            ids=ids[start:stop],
            metadatas=metadatas[start:stop] if metadatas else None,
        )
        # Progress line at each 1000-document mark and once at the end.
        if stop % 1000 == 0 or stop == total:
            print(f"进度: {stop}/{total}")
内存优化 #
python
def add_large_dataset(collection, data_generator, batch_size=100):
    """Stream (doc_id, document, metadata) triples into *collection*.

    Buffers *batch_size* items at a time so the full dataset never has
    to be materialized in memory.
    """
    pending_docs, pending_ids, pending_metas = [], [], []
    count = 0
    for doc_id, doc, meta in data_generator:
        pending_docs.append(doc)
        pending_ids.append(doc_id)
        pending_metas.append(meta)
        if len(pending_docs) >= batch_size:
            collection.add(
                documents=pending_docs,
                ids=pending_ids,
                metadatas=pending_metas,
            )
            count += len(pending_docs)
            # Fresh lists: the previous ones were handed to add().
            pending_docs, pending_ids, pending_metas = [], [], []
    # Flush whatever remains from the last partial batch.
    if pending_docs:
        collection.add(
            documents=pending_docs,
            ids=pending_ids,
            metadatas=pending_metas,
        )
        count += len(pending_docs)
    print(f"总共添加了 {count} 个文档")
下一步 #
现在你已经掌握了文档操作,接下来学习 向量嵌入,了解如何高效地处理向量嵌入!
最后更新:2026-04-04