Pipeline 流水线 #
概述 #
Pipeline 将多个数据处理步骤和模型训练步骤串联起来,形成一个完整的工作流。
Pipeline 优势 #
| 优势 | 描述 |
|---|---|
| 代码简洁 | 封装多个步骤 |
| 避免泄露 | 自动处理训练/测试转换 |
| 方便调参 | 统一参数搜索 |
| 易于部署 | 单个对象保存 |
Pipeline 结构 #
text
数据输入
│
▼
┌─────────────┐
│ 步骤1: 预处理 │
└─────────────┘
│
▼
┌─────────────┐
│ 步骤2: 特征工程│
└─────────────┘
│
▼
┌─────────────┐
│ 步骤3: 模型 │
└─────────────┘
│
▼
预测输出
基本使用 #
创建 Pipeline #
python
# Load the iris dataset and fit a two-step Pipeline:
# standardization followed by logistic regression.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X, y = iris.data, iris.target
# Fixed random_state makes the train/test split reproducible.
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression())
])
# fit() runs fit_transform on each intermediate step, then fit on the final one.
pipe.fit(X_train, y_train)
print(f"准确率: {pipe.score(X_test, y_test):.4f}")
make_pipeline 简化 #
python
# make_pipeline builds the same object but auto-generates step names
# from the lowercased class names.
from sklearn.pipeline import make_pipeline
pipe = make_pipeline(
StandardScaler(),
LogisticRegression()
)
print(f"步骤名: {pipe.named_steps.keys()}")
访问步骤 #
python
# Steps are accessible by name (named_steps), dict-style indexing, or position.
# NOTE(review): the 'scaler'/'classifier' names match the first Pipeline
# example; the make_pipeline object defined just above auto-names its steps
# differently — confirm which `pipe` this snippet is meant to use.
print(pipe.named_steps['scaler'])
print(pipe.named_steps['classifier'])
print(pipe['scaler'])
print(pipe[0])
获取步骤属性 #
python
# Attributes learned during fit (trailing-underscore names) are available
# on each individual step after the pipeline has been fitted.
scaler = pipe.named_steps['scaler']
print(f"均值: {scaler.mean_}")
print(f"标准差: {scaler.scale_}")
classifier = pipe.named_steps['classifier']
print(f"系数: {classifier.coef_}")
ColumnTransformer #
基本使用 #
python
# Apply different transformers to different column groups of a DataFrame:
# scale the numeric columns, one-hot encode the categorical ones.
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import pandas as pd
import numpy as np
df = pd.DataFrame({
'age': [25, 30, 35, 40],
'income': [50000, 60000, 70000, 80000],
'city': ['Beijing', 'Shanghai', 'Beijing', 'Guangzhou'],
'gender': ['M', 'F', 'M', 'F']
})
preprocessor = ColumnTransformer(
transformers=[
# Each entry is a (name, transformer, columns) triple.
('num', StandardScaler(), ['age', 'income']),
('cat', OneHotEncoder(), ['city', 'gender'])
]
)
X_transformed = preprocessor.fit_transform(df)
print(f"转换后形状: {X_transformed.shape}")
make_column_transformer #
python
# make_column_transformer takes (transformer, columns) pairs and
# auto-generates the transformer names.
from sklearn.compose import make_column_transformer
preprocessor = make_column_transformer(
(StandardScaler(), ['age', 'income']),
(OneHotEncoder(), ['city', 'gender'])
)
make_column_selector #
python
# Select columns by dtype instead of listing their names explicitly.
from sklearn.compose import make_column_selector
preprocessor = ColumnTransformer(
transformers=[
('num', StandardScaler(), make_column_selector(dtype_include=np.number)),
('cat', OneHotEncoder(), make_column_selector(dtype_include=object))
]
)
保留未转换列 #
python
# remainder='passthrough' keeps the columns not claimed by any transformer;
# the default ('drop') would discard them.
preprocessor = ColumnTransformer(
transformers=[
('num', StandardScaler(), ['age', 'income'])
],
remainder='passthrough'
)
FeatureUnion #
特征合并 #
python
# FeatureUnion runs its transformers in parallel on the same input and
# concatenates their outputs column-wise.
from sklearn.pipeline import FeatureUnion
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
union = FeatureUnion([
('pca', PCA(n_components=2)),
('kbest', SelectKBest(k=2))
])
# 2 PCA components + 2 selected features -> 4 output columns.
X_union = union.fit_transform(X_train, y_train)
print(f"合并后特征数: {X_union.shape[1]}")
make_union #
python
# make_union: same as FeatureUnion, with auto-generated transformer names.
from sklearn.pipeline import make_union
union = make_union(
PCA(n_components=2),
SelectKBest(k=2)
)
复杂 Pipeline #
完整示例 #
python
# End-to-end pipeline: per-column-group imputation and encoding/scaling,
# followed by a random-forest classifier.
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
numeric_features = ['age', 'income']
categorical_features = ['city', 'gender']
# Numeric columns: fill missing values with the median, then standardize.
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# Categorical columns: fill with the most frequent value, then one-hot
# encode; handle_unknown='ignore' avoids errors on unseen categories.
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)
pipe = Pipeline([
('preprocessor', preprocessor),
('classifier', RandomForestClassifier())
])
# NOTE(review): this pipeline expects a DataFrame with age/income/city/gender
# columns, but the X_train defined earlier is the iris ndarray — confirm the
# intended training data for this example.
pipe.fit(X_train, y_train)
嵌套 Pipeline #
python
# Pipelines nest freely: a ColumnTransformer (itself containing a Pipeline)
# feeds feature selection, which feeds the classifier.
from sklearn.feature_selection import SelectKBest, f_classif
pipe = Pipeline([
('preprocessor', ColumnTransformer([
('num', Pipeline([
('imputer', SimpleImputer()),
('scaler', StandardScaler())
]), numeric_features),
('cat', OneHotEncoder(), categorical_features)
])),
# NOTE(review): k=10 assumes at least 10 features exist after preprocessing —
# verify against the actual numeric + one-hot column counts.
('selector', SelectKBest(f_classif, k=10)),
('classifier', LogisticRegression())
])
Pipeline 调参 #
参数命名规则 #
python
# Pipeline hyperparameters are addressed as <step>__<param>; nested steps
# chain with double underscores (preprocessor -> num -> imputer -> strategy).
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__num__scaler__with_mean': [True, False],
    'classifier__C': [0.1, 1, 10],
    'classifier__penalty': ['l1', 'l2'],
    # Fix: LogisticRegression's default 'lbfgs' solver rejects the 'l1'
    # penalty, which would abort the whole grid search; 'liblinear'
    # supports both penalties.
    'classifier__solver': ['liblinear']
}
GridSearchCV #
python
# Grid-search the whole pipeline: preprocessing and model hyperparameters
# are tuned jointly under cross-validation, avoiding train/test leakage.
from sklearn.model_selection import GridSearchCV
grid_search = GridSearchCV(
pipe,
param_grid,
# 5-fold cross-validation; n_jobs=-1 uses all available CPU cores.
cv=5,
scoring='accuracy',
n_jobs=-1
)
grid_search.fit(X_train, y_train)
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳分数: {grid_search.best_score_:.4f}")
缓存转换 #
python
# Cache fitted transformers on disk so repeated fits (e.g. during a grid
# search) can reuse them instead of recomputing.
from tempfile import mkdtemp
from shutil import rmtree
cachedir = mkdtemp()
try:
    pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression())
    ], memory=cachedir)
    pipe.fit(X_train, y_train)
finally:
    # Always remove the cache directory, even if fitting raises —
    # the original version leaked it on any exception.
    rmtree(cachedir)
Pipeline 可视化 #
显示 Pipeline #
python
# With display='diagram', evaluating the pipeline object in a notebook
# renders an interactive HTML diagram instead of the plain-text repr.
from sklearn import set_config
set_config(display='diagram')
pipe
获取参数 #
python
# List every tunable parameter name, including nested step parameters
# (the same double-underscore names used in param grids).
params = pipe.get_params()
for key in sorted(params.keys()):
    # Fix: the loop body had lost its indentation, making the snippet
    # invalid Python.
    print(key)
自定义转换器 #
在 Pipeline 中使用 #
python
# A minimal scikit-learn-compatible transformer usable inside a Pipeline.
# Fix: the class and method bodies had lost their indentation, making the
# snippet invalid Python.
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np


class CustomTransformer(BaseEstimator, TransformerMixin):
    """Multiply the input by a constant factor.

    BaseEstimator supplies get_params/set_params (needed for cloning and
    grid search); TransformerMixin supplies fit_transform.
    """

    def __init__(self, param=1):
        # Store constructor arguments unmodified so get_params/clone work.
        self.param = param

    def fit(self, X, y=None):
        # Stateless transformer: nothing to learn from the data.
        return self

    def transform(self, X):
        return X * self.param


pipe = Pipeline([
    ('custom', CustomTransformer(param=2)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
Pipeline 持久化 #
保存 Pipeline #
python
# Persist the entire fitted pipeline (preprocessing + model) as one object.
from joblib import dump, load
dump(pipe, 'pipeline.joblib')
loaded_pipe = load('pipeline.joblib')
# The loaded pipeline applies the same preprocessing before predicting.
y_pred = loaded_pipe.predict(X_test)
最佳实践 #
1. 命名规范 #
python
# Descriptive, ordered step names make parameter grids and diagrams
# easier to read.
pipe = Pipeline([
('step_1_preprocessing', StandardScaler()),
('step_2_feature_selection', SelectKBest(k=10)),
('step_3_model', LogisticRegression())
])
2. 模块化设计 #
python
def create_preprocessor(numeric_features, categorical_features):
    """Build a ColumnTransformer for mixed-type tabular data.

    Numeric columns are imputed (mean by default) and standardized;
    categorical columns are imputed with the most frequent value and
    one-hot encoded, ignoring unseen categories at transform time.
    Fix: the function body had lost its indentation, making the snippet
    invalid Python.
    """
    return ColumnTransformer([
        ('num', Pipeline([
            ('imputer', SimpleImputer()),
            ('scaler', StandardScaler())
        ]), numeric_features),
        ('cat', Pipeline([
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('encoder', OneHotEncoder(handle_unknown='ignore'))
        ]), categorical_features)
    ])
3. 验证 Pipeline #
python
# NOTE(review): check_estimator validates scikit-learn API compliance and is
# aimed at estimator implementations; it may raise on a composite Pipeline
# like this one — confirm it actually passes before recommending it.
from sklearn.utils import estimator_checks
estimator_checks.check_estimator(pipe)
4. 并行处理 #
python
# Parallelism lives in the estimators themselves: n_jobs=-1 lets the
# random forest train its trees on all available CPU cores.
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(n_jobs=-1))
])
下一步 #
掌握 Pipeline 后,继续学习 模型持久化 了解如何保存和部署模型!
最后更新:2026-04-04