分区管理 #
一、分区概述 #
1.1 什么是分区 #
分区(Partition)是 Collection 内部的数据子集:搜索、查询时可以只扫描指定的分区,从而缩小检索范围、提升性能;同时也便于按时间或业务维度组织和清理数据。
text
分区结构:
┌─────────────────────────────────────────┐
│ Collection: orders │
├─────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Partition: │ │ Partition: │ │
│ │ _default │ │ 2024_01 │ │
│ │ (默认分区) │ │ (2024年1月)│ │
│ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Partition: │ │ Partition: │ │
│ │ 2024_02 │ │ 2024_03 │ │
│ │ (2024年2月)│ │ (2024年3月)│ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────┘
1.2 分区的优势 #
text
分区优势:
┌─────────────────────────────────────────┐
│ 性能优化 │
├─────────────────────────────────────────┤
│ - 减少搜索范围 │
│ - 提高查询效率 │
│ - 降低内存占用(可只加载需要的分区) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 数据管理 │
├─────────────────────────────────────────┤
│ - 按时间/业务分类 │
│ - 便于数据清理 │
│ - 支持数据生命周期管理 │
└─────────────────────────────────────────┘
二、创建分区 #
2.1 基本创建 #
python
from pymilvus import Collection
# Open the existing "documents" collection and add a partition for January 2024.
collection = Collection("documents")
collection.create_partition(partition_name="2024_01", description="2024年1月数据")
2.2 批量创建 #
python
def create_monthly_partitions(collection, year, months):
    """Create one partition per month, named ``YYYY_MM``.

    Months whose partition already exists are skipped. pymilvus'
    ``Collection.create_partition`` raises when the name is taken, so without
    this check the helper could not be re-run. The check matches the
    ``has_partition`` guard used by the other creators in this guide.

    Args:
        collection: pymilvus ``Collection`` to add partitions to.
        year: Year used in the partition name and description.
        months: Iterable of month numbers (e.g. ``range(1, 13)``).
    """
    for month in months:
        partition_name = f"{year}_{month:02d}"
        if collection.has_partition(partition_name):
            continue
        description = f"{year}年{month}月数据"
        collection.create_partition(partition_name, description)
        print(f"创建分区: {partition_name}")
# Twelve monthly partitions for 2024: 2024_01 ... 2024_12.
create_monthly_partitions(collection, year=2024, months=range(1, 13))
2.3 按业务创建 #
python
def create_category_partitions(collection, categories):
    """Create one ``category_<name>`` partition per business category.

    Categories whose partition already exists are skipped. This includes
    duplicates within ``categories``. pymilvus' ``create_partition`` raises on
    an existing name, so the guard makes the helper safe to re-run.

    Args:
        collection: pymilvus ``Collection`` to add partitions to.
        categories: Iterable of category names.
    """
    for category in categories:
        partition_name = f"category_{category}"
        if collection.has_partition(partition_name):
            continue
        collection.create_partition(
            partition_name,
            description=f"{category}分类数据"
        )
# One partition per product category.
create_category_partitions(
    collection,
    categories=["electronics", "books", "clothing", "food"],
)
三、分区操作 #
3.1 查看分区 #
python
# Dump the raw partition objects, then a short summary card for each one.
print(collection.partitions)
for partition in collection.partitions:
    print(
        f"分区名: {partition.name}",
        f"描述: {partition.description}",
        f"实体数: {partition.num_entities}",
        "-" * 30,
        sep="\n",
    )
3.2 检查分区存在 #
python
# has_partition returns a bool; use it to guard create/drop calls.
has_partition = collection.has_partition(partition_name="2024_01")
print(f"分区存在: {has_partition}")
3.3 获取分区对象 #
python
# Fetch a handle to one partition by name and print its metadata.
partition = collection.partition(partition_name="2024_01")
print(
    f"分区名: {partition.name}",
    f"描述: {partition.description}",
    f"实体数: {partition.num_entities}",
    sep="\n",
)
3.4 删除分区 #
python
# Unconditional drop: assumes "2024_01" exists.
collection.drop_partition(partition_name="2024_01")
# Guarded drop: only issue the call when the partition is actually present.
# NOTE(review): Milvus may refuse to drop a partition that is still loaded;
# release it first if the server reports that — confirm for your version.
if collection.has_partition(partition_name="temp_partition"):
    collection.drop_partition(partition_name="temp_partition")
四、分区数据操作 #
4.1 插入数据到分区 #
python
partition = collection.partition("2024_01")
# Column-based insert: one list per field. Presumably the id / title /
# 768-dim embedding fields — the schema is not shown here, verify.
data = [
    [1, 2, 3],
    [f"文档{i}" for i in (1, 2, 3)],
    [[value] * 768 for value in (0.1, 0.2, 0.3)],
]
partition.insert(data)
4.2 自动分区插入 #
python
from datetime import datetime
def insert_with_auto_partition(collection, data, date=None):
    """Insert ``data`` into the ``YYYY_MM`` partition for ``date``.

    The partition is created on first use. When ``date`` is ``None`` the
    current local time (``datetime.now()``) is used.

    Args:
        collection: pymilvus ``Collection`` receiving the rows.
        data: Rows in any format accepted by ``Partition.insert``.
        date: ``datetime`` that selects the monthly partition.
    """
    when = datetime.now() if date is None else date
    target = when.strftime("%Y_%m")
    if not collection.has_partition(target):
        collection.create_partition(
            target,
            description=f"{when.strftime('%Y年%m月')}数据",
        )
    collection.partition(target).insert(data)
# No date given, so the rows go to the current month's partition.
insert_with_auto_partition(collection, data=data)
4.3 分区数据统计 #
python
def get_partition_stats(collection):
    """Summarise every partition of ``collection``.

    Returns:
        dict mapping partition name to
        ``{"description": ..., "num_entities": ...}``. Keys follow the order of
        ``collection.partitions``.
    """
    return {
        part.name: {
            "description": part.description,
            "num_entities": part.num_entities,
        }
        for part in collection.partitions
    }
stats = get_partition_stats(collection)
# One summary line per partition, in the order the collection reports them.
for name, info in stats.items():
    print("{}: {} 条数据".format(name, info["num_entities"]))
五、分区搜索 #
5.1 指定分区搜索 #
python
# metric_type should match the metric the vector index was built with.
search_params = dict(metric_type="L2", params=dict(nprobe=10))
# query_vector is assumed to be prepared earlier (not shown). Milvus requires
# the collection (or the searched partitions) to be loaded before searching.
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    partition_names=["2024_01"],
)
5.2 多分区搜索 #
python
# Same search as above, restricted to the union of two monthly partitions.
results = collection.search(
    partition_names=["2024_01", "2024_02"],
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
)
5.3 动态分区搜索 #
python
def search_by_date_range(collection, query_vector, start_date, end_date):
    """Search every existing ``YYYY_MM`` partition between two dates.

    Every calendar month from ``start_date``'s month through ``end_date``'s
    month (inclusive) is considered. Months without a partition are skipped.

    Args:
        collection: pymilvus ``Collection`` to search.
        query_vector: A single query vector.
        start_date: First date of the range (``date``/``datetime``).
        end_date: Last date of the range (same type as ``start_date``).

    Returns:
        The ``collection.search`` result, or ``[]`` when no partition in the
        range exists (including when ``start_date`` is after ``end_date``).
    """
    partitions = []
    # Walk month by month from the 1st. Stepping from the original day would
    # raise ValueError for days like Jan 31 (no Feb 31). Comparing full dates
    # would also drop the last month when end_date's day < start_date's day.
    current = start_date.replace(day=1)
    last_month = (end_date.year, end_date.month)
    while (current.year, current.month) <= last_month:
        partition_name = current.strftime("%Y_%m")
        if collection.has_partition(partition_name):
            partitions.append(partition_name)
        if current.month == 12:
            current = current.replace(year=current.year + 1, month=1)
        else:
            current = current.replace(month=current.month + 1)
    if not partitions:
        return []
    return collection.search(
        data=[query_vector],
        anns_field="embedding",
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=10,
        partition_names=partitions
    )
六、分区查询 #
6.1 分区标量查询 #
python
# Scalar (non-vector) query limited to one partition; returns the listed fields.
results = collection.query(
    partition_names=["2024_01"],
    expr='id > 0',
    output_fields=["id", "title"],
)
6.2 删除分区内的数据 #
python
# Delete entities by primary-key expression, scoped to a single partition.
# This removes rows only; the partition itself is kept (see drop_partition).
collection.delete(
    partition_name="2024_01",
    expr='id in [1, 2, 3]',
)
七、分区策略 #
7.1 时间分区策略 #
python
from datetime import datetime, timedelta
class TimePartitionManager:
    """Route inserts/searches to time-based partitions and prune old ones.

    Partition names are ``date.strftime(date_format)``. The default
    ``"%Y_%m"`` gives one partition per calendar month.
    """

    def __init__(self, collection, date_format="%Y_%m"):
        self.collection = collection
        self.date_format = date_format

    def get_partition_name(self, date):
        """Return the partition name that ``date`` belongs to."""
        return date.strftime(self.date_format)

    def create_partition_for_date(self, date):
        """Ensure the partition for ``date`` exists and return its name."""
        partition_name = self.get_partition_name(date)
        if not self.collection.has_partition(partition_name):
            self.collection.create_partition(
                partition_name,
                description=f"{date.strftime('%Y年%m月%d日')}数据"
            )
        return partition_name

    def insert_with_date(self, data, date):
        """Insert ``data`` into the partition for ``date``, creating it if needed."""
        partition_name = self.create_partition_for_date(date)
        partition = self.collection.partition(partition_name)
        partition.insert(data)

    def _partition_names_in_range(self, start_date, end_date):
        """Return the distinct partition names covering the range, in order."""
        if start_date > end_date:
            return []
        names = []
        current = start_date
        while current <= end_date:
            names.append(self.get_partition_name(current))
            current += timedelta(days=1)
        # Daily steps keep start_date's time of day, so with e.g. start 10:00
        # and end 09:00 the last step overshoots end_date; include its period.
        names.append(self.get_partition_name(end_date))
        # Several days map to the same monthly partition: de-duplicate, keep order.
        return list(dict.fromkeys(names))

    def search_in_date_range(self, query_vector, start_date, end_date, **kwargs):
        """Search the existing partitions covering ``start_date``..``end_date``.

        Keyword args ``param`` (default ``{"metric_type": "L2"}``) and ``limit``
        (default 10) are forwarded to ``collection.search``.

        Returns:
            The search result, or ``[]`` if no partition in the range exists.
        """
        partitions = [
            name
            for name in self._partition_names_in_range(start_date, end_date)
            if self.collection.has_partition(name)
        ]
        if not partitions:
            return []
        return self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=kwargs.get("param", {"metric_type": "L2"}),
            limit=kwargs.get("limit", 10),
            partition_names=partitions
        )

    def cleanup_old_partitions(self, keep_days=90):
        """Drop time partitions whose whole period ends before the cutoff.

        The cutoff is ``datetime.now() - keep_days``. ``_default`` and names
        that do not parse with ``date_format`` are left untouched. The period
        containing the cutoff is always kept: it still holds data newer than
        ``keep_days``, even though it *starts* before the cutoff.
        """
        cutoff = datetime.now() - timedelta(days=keep_days)
        cutoff_partition = self.get_partition_name(cutoff)
        for partition in self.collection.partitions:
            if partition.name == "_default":
                continue
            try:
                partition_date = datetime.strptime(partition.name, self.date_format)
            except ValueError:
                continue  # not a time partition (e.g. category_*)
            if partition_date < cutoff and partition.name != cutoff_partition:
                print(f"删除分区: {partition.name}")
                # NOTE(review): Milvus may refuse to drop a loaded partition;
                # release it first if needed — confirm for your server version.
                self.collection.drop_partition(partition.name)
# Monthly partitions (the default format), routing this batch by today's date.
manager = TimePartitionManager(collection, date_format="%Y_%m")
manager.insert_with_date(data=data, date=datetime.now())
7.2 业务分区策略 #
python
class CategoryPartitionManager:
    """Map business categories to ``category_<name>`` partitions."""

    def __init__(self, collection):
        self.collection = collection

    def get_partition_name(self, category):
        """Return the partition name used for ``category``."""
        return f"category_{category}"

    def create_category_partition(self, category):
        """Create the partition for ``category`` if missing; return its name."""
        name = self.get_partition_name(category)
        if self.collection.has_partition(name):
            return name
        self.collection.create_partition(
            name,
            description=f"{category}分类数据"
        )
        return name

    def insert_by_category(self, data, category):
        """Insert ``data`` into the partition for ``category``, creating it on demand."""
        target = self.create_category_partition(category)
        self.collection.partition(target).insert(data)

    def search_by_category(self, query_vector, category, **kwargs):
        """Search only the partition for ``category``.

        ``param`` (default ``{"metric_type": "L2"}``) and ``limit`` (default 10)
        are read from ``kwargs``.

        Returns:
            The search result, or ``[]`` when the category has no partition.
        """
        name = self.get_partition_name(category)
        if not self.collection.has_partition(name):
            return []
        search_param = kwargs.get("param", {"metric_type": "L2"})
        top_k = kwargs.get("limit", 10)
        return self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=search_param,
            limit=top_k,
            partition_names=[name]
        )
八、分区最佳实践 #
8.1 分区数量限制 #
text
分区数量建议:
- 单个Collection最大分区数: 4096(服务端配置项 rootCoord.maxPartitionNum 的默认值,不同 Milvus 版本可能不同,请以所用版本文档为准)
- 建议分区数: < 100
- 每个分区建议数据量: 10万-100万
8.2 分区键选择 #
text
分区键选择建议:
时间维度:
├── 按月分区 (推荐)
├── 按日分区 (数据量大)
└── 按年分区 (数据量小)
业务维度:
├── 按类别分区
├── 按地区分区
└── 按用户分区 (用户数量多时容易超出分区数上限,可考虑改用 Partition Key)
8.3 分区维护 #
python
def maintain_partitions(collection, keep_days=90):
    """Drop expired time partitions, then drop partitions that hold no data.

    Args:
        collection: pymilvus ``Collection`` to maintain.
        keep_days: Retention window passed to
            ``TimePartitionManager.cleanup_old_partitions``.
    """
    manager = TimePartitionManager(collection)
    manager.cleanup_old_partitions(keep_days)
    # Per the Milvus docs, num_entities only reflects flushed data. Without a
    # flush, a partition holding freshly inserted rows can report 0 and would
    # be dropped together with that data.
    collection.flush()
    for partition in collection.partitions:
        if partition.name == "_default":
            continue
        if partition.num_entities == 0:
            print(f"清理空分区: {partition.name}")
            collection.drop_partition(partition.name)
九、总结 #
分区操作速查表:
| 操作 | 方法 |
|---|---|
| 创建分区 | collection.create_partition() |
| 查看分区 | collection.partitions |
| 检查存在 | collection.has_partition() |
| 删除分区 | collection.drop_partition() |
| 分区插入 | partition.insert() |
| 分区搜索 | partition_names参数 |
下一步,让我们学习混合搜索!
最后更新:2026-04-04