分区管理 #

一、分区概述 #

1.1 什么是分区 #

分区是 Collection 的子集,用于优化查询性能和数据管理。

text
分区结构:

┌─────────────────────────────────────────┐
│         Collection: orders              │
├─────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐      │
│  │ Partition:  │  │ Partition:  │      │
│  │  _default   │  │  2024_01    │      │
│  │  (默认分区) │  │  (2024年1月)│      │
│  └─────────────┘  └─────────────┘      │
│                                         │
│  ┌─────────────┐  ┌─────────────┐      │
│  │ Partition:  │  │ Partition:  │      │
│  │  2024_02    │  │  2024_03    │      │
│  │  (2024年2月)│  │  (2024年3月)│      │
│  └─────────────┘  └─────────────┘      │
└─────────────────────────────────────────┘

1.2 分区的优势 #

text
分区优势:

┌─────────────────────────────────────────┐
│           性能优化                       │
├─────────────────────────────────────────┤
│  - 减少搜索范围                          │
│  - 提高查询效率                          │
│  - 降低内存占用                          │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│           数据管理                       │
├─────────────────────────────────────────┤
│  - 按时间/业务分类                       │
│  - 便于数据清理                          │
│  - 支持数据生命周期管理                  │
└─────────────────────────────────────────┘

二、创建分区 #

2.1 基本创建 #

python
from pymilvus import Collection

collection = Collection("documents")

collection.create_partition(
    partition_name="2024_01",
    description="2024年1月数据"
)

2.2 批量创建 #

python
def create_monthly_partitions(collection, year, months):
    for month in months:
        partition_name = f"{year}_{month:02d}"
        description = f"{year}年{month}月数据"
        collection.create_partition(partition_name, description)
        print(f"创建分区: {partition_name}")

create_monthly_partitions(collection, 2024, range(1, 13))

2.3 按业务创建 #

python
def create_category_partitions(collection, categories):
    for category in categories:
        partition_name = f"category_{category}"
        collection.create_partition(
            partition_name,
            description=f"{category}分类数据"
        )

create_category_partitions(
    collection,
    ["electronics", "books", "clothing", "food"]
)

三、分区操作 #

3.1 查看分区 #

python
print(collection.partitions)

for partition in collection.partitions:
    print(f"分区名: {partition.name}")
    print(f"描述: {partition.description}")
    print(f"实体数: {partition.num_entities}")
    print("-" * 30)

3.2 检查分区存在 #

python
has_partition = collection.has_partition("2024_01")
print(f"分区存在: {has_partition}")

3.3 获取分区对象 #

python
partition = collection.partition("2024_01")

print(f"分区名: {partition.name}")
print(f"描述: {partition.description}")
print(f"实体数: {partition.num_entities}")

3.4 删除分区 #

python
collection.drop_partition("2024_01")

if collection.has_partition("temp_partition"):
    collection.drop_partition("temp_partition")

四、分区数据操作 #

4.1 插入数据到分区 #

python
partition = collection.partition("2024_01")

data = [
    [1, 2, 3],
    ["文档1", "文档2", "文档3"],
    [[0.1]*768, [0.2]*768, [0.3]*768]
]

partition.insert(data)

4.2 自动分区插入 #

python
from datetime import datetime

def insert_with_auto_partition(collection, data, date=None):
    if date is None:
        date = datetime.now()
    
    partition_name = date.strftime("%Y_%m")
    
    if not collection.has_partition(partition_name):
        collection.create_partition(
            partition_name,
            description=f"{date.strftime('%Y年%m月')}数据"
        )
    
    partition = collection.partition(partition_name)
    partition.insert(data)

insert_with_auto_partition(collection, data)

4.3 分区数据统计 #

python
def get_partition_stats(collection):
    stats = {}
    for partition in collection.partitions:
        stats[partition.name] = {
            "description": partition.description,
            "num_entities": partition.num_entities
        }
    return stats

stats = get_partition_stats(collection)
for name, info in stats.items():
    print(f"{name}: {info['num_entities']} 条数据")

五、分区搜索 #

5.1 指定分区搜索 #

python
search_params = {
    "metric_type": "L2",
    "params": {"nprobe": 10}
}

results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    partition_names=["2024_01"]
)

5.2 多分区搜索 #

python
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=10,
    partition_names=["2024_01", "2024_02"]
)

5.3 动态分区搜索 #

python
def search_by_date_range(collection, query_vector, start_date, end_date):
    partitions = []
    current = start_date
    while current <= end_date:
        partition_name = current.strftime("%Y_%m")
        if collection.has_partition(partition_name):
            partitions.append(partition_name)
        
        if current.month == 12:
            current = current.replace(year=current.year + 1, month=1)
        else:
            current = current.replace(month=current.month + 1)
    
    if not partitions:
        return []
    
    return collection.search(
        data=[query_vector],
        anns_field="embedding",
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=10,
        partition_names=partitions
    )

六、分区查询 #

6.1 分区标量查询 #

python
results = collection.query(
    expr='id > 0',
    output_fields=["id", "title"],
    partition_names=["2024_01"]
)

6.2 分区删除 #

python
collection.delete(
    expr='id in [1, 2, 3]',
    partition_name="2024_01"
)

七、分区策略 #

7.1 时间分区策略 #

python
from datetime import datetime, timedelta

class TimePartitionManager:
    def __init__(self, collection, date_format="%Y_%m"):
        self.collection = collection
        self.date_format = date_format
    
    def get_partition_name(self, date):
        return date.strftime(self.date_format)
    
    def create_partition_for_date(self, date):
        partition_name = self.get_partition_name(date)
        if not self.collection.has_partition(partition_name):
            self.collection.create_partition(
                partition_name,
                description=f"{date.strftime('%Y年%m月%d日')}数据"
            )
        return partition_name
    
    def insert_with_date(self, data, date):
        partition_name = self.create_partition_for_date(date)
        partition = self.collection.partition(partition_name)
        partition.insert(data)
    
    def search_in_date_range(self, query_vector, start_date, end_date, **kwargs):
        partitions = []
        current = start_date
        while current <= end_date:
            partition_name = self.get_partition_name(current)
            if self.collection.has_partition(partition_name):
                partitions.append(partition_name)
            current += timedelta(days=1)
        
        if not partitions:
            return []
        
        return self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=kwargs.get("param", {"metric_type": "L2"}),
            limit=kwargs.get("limit", 10),
            partition_names=partitions
        )
    
    def cleanup_old_partitions(self, keep_days=90):
        cutoff = datetime.now() - timedelta(days=keep_days)
        
        for partition in self.collection.partitions:
            if partition.name == "_default":
                continue
            
            try:
                partition_date = datetime.strptime(partition.name, self.date_format)
                if partition_date < cutoff:
                    print(f"删除分区: {partition.name}")
                    self.collection.drop_partition(partition.name)
            except ValueError:
                continue

manager = TimePartitionManager(collection)
manager.insert_with_date(data, datetime.now())

7.2 业务分区策略 #

python
class CategoryPartitionManager:
    def __init__(self, collection):
        self.collection = collection
    
    def get_partition_name(self, category):
        return f"category_{category}"
    
    def create_category_partition(self, category):
        partition_name = self.get_partition_name(category)
        if not self.collection.has_partition(partition_name):
            self.collection.create_partition(
                partition_name,
                description=f"{category}分类数据"
            )
        return partition_name
    
    def insert_by_category(self, data, category):
        partition_name = self.create_category_partition(category)
        partition = self.collection.partition(partition_name)
        partition.insert(data)
    
    def search_by_category(self, query_vector, category, **kwargs):
        partition_name = self.get_partition_name(category)
        
        if not self.collection.has_partition(partition_name):
            return []
        
        return self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=kwargs.get("param", {"metric_type": "L2"}),
            limit=kwargs.get("limit", 10),
            partition_names=[partition_name]
        )

八、分区最佳实践 #

8.1 分区数量限制 #

text
分区数量建议:

- 单个Collection最大分区数: 4096
- 建议分区数: < 100
- 每个分区建议数据量: 10万-100万

8.2 分区键选择 #

text
分区键选择建议:

时间维度:
├── 按月分区 (推荐)
├── 按日分区 (数据量大)
└── 按年分区 (数据量小)

业务维度:
├── 按类别分区
├── 按地区分区
└── 按用户分区

8.3 分区维护 #

python
def maintain_partitions(collection, keep_days=90):
    manager = TimePartitionManager(collection)
    manager.cleanup_old_partitions(keep_days)
    
    for partition in collection.partitions:
        if partition.num_entities == 0 and partition.name != "_default":
            print(f"清理空分区: {partition.name}")
            collection.drop_partition(partition.name)

九、总结 #

分区操作速查表:

操作 方法
创建分区 collection.create_partition()
查看分区 collection.partitions
检查存在 collection.has_partition()
删除分区 collection.drop_partition()
分区插入 partition.insert()
分区搜索 partition_names参数

下一步,让我们学习混合搜索!

最后更新:2026-04-04