端到端示例 #

概述 #

本章通过一个完整的 MNIST 手写数字图像分类项目,演示如何使用 Kubeflow(Pipelines、Katib、TFJob、KServe)构建从数据准备、模型训练到模型服务的端到端机器学习工作流。

项目架构 #

text
┌─────────────────────────────────────────────────────────────┐
│                   端到端 ML 工作流                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 数据准备                                                │
│     ├── 数据下载                                           │
│     ├── 数据预处理                                         │
│     └── 数据验证                                           │
│                                                             │
│  2. 模型训练                                                │
│     ├── 超参数调优 (Katib)                                 │
│     ├── 分布式训练 (TFJob)                                 │
│     └── 模型评估                                           │
│                                                             │
│  3. 模型服务                                                │
│     ├── 模型打包                                           │
│     ├── 服务部署 (KServe)                                  │
│     └── 性能测试                                           │
│                                                             │
│  4. 监控告警                                                │
│     ├── 训练监控                                           │
│     ├── 服务监控                                           │
│     └── 告警配置                                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

项目结构 #

text
mnist-classification/
├── pipelines/
│   ├── components/
│   │   ├── download_data.py
│   │   ├── preprocess.py
│   │   ├── train.py
│   │   └── evaluate.py
│   └── pipeline.py
├── training/
│   ├── train.py
│   ├── model.py
│   └── Dockerfile
├── serving/
│   ├── handler.py
│   └── Dockerfile
├── katib/
│   └── experiment.yaml
├── kubernetes/
│   ├── tfjob.yaml
│   ├── inferenceservice.yaml
│   └── configmap.yaml
└── README.md

数据准备 Pipeline #

数据下载组件 #

python
from kfp import dsl
from kfp.dsl import Output, Dataset

@dsl.component(base_image='python:3.9', packages_to_install=['tensorflow'])
def download_mnist_data(
    output_data: Output[Dataset]
):
    """Download MNIST via Keras and store its four arrays as ``.npy`` files.

    Args:
        output_data: Output dataset artifact. Its path is created as a
            directory that receives ``x_train.npy``, ``y_train.npy``,
            ``x_test.npy`` and ``y_test.npy``.
    """
    # Imports live inside the function because the component body runs in its
    # own container.
    import os

    import numpy as np
    import tensorflow as tf

    train_split, test_split = tf.keras.datasets.mnist.load_data()
    train_images, train_labels = train_split
    test_images, test_labels = test_split

    target_dir = output_data.path
    os.makedirs(target_dir, exist_ok=True)

    arrays_by_file = {
        'x_train.npy': train_images,
        'y_train.npy': train_labels,
        'x_test.npy': test_images,
        'y_test.npy': test_labels,
    }
    for file_name, array in arrays_by_file.items():
        np.save(os.path.join(target_dir, file_name), array)

    print(f"Downloaded {len(train_images)} training samples")
    print(f"Downloaded {len(test_images)} test samples")

数据预处理组件 #

python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Metrics

@dsl.component(base_image='python:3.9', packages_to_install=['numpy'])
def preprocess_data(
    input_data: Input[Dataset],
    train_data: Output[Dataset],
    test_data: Output[Dataset],
    metrics: Output[Metrics]
):
    """Scale the raw MNIST images and write separate train/test artifacts.

    Args:
        input_data: Directory artifact holding ``x_train.npy``,
            ``y_train.npy``, ``x_test.npy`` and ``y_test.npy``.
        train_data: Output directory artifact receiving ``x.npy``/``y.npy``
            for the training split.
        test_data: Output directory artifact receiving ``x.npy``/``y.npy``
            for the test split.
        metrics: Metrics artifact that records the sample counts and the
            per-image shape.
    """
    import os

    import numpy as np

    def load_raw(file_name):
        return np.load(os.path.join(input_data.path, file_name))

    def to_model_input(images):
        # Convert to float32, divide by 255.0, then add a trailing channel
        # axis so each image becomes 28x28x1.
        scaled = images.astype('float32') / 255.0
        return scaled.reshape(-1, 28, 28, 1)

    train_images = to_model_input(load_raw('x_train.npy'))
    train_labels = load_raw('y_train.npy')
    test_images = to_model_input(load_raw('x_test.npy'))
    test_labels = load_raw('y_test.npy')

    splits = (
        (train_data, train_images, train_labels),
        (test_data, test_images, test_labels),
    )

    for artifact, _, _ in splits:
        os.makedirs(artifact.path, exist_ok=True)

    for artifact, images, labels in splits:
        np.save(os.path.join(artifact.path, 'x.npy'), images)
        np.save(os.path.join(artifact.path, 'y.npy'), labels)

    metrics.log_metric('train_samples', len(train_images))
    metrics.log_metric('test_samples', len(test_images))
    # The shape is logged as its string form, e.g. "(28, 28, 1)".
    metrics.log_metric('image_shape', str(train_images.shape[1:]))

模型训练组件 #

python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image='tensorflow/tensorflow:2.12.0',
    packages_to_install=['numpy']
)
def train_model(
    train_data: Input[Dataset],
    test_data: Input[Dataset],
    output_model: Output[Model],
    metrics: Output[Metrics],
    epochs: int = 10,
    learning_rate: float = 0.001,
    batch_size: int = 64
):
    """Train a small CNN on the preprocessed data and save the model.

    Args:
        train_data: Directory artifact with ``x.npy``/``y.npy`` used for
            fitting.
        test_data: Directory artifact with ``x.npy``/``y.npy`` used both as
            validation data during fitting and for the final evaluation.
        output_model: Model artifact; the trained model is saved at its path.
        metrics: Metrics artifact receiving test accuracy/loss and the
            hyperparameters used.
        epochs: Number of training epochs.
        learning_rate: Learning rate passed to the Adam optimizer.
        batch_size: Batch size passed to ``fit``.
    """
    import os

    import numpy as np
    import tensorflow as tf

    def load_split(artifact):
        features = np.load(os.path.join(artifact.path, 'x.npy'))
        labels = np.load(os.path.join(artifact.path, 'y.npy'))
        return features, labels

    train_images, train_labels = load_split(train_data)
    eval_images, eval_labels = load_split(test_data)

    keras_layers = tf.keras.layers
    network = tf.keras.Sequential([
        keras_layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        keras_layers.MaxPooling2D((2, 2)),
        keras_layers.Conv2D(64, (3, 3), activation='relu'),
        keras_layers.MaxPooling2D((2, 2)),
        keras_layers.Conv2D(64, (3, 3), activation='relu'),
        keras_layers.Flatten(),
        keras_layers.Dense(64, activation='relu'),
        keras_layers.Dense(10, activation='softmax'),
    ])

    network.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    network.fit(
        train_images,
        train_labels,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(eval_images, eval_labels),
        verbose=1,
    )

    test_loss, test_acc = network.evaluate(eval_images, eval_labels, verbose=0)

    network.save(output_model.path)

    logged_values = (
        ('test_accuracy', float(test_acc)),
        ('test_loss', float(test_loss)),
        ('epochs', epochs),
        ('learning_rate', learning_rate),
    )
    for metric_name, metric_value in logged_values:
        metrics.log_metric(metric_name, metric_value)

    print(f"Test accuracy: {test_acc:.4f}")
    print(f"Test loss: {test_loss:.4f}")

模型评估组件 #

python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics, ClassificationMetrics

@dsl.component(
    base_image='tensorflow/tensorflow:2.12.0',
    packages_to_install=['numpy', 'scikit-learn']
)
def evaluate_model(
    test_data: Input[Dataset],
    model_input: Input[Model],
    metrics: Output[Metrics],
    classification_metrics: Output[ClassificationMetrics]
):
    """Evaluate the trained model on the test split and log the results.

    Args:
        test_data: Directory artifact with ``x.npy`` (images) and ``y.npy``
            (integer labels).
        model_input: Saved Keras model artifact to load and evaluate.
        metrics: Metrics artifact receiving accuracy and the weighted
            precision, recall and F1 score.
        classification_metrics: Classification artifact receiving the
            confusion matrix over all predicted classes.
    """
    import os

    import numpy as np
    import tensorflow as tf
    from sklearn.metrics import (
        confusion_matrix,
        f1_score,
        precision_score,
        recall_score,
    )

    x_test = np.load(os.path.join(test_data.path, 'x.npy'))
    y_test = np.load(os.path.join(test_data.path, 'y.npy'))

    model = tf.keras.models.load_model(model_input.path)

    # predict() yields one probability row per sample; argmax picks the class.
    y_pred = model.predict(x_test)
    y_pred_classes = np.argmax(y_pred, axis=1)

    accuracy = np.mean(y_pred_classes == y_test)
    precision = precision_score(y_test, y_pred_classes, average='weighted')
    recall = recall_score(y_test, y_pred_classes, average='weighted')
    f1 = f1_score(y_test, y_pred_classes, average='weighted')

    metrics.log_metric('accuracy', float(accuracy))
    metrics.log_metric('precision', float(precision))
    metrics.log_metric('recall', float(recall))
    metrics.log_metric('f1_score', float(f1))

    # Fill the classification artifact, which was previously left empty.
    # Passing explicit labels keeps the matrix square even if some class
    # never shows up in the test labels or the predictions.
    class_ids = list(range(y_pred.shape[1]))
    classification_metrics.log_confusion_matrix(
        [str(class_id) for class_id in class_ids],
        confusion_matrix(y_test, y_pred_classes, labels=class_ids).tolist(),
    )

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

完整 Pipeline #

python
from kfp import dsl, compiler
from kfp.dsl import Output, Dataset, Model, Metrics

@dsl.pipeline(
    name='MNIST Classification Pipeline',
    description='端到端 MNIST 图像分类流水线'
)
def mnist_pipeline(
    epochs: int = 10,
    learning_rate: float = 0.001,
    batch_size: int = 64
):
    """Chain the download, preprocess, train and evaluate components.

    Args:
        epochs: Number of epochs forwarded to the training component.
        learning_rate: Learning rate forwarded to the training component.
        batch_size: Batch size forwarded to the training component.
    """
    raw_step = download_mnist_data()

    prepared_step = preprocess_data(
        input_data=raw_step.outputs['output_data'],
    )
    train_split = prepared_step.outputs['train_data']
    test_split = prepared_step.outputs['test_data']

    fit_step = train_model(
        train_data=train_split,
        test_data=test_split,
        epochs=epochs,
        learning_rate=learning_rate,
        batch_size=batch_size,
    )

    # The evaluation task is a leaf of the graph; its handle is not needed.
    evaluate_model(
        test_data=test_split,
        model_input=fit_step.outputs['output_model'],
    )


if __name__ == '__main__':
    # Compile the pipeline definition into a package file.
    compiler.Compiler().compile(
        pipeline_func=mnist_pipeline,
        package_path='mnist_pipeline.yaml',
    )

超参数调优 #

Katib 实验 #

yaml
# Katib experiment that searches learning rate, batch size and epoch count
# for the MNIST trainer, launching each trial as a single-chief TFJob.
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: mnist-hyperparameter-tuning
  namespace: kubeflow-user-example-com
spec:
  objective:
    type: maximize
    goal: 0.99
    # NOTE(review): the metrics collector must be able to find a metric with
    # this exact name in the trial output - confirm that train.py reports
    # `accuracy` in a format the configured collector understands.
    objectiveMetricName: accuracy
  algorithm:
    algorithmName: bayesianoptimization
    algorithmSettings:
    - name: random_state
      value: "42"
  parallelTrialCount: 3
  maxTrialCount: 20
  maxFailedTrialCount: 5
  parameters:
  - name: learning_rate
    parameterType: double
    feasibleSpace:
      min: "0.0001"
      max: "0.01"
  - name: batch_size
    parameterType: int
    feasibleSpace:
      min: "32"
      max: "256"
  - name: epochs
    parameterType: int
    feasibleSpace:
      min: "5"
      max: "20"
  trialTemplate:
    # Name of the container that runs the training code; it must match the
    # container name inside trialSpec below.
    primaryContainerName: tensorflow
    trialSpec:
      apiVersion: kubeflow.org/v1
      kind: TFJob
      spec:
        tfReplicaSpecs:
          Chief:
            replicas: 1
            restartPolicy: OnFailure
            template:
              spec:
                containers:
                - name: tensorflow
                  # Same trainer image as the distributed TFJob: the stock
                  # tensorflow image does not ship /opt/model/train.py.
                  image: your-registry/mnist-trainer:latest
                  command:
                  - python
                  - /opt/model/train.py
                  - --learning-rate=${trialParameters.learning_rate}
                  - --batch-size=${trialParameters.batch_size}
                  - --epochs=${trialParameters.epochs}
                  resources:
                    limits:
                      nvidia.com/gpu: "1"
    # Map each ${trialParameters.*} placeholder above to a search parameter.
    trialParameters:
    - name: learning_rate
      reference: learning_rate
    - name: batch_size
      reference: batch_size
    - name: epochs
      reference: epochs

分布式训练 #

TFJob 配置 #

yaml
# TFJob that runs the MNIST trainer with one Chief replica and three Worker
# replicas; every replica runs the same train.py command.
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: mnist-distributed-training
  namespace: kubeflow-user-example-com
spec:
  runPolicy:
    # NOTE(review): expected to clean up only pods that are still running
    # once the job finishes - confirm against the Training Operator docs.
    cleanPodPolicy: Running
  tfReplicaSpecs:
    # Chief replica: the only replica that mounts the data and output PVCs.
    Chief:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
          - name: tensorflow
            image: your-registry/mnist-trainer:latest
            command:
            - python
            - /opt/model/train.py
            - --epochs=20
            - --batch-size=128
            resources:
              limits:
                nvidia.com/gpu: "2"
            volumeMounts:
            - name: data
              mountPath: /data
            - name: output
              mountPath: /output
          volumes:
          - name: data
            persistentVolumeClaim:
              claimName: mnist-data-pvc
          - name: output
            persistentVolumeClaim:
              claimName: model-output-pvc
    # Worker replicas: same image, command and GPU limit as the Chief, but
    # without any volume mounts.
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        spec:
          containers:
          - name: tensorflow
            image: your-registry/mnist-trainer:latest
            command:
            - python
            - /opt/model/train.py
            - --epochs=20
            - --batch-size=128
            resources:
              limits:
                nvidia.com/gpu: "2"

分布式训练脚本 #

python
import tensorflow as tf
import os
import json

def setup_distributed_training():
    """Return the ``tf.distribute`` strategy for the current process.

    When ``TF_CONFIG`` is set (as it is inside TFJob replicas) a
    ``MultiWorkerMirroredStrategy`` is returned; it reads the cluster layout
    from ``TF_CONFIG`` itself. Otherwise the current default strategy is
    returned so the script also runs as a single process.

    Returns:
        A ``tf.distribute.Strategy`` instance.
    """
    if os.environ.get('TF_CONFIG'):
        # The TFJob for this project defines only Chief and Worker replicas.
        # ParameterServerStrategy needs "ps" tasks in the cluster, so it
        # cannot be built from that TF_CONFIG; synchronous multi-worker
        # training is the matching strategy for a chief/worker layout.
        return tf.distribute.MultiWorkerMirroredStrategy()
    return tf.distribute.get_strategy()

def create_model():
    """Build the uncompiled CNN used for MNIST classification.

    Returns:
        A ``tf.keras.Sequential`` model taking 28x28x1 inputs and ending in a
        10-unit softmax layer.
    """
    keras_layers = tf.keras.layers
    stack = [
        keras_layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        keras_layers.MaxPooling2D((2, 2)),
        keras_layers.Conv2D(64, (3, 3), activation='relu'),
        keras_layers.MaxPooling2D((2, 2)),
        keras_layers.Conv2D(64, (3, 3), activation='relu'),
        keras_layers.Flatten(),
        keras_layers.Dense(64, activation='relu'),
        keras_layers.Dense(10, activation='softmax'),
    ]
    return tf.keras.Sequential(stack)

def is_chief_replica(tf_config_json):
    """Tell whether the TF_CONFIG value describes the chief replica.

    Args:
        tf_config_json: Raw ``TF_CONFIG`` environment value (a JSON string),
            or an empty string/``None`` when the variable is not set.

    Returns:
        ``True`` when there is no TF_CONFIG (single-process run), when this
        task's type is ``chief``, or when the cluster has no chief entry and
        this task is worker 0. ``False`` otherwise.
    """
    if not tf_config_json:
        return True
    config = json.loads(tf_config_json)
    task = config.get('task', {})
    task_type = task.get('type')
    if task_type == 'chief':
        return True
    cluster_has_chief = 'chief' in config.get('cluster', {})
    is_first_worker = task_type == 'worker' and int(task.get('index', 0)) == 0
    return is_first_worker and not cluster_has_chief


def main():
    """Train the MNIST model and save it from the chief replica.

    Command-line flags (all optional; defaults equal the previous hard-coded
    values): ``--epochs``, ``--batch-size``, ``--learning-rate`` and
    ``--model-dir``. Unknown flags are ignored.
    """
    import argparse
    import shutil
    import tempfile

    # The TFJob and Katib manifests pass these flags on the command line;
    # parsing them lets the manifests actually control training.
    parser = argparse.ArgumentParser(description='MNIST trainer')
    parser.add_argument('--epochs', type=int, default=20)
    parser.add_argument('--batch-size', type=int, default=128)
    parser.add_argument('--learning-rate', type=float, default=0.001)
    parser.add_argument('--model-dir', default='/output/model')
    args, _unknown = parser.parse_known_args()

    strategy = setup_distributed_training()

    with strategy.scope():
        model = create_model()
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=args.learning_rate),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0

    model.fit(
        x_train, y_train,
        epochs=args.epochs,
        batch_size=args.batch_size,
        validation_data=(x_test, y_test)
    )

    # A substring test on TF_CONFIG matches on every replica, because the
    # cluster spec lists the chief's address for all of them; inspect the
    # task entry instead.
    if is_chief_replica(os.environ.get('TF_CONFIG', '')):
        model.save(args.model_dir)
    else:
        # NOTE(review): with multi-worker synchronous strategies TensorFlow
        # expects every worker to take part in saving; non-chief replicas
        # therefore save to a scratch directory that is removed afterwards.
        # Confirm against the TensorFlow multi-worker guide.
        scratch_dir = tempfile.mkdtemp()
        try:
            model.save(os.path.join(scratch_dir, 'model'))
        finally:
            shutil.rmtree(scratch_dir, ignore_errors=True)


if __name__ == '__main__':
    main()

模型服务 #

KServe 部署 #

yaml
# KServe InferenceService that serves the MNIST model with the TensorFlow
# predictor.
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: mnist-classifier
  namespace: kubeflow-user-example-com
  annotations:
    # Knative autoscaling annotations: scaling target plus lower and upper
    # scale bounds.
    autoscaling.knative.dev/target: "10"
    autoscaling.knative.dev/minScale: "1"
    autoscaling.knative.dev/maxScale: "10"
spec:
  predictor:
    tensorflow:
      # NOTE(review): presumably the bucket path holding the exported model;
      # verify it matches where the training job uploads it.
      storageUri: s3://models/mnist
      resources:
        requests:
          cpu: "1"
          memory: "2Gi"
        limits:
          cpu: "2"
          memory: "4Gi"

推理测试 #

python
import requests
import numpy as np
import json

# Smoke test: post one random 28x28x1 image to the prediction endpoint and
# print the class with the highest score.
url = "http://mnist-classifier.kubeflow-user-example-com.example.com/v1/models/mnist-classifier:predict"

x_test = np.random.rand(1, 28, 28, 1).astype(np.float32)

data = {
    "instances": x_test.tolist()
}

# Without a timeout the call can block forever if the service is unreachable.
response = requests.post(url, json=data, timeout=30)
# Fail with a clear HTTP error instead of a confusing JSON/KeyError below
# when the service answers with a non-2xx status.
response.raise_for_status()
predictions = response.json()

print(f"Predictions: {predictions}")
print(f"Predicted class: {np.argmax(predictions['predictions'][0])}")

完整工作流 #

运行 Pipeline #

python
import kfp

# Connect to the Kubeflow Pipelines API endpoint.
client = kfp.Client(host='http://localhost:8080/pipeline')

# Create the experiment that the run below is filed under.
experiment = client.create_experiment(
    name='MNIST Classification',
    description='MNIST 图像分类实验',
)

# Values for the pipeline's input parameters.
pipeline_arguments = {
    'epochs': 15,
    'learning_rate': 0.001,
    'batch_size': 128,
}

# Submit the compiled pipeline package as a new run.
run = client.create_run_from_pipeline_package(
    pipeline_package_path='mnist_pipeline.yaml',
    arguments=pipeline_arguments,
    experiment_name='MNIST Classification',
    run_name='mnist-run-001',
)

print(f"Run ID: {run.run_id}")

监控训练 #

python
import time
import kfp

# Poll a pipeline run every 30 seconds until it reaches a terminal state.
client = kfp.Client(host='http://localhost:8080/pipeline')

run_id = 'your-run-id'

# The rest of this tutorial uses the KFP v2 SDK, where get_run() returns the
# run object itself and exposes its state as an upper-case name.
TERMINAL_STATES = {'SUCCEEDED', 'FAILED', 'SKIPPED', 'CANCELED'}

while True:
    run = client.get_run(run_id)
    status = run.state

    print(f"Run status: {status}")

    if status in TERMINAL_STATES:
        break

    time.sleep(30)

print(f"Final status: {status}")

最佳实践 #

项目组织 #

text
1. 代码管理
   ├── 使用 Git 版本控制
   ├── 模块化组件设计
   └── 文档完善

2. 数据管理
   ├── 数据版本控制
   ├── 数据血缘追踪
   └── 数据质量检查

3. 模型管理
   ├── 模型版本管理
   ├── 模型元数据记录
   └── 模型评估标准

工作流优化 #

text
1. Pipeline 优化
   ├── 组件缓存
   ├── 并行执行
   └── 资源优化

2. 训练优化
   ├── 分布式训练
   ├── 混合精度
   └── 梯度累积

3. 服务优化
   ├── 模型量化
   ├── 批处理推理
   └── 自动扩缩容

下一步 #

现在你已经完成了端到端示例,接下来请继续学习「生产环境部署」一章,了解企业级部署的最佳实践!

最后更新:2026-04-05