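Quick Start #

Overview #

This chapter walks you through Kubeflow's core features with hands-on exercises. You will learn how to:

text
Topics covered:
├── Access the Kubeflow Dashboard
├── Create a Jupyter Notebook
├── Build your first Pipeline
├── Run hyperparameter tuning
└── Deploy a model service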

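Accessing the Kubeflow Dashboard #

Logging In to the Dashboard #

text
1. Open the Kubeflow Dashboard in your browser
   └── http://localhost:8080 (when using port forwarding)

2. Enter the login credentials
   ├── Username: user@example.com
   └── Password: 12341234

3. Change the default password after your first login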

Dashboard Interface #

text
┌─────────────────────────────────────────────────────────────┐
│                     Kubeflow Dashboard                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Left navigation bar:                                       │
│  ├── Home - Home page                                       │
│  ├── Notebooks - Jupyter notebooks                          │
│  ├── Experiments (AutoML) - Katib experiments               │
│  ├── Experiments (Pipelines) - Pipeline experiments         │
│  ├── Pipelines - Pipeline definitions                       │
│  ├── Runs - Run history                                     │
│  ├── Models - Model management                              │
│  ├── Volumes - Storage volumes                              │
│  └── TensorBoard - Visualization                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Creating a Jupyter Notebook #

Creating via the Dashboard #

text
1. Click "Notebooks" in the left navigation bar

2. Click the "New Notebook" button

3. Configure the Notebook:
   ├── Name: my-notebook
   ├── Namespace: kubeflow-user-example-com
   ├── Image: choose a prebuilt image
   │   └── TensorFlow 2.x or PyTorch
   ├── CPU: 2
   ├── Memory: 4Gi
   └── GPU: 0 (optional)

4. Click "LAUNCH" to create it

5. Wait for the status to become "Running"

6. Click "CONNECT" to open the Notebook

Creating via YAML #

yaml
apiVersion: kubeflow.org/v1
kind: Notebook
metadata:
  name: my-notebook
  namespace: kubeflow-user-example-com
spec:
  template:
    spec:
      containers:
      - name: notebook
        image: kubeflownotebookswg/jupyter-tensorflow:v1.8.0
        resources:
          requests:
            cpu: "2"
            memory: "4Gi"
          limits:
            cpu: "2"
            memory: "4Gi"
      serviceAccountName: default-editor

bash
# Apply the manifest
kubectl apply -f notebook.yaml

# Check the Notebook status
kubectl get notebooks -n kubeflow-user-example-com
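
The same manifest can be produced from Python when Notebooks need to be created in bulk or from templates. The sketch below is a minimal illustration, not part of any Kubeflow SDK: `build_notebook_manifest` is a hypothetical helper, and the image passed in the usage lines is a placeholder. Since `kubectl apply -f` accepts JSON as well as YAML, the printed output can be saved to a file and applied as-is.

```python
import json


def build_notebook_manifest(name, namespace, image, cpu="2", memory="4Gi"):
    """Return a Notebook custom resource as a plain dict (hypothetical helper)."""
    resources = {"cpu": cpu, "memory": memory}
    return {
        "apiVersion": "kubeflow.org/v1",
        "kind": "Notebook",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "notebook",
                            "image": image,
                            # Requests equal limits, mirroring the YAML above.
                            "resources": {
                                "requests": dict(resources),
                                "limits": dict(resources),
                            },
                        }
                    ],
                    "serviceAccountName": "default-editor",
                }
            }
        },
    }


manifest = build_notebook_manifest(
    name="my-notebook",
    namespace="kubeflow-user-example-com",
    image="<notebook-image>",  # placeholder, substitute a real image
)
print(json.dumps(manifest, indent=2))
```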

Working in the Notebook #

python
# Run inside the Jupyter Notebook

# Install dependencies
!pip install numpy pandas scikit-learn

# Import libraries
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Load the data
iris = load_iris()
X, y = iris.data, iris.target

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Train the model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate the model
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.4f}")

Building Your First Pipeline #

Pipeline Basics #

text
┌─────────────────────────────────────────────────────────────┐
│                       Pipeline Basics                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Component:                                                 │
│  ├── An independent executable unit                         │
│  ├── Can be a Python function                               │
│  └── Has well-defined inputs and outputs                    │
│                                                             │
│  Pipeline:                                                  │
│  ├── Composed of multiple components                        │
│  ├── Defines dependencies between components                │
│  └── Supports conditional execution and loops               │
│                                                             │
│  Run:                                                       │
│  ├── A single execution of a Pipeline                       │
│  ├── Records all parameters and results                     │
│  └── Can be reproduced and compared                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘
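
To make these three concepts concrete, here is a conceptual sketch in plain Python using only the standard library. It is not how Kubeflow Pipelines is implemented: the functions `load`, `scale`, `total`, and `run_pipeline` are hypothetical stand-ins. Components are ordinary functions, the pipeline is a dependency graph over them, and a run executes the graph in dependency order while recording every output.

```python
from graphlib import TopologicalSorter


# Components: independent units with explicit inputs and outputs.
def load():
    return [1.0, 2.0, 3.0]


def scale(values, factor):
    return [v * factor for v in values]


def total(values):
    return sum(values)


def run_pipeline(factor: float = 2.0) -> dict:
    """One run: execute every step in dependency order and record its result."""
    # Pipeline: each step lists the steps it depends on, plus the call to make.
    steps = {
        "load": ((), lambda: load()),
        "scale": (("load",), lambda values: scale(values, factor)),
        "total": (("scale",), lambda values: total(values)),
    }
    graph = {name: set(deps) for name, (deps, _) in steps.items()}
    results = {}
    for name in TopologicalSorter(graph).static_order():
        deps, func = steps[name]
        results[name] = func(*(results[d] for d in deps))
    return results


print(run_pipeline())
```

Because the run records the output of every step, two runs with different `factor` values can be compared step by step, which mirrors the "reproduce and compare" idea above.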

Installing the Kubeflow Pipelines SDK #

bash
# Install the SDK (the examples in this chapter use the KFP v2 API)
pip install kfp

# Verify the installation
python -c "import kfp; print(kfp.__version__)"

Creating a Simple Pipeline #

python
from kfp import dsl
from kfp import compiler

@dsl.component(base_image='python:3.9')
def add_numbers(a: float, b: float) -> float:
    """Return the sum of two numbers."""
    return a + b

@dsl.component(base_image='python:3.9')
def multiply_numbers(a: float, b: float) -> float:
    """Return the product of two numbers."""
    return a * b

@dsl.pipeline(
    name='Simple Math Pipeline',
    description='A simple arithmetic pipeline'
)
def math_pipeline(a: float = 10.0, b: float = 5.0):
    add_task = add_numbers(a=a, b=b)
    # Use float literals so the constant matches the declared float parameter type.
    multiply_task = multiply_numbers(a=add_task.output, b=2.0)

# Compile the pipeline
compiler.Compiler().compile(
    pipeline_func=math_pipeline,
    package_path='math_pipeline.yaml'
)
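
Because each component body is ordinary Python, the arithmetic can be sanity-checked locally before anything is compiled or uploaded. The sketch below repeats the two functions without the KFP decorators; `math_pipeline_local` is a hypothetical helper that mirrors the pipeline wiring and is not part of the KFP SDK.

```python
def add_numbers(a: float, b: float) -> float:
    """Return the sum of two numbers."""
    return a + b


def multiply_numbers(a: float, b: float) -> float:
    """Return the product of two numbers."""
    return a * b


def math_pipeline_local(a: float = 10.0, b: float = 5.0) -> float:
    """Mirror the pipeline wiring: the sum feeds the multiplication by 2."""
    return multiply_numbers(a=add_numbers(a=a, b=b), b=2.0)


print(math_pipeline_local())  # (10.0 + 5.0) * 2.0 = 30.0
```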

Uploading and Running the Pipeline #

text
Option 1: via the Dashboard
1. Click "Pipelines" in the left navigation bar
2. Click "Upload pipeline"
3. Select the compiled YAML file
4. Click "Create"
5. Click "Create run" to start a run

Option 2: via the SDK

python
import kfp

# Connect to Kubeflow Pipelines.
# Note: a multi-user deployment may also require authentication (for example a session cookie).
client = kfp.Client(host='http://localhost:8080/pipeline')

# Upload the pipeline
pipeline = client.upload_pipeline(
    pipeline_package_path='math_pipeline.yaml',
    pipeline_name='Simple Math Pipeline'
)

# Create a run
run = client.create_run_from_pipeline_package(
    pipeline_package_path='math_pipeline.yaml',
    arguments={'a': 10.0, 'b': 5.0}
)

A Complete ML Pipeline Example #

python
from kfp import dsl
from kfp import compiler
from kfp.dsl import Output, Input, Artifact, Model, Metrics

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def load_data(output_data: Output[Artifact]):
    import pandas as pd
    from sklearn.datasets import load_iris
    
    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names)
    df['target'] = iris.target
    
    df.to_csv(output_data.path, index=False)

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def train_model(
    input_data: Input[Artifact],
    output_model: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100
):
    import pandas as pd
    import pickle
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score
    
    df = pd.read_csv(input_data.path)
    X = df.drop('target', axis=1)
    y = df['target']
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    model.fit(X_train, y_train)
    
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    
    with open(output_model.path, 'wb') as f:
        pickle.dump(model, f)
    
    metrics.log_metric('accuracy', accuracy)

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def evaluate_model(
    model: Input[Model],
    metrics: Output[Metrics]
):
    import pickle
    
    with open(model.path, 'rb') as f:
        loaded_model = pickle.load(f)
    
    metrics.log_metric('model_loaded', 1)

@dsl.pipeline(
    name='Iris Classification Pipeline',
    description='Iris classification with a random forest'
)
def iris_pipeline(n_estimators: int = 100):
    load_task = load_data()
    
    train_task = train_model(
        input_data=load_task.outputs['output_data'],
        n_estimators=n_estimators
    )
    
    evaluate_task = evaluate_model(
        model=train_task.outputs['output_model']
    )

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=iris_pipeline,
        package_path='iris_pipeline.yaml'
    )
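
In the pipeline above, components do not pass DataFrames or models to each other directly. Each one writes to the `.path` of an output artifact, and the downstream component reads from the `.path` of the matching input. The following standard-library sketch is a simplified local analogy of that hand-off, under the assumption that a temporary directory stands in for the artifact store; `count_rows` and `run_locally` are hypothetical names, and the two data rows are made up for illustration.

```python
import csv
import tempfile
from pathlib import Path


def load_data(output_path: Path) -> None:
    """Write a tiny table to the path handed to the component."""
    rows = [("sepal_length", "target"), (5.1, 0), (6.2, 1)]
    with output_path.open("w", newline="") as f:
        csv.writer(f).writerows(rows)


def count_rows(input_path: Path) -> int:
    """Read the upstream artifact from its path and count the data rows."""
    with input_path.open(newline="") as f:
        return sum(1 for _ in csv.DictReader(f))


def run_locally() -> int:
    """Wire the two steps together through a file, as a pipeline would."""
    with tempfile.TemporaryDirectory() as workdir:
        artifact = Path(workdir) / "output_data.csv"
        load_data(artifact)
        return count_rows(artifact)


print(run_locally())
```

In a real run the paths are assigned by the Pipelines backend rather than chosen by the component, which is why the component code only ever refers to `.path`.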

Running Hyperparameter Tuning #

Creating a Katib Experiment #

yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: random-forest-tuning
  namespace: kubeflow-user-example-com
spec:
  objective:
    type: maximize
    goal: 0.98
    objectiveMetricName: accuracy
  algorithm:
    algorithmName: random
  parallelTrialCount: 3
  maxTrialCount: 12
  maxFailedTrialCount: 3
  parameters:
  - name: n_estimators
    parameterType: int
    feasibleSpace:
      min: "10"
      max: "200"
  - name: max_depth
    parameterType: int
    feasibleSpace:
      min: "3"
      max: "10"
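  trialTemplate:
    # Must match the name of the container that runs the training code.
    primaryContainerName: training
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          metadata:
            annotations:
              # Disable Istio sidecar injection so the trial pod can run to completion.
              sidecar.istio.io/inject: "false"
          spec:
            containers:
            - name: training
              # NOTE: python:3.9 does not include scikit-learn; substitute an image that has it installed.
              image: python:3.9
              command:
              - python
              - -c
              - |
                from sklearn.ensemble import RandomForestClassifier
                from sklearn.model_selection import cross_val_score
                from sklearn.datasets import load_iris
                
                iris = load_iris()
                X, y = iris.data, iris.target
                
                n_estimators = ${trialParameters.n_estimators}
                max_depth = ${trialParameters.max_depth}
                
                model = RandomForestClassifier(
                    n_estimators=n_estimators,
                    max_depth=max_depth,
                    random_state=42
                )
                
                scores = cross_val_score(model, X, y, cv=5)
                accuracy = scores.mean()
                
                # The default StdOut metrics collector parses "name=value" lines,
                # and the name must match objectiveMetricName exactly.
                print(f"accuracy={accuracy}")
            restartPolicy: Never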
    trialParameters:
    - name: n_estimators
      reference: n_estimators
    - name: max_depth
      reference: max_depth

bash
# Create the experiment
kubectl apply -f katib-experiment.yaml

# Check the experiment status
kubectl get experiments -n kubeflow-user-example-com

# List the trials and their results
kubectl get trials -n kubeflow-user-example-com
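
The experiment above uses the `random` algorithm over two integer parameters, runs at most 12 trials, and stops once the objective reaches the goal of 0.98. The sketch below illustrates that search loop in plain Python. It is a conceptual illustration only, not Katib's implementation: `toy_objective` is a made-up stand-in for a training run and does not measure real accuracy, and trials run sequentially here rather than three at a time.

```python
import random

# Same search space as the experiment: inclusive integer ranges.
SEARCH_SPACE = {"n_estimators": (10, 200), "max_depth": (3, 10)}


def sample_parameters(rng: random.Random) -> dict:
    """Draw one integer value per parameter, bounds inclusive."""
    return {name: rng.randint(low, high) for name, (low, high) in SEARCH_SPACE.items()}


def toy_objective(params: dict) -> float:
    """Stand-in for a training run; NOT a real accuracy measurement."""
    # Peaks at 1.0 when n_estimators == 100 and max_depth == 6.
    penalty = abs(params["n_estimators"] - 100) / 1000 + abs(params["max_depth"] - 6) / 100
    return 1.0 - penalty


def random_search(max_trials: int = 12, goal: float = 0.98, seed: int = 0):
    """Maximize the objective by random sampling, stopping early at the goal."""
    rng = random.Random(seed)
    best_params, best_score = None, float("-inf")
    trials_run = 0
    for _ in range(max_trials):
        params = sample_parameters(rng)
        score = toy_objective(params)
        trials_run += 1
        if score > best_score:
            best_params, best_score = params, score
        if best_score >= goal:
            break
    return best_params, best_score, trials_run


best_params, best_score, trials_run = random_search()
print(f"best={best_params} score={best_score:.4f} trials={trials_run}")
```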

Creating via the Dashboard #

text
1. Click "Experiments (AutoML)" in the left navigation bar

2. Click "New Experiment"

3. Fill in the experiment configuration:
   ├── Name: random-forest-tuning
   ├── Objective: maximize accuracy
   ├── Algorithm: Random Search
   ├── Parameters:
   │   ├── n_estimators: 10-200
   │   └── max_depth: 3-10
   └── Trial Count: 12

4. Click "Create" to create the experiment

5. Monitor the experiment's progress and results

Running Training Jobs #

Creating a TFJob #

yaml
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: tensorflow-training
  namespace: kubeflow-user-example-com
spec:
  tfReplicaSpecs:
    Chief:
      replicas: 1
      restartPolicy: OnFailure
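      template:
        metadata:
          annotations:
            # Disable Istio sidecar injection so the pod can run to completion.
            sidecar.istio.io/inject: "false"
        spec: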
          containers:
          - name: tensorflow
            image: tensorflow/tensorflow:2.12.0
            command:
            - python
            - -c
            - |
              import tensorflow as tf
              print("TensorFlow version:", tf.__version__)
              
              mnist = tf.keras.datasets.mnist
              (x_train, y_train), (x_test, y_test) = mnist.load_data()
              x_train, x_test = x_train / 255.0, x_test / 255.0
              
              model = tf.keras.models.Sequential([
                tf.keras.layers.Flatten(input_shape=(28, 28)),
                tf.keras.layers.Dense(128, activation='relu'),
                tf.keras.layers.Dropout(0.2),
                tf.keras.layers.Dense(10)
              ])
              
              loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
              model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])
              
              model.fit(x_train, y_train, epochs=5)
              model.evaluate(x_test, y_test)
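    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
          - name: tensorflow
            # No command is set here, so each worker runs the image's default command.
            # In a real distributed job, every replica runs the same training script.
            image: tensorflow/tensorflow:2.12.0

bash
# Create the TFJob
kubectl apply -f tfjob.yaml

# Check the training status
kubectl get tfjobs -n kubeflow-user-example-com

# Follow the training logs
kubectl logs -f tensorflow-training-chief-0 -n kubeflow-user-example-com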
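
For the Chief and Worker replicas to cooperate, a training script needs to know which role it is playing. The sketch below assumes that each replica receives a `TF_CONFIG` environment variable in TensorFlow's documented `cluster`/`task` JSON layout; verify this against your training operator version. `describe_replica` is a hypothetical helper, and the host names in the usage lines are invented for illustration.

```python
import json
import os


def describe_replica(environ=os.environ) -> dict:
    """Summarise this replica's role from TF_CONFIG, or fall back to single-node."""
    raw = environ.get("TF_CONFIG")
    if not raw:
        # No cluster description: behave as a lone chief.
        return {"type": "chief", "index": 0, "num_workers": 0}
    config = json.loads(raw)
    task = config.get("task", {})
    cluster = config.get("cluster", {})
    return {
        "type": task.get("type", "chief"),
        "index": int(task.get("index", 0)),
        "num_workers": len(cluster.get("worker", [])),
    }


# Illustrative value for the second worker of a 1-chief, 2-worker job.
example_env = {
    "TF_CONFIG": json.dumps(
        {
            "cluster": {
                "chief": ["chief-0.example:2222"],
                "worker": ["worker-0.example:2222", "worker-1.example:2222"],
            },
            "task": {"type": "worker", "index": 1},
        }
    )
}
print(describe_replica(example_env))
```

A script can branch on the returned `type`, for example letting only the chief write checkpoints.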

Creating a PyTorchJob #

yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-training
  namespace: kubeflow-user-example-com
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
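      template:
        metadata:
          annotations:
            # Disable Istio sidecar injection so the pod can run to completion.
            sidecar.istio.io/inject: "false"
        spec: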
          containers:
          - name: pytorch
            image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
            command:
            - python
            - -c
            - |
              import torch
              import torch.nn as nn
              import torch.optim as optim
              
              print("PyTorch version:", torch.__version__)
              
              model = nn.Sequential(
                  nn.Linear(10, 20),
                  nn.ReLU(),
                  nn.Linear(20, 1)
              )
              
              optimizer = optim.SGD(model.parameters(), lr=0.01)
              criterion = nn.MSELoss()
              
              for epoch in range(10):
                  x = torch.randn(32, 10)
                  y = torch.randn(32, 1)
                  
                  optimizer.zero_grad()
                  output = model(x)
                  loss = criterion(output, y)
                  loss.backward()
                  optimizer.step()
                  
                  print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
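    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
          - name: pytorch
            # No command is set here, so each worker runs the image's default command.
            # In a real distributed job, every replica runs the same training script.
            image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime

bash
# Create the PyTorchJob
kubectl apply -f pytorchjob.yaml

# Check the training status
kubectl get pytorchjobs -n kubeflow-user-example-com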
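
The inline script above never initializes distributed training, so the Master and Workers do not actually communicate. A distributed script usually starts by reading its rendezvous settings from the environment. The sketch below assumes the replicas receive the conventional `MASTER_ADDR`, `MASTER_PORT`, `WORLD_SIZE`, and `RANK` variables; `distributed_settings` is a hypothetical helper, the fallback values are assumptions for single-process runs, and the host name in the usage lines is invented.

```python
import os


def distributed_settings(environ=os.environ) -> dict:
    """Read rendezvous settings, falling back to single-process defaults."""
    return {
        "master_addr": environ.get("MASTER_ADDR", "localhost"),
        "master_port": int(environ.get("MASTER_PORT", "23456")),  # assumed default
        "world_size": int(environ.get("WORLD_SIZE", "1")),
        "rank": int(environ.get("RANK", "0")),
    }


# With 1 Master and 2 Workers as in the manifest above, the world size would be 3.
example_env = {
    "MASTER_ADDR": "pytorch-training-master-0",
    "MASTER_PORT": "23456",
    "WORLD_SIZE": "3",
    "RANK": "2",
}
settings = distributed_settings(example_env)
print(settings)
```

These values are what a script would typically hand to `torch.distributed.init_process_group` when using the `env://` initialization method.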

Deploying a Model Service #

Deploying a Model with KServe #

yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-iris
  namespace: kubeflow-user-example-com
spec:
  predictor:
    sklearn:
      storageUri: "gs://kfserving-examples/models/sklearn/iris"
      protocolVersion: v2

bash
# Deploy the model service
kubectl apply -f inferenceservice.yaml

# Check the service status
kubectl get inferenceservices -n kubeflow-user-example-com

# Get the service URL
kubectl get inferenceservice sklearn-iris -n kubeflow-user-example-com -o jsonpath='{.status.url}'

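Calling the Model Service #

python
import requests

# Service endpoint (a cluster-internal DNS name, reachable only from inside the cluster)
url = "http://sklearn-iris.kubeflow-user-example-com.svc.cluster.local/v2/models/sklearn-iris/infer"

# Request payload
data = {
    "inputs": [
        {
            "name": "input-0",
            "shape": [1, 4],
            "datatype": "FP32",
            "data": [[5.1, 3.5, 1.4, 0.2]]
        }
    ]
}

# Send the request and fail loudly on HTTP errors
response = requests.post(url, json=data, timeout=30)
response.raise_for_status()
print(response.json())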
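
When sending more than one sample, the `shape` field has to stay consistent with the `data` field, which is easy to get wrong by hand. The sketch below derives the shape from the rows. `build_infer_request` and `predictions_from_response` are hypothetical helpers, and the response dictionary in the usage lines is a hand-written illustration of the V2 response layout, not real server output.

```python
def build_infer_request(rows, name="input-0", datatype="FP32"):
    """Build a V2 inference request body for a batch of equal-length rows."""
    if not rows or any(len(row) != len(rows[0]) for row in rows):
        raise ValueError("rows must be a non-empty list of equal-length lists")
    return {
        "inputs": [
            {
                "name": name,
                # Shape is [batch size, number of features].
                "shape": [len(rows), len(rows[0])],
                "datatype": datatype,
                "data": [list(row) for row in rows],
            }
        ]
    }


def predictions_from_response(body: dict):
    """Return the data list of the first output tensor."""
    return body["outputs"][0]["data"]


payload = build_infer_request([[5.1, 3.5, 1.4, 0.2], [6.2, 2.9, 4.3, 1.3]])
print(payload["inputs"][0]["shape"])

# Hand-written example of the response layout (illustrative values only).
fake_response = {"outputs": [{"name": "predict", "shape": [2], "datatype": "INT64", "data": [0, 1]}]}
print(predictions_from_response(fake_response))
```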

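Monitoring and Management #

Viewing Resource Usage #

bash
# Show resource usage of pods in the namespace (requires metrics-server)
kubectl top pods -n kubeflow-user-example-com

# Show resource usage of nodes
kubectl top nodes

# Show pod details
kubectl describe pod <pod-name> -n kubeflow-user-example-com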

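Viewing Logs #

bash
# View Pipeline run logs (in a multi-user deployment, run pods live in the user's profile namespace)
kubectl logs -n kubeflow-user-example-com <pipeline-pod-name>

# View Notebook logs
kubectl logs -n kubeflow-user-example-com <notebook-pod-name>

# Follow training job logs
kubectl logs -n kubeflow-user-example-com <training-pod-name> -f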

Managing Resources #

bash
# Delete a Notebook
kubectl delete notebook <notebook-name> -n kubeflow-user-example-com

# Delete a Pipeline run
# Do this from the Dashboard

# Delete training jobs
kubectl delete tfjob <job-name> -n kubeflow-user-example-com
kubectl delete pytorchjob <job-name> -n kubeflow-user-example-com

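Best Practices #

Pipeline Development #

text
1. Component design principles
   ├── Single responsibility
   ├── Well-defined inputs and outputs
   └── Reusability

2. Resource configuration
   ├── Set sensible resource requests and limits
   ├── Use persistent storage
   └── Configure a retry policy for failures

3. Version management
   ├── Keep Pipelines under version control
   ├── Record parameters and results
   └── Use meaningful names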

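Using Notebooks #

text
1. Resource management
   ├── Stop Notebooks when they are not in use
   ├── Allocate resources sensibly
   └── Use persistent storage to keep your data

2. Image selection
   ├── Use the official images
   ├── Or build a custom image
   └── Preinstall commonly used dependencies

3. Security practices
   ├── Never hard-code secrets in your code
   ├── Use Kubernetes Secrets
   └── Rotate passwords regularly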
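
The security advice above can be sketched as follows, assuming the Kubernetes Secret is exposed to the Notebook or component pod as an environment variable. `get_required_secret` is a hypothetical helper, and `DB_PASSWORD` is an invented variable name used only for illustration.

```python
import os


def get_required_secret(name: str, environ=os.environ) -> str:
    """Read a credential from the environment instead of hard-coding it."""
    value = environ.get(name)
    if not value:
        # Fail early with a clear message, without ever printing a secret value.
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Simulated environment; in a pod the value would come from the Secret.
demo_env = {"DB_PASSWORD": "value-injected-from-a-secret"}
password = get_required_secret("DB_PASSWORD", demo_env)
print("credential loaded:", bool(password))
```

Failing at startup when the variable is missing surfaces a misconfigured Secret immediately, rather than midway through a long training run.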

Next Steps #

You now know the basic Kubeflow operations. Next, dive deeper into Pipelines to build more complex machine learning workflows.

Last updated: 2026-04-05