基础语法 #

一、PyMilvus SDK概述 #

1.1 SDK结构 #

text
PyMilvus SDK模块结构:

┌─────────────────────────────────────────┐
│              pymilvus                    │
├─────────────────────────────────────────┤
│  ├── connections      连接管理           │
│  ├── Collection       集合操作           │
│  ├── Partition        分区操作           │
│  ├── Index            索引操作           │
│  ├── utility          工具函数           │
│  ├── db               数据库操作         │
│  ├── schema           Schema定义         │
│  ├── types            类型定义           │
│  └── orm              ORM接口            │
└─────────────────────────────────────────┘

1.2 导入模块 #

python
from pymilvus import (
    connections,
    Collection,
    Partition,
    FieldSchema,
    CollectionSchema,
    DataType,
    utility,
    db
)

二、连接管理 #

2.1 建立连接 #

python
from pymilvus import connections

connections.connect(
    alias="default",
    host="localhost",
    port="19530",
    user="root",
    password="Milvus",
    timeout=10
)

2.2 连接参数 #

参数 类型 说明
alias str 连接别名
host str Milvus服务器地址
port str 端口号
user str 用户名
password str 密码
timeout float 超时时间(秒)
secure bool 是否使用TLS
db_name str 数据库名

2.3 断开连接 #

python
connections.disconnect("default")

connections.remove_connection("default")

2.4 查看连接 #

python
print(connections.list_connections())

print(connections.get_connection_addr("default"))

三、数据库操作 #

3.1 创建数据库 #

python
from pymilvus import db

db.create_database("my_database")

3.2 切换数据库 #

python
db.using_database("my_database")

connections.connect(
    alias="default",
    host="localhost",
    port="19530",
    db_name="my_database"
)

3.3 列出数据库 #

python
print(db.list_databases())

3.4 删除数据库 #

python
db.drop_database("my_database")

四、Collection操作 #

4.1 定义Schema #

python
from pymilvus import FieldSchema, CollectionSchema, DataType

fields = [
    FieldSchema(
        name="id",
        dtype=DataType.INT64,
        is_primary=True,
        auto_id=False
    ),
    FieldSchema(
        name="title",
        dtype=DataType.VARCHAR,
        max_length=256
    ),
    FieldSchema(
        name="embedding",
        dtype=DataType.FLOAT_VECTOR,
        dim=768
    )
]

schema = CollectionSchema(
    fields=fields,
    description="文档集合",
    enable_dynamic_field=True
)

4.2 创建Collection #

python
from pymilvus import Collection

collection = Collection(
    name="documents",
    schema=schema,
    using="default",
    shards_num=2
)

4.3 查看Collection #

python
from pymilvus import utility

print(utility.list_collections())

print(collection.schema)

print(collection.description)

print(collection.num_entities)

4.4 加载和释放 #

python
collection.load()

collection.release()

print(collection.is_loaded)

4.5 删除Collection #

python
utility.drop_collection("documents")

五、数据插入 #

5.1 基本插入 #

python
data = [
    [1, 2, 3],
    ["文档1", "文档2", "文档3"],
    [[0.1]*768, [0.2]*768, [0.3]*768]
]

result = collection.insert(data)
print(result.insert_count)

5.2 字典格式插入 #

python
entities = [
    {"id": 1, "title": "文档1", "embedding": [0.1]*768},
    {"id": 2, "title": "文档2", "embedding": [0.2]*768}
]

collection.insert(entities)

5.3 动态字段插入 #

python
entities = [
    {
        "id": 1,
        "title": "文档1",
        "embedding": [0.1]*768,
        "author": "张三",
        "tags": ["技术", "AI"]
    }
]

collection.insert(entities)

5.4 分区插入 #

python
partition = collection.partition("2024_01")

data = [
    [1, 2],
    ["文档1", "文档2"],
    [[0.1]*768, [0.2]*768]
]

partition.insert(data)

5.5 批量插入 #

python
import numpy as np

batch_size = 1000
total = 10000

for i in range(0, total, batch_size):
    batch_data = [
        list(range(i, i + batch_size)),
        [f"文档{j}" for j in range(i, i + batch_size)],
        np.random.rand(batch_size, 768).tolist()
    ]
    collection.insert(batch_data)

六、向量搜索 #

6.1 基本搜索 #

python
collection.load()

search_params = {
    "metric_type": "L2",
    "params": {"nprobe": 10}
}

results = collection.search(
    data=[[0.1]*768],
    anns_field="embedding",
    param=search_params,
    limit=10
)

for hits in results:
    for hit in hits:
        print(f"ID: {hit.id}, Distance: {hit.distance}")

6.2 搜索参数 #

python
search_params = {
    "metric_type": "L2",
    "params": {
        "nprobe": 16,
        "offset": 0,
        "radius": 1.0,
        "range_filter": 0.5
    }
}

6.3 带过滤搜索 #

python
results = collection.search(
    data=[[0.1]*768],
    anns_field="embedding",
    param=search_params,
    limit=10,
    expr='title like "%AI%"',
    output_fields=["title", "author"]
)

6.4 分区搜索 #

python
results = collection.search(
    data=[[0.1]*768],
    anns_field="embedding",
    param=search_params,
    limit=10,
    partition_names=["2024_01"]
)

6.5 批量搜索 #

python
query_vectors = [
    [0.1]*768,
    [0.2]*768,
    [0.3]*768
]

results = collection.search(
    data=query_vectors,
    anns_field="embedding",
    param=search_params,
    limit=10
)

七、标量查询 #

7.1 基本查询 #

python
results = collection.query(
    expr='id in [1, 2, 3]',
    output_fields=["id", "title", "embedding"]
)

for result in results:
    print(result)

7.2 条件表达式 #

python
results = collection.query(
    expr='title like "%AI%" and id > 10',
    output_fields=["id", "title"]
)

results = collection.query(
    expr='author in ["张三", "李四"]',
    output_fields=["id", "title", "author"]
)

7.3 分页查询 #

python
results = collection.query(
    expr='id > 0',
    output_fields=["id", "title"],
    offset=0,
    limit=100
)

7.4 分区查询 #

python
results = collection.query(
    expr='id > 0',
    output_fields=["id", "title"],
    partition_names=["2024_01"]
)

八、数据更新 #

8.1 Upsert操作 #

python
data = [
    [1],
    ["更新后的标题"],
    [[0.5]*768]
]

collection.upsert(data)

8.2 删除数据 #

python
expr = 'id in [1, 2, 3]'
collection.delete(expr)

expr = 'title like "%test%"'
collection.delete(expr)

九、索引操作 #

9.1 创建索引 #

python
index_params = {
    "metric_type": "L2",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}

collection.create_index(
    field_name="embedding",
    index_params=index_params,
    index_name="embedding_idx"
)

9.2 查看索引 #

python
print(collection.indexes)

for index in collection.indexes:
    print(f"Field: {index.field_name}")
    print(f"Params: {index.params}")

9.3 删除索引 #

python
collection.drop_index(index_name="embedding_idx")

十、分区操作 #

10.1 创建分区 #

python
collection.create_partition("2024_01", description="2024年1月数据")

10.2 查看分区 #

python
print(collection.partitions)

for partition in collection.partitions:
    print(f"Name: {partition.name}")
    print(f"Description: {partition.description}")
    print(f"Num entities: {partition.num_entities}")

10.3 删除分区 #

python
collection.drop_partition("2024_01")

十一、表达式语法 #

11.1 比较运算符 #

python
expr = 'id == 1'
expr = 'id != 1'
expr = 'id > 10'
expr = 'id >= 10'
expr = 'id < 100'
expr = 'id <= 100'

11.2 逻辑运算符 #

python
expr = 'id > 10 and id < 100'
expr = 'id < 10 or id > 100'
expr = 'not (id == 50)'

11.3 集合运算 #

python
expr = 'id in [1, 2, 3]'
expr = 'id not in [1, 2, 3]'

11.4 字符串操作 #

python
expr = 'title like "%AI%"'
expr = 'title like "AI%"'
expr = 'title like "%AI"'

11.5 JSON操作 #

python
expr = 'metadata["category"] == "tech"'
expr = 'json_contains(tags, "AI")'
expr = 'json_contains_all(tags, ["AI", "ML"])'
expr = 'json_contains_any(tags, ["AI", "ML"])'

11.6 数组操作 #

python
expr = 'array_length(tags) > 2'
expr = 'array_contains(tags, "AI")'

十二、命名规范 #

12.1 Collection命名 #

text
规范:
- 只包含字母、数字、下划线
- 以字母或下划线开头
- 长度1-255字符
- 不区分大小写

正确示例:
- products
- user_profiles
- _temp_collection

错误示例:
- 123collection
- my-collection
- collection name

12.2 字段命名 #

text
规范:
- 只包含字母、数字、下划线
- 以字母开头
- 长度1-255字符
- 不能与保留字冲突

正确示例:
- id
- user_name
- embedding_768

错误示例:
- 123field
- field-name
- from (保留字)

12.3 分区命名 #

text
规范:
- 只包含字母、数字、下划线
- 以字母或下划线开头
- 长度1-255字符
- 不能使用 _default (保留)

正确示例:
- 2024_01
- region_beijing
- category_electronics

十三、错误处理 #

13.1 常见错误 #

python
from pymilvus import MilvusException

try:
    collection = Collection("non_existent")
except MilvusException as e:
    print(f"Error code: {e.code}")
    print(f"Error message: {e.message}")

13.2 错误码 #

错误码 说明
1 内部错误
2 未知错误
3 连接错误
4 超时错误
5 已存在
6 不存在
7 参数错误
8 权限错误

十四、最佳实践 #

14.1 连接管理 #

python
from pymilvus import connections
from contextlib import contextmanager

@contextmanager
def milvus_connection(alias, host, port):
    try:
        connections.connect(alias, host=host, port=port)
        yield
    finally:
        connections.disconnect(alias)

with milvus_connection("default", "localhost", "19530"):
    pass

14.2 批量操作 #

python
def batch_insert(collection, data, batch_size=1000):
    total = len(data[0])
    for i in range(0, total, batch_size):
        batch = [field[i:i+batch_size] for field in data]
        collection.insert(batch)
    collection.flush()

14.3 异常处理 #

python
from pymilvus import MilvusException
import time

def search_with_retry(collection, vectors, max_retries=3):
    for attempt in range(max_retries):
        try:
            return collection.search(
                data=vectors,
                anns_field="embedding",
                param={"metric_type": "L2", "params": {"nprobe": 10}},
                limit=10
            )
        except MilvusException as e:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)

十五、总结 #

常用操作速查表:

操作 方法
连接 connections.connect()
创建Collection Collection(name, schema)
插入数据 collection.insert(data)
搜索 collection.search()
查询 collection.query()
删除 collection.delete(expr)
创建索引 collection.create_index()
创建分区 collection.create_partition()

下一步,让我们学习数据管理操作!

最后更新:2026-04-04