推荐系统实战 #

本章介绍如何使用 Qdrant 构建个性化推荐系统。

推荐系统概述 #

text
推荐系统架构:

┌─────────────────────────────────────────────────────────────┐
│                      推荐系统                                │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  用户画像              物品向量              推荐引擎        │
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐ │
│  │ 行为数据    │      │ 物品特征    │      │ 相似性计算  │ │
│  │ 偏好向量    │  →   │ 向量嵌入    │  →   │ 推荐排序    │ │
│  │ 历史记录    │      │ Qdrant 存储 │      │ 结果过滤    │ │
│  └─────────────┘      └─────────────┘      └─────────────┘ │
│                              │                               │
│                              ↓                               │
│                         推荐结果                             │
│                                                              │
└─────────────────────────────────────────────────────────────┘

环境准备 #

安装依赖 #

bash
pip install qdrant-client numpy pandas scikit-learn

导入库 #

python
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    VectorParams,
    PointStruct,
    Filter,
    FieldCondition,
    MatchValue,
    Range,
    RecommendExample
)
import numpy as np
from typing import List, Dict, Optional
from collections import defaultdict

数据准备 #

物品数据 #

python
# Sample catalog used throughout the chapter. Every entry carries a string
# id, a display name, a category, a tag list, a price, a rating, a sales
# count, and a 5-element "features" list that is later stored as the item's
# vector.
items = [
    {
        "id": "item_001",
        "name": "Python 编程入门",
        "category": "书籍",
        "tags": ["Python", "编程", "入门"],
        "price": 59.0,
        "rating": 4.8,
        "sales": 10000,
        "features": [0.8, 0.6, 0.9, 0.3, 0.7]
    },
    {
        "id": "item_002",
        "name": "机器学习实战",
        "category": "书籍",
        "tags": ["机器学习", "AI", "Python"],
        "price": 89.0,
        "rating": 4.9,
        "sales": 8000,
        "features": [0.9, 0.8, 0.7, 0.5, 0.6]
    },
    {
        "id": "item_003",
        "name": "深度学习原理",
        "category": "书籍",
        "tags": ["深度学习", "神经网络", "AI"],
        "price": 99.0,
        "rating": 4.7,
        "sales": 6000,
        "features": [0.7, 0.9, 0.8, 0.6, 0.5]
    },
    {
        "id": "item_004",
        "name": "数据分析课程",
        "category": "课程",
        "tags": ["数据分析", "Python", "可视化"],
        "price": 199.0,
        "rating": 4.6,
        "sales": 5000,
        "features": [0.6, 0.5, 0.9, 0.8, 0.7]
    },
    {
        "id": "item_005",
        "name": "Web 开发教程",
        "category": "课程",
        "tags": ["Web", "前端", "JavaScript"],
        "price": 149.0,
        "rating": 4.5,
        "sales": 7000,
        "features": [0.5, 0.4, 0.6, 0.9, 0.8]
    },
    {
        "id": "item_006",
        "name": "算法与数据结构",
        "category": "书籍",
        "tags": ["算法", "数据结构", "编程"],
        "price": 69.0,
        "rating": 4.8,
        "sales": 9000,
        "features": [0.7, 0.7, 0.5, 0.4, 0.9]
    },
    {
        "id": "item_007",
        "name": "云计算入门",
        "category": "课程",
        "tags": ["云计算", "AWS", "DevOps"],
        "price": 299.0,
        "rating": 4.4,
        "sales": 3000,
        "features": [0.4, 0.6, 0.5, 0.7, 0.6]
    },
    {
        "id": "item_008",
        "name": "数据库设计",
        "category": "书籍",
        "tags": ["数据库", "SQL", "设计"],
        "price": 79.0,
        "rating": 4.6,
        "sales": 4000,
        "features": [0.5, 0.5, 0.7, 0.6, 0.8]
    }
]

用户行为数据 #

python
# Sample interaction history keyed by user id. For each user:
#   "viewed"    - ids of items the user looked at
#   "purchased" - ids of items the user bought
#   "rated"     - mapping of item id -> rating given by the user
user_behaviors = {
    "user_001": {
        "viewed": ["item_001", "item_002", "item_006"],
        "purchased": ["item_001", "item_002"],
        "rated": {"item_001": 5, "item_002": 4}
    },
    "user_002": {
        "viewed": ["item_002", "item_003", "item_004"],
        "purchased": ["item_002", "item_003"],
        "rated": {"item_002": 5, "item_003": 5}
    },
    "user_003": {
        "viewed": ["item_001", "item_005", "item_006"],
        "purchased": ["item_005"],
        "rated": {"item_005": 4}
    },
    "user_004": {
        "viewed": ["item_004", "item_007", "item_008"],
        "purchased": ["item_004", "item_007"],
        "rated": {"item_004": 5, "item_007": 4}
    }
}

物品索引 #

创建物品 Collection #

python
# Module-level client shared by the functions below; ":memory:" selects the
# client's in-process local mode, so no Qdrant server has to be running.
client = QdrantClient(":memory:")

# The "items" collection stores 5-dimensional vectors compared by cosine
# distance; the size matches the length of each item's "features" list.
client.create_collection(
    collection_name="items",
    vectors_config=VectorParams(
        size=5,
        distance=Distance.COSINE
    )
)

print("物品 Collection 创建成功")

索引物品向量 #

python
def index_items(
    client: QdrantClient,
    items: List[Dict],
    collection_name: str = "items",
) -> None:
    """Upsert one point per item into a Qdrant collection.

    Each item's ``features`` list becomes the point vector and its
    descriptive fields (name, category, tags, price, rating, sales) become
    the point payload. Prints the number of points that were built.

    Args:
        client: Qdrant client used for the upsert.
        items: Item dicts; each must provide ``id``, ``features`` and the
            six payload fields listed above.
        collection_name: Target collection. Defaults to ``"items"``, the
            name this function previously hard-coded.

    Raises:
        KeyError: If an item is missing one of the required keys.

    NOTE(review): ``item["id"]`` is passed through unchanged as the point
    id. Qdrant documents point ids as unsigned integers or UUIDs, so ids
    shaped like ``"item_001"`` may be rejected at upsert time -- confirm
    against the client/server version in use before relying on this.
    """
    payload_keys = ("name", "category", "tags", "price", "rating", "sales")

    points = [
        PointStruct(
            id=item["id"],
            vector=item["features"],
            payload={key: item[key] for key in payload_keys},
        )
        for item in items
    ]

    # Nothing to send for an empty catalog, so skip the round trip.
    if points:
        client.upsert(
            collection_name=collection_name,
            points=points,
        )

    print(f"索引了 {len(points)} 个物品")

# Load the sample catalog into the collection through the shared client.
index_items(client, items)

相似物品推荐 #

基于物品的推荐 #

python
def recommend_similar_items(item_id: str, limit: int = 5) -> List[Dict]:
    """Return up to ``limit`` items recommended from a single seed item.

    Queries the module-level ``client`` with ``item_id`` as the only
    positive example. One extra hit is requested so that dropping the seed
    item (should it come back) still leaves ``limit`` candidates.

    Args:
        item_id: Id of the seed item.
        limit: Maximum number of recommendations to return.

    Returns:
        Dicts with ``id``, ``name``, ``category``, ``tags``, ``price``,
        ``rating`` and ``score``, in the order the client returned them.
    """
    hits = client.recommend(
        collection_name="items",
        positive=[item_id],
        limit=limit + 1,
    )

    similar_items = []
    for hit in hits:
        # Never recommend the seed item back to the caller.
        if hit.id == item_id:
            continue
        payload = hit.payload
        similar_items.append({
            "id": hit.id,
            "name": payload["name"],
            "category": payload["category"],
            "tags": payload["tags"],
            "price": payload["price"],
            "rating": payload["rating"],
            "score": hit.score,
        })

    return similar_items[:limit]

# Demo: print the items recommended from item_001 with their scores.
similar = recommend_similar_items("item_001")
print("与《Python 编程入门》相似的物品:")
for item in similar:
    print(f"  - {item['name']} (相似度: {item['score']:.4f})")

多物品推荐 #

python
def recommend_from_multiple(item_ids: List[str], limit: int = 5) -> List[Dict]:
    """Return up to ``limit`` items recommended from several seed items.

    All ``item_ids`` are sent to the module-level ``client`` as positive
    examples. The request is padded by ``len(item_ids)`` so that removing
    the seeds from the hits still leaves ``limit`` candidates.

    Args:
        item_ids: Ids of the seed items.
        limit: Maximum number of recommendations to return.

    Returns:
        Dicts with ``id``, ``name`` and ``score``, in the order the client
        returned them.
    """
    hits = client.recommend(
        collection_name="items",
        positive=item_ids,
        limit=limit + len(item_ids),
    )

    picked = []
    for hit in hits:
        # Seed items are not valid recommendations.
        if hit.id in item_ids:
            continue
        picked.append({
            "id": hit.id,
            "name": hit.payload["name"],
            "score": hit.score,
        })

    return picked[:limit]

# Demo: recommend from two seed items at once and print the scores.
multi_recs = recommend_from_multiple(["item_001", "item_002"])
print("基于多个物品的推荐:")
for item in multi_recs:
    print(f"  - {item['name']} (推荐分数: {item['score']:.4f})")

正负样本推荐 #

python
def recommend_with_preferences(
    liked_items: List[str],
    disliked_items: List[str],
    limit: int = 5
) -> List[Dict]:
    """Return up to ``limit`` recommendations from liked and disliked items.

    Liked ids are sent to the module-level ``client`` as positive examples
    and disliked ids as negative examples. The request is padded by the
    number of example ids so that removing them from the hits still leaves
    ``limit`` candidates.

    Args:
        liked_items: Ids used as positive examples.
        disliked_items: Ids used as negative examples.
        limit: Maximum number of recommendations to return.

    Returns:
        Dicts with ``id``, ``name`` and ``score``, in the order the client
        returned them.
    """
    padding = len(liked_items) + len(disliked_items)
    hits = client.recommend(
        collection_name="items",
        positive=liked_items,
        negative=disliked_items,
        limit=limit + padding,
    )

    # Neither kind of example item should be returned to the caller.
    example_ids = set(liked_items).union(disliked_items)

    picked = []
    for hit in hits:
        if hit.id in example_ids:
            continue
        picked.append({
            "id": hit.id,
            "name": hit.payload["name"],
            "score": hit.score,
        })

    return picked[:limit]

# Demo: two liked items as positive examples, one disliked item as a
# negative example.
pref_recs = recommend_with_preferences(
    liked_items=["item_001", "item_002"],
    disliked_items=["item_005"]
)

print("考虑偏好的推荐:")
for item in pref_recs:
    print(f"  - {item['name']} (推荐分数: {item['score']:.4f})")

用户画像推荐 #

构建用户向量 #

python
def build_user_vector(user_id: str, items_data: List[Dict], behaviors: Dict) -> np.ndarray:
    """Build a preference vector as a weighted mean of item feature vectors.

    Purchased items contribute with weight ``2 * rating / 5`` (a missing
    rating counts as 3). Items that were viewed but not purchased
    contribute with weight ``0.5``. Item ids that do not appear in
    ``items_data`` are ignored.

    Args:
        user_id: Key of the user in ``behaviors``.
        items_data: Catalog of item dicts, each with ``id`` and ``features``.
        behaviors: Mapping of user id to a dict with optional ``viewed``,
            ``purchased`` and ``rated`` entries.

    Returns:
        The weighted mean feature vector. A zero vector is returned when the
        user is unknown or nothing contributed. Its length follows the
        catalog's feature length, falling back to 5 for an empty catalog.
    """
    behavior = behaviors.get(user_id, {})
    purchased = behavior.get("purchased", [])
    ratings = behavior.get("rated", {})

    # Index the catalog once instead of scanning it for every behaviour
    # entry. setdefault keeps the first entry of a duplicated id, which is
    # what a front-to-back scan would have found.
    items_by_id: Dict = {}
    for entry in items_data:
        items_by_id.setdefault(entry["id"], entry)

    # Take the dimensionality from the catalog rather than assuming 5.
    dim = next(
        (len(entry["features"]) for entry in items_by_id.values() if "features" in entry),
        5,
    )
    user_vector = np.zeros(dim)
    total_weight = 0

    for item_id in purchased:
        item = items_by_id.get(item_id)
        if item:
            weight = ratings.get(item_id, 3) / 5
            # Purchases count double compared with their normalised rating.
            user_vector += np.array(item["features"]) * weight * 2
            total_weight += weight * 2

    purchased_ids = set(purchased)
    for item_id in behavior.get("viewed", []):
        if item_id in purchased_ids:
            # Already counted above with the stronger purchase weight.
            continue
        item = items_by_id.get(item_id)
        if item:
            user_vector += np.array(item["features"]) * 0.5
            total_weight += 0.5

    if total_weight > 0:
        user_vector = user_vector / total_weight

    return user_vector

# Demo: derive and print the preference vector for user_001.
user_vector = build_user_vector("user_001", items, user_behaviors)
print(f"用户向量: {user_vector}")

基于用户向量的推荐 #

python
def recommend_for_user(user_id: str, limit: int = 5) -> List[Dict]:
    """Return up to ``limit`` items closest to the user's preference vector.

    The vector comes from ``build_user_vector`` and is searched against the
    "items" collection. Items the user has viewed or purchased are removed
    from the hits; the request is padded by that many items to compensate.

    Args:
        user_id: Key of the user in the module-level ``user_behaviors``.
        limit: Maximum number of recommendations to return.

    Returns:
        Dicts with ``id``, ``name``, ``category``, ``price``, ``rating`` and
        ``score``, in the order the client returned them.
    """
    profile = build_user_vector(user_id, items, user_behaviors)

    history = user_behaviors.get(user_id, {})
    seen_ids = set(history.get("viewed", []))
    seen_ids |= set(history.get("purchased", []))

    hits = client.search(
        collection_name="items",
        query_vector=profile.tolist(),
        limit=limit + len(seen_ids),
    )

    fresh = []
    for hit in hits:
        # Skip anything the user has already interacted with.
        if hit.id in seen_ids:
            continue
        payload = hit.payload
        fresh.append({
            "id": hit.id,
            "name": payload["name"],
            "category": payload["category"],
            "price": payload["price"],
            "rating": payload["rating"],
            "score": hit.score,
        })

    return fresh[:limit]

# Demo: personalised recommendations for user_001 with score and price.
user_recs = recommend_for_user("user_001")
print("为用户 user_001 的推荐:")
for item in user_recs:
    print(f"  - {item['name']} (匹配度: {item['score']:.4f}, ¥{item['price']})")

过滤推荐 #

分类过滤 #

python
def recommend_by_category(user_id: str, category: str, limit: int = 5) -> List[Dict]:
    """Search with the user's preference vector inside one category.

    Only points whose ``category`` payload equals ``category`` are
    considered. Items the user already interacted with are not removed.

    Args:
        user_id: Key of the user in the module-level ``user_behaviors``.
        category: Exact category value to match.
        limit: Maximum number of hits to request.

    Returns:
        Dicts with ``id``, ``name`` and ``score``, in the order the client
        returned them.
    """
    profile = build_user_vector(user_id, items, user_behaviors)

    category_condition = FieldCondition(
        key="category",
        match=MatchValue(value=category),
    )

    hits = client.search(
        collection_name="items",
        query_vector=profile.tolist(),
        query_filter=Filter(must=[category_condition]),
        limit=limit,
    )

    recommendations = []
    for hit in hits:
        recommendations.append({
            "id": hit.id,
            "name": hit.payload["name"],
            "score": hit.score,
        })
    return recommendations

# Demo: recommendations for user_001 restricted to the "书籍" category.
book_recs = recommend_by_category("user_001", "书籍")
print("书籍类推荐:")
for item in book_recs:
    print(f"  - {item['name']} (匹配度: {item['score']:.4f})")

价格范围过滤 #

python
def recommend_by_price_range(user_id: str, min_price: float, max_price: float, limit: int = 5) -> List[Dict]:
    """Search with the user's preference vector inside a price window.

    Only points whose ``price`` payload lies in ``[min_price, max_price]``
    (both bounds inclusive) are considered. Items the user already
    interacted with are not removed.

    Args:
        user_id: Key of the user in the module-level ``user_behaviors``.
        min_price: Inclusive lower price bound.
        max_price: Inclusive upper price bound.
        limit: Maximum number of hits to request.

    Returns:
        Dicts with ``id``, ``name``, ``price`` and ``score``, in the order
        the client returned them.
    """
    profile = build_user_vector(user_id, items, user_behaviors)

    price_condition = FieldCondition(
        key="price",
        range=Range(gte=min_price, lte=max_price),
    )

    hits = client.search(
        collection_name="items",
        query_vector=profile.tolist(),
        query_filter=Filter(must=[price_condition]),
        limit=limit,
    )

    recommendations = []
    for hit in hits:
        recommendations.append({
            "id": hit.id,
            "name": hit.payload["name"],
            "price": hit.payload["price"],
            "score": hit.score,
        })
    return recommendations

# Demo: recommendations for user_001 priced between 50 and 100 inclusive.
price_recs = recommend_by_price_range("user_001", 50, 100)
print("价格 50-100 元的推荐:")
for item in price_recs:
    print(f"  - {item['name']} (¥{item['price']}, 匹配度: {item['score']:.4f})")

综合过滤推荐 #

python
def recommend_with_filters(
    user_id: str,
    category: Optional[str] = None,
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    min_rating: Optional[float] = None,
    limit: int = 5
) -> List[Dict]:
    """Search with the user's preference vector under optional filters.

    Every supplied constraint becomes a ``must`` condition; with no
    constraints the search runs unfiltered. Items the user already
    interacted with are not removed.

    Args:
        user_id: Key of the user in the module-level ``user_behaviors``.
        category: Exact category to match; a falsy value disables it.
        min_price: Inclusive lower price bound, or ``None`` for no bound.
        max_price: Inclusive upper price bound, or ``None`` for no bound.
        min_rating: Inclusive lower rating bound, or ``None`` for no bound.
        limit: Maximum number of hits to request.

    Returns:
        Dicts with ``id``, ``name``, ``category``, ``price``, ``rating`` and
        ``score``, in the order the client returned them.
    """
    profile = build_user_vector(user_id, items, user_behaviors)

    must_conditions = []

    if category:
        must_conditions.append(
            FieldCondition(key="category", match=MatchValue(value=category))
        )

    # Keep only the price bounds that were supplied; an empty mapping means
    # no price condition at all.
    price_bounds = {
        bound: value
        for bound, value in (("gte", min_price), ("lte", max_price))
        if value is not None
    }
    if price_bounds:
        must_conditions.append(
            FieldCondition(key="price", range=Range(**price_bounds))
        )

    if min_rating is not None:
        must_conditions.append(
            FieldCondition(key="rating", range=Range(gte=min_rating))
        )

    if must_conditions:
        query_filter = Filter(must=must_conditions)
    else:
        query_filter = None

    hits = client.search(
        collection_name="items",
        query_vector=profile.tolist(),
        query_filter=query_filter,
        limit=limit,
    )

    recommendations = []
    for hit in hits:
        payload = hit.payload
        recommendations.append({
            "id": hit.id,
            "name": payload["name"],
            "category": payload["category"],
            "price": payload["price"],
            "rating": payload["rating"],
            "score": hit.score,
        })
    return recommendations

# Demo: combine category, price-window and minimum-rating constraints.
filtered_recs = recommend_with_filters(
    "user_001",
    category="书籍",
    min_price=50,
    max_price=100,
    min_rating=4.5
)

print("综合过滤推荐:")
for item in filtered_recs:
    print(f"  - {item['name']} (¥{item['price']}, 评分: {item['rating']}, 匹配度: {item['score']:.4f})")

协同过滤 #

用户相似度计算 #

python
def calculate_user_similarity(user1_id: str, user2_id: str) -> float:
    """Return the cosine similarity between two users' preference vectors.

    Both vectors are built with ``build_user_vector`` from the module-level
    ``items`` and ``user_behaviors``.

    Args:
        user1_id: Id of the first user.
        user2_id: Id of the second user.

    Returns:
        The cosine similarity, or ``0.0`` when either vector has zero norm
        (for example a user with no recorded behaviour), where the cosine
        is undefined.
    """
    vector1 = build_user_vector(user1_id, items, user_behaviors)
    vector2 = build_user_vector(user2_id, items, user_behaviors)

    denominator = np.linalg.norm(vector1) * np.linalg.norm(vector2)
    # A zero-norm vector would make the division produce NaN.
    if denominator == 0:
        return 0.0

    return float(np.dot(vector1, vector2) / denominator)

# Demo: cosine similarity between the vectors of user_001 and user_002.
sim = calculate_user_similarity("user_001", "user_002")
print(f"用户相似度: {sim:.4f}")

协同过滤推荐 #

python
def collaborative_filtering_recommend(
    user_id: str,
    limit: int = 5,
    top_k_users: int = 3,
) -> List[Dict]:
    """Recommend items bought by the users most similar to ``user_id``.

    Every other user in the module-level ``user_behaviors`` is ranked by
    the cosine similarity of their preference vector to the target user's.
    For the ``top_k_users`` most similar users, each purchase the target
    user has not made adds ``similarity * rating`` to that item's score (a
    missing rating counts as 3).

    Args:
        user_id: Id of the user to recommend for. An unknown id is treated
            as a user with no history.
        limit: Maximum number of recommendations to return.
        top_k_users: Number of most-similar users to draw purchases from.
            Defaults to 3, the value previously hard-coded.

    Returns:
        Dicts with ``id``, ``name`` and ``collaborative_score``, highest
        score first. Scored ids that are missing from ``items`` are skipped
        without using up one of the ``limit`` slots.
    """
    user_vector = build_user_vector(user_id, items, user_behaviors)
    user_norm = np.linalg.norm(user_vector)

    similar_users = []
    for other_user in user_behaviors:
        if other_user == user_id:
            continue
        other_vector = build_user_vector(other_user, items, user_behaviors)
        denominator = user_norm * np.linalg.norm(other_vector)
        if denominator == 0:
            # The cosine is undefined for a zero vector; use 0.0 so NaN
            # never reaches the sort or the scores.
            similarity = 0.0
        else:
            similarity = float(np.dot(user_vector, other_vector) / denominator)
        similar_users.append((other_user, similarity))

    similar_users.sort(key=lambda pair: pair[1], reverse=True)

    # .get keeps unknown users from raising KeyError, matching how
    # build_user_vector treats them.
    already_purchased = set(user_behaviors.get(user_id, {}).get("purchased", []))

    scores = defaultdict(float)
    for neighbour, similarity in similar_users[:top_k_users]:
        history = user_behaviors[neighbour]
        ratings = history.get("rated", {})
        for item_id in history.get("purchased", []):
            if item_id not in already_purchased:
                scores[item_id] += similarity * ratings.get(item_id, 3)

    # Index the catalog once; setdefault keeps the first entry of a
    # duplicated id.
    catalog = {}
    for entry in items:
        catalog.setdefault(entry["id"], entry)

    ranked = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)

    result = []
    for item_id, score in ranked:
        if len(result) >= limit:
            break
        entry = catalog.get(item_id)
        if not entry:
            continue
        result.append({
            "id": item_id,
            "name": entry["name"],
            "collaborative_score": score,
        })

    return result

# Demo: collaborative-filtering recommendations for user_001.
cf_recs = collaborative_filtering_recommend("user_001")
print("协同过滤推荐:")
for item in cf_recs:
    print(f"  - {item['name']} (协同分数: {item['collaborative_score']:.4f})")

混合推荐 #

混合推荐策略 #

python
def hybrid_recommend(
    user_id: str,
    content_weight: float = 0.5,
    collaborative_weight: float = 0.5,
    limit: int = 5
) -> List[Dict]:
    """Merge content-based and collaborative recommendations into one list.

    Fetches ``limit * 2`` candidates from ``recommend_for_user`` and from
    ``collaborative_filtering_recommend``, then ranks the union by
    ``content_score * content_weight +
    collaborative_score * collaborative_weight``. A candidate found by only
    one source gets 0 for the other score.

    Args:
        user_id: Id of the user to recommend for.
        content_weight: Multiplier applied to the content score.
        collaborative_weight: Multiplier applied to the collaborative score.
        limit: Maximum number of recommendations to return.

    Returns:
        Candidate dicts sorted by ``final_score`` descending. Content
        candidates keep every field from ``recommend_for_user``; candidates
        found only collaboratively carry ``id``, ``name`` and the three
        score fields.

    NOTE(review): the two scores are combined as-is; they do not appear to
    be on a common scale, so the weights may not reflect the intended
    balance -- confirm whether normalisation is wanted.
    """
    candidate_count = limit * 2
    content_recs = recommend_for_user(user_id, limit=candidate_count)
    cf_recs = collaborative_filtering_recommend(user_id, limit=candidate_count)

    merged = {}

    # Content candidates first: their insertion order decides ties in the
    # stable sort below.
    for rec in content_recs:
        entry = dict(rec)
        entry["content_score"] = rec["score"]
        entry["collaborative_score"] = 0
        entry["final_score"] = rec["score"] * content_weight
        merged[rec["id"]] = entry

    for rec in cf_recs:
        rec_id = rec["id"]
        cf_score = rec["collaborative_score"]

        if rec_id in merged:
            merged[rec_id]["collaborative_score"] = cf_score
            merged[rec_id]["final_score"] += cf_score * collaborative_weight
            continue

        # Only the collaborative side produced this item; look its name up
        # in the catalog and skip it if it is not there.
        catalog_item = next((i for i in items if i["id"] == rec_id), None)
        if not catalog_item:
            continue
        merged[rec_id] = {
            "id": rec_id,
            "name": catalog_item["name"],
            "content_score": 0,
            "collaborative_score": cf_score,
            "final_score": cf_score * collaborative_weight,
        }

    ranked = list(merged.values())
    ranked.sort(key=lambda entry: entry["final_score"], reverse=True)

    return ranked[:limit]

# Demo: hybrid recommendations for user_001 with the score breakdown.
hybrid_recs = hybrid_recommend("user_001")
print("混合推荐结果:")
for item in hybrid_recs:
    print(f"  - {item['name']}")
    print(f"    内容分数: {item['content_score']:.4f}, 协同分数: {item['collaborative_score']:.4f}")
    print(f"    最终分数: {item['final_score']:.4f}")

推荐系统类 #

python
class RecommendationSystem:
    """Recommendation helpers bundled around one private Qdrant client.

    Each instance owns an in-memory client and the catalog passed to
    ``load_items``; every query method runs against that client.
    """

    def __init__(self):
        """Create the instance's own in-memory client and an empty catalog."""
        self.client = QdrantClient(":memory:")
        self.items = []

    def load_items(self, items: List[Dict]):
        """Create the "items" collection and index ``items`` into it.

        Args:
            items: Item dicts in the shape expected by ``index_items``.
        """
        self.items = items

        self.client.create_collection(
            collection_name="items",
            vectors_config=VectorParams(size=5, distance=Distance.COSINE)
        )

        index_items(self.client, items)

    def similar_items(self, item_id: str, limit: int = 5) -> List[Dict]:
        """Return up to ``limit`` items recommended from one seed item.

        Queries ``self.client`` so results come from the catalog loaded into
        this instance, not from the module-level client.

        Args:
            item_id: Id of the seed item.
            limit: Maximum number of recommendations to return.

        Returns:
            Dicts with ``id``, ``name``, ``category``, ``tags``, ``price``,
            ``rating`` and ``score``.
        """
        # One extra hit so dropping the seed still leaves ``limit`` results.
        results = self.client.recommend(
            collection_name="items",
            positive=[item_id],
            limit=limit + 1
        )

        return [
            {
                "id": r.id,
                "name": r.payload["name"],
                "category": r.payload["category"],
                "tags": r.payload["tags"],
                "price": r.payload["price"],
                "rating": r.payload["rating"],
                "score": r.score
            }
            for r in results if r.id != item_id
        ][:limit]

    def personal_recommend(self, user_id: str, behaviors: Dict, limit: int = 5) -> List[Dict]:
        """Return up to ``limit`` unseen items closest to the user's vector.

        Args:
            user_id: Key of the user in ``behaviors``.
            behaviors: Mapping of user id to a behaviour dict with optional
                ``viewed`` and ``purchased`` lists.
            limit: Maximum number of recommendations to return.

        Returns:
            Dicts with ``id``, ``name`` and ``score``; items the user viewed
            or purchased are excluded.
        """
        user_vector = build_user_vector(user_id, self.items, behaviors)

        interacted = set()
        if user_id in behaviors:
            interacted = set(behaviors[user_id].get("viewed", [])) | set(behaviors[user_id].get("purchased", []))

        # Pad the request so filtering out seen items still leaves ``limit``.
        results = self.client.search(
            collection_name="items",
            query_vector=user_vector.tolist(),
            limit=limit + len(interacted)
        )

        return [
            {"id": r.id, "name": r.payload["name"], "score": r.score}
            for r in results if r.id not in interacted
        ][:limit]

    def filtered_recommend(
        self,
        user_id: str,
        behaviors: Dict,
        filters: Optional[Dict] = None,
        limit: int = 5
    ) -> List[Dict]:
        """Search with the user's vector under optional payload filters.

        Args:
            user_id: Key of the user in ``behaviors``.
            behaviors: Mapping of user id to a behaviour dict.
            filters: Optional dict; the recognised keys are ``category``
                (exact match), ``min_price`` and ``max_price`` (inclusive
                bounds). Other keys are ignored.
            limit: Maximum number of hits to request.

        Returns:
            Dicts with ``id``, ``name``, ``price`` and ``score``. Items the
            user already interacted with are not removed.
        """
        user_vector = build_user_vector(user_id, self.items, behaviors)

        query_filter = None
        if filters:
            conditions = []
            if "category" in filters:
                conditions.append(
                    FieldCondition(key="category", match=MatchValue(value=filters["category"]))
                )
            if "min_price" in filters or "max_price" in filters:
                price_range = {}
                if "min_price" in filters:
                    price_range["gte"] = filters["min_price"]
                if "max_price" in filters:
                    price_range["lte"] = filters["max_price"]
                conditions.append(FieldCondition(key="price", range=Range(**price_range)))

            # Unrecognised keys alone produce no conditions: search unfiltered.
            if conditions:
                query_filter = Filter(must=conditions)

        results = self.client.search(
            collection_name="items",
            query_vector=user_vector.tolist(),
            query_filter=query_filter,
            limit=limit
        )

        return [
            {"id": r.id, "name": r.payload["name"], "price": r.payload["price"], "score": r.score}
            for r in results
        ]

# Demo: build a RecommendationSystem, load the sample catalog, and exercise
# each of its three query methods.
rec_sys = RecommendationSystem()
rec_sys.load_items(items)

print("\n相似物品推荐:")
similar = rec_sys.similar_items("item_001")
# Only the first three similar items are printed.
for item in similar[:3]:
    print(f"  - {item['name']}")

print("\n个性化推荐:")
personal = rec_sys.personal_recommend("user_001", user_behaviors)
for item in personal:
    print(f"  - {item['name']} (匹配度: {item['score']:.4f})")

print("\n过滤推荐:")
filtered = rec_sys.filtered_recommend(
    "user_001",
    user_behaviors,
    filters={"category": "书籍", "max_price": 100}
)
for item in filtered:
    print(f"  - {item['name']} (¥{item['price']})")

小结 #

本章实现了完整的推荐系统:

  • 物品向量索引
  • 相似物品推荐
  • 用户画像构建
  • 个性化推荐
  • 过滤推荐
  • 协同过滤
  • 混合推荐策略

总结 #

恭喜你完成了 Qdrant 向量数据库的完整学习!从基础概念到实战应用,你已经掌握了:

  • Qdrant 核心概念和架构
  • 向量存储和搜索操作
  • 高级特性和性能优化
  • 分布式部署和运维
  • 语义搜索、RAG、推荐系统等实战应用

继续探索 Qdrant 的更多可能性,构建你的 AI 应用!

最后更新:2026-04-04