排序任务实战 #
排序任务概述 #
什么是 Learning to Rank? #
text
┌─────────────────────────────────────────────────────────────┐
│ Learning to Rank │
├─────────────────────────────────────────────────────────────┤
│ │
│ 目标:学习一个排序函数,对文档/商品进行排序 │
│ │
│ 应用场景: │
│ - 搜索引擎结果排序 │
│ - 推荐系统商品排序 │
│ - 广告排序 │
│ - 问答系统答案排序 │
│ │
│ 方法分类: │
│ - Pointwise:独立预测每个样本 │
│ - Pairwise:预测样本对的关系 │
│ - Listwise:直接优化排序列表 │
│ │
└─────────────────────────────────────────────────────────────┘
排序评估指标 #
python
import numpy as np
def dcg_at_k(relevance, k):
    """Discounted Cumulative Gain at cutoff k.

    With 1-based ranks r, DCG@K = sum_r (2^rel_r - 1) / log2(r + 1)
    over the first k entries of `relevance`.
    """
    rel = np.asarray(relevance)[:k]
    if rel.size == 0:
        # Nothing retrieved -> zero gain.
        return 0.0
    gains = 2 ** rel - 1
    # Rank r (1-based) is discounted by log2(r + 1); arange is 0-based,
    # hence the "+ 2".
    discounts = np.log2(np.arange(rel.size) + 2)
    return np.sum(gains / discounts)


def ndcg_at_k(relevance, k):
    """Normalized DCG at cutoff k: DCG@K / IDCG@K, in [0, 1].

    IDCG@K is the DCG of the same labels re-sorted into the ideal
    (descending) order; an all-zero label list yields 0.0.
    """
    ideal = dcg_at_k(sorted(relevance, reverse=True), k)
    if ideal == 0:
        return 0.0
    return dcg_at_k(relevance, k) / ideal


# Example: graded relevance labels of a ranked result list
relevance = [3, 2, 3, 0, 1, 2]
print(f"NDCG@3: {ndcg_at_k(relevance, 3):.4f}")
print(f"NDCG@5: {ndcg_at_k(relevance, 5):.4f}")
搜索排序实战 #
数据准备 #
python
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import GroupKFold
# 1. 创建模拟数据
def create_ranking_data(n_queries=1000, n_docs_per_query=10):
    """Build a synthetic learning-to-rank dataset.

    Every query gets `n_docs_per_query` documents with random features;
    a graded relevance label in {0..4} is derived from a noisy linear
    combination of those features. Seeded, so output is deterministic.
    """
    np.random.seed(42)
    rows = []
    for qid in range(n_queries):
        for did in range(n_docs_per_query):
            # Draw the raw document features. The draw order matters:
            # it fixes the position in the seeded RNG stream.
            row = {
                'query_id': qid,
                'doc_id': did,
                'title_match': np.random.uniform(0, 1),
                'content_match': np.random.uniform(0, 1),
                'pagerank': np.random.uniform(0, 10),
                'url_length': np.random.randint(10, 100),
                'click_rate': np.random.uniform(0, 0.5),
                'freshness': np.random.randint(0, 365),
            }
            # Noisy linear utility of the features...
            utility = (
                row['title_match'] * 3
                + row['content_match'] * 2
                + row['pagerank'] * 0.5
                + row['click_rate'] * 2
                - row['url_length'] * 0.01
                - row['freshness'] * 0.001
                + np.random.normal(0, 0.5)
            )
            # ...clipped and binned into the 0-4 relevance grades.
            row['relevance'] = int(np.clip(utility, 0, 4))
            rows.append(row)
    return pd.DataFrame(rows)
# Build the synthetic ranking dataset and print a quick summary
# (query count, total document count, relevance-grade distribution).
df = create_ranking_data()
print("数据集信息:")
print(f" 查询数: {df['query_id'].nunique()}")
print(f" 文档数: {len(df)}")
print(f"\n相关性分布:")
print(df['relevance'].value_counts().sort_index())
模型训练 #
python
# 2. Assemble feature matrix, labels and per-query grouping
feature_cols = ['title_match', 'content_match', 'pagerank',
                'url_length', 'click_rate', 'freshness']
X = df[feature_cols].values
y = df['relevance'].values
groups = df['query_id'].values

# Split by query id so that no query spans both train and test.
unique_queries = df['query_id'].unique()
train_queries = unique_queries[:800]
test_queries = unique_queries[800:]
train_mask = df['query_id'].isin(train_queries)
test_mask = df['query_id'].isin(test_queries)
X_train, X_test = X[train_mask], X[test_mask]
y_train, y_test = y[train_mask], y[test_mask]
# Rows of one query are stored contiguously (by construction), so the
# per-query sizes below line up with the row order of X_train / X_test.
groups_train = df[train_mask].groupby('query_id').size().values
groups_test = df[test_mask].groupby('query_id').size().values

# 3. Build DMatrix objects and attach the group boundaries required
# by the ranking objectives.
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
dtrain.set_group(groups_train)
dtest.set_group(groups_test)

# 4. Hyper-parameters for a listwise NDCG-optimizing ranker
params = {
    'objective': 'rank:ndcg',  # optimize NDCG directly
    'eval_metric': 'ndcg',
    'max_depth': 6,
    'eta': 0.1,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'seed': 42
}

# 5. Train with early stopping monitored on the held-out queries
model = xgb.train(
    params,
    dtrain,
    num_boost_round=200,
    evals=[(dtrain, 'train'), (dtest, 'test')],
    early_stopping_rounds=20,
    verbose_eval=20
)

# 6. Score every test document
scores = model.predict(dtest)
# 7. 评估
def evaluate_ranking(df_test, scores, k=10, group_col='query_id', rel_col='relevance'):
    """Mean NDCG@k over all ranking groups.

    Parameters
    ----------
    df_test : pandas.DataFrame
        Rows to evaluate; must contain `group_col` and `rel_col`.
    scores : array-like
        Model scores aligned with the rows of `df_test`.
    k : int
        Cutoff rank for NDCG.
    group_col : str
        Column identifying a ranking group. Defaults to 'query_id'
        (search); pass e.g. 'user_id' for recommendation data.
    rel_col : str
        Column holding the graded relevance label. Defaults to
        'relevance'; pass e.g. 'rating' for recommendation data.

    Returns
    -------
    float
        NDCG@k averaged over the groups in `df_test`.
    """
    ranked = df_test.copy()  # don't mutate the caller's frame
    ranked['score'] = scores
    ndcg_scores = []
    for gid in ranked[group_col].unique():
        group = ranked[ranked[group_col] == gid]
        # Evaluate the labels in the order induced by the model scores.
        group = group.sort_values('score', ascending=False)
        ndcg_scores.append(ndcg_at_k(group[rel_col].tolist(), k))
    return np.mean(ndcg_scores)
# Evaluate the trained ranker on the held-out queries at two cutoffs.
df_test = df[test_mask].copy()
ndcg_5 = evaluate_ranking(df_test, scores, k=5)
ndcg_10 = evaluate_ranking(df_test, scores, k=10)
print(f"\nNDCG@5: {ndcg_5:.4f}")
print(f"NDCG@10: {ndcg_10:.4f}")
可视化结果 #
python
import matplotlib.pyplot as plt
def plot_ranking_results(df_test, scores, n_queries=5):
    """Bar-plot the relevance grades of the score-ranked documents
    for the first `n_queries` queries (one subplot per query).

    A well-trained ranker shows bars roughly decreasing left to right.
    """
    ranked = df_test.copy()
    ranked['score'] = scores
    fig, axes = plt.subplots(1, n_queries, figsize=(15, 4))
    shown = ranked['query_id'].unique()[:n_queries]
    for idx, qid in enumerate(shown):
        docs = ranked[ranked['query_id'] == qid]
        docs = docs.sort_values('score', ascending=False)
        panel = axes[idx]
        panel.bar(range(len(docs)), docs['relevance'])
        panel.set_xlabel('Rank')
        panel.set_ylabel('Relevance')
        panel.set_title(f'Query {qid}')
    plt.tight_layout()
    plt.show()


plot_ranking_results(df_test, scores)
推荐排序实战 #
python
import numpy as np
import pandas as pd
import xgboost as xgb
# 1. 创建推荐数据
def create_recommendation_data(n_users=1000, n_items=500, n_interactions=50000):
    """Synthesize user-item interactions with a 1-5 rating label.

    Each interaction pairs a random user and item with random user,
    item and interaction features; the rating is a noisy linear
    combination of a subset of them, shifted and clipped into 1..5.
    Seeded, so output is deterministic.
    """
    np.random.seed(42)
    rows = []
    for _ in range(n_interactions):
        # Draw order fixes the position in the seeded RNG stream.
        uid = np.random.randint(0, n_users)
        iid = np.random.randint(0, n_items)
        u_feats = {f'user_feat_{i}': np.random.randn() for i in range(5)}
        i_feats = {f'item_feat_{i}': np.random.randn() for i in range(5)}
        x_feats = {
            'user_item_similarity': np.random.uniform(0, 1),
            'historical_click_rate': np.random.uniform(0, 0.5),
        }
        # Noisy linear preference score...
        raw = (
            u_feats['user_feat_0'] * 0.5
            + i_feats['item_feat_0'] * 0.5
            + x_feats['user_item_similarity'] * 2
            + x_feats['historical_click_rate'] * 3
            + np.random.normal(0, 0.5)
        )
        # ...shifted and clipped into the 1-5 star range.
        rating = int(np.clip(raw + 3, 1, 5))
        rows.append({
            'user_id': uid,
            'item_id': iid,
            **u_feats,
            **i_feats,
            **x_feats,
            'rating': rating,
        })
    return pd.DataFrame(rows)
# Build the synthetic recommendation dataset and print a quick summary
# (distinct users/items, interaction count, rating distribution).
df = create_recommendation_data()
print("推荐数据集信息:")
print(f" 用户数: {df['user_id'].nunique()}")
print(f" 物品数: {df['item_id'].nunique()}")
print(f" 交互数: {len(df)}")
print(f"\n评分分布:")
print(df['rating'].value_counts().sort_index())
# 2. Prepare features, labels, and per-user grouping.
feature_cols = [col for col in df.columns if col not in ['user_id', 'item_id', 'rating']]

# BUG FIX: XGBoost's set_group() assumes all rows of a group (here: a
# user) are stored contiguously, and the size array must follow the row
# order of the data. The interactions were generated with *random*
# user ids, so users are scattered across rows, while
# groupby('user_id').size() returns sizes sorted by user id — the two
# did not correspond, silently corrupting the ranking groups.
# Stable-sorting by user id makes rows contiguous per user and aligned
# with the groupby output (mergesort keeps each user's original row
# order, so the result is deterministic).
df = df.sort_values('user_id', kind='mergesort').reset_index(drop=True)

X = df[feature_cols].values
y = df['rating'].values
groups = df['user_id'].values

# Split by user so no user's interactions span both train and test.
unique_users = df['user_id'].unique()
train_users = unique_users[:800]
test_users = unique_users[800:]
train_mask = df['user_id'].isin(train_users)
test_mask = df['user_id'].isin(test_users)
X_train, X_test = X[train_mask], X[test_mask]
y_train, y_test = y[train_mask], y[test_mask]
# Now contiguous per user, so these sizes line up with row order.
groups_train = df[train_mask].groupby('user_id').size().values
groups_test = df[test_mask].groupby('user_id').size().values

# 3. Build DMatrix objects and attach the group boundaries.
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
dtrain.set_group(groups_train)
dtest.set_group(groups_test)
# 4. Train a pairwise ranker over each user's items.
params = {
    'objective': 'rank:pairwise',  # pairwise ranking loss
    'eval_metric': 'ndcg',
    'max_depth': 6,
    'eta': 0.1
}
model = xgb.train(params, dtrain, num_boost_round=100)

# 5. Predict and evaluate.
scores = model.predict(dtest)
df_test = df[test_mask].copy()
df_test['score'] = scores
# BUG FIX: evaluate_ranking() reads the columns 'query_id' and
# 'relevance', but this frame has 'user_id' and 'rating' — calling it
# directly raised KeyError. Present the recommendation columns under
# the names the evaluator expects (rename returns a copy; df_test is
# untouched).
eval_frame = df_test.rename(columns={'user_id': 'query_id', 'rating': 'relevance'})
ndcg_5 = evaluate_ranking(eval_frame, scores, k=5)
ndcg_10 = evaluate_ranking(eval_frame, scores, k=10)
print(f"\nNDCG@5: {ndcg_5:.4f}")
print(f"NDCG@10: {ndcg_10:.4f}")
排序目标函数对比 #
python
def compare_ranking_objectives():
    """Print a side-by-side comparison of XGBoost's ranking objectives
    (description, strengths, weaknesses for each)."""
    objectives = {
        'rank:pairwise': {
            'description': '成对排序损失',
            'pros': ['简单有效', '训练稳定'],
            'cons': ['不考虑位置', '计算复杂度高'],
        },
        'rank:ndcg': {
            'description': '直接优化 NDCG',
            'pros': ['直接优化目标指标', '考虑位置'],
            'cons': ['计算复杂', '可能不稳定'],
        },
        'rank:map': {
            'description': '优化 MAP',
            'pros': ['适合二值相关性', '简单'],
            'cons': ['不适用于多级相关性'],
        },
    }
    print("排序目标函数对比:\n")
    for name, meta in objectives.items():
        print(f"{name}:")
        print(f" 描述: {meta['description']}")
        print(f" 优点: {', '.join(meta['pros'])}")
        print(f" 缺点: {', '.join(meta['cons'])}")
        print()


compare_ranking_objectives()
排序任务最佳实践 #
python
def ranking_best_practices():
    """Print a checklist of best practices for ranking tasks,
    organized by project phase (data, training, evaluation, tuning)."""
    practices = {
        '数据准备': [
            '正确设置组信息',
            '确保同一组数据连续存储',
            '处理稀疏特征',
        ],
        '模型训练': [
            '选择合适的目标函数',
            '使用早停策略',
            '监控 NDCG 指标',
        ],
        '模型评估': [
            '使用 NDCG/MAP 评估',
            '按组评估性能',
            '分析不同位置的准确率',
        ],
        '模型优化': [
            '尝试不同的目标函数',
            '调整特征工程',
            '结合业务规则',
        ],
    }
    for phase, tips in practices.items():
        print(f"\n{phase}:")
        for tip in tips:
            print(f" • {tip}")


ranking_best_practices()
总结 #
恭喜你完成了 XGBoost 完全指南的学习!现在你已经掌握了:
- 基础入门:XGBoost 简介、安装配置、第一个模型
- 核心概念:梯度提升原理、决策树基础、目标函数
- 模型训练:数据准备、参数配置、训练与评估
- 进阶功能:特征工程、调参技巧、交叉验证
- 高级应用:分布式训练、GPU 加速、自定义目标函数
- 实战案例:分类任务、回归任务、排序任务
继续实践,将所学知识应用到实际项目中!
最后更新:2026-04-04