Data Insertion #
1. Insertion Overview #
1.1 Insertion Workflow #
text
Data insertion workflow:
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Prepare data │────▶│ Insert data  │────▶│  Flush data  │
└──────────────┘     └──────────────┘     └──────────────┘
                                                  │
                                                  ▼
                                          ┌──────────────┐
                                          │ Build index  │
                                          │  (optional)  │
                                          └──────────────┘
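In pymilvus these four steps map onto a handful of calls. A minimal sketch, assuming an existing collection with an INT64 primary key and a 768-dim `embedding` field (the collection name and index settings here are illustrative):
python
from pymilvus import Collection

collection = Collection("documents")  # assumed existing collection

# 1. Prepare data: one primary key plus one 768-dim vector
data = [[1], [[0.1] * 768]]
# 2. Insert
collection.insert(data)
# 3. Flush so the rows are sealed and countable
collection.flush()
# 4. Optionally build an index on the vector field
collection.create_index(
    "embedding",
    {"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}
)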
1.2 Data Format #
python
# Column-based format: one inner list per field, all of equal length
data = [
    [id1, id2, id3],      # primary-key values
    [field1_values],      # scalar field 1
    [field2_values],      # scalar field 2
    [embedding_values]    # vector field
]
2. Basic Insertion #
2.1 List-Format Insertion #
python
from pymilvus import Collection

collection = Collection("documents")

data = [
    [1, 2, 3],
    ["Document title 1", "Document title 2", "Document title 3"],
    [[0.1]*768, [0.2]*768, [0.3]*768]
]

result = collection.insert(data)
print(f"Insert count: {result.insert_count}")
print(f"Primary keys: {result.primary_keys}")
2.2 Dict-Format Insertion #
python
entities = [
    {"id": 1, "title": "Document 1", "embedding": [0.1]*768},
    {"id": 2, "title": "Document 2", "embedding": [0.2]*768},
    {"id": 3, "title": "Document 3", "embedding": [0.3]*768}
]

result = collection.insert(entities)
2.3 DataFrame Insertion #
python
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2, 3],
    "title": ["Document 1", "Document 2", "Document 3"],
    "embedding": [[0.1]*768, [0.2]*768, [0.3]*768]
})

# pymilvus accepts a pandas DataFrame directly
result = collection.insert(df)
3. Batch Insertion #
3.1 Inserting in Batches #
python
import numpy as np

def batch_insert(collection, total=10000, batch_size=1000):
    for i in range(0, total, batch_size):
        end = min(i + batch_size, total)
        batch_data = [
            list(range(i, end)),
            [f"Document_{j}" for j in range(i, end)],
            np.random.rand(end - i, 768).tolist()
        ]
        collection.insert(batch_data)
        print(f"Inserted {end} rows so far")

batch_insert(collection, total=10000, batch_size=1000)
3.2 Large-Scale Insertion #
python
import numpy as np
from tqdm import tqdm

def large_batch_insert(collection, total=1000000, batch_size=5000):
    for i in tqdm(range(0, total, batch_size)):
        current_batch_size = min(batch_size, total - i)
        batch_data = [
            list(range(i, i + current_batch_size)),
            [f"doc_{j}" for j in range(i, i + current_batch_size)],
            np.random.rand(current_batch_size, 768).astype(np.float32).tolist()
        ]
        collection.insert(batch_data)
    # Flush once at the end rather than per batch
    collection.flush()
    print(f"Inserted {total} rows in total")
3.3 Parallel Insertion #
python
from concurrent.futures import ThreadPoolExecutor
import numpy as np

def insert_batch(collection, start_idx, batch_size):
    batch_data = [
        list(range(start_idx, start_idx + batch_size)),
        [f"doc_{j}" for j in range(start_idx, start_idx + batch_size)],
        np.random.rand(batch_size, 768).tolist()
    ]
    return collection.insert(batch_data)

def parallel_insert(collection, total=100000, batch_size=5000, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for i in range(0, total, batch_size):
            futures.append(executor.submit(
                insert_batch,
                collection,
                i,
                min(batch_size, total - i)
            ))
        # Wait for all batches and surface any errors
        for future in futures:
            future.result()
    collection.flush()

parallel_insert(collection, total=100000, batch_size=5000, workers=4)
4. Partition Insertion #
4.1 Inserting into a Specific Partition #
python
partition = collection.partition("2024_01")

data = [
    [1, 2, 3],
    ["Document 1", "Document 2", "Document 3"],
    [[0.1]*768, [0.2]*768, [0.3]*768]
]

partition.insert(data)
4.2 Inserting by Date Partition #
python
from datetime import datetime

def insert_by_date(collection, data, date_str):
    partition_name = f"partition_{date_str}"
    if not collection.has_partition(partition_name):
        collection.create_partition(partition_name)
    partition = collection.partition(partition_name)
    partition.insert(data)

today = datetime.now().strftime("%Y%m%d")
insert_by_date(collection, data, today)
4.3 Inserting by Business Partition #
python
def insert_by_category(collection, data, category):
    partition_name = f"category_{category}"
    if not collection.has_partition(partition_name):
        collection.create_partition(partition_name)
    partition = collection.partition(partition_name)
    partition.insert(data)

insert_by_category(collection, data, "electronics")
insert_by_category(collection, data, "clothing")
5. Dynamic-Field Insertion #
5.1 Enabling Dynamic Fields #
python
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)
]

schema = CollectionSchema(
    fields=fields,
    enable_dynamic_field=True
)
collection = Collection("documents_dynamic", schema)
5.2 Inserting Data with Dynamic Fields #
python
data = [
    {
        "id": 1,
        "embedding": [0.1]*768,
        "title": "Dynamic title",
        "author": "Zhang San",
        "views": 1000,
        "tags": ["AI", "ML"]
    },
    {
        "id": 2,
        "embedding": [0.2]*768,
        "title": "Another title",
        "category": "Tech",
        "score": 95.5
    }
]

collection.insert(data)
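Dynamic field keys can then be used in filter expressions and output_fields much like regular scalar fields (Milvus 2.3+). A minimal query sketch, assuming the collection above has been flushed and loaded:
python
# "views" exists only in the dynamic field of entity 1
results = collection.query(
    expr="views > 500",
    output_fields=["id", "title", "views"]
)
print(results)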
6. Upsert Operations #
6.1 The Upsert Concept #
text
Upsert = Update + Insert
┌─────────────────────────────────────────┐
│               Upsert logic              │
├─────────────────────────────────────────┤
│                                         │
│   Does the primary key exist?           │
│    ├── Yes → update the existing entity │
│    └── No  → insert a new entity        │
│                                         │
└─────────────────────────────────────────┘
6.2 Performing an Upsert #
python
data = [
    [1, 2],
    ["Updated title 1", "New title 2"],
    [[0.5]*768, [0.6]*768]
]

result = collection.upsert(data)
print(f"Upsert count: {result.insert_count}")
6.3 Upsert vs Insert #
python
# Note: upsert alone already covers both branches; this helper is only
# useful when you want to log whether an update or an insert happened.
def insert_or_update(collection, entity_id, data):
    existing = collection.query(
        expr=f"id == {entity_id}",
        output_fields=["id"]
    )
    if existing:
        collection.upsert(data)
        print(f"Updated entity {entity_id}")
    else:
        collection.insert(data)
        print(f"Inserted entity {entity_id}")
7. Flushing Data #
7.1 The Flush Operation #
python
collection.insert(data)
collection.flush()
print(collection.num_entities)
7.2 Automatic Flushing #
Milvus also seals and flushes growing segments automatically in the background, so an explicit flush() after every insert is unnecessary; call it when inserted data must be persisted and counted immediately.
7.3 Flush Parameters #
python
# Both insert() and flush() accept a timeout in seconds
collection.insert(data, timeout=60)
collection.flush(timeout=120)
8. Verifying Inserts #
8.1 Checking the Insert Result #
python
result = collection.insert(data)
print(f"插入数量: {result.insert_count}")
print(f"主键列表: {result.primary_keys}")
print(f"时间戳: {result.timestamp}")
8.2 Verifying the Data #
python
collection.flush()

count = collection.num_entities
print(f"Total entities: {count}")

results = collection.query(
    expr="id in [1, 2, 3]",
    output_fields=["id", "title"]
)
print(results)
9. Insertion Best Practices #
9.1 Choosing a Batch Size #
text
Suggested batch sizes:
Row count        Suggested batch size
──────────────────────────────────────
< 100K           1,000 - 5,000
100K - 1M        5,000 - 10,000
> 1M             10,000 - 50,000
Factors to weigh (a helper applying these heuristics is sketched below):
- Memory limits
- Network latency
- Index build time
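As a rough illustration of the table, here is a sketch of a hypothetical helper (pick_batch_size is not a pymilvus API) that maps a total row count to a batch size:
python
def pick_batch_size(total_rows: int) -> int:
    # Thresholds mirror the table above; tune them against your
    # memory limits, network latency, and index build time.
    if total_rows < 100_000:
        return 5_000
    if total_rows <= 1_000_000:
        return 10_000
    return 50_000

print(pick_batch_size(250_000))  # -> 10000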
9.2 Data Preprocessing #
python
import numpy as np

def preprocess_embeddings(embeddings):
    # L2-normalize each vector; guard against zero-norm rows
    embeddings = np.array(embeddings, dtype=np.float32)
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    return (embeddings / norms).tolist()

raw_embeddings = np.random.rand(1000, 768)
processed_embeddings = preprocess_embeddings(raw_embeddings)
9.3 Error Handling #
python
import time
from pymilvus import MilvusException

def safe_insert(collection, data, max_retries=3):
    for attempt in range(max_retries):
        try:
            return collection.insert(data)
        except MilvusException as e:
            print(f"Insert failed (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # simple exponential backoff
10. Complete Example #
10.1 Document Data Insertion #
python
from pymilvus import (
    connections,
    FieldSchema,
    CollectionSchema,
    Collection,
    DataType
)
import numpy as np

# Connect and define the schema
connections.connect("default", host="localhost", port="19530")

fields = [
    FieldSchema(name="doc_id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)
]
schema = CollectionSchema(fields, "Document collection")
collection = Collection("documents", schema)

# Build an index and load the collection
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index("embedding", index_params)
collection.load()

# Generate and insert sample documents
def generate_documents(count=1000):
    ids = list(range(count))
    titles = [f"Document title_{i}" for i in range(count)]
    contents = [f"This is the content of document {i}..." for i in range(count)]
    embeddings = np.random.rand(count, 768).astype(np.float32).tolist()
    return [ids, titles, contents, embeddings]

data = generate_documents(1000)
result = collection.insert(data)
collection.flush()

print(f"Inserted {result.insert_count} documents")
print(f"Total entities: {collection.num_entities}")
11. Summary #
Insertion cheat sheet:
| Operation | Method |
|---|---|
| Basic insert | collection.insert(data) |
| Batch insert | call insert() in batches |
| Partition insert | partition.insert(data) |
| Upsert | collection.upsert(data) |
| Flush | collection.flush() |
| Entity count | collection.num_entities |
Next up: vector search!
Last updated: 2026-04-04