Pipelines #
Overview #
Kubeflow Pipelines is the core component for building and deploying portable, scalable machine learning workflows. It provides a complete toolchain for managing the lifecycle of an ML workflow.
Core Features #
text
Workflow orchestration:
├── Define dependencies between components
├── Conditional execution and loops
├── Parallel task execution
└── Error handling and retries

Experiment management:
├── Automatic logging of parameters and metrics
├── Run versioning and comparison
├── Visualization of results
└── Reproducible results

Component reuse:
├── Library of prebuilt components
├── Custom component development
├── Component sharing
└── Component version management
Core Concepts #
Pipeline #
A Pipeline is a workflow composed of multiple components; it defines the execution order of, and the data dependencies between, those components.
text
      ┌────────────┐
      │ Load data  │
      └─────┬──────┘
            │
            ▼
      ┌────────────┐
      │ Preprocess │
      └─────┬──────┘
            │
      ┌─────┴─────┐
      │           │
      ▼           ▼
 ┌─────────┐ ┌─────────┐
 │ Model A │ │ Model B │
 └────┬────┘ └────┬────┘
      │           │
      └─────┬─────┘
            │
            ▼
      ┌────────────┐
      │  Evaluate  │
      └─────┬──────┘
            │
            ▼
      ┌────────────┐
      │   Deploy   │
      └────────────┘
Component #
A Component is the basic building block of a Pipeline; it represents a single, self-contained computational task.
text
A component consists of:
├── Name and description
├── Input parameters
├── Outputs
├── Container image
├── Execution command
└── Resource configuration
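The same anatomy shows up directly in a lightweight container component, where image, command, inputs, and outputs are all declared explicitly. A minimal sketch (the say_hello component is illustrative, not part of this page's pipeline):
python
from kfp import dsl

@dsl.container_component
def say_hello(name: str, greeting: dsl.OutputPath(str)):
    # Image, command, inputs, and output are all declared explicitly
    return dsl.ContainerSpec(
        image='alpine',
        command=['sh', '-c',
                 'mkdir -p $(dirname "$1") && echo "Hello, $0!" | tee "$1"'],
        args=[name, greeting]
    )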
Experiments and Runs #
text
Experiment:
├── A group of related pipeline runs
├── Organizes and manages runs
└── Makes it easy to compare results across runs
Run:
├── A single execution of a pipeline
├── Records all parameters and metrics
├── Stores output artifacts
└── Reproducible and traceable
Installing and Configuring the SDK #
Installing the Kubeflow Pipelines SDK #
bash
# Install the latest version
pip install kfp

# Install a specific version
pip install kfp==2.0.0

# Verify the installation
python -c "import kfp; print(kfp.__version__)"
Configuring the Client #
python
import kfp
# Connect to Kubeflow Pipelines
client = kfp.Client(host='http://localhost:8080/pipeline')

# Verify the connection
print(client.list_experiments())
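How the client is constructed depends on where the code runs; the endpoints below are deployment-specific assumptions:
python
import kfp

# Inside a Kubeflow Notebook, the SDK can usually discover the
# in-cluster endpoint and credentials automatically:
client = kfp.Client()

# Against a standalone deployment reached via port-forwarding
# (kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80):
client = kfp.Client(host='http://localhost:8080')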
Component Development #
Using the @dsl.component Decorator #
python
from kfp import dsl
from kfp.dsl import Output, Input, Artifact, Model, Metrics, Dataset
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn']
)
def preprocess_data(
    input_path: str,
    output_data: Output[Dataset],
    output_metrics: Output[Metrics]
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(input_path)
    scaler = StandardScaler()
    numeric = df.select_dtypes(include=['float64', 'int'])
    scaled_data = scaler.fit_transform(numeric)
    output_df = pd.DataFrame(scaled_data, columns=numeric.columns)
    output_df.to_csv(output_data.path, index=False)
    output_metrics.log_metric('rows', len(df))
    output_metrics.log_metric('columns', len(df.columns))
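Components like this can be smoke-tested without a cluster via the SDK's local runner (available in recent kfp releases, roughly 2.5+; the data.csv path is illustrative):
python
from kfp import local

# Run components in local subprocesses instead of on a cluster
local.init(runner=local.SubprocessRunner())

task = preprocess_data(input_path='data.csv')
print(task.outputs['output_data'].path)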
Component Input and Output Types #
python
from kfp import dsl
from kfp.dsl import Output, Input, Artifact, Model, Metrics, Dataset, ClassificationMetrics
@dsl.component(base_image='python:3.9', packages_to_install=['pandas'])
def component_types_demo(
    # Basic parameter types
    param_string: str,
    param_int: int,
    param_float: float,
    param_bool: bool,
    param_list: list,
    param_dict: dict,
    # Input artifacts
    input_dataset: Input[Dataset],
    input_model: Input[Model],
    input_artifact: Input[Artifact],
    # Output artifacts
    output_dataset: Output[Dataset],
    output_model: Output[Model],
    output_metrics: Output[Metrics],
    output_classification_metrics: Output[ClassificationMetrics]
):
    import pandas as pd

    # Read the input dataset
    df = pd.read_csv(input_dataset.path)
    # Write the output dataset
    df.to_csv(output_dataset.path, index=False)
    # Log scalar metrics
    output_metrics.log_metric('accuracy', 0.95)
    # Log classification metrics
    output_classification_metrics.log_roc_curve(
        fpr=[0.0, 0.1, 0.2, 1.0],
        tpr=[0.0, 0.5, 0.8, 1.0],
        threshold=[1.0, 0.5, 0.2, 0.0]
    )
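ClassificationMetrics can also record a confusion matrix. A short sketch (the class names and counts are made up):
python
from kfp import dsl
from kfp.dsl import Output, ClassificationMetrics

@dsl.component(base_image='python:3.9')
def log_confusion(metrics: Output[ClassificationMetrics]):
    # Rows and columns follow the order of the category list
    metrics.log_confusion_matrix(
        ['setosa', 'versicolour', 'virginica'],
        [[10, 0, 0], [0, 9, 1], [0, 1, 9]]
    )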
Using Container Images #
python
@dsl.component(
    base_image='tensorflow/tensorflow:2.12.0',
    packages_to_install=['pandas', 'numpy']
)
def train_tensorflow_model(
    data_path: str,
    model_output: Output[Model],
    epochs: int = 10
):
    import tensorflow as tf
    import pandas as pd

    df = pd.read_csv(data_path)
    # Assume the last column is the label and the rest are features
    X = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer='adam', loss='mse')
    model.fit(X, y, epochs=epochs)
    model.save(model_output.path)
Configuring Resources #
python
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['torch']
)
def gpu_training(
    data_path: str,
    model_output: Output[Model]
):
    import torch

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

@dsl.pipeline(name='GPU Pipeline')
def gpu_pipeline():
    task = gpu_training(data_path='/data/train.csv')
    task.set_cpu_limit('4')
    task.set_memory_limit('16Gi')
    task.set_accelerator_type('nvidia.com/gpu')
    task.set_accelerator_limit('1')
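Recent kfp 2.x releases also expose request-side setters, so requests and limits can differ instead of pinning both to one value; a sketch reusing gpu_training:
python
@dsl.pipeline(name='Sized Pipeline')
def sized_pipeline():
    task = gpu_training(data_path='/data/train.csv')
    # The scheduler reserves the request; the limit is the hard cap
    task.set_cpu_request('2')
    task.set_cpu_limit('4')
    task.set_memory_request('8Gi')
    task.set_memory_limit('16Gi')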
Pipeline Orchestration #
A Basic Pipeline #
python
from kfp import dsl, compiler
@dsl.component(base_image='python:3.9')
def step_one(message: str) -> str:
    print(f"Step one: {message}")
    return f"Processed: {message}"

@dsl.component(base_image='python:3.9')
def step_two(input_message: str) -> str:
    print(f"Step two: {input_message}")
    return f"Final: {input_message}"

@dsl.pipeline(name='Simple Pipeline', description='A simple two-step pipeline')
def simple_pipeline(message: str = 'Hello'):
    task1 = step_one(message=message)
    task2 = step_two(input_message=task1.output)

if __name__ == '__main__':
    compiler.Compiler().compile(simple_pipeline, 'simple_pipeline.yaml')
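The same compilation can also be run from the command line, assuming the pipeline is saved as simple_pipeline.py:
bash
kfp dsl compile --py simple_pipeline.py --output simple_pipeline.yaml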
A Data-Passing Pipeline #
python
from kfp import dsl, compiler
from kfp.dsl import Output, Input, Dataset, Model, Metrics
@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def load_data(output_data: Output[Dataset]):
    import pandas as pd
    from sklearn.datasets import load_iris

    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names)
    df['target'] = iris.target
    df.to_csv(output_data.path, index=False)

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def split_data(
    input_data: Input[Dataset],
    train_data: Output[Dataset],
    test_data: Output[Dataset],
    test_size: float = 0.2
):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(input_data.path)
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)
    train_df.to_csv(train_data.path, index=False)
    test_df.to_csv(test_data.path, index=False)

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def train_model(
    train_data: Input[Dataset],
    model_output: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100
):
    import pandas as pd
    import pickle
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score

    df = pd.read_csv(train_data.path)
    X = df.drop('target', axis=1)
    y = df['target']
    model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    scores = cross_val_score(model, X, y, cv=5)
    model.fit(X, y)
    with open(model_output.path, 'wb') as f:
        pickle.dump(model, f)
    metrics.log_metric('cv_accuracy_mean', scores.mean())
    metrics.log_metric('cv_accuracy_std', scores.std())

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def evaluate_model(
    test_data: Input[Dataset],
    model_input: Input[Model],
    metrics: Output[Metrics]
):
    import pandas as pd
    import pickle
    from sklearn.metrics import accuracy_score, precision_score, recall_score

    df = pd.read_csv(test_data.path)
    X_test = df.drop('target', axis=1)
    y_test = df['target']
    with open(model_input.path, 'rb') as f:
        model = pickle.load(f)
    y_pred = model.predict(X_test)
    metrics.log_metric('accuracy', accuracy_score(y_test, y_pred))
    metrics.log_metric('precision', precision_score(y_test, y_pred, average='weighted'))
    metrics.log_metric('recall', recall_score(y_test, y_pred, average='weighted'))

@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end machine learning training pipeline'
)
def ml_pipeline(
    test_size: float = 0.2,
    n_estimators: int = 100
):
    load_task = load_data()
    split_task = split_data(
        input_data=load_task.outputs['output_data'],
        test_size=test_size
    )
    train_task = train_model(
        train_data=split_task.outputs['train_data'],
        n_estimators=n_estimators
    )
    evaluate_task = evaluate_model(
        test_data=split_task.outputs['test_data'],
        model_input=train_task.outputs['model_output']
    )

if __name__ == '__main__':
    compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')
Conditional Execution #
python
from kfp import dsl, compiler
@dsl.component(base_image='python:3.9')
def check_condition(value: int) -> bool:
    return value > 10

@dsl.component(base_image='python:3.9')
def process_large():
    print("Processing large value")

@dsl.component(base_image='python:3.9')
def process_small():
    print("Processing small value")

@dsl.pipeline(name='Conditional Pipeline')
def conditional_pipeline(threshold: int = 15):
    check_task = check_condition(value=threshold)
    with dsl.If(check_task.output == True):
        process_large()
    with dsl.If(check_task.output == False):
        process_small()

if __name__ == '__main__':
    compiler.Compiler().compile(conditional_pipeline, 'conditional_pipeline.yaml')
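Newer kfp 2.x releases (roughly 2.2+) also ship dsl.Elif and dsl.Else, which avoid evaluating the same output twice; a variant of the pipeline above:
python
@dsl.pipeline(name='Conditional Pipeline v2')
def conditional_pipeline_v2(threshold: int = 15):
    check_task = check_condition(value=threshold)
    with dsl.If(check_task.output == True):
        process_large()
    with dsl.Else():
        process_small()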
Loops #
python
from kfp import dsl, compiler
@dsl.component(base_image='python:3.9')
def process_item(item: str) -> str:
    print(f"Processing: {item}")
    return f"Processed: {item}"

@dsl.component(base_image='python:3.9')
def aggregate_results(results: list) -> str:
    return ", ".join(results)

@dsl.pipeline(name='Loop Pipeline')
def loop_pipeline():
    items = ['item1', 'item2', 'item3']
    with dsl.ParallelFor(items) as item:
        process_task = process_item(item=item)
    # Fan-in: collect one output per loop iteration
    aggregate_task = aggregate_results(
        results=dsl.Collected(process_task.outputs['Output'])
    )

if __name__ == '__main__':
    compiler.Compiler().compile(loop_pipeline, 'loop_pipeline.yaml')
Parallel Execution #
python
from kfp import dsl, compiler
from kfp.dsl import Output, Model, Metrics
@dsl.component(base_image='python:3.9', packages_to_install=['scikit-learn'])
def train_with_params(
    n_estimators: int,
    max_depth: int,
    model_output: Output[Model],
    metrics: Output[Metrics]
):
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import load_iris
    from sklearn.model_selection import cross_val_score
    import pickle

    X, y = load_iris(return_X_y=True)
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    scores = cross_val_score(model, X, y, cv=5)
    model.fit(X, y)
    with open(model_output.path, 'wb') as f:
        pickle.dump(model, f)
    metrics.log_metric('accuracy', scores.mean())
    metrics.log_metric('n_estimators', n_estimators)
    metrics.log_metric('max_depth', max_depth)

@dsl.pipeline(name='Parallel Training Pipeline')
def parallel_training_pipeline():
    params_list = [
        {'n_estimators': 50, 'max_depth': 5},
        {'n_estimators': 100, 'max_depth': 10},
        {'n_estimators': 200, 'max_depth': 15},
    ]
    with dsl.ParallelFor(params_list) as params:
        train_with_params(
            n_estimators=params.n_estimators,
            max_depth=params.max_depth
        )

if __name__ == '__main__':
    compiler.Compiler().compile(parallel_training_pipeline, 'parallel_pipeline.yaml')
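The parallel branches can also be reduced with dsl.Collected, just like in the loop example. A minimal, self-contained sketch assuming each branch returns a plain float score:
python
from kfp import dsl

@dsl.component(base_image='python:3.9')
def eval_params(n_estimators: int) -> float:
    # Stand-in scoring logic for illustration
    return n_estimators / 300.0

@dsl.component(base_image='python:3.9')
def best_score(scores: list) -> float:
    # One list entry arrives per parallel branch
    return max(scores)

@dsl.pipeline(name='Fan-in Pipeline')
def fan_in_pipeline():
    with dsl.ParallelFor([50, 100, 200]) as n:
        t = eval_params(n_estimators=n)
    best_score(scores=dsl.Collected(t.output))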
Running Pipelines #
Uploading and Running #
python
import kfp
client = kfp.Client(host='http://localhost:8080/pipeline')
# Upload the pipeline
pipeline = client.upload_pipeline(
    pipeline_package_path='ml_pipeline.yaml',
    pipeline_name='ML Training Pipeline'
)

# Create an experiment
experiment = client.create_experiment(
    name='ML Experiments',
    description='Machine learning experiments'
)

# Run the pipeline
run = client.create_run_from_pipeline_package(
    pipeline_file='ml_pipeline.yaml',
    arguments={
        'test_size': 0.2,
        'n_estimators': 100
    },
    experiment_name='ML Experiments',
    run_name='Run 1'
)
print(f"Run ID: {run.run_id}")
Running from a Pipeline Function #
python
import kfp
client = kfp.Client(host='http://localhost:8080/pipeline')
# Create a run directly from the pipeline function
run = client.create_run_from_pipeline_func(
    pipeline_func=ml_pipeline,
    arguments={
        'test_size': 0.3,
        'n_estimators': 200
    },
    experiment_name='ML Experiments',
    run_name='Run 2'
)
Scheduled Runs #
python
import kfp
client = kfp.Client(host='http://localhost:8080/pipeline')
# Create a recurring run
job = client.create_recurring_run(
    experiment_id='your-experiment-id',
    job_name='Daily Training',
    description='Daily training job',
    pipeline_package_path='ml_pipeline.yaml',
    cron_expression='0 0 * * *',  # every day at 00:00
    params={
        'test_size': 0.2,
        'n_estimators': 100
    }
)
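A recurring run can later be paused, resumed, or deleted through the client. A sketch, assuming the kfp 2.x method names and the job object returned above:
python
# Pause the schedule without deleting it
client.disable_recurring_run(recurring_run_id=job.recurring_run_id)
# Resume it
client.enable_recurring_run(recurring_run_id=job.recurring_run_id)
# Remove it entirely
client.delete_recurring_run(recurring_run_id=job.recurring_run_id)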
Experiment Management #
Viewing Experiments and Runs #
python
import kfp
client = kfp.Client(host='http://localhost:8080/pipeline')
# List all experiments
experiments = client.list_experiments()
for exp in experiments.experiments:
    print(f"Experiment: {exp.display_name}")

# List runs in an experiment
runs = client.list_runs(experiment_id='your-experiment-id')
for run in runs.runs:
    print(f"Run: {run.display_name}, State: {run.state}")

# Get run details
run_detail = client.get_run(run_id='your-run-id')
print(run_detail)
Comparing Run Results #
python
import kfp
client = kfp.Client(host='http://localhost:8080/pipeline')
# KFP v2 no longer exposes the v1-style workflow manifest
# (run.pipeline_runtime.workflow_manifest); metrics are stored as
# Metrics artifacts and are easiest to compare in the UI's
# "Compare runs" view. Run-level information is still available:
def summarize_run(run_id):
    run = client.get_run(run_id)
    return {
        'name': run.display_name,
        'state': run.state,
        'created_at': run.created_at,
    }

# Compare several runs
run_ids = ['run-id-1', 'run-id-2', 'run-id-3']
for run_id in run_ids:
    print(f"Run {run_id}: {summarize_run(run_id)}")
Artifact Management #
Artifact Types #
text
Built-in artifact types:
├── Artifact - generic artifact
├── Dataset - dataset
├── Model - model
├── Metrics - scalar metrics
├── ClassificationMetrics - classification metrics
├── SlicedClassificationMetrics - sliced classification metrics
├── HTML - HTML file
└── Markdown - Markdown file
Custom Artifacts #
python
from kfp import dsl
from kfp.dsl import Artifact, Output

class CustomModel(Artifact):
    schema_title = 'custom.Model'
    schema_version = '1.0.0'

@dsl.component(base_image='python:3.9')
def save_custom_model(model_output: Output[CustomModel]):
    with open(model_output.path, 'w') as f:
        f.write('Custom model content')
    model_output.metadata['format'] = 'custom'
    model_output.metadata['version'] = '1.0'
Artifact Visualization #
python
from kfp import dsl
from kfp.dsl import Output, HTML, Markdown, Artifact

@dsl.component(base_image='python:3.9', packages_to_install=['matplotlib'])
def create_visualizations(
    html_output: Output[HTML],
    markdown_output: Output[Markdown],
    plot_output: Output[Artifact]  # kfp.dsl has no Plot type; use a generic artifact
):
    import matplotlib
    matplotlib.use('Agg')  # headless backend inside the container
    import matplotlib.pyplot as plt

    # Build an HTML report
    html_content = """
    <h1>Model Report</h1>
    <p>Accuracy: 95%</p>
    """
    with open(html_output.path, 'w') as f:
        f.write(html_content)

    # Build a Markdown report
    markdown_content = """
# Model Report
## Results
- Accuracy: 95%
- Precision: 94%
"""
    with open(markdown_output.path, 'w') as f:
        f.write(markdown_content)

    # Render a chart and save it as a PNG artifact
    fig, ax = plt.subplots()
    ax.plot([1, 2, 3, 4], [1, 4, 2, 3])
    ax.set_title('Training Progress')
    plt.savefig(plot_output.path, format='png')
    plt.close()
Best Practices #
Pipeline Design Principles #
text
1. Modular design
├── Give each component a single responsibility
├── Make components independently testable
└── Make components reusable
2. Data management
├── Pass data between steps as artifacts
├── Avoid redundant computation
└── Configure caching appropriately
3. Resource configuration
├── Set sensible resource requests and limits
├── Pick the image that fits the task
└── Mind scheduling constraints when requesting GPUs
4. Error handling
├── Configure retry policies
├── Set timeouts
└── Log in detail
Performance Tuning #
python
from kfp import dsl

@dsl.component(base_image='python:3.9')
def train_step():
    print('training...')

@dsl.pipeline(name='Optimized Pipeline')
def optimized_pipeline():
    task = train_step()
    # Enable caching: identical inputs reuse the previous result
    task.set_caching_options(True)
    # Retry up to 3 times, waiting 60s between attempts
    task.set_retry(num_retries=3, backoff_duration='60s')
    # Note: the v1 per-task timeout (task.timeout_seconds) has no direct
    # equivalent in the KFP v2 SDK; caching can also be toggled per run
    # via the enable_caching argument when submitting.
Security Best Practices #
python
from kfp import dsl, kubernetes  # pip install kfp-kubernetes

@dsl.component(base_image='python:3.9')
def secure_component():
    import os
    # The secret value arrives as an environment variable
    print('DB password set:', 'DB_PASSWORD' in os.environ)

@dsl.pipeline(name='Secure Pipeline')
def secure_pipeline():
    task = secure_component()
    # Inject a Kubernetes Secret as an environment variable
    # (the v1-style dsl.V1EnvVarSource API does not exist in KFP v2)
    kubernetes.use_secret_as_env(
        task,
        secret_name='db-secret',
        secret_key_to_env={'password': 'DB_PASSWORD'}
    )
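The same kfp-kubernetes extension can mount a Secret as files rather than environment variables, which keeps values out of the process environment; a sketch reusing the hypothetical db-secret:
python
@dsl.pipeline(name='Secure Pipeline (volume)')
def secure_pipeline_volume():
    task = secure_component()
    # Each key of db-secret appears as a file under /mnt/secrets
    kubernetes.use_secret_as_volume(
        task,
        secret_name='db-secret',
        mount_path='/mnt/secrets'
    )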
Next Steps #
Now that you have the core features of Pipelines under your belt, continue to Notebooks to learn how to work in an interactive development environment.
Last updated: 2026-04-05