Pipelines #

Overview #

Kubeflow Pipelines is the core component for building and deploying portable, scalable machine learning workflows. It provides a complete toolchain for managing the lifecycle of ML workflows.

Core Features #

text
Workflow orchestration:
├── Define component dependencies
├── Conditional execution and loops
├── Parallel task execution
└── Error handling and retries

Experiment management:
├── Automatic parameter and metric logging
├── Versioning and comparison
├── Visualization
└── Reproducible results

Component reuse:
├── Prebuilt component library
├── Custom component development
├── Component sharing
└── Component version management

Core Concepts #

Pipeline #

A Pipeline is a workflow composed of multiple components; it defines the execution order of the components and the data dependencies between them.

text
      ┌─────────────┐
      │  Load data  │
      └──────┬──────┘
             │
             ▼
      ┌─────────────┐
      │ Preprocess  │
      └──────┬──────┘
             │
        ┌────┴────┐
        │         │
        ▼         ▼
  ┌─────────┐ ┌─────────┐
  │ Model A │ │ Model B │
  └────┬────┘ └────┬────┘
       │           │
       └─────┬─────┘
             │
             ▼
      ┌─────────────┐
      │  Evaluate   │
      └──────┬──────┘
             │
             ▼
      ┌─────────────┐
      │   Deploy    │
      └─────────────┘

Component #

A Component is the basic building block of a Pipeline; it represents a self-contained unit of computation.

text
Component anatomy:
├── Name and description
├── Input parameters
├── Outputs
├── Container image
├── Execution command
└── Resource configuration
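
In the KFP v2 SDK these pieces map directly onto the component decorators. As a minimal sketch of a container-based component (the image and command here are illustrative), @dsl.container_component spells out the interface, image, and command explicitly:

python
from kfp import dsl

@dsl.container_component
def say_hello(name: str):
    # The function name and parameters define the component interface;
    # image + command + args define what the container executes
    return dsl.ContainerSpec(
        image='alpine:3.19',
        command=['echo'],
        args=[name]
    )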

Experiment and Run #

text
Experiment:
├── A group of related Pipeline runs
├── Used to organize and manage runs
└── Makes it easy to compare run results

Run:
├── A single execution of a Pipeline
├── Records all parameters and metrics
├── Stores the output artifacts
└── Reproducible and traceable

Installing and Configuring the SDK #

Installing the Kubeflow Pipelines SDK #

bash
# Install the latest version
pip install kfp

# Install a specific version
pip install kfp==2.0.0

# Verify the installation
python -c "import kfp; print(kfp.__version__)"

Configuring the Client #

python
import kfp

# Connect to Kubeflow Pipelines
client = kfp.Client(host='http://localhost:8080/pipeline')

# Verify the connection
print(client.list_experiments())
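
The host above assumes the Pipelines UI is reachable locally, for example via port-forwarding (the service name and namespace depend on your installation):

bash
# Forward the Kubeflow Pipelines UI to localhost:8080
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80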

Component Development #

Using the @dsl.component Decorator #

python
from kfp import dsl
from kfp.dsl import Output, Input, Artifact, Model, Metrics, Dataset

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn']
)
def preprocess_data(
    input_path: str,
    output_data: Output[Dataset],
    output_metrics: Output[Metrics]
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    
    df = pd.read_csv(input_path)
    
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(df.select_dtypes(include=['float64', 'int']))
    
    output_df = pd.DataFrame(scaled_data, columns=df.select_dtypes(include=['float64', 'int']).columns)
    output_df.to_csv(output_data.path, index=False)
    
    output_metrics.log_metric('rows', len(df))
    output_metrics.log_metric('columns', len(df.columns))
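
Note that the imports happen inside the function body: a lightweight Python component is executed in its own container, so everything it needs must be imported there at run time and made available via packages_to_install or the base image.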

Component Input and Output Types #

python
from kfp import dsl
from kfp.dsl import Output, Input, Artifact, Model, Metrics, Dataset, ClassificationMetrics

@dsl.component(base_image='python:3.9')
def component_types_demo(
    # Basic parameter types
    param_string: str,
    param_int: int,
    param_float: float,
    param_bool: bool,
    param_list: list,
    param_dict: dict,
    
    # Input artifacts
    input_dataset: Input[Dataset],
    input_model: Input[Model],
    input_artifact: Input[Artifact],
    
    # Output artifacts
    output_dataset: Output[Dataset],
    output_model: Output[Model],
    output_metrics: Output[Metrics],
    output_classification_metrics: Output[ClassificationMetrics]
):
    import pandas as pd
    
    # Read the input dataset
    df = pd.read_csv(input_dataset.path)
    
    # Write the output dataset
    df.to_csv(output_dataset.path, index=False)
    
    # Log a scalar metric
    output_metrics.log_metric('accuracy', 0.95)
    
    # Log classification metrics (ROC curve)
    output_classification_metrics.log_roc_curve(
        fpr=[0.0, 0.1, 0.2, 1.0],
        tpr=[0.0, 0.5, 0.8, 1.0],
        threshold=[1.0, 0.5, 0.2, 0.0]
    )

Using Container Images #

python
@dsl.component(
    base_image='tensorflow/tensorflow:2.12.0',
    packages_to_install=['pandas', 'numpy']
)
def train_tensorflow_model(
    data_path: str,
    model_output: Output[Model],
    epochs: int = 10
):
    import tensorflow as tf
    import pandas as pd
    
    df = pd.read_csv(data_path)

    # Assume the last column holds the label and the rest are features
    X = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values

    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1)
    ])

    model.compile(optimizer='adam', loss='mse')
    model.fit(X, y, epochs=epochs)

    model.save(model_output.path)

Configuring Resources #

python
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['torch']
)
def gpu_training(
    data_path: str,
    model_output: Output[Model]
):
    import torch
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

@dsl.pipeline(name='GPU Pipeline')
def gpu_pipeline():
    task = gpu_training(data_path='/data/train.csv')
    
    task.set_cpu_limit('4')
    task.set_memory_limit('16Gi')
    task.set_accelerator_type('nvidia.com/gpu')
    task.set_accelerator_limit('1')

Pipeline Orchestration #

Basic Pipeline #

python
from kfp import dsl, compiler

@dsl.component(base_image='python:3.9')
def step_one(message: str) -> str:
    print(f"Step one: {message}")
    return f"Processed: {message}"

@dsl.component(base_image='python:3.9')
def step_two(input_message: str) -> str:
    print(f"Step two: {input_message}")
    return f"Final: {input_message}"

@dsl.pipeline(name='Simple Pipeline', description='A simple two-step pipeline')
def simple_pipeline(message: str = 'Hello'):
    task1 = step_one(message=message)
    task2 = step_two(input_message=task1.output)

if __name__ == '__main__':
    compiler.Compiler().compile(simple_pipeline, 'simple_pipeline.yaml')
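
Here the dependency between the two steps is implicit, because task2 consumes task1.output. When tasks share no data, ordering can still be enforced explicitly with .after(); a small sketch reusing step_one:

python
@dsl.pipeline(name='Explicit Order Pipeline')
def explicit_order_pipeline():
    task1 = step_one(message='first')
    task2 = step_one(message='second')

    # No data flows between the tasks, so declare the ordering directly
    task2.after(task1)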

Data-Passing Pipeline #

python
from kfp import dsl, compiler
from kfp.dsl import Output, Input, Dataset, Model, Metrics

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def load_data(output_data: Output[Dataset]):
    import pandas as pd
    from sklearn.datasets import load_iris
    
    iris = load_iris()
    df = pd.DataFrame(iris.data, columns=iris.feature_names)
    df['target'] = iris.target
    df.to_csv(output_data.path, index=False)

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def split_data(
    input_data: Input[Dataset],
    train_data: Output[Dataset],
    test_data: Output[Dataset],
    test_size: float = 0.2
):
    import pandas as pd
    from sklearn.model_selection import train_test_split
    
    df = pd.read_csv(input_data.path)
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)
    
    train_df.to_csv(train_data.path, index=False)
    test_df.to_csv(test_data.path, index=False)

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def train_model(
    train_data: Input[Dataset],
    model_output: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100
):
    import pandas as pd
    import pickle
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    
    df = pd.read_csv(train_data.path)
    X = df.drop('target', axis=1)
    y = df['target']
    
    model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    scores = cross_val_score(model, X, y, cv=5)
    
    model.fit(X, y)
    
    with open(model_output.path, 'wb') as f:
        pickle.dump(model, f)
    
    metrics.log_metric('cv_accuracy_mean', scores.mean())
    metrics.log_metric('cv_accuracy_std', scores.std())

@dsl.component(base_image='python:3.9', packages_to_install=['pandas', 'scikit-learn'])
def evaluate_model(
    test_data: Input[Dataset],
    model_input: Input[Model],
    metrics: Output[Metrics]
):
    import pandas as pd
    import pickle
    from sklearn.metrics import accuracy_score, precision_score, recall_score
    
    df = pd.read_csv(test_data.path)
    X_test = df.drop('target', axis=1)
    y_test = df['target']
    
    with open(model_input.path, 'rb') as f:
        model = pickle.load(f)
    
    y_pred = model.predict(X_test)
    
    metrics.log_metric('accuracy', accuracy_score(y_test, y_pred))
    metrics.log_metric('precision', precision_score(y_test, y_pred, average='weighted'))
    metrics.log_metric('recall', recall_score(y_test, y_pred, average='weighted'))

@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end machine learning training pipeline'
)
def ml_pipeline(
    test_size: float = 0.2,
    n_estimators: int = 100
):
    load_task = load_data()
    
    split_task = split_data(
        input_data=load_task.outputs['output_data'],
        test_size=test_size
    )
    
    train_task = train_model(
        train_data=split_task.outputs['train_data'],
        n_estimators=n_estimators
    )
    
    evaluate_task = evaluate_model(
        test_data=split_task.outputs['test_data'],
        model_input=train_task.outputs['model_output']
    )

if __name__ == '__main__':
    compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')

Conditional Execution #

python
from kfp import dsl, compiler

@dsl.component(base_image='python:3.9')
def check_condition(value: int) -> bool:
    return value > 10

@dsl.component(base_image='python:3.9')
def process_large():
    print("Processing large value")

@dsl.component(base_image='python:3.9')
def process_small():
    print("Processing small value")

@dsl.pipeline(name='Conditional Pipeline')
def conditional_pipeline(threshold: int = 15):
    check_task = check_condition(value=threshold)
    
    with dsl.If(check_task.output == True):
        process_large()
    
    with dsl.If(check_task.output == False):
        process_small()

if __name__ == '__main__':
    compiler.Compiler().compile(conditional_pipeline, 'conditional_pipeline.yaml')
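
Recent KFP 2.x releases also provide dsl.Elif and dsl.Else, which read more naturally than two independent dsl.If blocks; a sketch, assuming an SDK version that ships them:

python
@dsl.pipeline(name='If Else Pipeline')
def if_else_pipeline(threshold: int = 15):
    check_task = check_condition(value=threshold)

    with dsl.If(check_task.output == True):
        process_large()
    with dsl.Else():
        process_small()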

Loop Execution #

python
from kfp import dsl, compiler

@dsl.component(base_image='python:3.9')
def process_item(item: str) -> str:
    print(f"Processing: {item}")
    return f"Processed: {item}"

@dsl.component(base_image='python:3.9')
def aggregate_results(results: list) -> str:
    return ", ".join(results)

@dsl.pipeline(name='Loop Pipeline')
def loop_pipeline():
    items = ['item1', 'item2', 'item3']
    
    with dsl.ParallelFor(items) as item:
        process_task = process_item(item=item)
    
    aggregate_task = aggregate_results(
        results=dsl.Collected(process_task.output)
    )

if __name__ == '__main__':
    compiler.Compiler().compile(loop_pipeline, 'loop_pipeline.yaml')

Parallel Execution #

python
from kfp import dsl, compiler
from kfp.dsl import Output, Model, Metrics

@dsl.component(base_image='python:3.9', packages_to_install=['scikit-learn'])
def train_with_params(
    n_estimators: int,
    max_depth: int,
    model_output: Output[Model],
    metrics: Output[Metrics]
):
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import load_iris
    from sklearn.model_selection import cross_val_score
    import pickle
    
    X, y = load_iris(return_X_y=True)
    
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    
    scores = cross_val_score(model, X, y, cv=5)
    model.fit(X, y)
    
    with open(model_output.path, 'wb') as f:
        pickle.dump(model, f)
    
    metrics.log_metric('accuracy', scores.mean())
    metrics.log_metric('n_estimators', n_estimators)
    metrics.log_metric('max_depth', max_depth)

@dsl.pipeline(name='Parallel Training Pipeline')
def parallel_training_pipeline():
    params_list = [
        {'n_estimators': 50, 'max_depth': 5},
        {'n_estimators': 100, 'max_depth': 10},
        {'n_estimators': 200, 'max_depth': 15},
    ]
    
    with dsl.ParallelFor(params_list) as params:
        train_with_params(
            n_estimators=params.n_estimators,
            max_depth=params.max_depth
        )

if __name__ == '__main__':
    compiler.Compiler().compile(parallel_training_pipeline, 'parallel_pipeline.yaml')
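
By default every loop iteration may be scheduled at once. dsl.ParallelFor also accepts a parallelism argument to cap concurrency; for example, inside the pipeline body:

python
# Run at most two training tasks at a time
with dsl.ParallelFor(items=params_list, parallelism=2) as params:
    train_with_params(
        n_estimators=params.n_estimators,
        max_depth=params.max_depth
    )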

Running Pipelines #

Upload and Run #

python
import kfp

client = kfp.Client(host='http://localhost:8080/pipeline')

# Upload the pipeline
pipeline = client.upload_pipeline(
    pipeline_package_path='ml_pipeline.yaml',
    pipeline_name='ML Training Pipeline'
)

# Create an experiment
experiment = client.create_experiment(
    name='ML Experiments',
    description='Machine learning experiments'
)

# Run the pipeline
run = client.create_run_from_pipeline_package(
    pipeline_file='ml_pipeline.yaml',
    arguments={
        'test_size': 0.2,
        'n_estimators': 100
    },
    experiment_name='ML Experiments',
    run_name='Run 1'
)

print(f"Run ID: {run.run_id}")

Running from a Pipeline Function #

python
import kfp

client = kfp.Client(host='http://localhost:8080/pipeline')

# Create a run directly from a pipeline function
run = client.create_run_from_pipeline_func(
    pipeline_func=ml_pipeline,
    arguments={
        'test_size': 0.3,
        'n_estimators': 200
    },
    experiment_name='ML Experiments',
    run_name='Run 2'
)

Scheduled Runs #

python
import kfp

client = kfp.Client(host='http://localhost:8080/pipeline')

# Create a recurring run
job = client.create_recurring_run(
    experiment_id='your-experiment-id',
    job_name='Daily Training',
    description='Daily training job',
    pipeline_package_path='ml_pipeline.yaml',
    cron_expression='0 0 0 * * *',  # daily at 00:00 (KFP uses 6-field cron: sec min hour dom mon dow)
    params={
        'test_size': 0.2,
        'n_estimators': 100
    }
)
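
The returned object carries the recurring run's ID, which the KFP 2.x client can use to pause or remove the schedule later:

python
# Pause the schedule without deleting it
client.disable_recurring_run(job.recurring_run_id)

# Remove the schedule entirely
client.delete_recurring_run(job.recurring_run_id)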

Experiment Management #

Viewing Experiments and Runs #

python
import kfp

client = kfp.Client(host='http://localhost:8080/pipeline')

# List all experiments
experiments = client.list_experiments()
for exp in experiments.experiments:
    print(f"Experiment: {exp.display_name}")

# List the runs in an experiment
runs = client.list_runs(experiment_id='your-experiment-id')
for run in runs.runs:
    print(f"Run: {run.display_name}, State: {run.state}")

# Get the details of a single run
run_detail = client.get_run(run_id='your-run-id')
print(run_detail)

Comparing Run Results #

python
import kfp

client = kfp.Client(host='http://localhost:8080/pipeline')

# NOTE: how metrics are exposed depends on the backend version. The
# attribute path below reflects the v1 API response shape; on a KFP v2
# backend, per-run metric artifacts are surfaced in the UI and the
# metadata store rather than on the run object itself.
def get_run_metrics(run_id):
    run = client.get_run(run_id)
    metrics = run.pipeline_runtime.workflow_manifest.get('status', {}).get('metrics', [])
    return {m['name']: m['value'] for m in metrics}

# Compare several runs
run_ids = ['run-id-1', 'run-id-2', 'run-id-3']
for run_id in run_ids:
    metrics = get_run_metrics(run_id)
    print(f"Run {run_id}: {metrics}")

Artifact Management #

Artifact Types #

text
Built-in artifact types:
├── Artifact - generic artifact
├── Dataset - dataset
├── Model - model
├── Metrics - scalar metrics
├── ClassificationMetrics - classification metrics
├── SlicedClassificationMetrics - sliced classification metrics
├── HTML - HTML file
└── Markdown - Markdown file

Custom Artifacts #

python
from kfp import dsl
from kfp.dsl import Artifact, Output

class CustomModel(Artifact):
    schema_title = 'custom.Model'
    schema_version = '1.0.0'

@dsl.component(base_image='python:3.9')
def save_custom_model(model_output: Output[CustomModel]):
    with open(model_output.path, 'w') as f:
        f.write('Custom model content')
    
    model_output.metadata['format'] = 'custom'
    model_output.metadata['version'] = '1.0'

Artifact Visualization #

python
from kfp import dsl
from kfp.dsl import Output, HTML, Markdown, Artifact

@dsl.component(base_image='python:3.9', packages_to_install=['matplotlib'])
def create_visualizations(
    html_output: Output[HTML],
    markdown_output: Output[Markdown],
    plot_output: Output[Artifact]
):
    # Import inside the body: the component runs in its own container
    import matplotlib.pyplot as plt

    # Create an HTML report
    html_content = """
    <h1>Model Report</h1>
    <p>Accuracy: 95%</p>
    """
    with open(html_output.path, 'w') as f:
        f.write(html_content)

    # Create a Markdown report
    markdown_content = """
    # Model Report

    ## Results
    - Accuracy: 95%
    - Precision: 94%
    """
    with open(markdown_output.path, 'w') as f:
        f.write(markdown_content)

    # Save a chart as a generic artifact (the v2 SDK has no dedicated
    # Plot type; images can also be embedded in the HTML report)
    fig, ax = plt.subplots()
    ax.plot([1, 2, 3, 4], [1, 4, 2, 3])
    ax.set_title('Training Progress')
    plt.savefig(plot_output.path)
    plt.close(fig)

Best Practices #

Pipeline Design Principles #

text
1. Modular design
   ├── Give each component a single responsibility
   ├── Make components independently testable
   └── Make components reusable

2. Data management
   ├── Pass data between steps as artifacts
   ├── Avoid redundant computation
   └── Configure caching sensibly

3. Resource configuration
   ├── Set sensible resource requests and limits
   ├── Choose the base image to match the task
   └── Mind scheduling when requesting GPUs

4. Error handling
   ├── Configure retry policies
   ├── Set timeouts
   └── Log in detail

Performance Optimization #

python
@dsl.pipeline(name='Optimized Pipeline')
def optimized_pipeline(message: str = 'Hello'):
    # Reuse the step_one component defined earlier
    task = step_one(message=message)

    # Enable result caching so unchanged tasks are skipped on re-runs
    task.set_caching_options(enable_caching=True)

    # Retry a failed execution up to 3 times
    task.set_retry(num_retries=3)

Security Best Practices #

python
from kfp import dsl
from kfp import kubernetes  # requires the kfp-kubernetes extension package

@dsl.component(base_image='python:3.9')
def secure_component():
    import os
    # The secret arrives as an environment variable, never as a
    # hard-coded value in the pipeline definition
    print('DB_PASSWORD set:', 'DB_PASSWORD' in os.environ)

@dsl.pipeline(name='Secure Pipeline')
def secure_pipeline():
    task = secure_component()

    # Inject a key of a Kubernetes Secret as an environment variable
    kubernetes.use_secret_as_env(
        task,
        secret_name='db-secret',
        secret_key_to_env={'password': 'DB_PASSWORD'}
    )

Next Steps #

Now that you have the core features of Pipelines down, continue with Notebooks to learn how to work in an interactive development environment!

Last updated: 2026-04-05