回归任务实战 #

房价预测实战 #

问题描述 #

text
┌─────────────────────────────────────────────────────────────┐
│                    房价预测任务                              │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  目标:预测房屋价格                                          │
│                                                              │
│  特征:                                                      │
│  - 房屋面积、房间数量                                        │
│  - 地理位置、建造年份                                        │
│  - 社区特征、交通便利性                                      │
│                                                              │
│  评估指标:                                                  │
│  - RMSE (均方根误差)                                         │
│  - MAE (平均绝对误差)                                        │
│  - R² (决定系数)                                             │
│                                                              │
└─────────────────────────────────────────────────────────────┘

完整代码实现 #

python
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (
    mean_squared_error, mean_absolute_error, r2_score,
    mean_absolute_percentage_error
)
from sklearn.preprocessing import StandardScaler, LabelEncoder
import matplotlib.pyplot as plt

# 1. 数据加载与探索
def load_and_explore_data():
    """加载并探索数据"""
    # 模拟房价数据
    np.random.seed(42)
    n_samples = 5000
    
    data = {
        'area': np.random.uniform(50, 300, n_samples),
        'bedrooms': np.random.randint(1, 6, n_samples),
        'bathrooms': np.random.randint(1, 4, n_samples),
        'age': np.random.randint(0, 50, n_samples),
        'floor': np.random.randint(1, 30, n_samples),
        'distance_subway': np.random.uniform(0.1, 5, n_samples),
        'distance_school': np.random.uniform(0.1, 3, n_samples),
        'district': np.random.choice(['A', 'B', 'C', 'D'], n_samples),
        'has_parking': np.random.randint(0, 2, n_samples),
        'has_elevator': np.random.randint(0, 2, n_samples),
    }
    
    df = pd.DataFrame(data)
    
    # 生成价格(基于特征的线性组合 + 噪声)
    df['price'] = (
        df['area'] * 500 +
        df['bedrooms'] * 50000 +
        df['bathrooms'] * 30000 +
        (50 - df['age']) * 2000 +
        df['floor'] * 5000 +
        (5 - df['distance_subway']) * 50000 +
        (3 - df['distance_school']) * 30000 +
        df['has_parking'] * 100000 +
        df['has_elevator'] * 50000 +
        np.random.normal(0, 100000, n_samples)
    )
    
    # 区域影响
    district_effect = {'A': 200000, 'B': 100000, 'C': 0, 'D': -50000}
    df['price'] += df['district'].map(district_effect)
    
    df['price'] = df['price'].clip(lower=100000)
    
    print("数据集信息:")
    print(f"  样本数: {len(df)}")
    print(f"  特征数: {len(df.columns) - 1}")
    print(f"\n价格统计:")
    print(df['price'].describe())
    
    return df

df = load_and_explore_data()

# 2. 数据预处理
def preprocess_data(df):
    """数据预处理"""
    df = df.copy()
    
    # 类别特征编码
    le = LabelEncoder()
    df['district_encoded'] = le.fit_transform(df['district'])
    
    # 特征工程
    df['area_per_bedroom'] = df['area'] / df['bedrooms']
    df['total_rooms'] = df['bedrooms'] + df['bathrooms']
    df['age_group'] = pd.cut(df['age'], bins=[0, 10, 20, 30, 50], labels=[0, 1, 2, 3])
    df['age_group'] = df['age_group'].astype(float)
    
    # 选择特征
    feature_cols = [
        'area', 'bedrooms', 'bathrooms', 'age', 'floor',
        'distance_subway', 'distance_school', 'district_encoded',
        'has_parking', 'has_elevator', 'area_per_bedroom',
        'total_rooms', 'age_group'
    ]
    
    X = df[feature_cols]
    y = df['price']
    
    # 划分数据
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    print(f"\n训练集: {len(X_train)} 样本")
    print(f"测试集: {len(X_test)} 样本")
    
    return X_train, X_test, y_train, y_test, feature_cols

X_train, X_test, y_train, y_test, feature_cols = preprocess_data(df)

# 3. 创建 DMatrix
dtrain = xgb.DMatrix(X_train, label=y_train, feature_names=feature_cols)
dtest = xgb.DMatrix(X_test, label=y_test, feature_names=feature_cols)

# 4. 参数配置
params = {
    'objective': 'reg:squarederror',
    'eval_metric': 'rmse',
    'max_depth': 6,
    'eta': 0.1,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'lambda': 1,
    'alpha': 0,
    'seed': 42
}

# 5. 训练模型
def train_model(params, dtrain, dtest):
    """训练模型"""
    evals_result = {}
    
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=500,
        evals=[(dtrain, 'train'), (dtest, 'test')],
        early_stopping_rounds=50,
        evals_result=evals_result,
        verbose_eval=20
    )
    
    print(f"\n最佳迭代: {model.best_iteration}")
    
    return model, evals_result

model, evals_result = train_model(params, dtrain, dtest)

# 6. 模型评估
def evaluate_model(model, dtest, y_test):
    """评估模型"""
    y_pred = model.predict(dtest)
    
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)
    mape = mean_absolute_percentage_error(y_test, y_pred)
    
    print("\n" + "="*50)
    print("模型评估结果")
    print("="*50)
    print(f"\nMSE: {mse:.2f}")
    print(f"RMSE: {rmse:.2f}")
    print(f"MAE: {mae:.2f}")
    print(f"R²: {r2:.4f}")
    print(f"MAPE: {mape:.4f}")
    
    return y_pred

y_pred = evaluate_model(model, dtest, y_test)

# 7. 可视化
def plot_results(evals_result, y_test, y_pred):
    """可视化结果"""
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    
    # 训练曲线
    axes[0, 0].plot(evals_result['train']['rmse'], label='Train')
    axes[0, 0].plot(evals_result['test']['rmse'], label='Test')
    axes[0, 0].set_xlabel('Round')
    axes[0, 0].set_ylabel('RMSE')
    axes[0, 0].set_title('Training RMSE')
    axes[0, 0].legend()
    axes[0, 0].grid(True)
    
    # 预测 vs 实际
    axes[0, 1].scatter(y_test, y_pred, alpha=0.5)
    axes[0, 1].plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--')
    axes[0, 1].set_xlabel('Actual Price')
    axes[0, 1].set_ylabel('Predicted Price')
    axes[0, 1].set_title('Predicted vs Actual')
    axes[0, 1].grid(True)
    
    # 残差分布
    residuals = y_test - y_pred
    axes[1, 0].hist(residuals, bins=50, edgecolor='black')
    axes[1, 0].set_xlabel('Residual')
    axes[1, 0].set_ylabel('Frequency')
    axes[1, 0].set_title('Residual Distribution')
    axes[1, 0].axvline(x=0, color='r', linestyle='--')
    
    # 残差 vs 预测值
    axes[1, 1].scatter(y_pred, residuals, alpha=0.5)
    axes[1, 1].axhline(y=0, color='r', linestyle='--')
    axes[1, 1].set_xlabel('Predicted Price')
    axes[1, 1].set_ylabel('Residual')
    axes[1, 1].set_title('Residual vs Predicted')
    axes[1, 1].grid(True)
    
    plt.tight_layout()
    plt.show()

plot_results(evals_result, y_test, y_pred)

# 8. 特征重要性
plt.figure(figsize=(10, 6))
xgb.plot_importance(model, max_num_features=10, importance_type='gain')
plt.title('Feature Importance (Gain)')
plt.tight_layout()
plt.show()

# 9. 保存模型
model.save_model('house_price_model.json')
print("\n模型已保存为 house_price_model.json")

时间序列预测实战 #

python
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

# 1. 创建时间序列数据
def create_time_series_data(n_samples=1000):
    """创建时间序列数据"""
    dates = pd.date_range('2020-01-01', periods=n_samples, freq='D')
    
    # 趋势
    trend = np.linspace(100, 200, n_samples)
    
    # 季节性
    seasonality = 20 * np.sin(2 * np.pi * np.arange(n_samples) / 365)
    
    # 周期性(周)
    weekly = 10 * np.sin(2 * np.pi * np.arange(n_samples) / 7)
    
    # 噪声
    noise = np.random.normal(0, 5, n_samples)
    
    # 销量
    sales = trend + seasonality + weekly + noise
    
    df = pd.DataFrame({
        'date': dates,
        'sales': sales
    })
    
    return df

df = create_time_series_data()

# 2. 特征工程
def create_time_features(df):
    """创建时间特征"""
    df = df.copy()
    
    df['year'] = df['date'].dt.year
    df['month'] = df['date'].dt.month
    df['day'] = df['date'].dt.day
    df['dayofweek'] = df['date'].dt.dayofweek
    df['dayofyear'] = df['date'].dt.dayofyear
    df['weekofyear'] = df['date'].dt.isocalendar().week.astype(int)
    df['quarter'] = df['date'].dt.quarter
    df['is_weekend'] = df['dayofweek'].isin([5, 6]).astype(int)
    
    # 滞后特征
    for lag in [1, 7, 14, 30]:
        df[f'sales_lag_{lag}'] = df['sales'].shift(lag)
    
    # 滚动特征
    for window in [7, 14, 30]:
        df[f'sales_rolling_mean_{window}'] = df['sales'].rolling(window=window).mean()
        df[f'sales_rolling_std_{window}'] = df['sales'].rolling(window=window).std()
    
    # 差分特征
    df['sales_diff_1'] = df['sales'].diff(1)
    df['sales_diff_7'] = df['sales'].diff(7)
    
    df = df.dropna()
    
    return df

df_features = create_time_features(df)

# 3. 划分数据
feature_cols = [col for col in df_features.columns if col not in ['date', 'sales']]
X = df_features[feature_cols]
y = df_features['sales']

# 时间序列划分(不用随机划分)
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# 4. 训练模型
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

params = {
    'objective': 'reg:squarederror',
    'eval_metric': 'rmse',
    'max_depth': 6,
    'eta': 0.1
}

model = xgb.train(
    params,
    dtrain,
    num_boost_round=200,
    evals=[(dtrain, 'train'), (dtest, 'test')],
    early_stopping_rounds=20,
    verbose_eval=20
)

# 5. 评估
y_pred = model.predict(dtest)

print(f"\nRMSE: {np.sqrt(mean_squared_error(y_test, y_pred)):.2f}")
print(f"MAE: {mean_absolute_error(y_test, y_pred):.2f}")

# 6. 可视化预测结果
plt.figure(figsize=(14, 6))
plt.plot(y_test.values, label='Actual', alpha=0.7)
plt.plot(y_pred, label='Predicted', alpha=0.7)
plt.xlabel('Time')
plt.ylabel('Sales')
plt.title('Sales Prediction')
plt.legend()
plt.grid(True)
plt.show()

分位数回归实战 #

python
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# 自定义分位数损失
def quantile_loss(quantile):
    def objective(preds, dtrain):
        labels = dtrain.get_label()
        residual = labels - preds
        
        grad = np.where(residual > 0, -quantile, 1 - quantile)
        hess = np.ones_like(residual)
        
        return grad, hess
    return objective

# 创建数据
np.random.seed(42)
X = np.random.rand(1000, 10)
y = X[:, 0] * 10 + X[:, 1] * 5 + np.random.randn(1000) * (1 + X[:, 2])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

params = {'max_depth': 6, 'eta': 0.1}

# 训练不同分位数模型
quantiles = [0.1, 0.5, 0.9]
predictions = {}

for q in quantiles:
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=100,
        obj=quantile_loss(q)
    )
    predictions[q] = model.predict(dtest)

# 可视化
plt.figure(figsize=(12, 6))

sorted_idx = np.argsort(y_test)
y_sorted = y_test[sorted_idx]

plt.fill_between(
    range(len(y_sorted)),
    predictions[0.1][sorted_idx],
    predictions[0.9][sorted_idx],
    alpha=0.3,
    label='80% Prediction Interval'
)
plt.plot(y_sorted, 'k.', alpha=0.5, label='Actual')
plt.plot(predictions[0.5][sorted_idx], 'r-', label='Median Prediction')

plt.xlabel('Sample')
plt.ylabel('Value')
plt.title('Quantile Regression - Prediction Intervals')
plt.legend()
plt.grid(True)
plt.show()

回归任务最佳实践 #

python
def regression_best_practices():
    """
    回归任务最佳实践
    """
    practices = {
        '数据准备': [
            '处理异常值',
            '特征缩放(可选)',
            '创建有意义的特征'
        ],
        '模型训练': [
            '选择合适的损失函数',
            '使用早停防止过拟合',
            '交叉验证评估'
        ],
        '模型评估': [
            '使用多个评估指标',
            '分析残差分布',
            '检查预测偏差'
        ],
        '模型优化': [
            '尝试不同的目标函数',
            '调整正则化参数',
            '集成多个模型'
        ]
    }
    
    for category, items in practices.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  • {item}")

regression_best_practices()

下一步 #

现在你已经掌握了回归任务实战,接下来学习 排序任务 了解 XGBoost 在排序问题中的应用!

最后更新:2026-04-04