基础语法 #
一、PyMilvus SDK概述 #
1.1 SDK结构 #
text
PyMilvus SDK模块结构:
┌─────────────────────────────────────────┐
│ pymilvus │
├─────────────────────────────────────────┤
│ ├── connections 连接管理 │
│ ├── Collection 集合操作 │
│ ├── Partition 分区操作 │
│ ├── Index 索引操作 │
│ ├── utility 工具函数 │
│ ├── db 数据库操作 │
│ ├── schema Schema定义 │
│ ├── types 类型定义 │
│ └── orm ORM接口 │
└─────────────────────────────────────────┘
1.2 导入模块 #
python
from pymilvus import (
connections,
Collection,
Partition,
FieldSchema,
CollectionSchema,
DataType,
utility,
db
)
二、连接管理 #
2.1 建立连接 #
python
from pymilvus import connections
connections.connect(
alias="default",
host="localhost",
port="19530",
user="root",
password="Milvus",
timeout=10
)
2.2 连接参数 #
| 参数 | 类型 | 说明 |
|---|---|---|
| alias | str | 连接别名 |
| host | str | Milvus服务器地址 |
| port | str | 端口号 |
| user | str | 用户名 |
| password | str | 密码 |
| timeout | float | 超时时间(秒) |
| secure | bool | 是否使用TLS |
| db_name | str | 数据库名 |
2.3 断开连接 #
python
connections.disconnect("default")
connections.remove_connection("default")
2.4 查看连接 #
python
print(connections.list_connections())
print(connections.get_connection_addr("default"))
三、数据库操作 #
3.1 创建数据库 #
python
from pymilvus import db
db.create_database("my_database")
3.2 切换数据库 #
python
db.using_database("my_database")
connections.connect(
alias="default",
host="localhost",
port="19530",
db_name="my_database"
)
3.3 列出数据库 #
python
print(db.list_databases())
3.4 删除数据库 #
python
db.drop_database("my_database")
四、Collection操作 #
4.1 定义Schema #
python
from pymilvus import FieldSchema, CollectionSchema, DataType
fields = [
FieldSchema(
name="id",
dtype=DataType.INT64,
is_primary=True,
auto_id=False
),
FieldSchema(
name="title",
dtype=DataType.VARCHAR,
max_length=256
),
FieldSchema(
name="embedding",
dtype=DataType.FLOAT_VECTOR,
dim=768
)
]
schema = CollectionSchema(
fields=fields,
description="文档集合",
enable_dynamic_field=True
)
4.2 创建Collection #
python
from pymilvus import Collection
collection = Collection(
name="documents",
schema=schema,
using="default",
shards_num=2
)
4.3 查看Collection #
python
from pymilvus import utility
print(utility.list_collections())
print(collection.schema)
print(collection.description)
print(collection.num_entities)
4.4 加载和释放 #
python
from pymilvus import utility

collection.load()
collection.release()
# The ORM Collection object does not expose an `is_loaded` attribute;
# ask the server for the load state through the utility module instead.
print(utility.load_state(collection.name))
4.5 删除Collection #
python
utility.drop_collection("documents")
五、数据插入 #
5.1 基本插入 #
python
data = [
[1, 2, 3],
["文档1", "文档2", "文档3"],
[[0.1]*768, [0.2]*768, [0.3]*768]
]
result = collection.insert(data)
print(result.insert_count)
5.2 字典格式插入 #
python
entities = [
{"id": 1, "title": "文档1", "embedding": [0.1]*768},
{"id": 2, "title": "文档2", "embedding": [0.2]*768}
]
collection.insert(entities)
5.3 动态字段插入 #
python
entities = [
{
"id": 1,
"title": "文档1",
"embedding": [0.1]*768,
"author": "张三",
"tags": ["技术", "AI"]
}
]
collection.insert(entities)
5.4 分区插入 #
python
partition = collection.partition("2024_01")
data = [
[1, 2],
["文档1", "文档2"],
[[0.1]*768, [0.2]*768]
]
partition.insert(data)
5.5 批量插入 #
python
import numpy as np
batch_size = 1000
total = 10000
for i in range(0, total, batch_size):
batch_data = [
list(range(i, i + batch_size)),
[f"文档{j}" for j in range(i, i + batch_size)],
np.random.rand(batch_size, 768).tolist()
]
collection.insert(batch_data)
六、向量搜索 #
6.1 基本搜索 #
python
collection.load()
search_params = {
"metric_type": "L2",
"params": {"nprobe": 10}
}
results = collection.search(
data=[[0.1]*768],
anns_field="embedding",
param=search_params,
limit=10
)
for hits in results:
for hit in hits:
print(f"ID: {hit.id}, Distance: {hit.distance}")
6.2 搜索参数 #
python
search_params = {
    "metric_type": "L2",
    # Pagination offset is a top-level key of the search parameter dict,
    # not an index-level parameter, so it must not be nested in "params".
    "offset": 0,
    "params": {
        "nprobe": 16,
        # Range-search bounds: with L2, radius is the outer distance bound
        # and range_filter the inner one (range_filter < radius).
        "radius": 1.0,
        "range_filter": 0.5,
    },
}
6.3 带过滤搜索 #
python
results = collection.search(
data=[[0.1]*768],
anns_field="embedding",
param=search_params,
limit=10,
expr='title like "%AI%"',
output_fields=["title", "author"]
)
6.4 分区搜索 #
python
results = collection.search(
data=[[0.1]*768],
anns_field="embedding",
param=search_params,
limit=10,
partition_names=["2024_01"]
)
6.5 批量搜索 #
python
query_vectors = [
[0.1]*768,
[0.2]*768,
[0.3]*768
]
results = collection.search(
data=query_vectors,
anns_field="embedding",
param=search_params,
limit=10
)
七、标量查询 #
7.1 基本查询 #
python
results = collection.query(
expr='id in [1, 2, 3]',
output_fields=["id", "title", "embedding"]
)
for result in results:
print(result)
7.2 条件表达式 #
python
results = collection.query(
expr='title like "%AI%" and id > 10',
output_fields=["id", "title"]
)
results = collection.query(
expr='author in ["张三", "李四"]',
output_fields=["id", "title", "author"]
)
7.3 分页查询 #
python
results = collection.query(
expr='id > 0',
output_fields=["id", "title"],
offset=0,
limit=100
)
7.4 分区查询 #
python
results = collection.query(
expr='id > 0',
output_fields=["id", "title"],
partition_names=["2024_01"]
)
八、数据更新 #
8.1 Upsert操作 #
python
data = [
[1],
["更新后的标题"],
[[0.5]*768]
]
collection.upsert(data)
8.2 删除数据 #
python
expr = 'id in [1, 2, 3]'
collection.delete(expr)
expr = 'title like "%test%"'
collection.delete(expr)
九、索引操作 #
9.1 创建索引 #
python
index_params = {
"metric_type": "L2",
"index_type": "IVF_FLAT",
"params": {"nlist": 128}
}
collection.create_index(
field_name="embedding",
index_params=index_params,
index_name="embedding_idx"
)
9.2 查看索引 #
python
print(collection.indexes)
for index in collection.indexes:
print(f"Field: {index.field_name}")
print(f"Params: {index.params}")
9.3 删除索引 #
python
collection.drop_index(index_name="embedding_idx")
十、分区操作 #
10.1 创建分区 #
python
collection.create_partition("2024_01", description="2024年1月数据")
10.2 查看分区 #
python
print(collection.partitions)
for partition in collection.partitions:
print(f"Name: {partition.name}")
print(f"Description: {partition.description}")
print(f"Num entities: {partition.num_entities}")
10.3 删除分区 #
python
collection.drop_partition("2024_01")
十一、表达式语法 #
11.1 比较运算符 #
python
expr = 'id == 1'
expr = 'id != 1'
expr = 'id > 10'
expr = 'id >= 10'
expr = 'id < 100'
expr = 'id <= 100'
11.2 逻辑运算符 #
python
expr = 'id > 10 and id < 100'
expr = 'id < 10 or id > 100'
expr = 'not (id == 50)'
11.3 集合运算 #
python
expr = 'id in [1, 2, 3]'
expr = 'id not in [1, 2, 3]'
11.4 字符串操作 #
python
expr = 'title like "%AI%"'
expr = 'title like "AI%"'
expr = 'title like "%AI"'
11.5 JSON操作 #
python
expr = 'metadata["category"] == "tech"'
expr = 'json_contains(tags, "AI")'
expr = 'json_contains_all(tags, ["AI", "ML"])'
expr = 'json_contains_any(tags, ["AI", "ML"])'
11.6 数组操作 #
python
expr = 'array_length(tags) > 2'
expr = 'array_contains(tags, "AI")'
十二、命名规范 #
12.1 Collection命名 #
text
规范:
- 只包含字母、数字、下划线
- 以字母或下划线开头
- 长度1-255字符
- 区分大小写(例如 Products 与 products 是两个不同的 Collection)
正确示例:
- products
- user_profiles
- _temp_collection
错误示例:
- 123collection
- my-collection
- collection name
12.2 字段命名 #
text
规范:
- 只包含字母、数字、下划线
- 以字母或下划线开头
- 长度1-255字符
- 不能与保留字冲突
正确示例:
- id
- user_name
- embedding_768
错误示例:
- 123field
- field-name
- from (保留字)
12.3 分区命名 #
text
规范:
- 只包含字母、数字、下划线
- 以字母、数字或下划线开头
- 长度1-255字符
- 不能使用 _default (保留)
正确示例:
- 2024_01
- region_beijing
- category_electronics
十三、错误处理 #
13.1 常见错误 #
python
from pymilvus import MilvusException
try:
collection = Collection("non_existent")
except MilvusException as e:
print(f"Error code: {e.code}")
print(f"Error message: {e.message}")
13.2 错误码 #
| 错误码 | 说明 |
|---|---|
| 1 | 内部错误 |
| 2 | 未知错误 |
| 3 | 连接错误 |
| 4 | 超时错误 |
| 5 | 已存在 |
| 6 | 不存在 |
| 7 | 参数错误 |
| 8 | 权限错误 |

注意:上表仅为示意,实际错误码的取值和含义会随 Milvus / PyMilvus 版本而变化,请以所用版本的官方文档为准。
十四、最佳实践 #
14.1 连接管理 #
python
from pymilvus import connections
from contextlib import contextmanager
@contextmanager
def milvus_connection(alias, host, port):
    """Open a Milvus connection for the duration of a ``with`` block.

    Args:
        alias: Connection alias passed to ``connections.connect`` and later
            to ``connections.disconnect``.
        host: Server address forwarded to ``connections.connect``.
        port: Server port forwarded to ``connections.connect``.

    Yields:
        None. Code inside the ``with`` block refers to the connection by
        its alias.
    """
    # Connect before entering the try block: if connecting fails there is
    # nothing to clean up, so disconnect() must not run for that alias.
    connections.connect(alias, host=host, port=port)
    try:
        yield
    finally:
        # Always release the connection, even if the with-body raised.
        connections.disconnect(alias)
with milvus_connection("default", "localhost", "19530"):
pass
14.2 批量操作 #
python
def batch_insert(collection, data, batch_size=1000):
    """Insert column-oriented data in fixed-size batches, then flush once.

    Args:
        collection: Object exposing ``insert(batch)`` and ``flush()``.
        data: List of columns, one sequence per field. The length of the
            first column is used as the total row count.
        batch_size: Maximum number of rows per ``insert`` call; must be
            greater than zero.

    Raises:
        ValueError: If ``batch_size`` is not greater than zero.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    if not data:
        # No columns at all: nothing to insert and nothing to flush.
        return

    total = len(data[0])
    for start in range(0, total, batch_size):
        stop = start + batch_size
        # Slice every column with the same window so rows stay aligned.
        collection.insert([column[start:stop] for column in data])
    # Flush a single time after all batches instead of once per batch.
    collection.flush()
14.3 异常处理 #
python
from pymilvus import MilvusException
import time
def search_with_retry(collection, vectors, max_retries=3):
    """Run a vector search, retrying with exponential backoff on failure.

    Args:
        collection: Object exposing ``search(...)``.
        vectors: Query vectors, passed to ``search`` as ``data``.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        Whatever ``collection.search`` returns on the first attempt that
        succeeds.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        MilvusException: Re-raised from the last attempt when every
            attempt fails.
    """
    if max_retries < 1:
        # Without this guard the loop never runs and the function would
        # silently return None instead of performing a search.
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            return collection.search(
                data=vectors,
                anns_field="embedding",
                param={"metric_type": "L2", "params": {"nprobe": 10}},
                limit=10,
            )
        except MilvusException:
            if attempt == max_retries - 1:
                # Out of attempts: surface the original error to the caller.
                raise
            # Wait 1s, 2s, 4s, ... before the next attempt.
            time.sleep(2 ** attempt)
十五、总结 #
常用操作速查表:
| 操作 | 方法 |
|---|---|
| 连接 | connections.connect() |
| 创建Collection | Collection(name, schema) |
| 插入数据 | collection.insert(data) |
| 搜索 | collection.search() |
| 查询 | collection.query() |
| 删除 | collection.delete(expr) |
| 创建索引 | collection.create_index() |
| 创建分区 | collection.create_partition() |
下一步,让我们学习数据管理操作!
最后更新:2026-04-04