Distributed Training #

Overview of Distributed Training #

Why Distributed Training? #

text
┌─────────────────────────────────────────────────────────────┐
│              Why Distributed Training Is Needed              │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Data-scale challenges:                                      │
│  - A single machine runs out of memory                       │
│  - Training takes too long                                   │
│  - Terabyte-scale datasets must be processed                 │
│                                                              │
│  Distributed solutions:                                      │
│  - Data parallelism: split the rows across nodes             │
│  - Feature parallelism: split the columns across nodes       │
│  - Hybrid parallelism: combine data and feature parallelism  │
│                                                              │
└─────────────────────────────────────────────────────────────┘

XGBoost Distributed Architecture #

text
┌─────────────────────────────────────────────────────────────┐
│               XGBoost Distributed Architecture               │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│                    ┌─────────────┐                           │
│                    │   Driver    │                           │
│                    │(coordinator)│                           │
│                    └──────┬──────┘                           │
│                           │                                  │
│         ┌─────────────────┼─────────────────┐                │
│         │                 │                 │                │
│    ┌────┴────┐      ┌────┴────┐      ┌────┴────┐             │
│    │ Worker1 │      │ Worker2 │      │ Worker3 │             │
│    │ shard 1 │      │ shard 2 │      │ shard 3 │             │
│    └─────────┘      └─────────┘      └─────────┘             │
│                                                              │
│  Communication layer: Rabit (Reliable AllReduce and          │
│  Broadcast)                                                  │
│                                                              │
└─────────────────────────────────────────────────────────────┘
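
To make the communication layer concrete, here is a purely conceptual NumPy sketch, not the actual Rabit API: during tree construction each worker builds gradient histograms from its local shard, and an AllReduce sums them so every worker sees the same global histogram and therefore picks the same split.

python
import numpy as np

# Conceptual illustration only -- this is NOT the Rabit API.
# Each worker computes a gradient histogram on its local shard...
worker_histograms = [
    np.array([1.0, 2.0, 0.5]),  # Worker1's local histogram
    np.array([0.5, 1.5, 2.0]),  # Worker2's local histogram
    np.array([2.0, 0.5, 1.0]),  # Worker3's local histogram
]

# ...and AllReduce sums them, giving every worker the global view
global_histogram = np.sum(worker_histograms, axis=0)
print(global_histogram)  # [3.5 4.  3.5]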

Spark Integration #

Installing PySpark #

bash
pip install pyspark xgboost

Basic Spark XGBoost Usage #

python
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBClassifier

# Create a Spark session
spark = SparkSession.builder \
    .appName("XGBoost-Spark") \
    .getOrCreate()

# Load the data
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Assemble the feature columns into a single vector column
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=[col for col in df.columns if col != 'label'],
    outputCol='features'
)
df = assembler.transform(df)

# Create the classifier
classifier = SparkXGBClassifier(
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    label_col='label',
    features_col='features'
)

# Train the model
model = classifier.fit(df)

# Predict
predictions = model.transform(df)

predictions.show()
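
To check the model, you can score the predictions with Spark ML's built-in evaluator. A minimal sketch, assuming a binary label and the default rawPrediction output column:

python
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the scored DataFrame
evaluator = BinaryClassificationEvaluator(
    labelCol='label',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC'
)
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")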

Spark Distributed Parameters #

python
classifier = SparkXGBClassifier(
    # XGBoost parameters
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    min_child_weight=1,
    objective='binary:logistic',

    # Spark distribution parameters
    num_workers=4,           # number of parallel workers
    use_gpu=False,           # whether to train on GPUs
    features_col='features',
    label_col='label'
)
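
Fitted Spark XGBoost models follow the usual MLlib persistence API. A minimal save/load sketch (the path is a placeholder):

python
from xgboost.spark import SparkXGBClassifierModel

# Save the fitted model (path is a placeholder)
model.write().overwrite().save("/models/xgb_spark")

# Load it back later
loaded_model = SparkXGBClassifierModel.load("/models/xgb_spark")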

Spark Pipeline #

python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from xgboost.spark import SparkXGBClassifier

# Build the pipeline stages
stages = []

# Encode the categorical features
categorical_cols = ['category1', 'category2']
for col in categorical_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f'{col}_indexed')
    encoder = OneHotEncoder(inputCol=f'{col}_indexed', outputCol=f'{col}_encoded')
    stages.extend([indexer, encoder])

# Assemble the features
numeric_cols = ['num1', 'num2', 'num3']
feature_cols = [f'{col}_encoded' for col in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
stages.append(assembler)

# XGBoost classifier
xgb_classifier = SparkXGBClassifier(
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    label_col='label',
    features_col='features'
)
stages.append(xgb_classifier)

# Create the Pipeline
pipeline = Pipeline(stages=stages)

# Train
model = pipeline.fit(df)

# Predict
predictions = model.transform(df)
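
In practice you would fit the pipeline on a training split and score held-out data. A minimal sketch:

python
# Hold out 20% of the data for evaluation
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Fit the whole pipeline (encoders + assembler + XGBoost) on the
# training split, then score the held-out split
model = pipeline.fit(train_df)
predictions = model.transform(test_df)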

Dask Integration #

Installing Dask #

bash
pip install "dask[complete]" xgboost

Basic Dask XGBoost Usage #

python
import dask.dataframe as dd
from dask.distributed import Client
import xgboost as xgb

# Create a Dask client (spins up a local cluster)
client = Client(n_workers=4)

# Read the data
df = dd.read_csv('large_data.csv')

# Prepare features and labels
X = df.drop('label', axis=1)
y = df['label']

# Create a distributed DMatrix
dtrain = xgb.dask.DaskDMatrix(
    client,
    X,
    y
)

# Training parameters
params = {
    'objective': 'binary:logistic',
    'max_depth': 6,
    'eta': 0.1,
    'eval_metric': 'logloss'
}

# Train the model
output = xgb.dask.train(
    client,
    params,
    dtrain,
    num_boost_round=100
)

model = output['booster']
history = output['history']

# Predict (returns a lazy Dask collection)
predictions = xgb.dask.predict(client, model, dtrain)
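
xgboost also ships a scikit-learn style Dask wrapper that builds the DaskDMatrix for you. A minimal sketch using the X, y and client from above:

python
# Scikit-learn style interface on top of Dask
clf = xgb.dask.DaskXGBClassifier(
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1
)
clf.client = client  # attach the client explicitly
clf.fit(X, y)

# predict() is lazy; compute() materializes the result
preds = clf.predict(X).compute()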

Dask Distributed Configuration #

python
from dask.distributed import Client, LocalCluster

# Local cluster
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='4GB'
)
client = Client(cluster)

# Or connect to an existing remote cluster instead:
# client = Client('scheduler-address:8786')

# Train with an eval set and early stopping
output = xgb.dask.train(
    client,
    params,
    dtrain,
    num_boost_round=100,
    evals=[(dtrain, 'train')],
    early_stopping_rounds=10
)
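
With an eval set and early stopping, the returned dict carries both the booster and the metric history. A short sketch of reading them (the key names follow the evals label above):

python
booster = output['booster']

# Best iteration found by early stopping
print("best iteration:", booster.best_iteration)

# Per-round logloss recorded for the 'train' eval set
print(output['history']['train']['logloss'][:5])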

Dask Data Processing #

python
import dask.dataframe as dd
import numpy as np
import xgboost as xgb

# Read a large dataset (e.g. many CSVs on S3)
df = dd.read_csv('s3://bucket/large_data/*.csv')

# Preprocessing
df = df.dropna()
df['feature1'] = df['feature1'].astype('float32')

# Feature engineering
df['feature_interaction'] = df['feature1'] * df['feature2']
df['log_feature'] = np.log(df['feature1'] + 1)  # ufunc dispatches to Dask

# Split into features and labels
X = df.drop('label', axis=1)
y = df['label']

# Create the distributed DMatrix
dtrain = xgb.dask.DaskDMatrix(client, X, y)

# Train
output = xgb.dask.train(client, params, dtrain, num_boost_round=100)
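
Since all of the preprocessing above is lazy, it can be worth persisting the prepared collections in cluster memory so that DMatrix construction does not re-read and re-parse the CSVs. A short sketch:

python
# Materialize the preprocessed features/labels in worker memory
X, y = client.persist([X, y])

dtrain = xgb.dask.DaskDMatrix(client, X, y)
output = xgb.dask.train(client, params, dtrain, num_boost_round=100)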

Multi-Machine Training #

MPI Setup #

bash
# Install MPI
sudo apt-get install openmpi-bin libopenmpi-dev

# Install the Python MPI bindings used by the training script
pip install mpi4py xgboost

MPI Training Script #

python
import xgboost as xgb
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Load this rank's local data shard
data = np.load(f'data_part_{rank}.npy')
labels = np.load(f'labels_part_{rank}.npy')

# Create the DMatrix
dtrain = xgb.DMatrix(data, label=labels)

# Parameter settings
params = {
    'objective': 'binary:logistic',
    'max_depth': 6,
    'eta': 0.1,
    'tree_method': 'hist',
    'nthread': 4
}

# Train (note: unless a Rabit/collective tracker is configured,
# each rank trains on its own shard independently)
model = xgb.train(
    params,
    dtrain,
    num_boost_round=100,
    xgb_model=None  # optional initial model to continue from
)

# Save the model (only on rank 0)
if rank == 0:
    model.save_model('model.json')

Running MPI Training #

bash
# Run with 4 processes on one node
mpirun -np 4 python train_mpi.py

# Run across multiple machines listed in a hostfile
mpirun -hostfile hosts -np 4 python train_mpi.py
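
The hostfile referenced above is a plain text file. A minimal OpenMPI-style sketch (machine names and slot counts are placeholders):

bash
# Create a hostfile: one machine per line, with the number of
# processes ("slots") each machine may run
cat > hosts <<'EOF'
node1 slots=2
node2 slots=2
EOF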

External Memory #

Handling Very Large Datasets #

python
import numpy as np
import xgboost as xgb

# Use external memory: the '#' suffix in the URI sets the prefix
# for the on-disk cache files
dtrain = xgb.DMatrix(
    'data.csv?format=csv#cache/dtrain.cache',
    missing=np.nan
)

# Train (hist is the recommended tree method for external memory)
params = {'objective': 'binary:logistic', 'max_depth': 6, 'tree_method': 'hist'}
model = xgb.train(params, dtrain, num_boost_round=100)

Loading Data in Chunks #

python
import pandas as pd
import xgboost as xgb

class ChunkedData(xgb.DataIter):
    """Stream a CSV into XGBoost chunk by chunk via the DataIter API."""

    def __init__(self, file_path, chunk_size=10000):
        self.file_path = file_path
        self.chunk_size = chunk_size
        self._reader = None
        super().__init__(cache_prefix='cache')

    def reset(self):
        # Restart reading from the first chunk
        self._reader = pd.read_csv(self.file_path, chunksize=self.chunk_size)

    def next(self, input_data):
        if self._reader is None:
            self.reset()
        try:
            chunk = next(self._reader)
        except StopIteration:
            return 0  # no more chunks
        input_data(
            data=chunk.drop('label', axis=1),
            label=chunk['label']
        )
        return 1  # more chunks may follow

# Passing a DataIter builds an external-memory DMatrix
dtrain = xgb.DMatrix(ChunkedData('large_data.csv'))

params = {'objective': 'binary:logistic', 'max_depth': 6, 'tree_method': 'hist'}
model = xgb.train(params, dtrain, num_boost_round=100)

Distributed Tuning #

Distributed Hyperparameter Tuning #

python
from dask.distributed import Client
import optuna
import xgboost as xgb
from sklearn.metrics import roc_auc_score

def distributed_optimize(client, X, y, n_trials=100):
    def objective(trial):
        params = {
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'objective': 'binary:logistic'
        }
        
        dtrain = xgb.dask.DaskDMatrix(client, X, y)
        
        output = xgb.dask.train(
            client,
            params,
            dtrain,
            num_boost_round=100
        )
        
        predictions = xgb.dask.predict(client, output['booster'], dtrain)
        predictions = predictions.compute()
        
        return roc_auc_score(y.compute(), predictions)
    
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)
    
    return study.best_params

# Usage example
client = Client()
best_params = distributed_optimize(client, X, y)
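
Once the search finishes, the best parameters can be merged back into a full config for the final distributed fit. A short sketch, reusing X, y and client from above:

python
# Train the final model with the best parameters found
final_params = {**best_params, 'objective': 'binary:logistic'}
dtrain = xgb.dask.DaskDMatrix(client, X, y)
final_output = xgb.dask.train(client, final_params, dtrain, num_boost_round=100)
final_model = final_output['booster']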

Distributed Best Practices #

python
def distributed_best_practices():
    """
    Best practices for distributed training
    """
    practices = {
        'Data preparation': [
            'Make sure data is evenly distributed across nodes',
            'Use an efficient file format (Parquet is faster than CSV)',
            'Choose sensible partition sizes'
        ],
        'Resource configuration': [
            'Pick the number of nodes based on data volume',
            'Give each node enough memory',
            'Set thread counts appropriately'
        ],
        'Training configuration': [
            'Use the hist tree method for efficiency',
            'Choose a sensible chunk/partition size',
            'Monitor network communication overhead'
        ],
        'Failure handling': [
            'Checkpoint the model periodically',
            'Support resuming from checkpoints',
            'Monitor node health'
        ]
    }

    for category, items in practices.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  • {item}")

Next Steps #

Now that you have learned about distributed training, move on to GPU Acceleration to speed up training even further!

Last updated: 2026-04-04