快照与备份 #
本章介绍 Qdrant 的数据备份和恢复机制。
快照概述 #
text
快照功能:
┌─────────────────────────────────────────────────────────────┐
│                          快照架构                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│     创建快照           存储快照           恢复快照          │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐      │
│  │ Collection  │    │ 本地存储    │    │ 新实例      │      │
│  │ 数据        │    │ S3/OSS      │    │ 原实例      │      │
│  │ 配置        │    │ NFS         │    │ 集群        │      │
│  └─────────────┘    └─────────────┘    └─────────────┘      │
│                                                             │
└─────────────────────────────────────────────────────────────┘
创建快照 #
创建 Collection 快照 #
python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
# Snapshots are a server-side feature: the embedded local mode
# (":memory:" or a local path) does not implement create_snapshot, so this
# example connects to a running Qdrant server instead.
client = QdrantClient(url="http://localhost:6333")

# Small 4-dimensional demo collection that the rest of the chapter snapshots.
client.create_collection(
    collection_name="backup_demo",
    vectors_config=VectorParams(size=4, distance=Distance.COSINE),
)

# 100 points; every vector is derived from the point id.
points = [
    PointStruct(id=i, vector=[i * 0.1, i * 0.1 + 0.1, i * 0.1 + 0.2, i * 0.1 + 0.3])
    for i in range(100)
]
client.upsert("backup_demo", points)

# Create a snapshot of the collection and show the metadata returned for it.
snapshot_info = client.create_snapshot(collection_name="backup_demo")
print(f"快照名称: {snapshot_info.name}")
print(f"创建时间: {snapshot_info.creation_time}")
print(f"大小: {snapshot_info.size} bytes")
创建完整快照 #
python
# Create a full snapshot through create_full_snapshot() and print its name.
snapshot_info = client.create_full_snapshot()
print(f"完整快照: {snapshot_info.name}")
列出快照 #
python
# QdrantClient exposes list_snapshots(); there is no get_snapshots() method.
snapshots = client.list_snapshots("backup_demo")

# Print the metadata of every snapshot, separated by a divider line.
for snapshot in snapshots:
    print(f"名称: {snapshot.name}")
    print(f"大小: {snapshot.size} bytes")
    print(f"创建时间: {snapshot.creation_time}")
    print("---")
恢复快照 #
从快照恢复 Collection #
python
# Name of an existing snapshot of the "backup_demo" collection.
snapshot_name = "backup_demo-2024-01-01-12-00-00.snapshot"

# `location` has to be a URL (http/https) or a file:// URI that the Qdrant
# server can resolve; a bare file name is not a valid location. Here the
# server fetches the snapshot through the snapshot download endpoint.
# NOTE(review): adjust host/port to the server that holds the snapshot.
client.recover_snapshot(
    collection_name="restored_collection",
    location=f"http://localhost:6333/collections/backup_demo/snapshots/{snapshot_name}",
)
print("快照恢复成功")
从 URL 恢复 #
python
# Recover a collection from a snapshot archive published at an HTTP(S) URL.
snapshot_url = "https://storage.example.com/snapshots/backup.snapshot"

client.recover_snapshot(
    collection_name="restored_from_url",
    location=snapshot_url,
)
print("从 URL 恢复成功")
从文件恢复 #
python
# A local snapshot is addressed with a file:// URI; a bare filesystem path is
# not a valid `location`. The path is resolved on the Qdrant server, so the
# file must be readable from the server process.
client.recover_snapshot(
    collection_name="restored_from_file",
    location="file:///path/to/snapshot.snapshot",
)
print("从文件恢复成功")
删除快照 #
删除 Collection 快照 #
python
# Remove one named snapshot that belongs to the "backup_demo" collection.
client.delete_snapshot(
    collection_name="backup_demo",
    snapshot_name="old-snapshot.snapshot",
)
print("快照已删除")
删除完整快照 #
python
# Remove one named full snapshot.
client.delete_full_snapshot(snapshot_name="old-full-snapshot.snapshot")
print("完整快照已删除")
快照存储配置 #
本地存储配置 #
yaml
storage:
  # Directory used for snapshot files.
  snapshots_path: ./snapshots
  snapshots_config:
    # Keep snapshots on the local filesystem.
    snapshots_storage: local
S3 存储配置 #
yaml
storage:
  snapshots_path: ./snapshots
  snapshots_config:
    # Store snapshots in an S3 bucket instead of the local filesystem.
    snapshots_storage: s3
    s3_config:
      bucket: qdrant-snapshots
      region: us-east-1
      # NOTE(review): confirm that the configuration loader expands ${...}
      # placeholders; if it does not, provide the credentials through
      # environment-variable overrides or literal values.
      access_key: ${AWS_ACCESS_KEY}
      secret_key: ${AWS_SECRET_KEY}
等待快照创建完成 #
python
from qdrant_client.models import SnapshotDescription
# Request a snapshot of "backup_demo" with wait=True, i.e. wait for the
# snapshot to be created before the call returns.
client.create_snapshot(collection_name="backup_demo", wait=True)
备份策略 #
定期备份脚本 #
python
import time
from datetime import datetime
from qdrant_client import QdrantClient
client = QdrantClient(url="http://localhost:6333")


def backup_collection(collection_name, max_backups=7):
    """Create a snapshot of a collection and prune the oldest snapshots.

    Args:
        collection_name: Name of the collection to snapshot.
        max_backups: Number of most recent snapshots to keep. Older ones are
            deleted; ``0`` keeps none.

    Raises:
        ValueError: If ``max_backups`` is negative.
    """
    if max_backups < 0:
        raise ValueError("max_backups must be >= 0")

    snapshot_info = client.create_snapshot(
        collection_name=collection_name,
        wait=True,
    )
    print(f"创建快照: {snapshot_info.name}")

    # The client method is list_snapshots(); get_snapshots() does not exist.
    snapshots = client.list_snapshots(collection_name)

    # Oldest first. creation_time is optional, so fall back to "" to keep the
    # sort well-defined when a snapshot does not report one.
    ordered = sorted(snapshots, key=lambda item: item.creation_time or "")

    # Slice by an explicit count: `ordered[:-max_backups]` would be empty for
    # max_backups == 0 and therefore never delete anything.
    excess = max(len(ordered) - max_backups, 0)
    for snapshot in ordered[:excess]:
        client.delete_snapshot(
            collection_name=collection_name,
            snapshot_name=snapshot.name,
        )
        print(f"删除旧快照: {snapshot.name}")


backup_collection("backup_demo")
全量备份脚本 #
python
def full_backup(max_backups=7):
    """Back up every collection, then create one full snapshot.

    Args:
        max_backups: Passed through to ``backup_collection`` for each
            collection.
    """
    for collection in client.get_collections().collections:
        print(f"备份 Collection: {collection.name}")
        backup_collection(collection.name, max_backups)

    # The full snapshot is taken once, after all per-collection backups.
    full_snapshot = client.create_full_snapshot(wait=True)
    print(f"创建完整快照: {full_snapshot.name}")


full_backup()
增量备份策略 #
python
import hashlib
def incremental_backup(collection_name, last_backup_time):
    """Collect the points changed since the previous backup.

    The whole collection is scrolled in pages of 100 and filtered on the
    client side by the ``updated_at`` payload field.

    Args:
        collection_name: Collection to scan.
        last_backup_time: Threshold compared against ``updated_at``; points
            with a strictly greater value are returned.

    Returns:
        List of the scrolled points (payload and vectors included) whose
        ``updated_at`` is newer than ``last_backup_time``.
    """
    points = []
    offset = None
    while True:
        batch, next_offset = client.scroll(
            collection_name=collection_name,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=True,
        )
        for point in batch:
            # payload is optional; a point without one counts as unchanged
            # (updated_at defaults to 0) instead of raising AttributeError.
            payload = point.payload or {}
            if payload.get("updated_at", 0) > last_backup_time:
                points.append(point)
        # A missing next offset marks the last page.
        if next_offset is None:
            break
        offset = next_offset
    return points


# Unix timestamp of the previous backup (2024-01-01 00:00:00 UTC).
last_backup = 1704067200
changed_points = incremental_backup("backup_demo", last_backup)
print(f"变更点数: {len(changed_points)}")
数据迁移 #
跨实例迁移 #
python
def migrate_collection(source_client, target_client, collection_name):
    """Copy a collection from one Qdrant instance to another, page by page.

    The target collection is created with the vector configuration read from
    the source, then all points are scrolled from the source in pages of 100
    and upserted into the target.

    Args:
        source_client: Client connected to the instance to read from.
        target_client: Client connected to the instance to write to.
        collection_name: Name used on both the source and the target.
    """
    # Imported locally so the snippet does not depend on an earlier example.
    from qdrant_client.models import PointStruct

    print(f"开始迁移: {collection_name}")
    source_info = source_client.get_collection(collection_name)
    target_client.create_collection(
        collection_name=collection_name,
        vectors_config=source_info.config.params.vectors,
    )

    offset = None
    total_migrated = 0
    while True:
        records, next_offset = source_client.scroll(
            collection_name=collection_name,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=True,
        )
        if records:
            # scroll() yields read-side records; upsert() expects PointStruct
            # objects, so each record is converted explicitly.
            target_client.upsert(
                collection_name=collection_name,
                points=[
                    PointStruct(
                        id=record.id,
                        vector=record.vector,
                        payload=record.payload,
                    )
                    for record in records
                ],
            )
            total_migrated += len(records)
            print(f"已迁移: {total_migrated}")
        # A missing next offset marks the last page.
        if next_offset is None:
            break
        offset = next_offset
    print(f"迁移完成: {total_migrated} 个点")


source = QdrantClient(url="http://source:6333")
target = QdrantClient(url="http://target:6333")
migrate_collection(source, target, "backup_demo")
快照迁移 #
python
def migrate_via_snapshot(source_client, target_client, collection_name, source_url=None):
    """Migrate a collection by letting the target recover a source snapshot.

    A snapshot is created on the source instance and the target instance is
    asked to recover from the source's snapshot download URL. QdrantClient
    has no ``download_snapshot`` method, and a file written on the client
    machine would not be visible to the target server anyway.

    Args:
        source_client: Client connected to the instance that owns the data.
        target_client: Client connected to the instance to restore into.
        collection_name: Name used on both the source and the target.
        source_url: Base REST URL of the source instance as reachable from
            the target server, e.g. ``"http://source:6333"``.

    Raises:
        ValueError: If ``source_url`` is not provided.
    """
    if not source_url:
        raise ValueError("source_url is required to build the snapshot download URL")

    snapshot_info = source_client.create_snapshot(
        collection_name=collection_name,
        wait=True,
    )

    # Snapshot download endpoint of the source instance.
    # NOTE(review): if the source requires an API key, the recover call needs
    # to pass the credentials as well — confirm against the deployment.
    location = (
        f"{source_url.rstrip('/')}/collections/{collection_name}"
        f"/snapshots/{snapshot_info.name}"
    )
    target_client.recover_snapshot(
        collection_name=collection_name,
        location=location,
    )
    print("快照迁移完成")


migrate_via_snapshot(source, target, "backup_demo", source_url="http://source:6333")
灾难恢复 #
恢复流程 #
python
def disaster_recovery(backup_path, target_client):
    """Recover every ``*.snapshot`` file found in a directory.

    The collection name is derived from the file name by removing the
    trailing ``-YYYY-MM-DD-HH-MM-SS.snapshot`` timestamp (and a long numeric
    id segment directly before it, if present), so collection names that
    contain ``-`` are preserved. Files that do not follow that pattern use
    the file name without the ``.snapshot`` extension.

    Args:
        backup_path: Directory that contains the snapshot files.
        target_client: Client whose ``recover_snapshot`` is called once per
            snapshot file.
    """
    import os
    import re
    from pathlib import Path

    suffix = ".snapshot"
    name_pattern = re.compile(
        r"^(?P<name>.+?)(?:-\d{6,})?-\d{4}(?:-\d{2}){5}\.snapshot$"
    )

    # Sorted so the run is deterministic; names that differ only in their
    # timestamp are therefore processed oldest first.
    for snapshot_file in sorted(os.listdir(backup_path)):
        if not snapshot_file.endswith(suffix):
            continue

        match = name_pattern.match(snapshot_file)
        if match:
            collection_name = match.group("name")
        else:
            collection_name = snapshot_file[: -len(suffix)]

        # recover_snapshot expects a URL/URI, not a bare path. The file://
        # location is resolved by the Qdrant server, so the directory must be
        # readable from the server process.
        location = Path(backup_path, snapshot_file).absolute().as_uri()
        target_client.recover_snapshot(
            collection_name=collection_name,
            location=location,
        )
        print(f"恢复 Collection: {collection_name}")
# Recover the snapshot files found under /backups through the `target` client.
disaster_recovery("/backups", target)
验证恢复 #
python
def verify_recovery(source_client, target_client, collection_name):
    """Spot-check that a recovered collection matches its source.

    Compares the point counts, then compares the vectors of a sample of up
    to 10 points scrolled from each side. Points are matched by id rather
    than by position, so a sample that holds different ids is reported as a
    mismatch instead of being compared pairwise.

    Args:
        source_client: Client connected to the original instance.
        target_client: Client connected to the recovered instance.
        collection_name: Name used on both the source and the target.

    Returns:
        True when the counts and all sampled vectors match, otherwise False.
    """
    source_info = source_client.get_collection(collection_name)
    target_info = target_client.get_collection(collection_name)
    if source_info.points_count != target_info.points_count:
        print(f"警告: 点数不匹配 (源: {source_info.points_count}, 目标: {target_info.points_count})")
        return False

    sample_size = 10
    source_sample, _ = source_client.scroll(
        collection_name=collection_name,
        limit=sample_size,
        with_vectors=True,
    )
    target_sample, _ = target_client.scroll(
        collection_name=collection_name,
        limit=sample_size,
        with_vectors=True,
    )

    # Index the target sample by id so every source point is compared with
    # the point that carries the same id.
    target_vectors = {point.id: point.vector for point in target_sample}
    for s_point in source_sample:
        if s_point.id not in target_vectors or target_vectors[s_point.id] != s_point.vector:
            print(f"警告: 向量不匹配 (ID: {s_point.id})")
            return False

    print("恢复验证通过")
    return True
# Compare "backup_demo" as seen through `source` and `target`.
verify_recovery(source, target, "backup_demo")
监控和告警 #
备份状态监控 #
python
def monitor_backups(collection_name, max_age_hours=24):
    """Report whether the newest snapshot of a collection is recent enough.

    Args:
        collection_name: Collection whose snapshots are inspected.
        max_age_hours: Maximum accepted age of the newest snapshot, in hours.

    Returns:
        Dict with ``status`` (``"ok"`` or ``"warning"``) and a ``message``.
        Snapshots that report no creation time are ignored; if none remain,
        the result is the same warning as for a collection without snapshots.
    """
    from datetime import datetime, timezone

    def created_at(snapshot):
        # creation_time is an ISO-8601 string, not a Unix timestamp, so it
        # has to be parsed before any arithmetic.
        stamp = datetime.fromisoformat(snapshot.creation_time)
        if stamp.tzinfo is None:
            # NOTE(review): naive timestamps are assumed to be UTC — confirm
            # against the server's behaviour.
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp

    # The client method is list_snapshots(); get_snapshots() does not exist.
    snapshots = [
        item for item in client.list_snapshots(collection_name) if item.creation_time
    ]
    if not snapshots:
        return {"status": "warning", "message": "没有快照"}

    latest = max(snapshots, key=created_at)
    age = datetime.now(timezone.utc) - created_at(latest)
    age_hours = age.total_seconds() / 3600
    if age_hours > max_age_hours:
        return {"status": "warning", "message": f"快照过期 ({age_hours:.1f} 小时)"}
    return {"status": "ok", "message": f"最新快照: {latest.name}"}


status = monitor_backups("backup_demo")
print(status)
存储空间监控 #
python
def monitor_storage(snapshots_path="./snapshots", warning_percent=80, critical_percent=90):
    """Classify the disk usage of the filesystem that holds the snapshots.

    Args:
        snapshots_path: Existing path whose filesystem is measured.
        warning_percent: Usage strictly above this value is a warning.
        critical_percent: Usage strictly above this value is critical.

    Returns:
        Dict with ``status`` (``"ok"``, ``"warning"`` or ``"critical"``) and
        ``usage`` (used space as a percentage of the total).

    Raises:
        OSError: Propagated from ``shutil.disk_usage``, e.g. when
            ``snapshots_path`` does not exist.
    """
    import shutil

    disk = shutil.disk_usage(snapshots_path)
    # Guard against a zero-sized filesystem report instead of dividing by 0.
    usage_percent = (disk.used / disk.total) * 100 if disk.total else 0.0

    if usage_percent > critical_percent:
        status = "critical"
    elif usage_percent > warning_percent:
        status = "warning"
    else:
        status = "ok"
    return {"status": status, "usage": usage_percent}
# Print the usage percentage returned by monitor_storage().
storage_status = monitor_storage()
print(f"存储使用率: {storage_status['usage']:.1f}%")
最佳实践 #
备份策略建议 #
text
备份策略建议:
开发环境:
├── 每日备份
├── 保留 3 天
└── 本地存储
测试环境:
├── 每日备份
├── 保留 7 天
└── 本地存储
生产环境:
├── 每小时增量备份
├── 每日全量备份
├── 保留 30 天
└── 异地存储(S3/OSS)
快照命名规范 #
python
def generate_snapshot_name(collection_name, now=None):
    """Build a snapshot file name of the form ``<collection>_<timestamp>.snapshot``.

    Args:
        collection_name: Collection the snapshot belongs to.
        now: ``datetime`` used for the timestamp part. Defaults to the
            current local time; pass a value to get a reproducible name.

    Returns:
        The file name, with the timestamp formatted as ``YYYYmmdd_HHMMSS``.
    """
    from datetime import datetime

    moment = datetime.now() if now is None else now
    timestamp = moment.strftime("%Y%m%d_%H%M%S")
    return f"{collection_name}_{timestamp}.snapshot"
# Generate a name for "backup_demo" and print it.
name = generate_snapshot_name("backup_demo")
print(name)
自动化备份脚本 #
python
import schedule
import time
def scheduled_backup():
    """Back up every collection, reporting success or failure per collection.

    A failure in one collection is printed and does not stop the remaining
    collections from being backed up.
    """
    for collection in client.get_collections().collections:
        try:
            backup_collection(collection.name)
            print(f"备份完成: {collection.name}")
        except Exception as e:
            # Best effort: keep going with the next collection.
            print(f"备份失败 {collection.name}: {e}")


# Register the job for 02:00 every day.
schedule.every().day.at("02:00").do(scheduled_backup)

# Poll the scheduler once a minute, forever.
while True:
    schedule.run_pending()
    time.sleep(60)
小结 #
本章详细介绍了快照与备份:
- 创建和管理快照
- 快照恢复方法
- 备份策略设计
- 数据迁移
- 灾难恢复
下一步 #
掌握备份策略后,继续学习 监控与运维,了解系统监控和运维管理!
最后更新:2026-04-04