Pipeline 流水线 #

概述 #

Pipeline 将多个数据处理步骤和模型训练步骤串联起来,形成一个完整的工作流。

Pipeline 优势 #

优势 描述
代码简洁 封装多个步骤
避免泄露 自动处理训练/测试转换
方便调参 统一参数搜索
易于部署 单个对象保存

Pipeline 结构 #

text
数据输入
    │
    ▼
┌─────────────┐
│ 步骤1: 预处理 │
└─────────────┘
    │
    ▼
┌─────────────┐
│ 步骤2: 特征工程│
└─────────────┘
    │
    ▼
┌─────────────┐
│ 步骤3: 模型   │
└─────────────┘
    │
    ▼
  预测输出

基本使用 #

创建 Pipeline #

python
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

pipe.fit(X_train, y_train)
print(f"准确率: {pipe.score(X_test, y_test):.4f}")

make_pipeline 简化 #

python
from sklearn.pipeline import make_pipeline

pipe = make_pipeline(
    StandardScaler(),
    LogisticRegression()
)

print(f"步骤名: {pipe.named_steps.keys()}")

访问步骤 #

python
print(pipe.named_steps['scaler'])
print(pipe.named_steps['classifier'])

print(pipe['scaler'])
print(pipe[0])

获取步骤属性 #

python
scaler = pipe.named_steps['scaler']
print(f"均值: {scaler.mean_}")
print(f"标准差: {scaler.scale_}")

classifier = pipe.named_steps['classifier']
print(f"系数: {classifier.coef_}")

ColumnTransformer #

基本使用 #

python
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'age': [25, 30, 35, 40],
    'income': [50000, 60000, 70000, 80000],
    'city': ['Beijing', 'Shanghai', 'Beijing', 'Guangzhou'],
    'gender': ['M', 'F', 'M', 'F']
})

preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), ['age', 'income']),
        ('cat', OneHotEncoder(), ['city', 'gender'])
    ]
)

X_transformed = preprocessor.fit_transform(df)
print(f"转换后形状: {X_transformed.shape}")

make_column_transformer #

python
from sklearn.compose import make_column_transformer

preprocessor = make_column_transformer(
    (StandardScaler(), ['age', 'income']),
    (OneHotEncoder(), ['city', 'gender'])
)

make_column_selector #

python
from sklearn.compose import make_column_selector

preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), make_column_selector(dtype_include=np.number)),
        ('cat', OneHotEncoder(), make_column_selector(dtype_include=object))
    ]
)

保留未转换列 #

python
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), ['age', 'income'])
    ],
    remainder='passthrough'
)

FeatureUnion #

特征合并 #

python
from sklearn.pipeline import FeatureUnion
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest

union = FeatureUnion([
    ('pca', PCA(n_components=2)),
    ('kbest', SelectKBest(k=2))
])

X_union = union.fit_transform(X_train, y_train)
print(f"合并后特征数: {X_union.shape[1]}")

make_union #

python
from sklearn.pipeline import make_union

union = make_union(
    PCA(n_components=2),
    SelectKBest(k=2)
)

复杂 Pipeline #

完整示例 #

python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier

numeric_features = ['age', 'income']
categorical_features = ['city', 'gender']

numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

pipe = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier())
])

pipe.fit(X_train, y_train)

嵌套 Pipeline #

python
from sklearn.feature_selection import SelectKBest, f_classif

pipe = Pipeline([
    ('preprocessor', ColumnTransformer([
        ('num', Pipeline([
            ('imputer', SimpleImputer()),
            ('scaler', StandardScaler())
        ]), numeric_features),
        ('cat', OneHotEncoder(), categorical_features)
    ])),
    ('selector', SelectKBest(f_classif, k=10)),
    ('classifier', LogisticRegression())
])

Pipeline 调参 #

参数命名规则 #

python
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__num__scaler__with_mean': [True, False],
    'classifier__C': [0.1, 1, 10],
    'classifier__penalty': ['l1', 'l2']
}

GridSearchCV #

python
from sklearn.model_selection import GridSearchCV

grid_search = GridSearchCV(
    pipe,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1
)

grid_search.fit(X_train, y_train)

print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳分数: {grid_search.best_score_:.4f}")

缓存转换 #

python
from tempfile import mkdtemp
from shutil import rmtree

cachedir = mkdtemp()
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
], memory=cachedir)

pipe.fit(X_train, y_train)
rmtree(cachedir)

Pipeline 可视化 #

显示 Pipeline #

python
from sklearn import set_config

set_config(display='diagram')

pipe

获取参数 #

python
params = pipe.get_params()
for key in sorted(params.keys()):
    print(key)

自定义转换器 #

在 Pipeline 中使用 #

python
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class CustomTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, param=1):
        self.param = param
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        return X * self.param

pipe = Pipeline([
    ('custom', CustomTransformer(param=2)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

Pipeline 持久化 #

保存 Pipeline #

python
from joblib import dump, load

dump(pipe, 'pipeline.joblib')

loaded_pipe = load('pipeline.joblib')
y_pred = loaded_pipe.predict(X_test)

最佳实践 #

1. 命名规范 #

python
pipe = Pipeline([
    ('step_1_preprocessing', StandardScaler()),
    ('step_2_feature_selection', SelectKBest(k=10)),
    ('step_3_model', LogisticRegression())
])

2. 模块化设计 #

python
def create_preprocessor(numeric_features, categorical_features):
    return ColumnTransformer([
        ('num', Pipeline([
            ('imputer', SimpleImputer()),
            ('scaler', StandardScaler())
        ]), numeric_features),
        ('cat', Pipeline([
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('encoder', OneHotEncoder(handle_unknown='ignore'))
        ]), categorical_features)
    ])

3. 验证 Pipeline #

python
from sklearn.utils import estimator_checks

estimator_checks.check_estimator(pipe)

4. 并行处理 #

python
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_jobs=-1))
])

下一步 #

掌握 Pipeline 后,继续学习 模型持久化 了解如何保存和部署模型!

最后更新:2026-04-04