数据插入 #

一、插入概述 #

1.1 插入流程 #

text
数据插入流程:

┌──────────┐     ┌──────────┐     ┌──────────┐
│  准备数据 │────▶│  插入操作 │────▶│  刷新数据 │
└──────────┘     └──────────┘     └──────────┘
                       │
                       ▼
                 ┌──────────┐
                 │ 建立索引  │
                 │ (可选)    │
                 └──────────┘

1.2 数据格式 #

python
data = [
    [id1, id2, id3],
    [field1_values],
    [field2_values],
    [embedding_values]
]

二、基本插入 #

2.1 列表格式插入 #

python
from pymilvus import Collection

collection = Collection("documents")

data = [
    [1, 2, 3],
    ["文档标题1", "文档标题2", "文档标题3"],
    [[0.1]*768, [0.2]*768, [0.3]*768]
]

result = collection.insert(data)

print(f"插入数量: {result.insert_count}")
print(f"主键列表: {result.primary_keys}")

2.2 字典格式插入 #

python
entities = [
    {"id": 1, "title": "文档1", "embedding": [0.1]*768},
    {"id": 2, "title": "文档2", "embedding": [0.2]*768},
    {"id": 3, "title": "文档3", "embedding": [0.3]*768}
]

result = collection.insert(entities)

2.3 DataFrame格式插入 #

python
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2, 3],
    "title": ["文档1", "文档2", "文档3"],
    "embedding": [[0.1]*768, [0.2]*768, [0.3]*768]
})

result = collection.insert(df.to_dict('records'))

三、批量插入 #

3.1 分批插入 #

python
import numpy as np

def batch_insert(collection, total=10000, batch_size=1000):
    for i in range(0, total, batch_size):
        batch_data = [
            list(range(i, min(i + batch_size, total))),
            [f"文档_{j}" for j in range(i, min(i + batch_size, total))],
            np.random.rand(min(batch_size, total - i), 768).tolist()
        ]
        collection.insert(batch_data)
        print(f"已插入 {min(i + batch_size, total)} 条数据")

batch_insert(collection, total=10000, batch_size=1000)

3.2 大数据量插入 #

python
import numpy as np
from tqdm import tqdm

def large_batch_insert(collection, total=1000000, batch_size=5000):
    batches = range(0, total, batch_size)
    
    for i in tqdm(batches):
        current_batch_size = min(batch_size, total - i)
        
        batch_data = [
            list(range(i, i + current_batch_size)),
            [f"doc_{j}" for j in range(i, i + current_batch_size)],
            np.random.rand(current_batch_size, 768).astype(np.float32).tolist()
        ]
        
        collection.insert(batch_data)
    
    collection.flush()
    print(f"总计插入 {total} 条数据")

3.3 并行插入 #

python
from concurrent.futures import ThreadPoolExecutor
import numpy as np

def insert_batch(collection, start_idx, batch_size):
    batch_data = [
        list(range(start_idx, start_idx + batch_size)),
        [f"doc_{j}" for j in range(start_idx, start_idx + batch_size)],
        np.random.rand(batch_size, 768).tolist()
    ]
    return collection.insert(batch_size)

def parallel_insert(collection, total=100000, batch_size=5000, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for i in range(0, total, batch_size):
            future = executor.submit(
                insert_batch, 
                collection, 
                i, 
                min(batch_size, total - i)
            )
            futures.append(future)
        
        for future in futures:
            future.result()
    
    collection.flush()

四、分区插入 #

4.1 指定分区插入 #

python
partition = collection.partition("2024_01")

data = [
    [1, 2, 3],
    ["文档1", "文档2", "文档3"],
    [[0.1]*768, [0.2]*768, [0.3]*768]
]

partition.insert(data)

4.2 按时间分区插入 #

python
from datetime import datetime

def insert_by_date(collection, data, date_str):
    partition_name = f"partition_{date_str}"
    
    if not collection.has_partition(partition_name):
        collection.create_partition(partition_name)
    
    partition = collection.partition(partition_name)
    partition.insert(data)

today = datetime.now().strftime("%Y%m%d")
insert_by_date(collection, data, today)

4.3 按业务分区插入 #

python
def insert_by_category(collection, data, category):
    partition_name = f"category_{category}"
    
    if not collection.has_partition(partition_name):
        collection.create_partition(partition_name)
    
    partition = collection.partition(partition_name)
    partition.insert(data)

insert_by_category(collection, data, "electronics")
insert_by_category(collection, data, "clothing")

五、动态字段插入 #

5.1 启用动态字段 #

python
from pymilvus import CollectionSchema, FieldSchema, DataType

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)
]

schema = CollectionSchema(
    fields=fields,
    enable_dynamic_field=True
)

5.2 插入动态字段数据 #

python
data = [
    {
        "id": 1,
        "embedding": [0.1]*768,
        "title": "动态标题",
        "author": "张三",
        "views": 1000,
        "tags": ["AI", "ML"]
    },
    {
        "id": 2,
        "embedding": [0.2]*768,
        "title": "另一个标题",
        "category": "技术",
        "score": 95.5
    }
]

collection.insert(data)

六、Upsert操作 #

6.1 Upsert概念 #

text
Upsert = Update + Insert

┌─────────────────────────────────────────┐
│              Upsert逻辑                  │
├─────────────────────────────────────────┤
│                                         │
│  主键存在?                              │
│  ├── 是 → 更新数据                      │
│  └── 否 → 插入新数据                    │
│                                         │
└─────────────────────────────────────────┘

6.2 Upsert操作 #

python
data = [
    [1, 2],
    ["更新标题1", "新标题2"],
    [[0.5]*768, [0.6]*768]
]

result = collection.upsert(data)
print(f"Upsert数量: {result.insert_count}")

6.3 Upsert vs Insert #

python
def insert_or_update(collection, entity_id, data):
    existing = collection.query(
        expr=f'id == {entity_id}',
        output_fields=['id']
    )
    
    if existing:
        collection.upsert(data)
        print(f"更新实体 {entity_id}")
    else:
        collection.insert(data)
        print(f"插入实体 {entity_id}")

七、数据刷新 #

7.1 刷新操作 #

python
collection.insert(data)

collection.flush()

print(collection.num_entities)

7.2 自动刷新 #

python
collection.insert(data, timeout=60)

collection.flush(timeout=120)

7.3 刷新参数 #

python
collection.flush(
    timeout=120,
    auto_refresh_duration=30
)

八、插入验证 #

8.1 检查插入结果 #

python
result = collection.insert(data)

print(f"插入数量: {result.insert_count}")
print(f"主键列表: {result.primary_keys}")
print(f"时间戳: {result.timestamp}")

8.2 验证数据 #

python
collection.flush()

count = collection.num_entities
print(f"实体总数: {count}")

results = collection.query(
    expr='id in [1, 2, 3]',
    output_fields=['id', 'title']
)
print(results)

九、插入最佳实践 #

9.1 批量大小选择 #

text
批量大小建议:

数据量              建议批量大小
──────────────────────────────
< 10万              1000-5000
10万-100万          5000-10000
> 100万             10000-50000

考虑因素:
- 内存限制
- 网络延迟
- 索引构建时间

9.2 数据预处理 #

python
import numpy as np

def preprocess_embeddings(embeddings):
    embeddings = np.array(embeddings, dtype=np.float32)
    
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    embeddings = embeddings / norms
    
    return embeddings.tolist()

raw_embeddings = np.random.rand(1000, 768)
processed_embeddings = preprocess_embeddings(raw_embeddings)

9.3 错误处理 #

python
from pymilvus import MilvusException

def safe_insert(collection, data, max_retries=3):
    for attempt in range(max_retries):
        try:
            result = collection.insert(data)
            return result
        except MilvusException as e:
            print(f"插入失败 (尝试 {attempt + 1}/{max_retries}): {e}")
            if attempt == max_retries - 1:
                raise

十、完整示例 #

10.1 文档数据插入 #

python
from pymilvus import (
    connections,
    FieldSchema,
    CollectionSchema,
    Collection,
    DataType
)
import numpy as np

connections.connect("default", host="localhost", port="19530")

fields = [
    FieldSchema(name="doc_id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)
]

schema = CollectionSchema(fields, "文档集合")
collection = Collection("documents", schema)

index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index("embedding", index_params)
collection.load()

def generate_documents(count=1000):
    ids = list(range(count))
    titles = [f"文档标题_{i}" for i in range(count)]
    contents = [f"这是文档 {i} 的内容..." for i in range(count)]
    embeddings = np.random.rand(count, 768).astype(np.float32).tolist()
    
    return [ids, titles, contents, embeddings]

data = generate_documents(1000)
result = collection.insert(data)
collection.flush()

print(f"插入 {result.insert_count} 条文档")
print(f"总实体数: {collection.num_entities}")

十一、总结 #

插入操作速查表:

操作 方法
基本插入 collection.insert(data)
批量插入 分批调用insert
分区插入 partition.insert(data)
Upsert collection.upsert(data)
刷新 collection.flush()
检查数量 collection.num_entities

下一步,让我们学习向量搜索!

最后更新:2026-04-04