MPI Distributed Training #

Overview #

MPIJob is the Kubeflow job type for distributed training based on MPI (Message Passing Interface). It targets high-performance computing workloads and supports frameworks such as Horovod.

MPIJob Architecture #

text
┌─────────────────────────────────────────────────────────────┐
│                     MPIJob Architecture                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Launcher-Worker model:                                     │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                                                     │    │
│  │  ┌──────────────┐                                   │    │
│  │  │   Launcher   │  Starts and manages MPI processes │    │
│  │  │              │  Runs the mpirun command          │    │
│  │  └──────┬───────┘                                   │    │
│  │         │                                           │    │
│  │         │  MPI communication                        │    │
│  │         │                                           │    │
│  │  ┌──────┴───────┐  ┌───────────┐  ┌───────────┐     │    │
│  │  │   Worker 0   │  │ Worker 1  │  │ Worker 2  │     │    │
│  │  │   (Rank 0)   │  │ (Rank 1)  │  │ (Rank 2)  │     │    │
│  │  └──────────────┘  └───────────┘  └───────────┘     │    │
│  │                                                     │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  Communication backends:                                    │
│  ├── Open MPI  - open-source MPI implementation             │
│  ├── Intel MPI - Intel-optimized implementation             │
│  └── MVAPICH   - high-performance implementation            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

MPIJob Roles #

text
MPIJob roles:

Launcher:
├── Control node
├── Runs the mpirun/mpiexec command
├── Manages the MPI processes
├── Collects output
└── Does not perform training itself

Worker:
├── Compute node
├── Runs the training computation
├── Hosts the MPI processes (see the sketch after this list)
├── Synchronizes gradients
└── Applies model updates
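
The rank numbering shown in the diagram is exactly what the training script observes at runtime. As a minimal sketch, assuming Horovod is installed (as in the horovod/horovod images used below), each worker process can query its place in the topology after `hvd.init()`:

python
# Minimal sketch: inspect this process's place in the MPI topology.
import horovod.tensorflow as hvd

hvd.init()

# rank():       global index of this process, 0 .. size()-1 (rank 0 is the "chief")
# local_rank(): index of this process on its own Worker; typically used to pick a GPU
# size():       total number of ranks, i.e. Worker replicas × slotsPerWorker
print(f"rank={hvd.rank()} local_rank={hvd.local_rank()} size={hvd.size()}")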

Creating an MPIJob #

Basic Configuration #

yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: basic-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 1
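  # slotsPerWorker = MPI slots (ranks) contributed by each Worker pod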
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: mpioperator/mpi-operator:latest
            command:
            - mpirun
            - -np
            - "4"
            - python
            - /opt/model/train.py
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: mpioperator/mpi-operator:latest

Horovod Training #

yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: horovod-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 2
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: horovod/horovod:0.28.0-tf2.11.0-torch1.13.0-mxnet1.9.1-py39-cuda11.8-cudnn8
            command:
            - mpirun
            - -np
            - "8"
            - -bind-to
            - none
            - -map-by
            - slot
            - -x
            - LD_LIBRARY_PATH
            - -x
            - PATH
            - -x
            - NCCL_DEBUG=INFO
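            # -x forwards the listed environment variables to every MPI rank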
            - python
            - /opt/model/train.py
            resources:
              requests:
                cpu: "1"
                memory: "2Gi"
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: horovod/horovod:0.28.0-tf2.11.0-torch1.13.0-mxnet1.9.1-py39-cuda11.8-cudnn8
            resources:
              requests:
                cpu: "4"
                memory: "16Gi"
                nvidia.com/gpu: "2"
              limits:
                nvidia.com/gpu: "2"

GPU Training Configuration #

yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: gpu-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 4
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: horovod/horovod:0.28.0-tf2.11.0-torch1.13.0-mxnet1.9.1-py39-cuda11.8-cudnn8
            command:
            - mpirun
            - -np
            - "16"
            - -bind-to
            - none
            - -map-by
            - slot
            - -x
            - NCCL_SOCKET_IFNAME=eth0
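            # pin NCCL's socket traffic to the eth0 interface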
            - -x
            - NCCL_DEBUG=INFO
            - python
            - /opt/model/train.py
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: horovod/horovod:0.28.0-tf2.11.0-torch1.13.0-mxnet1.9.1-py39-cuda11.8-cudnn8
            resources:
              limits:
                nvidia.com/gpu: "4"
            volumeMounts:
            - name: data
              mountPath: /data
            - name: output
              mountPath: /output
          volumes:
          - name: data
            persistentVolumeClaim:
              claimName: training-data-pvc
          - name: output
            persistentVolumeClaim:
              claimName: model-output-pvc

Horovod Training Scripts #

TensorFlow + Horovod #

python
import tensorflow as tf
# Keras-style training requires the horovod.tensorflow.keras module,
# which provides DistributedOptimizer and callbacks for model.fit().
import horovod.tensorflow.keras as hvd

# Initialize Horovod. Must be called before any other Horovod API.
hvd.init()

# Pin each process to a single GPU, selected by its local rank on the node.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# MNIST input pipeline.
(mnist_images, mnist_labels), _ = tf.keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices(
    (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
     tf.cast(mnist_labels, tf.int64))
)
dataset = dataset.shuffle(10000).batch(128)

# Simple CNN classifier.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, [3, 3], activation='relu', input_shape=(28, 28, 1)),
    tf.keras.layers.Conv2D(64, [3, 3], activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Dropout(0.25),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Scale the learning rate by the number of workers, then wrap the optimizer
# so gradients are averaged across all ranks via allreduce.
optimizer = tf.optimizers.Adam(learning_rate=0.001 * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)

model.compile(optimizer=optimizer,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

callbacks = [
    # Broadcast initial variables from rank 0 so all workers start in sync.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]

# Only rank 0 writes checkpoints, to avoid clobbering shared storage.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint(
        '/output/checkpoint-{epoch}.h5',
        save_weights_only=True
    ))

# Print progress from rank 0 only.
model.fit(dataset, epochs=10, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)

# Save the final model once, from rank 0.
if hvd.rank() == 0:
    model.save('/output/model')

PyTorch + Horovod #

python
import torch
import torch.nn as nn
import torch.optim as optim
import horovod.torch as hvd

# Initialize Horovod and pin this process to one GPU by local rank.
hvd.init()
torch.cuda.set_device(hvd.local_rank())

# Simple CNN classifier.
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

model = Net().cuda()

# Scale the learning rate by the number of workers.
optimizer = optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Wrap the optimizer so gradients are averaged across ranks via allreduce.
optimizer = hvd.DistributedOptimizer(
    optimizer,
    named_parameters=model.named_parameters()
)

# Broadcast initial model and optimizer state from rank 0 so all ranks start in sync.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Synthetic MNIST-shaped data for demonstration.
train_dataset = torch.utils.data.TensorDataset(
    torch.randn(10000, 1, 28, 28),
    torch.randint(0, 10, (10000,))
)

# Shard the dataset so each rank sees a distinct slice.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank()
)

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=64, sampler=train_sampler
)

criterion = nn.CrossEntropyLoss()

for epoch in range(10):
    # Reshuffle the shards each epoch.
    train_sampler.set_epoch(epoch)
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        # Log from rank 0 only.
        if batch_idx % 100 == 0 and hvd.rank() == 0:
            print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')

# Save the final model from rank 0 only.
if hvd.rank() == 0:
    torch.save(model.state_dict(), '/output/model.pth')

MPI Configuration #

MPI Parameter Configuration #

yaml
apiVersion: kubeflow.org/v2beta1  # sshAuthMountPath and mpiImplementation are v2beta1 API fields
kind: MPIJob
metadata:
  name: mpi-config-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 2
  sshAuthMountPath: /root/.ssh
  mpiImplementation: OpenMPI
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: openmpi/openmpi:4.1.4
            command:
            - mpirun
            - --allow-run-as-root
            - -np
            - "8"
            - -bind-to
            - none
            - -map-by
            - slot
            - -mca
            - btl
            - tcp,self
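            # restrict Open MPI's BTL transports to TCP and loopback (self)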
            - -mca
            - btl_tcp_if_include
            - eth0
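            # route BTL TCP traffic over the eth0 interface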
            - -x
            - LD_LIBRARY_PATH
            - python
            - /opt/model/train.py

Environment Variable Configuration #

yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: env-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 1
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: horovod/horovod:latest
            env:
            - name: OMP_NUM_THREADS
              value: "4"
            - name: KMP_AFFINITY
              value: "granularity=fine,compact,1,0"
            - name: MKL_NUM_THREADS
              value: "4"
            command:
            - mpirun
            - -np
            - "4"
            - python
            - /opt/model/train.py
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: horovod/horovod:latest
            env:
            - name: OMP_NUM_THREADS
              value: "4"
            - name: NCCL_DEBUG
              value: "INFO"

Storage Configuration #

Shared Storage #

yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: storage-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 2
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: horovod/horovod:latest
            volumeMounts:
            - name: shared-data
              mountPath: /shared
          volumes:
          - name: shared-data
            persistentVolumeClaim:
              claimName: shared-data-pvc
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: horovod/horovod:latest
            volumeMounts:
            - name: shared-data
              mountPath: /shared
            - name: training-data
              mountPath: /data
              readOnly: true
            - name: model-output
              mountPath: /output
          volumes:
          - name: shared-data
            persistentVolumeClaim:
              claimName: shared-data-pvc
          - name: training-data
            persistentVolumeClaim:
              claimName: training-data-pvc
          - name: model-output
            persistentVolumeClaim:
              claimName: model-output-pvc

Managing MPIJobs #

Checking Status #

bash
# List MPIJobs
kubectl get mpijobs -n kubeflow-user-example-com

# Show details
kubectl describe mpijob basic-mpijob -n kubeflow-user-example-com

# Show the status object
kubectl get mpijob basic-mpijob -n kubeflow-user-example-com -o jsonpath='{.status}'

# List the job's Pods
kubectl get pods -n kubeflow-user-example-com -l kubeflow.org/job-name=basic-mpijob

Viewing Logs #

bash
# Launcher logs
kubectl logs basic-mpijob-launcher-0 -n kubeflow-user-example-com

# Worker logs
kubectl logs basic-mpijob-worker-0 -n kubeflow-user-example-com

# Follow logs in real time
kubectl logs -f basic-mpijob-launcher-0 -n kubeflow-user-example-com

Stopping and Deleting #

bash
# Delete the MPIJob
kubectl delete mpijob basic-mpijob -n kubeflow-user-example-com

# Force delete
kubectl delete mpijob basic-mpijob -n kubeflow-user-example-com --force --grace-period=0

Best Practices #

Performance Optimization #

text
1. MPI parameter tuning
   ├── Set slotsPerWorker appropriately
   ├── Choose a suitable process-binding policy
   └── Optimize network communication

2. GPU optimization
   ├── Communicate over NCCL
   ├── Use mixed-precision training
   └── Compress gradients (see the sketch after this list)

3. Data pipeline optimization
   ├── Prefetch data
   ├── Cache in memory
   └── Load data in parallel
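
On the gradient-compression point above: Horovod ships a built-in fp16 compressor that can be enabled when wrapping the optimizer. A minimal sketch (the nn.Linear model is a placeholder for illustration only):

python
import torch.nn as nn
import torch.optim as optim
import horovod.torch as hvd

hvd.init()

model = nn.Linear(128, 10)  # placeholder model, for illustration only
optimizer = optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Compress gradients to fp16 before allreduce to cut network traffic;
# pass hvd.Compression.none (the default) to disable.
optimizer = hvd.DistributedOptimizer(
    optimizer,
    named_parameters=model.named_parameters(),
    compression=hvd.Compression.fp16,
)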

Failure Handling #

text
1. Checkpointing (see the sketch after this list)
   ├── Save the model periodically
   ├── Save the optimizer state
   └── Write to persistent storage

2. Error diagnosis
   ├── Read the Launcher logs
   ├── Check Worker status
   └── Analyze the MPI output

3. Resource monitoring
   ├── Monitor GPU utilization
   ├── Monitor network communication
   └── Monitor memory usage
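
On the checkpointing point above: a common pattern is to have rank 0 write both model and optimizer state to the persistent volume, then broadcast the resume epoch so all ranks restart consistently. A minimal PyTorch sketch; the /output path matches the model-output-pvc mounted earlier, and the helper names are illustrative:

python
import os
import torch
import horovod.torch as hvd

CKPT = '/output/checkpoint.pth'  # lives on the mounted output PVC

def save_checkpoint(model, optimizer, epoch):
    # Only rank 0 writes, so ranks do not race on the shared volume.
    if hvd.rank() == 0:
        torch.save({'epoch': epoch,
                    'model_state': model.state_dict(),
                    'optimizer_state': optimizer.state_dict()}, CKPT)

def load_checkpoint(model, optimizer):
    # Restore on rank 0; hvd.broadcast_parameters / broadcast_optimizer_state
    # (as in the training scripts above) then propagate the state to all ranks.
    start_epoch = 0
    if hvd.rank() == 0 and os.path.exists(CKPT):
        state = torch.load(CKPT)
        model.load_state_dict(state['model_state'])
        optimizer.load_state_dict(state['optimizer_state'])
        start_epoch = state['epoch'] + 1
    # Every rank must agree on where to resume.
    return hvd.broadcast_object(start_epoch, root_rank=0)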

Next Steps #

Now that you know how to run MPIJobs, continue with Advanced Configuration to explore Kubeflow's advanced features and settings!

Last updated: 2026-04-05