标量查询 #
一、查询概述 #
1.1 查询与搜索的区别 #
text
Query vs Search:
┌─────────────────────────────────────────┐
│ Query (标量查询) │
├─────────────────────────────────────────┤
│ - 基于标量字段过滤 │
│ - 精确匹配 │
│ - 类似SQL WHERE子句 │
│ - 不使用向量索引 │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ Search (向量搜索) │
├─────────────────────────────────────────┤
│ - 基于向量相似度 │
│ - 近似匹配 │
│ - 使用向量索引 │
│ - 返回最相似的向量 │
└─────────────────────────────────────────┘
1.2 查询流程 #
text
查询流程:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 构建表达式 │────▶│ 执行查询 │────▶│ 获取结果 │
└──────────┘ └──────────┘ └──────────┘
二、基本查询 #
2.1 简单查询 #
python
from pymilvus import Collection

# Open the "documents" collection and load it before querying.
collection = Collection("documents")
collection.load()

# Select the entities whose `id` is 1, 2 or 3 and return three of their fields.
results = collection.query(
    expr='id in [1, 2, 3]',
    output_fields=["id", "title", "content"],
)

# Print each returned row.
for result in results:
    print(result)
2.2 查询所有字段 #
python
# "*" requests every field of each matching entity.
every_field = ["*"]
results = collection.query(expr='id > 0', output_fields=every_field)
2.3 查询参数 #
| 参数 | 类型 | 说明 |
|---|---|---|
| expr | str | 过滤表达式 |
| output_fields | list | 返回字段列表 |
| partition_names | list | 分区列表 |
| timeout | float | 超时时间 |
| offset | int | 偏移量 |
| limit | int | 返回数量 |
三、表达式语法 #
3.1 比较运算符 #
python
# One query per comparison operator, all filtering on the `id` field.
comparison_exprs = (
    'id == 1',     # equal to
    'id != 1',     # not equal to
    'id > 10',     # greater than
    'id >= 10',    # greater than or equal to
    'id < 100',    # less than
    'id <= 100',   # less than or equal to
)
for comparison_expr in comparison_exprs:
    results = collection.query(expr=comparison_expr)
3.2 逻辑运算符 #
python
# `and`: both conditions must hold.
both_bounds = 'id > 10 and id < 100'
results = collection.query(expr=both_bounds)

# `or`: either condition may hold.
either_category = 'category == "electronics" or category == "books"'
results = collection.query(expr=either_category)

# `not`: negates the parenthesised condition.
negated = 'not (id == 50)'
results = collection.query(expr=negated)
3.3 集合运算 #
python
# Membership tests against a literal list.
membership_exprs = (
    'id in [1, 2, 3, 4, 5]',                                 # numeric values
    'category in ["electronics", "books", "clothing"]',      # string values
    'id not in [1, 2, 3]',                                   # negated membership
)
for membership_expr in membership_exprs:
    results = collection.query(expr=membership_expr)
3.4 字符串匹配 #
python
# `like` patterns: `%` on both sides, trailing only, and leading only.
like_patterns = (
    'title like "%机器学习%"',
    'title like "Python%"',
    'title like "%入门"',
)
for like_pattern in like_patterns:
    results = collection.query(expr=like_pattern)
3.5 空值判断 #
python
# Both filters compare `author` against the empty string literal "".
author_present = 'author != ""'
results = collection.query(expr=author_present)

author_blank = 'author == ""'
results = collection.query(expr=author_blank)
四、复杂查询 #
4.1 组合条件 #
python
# Three conditions joined with `and`; every one of them must hold.
conditions = ['category == "electronics"', 'price > 100', 'price < 1000']
results = collection.query(
    expr=' and '.join(conditions),
    output_fields=["id", "title", "price"],
)
4.2 嵌套条件 #
python
# The parenthesised `or` group is evaluated as a unit before the `and`.
category_group = '(category == "electronics" or category == "books")'
results = collection.query(
    expr=f'{category_group} and price < 500',
    output_fields=["id", "title", "category", "price"],
)
4.3 数值范围 #
python
# Inclusive price range: both bounds use >= / <=.
lowest_price, highest_price = 100, 500
results = collection.query(
    expr=f'price >= {lowest_price} and price <= {highest_price}',
    output_fields=["id", "title", "price"],
)
4.4 时间范围 #
python
import time

SECONDS_PER_DAY = 86400

# Window covering the last 30 days, expressed as integer Unix timestamps.
start_time = int(time.time()) - SECONDS_PER_DAY * 30
end_time = int(time.time())

time_window = f'created_at >= {start_time} and created_at <= {end_time}'
results = collection.query(
    expr=time_window,
    output_fields=["id", "title", "created_at"],
)
五、JSON字段查询 #
5.1 JSON字段访问 #
python
# Keys inside the `metadata` JSON field are addressed as metadata["key"].
json_filters = (
    'metadata["author"] == "张三"',   # compared against a string
    'metadata["views"] > 1000',       # compared against a number
)
for json_filter in json_filters:
    results = collection.query(expr=json_filter, output_fields=["id", "metadata"])
5.2 JSON包含查询 #
python
# json_contains takes a single value; the _all / _any variants take a list.
json_contains_filters = (
    'json_contains(metadata["tags"], "AI")',
    'json_contains_all(metadata["tags"], ["AI", "Python"])',
    'json_contains_any(metadata["tags"], ["AI", "ML"])',
)
for json_contains_filter in json_contains_filters:
    results = collection.query(
        expr=json_contains_filter,
        output_fields=["id", "metadata"],
    )
5.3 JSON数组长度 #
python
# Milvus filter expressions provide `array_length` for this; `json_array_length`
# is not a known filter function.
# NOTE(review): confirm that array_length accepts a JSON path on the target Milvus version.
results = collection.query(
    expr='array_length(metadata["tags"]) > 2',
    output_fields=["id", "metadata"],
)
六、数组字段查询 #
6.1 数组包含 #
python
# array_contains takes a single value; the _all / _any variants take a list.
array_filters = (
    'array_contains(tags, "AI")',
    'array_contains_all(tags, ["AI", "Python"])',
    'array_contains_any(tags, ["AI", "ML", "DL"])',
)
for array_filter in array_filters:
    results = collection.query(expr=array_filter, output_fields=["id", "tags"])
6.2 数组长度 #
python
# Keep only entities whose `tags` array has more than three elements.
long_tag_lists = 'array_length(tags) > 3'
results = collection.query(expr=long_tag_lists, output_fields=["id", "tags"])
七、分页查询 #
7.1 基本分页 #
python
page_size = 20
page = 1

# Pages are 1-based, so page 1 starts at offset 0.
rows_to_skip = (page - 1) * page_size
results = collection.query(
    expr='id > 0',
    output_fields=["id", "title"],
    offset=rows_to_skip,
    limit=page_size,
)
7.2 分页函数 #
python
def paginate_query(collection, expr, page=1, page_size=20, output_fields=None):
    """Return one page of query results using offset/limit paging.

    Args:
        collection: Object exposing ``query(expr=..., output_fields=...,
            offset=..., limit=...)``.
        expr: Filter expression, passed through unchanged.
        page: 1-based page number.
        page_size: Number of rows per page.
        output_fields: Fields to return; ``None`` selects ``["*"]``.

    Returns:
        Whatever ``collection.query`` returns for the requested page.

    Raises:
        ValueError: If ``page`` or ``page_size`` is smaller than 1.
    """
    # A page below 1 would turn into a negative offset; fail early instead.
    if page < 1:
        raise ValueError(f"page must be >= 1, got {page}")
    if page_size < 1:
        raise ValueError(f"page_size must be >= 1, got {page_size}")
    if output_fields is None:
        output_fields = ["*"]
    return collection.query(
        expr=expr,
        output_fields=output_fields,
        offset=(page - 1) * page_size,
        limit=page_size,
    )
# Fetch the first two pages of the same filter.
all_rows_filter = 'id > 0'
page1, page2 = (
    paginate_query(collection, all_rows_filter, page=page_number)
    for page_number in (1, 2)
)
7.3 游标分页 #
python
def cursor_query(collection, expr, last_id=0, limit=20, output_fields=None):
    """Fetch the next batch of rows whose ``id`` is greater than ``last_id``.

    Args:
        collection: Object exposing ``query(expr=..., output_fields=...,
            limit=...)``.
        expr: Caller's filter expression; may be empty to page over all rows.
        last_id: Cursor from the previous batch (``0`` for the first call).
        limit: Maximum number of rows to fetch.
        output_fields: Fields to return; ``None`` selects ``["*"]``.

    Returns:
        A ``(results, next_cursor)`` tuple. ``next_cursor`` is the ``id`` of
        the last returned row, or ``None`` when no rows were returned.
    """
    if output_fields is None:
        output_fields = ["*"]

    cursor_clause = f'id > {last_id}'
    # Parenthesise the caller's filter: without the parentheses an `or` inside
    # it would capture the appended `and`, letting rows bypass the cursor.
    # An empty filter would otherwise yield the invalid expression " and id > N".
    cursor_expr = f'({expr}) and {cursor_clause}' if expr else cursor_clause

    results = collection.query(
        expr=cursor_expr,
        output_fields=output_fields,
        limit=limit,
    )

    # NOTE(review): taking the last row assumes rows come back in ascending
    # `id` order and that each row carries an "id" key — confirm for the
    # target collection.
    next_cursor = results[-1]["id"] if results else None
    return results, next_cursor
# First batch: start from cursor 0.
electronics_only = 'category == "electronics"'
results, cursor = cursor_query(collection, electronics_only, last_id=0)
八、分区查询 #
8.1 指定分区 #
python
# Restrict the query to the partition named "2024_01".
target_partitions = ["2024_01"]
results = collection.query(
    expr='id > 0',
    output_fields=["id", "title"],
    partition_names=target_partitions,
)
8.2 多分区查询 #
python
# Several partitions can be named in one call.
target_partitions = ["2024_01", "2024_02"]
results = collection.query(
    expr='id > 0',
    output_fields=["id", "title"],
    partition_names=target_partitions,
)
九、查询优化 #
9.1 索引优化 #
python
# Index type per scalar field: "Trie" for `category`, "STL_SORT" for `created_at`.
scalar_index_types = {
    "category": "Trie",
    "created_at": "STL_SORT",
}
for scalar_field, index_type in scalar_index_types.items():
    collection.create_index(
        field_name=scalar_field,
        index_params={"index_type": index_type},
    )
9.2 查询性能 #
python
import time
def benchmark_query(collection, expr, rounds=10):
    """Time ``collection.query`` for ``expr`` and report the mean latency.

    Args:
        collection: Object exposing ``query(expr=..., output_fields=...)``.
        expr: Filter expression to benchmark.
        rounds: Number of times the query is executed; must be at least 1.

    Returns:
        The mean duration of one query in seconds. The same figure is also
        printed in milliseconds.

    Raises:
        ValueError: If ``rounds`` is smaller than 1.
    """
    # With no rounds there is nothing to average (previously a ZeroDivisionError).
    if rounds < 1:
        raise ValueError(f"rounds must be >= 1, got {rounds}")

    times = []
    for _ in range(rounds):
        # perf_counter is monotonic and high-resolution, unlike the wall
        # clock, which can jump while a measurement is in flight.
        start = time.perf_counter()
        collection.query(expr=expr, output_fields=["id"])
        times.append(time.perf_counter() - start)

    avg_time = sum(times) / len(times)
    print(f"平均查询时间: {avg_time*1000:.2f}ms")
    return avg_time
十、完整示例 #
10.1 文档查询系统 #
python
from pymilvus import Collection
from datetime import datetime, timedelta
# Bind to the "documents" collection and load it before issuing queries.
collection = Collection(name="documents")
collection.load()
class DocumentQuery:
    """Convenience wrappers around ``collection.query`` for common lookups.

    The wrapped object only needs a ``query(expr=..., output_fields=...,
    limit=..., offset=...)`` method.
    """

    def __init__(self, collection):
        """Store the collection that every lookup is issued against."""
        self.collection = collection

    @staticmethod
    def _literal(value):
        """Render ``value`` as a double-quoted, escaped expression literal.

        Plain strings come out exactly as ``"value"``; embedded quotes and
        backslashes are escaped so they cannot terminate the literal early
        and change the meaning of the filter.
        """
        import json  # local import keeps the class self-contained

        return json.dumps(value, ensure_ascii=False)

    def find_by_id(self, doc_id):
        """Return the single row whose ``id`` equals ``doc_id``, or ``None``.

        ``doc_id`` is interpolated verbatim, so it is expected to be numeric.
        """
        results = self.collection.query(
            expr=f'id == {doc_id}',
            output_fields=["*"],
        )
        return results[0] if results else None

    def find_by_category(self, category, limit=100):
        """Return up to ``limit`` rows whose ``category`` equals ``category``."""
        return self.collection.query(
            expr=f'category == {self._literal(category)}',
            output_fields=["id", "title", "category"],
            limit=limit,
        )

    def find_by_author(self, author, limit=100):
        """Return up to ``limit`` rows whose ``author`` equals ``author``."""
        return self.collection.query(
            expr=f'author == {self._literal(author)}',
            output_fields=["id", "title", "author"],
            limit=limit,
        )

    def find_recent(self, days=7, limit=100):
        """Return up to ``limit`` rows created within the last ``days`` days.

        The cutoff is an integer Unix timestamp compared against ``created_at``.
        """
        start_time = int((datetime.now() - timedelta(days=days)).timestamp())
        return self.collection.query(
            expr=f'created_at >= {start_time}',
            output_fields=["id", "title", "created_at"],
            limit=limit,
        )

    def find_by_tags(self, tags, limit=100):
        """Return up to ``limit`` rows whose ``tags`` contain any of ``tags``.

        ``tags`` may be any iterable of values; it is serialised as a
        bracketed list with escaped string elements rather than with
        Python's ``repr``, which breaks for tuples and for embedded quotes.
        """
        return self.collection.query(
            expr=f'array_contains_any(tags, {self._literal(list(tags))})',
            output_fields=["id", "title", "tags"],
            limit=limit,
        )

    def search_with_pagination(self, expr, page=1, page_size=20):
        """Return page ``page`` (1-based) of the rows matching ``expr``.

        Raises:
            ValueError: If ``page`` or ``page_size`` is smaller than 1.
        """
        # A page below 1 would turn into a negative offset; fail early instead.
        if page < 1:
            raise ValueError(f"page must be >= 1, got {page}")
        if page_size < 1:
            raise ValueError(f"page_size must be >= 1, got {page_size}")
        return self.collection.query(
            expr=expr,
            output_fields=["*"],
            offset=(page - 1) * page_size,
            limit=page_size,
        )
query = DocumentQuery(collection)

# Look up a single document by id.
doc = query.find_by_id(1)
print(doc)

# Count the documents created during the last 30 days.
recent_docs = query.find_recent(days=30)
recent_count = len(recent_docs)
print(f"最近30天文档: {recent_count} 篇")
十一、总结 #
查询操作速查表:
| 操作 | 表达式示例 |
|---|---|
| 等于 | id == 1 |
| 不等于 | id != 1 |
| 大于 | id > 10 |
| 范围 | id >= 10 and id <= 100 |
| 包含 | id in [1, 2, 3] |
| 模糊匹配 | title like "%关键词%" |
| JSON字段 | metadata["key"] == "value" |
| 数组包含 | array_contains(tags, "AI") |
下一步,让我们学习数据删除!
最后更新:2026-04-04