Scikit-learn 集成 #
概述 #
Scikit-learn 是 Python 中最流行的传统机器学习库,MLflow 提供了与 Scikit-learn 的深度集成,帮助管理机器学习模型的完整生命周期。
text
┌─────────────────────────────────────────────────────────────┐
│ MLflow + Scikit-learn 工作流 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 数据准备 → 特征工程 → 模型训练 → 模型评估 → 模型部署 │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ MLflow MLflow MLflow MLflow MLflow │
│ 数据版本 特征记录 自动记录 指标追踪 模型服务 │
│ │
└─────────────────────────────────────────────────────────────┘
基本集成 #
安装依赖 #
bash
pip install mlflow scikit-learn pandas numpy
简单示例 #
python
import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Load the iris dataset and hold out 20% for evaluation.
data = load_iris()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# All params/metrics/artifacts below are grouped under this experiment.
mlflow.set_experiment("iris-classification")

with mlflow.start_run():
    # Log hyperparameters explicitly so runs are comparable in the UI.
    n_estimators = 100
    max_depth = 5
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    model.fit(X_train, y_train)

    # Weighted averaging keeps the scores meaningful across the 3 classes.
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)
    mlflow.log_metric("f1_score", f1)

    # Persist the fitted model as a run artifact under "model/".
    mlflow.sklearn.log_model(model, "model")

    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
自动记录 #
启用自动记录 #
python
import mlflow
import mlflow.sklearn  # explicit import: autolog() lives in the sklearn flavor module
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# Enable autologging BEFORE any fit() call: MLflow patches sklearn so that
# hyperparameters, training metrics, and the model are captured without
# explicit log_* calls. Arguments shown are the documented defaults.
mlflow.sklearn.autolog(
    log_models=True,
    log_datasets=True,
    disable=False,
    exclusive=False,
    disable_for_unsupported_versions=False,
    silent=False,
    registered_model_name=None,
    extra_tags=None
)

data = load_iris()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

with mlflow.start_run():
    # No explicit logging needed: autolog records everything on fit().
    model = RandomForestClassifier(n_estimators=100, max_depth=5)
    model.fit(X_train, y_train)

    score = model.score(X_test, y_test)
    print(f"Test Accuracy: {score:.4f}")
自动记录内容 #
text
┌─────────────────────────────────────────────────────────────┐
│ Scikit-learn 自动记录内容 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 参数: │
│ ├── 模型所有超参数 │
│ ├── fit 参数 │
│ └── 数据集信息 │
│ │
│ 指标: │
│ ├── training_score │
│ ├── testing_score (如果提供测试数据) │
│ └── 训练时间 │
│ │
│ 模型: │
│ ├── 序列化模型文件 │
│ ├── conda.yaml │
│ └── requirements.txt │
│ │
│ 工件: │
│ ├── 模型文件 │
│ └── 环境文件 │
│ │
└─────────────────────────────────────────────────────────────┘
完整机器学习流程 #
数据准备 #
python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import mlflow
def load_and_prepare_data(filepath, target_col='target'):
    """Load a CSV dataset and split it into train/val/test partitions.

    Must be called inside an active MLflow run (logs dataset-size params).

    Args:
        filepath: Path to a CSV file with feature columns plus a label column.
        target_col: Name of the label column (default: 'target'; the original
            hard-coded this name).

    Returns:
        Tuple (X_train, X_val, X_test, y_train, y_val, y_test) — a
        60/20/20 split, stratified by the label.
    """
    df = pd.read_csv(filepath)
    mlflow.log_param("dataset_size", len(df))
    mlflow.log_param("num_features", len(df.columns) - 1)

    X = df.drop(target_col, axis=1)
    y = df[target_col]

    # First carve off the 20% test set, then take 25% of the remaining 80%
    # as validation (0.8 * 0.25 = 0.2 of the full data).
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, test_size=0.25, random_state=42, stratify=y_train
    )

    mlflow.log_param("train_size", len(X_train))
    mlflow.log_param("val_size", len(X_val))
    mlflow.log_param("test_size", len(X_test))

    return X_train, X_val, X_test, y_train, y_val, y_test
特征工程 #
python
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import mlflow
def create_feature_pipeline(numerical_features, categorical_features):
    """Build a ColumnTransformer (scale numeric, one-hot categorical).

    Also logs the feature lists and preprocessing choice to the active
    MLflow run.

    Args:
        numerical_features: list of numeric column names.
        categorical_features: list of categorical column names.

    Returns:
        An unfitted ColumnTransformer.
    """
    numerical_transformer = StandardScaler()
    # handle_unknown='ignore' keeps inference from crashing on categories
    # never seen during training (they encode to all-zeros).
    categorical_transformer = OneHotEncoder(handle_unknown='ignore')

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_transformer, numerical_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )

    mlflow.log_param("numerical_features", numerical_features)
    mlflow.log_param("categorical_features", categorical_features)
    mlflow.log_param("preprocessing", "StandardScaler + OneHotEncoder")

    return preprocessor
def create_full_pipeline(preprocessor, model):
    """Chain preprocessing and a classifier into one sklearn Pipeline,
    so the exact training-time transforms are re-applied at predict time.
    """
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    return pipeline
模型训练 #
python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, confusion_matrix, classification_report
)
import matplotlib.pyplot as plt
import seaborn as sns
def train_and_evaluate_model(model_name, model, X_train, y_train, X_val, y_val):
    """Train *model* and log metrics/artifacts to a new MLflow run.

    Creates one run (named *model_name*) in the "customer-churn" experiment
    and logs: weighted accuracy/precision/recall/F1 on train and validation
    splits, ROC-AUC when the model supports predict_proba, a confusion-matrix
    image, a text classification report, and the fitted model (registered as
    "churn-<model_name>").

    Returns:
        (fitted model, dict of validation metrics).
    """
    mlflow.set_experiment("customer-churn")

    with mlflow.start_run(run_name=model_name):
        mlflow.log_param("model_type", model_name)

        model.fit(X_train, y_train)
        y_train_pred = model.predict(X_train)
        y_val_pred = model.predict(X_val)

        # Weighted averages keep scores meaningful under class imbalance.
        train_metrics = {
            "train_accuracy": accuracy_score(y_train, y_train_pred),
            "train_precision": precision_score(y_train, y_train_pred, average='weighted'),
            "train_recall": recall_score(y_train, y_train_pred, average='weighted'),
            "train_f1": f1_score(y_train, y_train_pred, average='weighted')
        }
        val_metrics = {
            "val_accuracy": accuracy_score(y_val, y_val_pred),
            "val_precision": precision_score(y_val, y_val_pred, average='weighted'),
            "val_recall": recall_score(y_val, y_val_pred, average='weighted'),
            "val_f1": f1_score(y_val, y_val_pred, average='weighted')
        }

        if hasattr(model, 'predict_proba'):
            y_val_proba = model.predict_proba(X_val)
            # BUG FIX: the original passed the positive-class column together
            # with multi_class='ovr'; roc_auc_score rejects a 1-D score for
            # multiclass targets. Binary and multiclass need different inputs.
            if y_val_proba.shape[1] == 2:
                val_metrics["val_roc_auc"] = roc_auc_score(y_val, y_val_proba[:, 1])
            else:
                val_metrics["val_roc_auc"] = roc_auc_score(
                    y_val, y_val_proba, multi_class='ovr'
                )

        mlflow.log_metrics(train_metrics)
        mlflow.log_metrics(val_metrics)

        # Confusion-matrix heatmap, logged as an image artifact.
        cm = confusion_matrix(y_val, y_val_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.title(f'Confusion Matrix - {model_name}')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.savefig('confusion_matrix.png')
        mlflow.log_artifact('confusion_matrix.png')
        plt.close()

        report = classification_report(y_val, y_val_pred)
        mlflow.log_text(report, "classification_report.txt")

        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name=f"churn-{model_name}"
        )

    return model, val_metrics
模型比较 #
python
import mlflow
import pandas as pd
def compare_models(experiment_name):
    """Return a DataFrame of runs in *experiment_name*, best val F1 first.

    Pulls run metadata from the tracking server and keeps only the columns
    needed for a side-by-side comparison.
    """
    runs = mlflow.search_runs(
        experiment_names=[experiment_name],
        order_by=["metrics.val_f1 DESC"]
    )
    comparison = runs[[
        "run_id",
        "params.model_type",
        "metrics.val_accuracy",
        "metrics.val_precision",
        "metrics.val_recall",
        "metrics.val_f1"
    ]]
    return comparison
# Candidate models to benchmark. All are trained on the same splits, so
# their runs in the "customer-churn" experiment are directly comparable.
# NOTE(review): assumes X_train/y_train/X_val/y_val exist from the data
# preparation step above.
models_config = {
    "random_forest": RandomForestClassifier(n_estimators=100, max_depth=10),
    "gradient_boosting": GradientBoostingClassifier(n_estimators=100, max_depth=5),
    "logistic_regression": LogisticRegression(max_iter=1000)
}

results = []
for name, model in models_config.items():
    trained_model, metrics = train_and_evaluate_model(
        name, model, X_train, y_train, X_val, y_val
    )
    results.append({"model": name, **metrics})

comparison_df = pd.DataFrame(results)
print(comparison_df)
超参数调优 #
GridSearchCV #
python
import mlflow
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
from sklearn.model_selection import GridSearchCV

def grid_search_optimization(X_train, y_train, X_val, y_val):
    """Exhaustive RandomForest hyperparameter search, tracked in MLflow.

    Runs 5-fold GridSearchCV over a fixed grid and logs: the grid, the best
    parameters and CV score, held-out validation F1, the full CV results
    table (CSV artifact), and the best model.

    Returns:
        (best fitted estimator, dict of best hyperparameters).
    """
    mlflow.set_experiment("hyperparameter-tuning")

    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [5, 10, 15, None],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    }

    with mlflow.start_run(run_name="grid-search"):
        mlflow.log_param("search_type", "grid")
        mlflow.log_param("param_grid", str(param_grid))

        # 3*4*3*3 = 108 candidates x 5 folds; n_jobs=-1 uses every core.
        grid_search = GridSearchCV(
            RandomForestClassifier(random_state=42),
            param_grid,
            cv=5,
            scoring='f1_weighted',
            n_jobs=-1,
            verbose=1
        )
        grid_search.fit(X_train, y_train)

        mlflow.log_params(grid_search.best_params_)
        mlflow.log_metric("best_cv_score", grid_search.best_score_)

        # Held-out check: the CV score alone can be optimistic.
        best_model = grid_search.best_estimator_
        y_val_pred = best_model.predict(X_val)
        val_f1 = f1_score(y_val, y_val_pred, average='weighted')
        mlflow.log_metric("val_f1", val_f1)

        mlflow.sklearn.log_model(best_model, "best_model")

        cv_results = pd.DataFrame(grid_search.cv_results_)
        cv_results.to_csv("cv_results.csv", index=False)
        mlflow.log_artifact("cv_results.csv")

    return best_model, grid_search.best_params_
Optuna 集成 #
python
import optuna
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
def objective(trial):
    """Optuna objective: mean 5-fold weighted F1 of a RandomForest.

    Each trial is logged as a nested MLflow run under the parent study run.
    NOTE(review): relies on module-level X_train / y_train being defined
    by the earlier data-preparation step.
    """
    n_estimators = trial.suggest_int('n_estimators', 50, 300)
    max_depth = trial.suggest_int('max_depth', 3, 20)
    min_samples_split = trial.suggest_int('min_samples_split', 2, 20)
    min_samples_leaf = trial.suggest_int('min_samples_leaf', 1, 10)

    with mlflow.start_run(nested=True):
        mlflow.log_params({
            'n_estimators': n_estimators,
            'max_depth': max_depth,
            'min_samples_split': min_samples_split,
            'min_samples_leaf': min_samples_leaf
        })

        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            random_state=42
        )

        # 5-fold CV keeps the objective robust to a lucky single split.
        scores = cross_val_score(model, X_train, y_train, cv=5, scoring='f1_weighted')
        mean_score = scores.mean()

        mlflow.log_metric('cv_f1_mean', mean_score)
        mlflow.log_metric('cv_f1_std', scores.std())

        return mean_score
# Parent run holds the study-level summary; objective() logs one nested
# run per trial inside it.
mlflow.set_experiment("optuna-optimization")

with mlflow.start_run(run_name="optuna-study"):
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=50)

    # Record the winning configuration, then retrain on the full training
    # set (CV in the objective never fit on all of it at once).
    mlflow.log_params(study.best_params)
    mlflow.log_metric('best_cv_score', study.best_value)

    best_model = RandomForestClassifier(**study.best_params, random_state=42)
    best_model.fit(X_train, y_train)
    mlflow.sklearn.log_model(best_model, "best_model")
特征重要性分析 #
python
import mlflow
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
def log_feature_importance(model, feature_names):
    """Log feature importances to MLflow as a chart, a CSV, and metrics.

    Supports tree ensembles (feature_importances_) and linear models
    (coef_); silently does nothing for models exposing neither.
    Must be called inside an active MLflow run.
    """
    if hasattr(model, 'feature_importances_'):
        importance = model.feature_importances_
    elif hasattr(model, 'coef_'):
        # Mean |coefficient| across classes: identical to |coef_[0]| for
        # binary linear models (coef_ has one row), but also covers
        # multiclass models, where the original looked only at class 0.
        importance = np.abs(model.coef_).mean(axis=0)
    else:
        return

    # Ascending sort so the most important feature ends up on top of barh.
    importance_df = pd.DataFrame({
        'feature': feature_names,
        'importance': importance
    }).sort_values('importance', ascending=True)

    plt.figure(figsize=(10, 8))
    plt.barh(importance_df['feature'], importance_df['importance'])
    plt.xlabel('Importance')
    plt.title('Feature Importance')
    plt.tight_layout()
    plt.savefig('feature_importance.png')
    mlflow.log_artifact('feature_importance.png')
    plt.close()

    importance_df.to_csv('feature_importance.csv', index=False)
    mlflow.log_artifact('feature_importance.csv')

    # NOTE(review): MLflow metric keys only allow alphanumerics, _, -, .,
    # space and / — feature names with other characters will be rejected;
    # sanitize names upstream if that can happen.
    for _, row in importance_df.iterrows():
        mlflow.log_metric(f"importance_{row['feature']}", row['importance'])
Pipeline 持久化 #
python
import mlflow.sklearn
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from mlflow.models.signature import infer_signature
def train_and_save_pipeline(X_train, y_train, X_test):
    """Fit a scale+forest Pipeline and log it to MLflow with a signature.

    The signature (inferred from training inputs and predictions) lets
    MLflow validate input schemas at serving time.

    Returns:
        The fitted sklearn Pipeline.
    """
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
    ])
    pipeline.fit(X_train, y_train)

    signature = infer_signature(X_train, pipeline.predict(X_train))

    with mlflow.start_run():
        mlflow.log_param("pipeline_steps", ["StandardScaler", "RandomForestClassifier"])
        mlflow.sklearn.log_model(
            pipeline,
            "pipeline",
            signature=signature,
            # X_train[:5] works for both DataFrames and NumPy arrays, unlike
            # the original .head(), which required a DataFrame.
            input_example=X_train[:5]
        )
    return pipeline

# Later: load the registered pipeline (preprocessing included) and predict.
pipeline = mlflow.sklearn.load_model("models:/my-pipeline/Production")
predictions = pipeline.predict(new_data)
模型解释 #
SHAP 集成 #
python
import mlflow
import shap
import matplotlib.pyplot as plt
def log_shap_analysis(model, X_train, X_sample):
    """Log SHAP explanation plots (summary + per-sample force plots) to MLflow.

    NOTE(review): TreeExplainer only supports tree-based models, and the
    indexing below (expected_value[0], shap_values[0][i]) assumes the older
    SHAP API where multi-output models return a list of per-class arrays —
    confirm against the installed shap version. X_sample appears to be a
    DataFrame (uses .iloc) — TODO confirm. X_train is currently unused.
    """
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_sample)

    # Global view: distribution of feature impact over the whole sample.
    plt.figure(figsize=(12, 8))
    shap.summary_plot(shap_values, X_sample, show=False)
    plt.savefig('shap_summary.png')
    mlflow.log_artifact('shap_summary.png')
    plt.close()

    # Local view: force plots for (up to) the first five rows.
    for i in range(min(5, len(X_sample))):
        plt.figure()
        shap.force_plot(
            explainer.expected_value[0],
            shap_values[0][i],
            X_sample.iloc[i],
            matplotlib=True,
            show=False
        )
        plt.savefig(f'shap_force_{i}.png')
        mlflow.log_artifact(f'shap_force_{i}.png')
        plt.close()
最佳实践 #
1. 使用 Pipeline #
python
from sklearn.pipeline import Pipeline
# Log preprocessing and model as ONE artifact, so serving applies exactly
# the training-time transforms (preprocessor/model are defined earlier).
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', model)
])
mlflow.sklearn.log_model(pipeline, "pipeline")
2. 记录数据版本 #
python
import hashlib
def get_data_hash(df):
    """Return a hex MD5 digest fingerprinting *df*'s contents.

    hash_pandas_object produces a deterministic per-row hash (values and
    index included), so equal frames yield equal digests and any content
    or row-order change yields a different one. MD5 is used purely as a
    fast fingerprint for data versioning, not for security.
    """
    return hashlib.md5(pd.util.hash_pandas_object(df).values).hexdigest()
# Fingerprint the exact data each run saw, so results can later be traced
# back to a specific dataset version.
mlflow.log_param("train_data_hash", get_data_hash(X_train))
mlflow.log_param("test_data_hash", get_data_hash(X_test))
3. 记录模型配置 #
python
# Free-form experiment configuration; mlflow.log_params records all
# key/value pairs in a single call (values are stored as strings).
config = {
    "model_type": "RandomForest",
    "preprocessing": "StandardScaler",
    "feature_selection": "None",
    "cross_validation": "5-fold"
}
mlflow.log_params(config)
4. 完整评估报告 #
python
def log_evaluation_report(model, X_test, y_test):
    """Log a text classification report and a JSON confusion matrix to MLflow.

    Must be called inside an active MLflow run.
    """
    from sklearn.metrics import classification_report, confusion_matrix

    y_pred = model.predict(X_test)

    report = classification_report(y_test, y_pred)
    mlflow.log_text(report, "classification_report.txt")

    # Nested-list form is JSON-serializable; rows = true class, cols = predicted.
    # NOTE(review): mlflow.log_dict is documented for dicts — it serializes a
    # list too, but wrapping in {"confusion_matrix": ...} would be stricter.
    cm = confusion_matrix(y_test, y_pred)
    mlflow.log_dict(cm.tolist(), "confusion_matrix.json")
下一步 #
现在你已经掌握了 MLflow 与 Scikit-learn 的集成,接下来学习 生产环境部署,了解企业级 MLOps 实践!
最后更新:2026-04-04