数据准备 #
DMatrix 数据结构 #
DMatrix 是 XGBoost 的核心数据结构,针对内存效率和训练速度进行了优化。
DMatrix 特点 #
text
┌─────────────────────────────────────────────────────────────┐
│ DMatrix 特点 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 内存优化 │
│ - 稀疏矩阵存储 │
│ - 数据压缩 │
│ - 外部内存支持 │
│ │
│ 2. 预计算 │
│ - 预排序特征值 │
│ - 预计算分裂信息 │
│ - 缓存优化 │
│ │
│ 3. 灵活性 │
│ - 支持多种数据格式 │
│ - 支持样本权重 │
│ - 支持特征名称 │
│ │
└─────────────────────────────────────────────────────────────┘
创建 DMatrix #
python
import xgboost as xgb
import numpy as np
import pandas as pd
from scipy import sparse
# 1. 从 NumPy 数组创建
X = np.random.rand(100, 10)
y = np.random.randint(0, 2, 100)
dtrain = xgb.DMatrix(X, label=y)
# 2. 从 Pandas DataFrame 创建
df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(10)])
dtrain = xgb.DMatrix(df, label=y)
# 3. 从稀疏矩阵创建
X_sparse = sparse.csr_matrix(X)
dtrain = xgb.DMatrix(X_sparse, label=y)
# 4. 从文件创建
dtrain = xgb.DMatrix('data.csv?format=csv')
# 5. 从 LibSVM 格式创建
dtrain = xgb.DMatrix('data.libsvm')
DMatrix 参数 #
python
# Constructing a DMatrix: the commonly used arguments.
dtrain = xgb.DMatrix(
    data=X,              # feature matrix
    label=y,             # training labels
    weight=None,         # per-sample weights
    base_margin=None,    # initial prediction (margin) values
    missing=None,        # value treated as missing (defaults to np.nan)
    feature_names=None,  # feature names
    feature_types=None,  # feature types
    nthread=None,        # number of threads (None = library default)
)
# NOTE: the old `silent` argument was deprecated and removed from
# recent XGBoost releases — do not pass it.
DMatrix 操作 #
python
# Basic information about the matrix
print(f"样本数: {dtrain.num_row()}")
# Fixed: the DMatrix method is num_col(), not num_features()
print(f"特征数: {dtrain.num_col()}")
print(f"特征名称: {dtrain.feature_names}")
# Read the labels
labels = dtrain.get_label()
# Read the sample weights
weights = dtrain.get_weight()
# Set sample weights
dtrain.set_weight(sample_weights)
# Set feature names. Fixed: DMatrix has no set_feature_names() method;
# assign to the feature_names property instead.
dtrain.feature_names = [f'f{i}' for i in range(10)]
# Save the DMatrix in XGBoost's binary buffer format
dtrain.save_binary('train.buffer')
# Load it back
dtrain_loaded = xgb.DMatrix('train.buffer')
# Take a row subset by index
dtrain_subset = dtrain.slice([0, 1, 2, 3, 4])
数据格式转换 #
CSV 格式 #
python
import pandas as pd

# Read the CSV
df = pd.read_csv('data.csv')
# Split features and target
X = df.drop('target', axis=1)
y = df['target']
# Build the DMatrix
dtrain = xgb.DMatrix(X, label=y)
# Write back to CSV
df.to_csv('output.csv', index=False)
# Build a DMatrix directly from the CSV URI.
# Fixed: label_column takes the *index* of the label column (e.g. 0),
# not its name.
dtrain = xgb.DMatrix('data.csv?format=csv&label_column=0')
LibSVM 格式 #
text
# LibSVM 格式
# <label> <index1>:<value1> <index2>:<value2> ...
1 1:0.1 2:0.5 5:0.3
0 1:0.2 3:0.8 4:0.1
1 2:0.4 3:0.6 5:0.9
python
# Build a DMatrix from a LibSVM-format file
dtrain = xgb.DMatrix('data.libsvm')
# NOTE(review): save_binary() writes XGBoost's internal binary buffer
# format, not LibSVM text — the original comment was misleading.
dtrain.save_binary('data.buffer')
稀疏矩阵 #
python
from scipy import sparse
# 创建稀疏矩阵
X_sparse = sparse.random(1000, 100, density=0.1, format='csr')
# 不同格式
X_csr = sparse.csr_matrix(X) # 压缩稀疏行
X_csc = sparse.csc_matrix(X) # 压缩稀疏列
X_coo = sparse.coo_matrix(X) # 坐标格式
# 从稀疏矩阵创建 DMatrix
dtrain = xgb.DMatrix(X_sparse, label=y)
缺失值处理 #
XGBoost 的缺失值处理 #
XGBoost 自动处理缺失值,会学习缺失值的最优分裂方向。
python
import numpy as np
import xgboost as xgb
# 创建包含缺失值的数据
X = np.array([
[1.0, 2.0, np.nan],
[4.0, np.nan, 6.0],
[7.0, 8.0, 9.0],
[np.nan, 2.0, 3.0]
])
y = np.array([0, 1, 0, 1])
# 创建 DMatrix(自动处理缺失值)
dtrain = xgb.DMatrix(X, label=y)
# 指定缺失值标识
dtrain = xgb.DMatrix(X, label=y, missing=np.nan)
# 训练模型
params = {'objective': 'binary:logistic'}
model = xgb.train(params, dtrain, num_boost_round=10)
缺失值策略 #
python
import pandas as pd
import numpy as np
# 检查缺失值
def check_missing_values(df):
    """Return a table of per-column missing-value counts and percentages.

    Only columns that actually contain at least one missing value
    appear in the result.
    """
    counts = df.isnull().sum()
    table = pd.DataFrame({
        'Missing Count': counts,
        'Missing %': counts * 100 / len(df),
    })
    return table[table['Missing Count'] > 0]
# 处理策略
def handle_missing_values(df, strategy='auto'):
    """Handle missing values in *df* according to *strategy*.

    Strategies:
        'auto'    -- return df unchanged (XGBoost handles NaN natively)
        'mean'    -- fill with each column's mean
        'median'  -- fill with each column's median
        'mode'    -- fill with each column's mode
        'drop'    -- drop rows containing any missing value
        'special' -- fill with the sentinel value -999

    Raises:
        ValueError: if *strategy* is not one of the above.
    """
    if strategy == 'auto':
        # Let XGBoost learn the optimal default direction for missing values
        return df
    elif strategy == 'mean':
        return df.fillna(df.mean())
    elif strategy == 'median':
        return df.fillna(df.median())
    elif strategy == 'mode':
        return df.fillna(df.mode().iloc[0])
    elif strategy == 'drop':
        return df.dropna()
    elif strategy == 'special':
        # Fill with a special sentinel value
        return df.fillna(-999)
    # Fixed: the original silently returned None for unknown strategies
    raise ValueError(f"Unknown missing-value strategy: {strategy!r}")
特征编码 #
类别特征编码 #
python
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
# 示例数据
df = pd.DataFrame({
'color': ['red', 'blue', 'green', 'red', 'blue'],
'size': ['S', 'M', 'L', 'M', 'S'],
'price': [10, 20, 30, 25, 15],
'target': [0, 1, 1, 0, 1]
})
# 1. 标签编码
le = LabelEncoder()
df['color_encoded'] = le.fit_transform(df['color'])
# 2. One-Hot 编码
df_onehot = pd.get_dummies(df, columns=['color', 'size'])
# 3. 目标编码(Target Encoding)
def target_encode(df, column, target):
    """Replace each category in *column* with the mean of *target* over it."""
    per_category_mean = df.groupby(column)[target].mean()
    return df[column].map(per_category_mean)
df['color_target'] = target_encode(df, 'color', 'target')
# 4. 频率编码
def frequency_encode(df, column):
    """Replace each category in *column* with its share of rows in *df*."""
    # Divide by the full row count (not value_counts(normalize=True)) so
    # rows with NaN in this column still count toward the denominator.
    share = df[column].value_counts() / len(df)
    return df[column].map(share)
df['color_freq'] = frequency_encode(df, 'color')
XGBoost 的类别特征处理 #
python
import xgboost as xgb

# XGBoost 1.3+ supports categorical features natively.
# Fixed: enable_categorical is an argument of DMatrix (or of the sklearn
# estimators), NOT a booster training parameter.
params = {
    'objective': 'binary:logistic',
    'max_depth': 6,
    # Native categorical splits require a hist/approx-based tree method
    'tree_method': 'hist'
}
# Categorical columns must use pandas' `category` dtype
df['color'] = df['color'].astype('category')
df['size'] = df['size'].astype('category')
dtrain = xgb.DMatrix(df.drop('target', axis=1), label=df['target'],
                     enable_categorical=True)
model = xgb.train(params, dtrain, num_boost_round=100)
数值特征处理 #
python
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 归一化
minmax = MinMaxScaler()
X_normalized = minmax.fit_transform(X)
# 鲁棒缩放(对异常值不敏感)
robust = RobustScaler()
X_robust = robust.fit_transform(X)
# 注意:XGBoost 对特征缩放不敏感,通常不需要
特征工程 #
特征创建 #
python
import pandas as pd
import numpy as np
# 示例数据
df = pd.DataFrame({
'price': [100, 200, 150, 300, 250],
'quantity': [10, 5, 8, 3, 6],
'date': pd.date_range('2024-01-01', periods=5)
})
# 1. 交互特征
df['total'] = df['price'] * df['quantity']
df['price_per_unit'] = df['price'] / df['quantity']
# 2. 统计特征
df['price_log'] = np.log1p(df['price'])
df['price_sqrt'] = np.sqrt(df['price'])
# 3. 时间特征
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['dayofweek'] = df['date'].dt.dayofweek
df['is_weekend'] = df['dayofweek'].isin([5, 6]).astype(int)
# 4. 滚动特征
df['price_rolling_mean'] = df['price'].rolling(window=3).mean()
df['price_rolling_std'] = df['price'].rolling(window=3).std()
特征选择 #
python
import xgboost as xgb
import numpy as np
import matplotlib.pyplot as plt
def get_feature_importance(X, y, importance_type='gain'):
    """Fit a quick binary model and return its feature-importance scores.

    *importance_type* is passed straight to Booster.get_score
    (e.g. 'gain', 'weight', 'cover').
    """
    matrix = xgb.DMatrix(X, label=y)
    booster = xgb.train(
        {'objective': 'binary:logistic', 'max_depth': 6},
        matrix,
        num_boost_round=100,
    )
    return booster.get_score(importance_type=importance_type)
def select_features(X, y, threshold=0.01):
    """Return the names of features whose importance score exceeds *threshold*."""
    importance = get_feature_importance(X, y)
    return [name for name, score in importance.items() if score > threshold]
def plot_feature_importance(model, max_num_features=20):
    """Render the model's feature-importance bar chart and display it."""
    plt.figure(figsize=(10, 8))
    xgb.plot_importance(model, max_num_features=max_num_features)
    plt.tight_layout()
    plt.show()
数据划分 #
训练集/验证集/测试集划分 #
python
from sklearn.model_selection import train_test_split
# 两层划分
X_train, X_temp, y_train, y_temp = train_test_split(
X, y, test_size=0.4, random_state=42, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)
print(f"训练集: {X_train.shape[0]}")
print(f"验证集: {X_val.shape[0]}")
print(f"测试集: {X_test.shape[0]}")
# 创建 DMatrix
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
dtest = xgb.DMatrix(X_test, label=y_test)
时间序列划分 #
python
from sklearn.model_selection import TimeSeriesSplit
# 时间序列交叉验证
tscv = TimeSeriesSplit(n_splits=5)
for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
print(f"Fold {fold}: Train={len(train_idx)}, Val={len(val_idx)}")
样本权重 #
设置样本权重 #
python
import numpy as np
# 创建样本权重
sample_weights = np.random.rand(len(y_train))
# 方法1:创建 DMatrix 时设置
dtrain = xgb.DMatrix(X_train, label=y_train, weight=sample_weights)
# 方法2:使用 set_weight
dtrain = xgb.DMatrix(X_train, label=y_train)
dtrain.set_weight(sample_weights)
# 类别不平衡时设置权重
class_weights = {0: 1.0, 1: 5.0} # 少数类权重更高
sample_weights = np.array([class_weights[y] for y in y_train])
dtrain.set_weight(sample_weights)
处理类别不平衡 #
python
from sklearn.utils.class_weight import compute_sample_weight
# 自动计算类别权重
sample_weights = compute_sample_weight('balanced', y_train)
# 或使用 scale_pos_weight 参数
params = {
'objective': 'binary:logistic',
'scale_pos_weight': len(y_train[y_train==0]) / len(y_train[y_train==1])
}
外部内存 #
处理大数据集 #
python
# When the data set is too large to fit in memory:
# Option 1: external-memory mode.
# Fixed: DMatrix has no `cache_prefix` keyword argument; the on-disk cache
# file prefix is appended to the URI after '#'.
dtrain = xgb.DMatrix('data.csv?format=csv#dtrain.cache')
# 方法2:分块加载
import pandas as pd
chunk_size = 10000
chunks = pd.read_csv('large_data.csv', chunksize=chunk_size)
for i, chunk in enumerate(chunks):
X_chunk = chunk.drop('target', axis=1)
y_chunk = chunk['target']
dtrain_chunk = xgb.DMatrix(X_chunk, label=y_chunk)
dtrain_chunk.save_binary(f'chunk_{i}.buffer')
# Option 3: stream chunks through XGBoost's DataIter protocol.
# Fixed: xgb.DMatrix does not accept an arbitrary Python iterable; the
# iterator must subclass xgboost.DataIter and implement next()/reset().
class DataIterator(xgb.DataIter):
    """Feeds a large CSV file to XGBoost one chunk at a time."""

    def __init__(self, file_path, chunk_size=10000):
        self.file_path = file_path
        self.chunk_size = chunk_size
        self._chunks = None
        # cache_prefix tells XGBoost where to write its external-memory cache
        super().__init__(cache_prefix='cache')

    def next(self, input_data):
        """Feed the next chunk via *input_data*; return 1 on data, 0 at EOF."""
        if self._chunks is None:
            self._chunks = pd.read_csv(self.file_path, chunksize=self.chunk_size)
        try:
            chunk = next(self._chunks)
        except StopIteration:
            return 0
        input_data(data=chunk.drop('target', axis=1), label=chunk['target'])
        return 1

    def reset(self):
        """Restart iteration from the beginning of the file."""
        self._chunks = None

# Train from the iterator (builds an external-memory DMatrix)
dtrain = xgb.DMatrix(DataIterator('large_data.csv'))
数据准备最佳实践 #
python
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
def prepare_data(df, target_column, test_size=0.2, random_state=42):
    """Full data-preparation pipeline: encode categoricals, split, build DMatrix.

    Returns:
        (dtrain, dtest, X_train, X_test, y_train, y_test)
    """
    # Separate features from the label
    y = df[target_column]
    X = df.drop(target_column, axis=1)

    # Mark object columns as pandas categoricals so XGBoost can use them natively
    for col in X.select_dtypes(include=['object']).columns:
        X[col] = X[col].astype('category')

    # Stratified train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y
    )

    # Wrap both splits in DMatrix with categorical support enabled
    dtrain = xgb.DMatrix(X_train, label=y_train, enable_categorical=True)
    dtest = xgb.DMatrix(X_test, label=y_test, enable_categorical=True)
    return dtrain, dtest, X_train, X_test, y_train, y_test
# 使用示例
df = pd.read_csv('data.csv')
dtrain, dtest, X_train, X_test, y_train, y_test = prepare_data(df, 'target')
下一步 #
现在你已经了解了数据准备,接下来学习 参数配置 掌握 XGBoost 的参数调优!
最后更新:2026-04-04