Distributed Training with MPI #
Overview #
MPIJob is the Kubeflow job type for MPI (Message Passing Interface)-based distributed training. It targets high-performance computing workloads and supports frameworks such as Horovod.
MPIJob Architecture #
text
┌─────────────────────────────────────────────────────────────┐
│                     MPIJob Architecture                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Launcher-Worker architecture:                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                                                     │    │
│  │  ┌──────────────┐                                   │    │
│  │  │   Launcher   │  starts/manages MPI processes     │    │
│  │  │              │  executes the mpirun command      │    │
│  │  └──────┬───────┘                                   │    │
│  │         │                                           │    │
│  │         │ MPI communication                         │    │
│  │         │                                           │    │
│  │  ┌──────┴───────┐  ┌───────────┐  ┌───────────┐     │    │
│  │  │   Worker 0   │  │ Worker 1  │  │ Worker 2  │     │    │
│  │  │   (Rank 0)   │  │ (Rank 1)  │  │ (Rank 2)  │     │    │
│  │  └──────────────┘  └───────────┘  └───────────┘     │    │
│  │                                                     │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  Communication backends:                                    │
│  ├── Open MPI  - open-source MPI implementation             │
│  ├── Intel MPI - Intel-optimized implementation             │
│  └── MVAPICH   - high-performance implementation            │
│                                                             │
└─────────────────────────────────────────────────────────────┘
MPIJob Roles #
text
MPIJob roles:
Launcher:
├── Control node
├── Executes the mpirun/mpiexec command
├── Manages the MPI processes
├── Collects output
└── Does not run the training itself
Worker:
├── Compute node
├── Runs the training computation
├── Hosts the MPI processes
├── Synchronizes gradients
└── Applies model updates
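Each Worker rank processes its own shard of the data. Conceptually, the per-rank partitioning that samplers such as PyTorch's `DistributedSampler` perform reduces to indexing by rank; a minimal pure-Python sketch (no MPI required; `shard` is an illustrative helper name, not a real API):

```python
def shard(dataset, rank, size):
    """Return the slice of `dataset` that MPI rank `rank` out of `size` should process."""
    return dataset[rank::size]  # round-robin partition keyed on rank

# Simulate 3 workers (ranks 0-2) over a 9-sample dataset.
data = list(range(9))
shards = [shard(data, r, 3) for r in range(3)]
print(shards)  # → [[0, 3, 6], [1, 4, 7], [2, 5, 8]]
```

Every rank sees a disjoint subset, so one pass over all ranks covers the dataset exactly once per epoch.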
Creating an MPIJob #
Basic Configuration #
yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: basic-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 1
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: mpioperator/mpi-operator:latest
            command:
            - mpirun
            - -np
            - "4"
            - python
            - /opt/model/train.py
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: mpioperator/mpi-operator:latest
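The `-np` value passed to `mpirun` should equal `Worker.replicas × slotsPerWorker` (here 4 × 1 = 4); a mismatch either leaves slots idle or makes the launch hang waiting for hosts. A quick sanity check (illustrative helper, not part of any API):

```python
def expected_np(worker_replicas: int, slots_per_worker: int) -> int:
    """Total MPI processes an MPIJob can schedule across its workers."""
    return worker_replicas * slots_per_worker

print(expected_np(4, 1))  # → 4, matching "-np 4" in the manifest above
```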
Horovod Training #
yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: horovod-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 2
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: horovod/horovod:0.28.0-tf2.11.0-torch1.13.0-mxnet1.9.1-py39-cuda11.8-cudnn8
            command:
            - mpirun
            - -np
            - "8"
            - -bind-to
            - none
            - -map-by
            - slot
            - -x
            - LD_LIBRARY_PATH
            - -x
            - PATH
            - -x
            - NCCL_DEBUG=INFO
            - python
            - /opt/model/train.py
            resources:
              requests:
                cpu: "1"
                memory: "2Gi"
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: horovod/horovod:0.28.0-tf2.11.0-torch1.13.0-mxnet1.9.1-py39-cuda11.8-cudnn8
            resources:
              requests:
                cpu: "4"
                memory: "16Gi"
                nvidia.com/gpu: "2"
              limits:
                nvidia.com/gpu: "2"
GPU Training Configuration #
yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: gpu-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 4
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: horovod/horovod:0.28.0-tf2.11.0-torch1.13.0-mxnet1.9.1-py39-cuda11.8-cudnn8
            command:
            - mpirun
            - -np
            - "16"
            - -bind-to
            - none
            - -map-by
            - slot
            - -x
            - NCCL_SOCKET_IFNAME=eth0
            - -x
            - NCCL_DEBUG=INFO
            - python
            - /opt/model/train.py
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: horovod/horovod:0.28.0-tf2.11.0-torch1.13.0-mxnet1.9.1-py39-cuda11.8-cudnn8
            resources:
              limits:
                nvidia.com/gpu: "4"
            volumeMounts:
            - name: data
              mountPath: /data
            - name: output
              mountPath: /output
          volumes:
          - name: data
            persistentVolumeClaim:
              claimName: training-data-pvc
          - name: output
            persistentVolumeClaim:
              claimName: model-output-pvc
Horovod Training Scripts #
TensorFlow + Horovod #
python
import tensorflow as tf
import horovod.tensorflow.keras as hvd  # the Keras workflow needs the .keras submodule

# Initialize Horovod and pin each process to a single GPU.
hvd.init()
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

(mnist_images, mnist_labels), _ = tf.keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices(
    (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
     tf.cast(mnist_labels, tf.int64))
)
dataset = dataset.shuffle(10000).batch(128)

model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, [3, 3], activation='relu', input_shape=(28, 28, 1)),
    tf.keras.layers.Conv2D(64, [3, 3], activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Dropout(0.25),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Scale the learning rate by the worker count, then wrap the optimizer
# so gradients are averaged across all ranks with allreduce.
optimizer = tf.optimizers.Adam(learning_rate=0.001 * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)
model.compile(optimizer=optimizer,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Broadcast initial variables from rank 0 so all workers start identical;
# only rank 0 writes checkpoints so they are not clobbered concurrently.
callbacks = [
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint(
        '/output/checkpoint-{epoch}.h5',
        save_weights_only=True
    ))

model.fit(dataset, epochs=10, callbacks=callbacks, verbose=1 if hvd.rank() == 0 else 0)
if hvd.rank() == 0:
    model.save('/output/model')
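The linear learning-rate scaling used above (base rate × `hvd.size()`) usually needs a warmup ramp to stay stable at large worker counts. A minimal pure-Python sketch of linear warmup (the function name and defaults are illustrative, not Horovod API; Horovod also ships a ready-made `LearningRateWarmupCallback`):

```python
def warmup_lr(base_lr: float, size: int, epoch: int, warmup_epochs: int = 5) -> float:
    """Ramp linearly from base_lr up to base_lr * size over the first warmup epochs."""
    target = base_lr * size
    if epoch >= warmup_epochs:
        return target
    return base_lr + (target - base_lr) * epoch / warmup_epochs

print(warmup_lr(0.001, 8, 0))  # → 0.001 (start of warmup)
print(warmup_lr(0.001, 8, 5))  # → 0.008 (fully scaled rate)
```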
PyTorch + Horovod #
python
import torch
import torch.nn as nn
import torch.optim as optim
import horovod.torch as hvd

# Initialize Horovod and pin each process to its local GPU.
hvd.init()
torch.cuda.set_device(hvd.local_rank())

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

model = Net().cuda()

# Scale the learning rate by the worker count and wrap the optimizer
# so gradients are averaged across ranks with allreduce.
optimizer = optim.SGD(model.parameters(), lr=0.01 * hvd.size())
optimizer = hvd.DistributedOptimizer(
    optimizer,
    named_parameters=model.named_parameters()
)

# Start all workers from identical model and optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Synthetic MNIST-shaped data; the sampler gives each rank a disjoint shard.
train_dataset = torch.utils.data.TensorDataset(
    torch.randn(10000, 1, 28, 28),
    torch.randint(0, 10, (10000,))
)
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank()
)
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=64, sampler=train_sampler
)

criterion = nn.CrossEntropyLoss()
for epoch in range(10):
    train_sampler.set_epoch(epoch)  # reshuffle the shards each epoch
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0 and hvd.rank() == 0:
            print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}')

if hvd.rank() == 0:
    torch.save(model.state_dict(), '/output/model.pth')
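What `hvd.DistributedOptimizer` does on each step is average the gradients from every rank before the weight update, so all replicas stay in lockstep. A minimal pure-Python sketch of that allreduce-average (no MPI required; `allreduce_mean` is an illustrative name):

```python
def allreduce_mean(per_rank_grads):
    """Average gradients element-wise across ranks, as Horovod's allreduce does."""
    size = len(per_rank_grads)
    return [sum(vals) / size for vals in zip(*per_rank_grads)]

# Three ranks, each holding a 2-element gradient vector.
grads = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
print(allreduce_mean(grads))  # → [3.0, 4.0]; every rank applies the same averaged update
```

In production this averaging runs as a ring or tree allreduce over NCCL or MPI rather than gathering everything on one node.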
MPI Configuration #
MPI Parameter Configuration #
yaml
apiVersion: kubeflow.org/v2beta1  # sshAuthMountPath and mpiImplementation are v2beta1 fields
kind: MPIJob
metadata:
  name: mpi-config-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 2
  sshAuthMountPath: /root/.ssh
  mpiImplementation: OpenMPI
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: openmpi/openmpi:4.1.4
            command:
            - mpirun
            - --allow-run-as-root
            - -np
            - "8"
            - -bind-to
            - none
            - -map-by
            - slot
            - -mca
            - btl
            - tcp,self
            - -mca
            - btl_tcp_if_include
            - eth0
            - -x
            - LD_LIBRARY_PATH
            - python
            - /opt/model/train.py
Environment Variable Configuration #
yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: env-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 1
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: horovod/horovod:latest
            env:
            - name: OMP_NUM_THREADS
              value: "4"
            - name: KMP_AFFINITY
              value: "granularity=fine,compact,1,0"
            - name: MKL_NUM_THREADS
              value: "4"
            command:
            - mpirun
            - -np
            - "4"
            - python
            - /opt/model/train.py
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: horovod/horovod:latest
            env:
            - name: OMP_NUM_THREADS
              value: "4"
            - name: NCCL_DEBUG
              value: "INFO"
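Inside the training script, these variables arrive as ordinary process environment. A small sketch of reading them defensively (the helper name and defaults are illustrative):

```python
import os

def thread_count(default: int = 1) -> int:
    """Read OMP_NUM_THREADS, falling back to a default when unset or malformed."""
    try:
        return int(os.environ.get("OMP_NUM_THREADS", default))
    except ValueError:
        return default

os.environ["OMP_NUM_THREADS"] = "4"  # as set by the manifest above
print(thread_count())  # → 4
```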
Storage Configuration #
Shared Storage #
yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: storage-mpijob
  namespace: kubeflow-user-example-com
spec:
  slotsPerWorker: 2
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: mpi-launcher
            image: horovod/horovod:latest
            volumeMounts:
            - name: shared-data
              mountPath: /shared
          volumes:
          - name: shared-data
            persistentVolumeClaim:
              claimName: shared-data-pvc
    Worker:
      replicas: 4
      template:
        spec:
          containers:
          - name: mpi-worker
            image: horovod/horovod:latest
            volumeMounts:
            - name: shared-data
              mountPath: /shared
            - name: training-data
              mountPath: /data
              readOnly: true
            - name: model-output
              mountPath: /output
          volumes:
          - name: shared-data
            persistentVolumeClaim:
              claimName: shared-data-pvc
          - name: training-data
            persistentVolumeClaim:
              claimName: training-data-pvc
          - name: model-output
            persistentVolumeClaim:
              claimName: model-output-pvc
Managing MPIJobs #
Checking Status #
bash
# List MPIJobs
kubectl get mpijobs -n kubeflow-user-example-com
# Show details
kubectl describe mpijob basic-mpijob -n kubeflow-user-example-com
# Show status
kubectl get mpijob basic-mpijob -n kubeflow-user-example-com -o jsonpath='{.status}'
# List Pods
kubectl get pods -n kubeflow-user-example-com -l kubeflow.org/job-name=basic-mpijob
Viewing Logs #
bash
# Launcher logs (the launcher pod is named <job>-launcher)
kubectl logs basic-mpijob-launcher -n kubeflow-user-example-com
# Worker logs
kubectl logs basic-mpijob-worker-0 -n kubeflow-user-example-com
# Stream logs
kubectl logs -f basic-mpijob-launcher -n kubeflow-user-example-com
Stopping and Deleting #
bash
# Delete the MPIJob
kubectl delete mpijob basic-mpijob -n kubeflow-user-example-com
# Force delete
kubectl delete mpijob basic-mpijob -n kubeflow-user-example-com --force --grace-period=0
Best Practices #
Performance Tuning #
text
1. MPI parameter tuning
├── Set slotsPerWorker appropriately
├── Choose a suitable process-binding strategy
└── Optimize network communication
2. GPU optimization
├── Use NCCL for GPU collectives
├── Train with mixed precision
└── Compress gradients
3. Data pipeline optimization
├── Prefetch data
├── Cache in memory
└── Load data in parallel
Fault Handling #
text
1. Checkpointing
├── Save the model periodically
├── Save the optimizer state
└── Write to persistent storage
2. Error diagnosis
├── Inspect Launcher logs
├── Check Worker status
└── Analyze MPI output
3. Resource monitoring
├── Monitor GPU utilization
├── Monitor network traffic
└── Monitor memory usage
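The checkpointing items above reduce to one pattern: rank 0 periodically writes model and optimizer state to the persistent `/output` volume, and on restart every rank restores from the latest file. A framework-agnostic sketch, with plain dicts standing in for real state and illustrative paths and function names (a real PyTorch script would use `torch.save`/`torch.load` the same way):

```python
import json
import os
import tempfile

def save_checkpoint(path, epoch, model_state, optimizer_state):
    """Atomically write a checkpoint: write to a temp file, then rename into place."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"epoch": epoch, "model": model_state, "optimizer": optimizer_state}, f)
    os.replace(tmp, path)  # the rename is atomic, so readers never see a partial file

def load_checkpoint(path):
    """Return the checkpoint dict, or None when starting fresh."""
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)

ckpt_path = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
save_checkpoint(ckpt_path, 3, {"w": [0.1, 0.2]}, {"lr": 0.01})
ckpt = load_checkpoint(ckpt_path)
print(ckpt["epoch"])  # → 3: training can resume from epoch 3
```

The write-then-rename step matters on shared storage: if a worker crashes mid-write, the previous checkpoint stays intact.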
Next Steps #
Now that you have MPIJob basics in hand, continue with Advanced Configuration to explore Kubeflow's advanced features and settings!
Last updated: 2026-04-05