性能优化 #

一、性能优化概述 #

text
性能优化方向:

┌─────────────────────────────────────────┐
│           优化维度                       │
├─────────────────────────────────────────┤
│                                         │
│  ┌─────────────────────────────────┐   │
│  │  索引优化                        │   │
│  │  - 索引类型选择                  │   │
│  │  - 参数调优                      │   │
│  └─────────────────────────────────┘   │
│                                         │
│  ┌─────────────────────────────────┐   │
│  │  查询优化                        │   │
│  │  - 搜索参数                      │   │
│  │  - 批量处理                      │   │
│  └─────────────────────────────────┘   │
│                                         │
│  ┌─────────────────────────────────┐   │
│  │  配置优化                        │   │
│  │  - 内存配置                      │   │
│  │  - 并发配置                      │   │
│  └─────────────────────────────────┘   │
│                                         │
│  ┌─────────────────────────────────┐   │
│  │  架构优化                        │   │
│  │  - 资源配置                      │   │
│  │  - 扩展策略                      │   │
│  └─────────────────────────────────┘   │
│                                         │
└─────────────────────────────────────────┘

二、索引优化 #

2.1 索引类型选择 #

text
索引选择指南:

数据量              推荐索引         说明
──────────────────────────────────────────
< 10万              FLAT            精度最高
10万-100万          IVF_FLAT        平衡性能
100万-1000万        HNSW/IVF_PQ     高性能
> 1000万            DISKANN         大规模

2.2 IVF索引参数调优 #

python
def tune_ivf_params(n_vectors, dim):
    nlist = int(4 * (n_vectors ** 0.5))
    
    nlist = min(nlist, 65536)
    
    return {
        "metric_type": "L2",
        "index_type": "IVF_FLAT",
        "params": {"nlist": nlist}
    }

index_params = tune_ivf_params(1000000, 768)
collection.create_index("embedding", index_params)

2.3 HNSW参数调优 #

python
def tune_hnsw_params(dim, precision_requirement="medium"):
    if precision_requirement == "high":
        M = min(64, dim // 2)
        efConstruction = 500
    elif precision_requirement == "medium":
        M = min(32, dim // 4)
        efConstruction = 200
    else:
        M = min(16, dim // 8)
        efConstruction = 100
    
    return {
        "metric_type": "L2",
        "index_type": "HNSW",
        "params": {
            "M": M,
            "efConstruction": efConstruction
        }
    }

index_params = tune_hnsw_params(768, "high")
collection.create_index("embedding", index_params)

2.4 搜索参数优化 #

python
def optimize_search_params(index_type, nlist=None):
    if index_type == "IVF_FLAT":
        return {
            "metric_type": "L2",
            "params": {"nprobe": max(1, nlist // 8)}
        }
    elif index_type == "HNSW":
        return {
            "metric_type": "L2",
            "params": {"ef": 64}
        }
    elif index_type == "IVF_PQ":
        return {
            "metric_type": "L2",
            "params": {"nprobe": max(1, nlist // 4)}
        }
    return {"metric_type": "L2"}

三、查询优化 #

3.1 批量搜索 #

python
import numpy as np

def batch_search(collection, query_vectors, batch_size=100):
    all_results = []
    
    for i in range(0, len(query_vectors), batch_size):
        batch = query_vectors[i:i+batch_size]
        results = collection.search(
            data=batch,
            anns_field="embedding",
            param={"metric_type": "L2", "params": {"nprobe": 16}},
            limit=10
        )
        all_results.extend(results)
    
    return all_results

query_vectors = np.random.rand(1000, 768).tolist()
results = batch_search(collection, query_vectors)

3.2 并行搜索 #

python
from concurrent.futures import ThreadPoolExecutor

def parallel_search(collection, query_vectors, workers=4):
    def search_batch(batch):
        return collection.search(
            data=batch,
            anns_field="embedding",
            param={"metric_type": "L2", "params": {"nprobe": 16}},
            limit=10
        )
    
    batch_size = len(query_vectors) // workers
    batches = [
        query_vectors[i:i+batch_size]
        for i in range(0, len(query_vectors), batch_size)
    ]
    
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(search_batch, batches))
    
    return results

3.3 过滤优化 #

python
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    expr='category == "electronics"'
)

results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    partition_names=["electronics_partition"]
)

3.4 输出字段优化 #

python
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    output_fields=["id", "title"]
)

results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10
)

四、配置优化 #

4.1 内存配置 #

yaml
queryNode:
  resources:
    requests:
      memory: "16Gi"
    limits:
      memory: "32Gi"
  config:
    cacheSize: 32

dataNode:
  resources:
    requests:
      memory: "8Gi"
    limits:
      memory: "16Gi"

4.2 并发配置 #

yaml
proxy:
  config:
    maxFieldLength: 65535
    maxShardNum: 256
    maxTaskNum: 1024

queryNode:
  config:
    scheduler:
      receiveChanSize: 1024
      unsolvedQueueSize: 1024

4.3 段配置 #

yaml
dataCoord:
  config:
    segment:
      maxSize: 512
      diskSegmentMaxSize: 2048
      sealProportion: 0.12

queryNode:
  config:
    segcore:
      chunkRows: 1024
      smallIndex:
        nlist: 128
        nprobe: 16

五、资源配置优化 #

5.1 CPU配置 #

yaml
queryNode:
  resources:
    requests:
      cpu: "4"
    limits:
      cpu: "8"

indexNode:
  resources:
    requests:
      cpu: "4"
    limits:
      cpu: "8"

5.2 副本配置 #

yaml
queryNode:
  replicas: 5

proxy:
  replicas: 3

dataNode:
  replicas: 3

5.3 存储配置 #

yaml
minio:
  persistence:
    storageClass: "fast-ssd"
    size: 500Gi

pulsar:
  bookkeeper:
    volumes:
      journal:
        storageClass: "fast-ssd"
        size: 100Gi

六、性能监控 #

6.1 关键指标 #

text
关键监控指标:

┌─────────────────────────────────────────┐
│           延迟指标                       │
├─────────────────────────────────────────┤
│  - 搜索延迟 (P50/P95/P99)               │
│  - 插入延迟                             │
│  - 查询延迟                             │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│           吞吐量指标                     │
├─────────────────────────────────────────┤
│  - QPS (每秒查询数)                     │
│  - 插入TPS                              │
│  - 网络吞吐                             │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│           资源指标                       │
├─────────────────────────────────────────┤
│  - CPU使用率                            │
│  - 内存使用率                           │
│  - 磁盘I/O                              │
│  - 网络I/O                              │
└─────────────────────────────────────────┘

6.2 性能测试 #

python
import time
import numpy as np

def benchmark_search(collection, num_queries=1000, dim=768):
    query_vectors = np.random.rand(num_queries, dim).tolist()
    
    search_params = {
        "metric_type": "L2",
        "params": {"nprobe": 16}
    }
    
    latencies = []
    
    for query in query_vectors:
        start = time.time()
        collection.search(
            data=[query],
            anns_field="embedding",
            param=search_params,
            limit=10
        )
        latencies.append(time.time() - start)
    
    latencies = np.array(latencies) * 1000
    
    return {
        "p50": np.percentile(latencies, 50),
        "p95": np.percentile(latencies, 95),
        "p99": np.percentile(latencies, 99),
        "avg": np.mean(latencies),
        "qps": num_queries / sum(latencies / 1000)
    }

results = benchmark_search(collection)
print(f"P50延迟: {results['p50']:.2f}ms")
print(f"P95延迟: {results['p95']:.2f}ms")
print(f"P99延迟: {results['p99']:.2f}ms")
print(f"QPS: {results['qps']:.2f}")

七、性能调优案例 #

7.1 高QPS场景 #

yaml
proxy:
  replicas: 5
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"

queryNode:
  replicas: 10
  resources:
    requests:
      cpu: "4"
      memory: "16Gi"
  config:
    cacheSize: 64

queryCoord:
  config:
    autoHandoff: true
    autoBalance: true

7.2 大数据量场景 #

yaml
dataNode:
  replicas: 5
  config:
    segment:
      maxSize: 1024

indexNode:
  replicas: 5
  resources:
    requests:
      cpu: "8"
      memory: "32Gi"

queryNode:
  config:
    segcore:
      smallIndex:
        enable: true

7.3 低延迟场景 #

yaml
queryNode:
  config:
    cacheSize: 128
    segcore:
      chunkRows: 4096

proxy:
  config:
    timeTickInterval: 100

八、最佳实践 #

8.1 性能优化检查清单 #

text
性能优化检查清单:

□ 选择合适的索引类型
□ 调优索引参数
□ 优化搜索参数
□ 使用批量操作
□ 配置足够内存
□ 合理配置副本数
□ 启用监控告警
□ 定期性能测试

8.2 常见问题排查 #

text
常见性能问题:

问题                原因                解决方案
──────────────────────────────────────────────
搜索延迟高          索引参数不当        调整nprobe/ef
内存不足            缓存配置小          增加cacheSize
QPS低               副本数不足          增加queryNode副本
插入慢              批量大小小          增大batch_size

九、总结 #

性能优化速查表:

优化方向 方法
索引优化 选择合适索引类型和参数
查询优化 批量搜索、并行搜索
配置优化 内存、并发、段配置
资源优化 CPU、内存、副本配置
监控优化 关键指标监控告警

恭喜你完成Milvus学习之旅!

最后更新:2026-04-04