Scikit-learn 集成 #

概述 #

Scikit-learn 是 Python 中最流行的传统机器学习库。MLflow 提供了与 Scikit-learn 的深度集成，帮助管理机器学习模型的完整生命周期。

text
┌─────────────────────────────────────────────────────────────┐
│                MLflow + Scikit-learn 工作流                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  数据准备 → 特征工程 → 模型训练 → 模型评估 → 模型部署       │
│      │          │          │          │          │          │
│      ▼          ▼          ▼          ▼          ▼          │
│  MLflow      MLflow     MLflow     MLflow     MLflow        │
│  数据版本    特征记录    自动记录    指标追踪    模型服务     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

基本集成 #

安装依赖 #

bash
pip install mlflow scikit-learn pandas numpy

简单示例 #

python
import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Load the iris dataset and keep 20% of the samples aside for evaluation.
data = load_iris()
X = data.data
y = data.target

X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.2
)

# Every run started below is grouped under this experiment.
mlflow.set_experiment("iris-classification")

with mlflow.start_run():
    # Name the hyperparameters once so the logged values and the values
    # handed to the estimator cannot drift apart.
    n_estimators = 100
    max_depth = 5

    for param_name, param_value in (
        ("n_estimators", n_estimators),
        ("max_depth", max_depth),
    ):
        mlflow.log_param(param_name, param_value)

    model = RandomForestClassifier(
        random_state=42,
        max_depth=max_depth,
        n_estimators=n_estimators,
    )
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    # Accuracy needs no averaging; the other three are averaged per class,
    # weighted by class support.
    accuracy = accuracy_score(y_test, y_pred)
    precision, recall, f1 = (
        scorer(y_test, y_pred, average='weighted')
        for scorer in (precision_score, recall_score, f1_score)
    )

    for metric_name, metric_value in (
        ("accuracy", accuracy),
        ("precision", precision),
        ("recall", recall),
        ("f1_score", f1),
    ):
        mlflow.log_metric(metric_name, metric_value)

    # Store the fitted estimator under the run's "model" artifact path.
    mlflow.sklearn.log_model(model, "model")

    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")

自动记录 #

启用自动记录 #

python
import mlflow
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# Autologging options, spelled out explicitly so each switch is visible.
autolog_options = dict(
    log_models=True,
    log_datasets=True,
    disable=False,
    exclusive=False,
    disable_for_unsupported_versions=False,
    silent=False,
    registered_model_name=None,
    extra_tags=None,
)
# Autologging must be switched on before the estimator is fitted.
mlflow.sklearn.autolog(**autolog_options)

data = load_iris()
X = data.data
y = data.target
# No random_state is given, so the split differs from one execution to the next.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

with mlflow.start_run():
    # No explicit log_param / log_metric calls: recording is left to autolog.
    model = RandomForestClassifier(max_depth=5, n_estimators=100)
    model.fit(X_train, y_train)

    score = model.score(X_test, y_test)
    print(f"Test Accuracy: {score:.4f}")

自动记录内容 #

text
┌─────────────────────────────────────────────────────────────┐
│                 Scikit-learn 自动记录内容                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  参数:                                                     │
│  ├── 模型所有超参数                                        │
│  ├── fit 参数                                              │
│  └── 数据集信息                                            │
│                                                             │
│  指标:                                                     │
│  ├── training_score                                        │
│  ├── testing_score (如果提供测试数据)                      │
│  └── 训练时间                                              │
│                                                             │
│  模型:                                                     │
│  ├── 序列化模型文件                                        │
│  ├── conda.yaml                                            │
│  └── requirements.txt                                      │
│                                                             │
│  工件:                                                     │
│  ├── 模型文件                                              │
│  └── 环境文件                                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

完整机器学习流程 #

数据准备 #

python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import mlflow

def load_and_prepare_data(filepath):
    """Read a CSV file and split it into train / validation / test sets.

    The CSV must contain a ``target`` column; every other column is treated
    as a feature.  Dataset and split sizes are logged as MLflow params.

    Args:
        filepath: Path (or anything ``pandas.read_csv`` accepts) of the CSV.

    Returns:
        Tuple ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    """
    frame = pd.read_csv(filepath)

    mlflow.log_param("dataset_size", len(frame))
    # One column is the label, the rest are features.
    mlflow.log_param("num_features", len(frame.columns) - 1)

    features = frame.drop(columns='target')
    labels = frame['target']

    # First carve out 20% as the test set ...
    X_rest, X_test, y_rest, y_test = train_test_split(
        features, labels, test_size=0.2, random_state=42, stratify=labels
    )
    # ... then 25% of the remaining 80% (i.e. 20% overall) for validation.
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest, test_size=0.25, random_state=42, stratify=y_rest
    )

    for param_name, subset in (
        ("train_size", X_train),
        ("val_size", X_val),
        ("test_size", X_test),
    ):
        mlflow.log_param(param_name, len(subset))

    return X_train, X_val, X_test, y_train, y_val, y_test

特征工程 #

python
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import mlflow

def create_feature_pipeline(numerical_features, categorical_features):
    """Build the column-wise preprocessor and log its configuration.

    Args:
        numerical_features: Columns to pass through ``StandardScaler``.
        categorical_features: Columns to pass through ``OneHotEncoder``
            (categories unseen during fit are ignored at transform time).

    Returns:
        An unfitted ``ColumnTransformer`` combining both transformers.
    """
    transformers = [
        ('num', StandardScaler(), numerical_features),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features),
    ]
    preprocessor = ColumnTransformer(transformers=transformers)

    # Record which columns went where so the run documents its preprocessing.
    logged_params = (
        ("numerical_features", numerical_features),
        ("categorical_features", categorical_features),
        ("preprocessing", "StandardScaler + OneHotEncoder"),
    )
    for param_name, param_value in logged_params:
        mlflow.log_param(param_name, param_value)

    return preprocessor

def create_full_pipeline(preprocessor, model):
    """Chain a preprocessor and an estimator into a single ``Pipeline``.

    Args:
        preprocessor: Transformer used as the ``'preprocessor'`` step.
        model: Estimator used as the final ``'classifier'`` step.

    Returns:
        The two-step ``Pipeline`` (not fitted here).
    """
    steps = [
        ('preprocessor', preprocessor),
        ('classifier', model),
    ]
    return Pipeline(steps)

模型训练 #

python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report
)
import matplotlib.pyplot as plt
import seaborn as sns

def train_and_evaluate_model(model_name, model, X_train, y_train, X_val, y_val,
                             experiment_name="customer-churn"):
    """Fit ``model`` inside an MLflow run and log metrics, plots and the model.

    Args:
        model_name: Used as the run name, the ``model_type`` param and the
            suffix of the registered model name (``churn-<model_name>``).
        model: Unfitted scikit-learn style classifier.
        X_train, y_train: Training features and labels.
        X_val, y_val: Validation features and labels.
        experiment_name: MLflow experiment to log into.  Defaults to
            ``"customer-churn"``, the value that used to be hard-coded.

    Returns:
        Tuple ``(model, val_metrics)`` with the fitted model and the
        validation metrics dictionary.
    """
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_name=model_name):
        mlflow.log_param("model_type", model_name)

        model.fit(X_train, y_train)

        y_train_pred = model.predict(X_train)
        y_val_pred = model.predict(X_val)

        train_metrics = {
            "train_accuracy": accuracy_score(y_train, y_train_pred),
            "train_precision": precision_score(y_train, y_train_pred, average='weighted'),
            "train_recall": recall_score(y_train, y_train_pred, average='weighted'),
            "train_f1": f1_score(y_train, y_train_pred, average='weighted')
        }

        val_metrics = {
            "val_accuracy": accuracy_score(y_val, y_val_pred),
            "val_precision": precision_score(y_val, y_val_pred, average='weighted'),
            "val_recall": recall_score(y_val, y_val_pred, average='weighted'),
            "val_f1": f1_score(y_val, y_val_pred, average='weighted')
        }

        if hasattr(model, 'predict_proba'):
            y_val_proba = model.predict_proba(X_val)
            n_classes = y_val_proba.shape[1]
            if n_classes == 2:
                # Binary case: roc_auc_score expects the positive-class scores.
                val_metrics["val_roc_auc"] = roc_auc_score(
                    y_val, y_val_proba[:, 1]
                )
            elif n_classes > 2:
                # Multi-class case: the full probability matrix is required;
                # a single column (as used for binary) would raise here.
                val_metrics["val_roc_auc"] = roc_auc_score(
                    y_val, y_val_proba, multi_class='ovr'
                )

        mlflow.log_metrics(train_metrics)
        mlflow.log_metrics(val_metrics)

        cm = confusion_matrix(y_val, y_val_pred)
        fig, ax = plt.subplots(figsize=(8, 6))
        try:
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)
            ax.set_title(f'Confusion Matrix - {model_name}')
            ax.set_ylabel('True Label')
            ax.set_xlabel('Predicted Label')
            # Log the figure object directly: no fixed-name file is left in
            # the working directory and parallel runs cannot overwrite it.
            mlflow.log_figure(fig, 'confusion_matrix.png')
        finally:
            # Always release the figure, even if plotting or logging fails.
            plt.close(fig)

        report = classification_report(y_val, y_val_pred)
        mlflow.log_text(report, "classification_report.txt")

        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name=f"churn-{model_name}"
        )

        return model, val_metrics

模型比较 #

python
import mlflow
import pandas as pd

def compare_models(experiment_name):
    """Return a table of runs in ``experiment_name`` ranked by validation F1.

    Args:
        experiment_name: Name of the MLflow experiment to query.

    Returns:
        The ``mlflow.search_runs`` result restricted to the run id, the
        ``model_type`` param and the four validation metrics, best
        ``val_f1`` first.
    """
    wanted_columns = [
        "run_id",
        "params.model_type",
        "metrics.val_accuracy",
        "metrics.val_precision",
        "metrics.val_recall",
        "metrics.val_f1",
    ]

    all_runs = mlflow.search_runs(
        experiment_names=[experiment_name],
        order_by=["metrics.val_f1 DESC"],
    )

    return all_runs[wanted_columns]

# Candidate classifiers keyed by the name used for the MLflow run.
# NOTE(review): the estimator classes and X_train / y_train / X_val / y_val
# are not defined in this snippet; they presumably come from the earlier
# training and data-preparation snippets - confirm before running standalone.
models_config = {
    "random_forest": RandomForestClassifier(n_estimators=100, max_depth=10),
    "gradient_boosting": GradientBoostingClassifier(n_estimators=100, max_depth=5),
    "logistic_regression": LogisticRegression(max_iter=1000)
}

# Train every candidate in its own run and collect its validation metrics.
results = []
for name, model in models_config.items():
    trained_model, metrics = train_and_evaluate_model(
        name, model, X_train, y_train, X_val, y_val
    )
    results.append({"model": name, **metrics})

# One row per model, one column per validation metric.
comparison_df = pd.DataFrame(results)
print(comparison_df)

超参数调优 #

GridSearchCV #

python
import mlflow
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

def grid_search_optimization(X_train, y_train, X_val, y_val):
    """Tune a random forest with ``GridSearchCV`` and log the outcome to MLflow.

    Args:
        X_train, y_train: Data used for the 5-fold cross-validated search.
        X_val, y_val: Held-out data used to score the best estimator.

    Returns:
        Tuple ``(best_model, best_params)``.
    """
    # Imported locally: this snippet's import list provides neither name.
    import tempfile
    from pathlib import Path

    import pandas as pd
    from sklearn.metrics import f1_score

    mlflow.set_experiment("hyperparameter-tuning")

    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [5, 10, 15, None],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    }

    with mlflow.start_run(run_name="grid-search"):
        mlflow.log_param("search_type", "grid")
        mlflow.log_param("param_grid", str(param_grid))

        grid_search = GridSearchCV(
            RandomForestClassifier(random_state=42),
            param_grid,
            cv=5,
            scoring='f1_weighted',
            n_jobs=-1,
            verbose=1
        )

        grid_search.fit(X_train, y_train)

        mlflow.log_params(grid_search.best_params_)
        mlflow.log_metric("best_cv_score", grid_search.best_score_)

        best_model = grid_search.best_estimator_
        y_val_pred = best_model.predict(X_val)
        val_f1 = f1_score(y_val, y_val_pred, average='weighted')

        mlflow.log_metric("val_f1", val_f1)
        mlflow.sklearn.log_model(best_model, "best_model")

        # Write the CSV into a throw-away directory so no stray file is left
        # in the working directory; the artifact keeps the same file name.
        cv_results = pd.DataFrame(grid_search.cv_results_)
        with tempfile.TemporaryDirectory() as tmp_dir:
            csv_path = Path(tmp_dir) / "cv_results.csv"
            cv_results.to_csv(csv_path, index=False)
            mlflow.log_artifact(str(csv_path))

        return best_model, grid_search.best_params_

Optuna 集成 #

python
import optuna
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

def objective(trial):
    """Optuna objective: cross-validated weighted F1 of a random forest.

    Each trial is recorded as a nested MLflow run holding the sampled
    hyperparameters and the mean / standard deviation of the CV scores.
    Reads ``X_train`` and ``y_train`` from the enclosing module scope.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        Mean 5-fold ``f1_weighted`` score (the value the study optimizes).
    """
    sampled = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 300),
        'max_depth': trial.suggest_int('max_depth', 3, 20),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
    }

    with mlflow.start_run(nested=True):
        mlflow.log_params(sampled)

        # Fixed seed so differences between trials come from the
        # hyperparameters rather than from the forest's randomness.
        model = RandomForestClassifier(random_state=42, **sampled)

        scores = cross_val_score(model, X_train, y_train, cv=5, scoring='f1_weighted')
        mean_score = scores.mean()

        mlflow.log_metric('cv_f1_mean', mean_score)
        mlflow.log_metric('cv_f1_std', scores.std())

        return mean_score

# Group the parent run and all trial runs under one experiment.
mlflow.set_experiment("optuna-optimization")

# Parent run: `objective` opens a nested run per trial, so the 50 trials
# below are recorded underneath this one.
with mlflow.start_run(run_name="optuna-study"):
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=50)
    
    # Summarize the study on the parent run.
    mlflow.log_params(study.best_params)
    mlflow.log_metric('best_cv_score', study.best_value)
    
    # Refit on the training data with the best hyperparameters and store it.
    # NOTE(review): X_train / y_train are not defined in this snippet -
    # presumably provided by an earlier data-preparation step; confirm.
    best_model = RandomForestClassifier(**study.best_params, random_state=42)
    best_model.fit(X_train, y_train)
    mlflow.sklearn.log_model(best_model, "best_model")

特征重要性分析 #

python
import mlflow
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

def log_feature_importance(model, feature_names):
    """Log a model's feature importances to the active MLflow run.

    Importances come from ``feature_importances_`` when present, otherwise
    from the absolute value of ``coef_``.  A 2-D ``coef_`` (one row per
    class) is averaged over classes; a 1-D ``coef_`` is used as is.  Models
    exposing neither attribute are skipped silently.

    Logged output: ``feature_importance.png`` (bar chart),
    ``feature_importance.csv`` and one ``importance_<feature>`` metric per
    feature.

    Args:
        model: Fitted estimator.
        feature_names: Feature names, in the model's column order.

    Returns:
        None.
    """
    import re
    import tempfile
    from pathlib import Path

    if hasattr(model, 'feature_importances_'):
        importance = np.asarray(model.feature_importances_)
    elif hasattr(model, 'coef_'):
        coefficients = np.abs(np.asarray(model.coef_))
        # Indexing coef_[0] on a 1-D array would give a single scalar, and on
        # a multi-class matrix would ignore every class but the first.
        importance = coefficients if coefficients.ndim == 1 else coefficients.mean(axis=0)
    else:
        return

    importance_df = pd.DataFrame({
        'feature': feature_names,
        'importance': importance
    }).sort_values('importance', ascending=True)

    fig, ax = plt.subplots(figsize=(10, 8))
    try:
        ax.barh(importance_df['feature'], importance_df['importance'])
        ax.set_xlabel('Importance')
        ax.set_title('Feature Importance')
        fig.tight_layout()
        mlflow.log_figure(fig, 'feature_importance.png')
    finally:
        # Release the figure even if plotting or logging fails.
        plt.close(fig)

    # Use a throw-away directory so no CSV is left in the working directory.
    with tempfile.TemporaryDirectory() as tmp_dir:
        csv_path = Path(tmp_dir) / 'feature_importance.csv'
        importance_df.to_csv(csv_path, index=False)
        mlflow.log_artifact(str(csv_path))

    # MLflow restricts the characters allowed in metric names; names such as
    # "sepal length (cm)" would be rejected, so replace anything else.
    metrics = {
        "importance_" + re.sub(r"[^/\w.\- ]", "_", str(feature)): float(value)
        for feature, value in zip(importance_df['feature'], importance_df['importance'])
    }
    # One batched call instead of one logging call per feature.
    mlflow.log_metrics(metrics)

Pipeline 持久化 #

python
import mlflow.sklearn
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from mlflow.models.signature import infer_signature

def train_and_save_pipeline(X_train, y_train, X_test):
    """Fit a scaler + random-forest pipeline and log it with a signature.

    Args:
        X_train: Training features; ``.head()`` is called on it to produce
            the logged input example.
        y_train: Training labels.
        X_test: Accepted but not used by this function.

    Returns:
        The fitted ``Pipeline``.
    """
    steps = [
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ]
    pipeline = Pipeline(steps)
    pipeline.fit(X_train, y_train)

    # The signature is inferred from the training inputs and the pipeline's
    # own predictions on them.
    train_predictions = pipeline.predict(X_train)
    signature = infer_signature(X_train, train_predictions)

    with mlflow.start_run():
        mlflow.log_param("pipeline_steps", ["StandardScaler", "RandomForestClassifier"])

        mlflow.sklearn.log_model(
            pipeline,
            "pipeline",
            input_example=X_train.head(),
            signature=signature,
        )

    return pipeline

# Load the pipeline back through a model-registry URI of the form
# models:/<name>/<stage>.
# NOTE(review): presumably a model named "my-pipeline" has been registered and
# moved to the Production stage beforehand - nothing in this snippet does so.
pipeline = mlflow.sklearn.load_model("models:/my-pipeline/Production")
# NOTE(review): `new_data` is not defined here; the caller must supply it.
predictions = pipeline.predict(new_data)

模型解释 #

SHAP 集成 #

python
import mlflow
import shap
import matplotlib.pyplot as plt

def _select_class_output(shap_values, expected_value, class_index):
    """Return ``(values_2d, base_value)`` for one output of a SHAP explanation."""
    import numpy as np

    if isinstance(shap_values, list):
        # One (n_samples, n_features) array per class.
        values = np.asarray(shap_values[class_index])
    else:
        values = np.asarray(shap_values)
        if values.ndim == 3:
            # Single array shaped (n_samples, n_features, n_outputs).
            values = values[:, :, class_index]

    # expected_value may be a scalar or hold one entry per output.
    base_values = np.ravel(expected_value)
    base_value = base_values[class_index] if base_values.size > 1 else base_values[0]
    return values, float(base_value)


def log_shap_analysis(model, X_train, X_sample, class_index=0):
    """Log a SHAP summary plot and up to five force plots as MLflow artifacts.

    Args:
        model: Fitted tree-based model accepted by ``shap.TreeExplainer``.
        X_train: Accepted but not used by this function.
        X_sample: DataFrame of rows to explain (``.iloc`` is used on it).
        class_index: Output whose contributions the force plots show.
            Defaults to 0, the output that used to be hard-coded.

    Returns:
        None.
    """
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_sample)

    plt.figure(figsize=(12, 8))
    shap.summary_plot(shap_values, X_sample, show=False)
    plt.savefig('shap_summary.png')
    mlflow.log_artifact('shap_summary.png')
    plt.close()

    # Indexing shap_values[0][i] is only right when shap_values is a list of
    # per-class arrays; normalize first so an ndarray result and a scalar
    # expected_value are handled as well.
    class_values, base_value = _select_class_output(
        shap_values, explainer.expected_value, class_index
    )

    for i in range(min(5, len(X_sample))):
        shap.force_plot(
            base_value,
            class_values[i],
            X_sample.iloc[i],
            matplotlib=True,
            show=False
        )
        # Save and close whichever figure the force plot was drawn on; no
        # extra empty figure is created (and left open) per iteration.
        force_fig = plt.gcf()
        force_fig.savefig(f'shap_force_{i}.png')
        mlflow.log_artifact(f'shap_force_{i}.png')
        plt.close(force_fig)

最佳实践 #

1. 使用 Pipeline #

python
from sklearn.pipeline import Pipeline

# Bundle preprocessing and the estimator so they are logged as one model.
# NOTE(review): `preprocessor`, `model` and `mlflow` are not defined or
# imported in this snippet; they must already be in scope.
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', model)
])

# Logging the whole pipeline stores the preprocessing steps with the model.
mlflow.sklearn.log_model(pipeline, "pipeline")

2. 记录数据版本 #

python
import hashlib

def get_data_hash(df):
    """Return an MD5 hex digest identifying the contents of ``df``.

    Each row is first hashed with ``pandas.util.hash_pandas_object``; the
    resulting array of row hashes is then fed to MD5.  The digest is a data
    fingerprint, not a security measure.
    """
    row_hashes = pd.util.hash_pandas_object(df)
    digest = hashlib.md5()
    digest.update(row_hashes.values)
    return digest.hexdigest()

# Record a fingerprint of each split so a run can be tied to its exact data.
# NOTE(review): X_train / X_test, `pd` and `mlflow` are not defined or
# imported in this snippet; they must already be in scope.
mlflow.log_param("train_data_hash", get_data_hash(X_train))
mlflow.log_param("test_data_hash", get_data_hash(X_test))

3. 记录模型配置 #

python
# Describe the experiment setup as plain key/value pairs ...
config = {
    "model_type": "RandomForest",
    "preprocessing": "StandardScaler",
    "feature_selection": "None",
    "cross_validation": "5-fold"
}

# ... and log them all as run params in a single call.
mlflow.log_params(config)

4. 完整评估报告 #

python
def log_evaluation_report(model, X_test, y_test):
    """Log a classification report and confusion matrix for ``model``.

    Args:
        model: Fitted estimator exposing ``predict``.
        X_test: Features to evaluate on.
        y_test: True labels for ``X_test``.

    Returns:
        None.  Writes ``classification_report.txt`` and
        ``confusion_matrix.json`` through MLflow.
    """
    from sklearn.metrics import classification_report, confusion_matrix

    predictions = model.predict(X_test)

    # Human-readable per-class precision / recall / F1 table.
    mlflow.log_text(
        classification_report(y_test, predictions),
        "classification_report.txt",
    )

    # Convert the matrix to nested lists before handing it to log_dict.
    matrix_rows = confusion_matrix(y_test, predictions).tolist()
    mlflow.log_dict(matrix_rows, "confusion_matrix.json")

下一步 #

现在你已经掌握了 MLflow 与 Scikit-learn 的集成，接下来请学习「生产环境部署」，了解企业级 MLOps 实践！

最后更新:2026-04-04