快照与备份 #

本章介绍 Qdrant 的数据备份和恢复机制。

快照概述 #

text
快照功能:

┌─────────────────────────────────────────────────────────────┐
│                      快照架构                                │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  创建快照          存储快照          恢复快照                │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ Collection  │  │  本地存储    │  │  新实例     │         │
│  │   数据      │  │  S3/OSS     │  │  原实例     │         │
│  │   配置      │  │  NFS        │  │  集群       │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                              │
└─────────────────────────────────────────────────────────────┘

创建快照 #

创建 Collection 快照 #

python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# Snapshots are a server-side feature: the in-process local mode
# (":memory:" or a local path) does not implement the snapshot API, so this
# example has to talk to a running Qdrant instance.
client = QdrantClient(url="http://localhost:6333")

# Create a small demo collection holding 4-dimensional cosine vectors.
client.create_collection(
    collection_name="backup_demo",
    vectors_config=VectorParams(size=4, distance=Distance.COSINE)
)

# Insert 100 deterministic points so the snapshot has some content.
points = [
    PointStruct(id=i, vector=[i*0.1, i*0.1+0.1, i*0.1+0.2, i*0.1+0.3])
    for i in range(100)
]
client.upsert("backup_demo", points)

# Ask the server to snapshot the collection and report what it created.
snapshot_info = client.create_snapshot(
    collection_name="backup_demo"
)

print(f"快照名称: {snapshot_info.name}")
print(f"创建时间: {snapshot_info.creation_time}")
print(f"大小: {snapshot_info.size} bytes")

创建完整快照 #

python
# Create a full-storage snapshot using the `client` defined earlier in this
# chapter, then print the name of the resulting snapshot.
snapshot_info = client.create_full_snapshot()

print(f"完整快照: {snapshot_info.name}")

列出快照 #

python
# The client method that enumerates a collection's snapshots is
# `list_snapshots`; it returns one description object per snapshot.
snapshots = client.list_snapshots("backup_demo")

# Print the basic metadata of every snapshot, separated by a divider line.
for snapshot in snapshots:
    print(f"名称: {snapshot.name}")
    print(f"大小: {snapshot.size} bytes")
    print(f"创建时间: {snapshot.creation_time}")
    print("---")

恢复快照 #

从快照恢复 Collection #

python
# Name of an existing snapshot of the "backup_demo" collection.
snapshot_name = "backup_demo-2024-01-01-12-00-00.snapshot"

# `location` has to be a URI the Qdrant server itself can fetch (http(s)://
# or file://); a bare file name is not resolvable. Here the server downloads
# the snapshot through the snapshot REST endpoint of the instance that owns
# it. NOTE(review): adjust host/port to the instance holding the snapshot.
client.recover_snapshot(
    collection_name="restored_collection",
    location=f"http://localhost:6333/collections/backup_demo/snapshots/{snapshot_name}"
)

print("快照恢复成功")

从 URL 恢复 #

python
# Remote location of the snapshot file to restore from.
snapshot_url = "https://storage.example.com/snapshots/backup.snapshot"

# Restore the snapshot into the collection "restored_from_url", passing the
# URL as the snapshot location.
client.recover_snapshot(
    collection_name="restored_from_url",
    location=snapshot_url
)

print("从 URL 恢复成功")

从文件恢复 #

python
# A snapshot file is addressed with a file:// URI, not a bare path. The path
# is resolved on the Qdrant server's filesystem, so the file must be
# readable by the server process (not merely by this client).
client.recover_snapshot(
    collection_name="restored_from_file",
    location="file:///path/to/snapshot.snapshot"
)

print("从文件恢复成功")

删除快照 #

删除 Collection 快照 #

python
# Delete one named snapshot that belongs to the "backup_demo" collection.
client.delete_snapshot(
    collection_name="backup_demo",
    snapshot_name="old-snapshot.snapshot"
)

print("快照已删除")

删除完整快照 #

python
# Delete one named full-storage snapshot (not tied to a single collection).
client.delete_full_snapshot(
    snapshot_name="old-full-snapshot.snapshot"
)

print("完整快照已删除")

快照存储配置 #

本地存储配置 #

yaml
# Qdrant server configuration fragment: snapshot storage on local disk.
storage:
  # Path used for snapshot files (relative to the server's working dir).
  snapshots_path: ./snapshots
  
  snapshots_config:
    # Selects the snapshot storage backend; "local" is used here.
    snapshots_storage: local

S3 存储配置 #

yaml
# Qdrant server configuration fragment: snapshot storage in an S3 bucket.
storage:
  # Path used for snapshot files on the node itself.
  snapshots_path: ./snapshots
  
  snapshots_config:
    # Selects the snapshot storage backend; "s3" is used here.
    snapshots_storage: s3
    s3_config:
      bucket: qdrant-snapshots
      region: us-east-1
      # NOTE(review): confirm that the config loader expands ${...}
      # placeholders. If it does not, supply the credentials through
      # environment overrides instead (presumably
      # QDRANT__STORAGE__SNAPSHOTS_CONFIG__S3_CONFIG__ACCESS_KEY and
      # ...__SECRET_KEY — verify against the Qdrant configuration docs).
      access_key: ${AWS_ACCESS_KEY}
      secret_key: ${AWS_SECRET_KEY}

自定义存储配置 #

python
from qdrant_client.models import SnapshotDescription

# Create a snapshot of "backup_demo", passing wait=True so the call is made
# in waiting mode. The returned description is discarded in this example.
client.create_snapshot(
    collection_name="backup_demo",
    wait=True
)

备份策略 #

定期备份脚本 #

python
import time
from datetime import datetime
from qdrant_client import QdrantClient

client = QdrantClient(url="http://localhost:6333")

def backup_collection(collection_name, max_backups=7):
    """Snapshot a collection and prune snapshots beyond the retention limit.

    Uses the module-level ``client``. A new snapshot is created first; then,
    if the collection has more than ``max_backups`` snapshots, the oldest
    ones (by ``creation_time``) are deleted until only ``max_backups`` remain.

    Args:
        collection_name: Name of the collection to snapshot.
        max_backups: Number of most recent snapshots to keep. ``0`` removes
            every snapshot, including the one just created; negative values
            are treated as ``0``.
    """
    snapshot_info = client.create_snapshot(
        collection_name=collection_name,
        wait=True
    )

    print(f"创建快照: {snapshot_info.name}")

    snapshots = client.list_snapshots(collection_name)

    # Count how many snapshots exceed the limit and slice by that count.
    # Slicing with ``[:-max_backups]`` would be wrong for ``max_backups=0``,
    # because ``[:-0]`` is an empty slice and nothing would be pruned.
    excess = len(snapshots) - max(max_backups, 0)
    if excess <= 0:
        return

    oldest_first = sorted(snapshots, key=lambda s: s.creation_time)
    for snapshot in oldest_first[:excess]:
        client.delete_snapshot(
            collection_name=collection_name,
            snapshot_name=snapshot.name
        )
        print(f"删除旧快照: {snapshot.name}")

backup_collection("backup_demo")

全量备份脚本 #

python
def full_backup(max_backups=7):
    """Back up every collection, then take one full-storage snapshot.

    Each collection reported by the module-level ``client`` is handed to
    ``backup_collection`` with the given retention limit; afterwards a
    full snapshot is created and its name printed.

    Args:
        max_backups: Retention limit forwarded to ``backup_collection``.
    """
    for entry in client.get_collections().collections:
        print(f"备份 Collection: {entry.name}")
        backup_collection(entry.name, max_backups)

    storage_snapshot = client.create_full_snapshot(wait=True)
    print(f"创建完整快照: {storage_snapshot.name}")

full_backup()

增量备份策略 #

python
import hashlib

def incremental_backup(collection_name, last_backup_time):
    """Collect the points modified after ``last_backup_time``.

    Pages through the whole collection with the module-level ``client``
    (100 points per page, payload and vectors included) and keeps the points
    whose payload field ``updated_at`` is greater than ``last_backup_time``.

    Args:
        collection_name: Collection to scan.
        last_backup_time: Threshold compared against ``updated_at``; must be
            comparable with the stored values (the example uses a Unix
            timestamp).

    Returns:
        A list of the scrolled points that changed since the threshold.
    """
    changed_points = []
    offset = None

    while True:
        batch, offset = client.scroll(
            collection_name=collection_name,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=True
        )

        for point in batch:
            # A point stored without payload has ``payload is None``; treat
            # that, and a missing/None ``updated_at``, as "never updated"
            # instead of crashing on ``None.get`` or ``None > int``.
            updated_at = (point.payload or {}).get("updated_at") or 0
            if updated_at > last_backup_time:
                changed_points.append(point)

        # scroll signals the last page by returning no next offset.
        if offset is None:
            break

    return changed_points

last_backup = 1704067200
changed_points = incremental_backup("backup_demo", last_backup)
print(f"变更点数: {len(changed_points)}")

数据迁移 #

跨实例迁移 #

python
def migrate_collection(source_client, target_client, collection_name):
    """Copy a collection point-by-point from one Qdrant instance to another.

    The target collection is created with the source collection's vector
    configuration, then all points are scrolled from the source in pages of
    100 and upserted into the target. Progress is printed after every page.

    Args:
        source_client: Client connected to the instance to read from.
        target_client: Client connected to the instance to write to.
        collection_name: Name used for both the source and target collection.
    """
    # Imported locally so the function is usable on its own; the package is
    # the one this chapter already depends on.
    from qdrant_client.models import PointStruct

    print(f"开始迁移: {collection_name}")

    source_info = source_client.get_collection(collection_name)

    # Only the vector parameters are carried over; other collection settings
    # fall back to the target's defaults.
    target_client.create_collection(
        collection_name=collection_name,
        vectors_config=source_info.config.params.vectors
    )

    offset = None
    total_migrated = 0

    while True:
        records, offset = source_client.scroll(
            collection_name=collection_name,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=True
        )

        if records:
            # scroll yields Record objects, while upsert expects PointStruct;
            # convert explicitly rather than relying on duck typing, which
            # fails request validation against a remote server.
            target_client.upsert(
                collection_name=collection_name,
                points=[
                    PointStruct(
                        id=record.id,
                        vector=record.vector,
                        payload=record.payload,
                    )
                    for record in records
                ]
            )
            total_migrated += len(records)
            print(f"已迁移: {total_migrated}")

        # No next offset means the last page has been processed.
        if offset is None:
            break

    print(f"迁移完成: {total_migrated} 个点")

source = QdrantClient(url="http://source:6333")
target = QdrantClient(url="http://target:6333")

migrate_collection(source, target, "backup_demo")

快照迁移 #

python
def migrate_via_snapshot(source_client, target_client, collection_name,
                         source_url="http://source:6333"):
    """Move a collection to another instance by snapshot.

    A snapshot is created on the source instance and the target instance is
    told to recover the collection directly from the source's snapshot
    download URL. Nothing is staged on the machine running this script: the
    Python client has no snapshot-download method, and a file written on the
    client machine would not be visible to the target server anyway.

    Args:
        source_client: Client connected to the instance that owns the data.
        target_client: Client connected to the instance to restore into.
        collection_name: Collection to migrate (same name on both sides).
        source_url: Base REST URL of the source instance as reachable *from
            the target server*. Defaults to the source address used in this
            chapter.

    Raises:
        RuntimeError: If the source did not return a snapshot description.
    """
    from urllib.parse import quote

    snapshot_info = source_client.create_snapshot(
        collection_name=collection_name,
        wait=True
    )
    if snapshot_info is None:
        raise RuntimeError(
            f"no snapshot description returned for {collection_name!r}"
        )

    # REST endpoint that serves the snapshot file; path segments are
    # percent-encoded so unusual names cannot break the URL.
    snapshot_url = "{}/collections/{}/snapshots/{}".format(
        source_url.rstrip("/"),
        quote(collection_name, safe=""),
        quote(snapshot_info.name, safe=""),
    )

    target_client.recover_snapshot(
        collection_name=collection_name,
        location=snapshot_url
    )

    print("快照迁移完成")

migrate_via_snapshot(source, target, "backup_demo")

灾难恢复 #

恢复流程 #

python
def disaster_recovery(backup_path, target_client):
    """Restore every ``*.snapshot`` file found in a backup directory.

    The collection name is derived from the snapshot file name by removing
    the trailing ``[-<digits>]-YYYY-MM-DD-HH-MM-SS.snapshot`` part, so names
    that themselves contain ``-`` are preserved. Files that do not follow
    that pattern fall back to the text before the first ``-``.

    Args:
        backup_path: Directory containing the snapshot files.
        target_client: Client connected to the instance to restore into.

    Note:
        Each file is passed as a ``file://`` URI built from its absolute
        path. NOTE(review): that path must be readable by the Qdrant server
        process, not only by this script — confirm for remote deployments.
    """
    import os
    import re
    from pathlib import Path

    # <collection>[-<numeric id>]-<YYYY-MM-DD-HH-MM-SS>.snapshot
    # NOTE(review): a collection whose own name ends in "-<digits>" is
    # ambiguous with the optional numeric segment and loses that suffix.
    name_pattern = re.compile(
        r"^(?P<collection>.+?)(?:-\d+)?-\d{4}(?:-\d{2}){5}\.snapshot$"
    )
    suffix = '.snapshot'

    # Sorted so the restore order is deterministic (listdir order is not).
    snapshot_files = sorted(
        f for f in os.listdir(backup_path) if f.endswith(suffix)
    )

    for snapshot_file in snapshot_files:
        match = name_pattern.match(snapshot_file)
        if match:
            collection_name = match.group("collection")
        else:
            collection_name = snapshot_file[:-len(suffix)].split('-')[0]

        snapshot_path = os.path.abspath(os.path.join(backup_path, snapshot_file))

        target_client.recover_snapshot(
            collection_name=collection_name,
            location=Path(snapshot_path).as_uri()
        )

        print(f"恢复 Collection: {collection_name}")

disaster_recovery("/backups", target)

验证恢复 #

python
def verify_recovery(source_client, target_client, collection_name):
    """Spot-check that a restored collection matches its source.

    Compares the reported point counts, then scrolls the first 10 points
    (with vectors) from each side and checks that they are the same points
    with the same vectors. Each failed check prints a warning.

    Args:
        source_client: Client connected to the original instance.
        target_client: Client connected to the restored instance.
        collection_name: Collection to compare on both sides.

    Returns:
        ``True`` if all checks pass, ``False`` on the first mismatch.
    """
    source_info = source_client.get_collection(collection_name)
    target_info = target_client.get_collection(collection_name)

    if source_info.points_count != target_info.points_count:
        print(f"警告: 点数不匹配 (源: {source_info.points_count}, 目标: {target_info.points_count})")
        return False

    source_sample = source_client.scroll(
        collection_name=collection_name,
        limit=10,
        with_vectors=True
    )[0]

    target_sample = target_client.scroll(
        collection_name=collection_name,
        limit=10,
        with_vectors=True
    )[0]

    # zip() would silently ignore extra points on either side, so compare
    # the sample sizes explicitly before pairing.
    if len(source_sample) != len(target_sample):
        print(f"警告: 点数不匹配 (源: {len(source_sample)}, 目标: {len(target_sample)})")
        return False

    for s_point, t_point in zip(source_sample, target_sample):
        # Comparing vectors is only meaningful when both entries are the
        # same point; otherwise two different points that happen to share a
        # vector would pass unnoticed.
        if s_point.id != t_point.id:
            print(f"警告: 点 ID 不匹配 (源: {s_point.id}, 目标: {t_point.id})")
            return False
        if s_point.vector != t_point.vector:
            print(f"警告: 向量不匹配 (ID: {s_point.id})")
            return False

    print("恢复验证通过")
    return True

verify_recovery(source, target, "backup_demo")

监控和告警 #

备份状态监控 #

python
def monitor_backups(collection_name, max_age_hours=24):
    """Report whether a collection has a sufficiently recent snapshot.

    Looks up the collection's snapshots with the module-level ``client`` and
    checks the age of the newest one.

    Args:
        collection_name: Collection whose snapshots are inspected.
        max_age_hours: Maximum acceptable age of the newest snapshot.

    Returns:
        A dict with ``status`` (``"ok"`` or ``"warning"``) and a
        human-readable ``message``.
    """
    from datetime import datetime, timezone

    snapshots = client.list_snapshots(collection_name)

    if not snapshots:
        return {"status": "warning", "message": "没有快照"}

    # creation_time is an ISO-8601 string (or None), so it sorts
    # chronologically as text; missing values sort first.
    latest = max(snapshots, key=lambda s: s.creation_time or "")

    if not latest.creation_time:
        return {"status": "warning", "message": f"快照创建时间未知: {latest.name}"}

    # The timestamp is text, not epoch seconds: parse it before doing
    # arithmetic. "Z" is rewritten because older fromisoformat rejects it.
    created = datetime.fromisoformat(latest.creation_time.replace("Z", "+00:00"))
    if created.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to be UTC — confirm
        # against the server's behaviour.
        created = created.replace(tzinfo=timezone.utc)

    age_hours = (datetime.now(timezone.utc) - created).total_seconds() / 3600

    if age_hours > max_age_hours:
        return {"status": "warning", "message": f"快照过期 ({age_hours:.1f} 小时)"}

    return {"status": "ok", "message": f"最新快照: {latest.name}"}

status = monitor_backups("backup_demo")
print(status)

存储空间监控 #

python
def monitor_storage(snapshots_path="./snapshots", warning_percent=80,
                    critical_percent=90):
    """Classify disk usage of the filesystem holding the snapshot directory.

    Args:
        snapshots_path: Directory whose filesystem is measured. Defaults to
            the snapshot directory used throughout this chapter.
        warning_percent: Usage above this percentage yields ``"warning"``.
        critical_percent: Usage above this percentage yields ``"critical"``.

    Returns:
        A dict with ``status`` (``"ok"``, ``"warning"`` or ``"critical"``)
        and ``usage`` (used space as a percentage of the total).

    Raises:
        OSError: If ``snapshots_path`` does not exist or cannot be queried
            (propagated from ``shutil.disk_usage``).
    """
    import shutil

    usage = shutil.disk_usage(snapshots_path)

    # Guard against filesystems that report a total size of zero.
    usage_percent = (usage.used / usage.total) * 100 if usage.total else 0.0

    if usage_percent > critical_percent:
        status = "critical"
    elif usage_percent > warning_percent:
        status = "warning"
    else:
        status = "ok"

    return {"status": status, "usage": usage_percent}

storage_status = monitor_storage()
print(f"存储使用率: {storage_status['usage']:.1f}%")

最佳实践 #

备份策略建议 #

text
备份策略建议:

开发环境:
├── 每日备份
├── 保留 3 天
└── 本地存储

测试环境:
├── 每日备份
├── 保留 7 天
└── 本地存储

生产环境:
├── 每小时增量备份
├── 每日全量备份
├── 保留 30 天
└── 异地存储(S3/OSS)

快照命名规范 #

python
def generate_snapshot_name(collection_name):
    """Build a snapshot file name from a collection name and the local time.

    Args:
        collection_name: Prefix of the generated name.

    Returns:
        ``<collection_name>_<YYYYMMDD>_<HHMMSS>.snapshot`` using the current
        local time.
    """
    from datetime import datetime

    now = datetime.now()
    # The format spec after ":" is handed to strftime.
    return f"{collection_name}_{now:%Y%m%d_%H%M%S}.snapshot"

name = generate_snapshot_name("backup_demo")
print(name)

自动化备份脚本 #

python
import schedule
import time

def scheduled_backup():
    """Back up every collection, reporting success or failure per collection.

    Failures are printed and do not stop the remaining collections from
    being backed up, so a single broken collection cannot abort the run.
    """
    for entry in client.get_collections().collections:
        try:
            backup_collection(entry.name)
            print(f"备份完成: {entry.name}")
        except Exception as error:
            print(f"备份失败 {entry.name}: {error}")

# Register scheduled_backup to run once a day at 02:00.
schedule.every().day.at("02:00").do(scheduled_backup)

# Run forever: once a minute, execute any job that has become due.
while True:
    schedule.run_pending()
    time.sleep(60)

小结 #

本章详细介绍了快照与备份:

  • 创建和管理快照
  • 快照恢复方法
  • 备份策略设计
  • 数据迁移
  • 灾难恢复

下一步 #

掌握备份策略后,继续学习 监控与运维,了解系统监控和运维管理!

最后更新:2026-04-04