监控与运维 #
本章介绍 Qdrant 的监控和运维管理。
监控概述 #
text
Qdrant 监控架构:
┌─────────────────────────────────────────────────────────────┐
│ 监控体系 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 数据采集 数据存储 可视化告警 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ REST API │ │ Prometheus │ │ Grafana │ │
│ │ Metrics │ → │ InfluxDB │ → │ Alert │ │
│ │ Logs │ │ Elasticsearch│ │ Dashboard │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
健康检查 #
基础健康检查 #
python
from qdrant_client import QdrantClient
client = QdrantClient(url="http://localhost:6333")
def health_check():
    """Probe the Qdrant /health endpoint and summarize the outcome.

    Returns a dict with a "status" of healthy / unhealthy / error and a
    human-readable "message".
    """
    try:
        response = client.http.client.get("/health")
    except Exception as e:
        # Network failure, DNS error, timeout, etc.
        return {"status": "error", "message": str(e)}
    if response.status_code == 200:
        return {"status": "healthy", "message": "服务正常"}
    return {"status": "unhealthy", "message": f"状态码: {response.status_code}"}

health = health_check()
print(health)
详细健康检查 #
python
def detailed_health_check():
    """Run a multi-part health check: collection listing, cluster info,
    and per-collection status.

    Returns a dict mapping check name -> result dict. Each result has a
    "status" key ("ok" / "warning" / "standalone" / "error") plus
    check-specific details.

    Bug fixed: in the original, ``collections`` was referenced in the
    per-collection loop even when ``get_collections()`` had raised,
    producing a NameError that masked the real failure. The loop now
    runs only when the listing succeeded.
    """
    checks = {}
    collections = None  # stays None if listing the collections fails

    try:
        collections = client.get_collections()
        checks["collections"] = {
            "status": "ok",
            "count": len(collections.collections)
        }
    except Exception as e:
        checks["collections"] = {"status": "error", "message": str(e)}

    try:
        cluster_info = client.get_cluster_info()
        checks["cluster"] = {
            "status": "ok" if cluster_info.status == "enabled" else "standalone",
            "peers": cluster_info.number_of_peers
        }
    except Exception as e:
        checks["cluster"] = {"status": "error", "message": str(e)}

    if collections is not None:
        for col in collections.collections:
            info = client.get_collection(col.name)
            # Qdrant collection status: green = healthy, yellow = optimizing,
            # anything else (red/grey) treated as an error.
            if info.status == "green":
                status = "ok"
            elif info.status == "yellow":
                status = "warning"
            else:
                status = "error"
            checks[f"collection_{col.name}"] = {
                "status": status,
                "points": info.points_count,
                "indexed": info.indexed_vectors_count
            }
    return checks

health_details = detailed_health_check()
for check, result in health_details.items():
    print(f"{check}: {result}")
就绪检查 #
python
def readiness_check():
    """Ask the Qdrant /ready endpoint whether the node can serve traffic.

    Returns {"ready": True} on HTTP 200, otherwise {"ready": False}
    with a "reason".
    """
    try:
        response = client.http.client.get("/ready")
    except Exception as e:
        return {"ready": False, "reason": str(e)}
    if response.status_code != 200:
        return {"ready": False, "reason": f"状态码: {response.status_code}"}
    return {"ready": True}

ready = readiness_check()
print(f"服务就绪: {ready}")
监控指标 #
Prometheus 指标 #
Qdrant 内置 Prometheus 指标端点:
text
访问指标端点:
http://localhost:6333/metrics
关键指标 #
text
关键监控指标:
应用指标:
├── qdrant_collections_total
├── qdrant_points_total
├── qdrant_requests_total
├── qdrant_request_duration_seconds
└── qdrant_grpc_requests_total
存储指标:
├── qdrant_storage_size_bytes
├── qdrant_segments_total
└── qdrant_index_size_bytes
内存指标:
├── qdrant_memory_used_bytes
└── qdrant_memory_available_bytes
集群指标:
├── qdrant_cluster_peers_total
├── qdrant_cluster_shards_total
└── qdrant_cluster_status
Prometheus 配置 #
yaml
scrape_configs:
- job_name: 'qdrant'
static_configs:
- targets: ['qdrant:6333']
metrics_path: '/metrics'
scrape_interval: 15s
Grafana Dashboard #
json
{
"dashboard": {
"title": "Qdrant Monitoring",
"panels": [
{
"title": "Request Rate",
"targets": [
{
"expr": "rate(qdrant_requests_total[5m])"
}
]
},
{
"title": "Request Latency",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(qdrant_request_duration_seconds_bucket[5m]))"
}
]
},
{
"title": "Points Count",
"targets": [
{
"expr": "qdrant_points_total"
}
]
}
]
}
}
Collection 监控 #
Collection 状态监控 #
python
def monitor_collection(collection_name):
    """Collect a point-in-time metrics snapshot for one collection.

    Returns a flat dict of status / size counters, plus vector config
    details when the client exposes them.
    """
    info = client.get_collection(collection_name)
    metrics = {
        "name": collection_name,
        "status": info.status,
        "points_count": info.points_count,
        "indexed_vectors_count": info.indexed_vectors_count,
        "segments_count": info.segments_count,
        "optimizer_status": str(info.optimizer_status),
    }
    # Vector parameters are only present on some client/response versions.
    if hasattr(info, 'config'):
        vectors_cfg = info.config.params.vectors
        metrics["vector_size"] = vectors_cfg.size
        metrics["distance"] = str(vectors_cfg.distance)
    return metrics

metrics = monitor_collection("my_collection")
for key, value in metrics.items():
    print(f"{key}: {value}")
索引进度监控 #
python
import time
def monitor_indexing_progress(collection_name, interval=5):
while True:
info = client.get_collection(collection_name)
total = info.points_count
indexed = info.indexed_vectors_count
if total > 0:
progress = (indexed / total) * 100
print(f"索引进度: {progress:.1f}% ({indexed}/{total})")
if info.status == "green":
print("索引完成")
break
time.sleep(interval)
monitor_indexing_progress("my_collection")
段信息监控 #
python
def monitor_segments(collection_name):
    """Print the segment count and, when available, per-segment stats."""
    info = client.get_collection(collection_name)
    print(f"段数量: {info.segments_count}")
    # Per-segment details are only exposed by some client versions.
    segments = info.segments if hasattr(info, 'segments') else []
    for seg in segments:
        print(f" 段 {seg.segment_id}:")
        print(f" 向量数: {seg.points_count}")
        print(f" 状态: {seg.status}")
性能监控 #
查询延迟监控 #
python
import time
import statistics
def measure_query_latency(collection_name, query_vectors, iterations=100):
latencies = []
for query_vector in query_vectors[:iterations]:
start = time.time()
client.search(
collection_name=collection_name,
query_vector=query_vector,
limit=10
)
latency_ms = (time.time() - start) * 1000
latencies.append(latency_ms)
return {
"mean_ms": statistics.mean(latencies),
"median_ms": statistics.median(latencies),
"p95_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"p99_ms": sorted(latencies)[int(len(latencies) * 0.99)],
"max_ms": max(latencies),
"min_ms": min(latencies)
}
latency_stats = measure_query_latency("my_collection", [[0.1] * 384] * 100)
print(f"P99 延迟: {latency_stats['p99_ms']:.2f}ms")
吞吐量监控 #
python
from concurrent.futures import ThreadPoolExecutor
import time
def measure_throughput(collection_name, query_vectors, duration_sec=10, workers=8):
start_time = time.time()
query_count = [0]
def worker():
while time.time() - start_time < duration_sec:
for qv in query_vectors:
client.search(
collection_name=collection_name,
query_vector=qv,
limit=10
)
query_count[0] += 1
if time.time() - start_time >= duration_sec:
break
with ThreadPoolExecutor(max_workers=workers) as executor:
list(executor.map(lambda _: worker(), range(workers)))
qps = query_count[0] / duration_sec
return {
"total_queries": query_count[0],
"duration_sec": duration_sec,
"qps": qps
}
throughput = measure_throughput("my_collection", [[0.1] * 384] * 10)
print(f"QPS: {throughput['qps']:.0f}")
内存使用监控 #
python
import psutil
import os
def monitor_memory():
    """Report memory usage of the current process.

    NOTE(review): psutil.Process(os.getpid()) inspects *this* monitoring
    script's process, not the Qdrant server — use the server's own
    metrics endpoint for server-side memory.
    """
    proc = psutil.Process(os.getpid())
    mem = proc.memory_info()
    one_mb = 1024 * 1024
    return {
        "rss_mb": mem.rss / one_mb,
        "vms_mb": mem.vms / one_mb,
        "percent": proc.memory_percent(),
    }

memory_stats = monitor_memory()
print(f"内存使用: {memory_stats['rss_mb']:.1f} MB ({memory_stats['percent']:.1f}%)")
日志管理 #
日志级别配置 #
yaml
log_level: INFO
logger:
level: INFO
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
结构化日志 #
python
import json
import logging
from datetime import datetime, timezone
class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON documents.

    Fixes over the original:
    - datetime.utcnow() (deprecated since Python 3.12, naive) replaced
      with timezone-aware datetime.now(timezone.utc).
    - fields passed via ``logger.info(..., extra={...})`` were silently
      dropped; they are now merged into the JSON output, which is what
      the usage example below expects.
    - ensure_ascii=False keeps non-ASCII log text readable; default=str
      keeps arbitrary extra values serializable.
    """

    # Attributes present on every LogRecord; anything not listed here
    # must have arrived via the ``extra=`` mechanism.
    _RESERVED = frozenset(vars(logging.makeLogRecord({}))) | {"message", "asctime"}

    def format(self, record):
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        for key, value in record.__dict__.items():
            if key not in self._RESERVED:
                entry[key] = value
        return json.dumps(entry, ensure_ascii=False, default=str)

logger = logging.getLogger("qdrant_monitor")
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("搜索请求完成", extra={"collection": "my_collection", "latency_ms": 5.2})
告警配置 #
告警规则 #
yaml
groups:
- name: qdrant_alerts
rules:
- alert: QdrantHighLatency
expr: histogram_quantile(0.99, rate(qdrant_request_duration_seconds_bucket[5m])) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "Qdrant 查询延迟过高"
description: "P99 延迟超过 100ms"
- alert: QdrantHighMemory
expr: qdrant_memory_used_bytes / qdrant_memory_available_bytes > 0.9
for: 5m
labels:
severity: critical
annotations:
summary: "Qdrant 内存使用过高"
description: "内存使用率超过 90%"
- alert: QdrantCollectionRed
expr: qdrant_collection_status == 2
for: 1m
labels:
severity: critical
annotations:
summary: "Collection 状态异常"
description: "Collection 处于红色状态"
告警通知 #
yaml
alertmanager:
receivers:
- name: 'team-alerts'
slack_configs:
- api_url: 'https://hooks.slack.com/services/xxx'
channel: '#alerts'
email_configs:
- to: 'team@example.com'
from: 'alertmanager@example.com'
smarthost: 'smtp.example.com:587'
运维操作 #
Collection 维护 #
python
def maintain_collection(collection_name):
    """Inspect one collection and trigger optimizer tuning when degraded.

    Prints basic status, lowers the optimizer's deleted_threshold when
    the collection is "yellow", and warns when the ratio of deleted
    points exceeds 10%.

    Bug fixed: the original referenced ``OptimizersConfigDiff`` without
    importing it anywhere, so the yellow-status branch raised NameError.
    """
    # Local import keeps the snippet self-contained; qdrant_client is
    # already a dependency of this script.
    from qdrant_client.models import OptimizersConfigDiff

    info = client.get_collection(collection_name)
    print(f"Collection: {collection_name}")
    print(f"状态: {info.status}")
    print(f"点数: {info.points_count}")
    if info.status == "yellow":
        print("触发优化...")
        client.update_collection(
            collection_name=collection_name,
            optimizer_config=OptimizersConfigDiff(
                deleted_threshold=0.1
            )
        )
    # max(..., 1) guards against division by zero on an empty collection.
    deleted_ratio = getattr(info, 'deleted_count', 0) / max(info.points_count, 1)
    if deleted_ratio > 0.1:
        print(f"删除比例过高: {deleted_ratio:.1%}")

maintain_collection("my_collection")
清理操作 #
python
def cleanup_old_snapshots(collection_name, max_age_days=30):
    """Delete snapshots of a collection older than ``max_age_days`` days.

    Bug fixed: the Qdrant API reports ``creation_time`` as an ISO-8601
    string, for which ``datetime.fromtimestamp`` raises TypeError. Both
    string and numeric-epoch forms are now accepted — TODO confirm the
    exact format against the installed qdrant-client version.
    """
    from datetime import datetime, timedelta
    snapshots = client.get_snapshots(collection_name)
    cutoff_time = datetime.now() - timedelta(days=max_age_days)
    for snapshot in snapshots:
        created = snapshot.creation_time
        if isinstance(created, str):
            snapshot_time = datetime.fromisoformat(created)
        else:
            snapshot_time = datetime.fromtimestamp(created)
        if snapshot_time < cutoff_time:
            client.delete_snapshot(
                collection_name=collection_name,
                snapshot_name=snapshot.name
            )
            print(f"删除旧快照: {snapshot.name}")

cleanup_old_snapshots("my_collection")
容量规划 #
python
def capacity_planning(collection_name, growth_rate_per_day=10000):
    """Estimate current and projected memory needs for a collection.

    Args:
        collection_name: collection to size.
        growth_rate_per_day: expected new points per day.

    Returns:
        Dict with "current", "projected_30d" and "projected_90d"
        entries, each holding a point count and an estimated memory
        footprint in GB.

    Improvement: the memory formula was copy-pasted three times in the
    original; it is now a single local helper, so the estimate stays
    consistent if the formula changes.
    """
    info = client.get_collection(collection_name)
    current_points = info.points_count
    vector_size = info.config.params.vectors.size
    bytes_per_point = vector_size * 4  # float32 components
    hnsw_overhead = 1.3  # rough multiplier for HNSW graph links

    def estimate_memory_gb(points):
        # Raw vector bytes times index overhead, expressed in GiB.
        return (points * bytes_per_point * hnsw_overhead) / (1024 ** 3)

    def projection(days):
        points = current_points + (growth_rate_per_day * days)
        return {"points": points, "memory_gb": estimate_memory_gb(points)}

    return {
        "current": {
            "points": current_points,
            "memory_gb": estimate_memory_gb(current_points)
        },
        "projected_30d": projection(30),
        "projected_90d": projection(90)
    }

capacity = capacity_planning("my_collection")
print(f"当前内存: {capacity['current']['memory_gb']:.2f} GB")
print(f"30 天后: {capacity['projected_30d']['memory_gb']:.2f} GB")
print(f"90 天后: {capacity['projected_90d']['memory_gb']:.2f} GB")
最佳实践 #
监控清单 #
text
监控检查清单:
基础监控:
├── 服务健康状态
├── 端口可达性
├── 磁盘空间
└── 内存使用
性能监控:
├── 查询延迟(P50/P95/P99)
├── 吞吐量(QPS)
├── 索引进度
└── 优化器状态
业务监控:
├── Collection 状态
├── 点数变化
├── 快照状态
└── 集群状态
告警配置:
├── 服务不可用
├── 高延迟
├── 内存告警
└── 磁盘告警
运维脚本 #
python
def daily_maintenance():
    """Sweep every collection once: report its status and refresh metrics."""
    for col in client.get_collections().collections:
        print(f"\n维护 Collection: {col.name}")
        info = client.get_collection(col.name)
        if info.status != "green":
            print(f" 状态异常: {info.status}")
        monitor_collection(col.name)
    print("\n维护完成")

daily_maintenance()
小结 #
本章详细介绍了监控与运维:
- 健康检查方法
- Prometheus 监控集成
- 性能监控指标
- 日志管理
- 告警配置
- 日常运维操作
下一步 #
掌握监控运维后,继续学习 语义搜索,开始实战应用!
最后更新:2026-04-04