数据删除 #
一、删除概述 #
1.1 删除类型 #
text
删除类型:
┌─────────────────────────────────────────┐
│ 删除方式 │
├─────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────┐ │
│ │ 按主键删除 │ │
│ │ - 指定ID列表 │ │
│ │ - 最快速度 │ │
│ └─────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────┐ │
│ │ 按条件删除 │ │
│ │ - 使用表达式 │ │
│ │ - 灵活过滤 │ │
│ └─────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────┐ │
│ │ 分区删除 │ │
│ │ - 删除整个分区 │ │
│ │ - 批量清理 │ │
│ └─────────────────────────────────┘ │
│ │
└─────────────────────────────────────────┘
1.2 删除流程 #
text
删除流程:
┌────────────┐     ┌──────────┐     ┌──────────┐
│构建删除条件│────▶│ 执行删除 │────▶│ 刷新数据 │
└────────────┘     └──────────┘     └──────────┘
                                          │
                                          ▼
                                    ┌──────────┐
                                    │ 压缩数据 │
                                    │  (可选)  │
                                    └──────────┘
二、按主键删除 #
2.1 单条删除 #
python
from pymilvus import Collection
# Bind to the "documents" collection and load it.
collection = Collection("documents")
collection.load()

# Delete the single entity whose primary key equals 1, then print the
# count reported on the mutation result.
expr = 'id == 1'
result = collection.delete(expr=expr)
print(f"删除数量: {result.delete_count}")
2.2 批量删除 #
python
# Delete several entities in one request by listing their primary keys in
# an `in` expression.
expr = 'id in [1, 2, 3, 4, 5]'
result = collection.delete(expr=expr)
print(f"删除数量: {result.delete_count}")
2.3 大批量删除 #
python
def batch_delete_by_ids(collection, ids, batch_size=1000):
    """Delete entities by primary key in fixed-size batches.

    Args:
        collection: Object exposing ``delete(expr)`` whose return value has
            a ``delete_count`` attribute (a pymilvus ``Collection`` in this
            guide).
        ids: Iterable of primary-key values. Lists, tuples, ranges and
            generators are all accepted.
        batch_size: Maximum number of keys per delete expression; must be
            at least 1.

    Returns:
        The sum of ``delete_count`` over all batches (0 for empty input).

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    # Materialize once: slicing a tuple or range would otherwise be rendered
    # as "(1, 2)" / "range(0, 2)" instead of the list literal "[1, 2]" that
    # the filter expression needs.
    id_list = list(ids)

    total_deleted = 0
    for start in range(0, len(id_list), batch_size):
        batch_ids = id_list[start:start + batch_size]
        result = collection.delete(f'id in {batch_ids}')
        total_deleted += result.delete_count
    return total_deleted
# Build the primary keys 1..10000 and remove them batch by batch using the
# default batch size.
ids_to_delete = [*range(1, 10001)]
deleted = batch_delete_by_ids(collection, ids=ids_to_delete)
print(f"总计删除: {deleted} 条")
三、按条件删除 #
3.1 基本条件删除 #
python
# Each scalar filter below is sent as its own delete request, in order:
#   1. entities whose category equals "deprecated"
#   2. entities whose created_at is below 1704067200
#   3. entities whose status equals "deleted"
for expr in (
    'category == "deprecated"',
    'created_at < 1704067200',
    'status == "deleted"',
):
    result = collection.delete(expr)
3.2 复合条件删除 #
python
# Two compound filters, each combining a field test with the same
# created_at bound via `and`; every filter is its own delete request.
for expr in (
    'category == "test" and created_at < 1704067200',
    'views < 10 and created_at < 1704067200',
):
    result = collection.delete(expr)
3.3 字符串匹配删除 #
python
# Pattern-based deletes on the title field using `like`:
#   - "%test%"  : the pattern wraps "test" in % wildcards
#   - "temp_%"  : the pattern starts with "temp_" followed by %
# NOTE(review): confirm how the target Milvus version treats "_" inside a
# like pattern before relying on the second filter.
for expr in (
    'title like "%test%"',
    'title like "temp_%"',
):
    result = collection.delete(expr)
3.4 JSON字段删除 #
python
# Deletes driven by the JSON field `metadata`:
#   - compare a key inside the JSON value
#   - test membership with json_contains on the "tags" key
for expr in (
    'metadata["status"] == "deleted"',
    'json_contains(metadata["tags"], "deprecated")',
):
    result = collection.delete(expr)
3.5 数组字段删除 #
python
# Deletes driven by the array field `tags`:
#   - entities whose tags contain "deprecated"
#   - entities whose tags array has length 0
for expr in (
    'array_contains(tags, "deprecated")',
    'array_length(tags) == 0',
):
    result = collection.delete(expr)
四、分区删除 #
4.1 删除分区数据 #
python
# Scope the delete to partition "2024_01". The filter 'id > 0' only matches
# entities with a positive id, so this clears the partition only if every
# primary key there is positive.
expr = 'id > 0'
result = collection.delete(expr=expr, partition_name="2024_01")
4.2 删除整个分区 #
python
# Drop whole partitions by name instead of deleting their rows one filter
# at a time.
for partition_name in ("2024_01", "temp_partition"):
    collection.drop_partition(partition_name)
4.3 按时间清理分区 #
python
from datetime import datetime, timedelta
def cleanup_old_partitions(collection, keep_days=90):
    """Drop month-named partitions that fall before the retention cutoff.

    Only partitions named ``partition_<YYYYMM>`` (exactly six ASCII digits
    after the prefix) are considered; every other partition is left alone.

    Args:
        collection: Object exposing a ``partitions`` iterable (items have a
            ``name`` attribute) and ``drop_partition(name)``.
        keep_days: Size of the retention window in days, counted back from
            the current local time.

    Returns:
        None. Each dropped partition is announced on stdout.
    """
    prefix = "partition_"
    cutoff_date = datetime.now() - timedelta(days=keep_days)
    # Zero-padded YYYYMM strings sort chronologically, so a plain string
    # comparison against the cutoff is sufficient.
    cutoff_str = cutoff_date.strftime("%Y%m")

    for partition in collection.partitions:
        name = partition.name
        if not name.startswith(prefix):
            continue

        # Strip only the leading prefix; str.replace would also remove any
        # later occurrence of "partition_" inside the name.
        partition_date = name[len(prefix):]

        # Skip suffixes that are not YYYYMM. Without this check a name such
        # as "partition_1" compares below the cutoff and would be dropped.
        is_month = (
            len(partition_date) == 6
            and partition_date.isascii()
            and partition_date.isdigit()
        )
        if not is_month:
            continue

        if partition_date < cutoff_str:
            print(f"删除分区: {name}")
            collection.drop_partition(name)
# Run the partition cleanup with a 90-day retention window.
cleanup_old_partitions(collection=collection, keep_days=90)
五、删除验证 #
5.1 验证删除结果 #
python
# Delete three entities by primary key and print the count reported on the
# mutation result.
expr = 'id in [1, 2, 3]'
result = collection.delete(expr)
print(f"删除数量: {result.delete_count}")

# Re-run the same filter to confirm nothing is left. The query is issued
# with Strong consistency so that it observes the delete above; at a weaker
# consistency level it may still return the rows that were just deleted.
results = collection.query(expr=expr, consistency_level="Strong")
print(f"剩余数量: {len(results)}")
5.2 检查实体数量 #
python
# Count live entities with a count(*) query rather than
# collection.num_entities: num_entities comes from segment statistics and
# does not subtract logically deleted rows, so a before/after difference
# based on it is unreliable. Strong consistency makes each count observe all
# prior writes.
# NOTE(review): count(*) needs a Milvus/pymilvus version that supports it
# (2.3+) and a loaded collection - confirm for the target deployment.
count_request = {
    "expr": "",
    "output_fields": ["count(*)"],
    "consistency_level": "Strong",
}

before_count = collection.query(**count_request)[0]["count(*)"]
collection.delete('category == "test"')
collection.flush()
after_count = collection.query(**count_request)[0]["count(*)"]
print(f"删除数量: {before_count - after_count}")
六、删除后处理 #
6.1 刷新数据 #
python
# Issue the delete, then flush the collection afterwards.
stale_filter = 'category == "test"'
collection.delete(stale_filter)
collection.flush()
6.2 压缩数据 #
python
# Delete by primary key and flush.
obsolete_ids_filter = 'id in [1, 2, 3, 4, 5]'
collection.delete(obsolete_ids_filter)
collection.flush()

# Request a compaction of the collection after the delete.
collection.compact()
6.3 压缩后重新加载 #
python
# Remove the deprecated entities and flush.
collection.delete('category == "deprecated"')
collection.flush()

# compact() only schedules the compaction and returns immediately; block
# until it has finished so the reload below does not race with it.
collection.compact()
collection.wait_for_compaction_completed()

# Release and load the collection again after compaction.
collection.release()
collection.load()
七、安全删除 #
7.1 删除前确认 #
python
def safe_delete(collection, expr, confirm=True):
    """Delete entities matching ``expr`` after previewing the match count.

    Args:
        collection: Object exposing ``query``, ``delete`` and ``flush``.
        expr: Filter expression selecting the entities to delete.
        confirm: When true, ask on stdin for confirmation before deleting.

    Returns:
        ``delete_count`` from the delete result, or 0 when nothing matched
        or the user declined.
    """
    # Preview: fetch only the "id" field of the matching entities.
    matches = collection.query(expr=expr, output_fields=["id"])
    count = len(matches)
    if count == 0:
        print("没有匹配的数据")
        return 0

    print(f"将删除 {count} 条数据")
    if confirm:
        response = input("确认删除?(y/n): ")
        # Strip surrounding whitespace so an answer like "y " still counts.
        if response.strip().lower() != 'y':
            print("取消删除")
            return 0

    result = collection.delete(expr)
    collection.flush()
    return result.delete_count
7.2 软删除模式 #
python
def soft_delete(collection, ids):
    """Mark entities as deleted instead of physically removing them.

    For each id the entity is fetched, its ``deleted_at`` is set to the
    current Unix time (whole seconds), its ``status`` is set to "deleted",
    and the entity is written back with ``upsert``. Ids that match no entity
    are skipped.

    Args:
        collection: Object exposing ``query`` and ``upsert``.
        ids: Iterable of primary-key values to soft-delete.

    Returns:
        None.
    """
    # Local import keeps this snippet runnable on its own; the original
    # used time.time() without importing the module.
    import time

    for entity_id in ids:
        results = collection.query(
            expr=f'id == {entity_id}',
            output_fields=["*"],
        )
        if not results:
            continue

        # NOTE(review): upsert rewrites the whole entity, so the "*" query
        # must return every field the schema requires - confirm for schemas
        # with vector fields.
        entity = results[0]
        entity["deleted_at"] = int(time.time())
        entity["status"] = "deleted"
        collection.upsert([entity])
def get_active_entities(collection):
    """Return the entities whose ``status`` is not "deleted".

    Args:
        collection: Object exposing ``query(expr=..., output_fields=...)``.

    Returns:
        Whatever ``collection.query`` returns for the filter
        ``status != "deleted"`` with all fields requested.
    """
    return collection.query(
        expr='status != "deleted"',
        output_fields=["*"],
    )
7.3 批量安全删除 #
python
def batch_safe_delete(collection, expr, batch_size=1000):
    """Resolve ``expr`` to primary keys, then delete those keys in batches.

    Args:
        collection: Object exposing ``query``, ``delete`` and ``flush``.
        expr: Filter expression selecting the entities to delete.
        batch_size: Maximum number of keys per delete expression; must be
            at least 1.

    Returns:
        The sum of ``delete_count`` over all batches.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    # Query first so each delete request uses a bounded primary-key list.
    matches = collection.query(expr=expr, output_fields=["id"])
    ids = [row["id"] for row in matches]

    total_deleted = 0
    for start in range(0, len(ids), batch_size):
        batch_ids = ids[start:start + batch_size]
        result = collection.delete(f'id in {batch_ids}')
        total_deleted += result.delete_count

    # Flush once after all batches rather than once per batch.
    collection.flush()
    return total_deleted
八、删除最佳实践 #
8.1 删除策略 #
text
删除策略建议:
┌─────────────────────────────────────────┐
│ 删除场景 │
├─────────────────────────────────────────┤
│ │
│ 少量精确删除 │
│ └── 按主键删除 │
│ │
│ 批量条件删除 │
│ └── 先查询再批量删除 │
│ │
│ 时间分区清理 │
│ └── 删除整个分区 │
│ │
│ 需要恢复能力 │
│ └── 使用软删除 │
│ │
└─────────────────────────────────────────┘
8.2 性能优化 #
python
def optimized_delete(collection, expr):
    """Release the collection, delete, flush, compact, then load it again.

    Args:
        collection: Object exposing ``release``, ``delete``, ``flush``,
            ``compact`` and ``load``.
        expr: Filter expression selecting the entities to delete.

    Returns:
        ``delete_count`` from the delete result.

    Raises:
        Whatever ``delete``, ``flush`` or ``compact`` raise; the collection
        is still loaded again before the exception propagates.
    """
    # NOTE(review): whether a delete is accepted while the collection is
    # released depends on the Milvus version and on the expression - confirm
    # before using this pattern with non-primary-key filters.
    collection.release()
    try:
        result = collection.delete(expr)
        collection.flush()
        collection.compact()
    finally:
        # Always reload, so a failed delete does not leave the collection
        # released.
        collection.load()
    return result.delete_count
8.3 错误处理 #
python
from pymilvus import MilvusException
def safe_delete_with_retry(collection, expr, max_retries=3):
    """Delete and flush, retrying when a ``MilvusException`` is raised.

    Args:
        collection: Object exposing ``delete`` and ``flush``.
        expr: Filter expression selecting the entities to delete.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        ``delete_count`` from the first attempt that succeeds.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Previously the loop
            body never ran and the function silently returned None.
        MilvusException: Re-raised after the final failed attempt.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")

    for attempt in range(max_retries):
        try:
            result = collection.delete(expr)
            collection.flush()
            return result.delete_count
        except MilvusException as e:
            print(f"删除失败 (尝试 {attempt + 1}/{max_retries}): {e}")
            # Out of attempts: surface the last error to the caller.
            if attempt == max_retries - 1:
                raise
九、完整示例 #
9.1 数据清理脚本 #
python
from pymilvus import Collection
from datetime import datetime, timedelta
import time
class DataCleaner:
    """Bundle of delete and cleanup helpers bound to one collection.

    The wrapped object must expose ``delete``, ``query``, ``flush``,
    ``compact``, ``drop_partition`` and a ``partitions`` iterable (a
    pymilvus ``Collection`` in this guide).
    """

    # Name prefix of the month partitions handled by cleanup_partitions.
    _PARTITION_PREFIX = "partition_"

    def __init__(self, collection):
        """Store the collection that every helper operates on."""
        self.collection = collection

    def delete_by_ids(self, ids):
        """Delete entities by primary key and flush.

        Args:
            ids: Iterable of primary-key values; ``None`` or empty input is
                a no-op.

        Returns:
            ``delete_count`` from the delete result, or 0 for empty input.
        """
        # Materialize so tuples/ranges render as a list literal "[1, 2]"
        # instead of "(1, 2)" / "range(...)" inside the expression.
        id_list = [] if ids is None else list(ids)
        if not id_list:
            return 0
        result = self.collection.delete(f'id in {id_list}')
        self.collection.flush()
        return result.delete_count

    def delete_by_condition(self, expr):
        """Delete entities matching ``expr`` if the filter matches anything.

        Returns:
            ``delete_count`` from the delete result, or 0 when the
            preliminary query returned no rows.
        """
        # Preliminary query: skip the delete and the flush when nothing
        # matches.
        matches = self.collection.query(expr=expr, output_fields=["id"])
        if not matches:
            return 0
        result = self.collection.delete(expr)
        self.collection.flush()
        return result.delete_count

    def delete_old_data(self, days=90):
        """Delete entities whose ``created_at`` is more than ``days`` old.

        The cutoff is the current local time minus ``days``, converted to a
        whole-second Unix timestamp.
        """
        cutoff = int((datetime.now() - timedelta(days=days)).timestamp())
        return self.delete_by_condition(f'created_at < {cutoff}')

    def delete_by_category(self, category):
        """Delete entities whose ``category`` equals the given value."""
        # NOTE(review): category is interpolated verbatim; a value that
        # contains a double quote would change the expression. Validate or
        # escape it if it can come from untrusted input.
        return self.delete_by_condition(f'category == "{category}"')

    def cleanup_partitions(self, keep_days=90):
        """Drop ``partition_<YYYYMM>`` partitions older than the cutoff.

        Args:
            keep_days: Size of the retention window in days.

        Returns:
            List of the partition names that were dropped.
        """
        prefix = self._PARTITION_PREFIX
        cutoff_date = datetime.now() - timedelta(days=keep_days)
        # Zero-padded YYYYMM strings sort chronologically.
        cutoff_str = cutoff_date.strftime("%Y%m")

        deleted_partitions = []
        for partition in self.collection.partitions:
            name = partition.name
            if not name.startswith(prefix):
                continue
            partition_date = name[len(prefix):]
            # Ignore suffixes that are not six ASCII digits; a malformed
            # name such as "partition_1" would otherwise compare below the
            # cutoff and be dropped.
            is_month = (
                len(partition_date) == 6
                and partition_date.isascii()
                and partition_date.isdigit()
            )
            if is_month and partition_date < cutoff_str:
                self.collection.drop_partition(name)
                deleted_partitions.append(name)
        return deleted_partitions

    def compact_after_delete(self):
        """Flush the collection, then request a compaction."""
        self.collection.flush()
        self.collection.compact()
# Wrap the collection in a cleaner.
cleaner = DataCleaner(collection=collection)

# Remove everything older than 30 days.
deleted = cleaner.delete_old_data(days=30)
print(f"删除30天前数据: {deleted} 条")

# Remove the "deprecated" category.
deleted = cleaner.delete_by_category(category="deprecated")
print(f"删除deprecated分类: {deleted} 条")

# Flush and request a compaction after the deletes.
cleaner.compact_after_delete()
十、总结 #
删除操作速查表:
| 操作 | 方法 |
|---|---|
| 按主键删除 | `collection.delete('id == 1')` |
| 批量删除 | `collection.delete('id in [1, 2, 3]')` |
| 条件删除 | collection.delete(expr) |
| 分区删除 | collection.drop_partition(name) |
| 刷新 | collection.flush() |
| 压缩 | collection.compact() |
下一步,让我们学习索引管理!
最后更新:2026-04-04