分布式训练 #
分布式训练概述 #
为什么需要分布式训练? #
text
┌─────────────────────────────────────────────────────────────┐
│ 分布式训练需求 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 数据规模挑战: │
│ - 单机内存不足 │
│ - 训练时间过长 │
│ - 需要处理 TB 级数据 │
│ │
│ 分布式解决方案: │
│ - 数据并行:分割数据到多个节点 │
│ - 特征并行:分割特征到多个节点 │
│ - 混合并行:结合数据和特征并行 │
│ │
└─────────────────────────────────────────────────────────────┘
XGBoost 分布式架构 #
text
┌─────────────────────────────────────────────────────────────┐
│ XGBoost 分布式架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ Driver │ │
│ │ (协调节点) │ │
│ └──────┬──────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ │ │ │ │
│ ┌────┴────┐ ┌────┴────┐ ┌────┴────┐ │
│ │ Worker1 │ │ Worker2 │ │ Worker3 │ │
│ │ 数据分片1│ │ 数据分片2│ │ 数据分片3│ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 通信层:Rabit (Reliable AllReduce and Broadcast) │
│ │
└─────────────────────────────────────────────────────────────┘
Spark 集成 #
PySpark 安装 #
bash
# Installs PySpark plus XGBoost; the `xgboost.spark` module (xgboost >= 1.7) provides the Spark estimators used below
pip install pyspark xgboost
Spark XGBoost 基本使用 #
python
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBClassifier

# Create the Spark session that coordinates the distributed training job.
spark = SparkSession.builder \
    .appName("XGBoost-Spark") \
    .getOrCreate()

# Load data as a distributed DataFrame.
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Spark ML estimators expect all features in a single vector column,
# so assemble every non-label column into 'features'.
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=[c for c in df.columns if c != 'label'],
    outputCol='features'
)
df = assembler.transform(df)

# Build the distributed classifier.
# NOTE: the boosting-round count parameter is `n_estimators`
# (sklearn-style); `num_estimators` is not a valid parameter name.
classifier = SparkXGBClassifier(
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    label_col='label',
    features_col='features'
)

# Fit on the distributed DataFrame.
model = classifier.fit(df)

# Score: transform() appends prediction/probability columns.
predictions = model.transform(df)
predictions.show()
Spark 分布式参数 #
python
# Distributed-training parameters for the Spark estimator.
# Fixes vs. original: `max_depth` was passed twice (a SyntaxError), and
# `num_estimators` is not a valid parameter name (`n_estimators` is).
classifier = SparkXGBClassifier(
    # Core XGBoost parameters
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    objective='binary:logistic',
    # Spark distribution parameters
    num_workers=4,     # number of Spark tasks training in parallel
    use_gpu=False,     # GPU training; newer releases prefer `device='cuda'`
    features_col='features',
    label_col='label',
    # Regularization
    min_child_weight=1
)
Spark Pipeline #
python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from xgboost.spark import SparkXGBClassifier

# Build the stage list for an end-to-end preprocessing + training pipeline.
stages = []

# Categorical features: index string values, then one-hot encode them.
categorical_cols = ['category1', 'category2']
for cat_col in categorical_cols:  # renamed from `col` to avoid clashing with pyspark.sql.functions.col
    indexer = StringIndexer(inputCol=cat_col, outputCol=f'{cat_col}_indexed')
    encoder = OneHotEncoder(inputCol=f'{cat_col}_indexed', outputCol=f'{cat_col}_encoded')
    stages.extend([indexer, encoder])

# Assemble encoded categorical columns plus numeric columns into one vector.
numeric_cols = ['num1', 'num2', 'num3']
feature_cols = [f'{c}_encoded' for c in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
stages.append(assembler)

# Final stage: the distributed XGBoost classifier
# (parameter is `n_estimators`, not `num_estimators`).
xgb_classifier = SparkXGBClassifier(
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    label_col='label',
    features_col='features'
)
stages.append(xgb_classifier)

# Assemble, fit, and score the whole pipeline.
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df)
predictions = model.transform(df)
Dask 集成 #
Dask 安装 #
bash
# dask-ml supplies ML utilities; the official XGBoost-Dask integration itself ships inside the xgboost package (xgboost.dask)
pip install dask xgboost dask-ml
Dask XGBoost 基本使用 #
python
import dask.dataframe as dd
from dask.distributed import Client
import xgboost as xgb
# NOTE: the legacy third-party `dask_xgboost` package is deprecated and was
# never used below — the official integration lives in `xgboost.dask`.

# Start a local Dask cluster with 4 worker processes.
client = Client(n_workers=4)

# Lazily read the data as a partitioned Dask DataFrame.
df = dd.read_csv('large_data.csv')

# Split off the label; everything stays lazy/distributed.
X = df.drop('label', axis=1)
y = df['label']

# DaskDMatrix references the distributed partitions in place —
# nothing is materialized on the driver.
dtrain = xgb.dask.DaskDMatrix(
    client,
    X,
    y
)

# Training parameters.
params = {
    'objective': 'binary:logistic',
    'max_depth': 6,
    'eta': 0.1,
    'eval_metric': 'logloss'
}

# xgb.dask.train returns a dict holding the trained booster and the
# per-round evaluation history.
output = xgb.dask.train(
    client,
    params,
    dtrain,
    num_boost_round=100
)
model = output['booster']
history = output['history']

# Distributed prediction; the result is a lazy Dask collection.
predictions = xgb.dask.predict(client, model, dtrain)
Dask 分布式配置 #
python
from dask.distributed import Client, LocalCluster

# Option A: local cluster — one machine, several worker processes.
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='4GB'
)
client = Client(cluster)

# Option B (alternative): connect to an existing remote scheduler instead.
# Create only ONE client — the original code unconditionally overwrote the
# local client above with a remote one, leaking the LocalCluster connection.
# client = Client('scheduler-address:8786')

# Train with early stopping, monitoring loss on the training matrix.
output = xgb.dask.train(
    client,
    params,
    dtrain,
    num_boost_round=100,
    evals=[(dtrain, 'train')],
    early_stopping_rounds=10
)
Dask 数据处理 #
python
import dask.dataframe as dd
import dask.array as da  # required for da.log below — missing in the original (NameError)

# Read many CSV shards straight from S3 as one partitioned DataFrame.
df = dd.read_csv('s3://bucket/large_data/*.csv')

# Basic cleaning: drop rows with missing values, downcast to save memory.
df = df.dropna()
df['feature1'] = df['feature1'].astype('float32')

# Feature engineering — all lazy until training/compute time.
df['feature_interaction'] = df['feature1'] * df['feature2']
df['log_feature'] = da.log(df['feature1'] + 1)

# Split features from the label.
X = df.drop('label', axis=1)
y = df['label']

# Build the distributed DMatrix and train.
dtrain = xgb.dask.DaskDMatrix(client, X, y)
output = xgb.dask.train(client, params, dtrain, num_boost_round=100)
多机训练 #
MPI 配置 #
bash
# Install MPI
sudo apt-get install openmpi-bin libopenmpi-dev

# Build XGBoost with MPI support.
# NOTE: `pip install --install-option` was removed in pip 23.1, and the
# prebuilt PyPI wheels do not include an MPI engine — build from source:
git clone --recursive https://github.com/dmlc/xgboost
cd xgboost && mkdir build && cd build
cmake .. && make -j"$(nproc)"
cd ../python-package && pip install .
MPI 训练脚本 #
python
import xgboost as xgb
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each rank loads only its own shard of the data.
data = np.load(f'data_part_{rank}.npy')
labels = np.load(f'labels_part_{rank}.npy')

dtrain = xgb.DMatrix(data, label=labels)

# Training parameters.
params = {
    'objective': 'binary:logistic',
    'max_depth': 6,
    'eta': 0.1,
    'tree_method': 'hist',
    'nthread': 4
}

# NOTE(review): xgb.train only synchronizes gradient histograms across
# ranks when XGBoost's collective communicator (Rabit) is initialized;
# with a plain mpi4py launch each rank otherwise trains an independent
# model on its shard. Verify the tracker/collective setup before relying
# on this as truly distributed training.
model = xgb.train(
    params,
    dtrain,
    num_boost_round=100,
    xgb_model=None  # no warm-start model
)

# Persist the model from a single rank to avoid concurrent writes.
if rank == 0:
    model.save_model('model.json')
运行 MPI 训练 #
bash
# Run 4 training processes on the local machine
mpirun -np 4 python train_mpi.py
# Run across the machines listed in the `hosts` file
mpirun -hostfile hosts -np 4 python train_mpi.py
外部内存 #
处理超大数据 #
python
import xgboost as xgb
import numpy as np  # needed for np.nan — missing in the original snippet

# External-memory training: append '#<cache-file>' to the data URI so
# XGBoost streams the CSV through an on-disk cache instead of loading it
# all into RAM. (`cache_prefix` is not a DMatrix keyword argument — the
# cache location is part of the URI.)
dtrain = xgb.DMatrix(
    'data.csv?format=csv#cache/dtrain.cache',
    missing=np.nan
)

# Train as usual; batches are paged in from the cache.
params = {'objective': 'binary:logistic', 'max_depth': 6}
model = xgb.train(params, dtrain, num_boost_round=100)
分块加载数据 #
python
import pandas as pd
import xgboost as xgb

class ChunkedData(xgb.DataIter):
    """Feed a large CSV to XGBoost chunk by chunk.

    xgb.DMatrix does not accept an arbitrary Python iterator (the original
    version yielded DMatrix objects from __iter__, which fails at
    construction time); the supported streaming interface is a subclass of
    xgb.DataIter with reset()/next() callbacks.
    """

    def __init__(self, file_path, chunk_size=10000):
        self.file_path = file_path
        self.chunk_size = chunk_size
        self._chunks = None  # lazy pandas TextFileReader, created on first next()
        # cache_prefix enables XGBoost's on-disk external-memory cache
        super().__init__(cache_prefix='cache')

    def reset(self):
        # Called by XGBoost before each new pass over the data.
        self._chunks = None

    def next(self, input_data):
        # Push the next chunk via the input_data callback.
        # Return 1 while more data remains, 0 at end of stream.
        if self._chunks is None:
            self._chunks = pd.read_csv(self.file_path, chunksize=self.chunk_size)
        try:
            chunk = next(self._chunks)
        except StopIteration:
            return 0
        input_data(data=chunk.drop('label', axis=1), label=chunk['label'])
        return 1

# Build the DMatrix from the iterator and train as usual.
dtrain = xgb.DMatrix(ChunkedData('large_data.csv'))
params = {'objective': 'binary:logistic', 'max_depth': 6}
model = xgb.train(params, dtrain, num_boost_round=100)
分布式调优 #
分布式参数调优 #
python
import xgboost as xgb
from dask.distributed import Client
import optuna
from sklearn.metrics import roc_auc_score

def distributed_optimize(client, X, y, n_trials=100):
    """Tune XGBoost hyperparameters with Optuna on a Dask cluster.

    Args:
        client: connected dask.distributed Client.
        X, y: Dask collections holding features and labels.
        n_trials: number of Optuna trials to run.

    Returns:
        dict of the best hyperparameters found.
    """
    # Build the distributed DMatrix ONCE and reuse it across trials —
    # the original rebuilt it inside every objective call.
    dtrain = xgb.dask.DaskDMatrix(client, X, y)
    y_true = y.compute()

    def objective(trial):
        params = {
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'objective': 'binary:logistic'
        }
        output = xgb.dask.train(
            client,
            params,
            dtrain,
            num_boost_round=100
        )
        predictions = xgb.dask.predict(client, output['booster'], dtrain).compute()
        # NOTE(review): AUC is measured on the training data here — use a
        # held-out validation split for honest model selection.
        return roc_auc_score(y_true, predictions)

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)
    return study.best_params

# Example usage
client = Client()
best_params = distributed_optimize(client, X, y)
分布式最佳实践 #
python
def distributed_best_practices():
    """Print a checklist of distributed-training best practices.

    Returns:
        dict: category name -> list of practice strings, so callers can
        also consume the checklist programmatically (backward-compatible
        addition; the original returned None).
    """
    practices = {
        '数据准备': [
            '确保数据均匀分布在各节点',
            '使用合适的数据格式(Parquet 比 CSV 更快)',
            '合理设置分区大小'
        ],
        '资源配置': [
            '根据数据量选择合适的节点数',
            '每个节点配置足够的内存',
            '合理设置线程数'
        ],
        '训练配置': [
            '使用 hist 树方法提高效率',
            '设置合适的 batch_size',
            '监控网络通信开销'
        ],
        '故障处理': [
            '设置检查点保存模型',
            '实现断点续训',
            '监控节点健康状态'
        ]
    }
    # Pretty-print each category with its bullet points.
    for category, items in practices.items():
        print(f"\n{category}:")
        for item in items:
            print(f" • {item}")
    return practices
下一步 #
现在你已经了解了分布式训练,接下来学习 GPU 加速 进一步提升训练速度!
最后更新:2026-04-04