端到端示例 #
概述 #
本章通过一个完整的图像分类项目,演示如何使用 Kubeflow 构建端到端的机器学习工作流。
项目架构 #
text
┌─────────────────────────────────────────────────────────────┐
│ 端到端 ML 工作流 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 数据准备 │
│ ├── 数据下载 │
│ ├── 数据预处理 │
│ └── 数据验证 │
│ │
│ 2. 模型训练 │
│ ├── 超参数调优 (Katib) │
│ ├── 分布式训练 (TFJob) │
│ └── 模型评估 │
│ │
│ 3. 模型服务 │
│ ├── 模型打包 │
│ ├── 服务部署 (KServe) │
│ └── 性能测试 │
│ │
│ 4. 监控告警 │
│ ├── 训练监控 │
│ ├── 服务监控 │
│ └── 告警配置 │
│ │
└─────────────────────────────────────────────────────────────┘
项目结构 #
text
mnist-classification/
├── pipelines/
│ ├── components/
│ │ ├── download_data.py
│ │ ├── preprocess.py
│ │ ├── train.py
│ │ └── evaluate.py
│ └── pipeline.py
├── training/
│ ├── train.py
│ ├── model.py
│ └── Dockerfile
├── serving/
│ ├── handler.py
│ └── Dockerfile
├── katib/
│ └── experiment.yaml
├── kubernetes/
│ ├── tfjob.yaml
│ ├── inferenceservice.yaml
│ └── configmap.yaml
└── README.md
数据准备 Pipeline #
数据下载组件 #
python
from kfp import dsl
from kfp.dsl import Output, Dataset


@dsl.component(base_image='python:3.9', packages_to_install=['tensorflow'])
def download_mnist_data(
    output_data: Output[Dataset]
):
    """Fetch the MNIST dataset via Keras and persist it as .npy files.

    Writes x_train/y_train/x_test/y_test into the `output_data` artifact
    directory for downstream pipeline steps.
    """
    import tensorflow as tf
    import numpy as np
    import os

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

    # The Dataset artifact path is used as a directory; create it first.
    os.makedirs(output_data.path, exist_ok=True)
    splits = (
        ('x_train.npy', x_train),
        ('y_train.npy', y_train),
        ('x_test.npy', x_test),
        ('y_test.npy', y_test),
    )
    for filename, array in splits:
        np.save(os.path.join(output_data.path, filename), array)

    print(f"Downloaded {len(x_train)} training samples")
    print(f"Downloaded {len(x_test)} test samples")
数据预处理组件 #
python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Metrics


@dsl.component(base_image='python:3.9', packages_to_install=['numpy'])
def preprocess_data(
    input_data: Input[Dataset],
    train_data: Output[Dataset],
    test_data: Output[Dataset],
    metrics: Output[Metrics]
):
    """Normalize and reshape the raw MNIST arrays into train/test artifacts.

    Scales pixels to [0, 1], adds the channel dimension expected by Conv2D
    layers, saves each split as x.npy/y.npy, and logs dataset-size metrics.
    """
    import numpy as np
    import os

    def _load(name):
        # Raw arrays produced by the download component.
        return np.load(os.path.join(input_data.path, name))

    x_train, y_train = _load('x_train.npy'), _load('y_train.npy')
    x_test, y_test = _load('x_test.npy'), _load('y_test.npy')

    # (N, 28, 28) uint8 -> (N, 28, 28, 1) float32 in [0, 1].
    x_train = x_train.astype('float32') / 255.0
    x_test = x_test.astype('float32') / 255.0
    x_train = x_train.reshape(-1, 28, 28, 1)
    x_test = x_test.reshape(-1, 28, 28, 1)

    for artifact, x, y in ((train_data, x_train, y_train),
                           (test_data, x_test, y_test)):
        os.makedirs(artifact.path, exist_ok=True)
        np.save(os.path.join(artifact.path, 'x.npy'), x)
        np.save(os.path.join(artifact.path, 'y.npy'), y)

    metrics.log_metric('train_samples', len(x_train))
    metrics.log_metric('test_samples', len(x_test))
    metrics.log_metric('image_shape', str(x_train.shape[1:]))
模型训练组件 #
python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics


@dsl.component(
    base_image='tensorflow/tensorflow:2.12.0',
    packages_to_install=['numpy']
)
def train_model(
    train_data: Input[Dataset],
    test_data: Input[Dataset],
    output_model: Output[Model],
    metrics: Output[Metrics],
    epochs: int = 10,
    learning_rate: float = 0.001,
    batch_size: int = 64
):
    """Train a small CNN on the preprocessed MNIST splits.

    Saves the trained Keras model to `output_model` and logs test
    accuracy/loss plus the hyperparameters used.
    """
    import tensorflow as tf
    import numpy as np
    import os

    def _load(artifact, name):
        return np.load(os.path.join(artifact.path, name))

    x_train, y_train = _load(train_data, 'x.npy'), _load(train_data, 'y.npy')
    x_test, y_test = _load(test_data, 'x.npy'), _load(test_data, 'y.npy')

    # Three conv layers with two pooling stages, then a dense head with a
    # softmax over the 10 digit classes.
    layers = tf.keras.layers
    model = tf.keras.Sequential([
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(10, activation='softmax'),
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        # Integer labels (not one-hot), hence the sparse variant.
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    model.fit(
        x_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(x_test, y_test),
        verbose=1,
    )

    test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
    model.save(output_model.path)

    metrics.log_metric('test_accuracy', float(test_acc))
    metrics.log_metric('test_loss', float(test_loss))
    metrics.log_metric('epochs', epochs)
    metrics.log_metric('learning_rate', learning_rate)
    print(f"Test accuracy: {test_acc:.4f}")
    print(f"Test loss: {test_loss:.4f}")
模型评估组件 #
python
from kfp import dsl
# FIX: `Dataset` was used in the signature (Input[Dataset]) but not imported,
# which raised NameError when the component was defined.
from kfp.dsl import Input, Output, Dataset, Model, Metrics, ClassificationMetrics


@dsl.component(
    base_image='tensorflow/tensorflow:2.12.0',
    packages_to_install=['numpy', 'scikit-learn']
)
def evaluate_model(
    test_data: Input[Dataset],
    model_input: Input[Model],
    metrics: Output[Metrics],
    classification_metrics: Output[ClassificationMetrics]
):
    """Evaluate the trained model on the test split.

    Logs accuracy/precision/recall/F1 as scalar metrics and writes the
    confusion matrix into the `classification_metrics` artifact (which was
    previously declared but never populated).
    """
    import tensorflow as tf
    import numpy as np
    from sklearn.metrics import (
        precision_score, recall_score, f1_score, confusion_matrix
    )
    import os

    x_test = np.load(os.path.join(test_data.path, 'x.npy'))
    y_test = np.load(os.path.join(test_data.path, 'y.npy'))

    model = tf.keras.models.load_model(model_input.path)

    y_pred = model.predict(x_test)
    # Softmax probabilities -> predicted class ids.
    y_pred_classes = np.argmax(y_pred, axis=1)

    accuracy = np.mean(y_pred_classes == y_test)
    # Weighted averaging accounts for per-class support differences.
    precision = precision_score(y_test, y_pred_classes, average='weighted')
    recall = recall_score(y_test, y_pred_classes, average='weighted')
    f1 = f1_score(y_test, y_pred_classes, average='weighted')

    metrics.log_metric('accuracy', float(accuracy))
    metrics.log_metric('precision', float(precision))
    metrics.log_metric('recall', float(recall))
    metrics.log_metric('f1_score', float(f1))

    # Populate the ClassificationMetrics output with a 10x10 confusion matrix.
    cm = confusion_matrix(y_test, y_pred_classes, labels=list(range(10)))
    classification_metrics.log_confusion_matrix(
        [str(digit) for digit in range(10)],
        cm.tolist(),
    )

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
完整 Pipeline #
python
from kfp import dsl, compiler
from kfp.dsl import Output, Dataset, Model, Metrics


@dsl.pipeline(
    name='MNIST Classification Pipeline',
    description='端到端 MNIST 图像分类流水线'
)
def mnist_pipeline(
    epochs: int = 10,
    learning_rate: float = 0.001,
    batch_size: int = 64
):
    """Wire the download -> preprocess -> train -> evaluate components.

    Hyperparameters are exposed as pipeline parameters and forwarded to the
    training step; data dependencies are expressed via task outputs.
    """
    downloaded = download_mnist_data()

    preprocessed = preprocess_data(
        input_data=downloaded.outputs['output_data']
    )

    trained = train_model(
        train_data=preprocessed.outputs['train_data'],
        test_data=preprocessed.outputs['test_data'],
        epochs=epochs,
        learning_rate=learning_rate,
        batch_size=batch_size
    )

    # Evaluation depends on both the preprocessed test split and the model.
    evaluate_model(
        test_data=preprocessed.outputs['test_data'],
        model_input=trained.outputs['output_model']
    )


if __name__ == '__main__':
    # Compile to the IR YAML consumed by the KFP backend.
    compiler.Compiler().compile(mnist_pipeline, 'mnist_pipeline.yaml')
超参数调优 #
Katib 实验 #
yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: mnist-hyperparameter-tuning
  namespace: kubeflow-user-example-com
spec:
  # Maximize validation accuracy; stop early once 0.99 is reached.
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: accuracy
  algorithm:
    algorithmName: bayesianoptimization
    algorithmSettings:
      - name: random_state
        value: "42"
  parallelTrialCount: 3
  maxTrialCount: 20
  maxFailedTrialCount: 5
  # Search space for the three tuned hyperparameters.
  parameters:
    - name: learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.0001"
        max: "0.01"
    - name: batch_size
      parameterType: int
      feasibleSpace:
        min: "32"
        max: "256"
    - name: epochs
      parameterType: int
      feasibleSpace:
        min: "5"
        max: "20"
  trialTemplate:
    # FIX: primaryContainerName is required so Katib knows which container
    # in the trial pod produces the objective metric; it was missing.
    primaryContainerName: tensorflow
    trialParameters:
      - name: learning_rate
        reference: learning_rate
      - name: batch_size
        reference: batch_size
      - name: epochs
        reference: epochs
    trialSpec:
      apiVersion: kubeflow.org/v1
      kind: TFJob
      spec:
        tfReplicaSpecs:
          Chief:
            replicas: 1
            restartPolicy: OnFailure
            template:
              spec:
                containers:
                  - name: tensorflow
                    image: tensorflow/tensorflow:2.12.0
                    command:
                      - python
                      - /opt/model/train.py
                      - --learning-rate=${trialParameters.learning_rate}
                      - --batch-size=${trialParameters.batch_size}
                      - --epochs=${trialParameters.epochs}
                    resources:
                      limits:
                        nvidia.com/gpu: "1"
分布式训练 #
TFJob 配置 #
yaml
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: mnist-distributed-training
  namespace: kubeflow-user-example-com
spec:
  runPolicy:
    # Only clean up pods that are still running when the job completes.
    cleanPodPolicy: Running
  tfReplicaSpecs:
    # The chief coordinates training and is the replica that saves the model;
    # it also mounts the data and output PVCs.
    Chief:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: tensorflow
              image: your-registry/mnist-trainer:latest
              command:
                - python
                - /opt/model/train.py
                - --epochs=20
                - --batch-size=128
              resources:
                limits:
                  nvidia.com/gpu: "2"
              volumeMounts:
                - name: data
                  mountPath: /data
                - name: output
                  mountPath: /output
          volumes:
            - name: data
              persistentVolumeClaim:
                claimName: mnist-data-pvc
            - name: output
              persistentVolumeClaim:
                claimName: model-output-pvc
    # Workers run the same training command; the operator injects TF_CONFIG
    # so each replica knows its role in the cluster.
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: tensorflow
              image: your-registry/mnist-trainer:latest
              command:
                - python
                - /opt/model/train.py
                - --epochs=20
                - --batch-size=128
              resources:
                limits:
                  nvidia.com/gpu: "2"
分布式训练脚本 #
python
import tensorflow as tf
import argparse
import os
import json


def _parse_args():
    """Parse the CLI flags that the TFJob/Katib specs pass to this script.

    FIX: the original script ignored --epochs/--batch-size/--learning-rate
    entirely, so the values injected by the TFJob spec and by Katib trials
    had no effect. Defaults match the previously hard-coded values.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=20)
    parser.add_argument('--batch-size', type=int, default=128)
    parser.add_argument('--learning-rate', type=float, default=0.001)
    return parser.parse_args()


def _is_chief():
    """Return True when this replica should save the final model.

    FIX: the original check (`'chief' in TF_CONFIG`) was true on EVERY
    replica, because the cluster spec inside TF_CONFIG lists the chief's
    address for all workers — so every pod tried to save. Parse the task
    type of THIS replica instead. A standalone run (no TF_CONFIG) saves.
    """
    tf_config = os.environ.get('TF_CONFIG')
    if not tf_config:
        return True
    task_type = json.loads(tf_config).get('task', {}).get('type')
    return task_type in ('chief', 'master')


def setup_distributed_training():
    """Choose a tf.distribute strategy based on the TF_CONFIG env variable."""
    if os.environ.get('TF_CONFIG'):
        cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
        # NOTE(review): ParameterServerStrategy expects PS/coordinator
        # replicas; for a Chief+Worker-only TFJob the usual choice is
        # MultiWorkerMirroredStrategy — confirm against the cluster layout.
        return tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
    # No TF_CONFIG: default (single-device) strategy.
    return tf.distribute.get_strategy()


def create_model():
    """Build the MNIST CNN (same topology as the pipeline training step)."""
    return tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])


def main():
    """Entry point: build under the strategy scope, train, save on chief only."""
    args = _parse_args()
    strategy = setup_distributed_training()

    # Model/optimizer variables must be created inside the strategy scope.
    with strategy.scope():
        model = create_model()
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=args.learning_rate),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0

    model.fit(
        x_train, y_train,
        epochs=args.epochs,
        batch_size=args.batch_size,
        validation_data=(x_test, y_test)
    )

    # Only one replica writes the model artifact.
    if _is_chief():
        model.save('/output/model')


if __name__ == '__main__':
    main()
模型服务 #
KServe 部署 #
yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: mnist-classifier
  namespace: kubeflow-user-example-com
  annotations:
    # Knative autoscaling: target 10 concurrent requests per replica,
    # keeping between 1 and 10 replicas.
    autoscaling.knative.dev/target: "10"
    autoscaling.knative.dev/minScale: "1"
    autoscaling.knative.dev/maxScale: "10"
spec:
  predictor:
    tensorflow:
      # SavedModel location; fetched by the KServe storage initializer.
      storageUri: s3://models/mnist
      resources:
        requests:
          cpu: "1"
          memory: "2Gi"
        limits:
          cpu: "2"
          memory: "4Gi"
推理测试 #
python
import requests
import numpy as np
import json

# TF-Serving-style predict endpoint exposed by KServe through the ingress host.
url = "http://mnist-classifier.kubeflow-user-example-com.example.com/v1/models/mnist-classifier:predict"

# A random image stands in for a real sample; the shape must match the
# model's (1, 28, 28, 1) float32 input.
x_test = np.random.rand(1, 28, 28, 1).astype(np.float32)
payload = {
    "instances": x_test.tolist()
}

response = requests.post(url, json=payload)
predictions = response.json()

print(f"Predictions: {predictions}")
print(f"Predicted class: {np.argmax(predictions['predictions'][0])}")
完整工作流 #
运行 Pipeline #
python
import kfp

PIPELINE_PACKAGE = 'mnist_pipeline.yaml'
EXPERIMENT_NAME = 'MNIST Classification'

client = kfp.Client(host='http://localhost:8080/pipeline')

# Create (or reuse) the experiment that groups related runs in the UI.
experiment = client.create_experiment(
    name=EXPERIMENT_NAME,
    description='MNIST 图像分类实验'
)

# Launch the compiled pipeline with concrete hyperparameter values.
run_arguments = {
    'epochs': 15,
    'learning_rate': 0.001,
    'batch_size': 128
}
run = client.create_run_from_pipeline_package(
    pipeline_package_path=PIPELINE_PACKAGE,
    arguments=run_arguments,
    experiment_name=EXPERIMENT_NAME,
    run_name='mnist-run-001'
)
print(f"Run ID: {run.run_id}")
监控训练 #
python
import time
import kfp

# Run states after which polling stops.
_TERMINAL_STATES = ('Succeeded', 'Failed', 'Error')

client = kfp.Client(host='http://localhost:8080/pipeline')
run_id = 'your-run-id'

# Poll every 30 seconds until the run reaches a terminal state.
status = client.get_run(run_id).run.status
print(f"Run status: {status}")
while status not in _TERMINAL_STATES:
    time.sleep(30)
    status = client.get_run(run_id).run.status
    print(f"Run status: {status}")

print(f"Final status: {status}")
最佳实践 #
项目组织 #
text
1. 代码管理
├── 使用 Git 版本控制
├── 模块化组件设计
└── 文档完善
2. 数据管理
├── 数据版本控制
├── 数据血缘追踪
└── 数据质量检查
3. 模型管理
├── 模型版本管理
├── 模型元数据记录
└── 模型评估标准
工作流优化 #
text
1. Pipeline 优化
├── 组件缓存
├── 并行执行
└── 资源优化
2. 训练优化
├── 分布式训练
├── 混合精度
└── 梯度累积
3. 服务优化
├── 模型量化
├── 批处理推理
└── 自动扩缩容
下一步 #
现在你已经完成了端到端示例,接下来学习 生产环境部署,了解企业级部署的最佳实践!
最后更新:2026-04-05