数据删除 #

一、删除概述 #

1.1 删除类型 #

text
删除类型:

┌─────────────────────────────────────────┐
│           删除方式                       │
├─────────────────────────────────────────┤
│                                         │
│  ┌─────────────────────────────────┐   │
│  │  按主键删除                      │   │
│  │  - 指定ID列表                    │   │
│  │  - 最快速度                      │   │
│  └─────────────────────────────────┘   │
│                                         │
│  ┌─────────────────────────────────┐   │
│  │  按条件删除                      │   │
│  │  - 使用表达式                    │   │
│  │  - 灵活过滤                      │   │
│  └─────────────────────────────────┘   │
│                                         │
│  ┌─────────────────────────────────┐   │
│  │  分区删除                        │   │
│  │  - 删除整个分区                  │   │
│  │  - 批量清理                      │   │
│  └─────────────────────────────────┘   │
│                                         │
└─────────────────────────────────────────┘

1.2 删除流程 #

text
删除流程:

┌──────────┐     ┌──────────┐     ┌──────────┐
│ 构建删除条件│────▶│ 执行删除  │────▶│ 刷新数据  │
└──────────┘     └──────────┘     └──────────┘
                        │
                        ▼
                  ┌──────────┐
                  │ 压缩数据  │
                  │ (可选)    │
                  └──────────┘

二、按主键删除 #

2.1 单条删除 #

python
from pymilvus import Collection

# Open the "documents" collection and load it before issuing the delete.
collection = Collection("documents")
collection.load()

# Delete the single entity whose `id` equals 1.
expr = 'id == 1'
result = collection.delete(expr)

# delete_count is read from the result object returned by delete().
print(f"删除数量: {result.delete_count}")

2.2 批量删除 #

python
# Delete several entities in one call by listing their ids in an `in` filter.
expr = 'id in [1, 2, 3, 4, 5]'
result = collection.delete(expr)

print(f"删除数量: {result.delete_count}")

2.3 大批量删除 #

python
def batch_delete_by_ids(collection, ids, batch_size=1000):
    """Delete entities by id in fixed-size batches.

    Args:
        collection: Object exposing ``delete(expr)`` whose result has a
            ``delete_count`` attribute (a pymilvus ``Collection`` in this
            tutorial).
        ids: Iterable of ids to delete. Any iterable is accepted; it is
            materialized into a list so that each batch is rendered with
            list syntax (``[1, 2, 3]``) inside the filter expression.
        batch_size: Maximum number of ids per ``delete`` call. Must be >= 1.

    Returns:
        The sum of ``delete_count`` over all batches (0 when ``ids`` is empty).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        # A negative step would make range() yield nothing and silently
        # delete no data; zero would fail with an unrelated range() error.
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    # Tuples would render as "(1, 2)" and generators have no len(); a list
    # always formats as the bracketed literal used by the filter expression.
    id_list = list(ids)

    total_deleted = 0
    for start in range(0, len(id_list), batch_size):
        batch_ids = id_list[start:start + batch_size]
        result = collection.delete(f'id in {batch_ids}')
        total_deleted += result.delete_count

    return total_deleted

# Build ids 1..10000 and delete them with the default batch size.
ids_to_delete = list(range(1, 10001))
deleted = batch_delete_by_ids(collection, ids_to_delete)
print(f"总计删除: {deleted} 条")

三、按条件删除 #

3.1 基本条件删除 #

python
# Delete entities whose `category` field equals "deprecated".
expr = 'category == "deprecated"'
result = collection.delete(expr)

# Delete entities whose `created_at` is below 1704067200
# (presumably Unix seconds, i.e. 2024-01-01 00:00:00 UTC — confirm against the schema).
expr = 'created_at < 1704067200'
result = collection.delete(expr)

# Delete entities whose `status` field equals "deleted".
expr = 'status == "deleted"'
result = collection.delete(expr)

3.2 复合条件删除 #

python
# Combine two conditions with `and`: category is "test" AND created_at is below the cutoff.
expr = 'category == "test" and created_at < 1704067200'
result = collection.delete(expr)

# Entities with fewer than 10 views that also fall below the same created_at cutoff.
expr = 'views < 10 and created_at < 1704067200'
result = collection.delete(expr)

3.3 字符串匹配删除 #

python
# Pattern match on `title`: "%test%" has a wildcard on both sides of "test".
expr = 'title like "%test%"'
result = collection.delete(expr)

# Pattern starting with "temp".
# NOTE(review): in Milvus `like` patterns `_` may act as a single-character
# wildcard rather than a literal underscore — confirm for the server version
# in use and escape it if a literal "temp_" prefix is intended.
expr = 'title like "temp_%"'
result = collection.delete(expr)

3.4 JSON字段删除 #

python
# Filter on a key inside the `metadata` field: metadata["status"] equals "deleted".
expr = 'metadata["status"] == "deleted"'
result = collection.delete(expr)

# Filter with json_contains on metadata["tags"] and the value "deprecated".
expr = 'json_contains(metadata["tags"], "deprecated")'
result = collection.delete(expr)

3.5 数组字段删除 #

python
# Filter with array_contains on the `tags` field and the value "deprecated".
expr = 'array_contains(tags, "deprecated")'
result = collection.delete(expr)

# Filter on entities whose `tags` array has length 0.
expr = 'array_length(tags) == 0'
result = collection.delete(expr)

四、分区删除 #

4.1 删除分区数据 #

python
# Apply the filter `id > 0`, with the delete scoped to partition "2024_01"
# through the partition_name argument.
expr = 'id > 0'
result = collection.delete(expr, partition_name="2024_01")

4.2 删除整个分区 #

python
# Drop whole partitions by name instead of deleting their rows one filter at a time.
# NOTE(review): Milvus documentation indicates a loaded partition must be
# released before it can be dropped — confirm for the version in use.
collection.drop_partition("2024_01")

collection.drop_partition("temp_partition")

4.3 按时间清理分区 #

python
from datetime import datetime, timedelta

def cleanup_old_partitions(collection, keep_days=90):
    """Drop month partitions that are older than the retention window.

    Only partitions named ``partition_YYYYMM`` (prefix plus exactly six ASCII
    digits) are considered. Names with any other suffix are left untouched,
    because comparing an arbitrary suffix with the cutoff as a string could
    drop unrelated partitions (for example ``partition_`` or ``partition_1``).

    Args:
        collection: Object exposing ``partitions`` (items with a ``name``
            attribute) and ``drop_partition(name)``.
        keep_days: Retention window in days. The cutoff has month
            granularity: a partition is dropped when its ``YYYYMM`` sorts
            strictly before the month of ``now - keep_days``.
    """
    prefix = "partition_"
    cutoff_str = (datetime.now() - timedelta(days=keep_days)).strftime("%Y%m")

    # Iterate over a snapshot so dropping partitions cannot disturb the loop.
    for partition in list(collection.partitions):
        name = partition.name
        if not name.startswith(prefix):
            continue

        # Strip only the leading prefix (str.replace would remove every
        # occurrence of it inside the name).
        partition_date = name[len(prefix):]
        is_year_month = (
            len(partition_date) == 6
            and partition_date.isascii()
            and partition_date.isdigit()
        )
        if not is_year_month:
            continue

        # Fixed-width digit strings order the same way as the months they encode.
        if partition_date < cutoff_str:
            print(f"删除分区: {name}")
            collection.drop_partition(name)

# Run the cleanup with a 90-day retention window.
cleanup_old_partitions(collection, keep_days=90)

五、删除验证 #

5.1 验证删除结果 #

python
# Delete three entities by id.
expr = 'id in [1, 2, 3]'
result = collection.delete(expr)

print(f"删除数量: {result.delete_count}")

# Re-run the same filter to confirm nothing is left. Strong consistency is
# requested explicitly: with a weaker (e.g. bounded-staleness) level, a query
# issued right after the delete may still return the rows just removed.
results = collection.query(expr=expr, consistency_level="Strong")
print(f"剩余数量: {len(results)}")

5.2 检查实体数量 #

python
# Count rows with a count(*) query instead of collection.num_entities:
# num_entities is not guaranteed to reflect deletions (deleted rows can keep
# being counted until compaction), so the difference could be reported as 0.
# Strong consistency makes each count see all preceding writes.
before_count = collection.query(
    expr="",
    output_fields=["count(*)"],
    consistency_level="Strong",
)[0]["count(*)"]

collection.delete('category == "test"')
collection.flush()

after_count = collection.query(
    expr="",
    output_fields=["count(*)"],
    consistency_level="Strong",
)[0]["count(*)"]
print(f"删除数量: {before_count - after_count}")

六、删除后处理 #

6.1 刷新数据 #

python
# Issue the delete, then call flush() on the collection.
collection.delete('category == "test"')

collection.flush()

6.2 压缩数据 #

python
# Delete, flush, then trigger compaction.
collection.delete('id in [1, 2, 3, 4, 5]')
collection.flush()

# NOTE(review): compact() presumably only starts the compaction; check the
# pymilvus API for a way to wait for completion if later steps depend on it.
collection.compact()

6.3 压缩并重新加载 #

python
# Delete and flush first.
collection.delete('category == "deprecated"')
collection.flush()

# Trigger compaction on the collection.
collection.compact()

# Release the collection and load it again.
collection.release()
collection.load()

七、安全删除 #

7.1 删除前确认 #

python
def safe_delete(collection, expr, confirm=True):
    """Delete entities matching ``expr`` after previewing how many match.

    The filter is first run as a query (fetching only ``id``) to count the
    matches. With ``confirm`` truthy the user is prompted on stdin and the
    delete only proceeds on an answer of ``y`` (case-insensitive).

    Args:
        collection: Object exposing ``query``, ``delete`` and ``flush``.
        expr: Filter expression passed to both the query and the delete.
        confirm: Whether to ask for interactive confirmation.

    Returns:
        ``0`` when nothing matches or the user declines; otherwise the
        ``delete_count`` reported by the delete call.
    """
    matches = collection.query(expr=expr, output_fields=["id"])
    count = len(matches)

    # Nothing to do: report it and bail out before prompting.
    if not count:
        print("没有匹配的数据")
        return 0

    print(f"将删除 {count} 条数据")

    # The prompt is only shown when confirmation was requested.
    declined = confirm and input("确认删除?(y/n): ").lower() != 'y'
    if declined:
        print("取消删除")
        return 0

    outcome = collection.delete(expr)
    collection.flush()
    return outcome.delete_count

7.2 软删除模式 #

python
def soft_delete(collection, ids):
    """Mark entities as deleted instead of removing them.

    For each id the full entity is queried; if it exists, ``deleted_at`` is
    set to the current Unix time in whole seconds, ``status`` is set to
    ``"deleted"`` and the entity is written back with ``upsert``. Ids with no
    matching entity are skipped.

    Args:
        collection: Object exposing ``query(expr=..., output_fields=...)``
            and ``upsert(list_of_entities)``.
        ids: Iterable of ids to soft-delete.

    Note:
        Assumes the collection can store ``deleted_at`` and ``status``
        values — verify against the collection schema.
    """
    # Imported locally so the function does not depend on a module-level
    # `import time` being present wherever this snippet is used.
    import time

    # `entity_id` rather than `id`, which would shadow the builtin.
    for entity_id in ids:
        results = collection.query(
            expr=f'id == {entity_id}',
            output_fields=["*"]
        )
        if not results:
            continue

        entity = results[0]
        entity["deleted_at"] = int(time.time())
        entity["status"] = "deleted"
        collection.upsert([entity])

def get_active_entities(collection):
    """Return the query result for entities whose status is not "deleted".

    Args:
        collection: Object exposing ``query(expr=..., output_fields=...)``.

    Returns:
        Whatever ``collection.query`` returns for the filter
        ``status != "deleted"`` with all output fields (``"*"``) requested.
    """
    active_filter = 'status != "deleted"'
    all_fields = ["*"]
    return collection.query(expr=active_filter, output_fields=all_fields)

7.3 批量安全删除 #

python
def batch_safe_delete(collection, expr, batch_size=1000):
    """Resolve a filter to ids, then delete those ids in batches.

    Args:
        collection: Object exposing ``query``, ``delete`` and ``flush``.
        expr: Filter expression used to look up the ids to delete.
        batch_size: Maximum number of ids per ``delete`` call.

    Returns:
        The sum of ``delete_count`` over all batch deletes.
    """
    rows = collection.query(expr=expr, output_fields=["id"])
    ids = [row["id"] for row in rows]

    # One delete per slice of ids; the counts are summed as they come back.
    total_deleted = sum(
        collection.delete(f'id in {ids[start:start + batch_size]}').delete_count
        for start in range(0, len(ids), batch_size)
    )

    # Flush once after all batches, also when there was nothing to delete.
    collection.flush()
    return total_deleted

八、删除最佳实践 #

8.1 删除策略 #

text
删除策略建议:

┌─────────────────────────────────────────┐
│           删除场景                       │
├─────────────────────────────────────────┤
│                                         │
│  少量精确删除                            │
│  └── 按主键删除                         │
│                                         │
│  批量条件删除                            │
│  └── 先查询再批量删除                   │
│                                         │
│  时间分区清理                            │
│  └── 删除整个分区                       │
│                                         │
│  需要恢复能力                            │
│  └── 使用软删除                         │
│                                         │
└─────────────────────────────────────────┘

8.2 性能优化 #

python
def optimized_delete(collection, expr):
    """Delete with the collection released, then flush, compact and reload.

    The call order is release -> delete -> flush -> compact -> load. The
    reload runs in a ``finally`` block so that a failure in any middle step
    does not leave the collection released; the original exception still
    propagates to the caller.

    Args:
        collection: Object exposing ``release``, ``delete``, ``flush``,
            ``compact`` and ``load``.
        expr: Filter expression selecting the entities to delete.

    Returns:
        The ``delete_count`` reported by the delete call.

    Note:
        NOTE(review): some Milvus versions may require the collection to be
        loaded for deletes that use non-primary-key filters — confirm that
        deleting on a released collection works for the version in use.
    """
    collection.release()
    try:
        result = collection.delete(expr)
        collection.flush()
        collection.compact()
    finally:
        # Always bring the collection back, even if a step above failed.
        collection.load()

    return result.delete_count

8.3 错误处理 #

python
from pymilvus import MilvusException

def safe_delete_with_retry(collection, expr, max_retries=3):
    """Delete and flush, retrying when a ``MilvusException`` is raised.

    Args:
        collection: Object exposing ``delete(expr)`` and ``flush()``.
        expr: Filter expression selecting the entities to delete.
        max_retries: Total number of attempts (not additional retries).
            Must be >= 1.

    Returns:
        The ``delete_count`` of the first attempt that succeeds.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1. Without this check
            the loop would not run at all and the function would return
            ``None`` without ever attempting the delete.
        MilvusException: Re-raised from the last attempt when every attempt
            fails.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            result = collection.delete(expr)
            collection.flush()
            return result.delete_count
        except MilvusException as e:
            print(f"删除失败 (尝试 {attempt + 1}/{max_retries}): {e}")
            # Out of attempts: surface the last failure to the caller.
            if attempt == max_retries - 1:
                raise

九、完整示例 #

9.1 数据清理脚本 #

python
from pymilvus import Collection
from datetime import datetime, timedelta
import time

class DataCleaner:
    """Bundle common delete and cleanup operations for one collection.

    The wrapped collection is expected to expose ``query``, ``delete``,
    ``flush``, ``compact``, ``partitions`` and ``drop_partition`` (a pymilvus
    ``Collection`` in this tutorial).
    """

    # Naming convention handled by cleanup_partitions: "partition_YYYYMM".
    _PARTITION_PREFIX = "partition_"

    def __init__(self, collection):
        """Store the collection every method operates on."""
        self.collection = collection

    @staticmethod
    def _is_year_month(text):
        """Return True when ``text`` is exactly six ASCII digits (YYYYMM)."""
        return len(text) == 6 and text.isascii() and text.isdigit()

    def delete_by_ids(self, ids):
        """Delete the entities with the given ids and flush.

        Args:
            ids: Iterable of ids (``None`` or empty means nothing to do). It
                is materialized into a list so that it is rendered with list
                syntax inside the filter expression.

        Returns:
            The reported ``delete_count``, or 0 when there are no ids.
        """
        id_list = [] if ids is None else list(ids)
        if not id_list:
            return 0

        result = self.collection.delete(f'id in {id_list}')
        self.collection.flush()
        return result.delete_count

    def delete_by_condition(self, expr):
        """Delete entities matching ``expr`` if at least one matches.

        The filter is queried first (fetching only ``id``); the delete and
        flush are skipped when the query returns nothing.

        Returns:
            The reported ``delete_count``, or 0 when nothing matches.
        """
        matches = self.collection.query(expr=expr, output_fields=["id"])
        if not matches:
            return 0

        result = self.collection.delete(expr)
        self.collection.flush()
        return result.delete_count

    def delete_old_data(self, days=90):
        """Delete entities whose ``created_at`` is older than ``days`` days.

        The cutoff is the current local time minus ``days``, converted to a
        whole-second Unix timestamp.
        """
        cutoff = int((datetime.now() - timedelta(days=days)).timestamp())
        return self.delete_by_condition(f'created_at < {cutoff}')

    def delete_by_category(self, category):
        """Delete entities whose ``category`` equals the given value.

        Backslashes and double quotes in ``category`` are escaped so the
        value cannot terminate the string literal and alter the filter.
        """
        escaped = str(category).replace("\\", "\\\\").replace('"', '\\"')
        return self.delete_by_condition(f'category == "{escaped}"')

    def cleanup_partitions(self, keep_days=90):
        """Drop ``partition_YYYYMM`` partitions older than the retention window.

        Partitions whose suffix is not exactly six digits are ignored, since
        comparing an arbitrary suffix with the cutoff as a string could drop
        unrelated partitions (for example ``partition_`` or ``partition_1``).

        Args:
            keep_days: Retention window in days; the cutoff has month
                granularity.

        Returns:
            The names of the partitions that were dropped, in iteration order.
        """
        cutoff_date = datetime.now() - timedelta(days=keep_days)
        cutoff_str = cutoff_date.strftime("%Y%m")
        prefix = self._PARTITION_PREFIX

        deleted_partitions = []
        # Iterate over a snapshot so dropping cannot disturb the loop.
        for partition in list(self.collection.partitions):
            name = partition.name
            if not name.startswith(prefix):
                continue

            # Strip only the leading prefix, then require a YYYYMM suffix.
            partition_date = name[len(prefix):]
            if not self._is_year_month(partition_date):
                continue

            if partition_date < cutoff_str:
                self.collection.drop_partition(name)
                deleted_partitions.append(name)

        return deleted_partitions

    def compact_after_delete(self):
        """Flush the collection and then trigger compaction."""
        self.collection.flush()
        self.collection.compact()

# Wrap the collection in a DataCleaner.
cleaner = DataCleaner(collection)

# Delete entities whose created_at is more than 30 days old.
deleted = cleaner.delete_old_data(days=30)
print(f"删除30天前数据: {deleted} 条")

# Delete entities in the "deprecated" category.
deleted = cleaner.delete_by_category("deprecated")
print(f"删除deprecated分类: {deleted} 条")

# Flush and compact once after all deletes.
cleaner.compact_after_delete()

十、总结 #

删除操作速查表:

操作 方法
按主键删除 collection.delete('id == 1')
批量删除 collection.delete('id in [1,2,3]')
条件删除 collection.delete(expr)
分区删除 collection.drop_partition(name)
刷新 collection.flush()
压缩 collection.compact()

下一步,让我们学习索引管理!

最后更新:2026-04-04