监控与运维 #

本章介绍 Qdrant 的监控和运维管理。

监控概述 #

text
Qdrant 监控架构:

┌─────────────────────────────────────────────────────────────┐
│                      监控体系                                │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  数据采集              数据存储              可视化告警       │
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐ │
│  │ REST API    │      │ Prometheus  │      │ Grafana     │ │
│  │ Metrics     │  →   │ InfluxDB    │  →   │ Alert       │ │
│  │ Logs        │      │ Elasticsearch│      │ Dashboard   │ │
│  └─────────────┘      └─────────────┘      └─────────────┘ │
│                                                              │
└─────────────────────────────────────────────────────────────┘

健康检查 #

基础健康检查 #

python
from qdrant_client import QdrantClient

# Module-level client pointed at a local Qdrant REST endpoint (port 6333).
client = QdrantClient(url="http://localhost:6333")

def health_check():
    """Probe the Qdrant health endpoint and summarize the outcome.

    Returns:
        dict: ``{"status": ..., "message": ...}`` where ``status`` is
        ``"healthy"`` on HTTP 200, ``"unhealthy"`` on any other status code,
        and ``"error"`` when the request itself raised.
    """
    try:
        # Qdrant serves its probes at /healthz, /livez and /readyz; there is
        # no /health route, so the previous path could never answer 200.
        # NOTE(review): confirm that `client.http.client` exposes `.get()` in
        # the installed qdrant-client version.
        response = client.http.client.get("/healthz")

        if response.status_code == 200:
            return {"status": "healthy", "message": "服务正常"}
        return {"status": "unhealthy", "message": f"状态码: {response.status_code}"}

    except Exception as e:
        # Best-effort probe: failures are reported to the caller, not raised.
        return {"status": "error", "message": str(e)}

# Run the basic health check once and print the resulting status dict.
health = health_check()
print(health)

详细健康检查 #

python
def detailed_health_check():
    """Collect a per-component health report.

    The report covers the collection listing, the cluster status and every
    individual collection. Each probe is isolated: a failure is recorded as
    ``{"status": "error", "message": ...}`` under that probe's key and the
    remaining probes still run.

    Returns:
        dict: keys ``"collections"``, ``"cluster"`` and one
        ``"collection_<name>"`` entry per collection that could be listed.
    """
    checks = {}
    # Stays empty when the listing fails, so the per-collection loop below is
    # simply skipped instead of hitting an unbound variable.
    listed_collections = []

    try:
        listed_collections = client.get_collections().collections
        checks["collections"] = {
            "status": "ok",
            "count": len(listed_collections)
        }
    except Exception as e:
        checks["collections"] = {"status": "error", "message": str(e)}

    try:
        # NOTE(review): confirm `get_cluster_info()` and `number_of_peers`
        # exist in the installed qdrant-client version.
        cluster_info = client.get_cluster_info()
        checks["cluster"] = {
            "status": "ok" if cluster_info.status == "enabled" else "standalone",
            "peers": cluster_info.number_of_peers
        }
    except Exception as e:
        checks["cluster"] = {"status": "error", "message": str(e)}

    for col in listed_collections:
        key = f"collection_{col.name}"

        try:
            info = client.get_collection(col.name)
        except Exception as e:
            # One unreachable collection must not abort the whole report.
            checks[key] = {"status": "error", "message": str(e)}
            continue

        if info.status == "green":
            status = "ok"
        elif info.status == "yellow":
            status = "warning"
        else:
            status = "error"

        checks[key] = {
            "status": status,
            "points": info.points_count,
            "indexed": info.indexed_vectors_count
        }

    return checks

# Print every check name together with its result dict, one per line.
health_details = detailed_health_check()
for check, result in health_details.items():
    print(f"{check}: {result}")

就绪检查 #

python
def readiness_check():
    """Probe the Qdrant readiness endpoint.

    Returns:
        dict: ``{"ready": True}`` on HTTP 200; otherwise
        ``{"ready": False, "reason": ...}`` carrying either the unexpected
        status code or the text of the exception that was raised.
    """
    try:
        # The readiness probe is served at /readyz; there is no /ready route.
        # NOTE(review): confirm that `client.http.client` exposes `.get()` in
        # the installed qdrant-client version.
        response = client.http.client.get("/readyz")

        if response.status_code == 200:
            return {"ready": True}
        return {"ready": False, "reason": f"状态码: {response.status_code}"}

    except Exception as e:
        # Best-effort probe: failures are reported to the caller, not raised.
        return {"ready": False, "reason": str(e)}

# Run the readiness probe once and print its result dict.
ready = readiness_check()
print(f"服务就绪: {ready}")

监控指标 #

Prometheus 指标 #

Qdrant 内置 Prometheus 指标端点:

text
访问指标端点:
http://localhost:6333/metrics

关键指标 #

text
关键监控指标(以下名称仅为示意,实际指标名以 /metrics 端点的输出为准):

应用指标:
├── qdrant_collections_total
├── qdrant_points_total
├── qdrant_requests_total
├── qdrant_request_duration_seconds
└── qdrant_grpc_requests_total

存储指标:
├── qdrant_storage_size_bytes
├── qdrant_segments_total
└── qdrant_index_size_bytes

内存指标:
├── qdrant_memory_used_bytes
└── qdrant_memory_available_bytes

集群指标:
├── qdrant_cluster_peers_total
├── qdrant_cluster_shards_total
└── qdrant_cluster_status

Prometheus 配置 #

yaml
# Prometheus scrape job named "qdrant": pulls /metrics from host `qdrant`
# on port 6333 every 15 seconds.
scrape_configs:
  - job_name: 'qdrant'
    static_configs:
      - targets: ['qdrant:6333']
    metrics_path: '/metrics'
    scrape_interval: 15s

Grafana Dashboard #

json
{
  "dashboard": {
    "title": "Qdrant Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(qdrant_requests_total[5m])"
          }
        ]
      },
      {
        "title": "Request Latency",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, rate(qdrant_request_duration_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Points Count",
        "targets": [
          {
            "expr": "qdrant_points_total"
          }
        ]
      }
    ]
  }
}

Collection 监控 #

Collection 状态监控 #

python
def monitor_collection(collection_name):
    """Return a snapshot of one collection's status and size metrics.

    Args:
        collection_name: Name of the collection to inspect.

    Returns:
        dict: ``name``, ``status``, ``points_count``,
        ``indexed_vectors_count``, ``segments_count`` and ``optimizer_status``.
        When the collection has dense vector parameters, ``vector_size`` and
        ``distance`` are added: plain values for a single unnamed vector, or
        ``{vector_name: value}`` dicts for named vectors.
    """
    info = client.get_collection(collection_name)

    metrics = {
        "name": collection_name,
        "status": info.status,
        "points_count": info.points_count,
        "indexed_vectors_count": info.indexed_vectors_count,
        "segments_count": info.segments_count,
        "optimizer_status": str(info.optimizer_status)
    }

    config = getattr(info, "config", None)
    params = getattr(config, "params", None)
    vectors = getattr(params, "vectors", None)

    if isinstance(vectors, dict):
        # Named vectors: `vectors` maps each vector name to its own params,
        # so there is no single `.size` / `.distance` to read.
        metrics["vector_size"] = {name: vp.size for name, vp in vectors.items()}
        metrics["distance"] = {name: str(vp.distance) for name, vp in vectors.items()}
    elif vectors is not None:
        metrics["vector_size"] = vectors.size
        metrics["distance"] = str(vectors.distance)

    return metrics

# Print each metric of "my_collection" as a `key: value` line.
metrics = monitor_collection("my_collection")
for key, value in metrics.items():
    print(f"{key}: {value}")

索引进度监控 #

python
import time

def monitor_indexing_progress(collection_name, interval=5, timeout=None):
    """Poll a collection until its status turns green, printing progress.

    Args:
        collection_name: Name of the collection to watch.
        interval: Seconds to sleep between polls.
        timeout: Optional upper bound, in seconds, on the total wait.
            ``None`` (the default) waits indefinitely, as before.

    Returns:
        bool: ``True`` once the collection reports ``"green"``; ``False`` if
        ``timeout`` elapsed first.
    """
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        info = client.get_collection(collection_name)

        # Either count may be reported as None; treat that as zero so the
        # comparison and division below cannot raise.
        total = info.points_count or 0
        indexed = info.indexed_vectors_count or 0

        if total > 0:
            # NOTE(review): the ratio compares indexed *vectors* to *points*;
            # confirm it cannot exceed 100% for multi-vector collections.
            progress = (indexed / total) * 100
            print(f"索引进度: {progress:.1f}% ({indexed}/{total})")

        if info.status == "green":
            print("索引完成")
            return True

        if deadline is not None and time.monotonic() >= deadline:
            # Without this bound a collection stuck in a non-green state
            # would keep the loop spinning forever.
            print("索引监控超时")
            return False

        time.sleep(interval)

# Blocks until "my_collection" reports a green status.
monitor_indexing_progress("my_collection")

段信息监控 #

python
def monitor_segments(collection_name):
    """Print the segment count of a collection and, if exposed, per-segment details.

    Args:
        collection_name: Name of the collection to inspect.

    Per-segment lines are printed only when the collection info object
    carries a ``segments`` attribute.
    """
    collection_info = client.get_collection(collection_name)

    print(f"段数量: {collection_info.segments_count}")

    # Guard clause: nothing more to report without a per-segment listing.
    if not hasattr(collection_info, 'segments'):
        return

    for seg in collection_info.segments:
        print(f"  段 {seg.segment_id}:")
        print(f"    向量数: {seg.points_count}")
        print(f"    状态: {seg.status}")

性能监控 #

查询延迟监控 #

python
import time
import statistics

def measure_query_latency(collection_name, query_vectors, iterations=100):
    """Measure per-query search latency and summarize it.

    Runs one top-10 search for each of the first ``iterations`` entries of
    ``query_vectors`` and times every call individually.

    Args:
        collection_name: Collection to search.
        query_vectors: Sequence of query vectors.
        iterations: Maximum number of vectors taken from ``query_vectors``.

    Returns:
        dict: ``mean_ms``, ``median_ms``, ``p95_ms``, ``p99_ms``, ``max_ms``
        and ``min_ms``, all in milliseconds.

    Raises:
        statistics.StatisticsError: If no query was executed (empty
            ``query_vectors`` or an ``iterations`` value that selects none).
    """
    latencies = []

    for query_vector in query_vectors[:iterations]:
        # perf_counter is monotonic and high-resolution; time.time() can jump
        # when the system clock is adjusted, which corrupts short timings.
        start = time.perf_counter()

        # NOTE(review): newer qdrant-client releases steer callers toward
        # `query_points`; confirm `search` is still available.
        client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            limit=10
        )

        latencies.append((time.perf_counter() - start) * 1000)

    if not latencies:
        raise statistics.StatisticsError(
            "no queries were executed; query_vectors is empty or iterations selects none"
        )

    # Sort once and reuse for percentiles and extremes.
    ordered = sorted(latencies)
    sample_count = len(ordered)

    return {
        "mean_ms": statistics.mean(ordered),
        "median_ms": statistics.median(ordered),
        "p95_ms": ordered[int(sample_count * 0.95)],
        "p99_ms": ordered[int(sample_count * 0.99)],
        "max_ms": ordered[-1],
        "min_ms": ordered[0]
    }

# Benchmark with 100 identical 384-dimensional vectors and report the P99 latency.
latency_stats = measure_query_latency("my_collection", [[0.1] * 384] * 100)
print(f"P99 延迟: {latency_stats['p99_ms']:.2f}ms")

吞吐量监控 #

python
from concurrent.futures import ThreadPoolExecutor
import time

def measure_throughput(collection_name, query_vectors, duration_sec=10, workers=8):
    """Measure search throughput with several concurrent worker threads.

    Each worker cycles through ``query_vectors`` issuing top-10 searches
    until ``duration_sec`` has elapsed.

    Args:
        collection_name: Collection to search.
        query_vectors: Sequence of query vectors each worker cycles through.
        duration_sec: Target length of the measurement window, in seconds.
        workers: Number of concurrent worker threads.

    Returns:
        dict: ``total_queries``, ``duration_sec`` (the requested window),
        ``elapsed_sec`` (the wall time actually spent) and ``qps``
        (``total_queries / elapsed_sec``).
    """
    if not query_vectors:
        # Nothing to send; return at once instead of busy-spinning every
        # worker for the whole window.
        return {
            "total_queries": 0,
            "duration_sec": duration_sec,
            "elapsed_sec": 0.0,
            "qps": 0.0
        }

    start_time = time.perf_counter()
    deadline = start_time + duration_sec

    def worker(_):
        # Count locally and return the tally: incrementing one shared counter
        # from several threads is a read-modify-write race that can lose
        # updates.
        completed = 0
        while time.perf_counter() < deadline:
            for qv in query_vectors:
                client.search(
                    collection_name=collection_name,
                    query_vector=qv,
                    limit=10
                )
                completed += 1
                if time.perf_counter() >= deadline:
                    break
        return completed

    with ThreadPoolExecutor(max_workers=workers) as executor:
        total_queries = sum(executor.map(worker, range(workers)))

    # Requests in flight at the deadline finish after it, so divide by the
    # time actually spent rather than the nominal window.
    elapsed = time.perf_counter() - start_time
    qps = total_queries / elapsed if elapsed > 0 else 0.0

    return {
        "total_queries": total_queries,
        "duration_sec": duration_sec,
        "elapsed_sec": elapsed,
        "qps": qps
    }

# Run the throughput benchmark with ten identical 384-dimensional vectors and print the QPS.
throughput = measure_throughput("my_collection", [[0.1] * 384] * 10)
print(f"QPS: {throughput['qps']:.0f}")

内存使用监控 #

python
import psutil
import os

def monitor_memory(pid=None):
    """Report the memory usage of a process via psutil.

    Args:
        pid: ID of the process to inspect. Defaults to the current Python
            process, which is the client script and not the Qdrant server;
            pass the server's PID to measure Qdrant when it runs on the same
            host.

    Returns:
        dict: ``rss_mb`` and ``vms_mb`` (mebibytes) plus ``percent`` as
        reported by ``Process.memory_percent()``.
    """
    process = psutil.Process(os.getpid() if pid is None else pid)

    memory_info = process.memory_info()
    bytes_per_mb = 1024 * 1024

    return {
        "rss_mb": memory_info.rss / bytes_per_mb,
        "vms_mb": memory_info.vms / bytes_per_mb,
        "percent": process.memory_percent()
    }

# Print resident memory (MB) and the memory percentage returned by monitor_memory().
memory_stats = monitor_memory()
print(f"内存使用: {memory_stats['rss_mb']:.1f} MB ({memory_stats['percent']:.1f}%)")

日志管理 #

日志级别配置 #

yaml
# Logging configuration sample: sets the level to INFO in two places.
# NOTE(review): the `format` value uses Python logging %-style placeholders;
# confirm which component actually reads this `logger` section.
log_level: INFO

logger:
  level: INFO
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

结构化日志 #

python
import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON objects.

    Every entry carries ``timestamp`` (ISO 8601, UTC, taken from the moment
    the record was created), ``level``, ``message``, ``module``, ``function``
    and ``line``. Fields passed through ``extra=`` are merged in, and an
    ``exception`` field with the formatted traceback is added when the record
    carries exception info.
    """

    # Attribute names present on every LogRecord; anything else found on a
    # record was supplied by the caller through ``extra=``.
    _STANDARD_ATTRS = frozenset(
        logging.LogRecord("", 0, "", 0, "", (), None).__dict__
    ) | {"message", "asctime"}

    def format(self, record):
        """Return ``record`` serialized as a JSON string."""
        # Local import: the surrounding snippet only imports `datetime`.
        from datetime import timezone

        log_entry = {
            # Use the record's own creation time, not "now" at format time,
            # and make the UTC offset explicit.
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        for key, value in record.__dict__.items():
            if key not in self._STANDARD_ATTRS:
                # setdefault: caller-supplied extras never overwrite core fields.
                log_entry.setdefault(key, value)

        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # ensure_ascii=False keeps non-ASCII messages readable; default=str is
        # a deliberate fallback so an unserializable extra cannot make the
        # log call itself fail.
        return json.dumps(log_entry, ensure_ascii=False, default=str)

# Route the "qdrant_monitor" logger through a stream handler that uses
# JSONFormatter, at INFO level.
logger = logging.getLogger("qdrant_monitor")
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Example record; `collection` and `latency_ms` are attached to it via `extra`.
logger.info("搜索请求完成", extra={"collection": "my_collection", "latency_ms": 5.2})

告警配置 #

告警规则 #

yaml
# Prometheus alerting rules for Qdrant.
# NOTE(review): verify every metric name used in `expr` against the actual
# output of the /metrics endpoint before deploying these rules.
groups:
  - name: qdrant_alerts
    rules:
      # Warn when the P99 request latency stays above 0.1 s for 5 minutes.
      - alert: QdrantHighLatency
        expr: histogram_quantile(0.99, rate(qdrant_request_duration_seconds_bucket[5m])) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Qdrant 查询延迟过高"
          description: "P99 延迟超过 100ms"
      
      # Critical when the used/available memory ratio stays above 0.9 for 5 minutes.
      - alert: QdrantHighMemory
        expr: qdrant_memory_used_bytes / qdrant_memory_available_bytes > 0.9
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Qdrant 内存使用过高"
          description: "内存使用率超过 90%"
      
      # Critical when the collection status metric equals 2 for 1 minute.
      # NOTE(review): presumably 2 encodes the "red" status — confirm the encoding.
      - alert: QdrantCollectionRed
        expr: qdrant_collection_status == 2
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Collection 状态异常"
          description: "Collection 处于红色状态"

告警通知 #

yaml
# Notification receiver "team-alerts": posts to a Slack channel and sends e-mail.
# NOTE(review): a standalone Alertmanager config keeps `receivers` at the top
# level next to `route`; confirm the `alertmanager:` wrapper matches the
# deployment tooling. The Slack `api_url` below is a placeholder.
alertmanager:
  receivers:
    - name: 'team-alerts'
      slack_configs:
        - api_url: 'https://hooks.slack.com/services/xxx'
          channel: '#alerts'
      email_configs:
        - to: 'team@example.com'
          from: 'alertmanager@example.com'
          smarthost: 'smtp.example.com:587'

运维操作 #

Collection 维护 #

python
def maintain_collection(collection_name):
    """Print a collection's status and nudge its optimizer when it is yellow.

    When the status is ``"yellow"`` the optimizer configuration is updated
    with ``deleted_threshold=0.1``. A warning is printed when the share of
    deleted points exceeds 10%.

    Args:
        collection_name: Name of the collection to maintain.
    """
    # Local import: the name was previously used without being imported
    # anywhere, so the yellow branch raised NameError.
    from qdrant_client.models import OptimizersConfigDiff

    info = client.get_collection(collection_name)

    print(f"Collection: {collection_name}")
    print(f"状态: {info.status}")
    print(f"点数: {info.points_count}")

    if info.status == "yellow":
        print("触发优化...")
        client.update_collection(
            collection_name=collection_name,
            # The keyword is `optimizers_config` (plural) in qdrant-client.
            optimizers_config=OptimizersConfigDiff(
                deleted_threshold=0.1
            )
        )

    # points_count may be reported as None; treat that as zero so max() works.
    points_count = info.points_count or 0
    # NOTE(review): the collection info may not expose `deleted_count`; when
    # it does not, the ratio is always 0 and this warning never fires.
    deleted_ratio = getattr(info, 'deleted_count', 0) / max(points_count, 1)
    if deleted_ratio > 0.1:
        print(f"删除比例过高: {deleted_ratio:.1%}")

# Run the maintenance routine once for "my_collection".
maintain_collection("my_collection")

清理操作 #

python
def cleanup_old_snapshots(collection_name, max_age_days=30):
    """Delete snapshots of a collection that are older than ``max_age_days``.

    Snapshots without a creation time are kept, because their age cannot be
    determined.

    Args:
        collection_name: Collection whose snapshots are pruned.
        max_age_days: Snapshots created more than this many days ago are
            deleted.
    """
    from datetime import datetime, timedelta, timezone

    cutoff_time = datetime.now(timezone.utc) - timedelta(days=max_age_days)

    # qdrant-client lists snapshots through `list_snapshots`.
    for snapshot in client.list_snapshots(collection_name):
        if not snapshot.creation_time:
            continue

        # `creation_time` is an ISO 8601 string, not an epoch number, so
        # `datetime.fromtimestamp` cannot parse it. Older Python versions do
        # not accept a trailing "Z", hence the replace.
        snapshot_time = datetime.fromisoformat(
            snapshot.creation_time.replace("Z", "+00:00")
        )
        if snapshot_time.tzinfo is None:
            # NOTE(review): assumes offset-less timestamps are UTC — confirm.
            snapshot_time = snapshot_time.replace(tzinfo=timezone.utc)

        if snapshot_time < cutoff_time:
            client.delete_snapshot(
                collection_name=collection_name,
                snapshot_name=snapshot.name
            )
            print(f"删除旧快照: {snapshot.name}")

# Prune snapshots of "my_collection" using the default 30-day retention.
cleanup_old_snapshots("my_collection")

容量规划 #

python
def capacity_planning(collection_name, growth_rate_per_day=10000):
    """Estimate current and projected vector memory for a collection.

    The estimate is ``points * vector_size * 4 bytes * 1.3``, where 1.3 is a
    flat allowance for HNSW index overhead. For collections with named
    vectors, the sizes of all vectors are summed per point.

    Args:
        collection_name: Collection to analyse.
        growth_rate_per_day: Assumed number of points added per day.

    Returns:
        dict: ``current``, ``projected_30d`` and ``projected_90d``, each a
        dict with ``points`` and ``memory_gb``.
    """
    info = client.get_collection(collection_name)

    # points_count may be reported as None; treat that as an empty collection.
    current_points = info.points_count or 0

    vectors = info.config.params.vectors
    if isinstance(vectors, dict):
        # Named vectors: every point stores one vector per name.
        vector_size = sum(params.size for params in vectors.values())
    else:
        vector_size = vectors.size

    bytes_per_point = vector_size * 4  # 4 bytes per float32 component
    hnsw_overhead = 1.3

    def estimate(points):
        # Single place for the formula shared by all three horizons.
        return {
            "points": points,
            "memory_gb": (points * bytes_per_point * hnsw_overhead) / (1024**3)
        }

    return {
        "current": estimate(current_points),
        "projected_30d": estimate(current_points + (growth_rate_per_day * 30)),
        "projected_90d": estimate(current_points + (growth_rate_per_day * 90))
    }

# Print the current estimate and the 30-day and 90-day projections in GB.
capacity = capacity_planning("my_collection")
print(f"当前内存: {capacity['current']['memory_gb']:.2f} GB")
print(f"30 天后: {capacity['projected_30d']['memory_gb']:.2f} GB")
print(f"90 天后: {capacity['projected_90d']['memory_gb']:.2f} GB")

最佳实践 #

监控清单 #

text
监控检查清单:

基础监控:
├── 服务健康状态
├── 端口可达性
├── 磁盘空间
└── 内存使用

性能监控:
├── 查询延迟(P50/P95/P99)
├── 吞吐量(QPS)
├── 索引进度
└── 优化器状态

业务监控:
├── Collection 状态
├── 点数变化
├── 快照状态
└── 集群状态

告警配置:
├── 服务不可用
├── 高延迟
├── 内存告警
└── 磁盘告警

运维脚本 #

python
def daily_maintenance():
    """Walk over every collection and print a short health summary.

    For each collection the metrics from ``monitor_collection`` are fetched
    once; a non-green status is flagged and the point and segment counts are
    printed. A failure on one collection is reported and the loop moves on to
    the next one.
    """
    collections = client.get_collections()

    for col in collections.collections:
        print(f"\n维护 Collection: {col.name}")

        try:
            # One fetch per collection: the metrics already include the
            # status, so a separate get_collection() call is redundant.
            metrics = monitor_collection(col.name)
        except Exception as e:
            # Keep going so one broken collection does not hide the rest.
            print(f"  检查失败: {e}")
            continue

        if metrics["status"] != "green":
            print(f"  状态异常: {metrics['status']}")

        print(f"  点数: {metrics['points_count']}, 段数: {metrics['segments_count']}")

    print("\n维护完成")

# Run the maintenance pass once over all collections.
daily_maintenance()

小结 #

本章详细介绍了监控与运维:

  • 健康检查方法
  • Prometheus 监控集成
  • 性能监控指标
  • 日志管理
  • 告警配置
  • 日常运维操作

下一步 #

掌握监控运维后,继续学习 语义搜索,开始实战应用!

最后更新:2026-04-04