回归任务实战 #
房价预测实战 #
问题描述 #
text
┌─────────────────────────────────────────────────────────────┐
│ 房价预测任务 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 目标:预测房屋价格 │
│ │
│ 特征: │
│ - 房屋面积、房间数量 │
│ - 地理位置、建造年份 │
│ - 社区特征、交通便利性 │
│ │
│ 评估指标: │
│ - RMSE (均方根误差) │
│ - MAE (平均绝对误差) │
│ - R² (决定系数) │
│ │
└─────────────────────────────────────────────────────────────┘
完整代码实现 #
python
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (
mean_squared_error, mean_absolute_error, r2_score,
mean_absolute_percentage_error
)
from sklearn.preprocessing import StandardScaler, LabelEncoder
import matplotlib.pyplot as plt
# 1. Data loading and exploration
def load_and_explore_data(n_samples=5000, seed=42):
    """Generate and summarize a synthetic housing-price dataset.

    Generalized from the original hard-coded constants: sample count and
    RNG seed are now parameters with the original values as defaults.

    Args:
        n_samples: Number of rows to simulate (default 5000).
        seed: Seed for NumPy's global RNG so the data is reproducible.

    Returns:
        pd.DataFrame with 10 feature columns plus a 'price' target column.
    """
    np.random.seed(seed)
    data = {
        'area': np.random.uniform(50, 300, n_samples),
        'bedrooms': np.random.randint(1, 6, n_samples),
        'bathrooms': np.random.randint(1, 4, n_samples),
        'age': np.random.randint(0, 50, n_samples),
        'floor': np.random.randint(1, 30, n_samples),
        'distance_subway': np.random.uniform(0.1, 5, n_samples),
        'distance_school': np.random.uniform(0.1, 3, n_samples),
        'district': np.random.choice(['A', 'B', 'C', 'D'], n_samples),
        'has_parking': np.random.randint(0, 2, n_samples),
        'has_elevator': np.random.randint(0, 2, n_samples),
    }
    df = pd.DataFrame(data)
    # Price = linear combination of features + Gaussian noise.
    df['price'] = (
        df['area'] * 500 +
        df['bedrooms'] * 50000 +
        df['bathrooms'] * 30000 +
        (50 - df['age']) * 2000 +               # newer homes are worth more
        df['floor'] * 5000 +
        (5 - df['distance_subway']) * 50000 +   # closer to subway -> pricier
        (3 - df['distance_school']) * 30000 +
        df['has_parking'] * 100000 +
        df['has_elevator'] * 50000 +
        np.random.normal(0, 100000, n_samples)
    )
    # Additive district premium/discount.
    district_effect = {'A': 200000, 'B': 100000, 'C': 0, 'D': -50000}
    df['price'] += df['district'].map(district_effect)
    # Floor prices at 100k so noise cannot push them to zero or negative.
    df['price'] = df['price'].clip(lower=100000)
    print("数据集信息:")
    print(f" 样本数: {len(df)}")
    print(f" 特征数: {len(df.columns) - 1}")
    print(f"\n价格统计:")
    print(df['price'].describe())
    return df
df = load_and_explore_data()
# 2. Data preprocessing
def preprocess_data(df):
    """Encode categoricals, engineer features and split the housing data.

    Args:
        df: Frame with the raw feature columns plus a 'price' target.

    Returns:
        (X_train, X_test, y_train, y_test, feature_cols): an 80/20 split
        with a fixed random_state, plus the list of feature column names.
    """
    df = df.copy()
    # Encode the categorical district as integer codes.
    le = LabelEncoder()
    df['district_encoded'] = le.fit_transform(df['district'])
    # Feature engineering
    df['area_per_bedroom'] = df['area'] / df['bedrooms']
    df['total_rooms'] = df['bedrooms'] + df['bathrooms']
    # BUG FIX: pd.cut bins are right-closed, so age == 0 fell outside the
    # first interval (0, 10] and produced NaN; include_lowest=True closes
    # that interval to [0, 10] so every age in [0, 50) gets a group.
    df['age_group'] = pd.cut(df['age'], bins=[0, 10, 20, 30, 50],
                             labels=[0, 1, 2, 3], include_lowest=True)
    df['age_group'] = df['age_group'].astype(float)
    # Final feature set
    feature_cols = [
        'area', 'bedrooms', 'bathrooms', 'age', 'floor',
        'distance_subway', 'distance_school', 'district_encoded',
        'has_parking', 'has_elevator', 'area_per_bedroom',
        'total_rooms', 'age_group'
    ]
    X = df[feature_cols]
    y = df['price']
    # Hold out 20% for testing.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    print(f"\n训练集: {len(X_train)} 样本")
    print(f"测试集: {len(X_test)} 样本")
    return X_train, X_test, y_train, y_test, feature_cols
X_train, X_test, y_train, y_test, feature_cols = preprocess_data(df)
# 3. Build DMatrix containers (XGBoost's optimized internal data structure)
dtrain = xgb.DMatrix(X_train, label=y_train, feature_names=feature_cols)
dtest = xgb.DMatrix(X_test, label=y_test, feature_names=feature_cols)
# 4. Training parameters
params = {
    'objective': 'reg:squarederror',  # plain squared-error regression
    'eval_metric': 'rmse',
    'max_depth': 6,
    'eta': 0.1,              # learning rate
    'subsample': 0.8,        # row subsampling ratio per tree
    'colsample_bytree': 0.8, # column subsampling ratio per tree
    'lambda': 1,             # L2 regularization weight
    'alpha': 0,              # L1 regularization weight (disabled)
    'seed': 42
}
# 5. Model training
def train_model(params, dtrain, dtest):
    """Fit a booster with early stopping on the test-set RMSE.

    Returns:
        (model, evals_result): the fitted booster and the per-round
        metric history recorded for both evaluation sets.
    """
    history = {}
    booster = xgb.train(
        params,
        dtrain,
        num_boost_round=500,
        evals=[(dtrain, 'train'), (dtest, 'test')],
        early_stopping_rounds=50,
        evals_result=history,
        verbose_eval=20,
    )
    print(f"\n最佳迭代: {booster.best_iteration}")
    return booster, history
model, evals_result = train_model(params, dtrain, dtest)
# 6. Model evaluation
def evaluate_model(model, dtest, y_test):
    """Predict on dtest and report MSE/RMSE/MAE/R2/MAPE against y_test.

    Returns:
        The raw prediction vector, so callers can reuse it for plotting.
    """
    preds = model.predict(dtest)
    squared_err = mean_squared_error(y_test, preds)
    abs_err = mean_absolute_error(y_test, preds)
    r_squared = r2_score(y_test, preds)
    pct_err = mean_absolute_percentage_error(y_test, preds)
    banner = "=" * 50
    print("\n" + banner)
    print("模型评估结果")
    print(banner)
    print(f"\nMSE: {squared_err:.2f}")
    print(f"RMSE: {np.sqrt(squared_err):.2f}")
    print(f"MAE: {abs_err:.2f}")
    print(f"R²: {r_squared:.4f}")
    print(f"MAPE: {pct_err:.4f}")
    return preds
y_pred = evaluate_model(model, dtest, y_test)
# 7. Visualization
def plot_results(evals_result, y_test, y_pred):
    """Render a 2x2 diagnostic grid: learning curves, predicted-vs-actual,
    residual histogram, and residual-vs-prediction scatter."""
    fig, ((ax_curve, ax_pva), (ax_hist, ax_res)) = plt.subplots(2, 2, figsize=(12, 10))
    # Learning curves recorded during training
    for key, label in (('train', 'Train'), ('test', 'Test')):
        ax_curve.plot(evals_result[key]['rmse'], label=label)
    ax_curve.set_xlabel('Round')
    ax_curve.set_ylabel('RMSE')
    ax_curve.set_title('Training RMSE')
    ax_curve.legend()
    ax_curve.grid(True)
    # Predicted vs actual, with the ideal y=x reference line
    ax_pva.scatter(y_test, y_pred, alpha=0.5)
    lo, hi = y_test.min(), y_test.max()
    ax_pva.plot([lo, hi], [lo, hi], 'r--')
    ax_pva.set_xlabel('Actual Price')
    ax_pva.set_ylabel('Predicted Price')
    ax_pva.set_title('Predicted vs Actual')
    ax_pva.grid(True)
    # Residual histogram centred on zero
    residuals = y_test - y_pred
    ax_hist.hist(residuals, bins=50, edgecolor='black')
    ax_hist.set_xlabel('Residual')
    ax_hist.set_ylabel('Frequency')
    ax_hist.set_title('Residual Distribution')
    ax_hist.axvline(x=0, color='r', linestyle='--')
    # Residuals against predictions (a fan shape suggests heteroscedasticity)
    ax_res.scatter(y_pred, residuals, alpha=0.5)
    ax_res.axhline(y=0, color='r', linestyle='--')
    ax_res.set_xlabel('Predicted Price')
    ax_res.set_ylabel('Residual')
    ax_res.set_title('Residual vs Predicted')
    ax_res.grid(True)
    plt.tight_layout()
    plt.show()
plot_results(evals_result, y_test, y_pred)
# 8. Feature importance ranked by total split gain
# NOTE(review): xgb.plot_importance creates its own axes when no `ax` is
# passed, so the preceding plt.figure call may be ignored — confirm against
# the installed xgboost version.
plt.figure(figsize=(10, 6))
xgb.plot_importance(model, max_num_features=10, importance_type='gain')
plt.title('Feature Importance (Gain)')
plt.tight_layout()
plt.show()
# 9. Persist the trained booster in XGBoost's JSON format
model.save_model('house_price_model.json')
print("\n模型已保存为 house_price_model.json")
时间序列预测实战 #
python
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt
# 1. Build a synthetic daily time series
def create_time_series_data(n_samples=1000, seed=None):
    """Simulate daily sales = trend + yearly/weekly seasonality + noise.

    Args:
        n_samples: Number of daily observations starting 2020-01-01.
        seed: Optional seed for the noise term. Pass an int to make the
            series reproducible; the default None keeps the original
            behavior of drawing from NumPy's current global RNG state.

    Returns:
        DataFrame with 'date' and 'sales' columns.
    """
    if seed is not None:
        np.random.seed(seed)
    dates = pd.date_range('2020-01-01', periods=n_samples, freq='D')
    t = np.arange(n_samples)
    trend = np.linspace(100, 200, n_samples)        # slow upward drift
    seasonality = 20 * np.sin(2 * np.pi * t / 365)  # yearly cycle
    weekly = 10 * np.sin(2 * np.pi * t / 7)         # weekly cycle
    noise = np.random.normal(0, 5, n_samples)
    sales = trend + seasonality + weekly + noise
    df = pd.DataFrame({
        'date': dates,
        'sales': sales
    })
    return df
df = create_time_series_data()
# 2. 特征工程
def create_time_features(df):
"""创建时间特征"""
df = df.copy()
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['dayofweek'] = df['date'].dt.dayofweek
df['dayofyear'] = df['date'].dt.dayofyear
df['weekofyear'] = df['date'].dt.isocalendar().week.astype(int)
df['quarter'] = df['date'].dt.quarter
df['is_weekend'] = df['dayofweek'].isin([5, 6]).astype(int)
# 滞后特征
for lag in [1, 7, 14, 30]:
df[f'sales_lag_{lag}'] = df['sales'].shift(lag)
# 滚动特征
for window in [7, 14, 30]:
df[f'sales_rolling_mean_{window}'] = df['sales'].rolling(window=window).mean()
df[f'sales_rolling_std_{window}'] = df['sales'].rolling(window=window).std()
# 差分特征
df['sales_diff_1'] = df['sales'].diff(1)
df['sales_diff_7'] = df['sales'].diff(7)
df = df.dropna()
return df
df_features = create_time_features(df)
# 3. Train/test split
# Everything except the raw date and the target counts as a feature.
feature_cols = [col for col in df_features.columns if col not in ['date', 'sales']]
X = df_features[feature_cols]
y = df_features['sales']
# Chronological split (no shuffling): train on the first 80% and test on
# the most recent 20% to avoid look-ahead leakage.
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
# 4. Train the model
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
params = {
    'objective': 'reg:squarederror',
    'eval_metric': 'rmse',
    'max_depth': 6,
    'eta': 0.1  # learning rate
}
model = xgb.train(
    params,
    dtrain,
    num_boost_round=200,
    evals=[(dtrain, 'train'), (dtest, 'test')],
    early_stopping_rounds=20,  # stop once test RMSE stalls for 20 rounds
    verbose_eval=20
)
# 5. Evaluate on the held-out (most recent) period
y_pred = model.predict(dtest)
print(f"\nRMSE: {np.sqrt(mean_squared_error(y_test, y_pred)):.2f}")
print(f"MAE: {mean_absolute_error(y_test, y_pred):.2f}")
# 6. Plot actual vs predicted sales over the test period
plt.figure(figsize=(14, 6))
plt.plot(y_test.values, label='Actual', alpha=0.7)
plt.plot(y_pred, label='Predicted', alpha=0.7)
plt.xlabel('Time')
plt.ylabel('Sales')
plt.title('Sales Prediction')
plt.legend()
plt.grid(True)
plt.show()
分位数回归实战 #
python
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
# Custom pinball (quantile) objective for xgb.train's `obj` hook
def quantile_loss(quantile):
    """Return an XGBoost objective implementing the pinball loss at `quantile`.

    For under-prediction (label > pred) the loss slope is -q; for
    over-prediction it is 1-q. The Hessian is taken as a constant 1 so
    the boosting steps stay well-defined despite the non-smooth loss.
    """
    def objective(preds, dtrain):
        errors = dtrain.get_label() - preds
        under = errors > 0
        grad = np.where(under, -quantile, 1.0 - quantile)
        hess = np.ones_like(errors)
        return grad, hess
    return objective
# Synthetic data with heteroscedastic noise: the noise scale grows with
# feature 2, so prediction intervals should widen along that feature.
np.random.seed(42)
X = np.random.rand(1000, 10)
y = X[:, 0] * 10 + X[:, 1] * 5 + np.random.randn(1000) * (1 + X[:, 2])
# NOTE(review): this split has no random_state, so results vary between
# runs even though the data generation above is seeded — confirm intent.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
params = {'max_depth': 6, 'eta': 0.1}
# Fit one model per target quantile using the custom objective.
quantiles = [0.1, 0.5, 0.9]
predictions = {}
for q in quantiles:
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=100,
        obj=quantile_loss(q)  # custom objective replaces the default loss
    )
    predictions[q] = model.predict(dtest)
# Visualize the 10%-90% prediction band against actual values, with
# samples ordered by their true target so the band reads left-to-right.
plt.figure(figsize=(12, 6))
sorted_idx = np.argsort(y_test)
y_sorted = y_test[sorted_idx]
plt.fill_between(
    range(len(y_sorted)),
    predictions[0.1][sorted_idx],
    predictions[0.9][sorted_idx],
    alpha=0.3,
    label='80% Prediction Interval'
)
plt.plot(y_sorted, 'k.', alpha=0.5, label='Actual')
plt.plot(predictions[0.5][sorted_idx], 'r-', label='Median Prediction')
plt.xlabel('Sample')
plt.ylabel('Value')
plt.title('Quantile Regression - Prediction Intervals')
plt.legend()
plt.grid(True)
plt.show()
回归任务最佳实践 #
python
def regression_best_practices():
    """Print a categorized checklist of best practices for regression tasks."""
    checklist = (
        ('数据准备', ('处理异常值', '特征缩放(可选)', '创建有意义的特征')),
        ('模型训练', ('选择合适的损失函数', '使用早停防止过拟合', '交叉验证评估')),
        ('模型评估', ('使用多个评估指标', '分析残差分布', '检查预测偏差')),
        ('模型优化', ('尝试不同的目标函数', '调整正则化参数', '集成多个模型')),
    )
    for category, items in checklist:
        print(f"\n{category}:")
        for item in items:
            print(f" • {item}")
regression_best_practices()
下一步 #
现在你已经掌握了回归任务实战,接下来学习 排序任务 了解 XGBoost 在排序问题中的应用!
最后更新:2026-04-04