混合搜索 #
一、混合搜索概述 #
1.1 什么是混合搜索 #
混合搜索是结合多种搜索方式的检索技术,提供更精准的搜索结果。
text
混合搜索类型:
┌─────────────────────────────────────────┐
│ 混合搜索模式 │
├─────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────┐ │
│ │ 向量 + 标量过滤 │ │
│ │ 相似度搜索 + 条件过滤 │ │
│ └─────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────┐ │
│ │ 多向量搜索 │ │
│ │ 多个向量字段的联合搜索 │ │
│ └─────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────┐ │
│ │ 稀疏 + 密集向量 │ │
│ │ 关键词搜索 + 语义搜索 │ │
│ └─────────────────────────────────┘ │
│ │
└─────────────────────────────────────────┘
1.2 混合搜索优势 #
text
混合搜索优势:
┌─────────────────────────────────────────┐
│ 精度提升 │
│ - 结合多种特征 │
│ - 减少无关结果 │
│ - 提高召回率 │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 灵活性 │
│ - 支持多种搜索组合 │
│ - 可调整权重 │
│ - 适应不同场景 │
└─────────────────────────────────────────┘
二、向量+标量过滤 #
2.1 基本过滤搜索 #
python
from pymilvus import Collection

# Load the collection into memory; searches require loaded segments.
collection = Collection("documents")
collection.load()

# ANN parameters: L2 distance, probe 10 IVF clusters per query.
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}

# Vector similarity search narrowed by a scalar filter expression.
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    expr='category == "electronics"',
    output_fields=["title", "category"],
)
2.2 复杂过滤条件 #
python
# Several scalar predicates combined with boolean operators in `expr`:
# only electronics priced strictly between 100 and 1000 are candidates.
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    expr='category == "electronics" and price > 100 and price < 1000',
    output_fields=["title", "category", "price"],
)
2.3 JSON字段过滤 #
python
# json_contains matches rows whose metadata["tags"] array contains "AI".
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    expr='json_contains(metadata["tags"], "AI")',
    output_fields=["title", "metadata"],
)
三、多向量搜索 #
3.1 多向量字段Schema #
python
from pymilvus import FieldSchema, CollectionSchema, DataType

# Schema with three independent vector fields (title / content / image),
# each with its own dimensionality.
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="title_embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
    FieldSchema(name="content_embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="image_embedding", dtype=DataType.FLOAT_VECTOR, dim=512),
]
schema = CollectionSchema(fields, "多向量文档集合")
3.2 多向量搜索 #
python
from pymilvus import AnnSearchRequest

# One ANN request per vector field; each fetches its own candidate pool.
title_req = AnnSearchRequest(
    data=[title_query_vector],
    anns_field="title_embedding",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=20,
)
content_req = AnnSearchRequest(
    data=[content_query_vector],
    anns_field="content_embedding",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=20,
)

# Fuse both candidate lists with Reciprocal Rank Fusion (k=60).
results = collection.hybrid_search(
    reqs=[title_req, content_req],
    rerank={"strategy": "rrf", "params": {"k": 60}},
    limit=10,
)
3.3 加权多向量搜索 #
python
# Weighted fusion: content matches weigh 0.7, title matches 0.3
# (weights map positionally onto `reqs`).
results = collection.hybrid_search(
    reqs=[title_req, content_req],
    rerank={"strategy": "weighted", "params": {"weights": [0.3, 0.7]}},
    limit=10,
)
四、重排序策略 #
4.1 倒数排名融合 (RRF) #
python
# Reciprocal Rank Fusion: each source contributes 1/(k + rank) per hit;
# k=60 damps the influence of top-ranked outliers.
results = collection.hybrid_search(
    reqs=[req1, req2],
    rerank={"strategy": "rrf", "params": {"k": 60}},
    limit=10,
)
RRF公式:
text
RRF_score(d) = Σ 1/(k + rank(d))
4.2 加权融合 #
python
# Three-way weighted fusion; weights map 1:1 onto reqs (here summing to 1.0).
results = collection.hybrid_search(
    reqs=[req1, req2, req3],
    rerank={
        "strategy": "weighted",
        "params": {"weights": [0.5, 0.3, 0.2]},
    },
    limit=10,
)
4.3 自定义重排序 #
python
def custom_rerank(results_list, weights):
    """Fuse several ranked result lists client-side, one weight per source.

    Each hit contributes ``weight * 1/(rank + 1)``; a hit returned by
    several sources accumulates score across them. Returns a list of
    ``(id, {"score": float, "entity": ...})`` pairs sorted by descending
    fused score.
    """
    fused = {}
    for hits, weight in zip(results_list, weights):
        # Only the first query's hits are fused (single-query searches).
        for rank, hit in enumerate(hits[0]):
            entry = fused.setdefault(hit.id, {"score": 0, "entity": hit.entity})
            entry["score"] += weight * (1.0 / (rank + 1))
    return sorted(fused.items(), key=lambda item: item[1]["score"], reverse=True)
# Run the two single-field searches, then fuse their rankings client-side.
title_results = collection.search(
    data=[title_vector],
    anns_field="title_embedding",
    param=search_params,
    limit=20,
)
content_results = collection.search(
    data=[content_vector],
    anns_field="content_embedding",
    param=search_params,
    limit=20,
)
# Content matches weighted higher (0.7) than title matches (0.3).
reranked = custom_rerank([title_results, content_results], [0.3, 0.7])
五、稀疏+密集向量 #
5.1 Schema设计 #
python
# Schema pairing a sparse vector field (keyword signal, no fixed dim)
# with a dense vector field (semantic signal, dim=768).
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="sparse_embedding", dtype=DataType.SPARSE_FLOAT_VECTOR),
    FieldSchema(name="dense_embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
]
schema = CollectionSchema(fields, "稀疏+密集向量集合")
5.2 创建索引 #
python
# Inverted index over the sparse field, scored by inner product;
# drop_ratio_build prunes the smallest 20% of values at build time.
sparse_index = {
    "index_type": "SPARSE_INVERTED_INDEX",
    "metric_type": "IP",
    "params": {"drop_ratio_build": 0.2},
}
collection.create_index("sparse_embedding", sparse_index)

# IVF index over the dense field with 128 clusters, scored by L2.
dense_index = {
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {"nlist": 128},
}
collection.create_index("dense_embedding", dense_index)
5.3 混合搜索 #
python
# Keyword-style request over the sparse field (inner product).
sparse_req = AnnSearchRequest(
    data=[sparse_vector],
    anns_field="sparse_embedding",
    param={"metric_type": "IP"},
    limit=20,
)
# Semantic request over the dense field (L2, nprobe=10).
dense_req = AnnSearchRequest(
    data=[dense_vector],
    anns_field="dense_embedding",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=20,
)
# RRF merges the keyword and semantic candidate lists into one top-10.
results = collection.hybrid_search(
    reqs=[sparse_req, dense_req],
    rerank={"strategy": "rrf", "params": {"k": 60}},
    limit=10,
)
六、混合搜索完整示例 #
6.1 RAG混合搜索 #
python
from pymilvus import Collection, AnnSearchRequest
import numpy as np
class HybridRAGSearch:
    """Hybrid retrieval helper for RAG pipelines over a Milvus collection."""

    def __init__(self, collection):
        self.collection = collection

    def search_with_keywords_and_semantic(
        self,
        query_text,
        keyword_vector,
        semantic_vector,
        top_k=10,
    ):
        """Run sparse (keyword) and dense (semantic) searches, fused by RRF.

        Returns a list of plain dicts with id, score, title and content.
        """
        # Over-fetch 2x candidates per channel so the fuser has room to rerank.
        keyword_req = AnnSearchRequest(
            data=[keyword_vector],
            anns_field="keyword_embedding",
            param={"metric_type": "IP"},
            limit=top_k * 2,
        )
        semantic_req = AnnSearchRequest(
            data=[semantic_vector],
            anns_field="semantic_embedding",
            param={"metric_type": "COSINE", "params": {"nprobe": 16}},
            limit=top_k * 2,
        )
        results = self.collection.hybrid_search(
            reqs=[keyword_req, semantic_req],
            rerank={"strategy": "rrf", "params": {"k": 60}},
            limit=top_k,
            output_fields=["title", "content"],
        )
        # Flatten the first (only) query's hits into plain dictionaries.
        return [
            {
                "id": hit.id,
                "score": hit.distance,
                "title": hit.entity.get("title"),
                "content": hit.entity.get("content"),
            }
            for hit in results[0]
        ]

    def search_with_filter(
        self,
        query_vector,
        category=None,
        date_range=None,
        top_k=10,
    ):
        """Dense search with optional category / created_at range filters."""
        clauses = []
        if category:
            # NOTE(review): value is interpolated into the filter expression;
            # make sure `category` is trusted or sanitized by the caller.
            clauses.append(f'category == "{category}"')
        if date_range:
            start, end = date_range
            clauses.append(f'created_at >= {start} and created_at <= {end}')
        # No clauses at all means an unfiltered search (expr=None).
        expr = " and ".join(clauses) if clauses else None
        return self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param={"metric_type": "COSINE", "params": {"nprobe": 16}},
            limit=top_k,
            expr=expr,
            output_fields=["title", "content", "category", "created_at"],
        )
# Example: fuse a sparse keyword vector with a dense semantic vector.
searcher = HybridRAGSearch(collection)
results = searcher.search_with_keywords_and_semantic(
    query_text="机器学习入门",
    keyword_vector=sparse_vector,
    semantic_vector=dense_vector,
    top_k=5,
)
6.2 多模态搜索 #
python
class MultiModalSearch:
    """Joint text + image retrieval over a multi-modal Milvus collection."""

    def __init__(self, collection):
        self.collection = collection

    def search_by_text_and_image(
        self,
        text_vector,
        image_vector,
        text_weight=0.5,
        image_weight=0.5,
        top_k=10,
    ):
        """Weighted fusion of one text-vector and one image-vector search."""
        # Fetch twice the final size per modality to give the fuser slack.
        text_req = AnnSearchRequest(
            data=[text_vector],
            anns_field="text_embedding",
            param={"metric_type": "COSINE", "params": {"nprobe": 16}},
            limit=top_k * 2,
        )
        image_req = AnnSearchRequest(
            data=[image_vector],
            anns_field="image_embedding",
            param={"metric_type": "COSINE", "params": {"nprobe": 16}},
            limit=top_k * 2,
        )
        return self.collection.hybrid_search(
            reqs=[text_req, image_req],
            rerank={
                "strategy": "weighted",
                "params": {"weights": [text_weight, image_weight]},
            },
            limit=top_k,
            output_fields=["title", "image_url", "description"],
        )
# Bias the fusion toward the image modality (0.6 vs 0.4 for text).
searcher = MultiModalSearch(collection)
results = searcher.search_by_text_and_image(
    text_vector=text_embedding,
    image_vector=image_embedding,
    text_weight=0.4,
    image_weight=0.6,
)
七、性能优化 #
7.1 搜索参数优化 #
python
def optimize_search_params(collection, query_vectors, index_type):
    """Empirically choose search parameters for the given index type.

    For ``IVF_FLAT``, times one trial search per candidate ``nprobe`` and
    returns the fastest as ``{"nprobe": best}``; returns ``{}`` for index
    types with no tuning implemented.

    NOTE(review): selecting purely by latency biases toward the smallest
    nprobe, which also has the lowest recall — for production tuning,
    prefer the largest nprobe within a latency budget, or measure recall.
    """
    # Local import: the original snippet called time.time() without ever
    # importing `time`, which raises NameError at runtime.
    import time

    if index_type != "IVF_FLAT":
        # Only IVF_FLAT tuning is implemented.
        return {}

    best_nprobe = 16
    best_latency = float("inf")
    for nprobe in (8, 16, 32, 64):
        start = time.perf_counter()  # monotonic clock, unlike time.time()
        collection.search(
            data=query_vectors,
            anns_field="embedding",
            param={"metric_type": "L2", "params": {"nprobe": nprobe}},
            limit=10,
        )
        elapsed = time.perf_counter() - start
        if elapsed < best_latency:
            best_latency = elapsed
            best_nprobe = nprobe
    return {"nprobe": best_nprobe}
7.2 批量混合搜索 #
python
def batch_hybrid_search(collection, query_pairs, batch_size=100):
    """Hybrid-search (title_vector, content_vector) pairs in batches.

    Splits `query_pairs` into chunks of `batch_size`, issues one
    hybrid_search per chunk, and returns the concatenated per-query
    results across all chunks.
    """
    collected = []
    for offset in range(0, len(query_pairs), batch_size):
        chunk = query_pairs[offset:offset + batch_size]
        # Split the pairs into one data list per vector field.
        title_vecs = [pair[0] for pair in chunk]
        content_vecs = [pair[1] for pair in chunk]
        title_request = AnnSearchRequest(
            data=title_vecs,
            anns_field="title_embedding",
            param={"metric_type": "L2", "params": {"nprobe": 10}},
            limit=10,
        )
        content_request = AnnSearchRequest(
            data=content_vecs,
            anns_field="content_embedding",
            param={"metric_type": "L2", "params": {"nprobe": 10}},
            limit=10,
        )
        fused = collection.hybrid_search(
            reqs=[title_request, content_request],
            rerank={"strategy": "rrf", "params": {"k": 60}},
            limit=10,
        )
        collected.extend(fused)
    return collected
八、总结 #
混合搜索速查表:
| 搜索类型 | 方法 |
|---|---|
| 向量+过滤 | search() + expr参数 |
| 多向量搜索 | hybrid_search() |
| RRF重排序 | strategy: "rrf" |
| 加权融合 | strategy: "weighted" |
| 稀疏+密集 | SPARSE_FLOAT_VECTOR + FLOAT_VECTOR |
下一步,让我们学习时间旅行!
最后更新:2026-04-04