推荐系统实战 #
本章介绍如何使用 Qdrant 构建个性化推荐系统。
推荐系统概述 #
text
推荐系统架构:
┌─────────────────────────────────────────────────────────────┐
│                          推荐系统                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│     用户画像            物品向量            推荐引擎        │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐    │
│  │ 行为数据    │     │ 物品特征    │     │ 相似性计算  │    │
│  │ 偏好向量    │  →  │ 向量嵌入    │  →  │ 推荐排序    │    │
│  │ 历史记录    │     │ Qdrant 存储 │     │ 结果过滤    │    │
│  └─────────────┘     └─────────────┘     └─────────────┘    │
│                              │                              │
│                              ↓                              │
│                          推荐结果                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘
环境准备 #
安装依赖 #
bash
pip install qdrant-client numpy pandas scikit-learn
导入库 #
python
import uuid
from collections import defaultdict
from typing import List, Dict, Optional

import numpy as np
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    VectorParams,
    PointStruct,
    Filter,
    FieldCondition,
    MatchValue,
    Range,
    RecommendExample
)
数据准备 #
物品数据 #
python
# Column order shared by every row of the demo catalogue below.
_ITEM_FIELDS = ("id", "name", "category", "tags", "price", "rating", "sales", "features")

# One tuple per catalogue entry. "features" is the 5-component embedding
# that is stored as the item's vector when the catalogue is indexed.
_ITEM_ROWS = [
    ("item_001", "Python 编程入门", "书籍", ["Python", "编程", "入门"],
     59.0, 4.8, 10000, [0.8, 0.6, 0.9, 0.3, 0.7]),
    ("item_002", "机器学习实战", "书籍", ["机器学习", "AI", "Python"],
     89.0, 4.9, 8000, [0.9, 0.8, 0.7, 0.5, 0.6]),
    ("item_003", "深度学习原理", "书籍", ["深度学习", "神经网络", "AI"],
     99.0, 4.7, 6000, [0.7, 0.9, 0.8, 0.6, 0.5]),
    ("item_004", "数据分析课程", "课程", ["数据分析", "Python", "可视化"],
     199.0, 4.6, 5000, [0.6, 0.5, 0.9, 0.8, 0.7]),
    ("item_005", "Web 开发教程", "课程", ["Web", "前端", "JavaScript"],
     149.0, 4.5, 7000, [0.5, 0.4, 0.6, 0.9, 0.8]),
    ("item_006", "算法与数据结构", "书籍", ["算法", "数据结构", "编程"],
     69.0, 4.8, 9000, [0.7, 0.7, 0.5, 0.4, 0.9]),
    ("item_007", "云计算入门", "课程", ["云计算", "AWS", "DevOps"],
     299.0, 4.4, 3000, [0.4, 0.6, 0.5, 0.7, 0.6]),
    ("item_008", "数据库设计", "书籍", ["数据库", "SQL", "设计"],
     79.0, 4.6, 4000, [0.5, 0.5, 0.7, 0.6, 0.8]),
]

# Final catalogue: a list of dicts, one per item, keyed by _ITEM_FIELDS.
items = [dict(zip(_ITEM_FIELDS, row)) for row in _ITEM_ROWS]
用户行为数据 #
python
# Interaction log keyed by user id: the items each user viewed, the items
# they purchased, and the rating they gave to rated items.
user_behaviors = {
    "user_001": dict(
        viewed=["item_001", "item_002", "item_006"],
        purchased=["item_001", "item_002"],
        rated={"item_001": 5, "item_002": 4},
    ),
    "user_002": dict(
        viewed=["item_002", "item_003", "item_004"],
        purchased=["item_002", "item_003"],
        rated={"item_002": 5, "item_003": 5},
    ),
    "user_003": dict(
        viewed=["item_001", "item_005", "item_006"],
        purchased=["item_005"],
        rated={"item_005": 4},
    ),
    "user_004": dict(
        viewed=["item_004", "item_007", "item_008"],
        purchased=["item_004", "item_007"],
        rated={"item_004": 5, "item_007": 4},
    ),
}
物品索引 #
创建物品 Collection #
python
# Local in-memory Qdrant instance shared by the examples that follow.
client = QdrantClient(":memory:")

# The "items" collection holds 5-component vectors compared by cosine distance.
client.create_collection(
    vectors_config=VectorParams(distance=Distance.COSINE, size=5),
    collection_name="items",
)
print("物品 Collection 创建成功")
索引物品向量 #
python
def index_items(client: QdrantClient, items: List[Dict]):
    """Upsert every catalogue item into the ``items`` collection.

    Qdrant only accepts unsigned integers or UUIDs as point ids, so a
    business id such as ``"item_001"`` cannot be used directly. Each item is
    stored under the deterministic UUIDv5 ``uuid5(NAMESPACE_DNS, item["id"])``
    and the business id itself is kept in the payload as ``item_id`` so that
    search results can be mapped back to it.

    Args:
        client: Qdrant client on which the ``items`` collection exists.
        items: Item dicts providing ``id``, ``features`` and the descriptive
            fields copied into the payload.
    """
    points = [
        PointStruct(
            # Same business id -> same UUID, so indexing again overwrites
            # the point instead of creating a duplicate.
            id=str(uuid.uuid5(uuid.NAMESPACE_DNS, item["id"])),
            vector=item["features"],
            payload={
                "item_id": item["id"],
                "name": item["name"],
                "category": item["category"],
                "tags": item["tags"],
                "price": item["price"],
                "rating": item["rating"],
                "sales": item["sales"],
            },
        )
        for item in items
    ]
    # Nothing to send for an empty catalogue; skip the request entirely.
    if points:
        client.upsert(collection_name="items", points=points)
    print(f"索引了 {len(points)} 个物品")


index_items(client, items)
相似物品推荐 #
基于物品的推荐 #
python
def recommend_similar_items(item_id: str, limit: int = 5) -> List[Dict]:
    """Return up to ``limit`` items recommended from a single reference item.

    Qdrant point ids must be unsigned integers or UUIDs, so the business id
    (e.g. ``"item_001"``) is translated to the point id
    ``uuid5(NAMESPACE_DNS, item_id)`` before querying. Points are expected to
    be stored under ids built the same way, with the business id kept in the
    ``item_id`` payload field.

    Args:
        item_id: Business id of the reference item.
        limit: Maximum number of recommendations to return.

    Returns:
        Dicts with ``id`` (business id when the payload carries one,
        otherwise the point id), ``name``, ``category``, ``tags``, ``price``,
        ``rating`` and ``score``, in the order returned by Qdrant.
    """
    point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, item_id))
    results = client.recommend(
        collection_name="items",
        positive=[point_id],
        # One extra hit so that dropping the reference item still leaves `limit`.
        limit=limit + 1,
    )
    recommendations = []
    for r in results:
        business_id = r.payload.get("item_id", r.id)
        # Never recommend the reference item itself.
        if business_id == item_id or r.id == point_id:
            continue
        recommendations.append({
            "id": business_id,
            "name": r.payload["name"],
            "category": r.payload["category"],
            "tags": r.payload["tags"],
            "price": r.payload["price"],
            "rating": r.payload["rating"],
            "score": r.score,
        })
    return recommendations[:limit]


similar = recommend_similar_items("item_001")
print("与《Python 编程入门》相似的物品:")
for item in similar:
    print(f" - {item['name']} (相似度: {item['score']:.4f})")
多物品推荐 #
python
def recommend_from_multiple(item_ids: List[str], limit: int = 5) -> List[Dict]:
    """Return up to ``limit`` items recommended from several reference items.

    Qdrant point ids must be unsigned integers or UUIDs, so each business id
    is translated to the point id ``uuid5(NAMESPACE_DNS, item_id)`` before
    querying. Points are expected to be stored under ids built the same way,
    with the business id kept in the ``item_id`` payload field.

    Args:
        item_ids: Business ids of the items used as positive examples.
        limit: Maximum number of recommendations to return.

    Returns:
        Dicts with ``id`` (business id when the payload carries one,
        otherwise the point id), ``name`` and ``score``.
    """
    point_ids = [str(uuid.uuid5(uuid.NAMESPACE_DNS, item_id)) for item_id in item_ids]
    # The reference items must not be recommended back, whichever id form
    # a hit exposes.
    excluded = set(item_ids) | set(point_ids)
    results = client.recommend(
        collection_name="items",
        positive=point_ids,
        limit=limit + len(item_ids),
    )
    recommendations = []
    for r in results:
        business_id = r.payload.get("item_id", r.id)
        if business_id in excluded or r.id in excluded:
            continue
        recommendations.append({
            "id": business_id,
            "name": r.payload["name"],
            "score": r.score,
        })
    return recommendations[:limit]


multi_recs = recommend_from_multiple(["item_001", "item_002"])
print("基于多个物品的推荐:")
for item in multi_recs:
    print(f" - {item['name']} (推荐分数: {item['score']:.4f})")
正负样本推荐 #
python
def recommend_with_preferences(
    liked_items: List[str],
    disliked_items: List[str],
    limit: int = 5
) -> List[Dict]:
    """Recommend items using liked items as positive and disliked as negative examples.

    Qdrant point ids must be unsigned integers or UUIDs, so each business id
    is translated to the point id ``uuid5(NAMESPACE_DNS, item_id)`` before
    querying. Points are expected to be stored under ids built the same way,
    with the business id kept in the ``item_id`` payload field.

    Args:
        liked_items: Business ids passed to Qdrant as positive examples.
        disliked_items: Business ids passed to Qdrant as negative examples.
        limit: Maximum number of recommendations to return.

    Returns:
        Dicts with ``id`` (business id when the payload carries one,
        otherwise the point id), ``name`` and ``score``; liked and disliked
        items are never included.
    """
    liked_points = [str(uuid.uuid5(uuid.NAMESPACE_DNS, i)) for i in liked_items]
    disliked_points = [str(uuid.uuid5(uuid.NAMESPACE_DNS, i)) for i in disliked_items]
    results = client.recommend(
        collection_name="items",
        positive=liked_points,
        negative=disliked_points,
        limit=limit + len(liked_items) + len(disliked_items),
    )
    # Exclude the examples under both their business and point id forms.
    excluded = (
        set(liked_items) | set(disliked_items)
        | set(liked_points) | set(disliked_points)
    )
    recommendations = []
    for r in results:
        business_id = r.payload.get("item_id", r.id)
        if business_id in excluded or r.id in excluded:
            continue
        recommendations.append({
            "id": business_id,
            "name": r.payload["name"],
            "score": r.score,
        })
    return recommendations[:limit]


pref_recs = recommend_with_preferences(
    liked_items=["item_001", "item_002"],
    disliked_items=["item_005"]
)
print("考虑偏好的推荐:")
for item in pref_recs:
    print(f" - {item['name']} (推荐分数: {item['score']:.4f})")
用户画像推荐 #
构建用户向量 #
python
def build_user_vector(user_id: str, items_data: List[Dict], behaviors: Dict) -> np.ndarray:
    """Build a preference vector as a weighted average of item feature vectors.

    Purchased items contribute with weight ``2 * rating / 5`` (a missing
    rating counts as 3). Items that were only viewed contribute with weight
    0.5. Item ids that are not present in ``items_data`` are ignored.

    Args:
        user_id: Key into ``behaviors``.
        items_data: Item dicts carrying ``id`` and ``features``.
        behaviors: Mapping of user id to a dict with optional ``viewed``,
            ``purchased`` and ``rated`` entries.

    Returns:
        The weighted average vector, or an all-zero vector when the user has
        no usable interactions. Its length follows the item feature vectors
        (5 when ``items_data`` provides none).
    """
    behavior = behaviors.get(user_id, {})

    # Index the catalogue once instead of scanning it for every interaction;
    # setdefault keeps the first entry when an id appears more than once.
    items_by_id: Dict = {}
    for entry in items_data:
        items_by_id.setdefault(entry["id"], entry)

    # Take the dimension from the data rather than hard-coding it.
    dim = next((len(entry["features"]) for entry in items_data if "features" in entry), 5)
    user_vector = np.zeros(dim)
    total_weight = 0.0

    purchased = behavior.get("purchased", [])
    ratings = behavior.get("rated", {})
    for item_id in purchased:
        item = items_by_id.get(item_id)
        if not item:
            continue
        weight = ratings.get(item_id, 3) / 5
        user_vector += np.array(item["features"]) * weight * 2
        total_weight += weight * 2

    purchased_ids = set(purchased)
    for item_id in behavior.get("viewed", []):
        # A purchase already counted this item with a higher weight.
        if item_id in purchased_ids:
            continue
        item = items_by_id.get(item_id)
        if not item:
            continue
        user_vector += np.array(item["features"]) * 0.5
        total_weight += 0.5

    if total_weight > 0:
        user_vector = user_vector / total_weight
    return user_vector
# Demo: preference vector of user_001 derived from the sample behaviour log.
user_vector = build_user_vector(
    user_id="user_001",
    items_data=items,
    behaviors=user_behaviors,
)
print(f"用户向量: {user_vector}")
基于用户向量的推荐 #
python
def recommend_for_user(user_id: str, limit: int = 5) -> List[Dict]:
    """Recommend items closest to the user's preference vector.

    Items the user has already viewed or purchased are removed. Hits are
    identified by the ``item_id`` payload field when present, because Qdrant
    point ids (unsigned integers or UUIDs) need not equal the business ids
    used in ``user_behaviors``. Without that field the point id is used.

    Args:
        user_id: Key into ``user_behaviors``.
        limit: Maximum number of recommendations to return.

    Returns:
        Dicts with ``id``, ``name``, ``category``, ``price``, ``rating`` and
        ``score``, in the order returned by Qdrant.
    """
    user_vector = build_user_vector(user_id, items, user_behaviors)
    behavior = user_behaviors.get(user_id, {})
    interacted = set(behavior.get("viewed", [])) | set(behavior.get("purchased", []))
    results = client.search(
        collection_name="items",
        query_vector=user_vector.tolist(),
        # Over-fetch so that removing seen items can still leave `limit` hits.
        limit=limit + len(interacted),
    )
    recommendations = []
    for r in results:
        business_id = r.payload.get("item_id", r.id)
        if business_id in interacted:
            continue
        recommendations.append({
            "id": business_id,
            "name": r.payload["name"],
            "category": r.payload["category"],
            "price": r.payload["price"],
            "rating": r.payload["rating"],
            "score": r.score,
        })
    return recommendations[:limit]


user_recs = recommend_for_user("user_001")
print("为用户 user_001 的推荐:")
for item in user_recs:
    print(f" - {item['name']} (匹配度: {item['score']:.4f}, ¥{item['price']})")
过滤推荐 #
分类过滤 #
python
def recommend_by_category(user_id: str, category: str, limit: int = 5) -> List[Dict]:
    """Recommend items of one category, ranked against the user's preference vector.

    Args:
        user_id: Key into ``user_behaviors``.
        category: Exact value the ``category`` payload field must match.
        limit: Maximum number of hits requested from Qdrant.

    Returns:
        Dicts with ``id``, ``name`` and ``score``. ``id`` is the business id
        from the ``item_id`` payload field when present (so it matches the
        ids used in ``user_behaviors``), otherwise the Qdrant point id.
    """
    user_vector = build_user_vector(user_id, items, user_behaviors)
    category_filter = Filter(
        must=[FieldCondition(key="category", match=MatchValue(value=category))]
    )
    results = client.search(
        collection_name="items",
        query_vector=user_vector.tolist(),
        query_filter=category_filter,
        limit=limit,
    )
    return [
        {
            "id": r.payload.get("item_id", r.id),
            "name": r.payload["name"],
            "score": r.score,
        }
        for r in results
    ]


book_recs = recommend_by_category("user_001", "书籍")
print("书籍类推荐:")
for item in book_recs:
    print(f" - {item['name']} (匹配度: {item['score']:.4f})")
价格范围过滤 #
python
def recommend_by_price_range(user_id: str, min_price: float, max_price: float, limit: int = 5) -> List[Dict]:
    """Recommend items priced within ``[min_price, max_price]`` (both inclusive).

    Args:
        user_id: Key into ``user_behaviors``.
        min_price: Lower bound applied to the ``price`` payload field.
        max_price: Upper bound applied to the ``price`` payload field.
        limit: Maximum number of hits requested from Qdrant.

    Returns:
        Dicts with ``id``, ``name``, ``price`` and ``score``. ``id`` is the
        business id from the ``item_id`` payload field when present (so it
        matches the ids used in ``user_behaviors``), otherwise the Qdrant
        point id.
    """
    user_vector = build_user_vector(user_id, items, user_behaviors)
    price_filter = Filter(
        must=[FieldCondition(key="price", range=Range(gte=min_price, lte=max_price))]
    )
    results = client.search(
        collection_name="items",
        query_vector=user_vector.tolist(),
        query_filter=price_filter,
        limit=limit,
    )
    return [
        {
            "id": r.payload.get("item_id", r.id),
            "name": r.payload["name"],
            "price": r.payload["price"],
            "score": r.score,
        }
        for r in results
    ]


price_recs = recommend_by_price_range("user_001", 50, 100)
print("价格 50-100 元的推荐:")
for item in price_recs:
    print(f" - {item['name']} (¥{item['price']}, 匹配度: {item['score']:.4f})")
综合过滤推荐 #
python
def recommend_with_filters(
    user_id: str,
    category: Optional[str] = None,
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    min_rating: Optional[float] = None,
    limit: int = 5
) -> List[Dict]:
    """Recommend items for a user with optional category, price and rating filters.

    Every supplied filter must hold (they are combined under ``must``); when
    none is supplied the search runs unfiltered.

    Args:
        user_id: Key into ``user_behaviors``.
        category: Exact ``category`` payload value; ignored when empty or None.
        min_price: Inclusive lower bound on ``price``; None means no bound.
        max_price: Inclusive upper bound on ``price``; None means no bound.
        min_rating: Inclusive lower bound on ``rating``; None means no bound.
        limit: Maximum number of hits requested from Qdrant.

    Returns:
        Dicts with ``id``, ``name``, ``category``, ``price``, ``rating`` and
        ``score``. ``id`` is the business id from the ``item_id`` payload
        field when present (so it matches the ids used in
        ``user_behaviors``), otherwise the Qdrant point id.
    """
    user_vector = build_user_vector(user_id, items, user_behaviors)

    conditions = []
    if category:
        conditions.append(
            FieldCondition(key="category", match=MatchValue(value=category))
        )
    if min_price is not None or max_price is not None:
        # A bound left as None is simply not applied by the range condition.
        conditions.append(
            FieldCondition(key="price", range=Range(gte=min_price, lte=max_price))
        )
    if min_rating is not None:
        conditions.append(
            FieldCondition(key="rating", range=Range(gte=min_rating))
        )
    query_filter = Filter(must=conditions) if conditions else None

    results = client.search(
        collection_name="items",
        query_vector=user_vector.tolist(),
        query_filter=query_filter,
        limit=limit,
    )
    return [
        {
            "id": r.payload.get("item_id", r.id),
            "name": r.payload["name"],
            "category": r.payload["category"],
            "price": r.payload["price"],
            "rating": r.payload["rating"],
            "score": r.score,
        }
        for r in results
    ]


filtered_recs = recommend_with_filters(
    "user_001",
    category="书籍",
    min_price=50,
    max_price=100,
    min_rating=4.5
)
print("综合过滤推荐:")
for item in filtered_recs:
    print(f" - {item['name']} (¥{item['price']}, 评分: {item['rating']}, 匹配度: {item['score']:.4f})")
协同过滤 #
用户相似度计算 #
python
def calculate_user_similarity(user1_id: str, user2_id: str) -> float:
    """Return the cosine similarity between two users' preference vectors.

    Args:
        user1_id: Key into ``user_behaviors`` for the first user.
        user2_id: Key into ``user_behaviors`` for the second user.

    Returns:
        The cosine similarity as a float, or ``0.0`` when either user has an
        all-zero preference vector (for example an unknown user).
    """
    vector1 = build_user_vector(user1_id, items, user_behaviors)
    vector2 = build_user_vector(user2_id, items, user_behaviors)
    norm_product = np.linalg.norm(vector1) * np.linalg.norm(vector2)
    # A zero vector has no direction; avoid the 0/0 division that yields NaN.
    if norm_product == 0:
        return 0.0
    return float(np.dot(vector1, vector2) / norm_product)


sim = calculate_user_similarity("user_001", "user_002")
print(f"用户相似度: {sim:.4f}")
协同过滤推荐 #
python
def collaborative_filtering_recommend(user_id: str, limit: int = 5) -> List[Dict]:
    """Recommend items purchased by the users most similar to ``user_id``.

    The three most similar users (cosine similarity of preference vectors)
    vote for the items they purchased. Each vote is worth
    ``similarity * rating``, where a missing rating counts as 3. Items the
    target user already purchased are skipped.

    Args:
        user_id: Key into ``user_behaviors``.
        limit: Maximum number of recommendations to return.

    Returns:
        Dicts with ``id``, ``name`` and ``collaborative_score``, best first.
        Empty when the target user has an all-zero preference vector (for
        example an unknown user).
    """
    user_vector = build_user_vector(user_id, items, user_behaviors)
    user_norm = np.linalg.norm(user_vector)
    # Without a direction there is nothing to compare against; this also
    # avoids NaN similarities from a 0/0 division.
    if user_norm == 0:
        return []

    similar_users = []
    for other_user in user_behaviors:
        if other_user == user_id:
            continue
        other_vector = build_user_vector(other_user, items, user_behaviors)
        other_norm = np.linalg.norm(other_vector)
        if other_norm == 0:
            continue
        similarity = np.dot(user_vector, other_vector) / (user_norm * other_norm)
        similar_users.append((other_user, similarity))
    similar_users.sort(key=lambda x: x[1], reverse=True)

    # .get keeps this consistent with build_user_vector, which tolerates
    # users missing from the behaviour log.
    user_items = set(user_behaviors.get(user_id, {}).get("purchased", []))
    recommendations = defaultdict(float)
    for similar_user, similarity in similar_users[:3]:
        neighbour = user_behaviors[similar_user]
        neighbour_ratings = neighbour.get("rated", {})
        for item_id in neighbour.get("purchased", []):
            if item_id in user_items:
                continue
            recommendations[item_id] += similarity * neighbour_ratings.get(item_id, 3)

    sorted_recs = sorted(recommendations.items(), key=lambda x: x[1], reverse=True)
    result = []
    for item_id, score in sorted_recs[:limit]:
        item = next((i for i in items if i["id"] == item_id), None)
        if item:
            result.append({
                "id": item_id,
                "name": item["name"],
                "collaborative_score": score,
            })
    return result


cf_recs = collaborative_filtering_recommend("user_001")
print("协同过滤推荐:")
for item in cf_recs:
    print(f" - {item['name']} (协同分数: {item['collaborative_score']:.4f})")
混合推荐 #
混合推荐策略 #
python
def hybrid_recommend(
    user_id: str,
    content_weight: float = 0.5,
    collaborative_weight: float = 0.5,
    limit: int = 5
) -> List[Dict]:
    """Blend content-based and collaborative recommendations into one ranking.

    Both recommenders are asked for ``limit * 2`` candidates. A candidate's
    final score is ``content_score * content_weight`` plus
    ``collaborative_score * collaborative_weight``; a score missing from one
    source counts as 0.

    Args:
        user_id: User to recommend for.
        content_weight: Multiplier applied to the content-based score.
        collaborative_weight: Multiplier applied to the collaborative score.
        limit: Maximum number of recommendations to return.

    Returns:
        Candidate dicts sorted by ``final_score``, highest first.
    """
    # Seed the table with the content-based candidates.
    merged = {
        entry["id"]: {
            **entry,
            "content_score": entry["score"],
            "collaborative_score": 0,
            "final_score": entry["score"] * content_weight,
        }
        for entry in recommend_for_user(user_id, limit=limit * 2)
    }

    # Fold the collaborative candidates in, adding to existing entries.
    for entry in collaborative_filtering_recommend(user_id, limit=limit * 2):
        rec_id = entry["id"]
        cf_score = entry["collaborative_score"]
        known = merged.get(rec_id)
        if known is not None:
            known["collaborative_score"] = cf_score
            known["final_score"] += cf_score * collaborative_weight
            continue
        catalog_item = next((i for i in items if i["id"] == rec_id), None)
        if not catalog_item:
            continue
        merged[rec_id] = {
            "id": rec_id,
            "name": catalog_item["name"],
            "content_score": 0,
            "collaborative_score": cf_score,
            "final_score": cf_score * collaborative_weight,
        }

    ranked = list(merged.values())
    ranked.sort(key=lambda candidate: candidate["final_score"], reverse=True)
    return ranked[:limit]


hybrid_recs = hybrid_recommend("user_001")
print("混合推荐结果:")
for item in hybrid_recs:
    print(f" - {item['name']}")
    print(f" 内容分数: {item['content_score']:.4f}, 协同分数: {item['collaborative_score']:.4f}")
    print(f" 最终分数: {item['final_score']:.4f}")
推荐系统类 #
python
class RecommendationSystem:
    """Recommender facade that owns its own in-memory Qdrant client.

    Every query goes through ``self.client``, so an instance never depends
    on a module-level client. Items are expected to be stored under the
    point id ``uuid5(NAMESPACE_DNS, business_id)`` with the business id kept
    in the ``item_id`` payload field, because Qdrant point ids must be
    unsigned integers or UUIDs.
    """

    def __init__(self):
        # Private in-memory Qdrant instance for this recommender.
        self.client = QdrantClient(":memory:")
        # Catalogue most recently passed to load_items().
        self.items = []

    def load_items(self, items: List[Dict]):
        """Create the ``items`` collection and index the given catalogue into it."""
        self.items = items
        self.client.create_collection(
            collection_name="items",
            vectors_config=VectorParams(size=5, distance=Distance.COSINE)
        )
        index_items(self.client, items)

    def similar_items(self, item_id: str, limit: int = 5) -> List[Dict]:
        """Return up to ``limit`` items recommended from one reference item.

        Args:
            item_id: Business id of the reference item.
            limit: Maximum number of recommendations to return.

        Returns:
            Dicts with ``id``, ``name``, ``category``, ``tags``, ``price``,
            ``rating`` and ``score``; the reference item is excluded.
        """
        point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, item_id))
        # Query this instance's own client, not a module-level one.
        results = self.client.recommend(
            collection_name="items",
            positive=[point_id],
            limit=limit + 1,
        )
        recommendations = []
        for r in results:
            business_id = r.payload.get("item_id", r.id)
            if business_id == item_id or r.id == point_id:
                continue
            recommendations.append({
                "id": business_id,
                "name": r.payload["name"],
                "category": r.payload["category"],
                "tags": r.payload["tags"],
                "price": r.payload["price"],
                "rating": r.payload["rating"],
                "score": r.score,
            })
        return recommendations[:limit]

    def personal_recommend(self, user_id: str, behaviors: Dict, limit: int = 5) -> List[Dict]:
        """Recommend unseen items closest to the user's preference vector.

        Args:
            user_id: Key into ``behaviors``.
            behaviors: Mapping of user id to ``viewed`` / ``purchased`` /
                ``rated`` data.
            limit: Maximum number of recommendations to return.

        Returns:
            Dicts with ``id``, ``name`` and ``score``; items the user viewed
            or purchased are excluded.
        """
        user_vector = build_user_vector(user_id, self.items, behaviors)
        history = behaviors.get(user_id, {})
        interacted = set(history.get("viewed", [])) | set(history.get("purchased", []))
        results = self.client.search(
            collection_name="items",
            query_vector=user_vector.tolist(),
            # Over-fetch so that removing seen items can still leave `limit` hits.
            limit=limit + len(interacted),
        )
        recommendations = []
        for r in results:
            # Compare on the business id; the point id is only a fallback.
            business_id = r.payload.get("item_id", r.id)
            if business_id in interacted:
                continue
            recommendations.append(
                {"id": business_id, "name": r.payload["name"], "score": r.score}
            )
        return recommendations[:limit]

    def filtered_recommend(
        self,
        user_id: str,
        behaviors: Dict,
        filters: Optional[Dict] = None,
        limit: int = 5
    ) -> List[Dict]:
        """Recommend items for a user, restricted by optional payload filters.

        Args:
            user_id: Key into ``behaviors``.
            behaviors: Mapping of user id to ``viewed`` / ``purchased`` /
                ``rated`` data.
            filters: Optional dict; the recognised keys are ``category``,
                ``min_price`` and ``max_price``.
            limit: Maximum number of hits requested from Qdrant.

        Returns:
            Dicts with ``id``, ``name``, ``price`` and ``score``.
        """
        user_vector = build_user_vector(user_id, self.items, behaviors)
        query_filter = None
        if filters:
            conditions = []
            if "category" in filters:
                conditions.append(
                    FieldCondition(key="category", match=MatchValue(value=filters["category"]))
                )
            if "min_price" in filters or "max_price" in filters:
                # An absent bound is passed as None and therefore not applied.
                price_range = Range(
                    gte=filters.get("min_price"),
                    lte=filters.get("max_price"),
                )
                conditions.append(FieldCondition(key="price", range=price_range))
            if conditions:
                query_filter = Filter(must=conditions)
        results = self.client.search(
            collection_name="items",
            query_vector=user_vector.tolist(),
            query_filter=query_filter,
            limit=limit,
        )
        return [
            {
                "id": r.payload.get("item_id", r.id),
                "name": r.payload["name"],
                "price": r.payload["price"],
                "score": r.score,
            }
            for r in results
        ]
# Demo: drive the class-based recommender end to end with the sample data.
rec_sys = RecommendationSystem()
rec_sys.load_items(items)

# Similar items for one reference item; only the first three are shown.
print("\n相似物品推荐:")
similar = rec_sys.similar_items("item_001")
for item in similar[:3]:
    print(f" - {item['name']}")

# Personalised recommendations built from the behaviour log.
print("\n个性化推荐:")
personal = rec_sys.personal_recommend("user_001", user_behaviors)
for item in personal:
    print(f" - {item['name']} (匹配度: {item['score']:.4f})")

# The same user, restricted to books costing at most 100.
print("\n过滤推荐:")
filtered = rec_sys.filtered_recommend(
    "user_001",
    user_behaviors,
    filters=dict(category="书籍", max_price=100),
)
for item in filtered:
    print(f" - {item['name']} (¥{item['price']})")
小结 #
本章实现了完整的推荐系统:
- 物品向量索引
- 相似物品推荐
- 用户画像构建
- 个性化推荐
- 过滤推荐
- 协同过滤
- 混合推荐策略
总结 #
恭喜你完成了 Qdrant 向量数据库的完整学习!从基础概念到实战应用,你已经掌握了:
- Qdrant 核心概念和架构
- 向量存储和搜索操作
- 高级特性和性能优化
- 分布式部署和运维
- 语义搜索、RAG、推荐系统等实战应用
继续探索 Qdrant 的更多可能性,构建你的 AI 应用!
最后更新:2026-04-04