PyTorch 集成 #

概述 #

PyTorch 是目前最流行的深度学习框架之一,MLflow 提供了与 PyTorch 的无缝集成,帮助管理深度学习模型的完整生命周期。

text
┌─────────────────────────────────────────────────────────────┐
│                  MLflow + PyTorch 工作流                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  数据准备 → 模型定义 → 训练跟踪 → 模型保存 → 部署推理       │
│      │          │          │          │          │          │
│      ▼          ▼          ▼          ▼          ▼          │
│  MLflow      MLflow     MLflow     MLflow     MLflow        │
│  数据版本    参数记录    指标追踪    模型注册    模型服务     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

基本集成 #

安装依赖 #

bash
pip install mlflow torch torchvision pytorch-lightning

简单示例 #

python
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

class SimpleModel(nn.Module):
    """Two-layer fully connected classifier: Linear -> ReLU -> Linear."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Same submodule names as the original so state_dict keys are unchanged.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Single expression instead of step-by-step reassignments of x.
        return self.fc2(self.relu(self.fc1(x)))

# Synthetic binary-classification data: 1000 samples, 10 features each.
X_train = torch.randn(1000, 10)
y_train = torch.randint(0, 2, (1000,))
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

model = SimpleModel(input_size=10, hidden_size=64, output_size=2)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

mlflow.set_experiment("pytorch-simple")

with mlflow.start_run():
    # Record hyperparameters up front so the run is reproducible.
    for name, value in {"learning_rate": 0.001, "batch_size": 32,
                        "hidden_size": 64, "epochs": 10}.items():
        mlflow.log_param(name, value)

    for epoch in range(10):
        batch_losses = []
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(batch_X), batch_y)
            loss.backward()
            optimizer.step()
            batch_losses.append(loss.item())

        # Same mean as dividing by len(train_loader): one entry per batch.
        avg_loss = sum(batch_losses) / len(batch_losses)
        mlflow.log_metric("train_loss", avg_loss, step=epoch)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

    # Persist the trained network as an MLflow model artifact.
    mlflow.pytorch.log_model(model, "model")

自动记录 #

启用自动记录 #

python
import mlflow
import torch
import torch.nn as nn

# Enable MLflow autologging for PyTorch. The keyword arguments below are the
# documented defaults, spelled out here for visibility.
# NOTE(review): mlflow.pytorch.autolog primarily captures PyTorch *Lightning*
# training; a hand-written loop like the one below may not be auto-logged —
# confirm against the MLflow docs for the installed version.
mlflow.pytorch.autolog(
    log_models=True,
    log_datasets=True,
    disable=False,
    exclusive=False,
    disable_for_unsupported_versions=False,
    silent=False,
    registered_model_name=None
)

# NOTE(review): SimpleModel, train_loader, criterion and train_one_epoch are
# assumed to be defined earlier in the tutorial — this snippet is not
# standalone.
with mlflow.start_run():
    model = SimpleModel(10, 64, 2)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    
    for epoch in range(10):
        train_one_epoch(model, train_loader, optimizer, criterion)

自动记录内容 #

text
┌─────────────────────────────────────────────────────────────┐
│                   PyTorch 自动记录内容                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  参数:                                                     │
│  ├── optimizer_name                                         │
│  ├── optimizer_lr                                          │
│  ├── optimizer_momentum                                    │
│  ├── optimizer_weight_decay                                │
│  └── epochs                                                │
│                                                             │
│  指标:                                                     │
│  ├── train_loss (每个 epoch)                               │
│  ├── train_accuracy (如果计算)                             │
│  └── learning_rate (如果使用 scheduler)                    │
│                                                             │
│  模型:                                                     │
│  ├── 模型权重                                              │
│  ├── 模型架构                                              │
│  └── 优化器状态                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

完整训练流程 #

数据准备 #

python
import torch
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import mlflow

def prepare_data(batch_size=64):
    """Download MNIST and return (train_loader, val_loader) with an 80/20 split.

    Args:
        batch_size: mini-batch size used by both loaders.
    """
    # Map pixels to [-1, 1]: ToTensor yields [0, 1], then shift/scale by 0.5.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])

    full_dataset = datasets.MNIST(
        root='./data', train=True, download=True, transform=transform
    )

    # 80% train / 20% validation; sizes computed so they always sum exactly.
    n_train = int(0.8 * len(full_dataset))
    train_dataset, val_dataset = random_split(
        full_dataset, [n_train, len(full_dataset) - n_train]
    )

    return (
        DataLoader(train_dataset, batch_size=batch_size, shuffle=True),
        DataLoader(val_dataset, batch_size=batch_size, shuffle=False),
    )

模型定义 #

python
import torch.nn as nn
import torch.nn.functional as F

class CNN(nn.Module):
    """Small convolutional classifier for 28x28 single-channel images.

    conv(1->32) -> conv(32->64) -> 2x2 maxpool -> dropout -> flatten ->
    fc(9216->128) -> dropout -> fc(128->num_classes), with log-softmax
    outputs (pair with nn.NLLLoss).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        # 9216 = 64 channels * 12 * 12 spatial (after two 3x3 convs + pool).
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = F.relu(self.conv2(F.relu(self.conv1(x))))
        x = self.dropout1(F.max_pool2d(x, 2))
        x = self.dropout2(F.relu(self.fc1(torch.flatten(x, 1))))
        return F.log_softmax(self.fc2(x), dim=1)

训练函数 #

python
import torch.optim as optim
from mlflow.models.signature import infer_signature

def train_model(config):
    """Train the CNN on MNIST with full MLflow tracking.

    Args:
        config: dict with "learning_rate", "batch_size" and "epochs".

    Returns:
        The trained model (last-epoch weights; note the *logged* MLflow model
        is the best-validation-accuracy checkpoint, which may differ).
    """
    mlflow.set_experiment("mnist-classification")
    
    with mlflow.start_run():
        mlflow.log_params(config)
        
        train_loader, val_loader = prepare_data(config["batch_size"])
        
        model = CNN(num_classes=10)
        optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
        # NLLLoss pairs with the log_softmax output of CNN.forward.
        criterion = nn.NLLLoss()
        
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = model.to(device)
        
        best_val_acc = 0
        
        for epoch in range(config["epochs"]):
            # --- training pass ---
            model.train()
            train_loss = 0
            correct = 0
            total = 0
            
            for batch_idx, (data, target) in enumerate(train_loader):
                data, target = data.to(device), target.to(device)
                
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                
                train_loss += loss.item()
                _, predicted = output.max(1)
                total += target.size(0)
                correct += predicted.eq(target).sum().item()
            
            # Mean loss per batch; accuracy over all training samples.
            train_loss /= len(train_loader)
            train_acc = correct / total
            
            # --- validation pass (eval mode, no gradients) ---
            model.eval()
            val_loss = 0
            correct = 0
            total = 0
            
            with torch.no_grad():
                for data, target in val_loader:
                    data, target = data.to(device), target.to(device)
                    output = model(data)
                    val_loss += criterion(output, target).item()
                    _, predicted = output.max(1)
                    total += target.size(0)
                    correct += predicted.eq(target).sum().item()
            
            val_loss /= len(val_loader)
            val_acc = correct / total
            
            # One log_metrics call per epoch keeps all four curves step-aligned.
            mlflow.log_metrics({
                "train_loss": train_loss,
                "train_accuracy": train_acc,
                "val_loss": val_loss,
                "val_accuracy": val_acc
            }, step=epoch)
            
            print(f"Epoch {epoch+1}/{config['epochs']}")
            print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
            print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
            
            # Log the model whenever validation accuracy improves; later
            # improvements overwrite the same "model" artifact path, so the
            # run ends up holding the best checkpoint.
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                example_input = torch.randn(1, 1, 28, 28).to(device)
                signature = infer_signature(
                    example_input.cpu().numpy(),
                    model(example_input).cpu().detach().numpy()
                )
                mlflow.pytorch.log_model(
                    model,
                    "model",
                    signature=signature,
                    input_example=example_input.cpu().numpy()
                )
        
        mlflow.log_metric("best_val_accuracy", best_val_acc)
        
        return model

# Driver: train with a fixed configuration.
config = {
    "learning_rate": 0.001,
    "batch_size": 64,
    "epochs": 10
}

model = train_model(config)

PyTorch Lightning 集成 #

Lightning 模块 #

python
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import mlflow

class LitModel(pl.LightningModule):
    """LightningModule wrapping a small MNIST CNN trained with NLL loss."""

    def __init__(self, learning_rate=0.001):
        super().__init__()
        # Stores learning_rate in self.hparams (also picked up by loggers).
        self.save_hyperparameters()

        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        hidden = F.max_pool2d(F.relu(self.conv2(F.relu(self.conv1(x)))), 2)
        hidden = F.relu(self.fc1(torch.flatten(hidden, 1)))
        return F.log_softmax(self.fc2(hidden), dim=1)

    def training_step(self, batch, batch_idx):
        inputs, targets = batch
        loss = F.nll_loss(self(inputs), targets)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        inputs, targets = batch
        log_probs = self(inputs)
        self.log("val_loss", F.nll_loss(log_probs, targets))
        self.log("val_acc", (log_probs.argmax(dim=1) == targets).float().mean())

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.learning_rate)

使用 MLflow 回调 #

python
from pytorch_lightning.callbacks import Callback
import mlflow

class MLflowCallback(Callback):
    """Lightning callback that mirrors a training run into MLflow.

    Starts an MLflow run when training begins, logs hyperparameters, streams
    validation metrics each epoch, and logs the final model on completion.
    """

    def __init__(self, experiment_name="lightning-experiment"):
        self.experiment_name = experiment_name
        self.run = None  # active MLflow run handle, set in on_train_start

    def on_train_start(self, trainer, pl_module):
        mlflow.set_experiment(self.experiment_name)
        self.run = mlflow.start_run()
        # hparams is an AttributeDict; pass a plain dict to log_params.
        mlflow.log_params(dict(pl_module.hparams))

    def on_train_end(self, trainer, pl_module):
        mlflow.pytorch.log_model(pl_module, "model")
        mlflow.end_run()
        self.run = None

    def on_validation_end(self, trainer, pl_module):
        # Bug fix: Lightning runs a validation sanity check *before*
        # on_train_start fires, so the original logged metrics with no active
        # run (implicitly starting a stray one). Skip until our run exists.
        if trainer.sanity_checking or self.run is None:
            return
        for key, value in trainer.callback_metrics.items():
            mlflow.log_metric(key, value.item(), step=trainer.current_epoch)

# Autologging and the custom callback can coexist: autolog captures
# Lightning-internal params/metrics, the callback adds the custom run logic.
mlflow.pytorch.autolog()

model = LitModel(learning_rate=0.001)
# NOTE(review): train_loader / val_loader are assumed to come from the
# prepare_data() helper defined earlier in this tutorial — confirm before
# running this snippet standalone.
trainer = pl.Trainer(max_epochs=10, callbacks=[MLflowCallback()])
trainer.fit(model, train_loader, val_loader)

超参数调优 #

网格搜索 #

python
import mlflow
import itertools

def hyperparameter_search():
    """Grid-search learning rate / batch size / hidden size with MLflow.

    Each trial is logged as a nested child run under one parent run, so the
    whole sweep groups together in the MLflow UI.

    Returns:
        (best_config, best_val_acc): best-scoring configuration dict (or None
        if no trial improved on 0) and its validation accuracy.
    """
    learning_rates = [0.001, 0.01, 0.1]
    batch_sizes = [32, 64, 128]
    hidden_sizes = [64, 128, 256]

    mlflow.set_experiment("hyperparameter-search")

    best_val_acc = 0
    best_config = None

    # Fix: nested=True only nests under an *active* parent run; without one,
    # every trial was logged as an unrelated top-level run. Open a parent run
    # here, mirroring the Optuna example in this tutorial.
    with mlflow.start_run(run_name="grid-search"):
        for lr, bs, hs in itertools.product(learning_rates, batch_sizes, hidden_sizes):
            config = {
                "learning_rate": lr,
                "batch_size": bs,
                "hidden_size": hs,
                "epochs": 5
            }

            with mlflow.start_run(nested=True):
                mlflow.log_params(config)

                # NOTE(review): train_one_config / evaluate_model / val_loader
                # are assumed to be defined elsewhere in the tutorial.
                model = train_one_config(config)
                val_acc = evaluate_model(model, val_loader)

                mlflow.log_metric("val_accuracy", val_acc)

                if val_acc > best_val_acc:
                    best_val_acc = val_acc
                    best_config = config

        # Summarize the winner on the parent run for easy comparison.
        if best_config is not None:
            mlflow.log_params(best_config)
        mlflow.log_metric("best_val_accuracy", best_val_acc)

    return best_config, best_val_acc

使用 Optuna #

python
import optuna
import mlflow

def objective(trial):
    """Optuna objective: suggest hyperparameters, train once, return val accuracy.

    Executed inside the parent "optuna-study" run opened around
    study.optimize() below, so nested=True groups each trial under it.
    """
    lr = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [32, 64, 128])
    hidden_size = trial.suggest_int("hidden_size", 32, 256)
    
    with mlflow.start_run(nested=True):
        mlflow.log_params({
            "learning_rate": lr,
            "batch_size": batch_size,
            "hidden_size": hidden_size
        })
        
        # epochs is fixed (not tuned), so it is added here rather than suggested.
        config = {
            "learning_rate": lr,
            "batch_size": batch_size,
            "hidden_size": hidden_size,
            "epochs": 10
        }
        
        # NOTE(review): train_one_config / evaluate_model / val_loader are
        # assumed to be defined elsewhere in the tutorial.
        model = train_one_config(config)
        val_acc = evaluate_model(model, val_loader)
        
        mlflow.log_metric("val_accuracy", val_acc)
        
        # Optuna maximizes this value (direction="maximize" in the study).
        return val_acc

mlflow.set_experiment("optuna-optimization")

# Parent run groups the 50 trial runs (each started with nested=True above).
with mlflow.start_run(run_name="optuna-study"):
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=50)
    
    # Record the winning configuration and score on the parent run.
    mlflow.log_params(study.best_params)
    mlflow.log_metric("best_val_accuracy", study.best_value)

模型加载与推理 #

加载模型 #

python
import mlflow.pytorch
import torch

# Load by Model Registry name and stage...
model = mlflow.pytorch.load_model("models:/mnist-cnn/Production")

# ...or directly from a run's artifacts (replace <run_id> with a real run id).
# NOTE: these two lines illustrate *alternative* loading styles; the second
# assignment overwrites the first.
model = mlflow.pytorch.load_model("runs:/<run_id>/model")

批量推理 #

python
import torch
import numpy as np

def batch_predict(model_path, data_loader):
    """Run argmax inference over a DataLoader using a model loaded from MLflow.

    Args:
        model_path: MLflow model URI, e.g. "models:/name/Production" or
            "runs:/<run_id>/model".
        data_loader: iterable yielding input tensors, or (inputs, ...) tuples
            (as produced by a DataLoader over a TensorDataset).

    Returns:
        np.ndarray of predicted class indices, one per input sample.
    """
    model = mlflow.pytorch.load_model(model_path)
    model.eval()  # disable dropout / batchnorm updates for inference

    predictions = []

    with torch.no_grad():
        for batch in data_loader:
            # Loaders over labeled datasets yield (inputs, targets) tuples;
            # unlabeled loaders yield the tensor directly.
            inputs = batch[0] if isinstance(batch, (list, tuple)) else batch

            outputs = model(inputs)
            predicted = torch.argmax(outputs, dim=1)
            # Fix: .cpu() first — .numpy() raises on CUDA tensors, so the
            # original broke when the model/inputs lived on GPU.
            predictions.extend(predicted.cpu().numpy())

    return np.array(predictions)

部署为服务 #

bash
mlflow models serve -m "models:/mnist-cnn/Production" -p 5001
python
import requests
import numpy as np

# One fake grayscale image shaped (batch, channel, height, width).
data = np.random.randn(1, 1, 28, 28).tolist()

# NOTE(review): {"instances": ...} is the TF-Serving-style payload; recent
# MLflow scoring servers expect {"inputs": ...} instead — confirm against the
# deployed MLflow version.
response = requests.post(
    "http://localhost:5001/invocations",
    json={"instances": data}
)

predictions = response.json()
print(predictions)

最佳实践 #

1. 使用签名 #

python
from mlflow.models.signature import infer_signature

# Infer an input/output signature from one example batch; MLflow stores it
# with the model and validates request schemas at serving time.
example_input = torch.randn(1, 1, 28, 28)
example_output = model(example_input)

signature = infer_signature(
    example_input.numpy(),
    example_output.detach().numpy()  # detach: the forward pass tracks gradients
)

mlflow.pytorch.log_model(model, "model", signature=signature)

2. 记录模型摘要 #

python
def log_model_summary(model, input_size):
    """Log a torchsummary layer-by-layer model summary as an MLflow artifact.

    Args:
        model: the torch.nn.Module to summarize.
        input_size: input shape tuple expected by torchsummary, e.g. (1, 28, 28).
    """
    import io
    from contextlib import redirect_stdout
    from torchsummary import summary

    # Fix: torchsummary.summary *prints* the table and does not return it, so
    # str(summary(...)) logged a useless value. Capture stdout instead.
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        summary(model, input_size)
    mlflow.log_text(buffer.getvalue(), "model_summary.txt")

3. 记录训练曲线 #

python
import matplotlib.pyplot as plt

def plot_training_curves(history):
    """Plot loss and accuracy curves and attach the figure to the MLflow run.

    Args:
        history: dict holding 'train_loss', 'val_loss', 'train_acc' and
            'val_acc' lists, one entry per epoch.
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    # (axis, train-series key, val-series key, y-axis label) per panel.
    panels = [
        (axes[0], 'train_loss', 'val_loss', 'Loss'),
        (axes[1], 'train_acc', 'val_acc', 'Accuracy'),
    ]
    for ax, train_key, val_key, ylabel in panels:
        ax.plot(history[train_key], label='Train')
        ax.plot(history[val_key], label='Val')
        ax.set_xlabel('Epoch')
        ax.set_ylabel(ylabel)
        ax.legend()

    plt.tight_layout()
    plt.savefig('training_curves.png')
    mlflow.log_artifact('training_curves.png')
    plt.close()

4. GPU 支持 #

python
import torch

# Prefer GPU when available; record the choice so runs remain comparable.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
mlflow.log_param("device", str(device))

model = model.to(device)

下一步 #

现在你已经掌握了 MLflow 与 PyTorch 的集成,接下来学习 Scikit-learn 集成,了解传统机器学习工作流!

最后更新:2026-04-04