排序任务实战 #

排序任务概述 #

什么是 Learning to Rank? #

text
┌─────────────────────────────────────────────────────────────┐
│                    Learning to Rank                          │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  目标:学习一个排序函数,对文档/商品进行排序                 │
│                                                              │
│  应用场景:                                                  │
│  - 搜索引擎结果排序                                          │
│  - 推荐系统商品排序                                          │
│  - 广告排序                                                  │
│  - 问答系统答案排序                                          │
│                                                              │
│  方法分类:                                                  │
│  - Pointwise:独立预测每个样本                               │
│  - Pairwise:预测样本对的关系                                │
│  - Listwise:直接优化排序列表                                │
│                                                              │
└─────────────────────────────────────────────────────────────┘

排序评估指标 #

python
import numpy as np

def dcg_at_k(relevance, k):
    """
    计算 DCG@K (Discounted Cumulative Gain)
    
    DCG@K = Σ (2^rel_i - 1) / log2(i + 1)
    """
    relevance = np.array(relevance)[:k]
    if relevance.size == 0:
        return 0.0
    
    gains = 2 ** relevance - 1
    discounts = np.log2(np.arange(len(relevance)) + 2)
    
    return np.sum(gains / discounts)

def ndcg_at_k(relevance, k):
    """
    计算 NDCG@K (Normalized DCG)
    
    NDCG@K = DCG@K / IDCG@K
    """
    dcg = dcg_at_k(relevance, k)
    
    ideal_relevance = sorted(relevance, reverse=True)
    idcg = dcg_at_k(ideal_relevance, k)
    
    if idcg == 0:
        return 0.0
    
    return dcg / idcg

# 示例
relevance = [3, 2, 3, 0, 1, 2]  # 相关性分数
print(f"NDCG@3: {ndcg_at_k(relevance, 3):.4f}")
print(f"NDCG@5: {ndcg_at_k(relevance, 5):.4f}")

搜索排序实战 #

数据准备 #

python
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import GroupKFold

# 1. 创建模拟数据
def create_ranking_data(n_queries=1000, n_docs_per_query=10):
    """创建排序数据"""
    np.random.seed(42)
    
    data = []
    
    for query_id in range(n_queries):
        # 每个查询有 n_docs_per_query 个文档
        for doc_id in range(n_docs_per_query):
            # 特征
            features = {
                'query_id': query_id,
                'doc_id': doc_id,
                'title_match': np.random.uniform(0, 1),
                'content_match': np.random.uniform(0, 1),
                'pagerank': np.random.uniform(0, 10),
                'url_length': np.random.randint(10, 100),
                'click_rate': np.random.uniform(0, 0.5),
                'freshness': np.random.randint(0, 365),
            }
            
            # 生成相关性分数(基于特征的组合)
            relevance = (
                features['title_match'] * 3 +
                features['content_match'] * 2 +
                features['pagerank'] * 0.5 +
                features['click_rate'] * 2 -
                features['url_length'] * 0.01 -
                features['freshness'] * 0.001 +
                np.random.normal(0, 0.5)
            )
            
            # 离散化为 0-4 级
            relevance = int(np.clip(relevance, 0, 4))
            
            features['relevance'] = relevance
            data.append(features)
    
    return pd.DataFrame(data)

df = create_ranking_data()

print("数据集信息:")
print(f"  查询数: {df['query_id'].nunique()}")
print(f"  文档数: {len(df)}")
print(f"\n相关性分布:")
print(df['relevance'].value_counts().sort_index())

模型训练 #

python
# 2. 准备数据
feature_cols = ['title_match', 'content_match', 'pagerank', 
                'url_length', 'click_rate', 'freshness']

X = df[feature_cols].values
y = df['relevance'].values
groups = df['query_id'].values

# 按查询分组划分
unique_queries = df['query_id'].unique()
train_queries = unique_queries[:800]
test_queries = unique_queries[800:]

train_mask = df['query_id'].isin(train_queries)
test_mask = df['query_id'].isin(test_queries)

X_train, X_test = X[train_mask], X[test_mask]
y_train, y_test = y[train_mask], y[test_mask]
groups_train = df[train_mask].groupby('query_id').size().values
groups_test = df[test_mask].groupby('query_id').size().values

# 3. 创建 DMatrix
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

# 设置组信息
dtrain.set_group(groups_train)
dtest.set_group(groups_test)

# 4. 参数配置
params = {
    'objective': 'rank:ndcg',  # NDCG 优化
    'eval_metric': 'ndcg',
    'max_depth': 6,
    'eta': 0.1,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'seed': 42
}

# 5. 训练模型
model = xgb.train(
    params,
    dtrain,
    num_boost_round=200,
    evals=[(dtrain, 'train'), (dtest, 'test')],
    early_stopping_rounds=20,
    verbose_eval=20
)

# 6. 预测
scores = model.predict(dtest)

# 7. 评估
def evaluate_ranking(df_test, scores, k=10):
    """评估排序性能"""
    df_test = df_test.copy()
    df_test['score'] = scores
    
    ndcg_scores = []
    
    for query_id in df_test['query_id'].unique():
        query_data = df_test[df_test['query_id'] == query_id]
        query_data = query_data.sort_values('score', ascending=False)
        
        relevance = query_data['relevance'].tolist()
        ndcg = ndcg_at_k(relevance, k)
        ndcg_scores.append(ndcg)
    
    return np.mean(ndcg_scores)

df_test = df[test_mask].copy()
ndcg_5 = evaluate_ranking(df_test, scores, k=5)
ndcg_10 = evaluate_ranking(df_test, scores, k=10)

print(f"\nNDCG@5: {ndcg_5:.4f}")
print(f"NDCG@10: {ndcg_10:.4f}")

可视化结果 #

python
import matplotlib.pyplot as plt

def plot_ranking_results(df_test, scores, n_queries=5):
    """可视化排序结果"""
    df_test = df_test.copy()
    df_test['score'] = scores
    
    fig, axes = plt.subplots(1, n_queries, figsize=(15, 4))
    
    for i, query_id in enumerate(df_test['query_id'].unique()[:n_queries]):
        query_data = df_test[df_test['query_id'] == query_id]
        query_data = query_data.sort_values('score', ascending=False)
        
        axes[i].bar(range(len(query_data)), query_data['relevance'])
        axes[i].set_xlabel('Rank')
        axes[i].set_ylabel('Relevance')
        axes[i].set_title(f'Query {query_id}')
    
    plt.tight_layout()
    plt.show()

plot_ranking_results(df_test, scores)

推荐排序实战 #

python
import numpy as np
import pandas as pd
import xgboost as xgb

# 1. 创建推荐数据
def create_recommendation_data(n_users=1000, n_items=500, n_interactions=50000):
    """创建推荐数据"""
    np.random.seed(42)
    
    data = []
    
    for _ in range(n_interactions):
        user_id = np.random.randint(0, n_users)
        item_id = np.random.randint(0, n_items)
        
        # 用户特征
        user_features = {
            f'user_feat_{i}': np.random.randn() for i in range(5)
        }
        
        # 物品特征
        item_features = {
            f'item_feat_{i}': np.random.randn() for i in range(5)
        }
        
        # 交互特征
        interaction_features = {
            'user_item_similarity': np.random.uniform(0, 1),
            'historical_click_rate': np.random.uniform(0, 0.5),
        }
        
        # 生成评分
        rating = (
            user_features['user_feat_0'] * 0.5 +
            item_features['item_feat_0'] * 0.5 +
            interaction_features['user_item_similarity'] * 2 +
            interaction_features['historical_click_rate'] * 3 +
            np.random.normal(0, 0.5)
        )
        
        # 离散化
        rating = int(np.clip(rating + 3, 1, 5))
        
        row = {
            'user_id': user_id,
            'item_id': item_id,
            **user_features,
            **item_features,
            **interaction_features,
            'rating': rating
        }
        
        data.append(row)
    
    return pd.DataFrame(data)

df = create_recommendation_data()

print("推荐数据集信息:")
print(f"  用户数: {df['user_id'].nunique()}")
print(f"  物品数: {df['item_id'].nunique()}")
print(f"  交互数: {len(df)}")
print(f"\n评分分布:")
print(df['rating'].value_counts().sort_index())

# 2. 准备数据
feature_cols = [col for col in df.columns if col not in ['user_id', 'item_id', 'rating']]

X = df[feature_cols].values
y = df['rating'].values
groups = df['user_id'].values

# 按用户分组
unique_users = df['user_id'].unique()
train_users = unique_users[:800]
test_users = unique_users[800:]

train_mask = df['user_id'].isin(train_users)
test_mask = df['user_id'].isin(test_users)

X_train, X_test = X[train_mask], X[test_mask]
y_train, y_test = y[train_mask], y[test_mask]
groups_train = df[train_mask].groupby('user_id').size().values
groups_test = df[test_mask].groupby('user_id').size().values

# 3. 创建 DMatrix
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

dtrain.set_group(groups_train)
dtest.set_group(groups_test)

# 4. 训练
params = {
    'objective': 'rank:pairwise',  # 成对排序
    'eval_metric': 'ndcg',
    'max_depth': 6,
    'eta': 0.1
}

model = xgb.train(params, dtrain, num_boost_round=100)

# 5. 预测和评估
scores = model.predict(dtest)

df_test = df[test_mask].copy()
df_test['score'] = scores

ndcg_5 = evaluate_ranking(df_test, scores, k=5)
ndcg_10 = evaluate_ranking(df_test, scores, k=10)

print(f"\nNDCG@5: {ndcg_5:.4f}")
print(f"NDCG@10: {ndcg_10:.4f}")

排序目标函数对比 #

python
def compare_ranking_objectives():
    """比较不同的排序目标函数"""
    
    objectives = {
        'rank:pairwise': {
            'description': '成对排序损失',
            'pros': ['简单有效', '训练稳定'],
            'cons': ['不考虑位置', '计算复杂度高']
        },
        'rank:ndcg': {
            'description': '直接优化 NDCG',
            'pros': ['直接优化目标指标', '考虑位置'],
            'cons': ['计算复杂', '可能不稳定']
        },
        'rank:map': {
            'description': '优化 MAP',
            'pros': ['适合二值相关性', '简单'],
            'cons': ['不适用于多级相关性']
        }
    }
    
    print("排序目标函数对比:\n")
    for obj, info in objectives.items():
        print(f"{obj}:")
        print(f"  描述: {info['description']}")
        print(f"  优点: {', '.join(info['pros'])}")
        print(f"  缺点: {', '.join(info['cons'])}")
        print()

compare_ranking_objectives()

排序任务最佳实践 #

python
def ranking_best_practices():
    """
    排序任务最佳实践
    """
    practices = {
        '数据准备': [
            '正确设置组信息',
            '确保同一组数据连续存储',
            '处理稀疏特征'
        ],
        '模型训练': [
            '选择合适的目标函数',
            '使用早停策略',
            '监控 NDCG 指标'
        ],
        '模型评估': [
            '使用 NDCG/MAP 评估',
            '按组评估性能',
            '分析不同位置的准确率'
        ],
        '模型优化': [
            '尝试不同的目标函数',
            '调整特征工程',
            '结合业务规则'
        ]
    }
    
    for category, items in practices.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  • {item}")

ranking_best_practices()

总结 #

恭喜你完成了 XGBoost 完全指南的学习!现在你已经掌握了:

  1. 基础入门:XGBoost 简介、安装配置、第一个模型
  2. 核心概念:梯度提升原理、决策树基础、目标函数
  3. 模型训练:数据准备、参数配置、训练与评估
  4. 进阶功能:特征工程、调参技巧、交叉验证
  5. 高级应用:分布式训练、GPU 加速、自定义目标函数
  6. 实战案例:分类任务、回归任务、排序任务

继续实践,将所学知识应用到实际项目中!

最后更新:2026-04-04