Quick Start #
Overview #
This chapter walks you through Kubeflow's core features hands-on. You will learn how to:
text
What you'll learn:
├── Access the Kubeflow Dashboard
├── Create a Jupyter Notebook
├── Build your first Pipeline
├── Run hyperparameter tuning
└── Deploy a model service
Access the Kubeflow Dashboard #
Log in to the Dashboard #
text
1. Open a browser and navigate to the Kubeflow Dashboard
└── http://localhost:8080 (via port forwarding)
2. Enter the login credentials
├── Username: user@example.com
└── Password: 12341234
3. Change the password after your first login
Dashboard Interface #
text
┌─────────────────────────────────────────────────────────────┐
│ Kubeflow Dashboard                                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│ Left navigation:                                            │
│ ├── Home - landing page                                     │
│ ├── Notebooks - Jupyter notebooks                           │
│ ├── Experiments (AutoML) - Katib experiments                │
│ ├── Experiments (Pipelines) - Pipeline experiments          │
│ ├── Pipelines - pipeline definitions                        │
│ ├── Runs - run history                                      │
│ ├── Models - model management                               │
│ ├── Volumes - storage volumes                               │
│ └── TensorBoard - visualization                             │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Create a Jupyter Notebook #
Create via the Dashboard #
text
1. Click "Notebooks" in the left navigation
2. Click the "New Notebook" button
3. Configure the Notebook:
├── Name: my-notebook
├── Namespace: kubeflow-user-example-com
├── Image: pick a prebuilt image
│ └── TensorFlow 2.x or PyTorch
├── CPU: 2
├── Memory: 4Gi
└── GPU: 0 (optional)
4. Click "LAUNCH" to create it
5. Wait for the status to become "Running"
6. Click "CONNECT" to open the Notebook
Create via YAML #
yaml
apiVersion: kubeflow.org/v1
kind: Notebook
metadata:
  name: my-notebook
  namespace: kubeflow-user-example-com
spec:
  template:
    spec:
      containers:
        - name: notebook
          image: public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow:v1.8.0
          resources:
            requests:
              cpu: "2"
              memory: "4Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
      serviceAccountName: default-editor
bash
# Apply the manifest
kubectl apply -f notebook.yaml
# Check the Notebook status
kubectl get notebooks -n kubeflow-user-example-com
Working in the Notebook #
python
# Run inside the Jupyter Notebook
# Install dependencies
!pip install numpy pandas scikit-learn

# Import libraries
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Load the data
iris = load_iris()
X, y = iris.data, iris.target

# Split into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Train the model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate the model
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.4f}")
Build Your First Pipeline #
Pipeline Basics #
text
┌─────────────────────────────────────────────────────────────┐
│ Pipeline Basics                                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│ Component:                                                  │
│ ├── A self-contained, executable unit                       │
│ ├── Can be a Python function                                │
│ └── Has well-defined inputs and outputs                     │
│                                                             │
│ Pipeline:                                                   │
│ ├── Composed of multiple components                         │
│ ├── Defines dependencies between components                 │
│ └── Supports conditionals and loops                         │
│                                                             │
│ Run:                                                        │
│ ├── A single execution of a pipeline                        │
│ ├── Records all parameters and results                      │
│ └── Can be reproduced and compared                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Install the Kubeflow Pipelines SDK #
bash
# Install the SDK
pip install kfp
# Verify the installation
python -c "import kfp; print(kfp.__version__)"
Create a Simple Pipeline #
python
from kfp import dsl
from kfp import compiler

@dsl.component(base_image='python:3.9')
def add_numbers(a: float, b: float) -> float:
    """Return the sum of two numbers."""
    return a + b

@dsl.component(base_image='python:3.9')
def multiply_numbers(a: float, b: float) -> float:
    """Return the product of two numbers."""
    return a * b

@dsl.pipeline(
    name='Simple Math Pipeline',
    description='A simple arithmetic pipeline'
)
def math_pipeline(a: float = 10, b: float = 5):
    add_task = add_numbers(a=a, b=b)
    multiply_task = multiply_numbers(a=add_task.output, b=2.0)

# Compile the pipeline
compiler.Compiler().compile(
    pipeline_func=math_pipeline,
    package_path='math_pipeline.yaml'
)
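In this pipeline, `multiply_numbers` consumes `add_task.output`, so a run with the default arguments computes `(a + b) * 2`. As a plain-Python sketch of that dataflow (decorators stripped; nothing here talks to a cluster):

```python
# Plain-Python equivalent of math_pipeline's dataflow:
# multiply_numbers receives add_numbers' output, then doubles it.
def add_numbers(a: float, b: float) -> float:
    return a + b

def multiply_numbers(a: float, b: float) -> float:
    return a * b

result = multiply_numbers(a=add_numbers(a=10, b=5), b=2)
print(result)  # (10 + 5) * 2 = 30
```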
Upload and Run the Pipeline #
text
Option 1: via the Dashboard
1. Click "Pipelines" in the left navigation
2. Click "Upload pipeline"
3. Select the compiled YAML file
4. Click "Create"
5. Click "Create run" to start a run
Option 2: via the SDK
python
import kfp

# Connect to Kubeflow Pipelines
client = kfp.Client(host='http://localhost:8080/pipeline')

# Upload the pipeline
pipeline = client.upload_pipeline(
    pipeline_package_path='math_pipeline.yaml',
    pipeline_name='Simple Math Pipeline'
)

# Create a run
run = client.create_run_from_pipeline_package(
    pipeline_file='math_pipeline.yaml',
    arguments={'a': 10, 'b': 5}
)
Complete ML Pipeline Example #
python
from kfp import dsl
from kfp import compiler
from kfp.dsl import Output, Input, Artifact, Model, Metrics

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def load_data(output_data: Output[Artifact]):
    import pandas as pd
    from sklearn.datasets import load_iris

    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names)
    df['target'] = iris.target
    df.to_csv(output_data.path, index=False)

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def train_model(
    input_data: Input[Artifact],
    output_model: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100
):
    import pandas as pd
    import pickle
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    df = pd.read_csv(input_data.path)
    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    with open(output_model.path, 'wb') as f:
        pickle.dump(model, f)
    metrics.log_metric('accuracy', accuracy)

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def evaluate_model(
    model: Input[Model],
    metrics: Output[Metrics]
):
    import pickle

    # Verify the model artifact can be deserialized
    with open(model.path, 'rb') as f:
        loaded_model = pickle.load(f)
    metrics.log_metric('model_loaded', 1)

@dsl.pipeline(
    name='Iris Classification Pipeline',
    description='Iris classification with a random forest'
)
def iris_pipeline(n_estimators: int = 100):
    load_task = load_data()
    train_task = train_model(
        input_data=load_task.outputs['output_data'],
        n_estimators=n_estimators
    )
    evaluate_task = evaluate_model(
        model=train_task.outputs['output_model']
    )

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=iris_pipeline,
        package_path='iris_pipeline.yaml'
    )
Run Hyperparameter Tuning #
Create a Katib Experiment #
yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: random-forest-tuning
  namespace: kubeflow-user-example-com
spec:
  objective:
    type: maximize
    goal: 0.98
    objectiveMetricName: accuracy
  algorithm:
    algorithmName: random
  parallelTrialCount: 3
  maxTrialCount: 12
  maxFailedTrialCount: 3
  parameters:
    - name: n_estimators
      parameterType: int
      feasibleSpace:
        min: "10"
        max: "200"
    - name: max_depth
      parameterType: int
      feasibleSpace:
        min: "3"
        max: "10"
  trialTemplate:
    primaryContainerName: training
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            containers:
              - name: training
                image: python:3.9
                command:
                  - sh
                  - -c
                  - |
                    pip install --quiet scikit-learn
                    python - <<'EOF'
                    from sklearn.ensemble import RandomForestClassifier
                    from sklearn.model_selection import cross_val_score
                    from sklearn.datasets import load_iris

                    iris = load_iris()
                    X, y = iris.data, iris.target
                    n_estimators = ${trialParameters.n_estimators}
                    max_depth = ${trialParameters.max_depth}
                    model = RandomForestClassifier(
                        n_estimators=n_estimators,
                        max_depth=max_depth,
                        random_state=42
                    )
                    scores = cross_val_score(model, X, y, cv=5)
                    accuracy = scores.mean()
                    # Katib's stdout metrics collector expects "name=value" lines
                    print(f"accuracy={accuracy}")
                    EOF
            restartPolicy: Never
    trialParameters:
      - name: n_estimators
        reference: n_estimators
      - name: max_depth
        reference: max_depth
bash
# Create the experiment
kubectl apply -f katib-experiment.yaml
# Check experiment status
kubectl get experiments -n kubeflow-user-example-com
# List trial results
kubectl get trials -n kubeflow-user-example-com
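Before creating each trial Job, Katib replaces the `${trialParameters.*}` placeholders in the trial spec with that trial's sampled values. The effect can be illustrated with plain string replacement (purely illustrative; the real substitution is done by the Katib controller):

```python
# Illustrative only: mimic how ${trialParameters.*} placeholders in the
# trial spec are filled in with one trial's sampled values.
trial_spec_snippet = (
    "n_estimators = ${trialParameters.n_estimators}\n"
    "max_depth = ${trialParameters.max_depth}"
)
sampled = {"n_estimators": 120, "max_depth": 7}  # values chosen for the demo

rendered = trial_spec_snippet
for name, value in sampled.items():
    rendered = rendered.replace("${trialParameters.%s}" % name, str(value))

print(rendered)
# n_estimators = 120
# max_depth = 7
```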
Create via the Dashboard #
text
1. Click "Experiments (AutoML)" in the left navigation
2. Click "New Experiment"
3. Fill in the experiment configuration:
├── Name: random-forest-tuning
├── Objective: maximize accuracy
├── Algorithm: Random Search
├── Parameters:
│ ├── n_estimators: 10-200
│ └── max_depth: 3-10
└── Trial Count: 12
4. Click "Create" to start the experiment
5. Monitor its progress and results
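The random search algorithm simply draws each parameter uniformly from its feasible range for every trial and keeps the best result. A self-contained sketch of that search loop, with an invented objective function standing in for the real training job:

```python
import random

# Minimal sketch of random search over the two Katib parameters above.
# The objective is a dummy stand-in; Katib would run a real training trial
# and read the reported accuracy instead.
def objective(n_estimators: int, max_depth: int) -> float:
    # Hypothetical smooth score in (0, 1) that rewards more/deeper trees
    return 1.0 - 1.0 / (n_estimators * max_depth)

random.seed(42)
best = None
for _ in range(12):  # maxTrialCount
    params = {
        "n_estimators": random.randint(10, 200),  # feasibleSpace 10-200
        "max_depth": random.randint(3, 10),       # feasibleSpace 3-10
    }
    score = objective(**params)
    if best is None or score > best[0]:
        best = (score, params)

print(best)
```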
Run Training Jobs #
Create a TFJob #
yaml
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: tensorflow-training
  namespace: kubeflow-user-example-com
spec:
  tfReplicaSpecs:
    Chief:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: tensorflow
              image: tensorflow/tensorflow:2.12.0
              command:
                - python
                - -c
                - |
                  import tensorflow as tf
                  print("TensorFlow version:", tf.__version__)
                  mnist = tf.keras.datasets.mnist
                  (x_train, y_train), (x_test, y_test) = mnist.load_data()
                  x_train, x_test = x_train / 255.0, x_test / 255.0
                  model = tf.keras.models.Sequential([
                      tf.keras.layers.Flatten(input_shape=(28, 28)),
                      tf.keras.layers.Dense(128, activation='relu'),
                      tf.keras.layers.Dropout(0.2),
                      tf.keras.layers.Dense(10)
                  ])
                  loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
                  model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])
                  model.fit(x_train, y_train, epochs=5)
                  model.evaluate(x_test, y_test)
    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: tensorflow
              image: tensorflow/tensorflow:2.12.0
              # In a real distributed job the workers run the same training
              # script as the Chief; it is omitted here for brevity.
bash
# Create the TFJob
kubectl apply -f tfjob.yaml
# Check training status
kubectl get tfjobs -n kubeflow-user-example-com
# Follow the training logs
kubectl logs -f tensorflow-training-chief-0 -n kubeflow-user-example-com
Create a PyTorchJob #
yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-training
  namespace: kubeflow-user-example-com
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
              command:
                - python
                - -c
                - |
                  import torch
                  import torch.nn as nn
                  import torch.optim as optim
                  print("PyTorch version:", torch.__version__)
                  model = nn.Sequential(
                      nn.Linear(10, 20),
                      nn.ReLU(),
                      nn.Linear(20, 1)
                  )
                  optimizer = optim.SGD(model.parameters(), lr=0.01)
                  criterion = nn.MSELoss()
                  for epoch in range(10):
                      x = torch.randn(32, 10)
                      y = torch.randn(32, 1)
                      optimizer.zero_grad()
                      output = model(x)
                      loss = criterion(output, y)
                      loss.backward()
                      optimizer.step()
                      print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
              # In a real distributed job the workers run the same training
              # script as the Master; it is omitted here for brevity.
bash
# Create the PyTorchJob
kubectl apply -f pytorchjob.yaml
# Check training status
kubectl get pytorchjobs -n kubeflow-user-example-com
Deploy a Model Service #
Deploy a Model with KServe #
yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-iris
  namespace: kubeflow-user-example-com
spec:
  predictor:
    sklearn:
      storageUri: "gs://kfserving-examples/models/sklearn/iris"
      protocolVersion: v2
bash
# Deploy the model service
kubectl apply -f inferenceservice.yaml
# Check service status
kubectl get inferenceservices -n kubeflow-user-example-com
# Get the service URL
kubectl get inferenceservice sklearn-iris -n kubeflow-user-example-com -o jsonpath='{.status.url}'
Call the Model Service #
python
import requests

# Service endpoint
url = "http://sklearn-iris.kubeflow-user-example-com.svc.cluster.local/v2/models/sklearn-iris/infer"

# Request payload
data = {
    "inputs": [
        {
            "name": "input-0",
            "shape": [1, 4],
            "datatype": "FP32",
            "data": [[5.1, 3.5, 1.4, 0.2]]
        }
    ]
}

# Send the request
response = requests.post(url, json=data)
print(response.json())
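The request body follows the KServe V2 inference protocol: each input carries a name, shape, datatype, and data. A small helper for assembling such a body from feature rows (the tensor name `input-0` is a placeholder; check your model's metadata endpoint for the actual input name):

```python
def v2_infer_body(name, rows, datatype="FP32"):
    """Build a KServe V2 inference request body from a list of feature rows."""
    return {
        "inputs": [
            {
                "name": name,
                # shape is derived from the data: [num_rows, num_features]
                "shape": [len(rows), len(rows[0])],
                "datatype": datatype,
                "data": rows,
            }
        ]
    }

body = v2_infer_body("input-0", [[5.1, 3.5, 1.4, 0.2]])
print(body["inputs"][0]["shape"])  # [1, 4]
```

Deriving the shape from the data keeps the two fields consistent, which is a common source of 4xx errors when hand-writing the payload.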
Monitoring and Management #
View Resource Usage #
bash
# Resource usage per pod in the namespace
kubectl top pods -n kubeflow-user-example-com
# Resource usage per node
kubectl top nodes
# Pod details
kubectl describe pod <pod-name> -n kubeflow-user-example-com
View Logs #
bash
# Pipeline run logs
kubectl logs -n kubeflow <pipeline-pod-name>
# Notebook logs
kubectl logs -n kubeflow-user-example-com <notebook-pod-name>
# Training job logs (follow)
kubectl logs -n kubeflow-user-example-com <training-pod-name> -f
Manage Resources #
bash
# Delete a Notebook
kubectl delete notebook <notebook-name> -n kubeflow-user-example-com
# Delete a Pipeline run
# (done in the Dashboard)
# Delete training jobs
kubectl delete tfjob <job-name> -n kubeflow-user-example-com
kubectl delete pytorchjob <job-name> -n kubeflow-user-example-com
Best Practices #
Pipeline Development #
text
1. Component design principles
├── Single responsibility
├── Well-defined inputs and outputs
└── Reusability
2. Resource configuration
├── Set sensible resource requests and limits
├── Use persistent storage
└── Configure a retry policy for failures
3. Version management
├── Put pipelines under version control
├── Record parameters and results
└── Use meaningful names
Notebook Usage #
text
1. Resource management
├── Shut down Notebooks when not in use
├── Size resources appropriately
└── Keep data on persistent storage
2. Image selection
├── Use the official images
├── Or build a custom image
└── Pre-install common dependencies
3. Security practices
├── Never hardcode secrets in code
├── Use Kubernetes Secrets
└── Rotate passwords regularly
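On the last point: rather than hardcoding credentials, expose a Kubernetes Secret to the pod (for example as an environment variable via `secretKeyRef`) and read it at runtime. A minimal sketch, where the variable name `DB_PASSWORD` and the helper are illustrative:

```python
import os

# Read a credential injected via a Kubernetes Secret (e.g. exposed as an
# env var through secretKeyRef in the pod spec) instead of hardcoding it.
def get_db_password() -> str:
    password = os.environ.get("DB_PASSWORD")
    if password is None:
        raise RuntimeError("DB_PASSWORD is not set; check the Secret mount")
    return password

os.environ["DB_PASSWORD"] = "example-only"  # simulate the injected Secret
print(get_db_password())  # example-only
```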
Next Steps #
Now that you have the basics of Kubeflow down, move on to the Pipelines chapter to build more complex machine learning workflows!
Last updated: 2026-04-05