PyTorch 集成 #
概述 #
PyTorch 是目前最流行的深度学习框架之一,MLflow 提供了与 PyTorch 的无缝集成,帮助管理深度学习模型的完整生命周期。
text
┌─────────────────────────────────────────────────────────────┐
│ MLflow + PyTorch 工作流 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 数据准备 → 模型定义 → 训练跟踪 → 模型保存 → 部署推理 │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ MLflow MLflow MLflow MLflow MLflow │
│ 数据版本 参数记录 指标追踪 模型注册 模型服务 │
│ │
└─────────────────────────────────────────────────────────────┘
基本集成 #
安装依赖 #
bash
pip install mlflow torch torchvision pytorch-lightning
简单示例 #
python
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
class SimpleModel(nn.Module):
    """Two-layer fully connected classifier: Linear -> ReLU -> Linear."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Hidden projection with non-linearity, then raw class logits.
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)
# --- Synthetic binary-classification data --------------------------------
X_train = torch.randn(1000, 10)
y_train = torch.randint(0, 2, (1000,))
train_loader = DataLoader(
    TensorDataset(X_train, y_train), batch_size=32, shuffle=True
)

# --- Model, loss and optimizer -------------------------------------------
model = SimpleModel(input_size=10, hidden_size=64, output_size=2)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# --- Train under a single MLflow run -------------------------------------
mlflow.set_experiment("pytorch-simple")
with mlflow.start_run():
    # Record hyperparameters once, up front.
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 32)
    mlflow.log_param("hidden_size", 64)
    mlflow.log_param("epochs", 10)

    for epoch in range(10):
        running_loss = 0.0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(batch_X), batch_y)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        avg_loss = running_loss / len(train_loader)
        # One metric point per epoch, indexed via `step`.
        mlflow.log_metric("train_loss", avg_loss, step=epoch)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

    # Persist the trained model as a run artifact.
    mlflow.pytorch.log_model(model, "model")
自动记录 #
启用自动记录 #
python
import mlflow
import torch
import torch.nn as nn
# Enable PyTorch autologging: parameters, per-epoch metrics and the model
# are captured without explicit log_* calls inside the training loop.
autolog_options = dict(
    log_models=True,
    log_datasets=True,
    disable=False,
    exclusive=False,
    disable_for_unsupported_versions=False,
    silent=False,
    registered_model_name=None,
)
mlflow.pytorch.autolog(**autolog_options)

with mlflow.start_run():
    model = SimpleModel(10, 64, 2)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    for epoch in range(10):
        # `train_one_epoch` is assumed to be defined elsewhere in the tutorial.
        train_one_epoch(model, train_loader, optimizer, criterion)
自动记录内容 #
text
┌─────────────────────────────────────────────────────────────┐
│ PyTorch 自动记录内容 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 参数: │
│ ├── optimizer_name │
│ ├── optimizer_lr │
│ ├── optimizer_momentum │
│ ├── optimizer_weight_decay │
│ └── epochs │
│ │
│ 指标: │
│ ├── train_loss (每个 epoch) │
│ ├── train_accuracy (如果计算) │
│ └── learning_rate (如果使用 scheduler) │
│ │
│ 模型: │
│ ├── 模型权重 │
│ ├── 模型架构 │
│ └── 优化器状态 │
│ │
└─────────────────────────────────────────────────────────────┘
完整训练流程 #
数据准备 #
python
import torch
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import mlflow
def prepare_data(batch_size=64):
    """Download MNIST and return (train_loader, val_loader) with an 80/20 split.

    Args:
        batch_size: mini-batch size used by both loaders.

    Returns:
        Tuple of (train_loader, val_loader).
    """
    # Scale pixel values to roughly [-1, 1].
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])
    full_dataset = datasets.MNIST(
        root='./data', train=True, download=True, transform=transform
    )

    # 80% training / 20% validation.
    n_train = int(0.8 * len(full_dataset))
    train_ds, val_ds = random_split(
        full_dataset, [n_train, len(full_dataset) - n_train]
    )

    return (
        DataLoader(train_ds, batch_size=batch_size, shuffle=True),
        DataLoader(val_ds, batch_size=batch_size, shuffle=False),
    )
模型定义 #
python
import torch.nn as nn
import torch.nn.functional as F
class CNN(nn.Module):
    """Small MNIST convnet: two conv layers, max-pool, dropout, two FC layers.

    Expects input of shape (N, 1, 28, 28); returns per-class
    log-probabilities of shape (N, num_classes).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)  # 64 channels * 12 * 12 after pooling
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        # Convolutional feature extractor.
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.dropout1(F.max_pool2d(x, 2))
        # Classifier head on the flattened feature map.
        x = torch.flatten(x, 1)
        x = self.dropout2(F.relu(self.fc1(x)))
        return F.log_softmax(self.fc2(x), dim=1)
训练函数 #
python
import torch.optim as optim
from mlflow.models.signature import infer_signature
def train_model(config):
    """Train the CNN on MNIST under a single MLflow run.

    Args:
        config: dict with keys "learning_rate", "batch_size" and "epochs".

    Returns:
        The trained model (weights from the final epoch, which may not be
        the best-validation snapshot logged to MLflow).
    """
    mlflow.set_experiment("mnist-classification")
    with mlflow.start_run():
        # Record all hyperparameters in one call.
        mlflow.log_params(config)
        train_loader, val_loader = prepare_data(config["batch_size"])
        model = CNN(num_classes=10)
        optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
        # NLLLoss pairs with the model's log_softmax output.
        criterion = nn.NLLLoss()
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = model.to(device)
        best_val_acc = 0
        for epoch in range(config["epochs"]):
            # ---- Training phase ----
            model.train()
            train_loss = 0
            correct = 0
            total = 0
            for batch_idx, (data, target) in enumerate(train_loader):
                data, target = data.to(device), target.to(device)
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
                # Accumulate accuracy statistics over the epoch.
                _, predicted = output.max(1)
                total += target.size(0)
                correct += predicted.eq(target).sum().item()
            train_loss /= len(train_loader)
            train_acc = correct / total
            # ---- Validation phase ----
            model.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for data, target in val_loader:
                    data, target = data.to(device), target.to(device)
                    output = model(data)
                    val_loss += criterion(output, target).item()
                    _, predicted = output.max(1)
                    total += target.size(0)
                    correct += predicted.eq(target).sum().item()
            val_loss /= len(val_loader)
            val_acc = correct / total
            # Log all four metrics for this epoch at one step index.
            mlflow.log_metrics({
                "train_loss": train_loss,
                "train_accuracy": train_acc,
                "val_loss": val_loss,
                "val_accuracy": val_acc
            }, step=epoch)
            print(f"Epoch {epoch+1}/{config['epochs']}")
            print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
            print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
            # Snapshot the model whenever validation accuracy improves.
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                # Infer an input/output signature from one example tensor.
                # NOTE(review): the model is still in eval() mode here, so
                # dropout is disabled during signature inference.
                example_input = torch.randn(1, 1, 28, 28).to(device)
                signature = infer_signature(
                    example_input.cpu().numpy(),
                    model(example_input).cpu().detach().numpy()
                )
                mlflow.pytorch.log_model(
                    model,
                    "model",
                    signature=signature,
                    input_example=example_input.cpu().numpy()
                )
        mlflow.log_metric("best_val_accuracy", best_val_acc)
        return model
# Tutorial entry point: train once with a fixed hyperparameter set.
config = {
    "learning_rate": 0.001,
    "batch_size": 64,
    "epochs": 10,
}

model = train_model(config)
PyTorch Lightning 集成 #
Lightning 模块 #
python
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import mlflow
class LitModel(pl.LightningModule):
    """LightningModule wrapping the MNIST convnet.

    `save_hyperparameters()` records `learning_rate` on `self.hparams`
    so Lightning (and MLflow autologging) can log it automatically.
    """

    def __init__(self, learning_rate=0.001):
        super().__init__()
        self.save_hyperparameters()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        # Conv feature extractor followed by a two-layer classifier head.
        features = F.max_pool2d(F.relu(self.conv2(F.relu(self.conv1(x)))), 2)
        flat = torch.flatten(features, 1)
        return F.log_softmax(self.fc2(F.relu(self.fc1(flat))), dim=1)

    def training_step(self, batch, batch_idx):
        inputs, labels = batch
        loss = F.nll_loss(self(inputs), labels)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        inputs, labels = batch
        log_probs = self(inputs)
        loss = F.nll_loss(log_probs, labels)
        accuracy = (log_probs.argmax(dim=1) == labels).float().mean()
        self.log("val_loss", loss)
        self.log("val_acc", accuracy)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.learning_rate)
使用 MLflow 回调 #
python
from pytorch_lightning.callbacks import Callback
import mlflow
class MLflowCallback(Callback):
    """Lightning callback that mirrors training into an MLflow run.

    Starts a run when training begins, logs every validation metric once
    per epoch, then logs the model and closes the run when training ends.
    """

    def __init__(self, experiment_name="lightning-experiment"):
        # Experiment to log under; set_experiment creates it if missing.
        self.experiment_name = experiment_name

    def on_train_start(self, trainer, pl_module):
        mlflow.set_experiment(self.experiment_name)
        # Keep a handle on the run; it is ended in on_train_end.
        self.run = mlflow.start_run()
        mlflow.log_params(pl_module.hparams)

    def on_train_end(self, trainer, pl_module):
        mlflow.pytorch.log_model(pl_module, "model")
        mlflow.end_run()

    def on_validation_end(self, trainer, pl_module):
        # NOTE(review): Lightning runs a sanity-check validation pass before
        # on_train_start fires, so this may execute with no run started yet —
        # confirm whether that first invocation should be skipped.
        metrics = trainer.callback_metrics
        for key, value in metrics.items():
            mlflow.log_metric(key, value.item(), step=trainer.current_epoch)
# Combine MLflow autologging with the custom callback above.
mlflow.pytorch.autolog()

model = LitModel(learning_rate=0.001)
trainer = pl.Trainer(max_epochs=10, callbacks=[MLflowCallback()])
trainer.fit(model, train_loader, val_loader)
超参数调优 #
网格搜索 #
python
import mlflow
import itertools
def hyperparameter_search():
    """Grid-search learning rate, batch size and hidden size.

    Each configuration is trained in a nested MLflow run; the best
    configuration and its accuracy are summarized on the parent run.

    Returns:
        (best_config, best_val_acc): the configuration dict with the
        highest validation accuracy, and that accuracy.
    """
    learning_rates = [0.001, 0.01, 0.1]
    batch_sizes = [32, 64, 128]
    hidden_sizes = [64, 128, 256]

    mlflow.set_experiment("hyperparameter-search")
    best_val_acc = 0
    best_config = None

    # Parent run: without an active enclosing run, `nested=True` below has
    # no effect and every trial becomes an unrelated top-level run.
    with mlflow.start_run(run_name="grid-search"):
        for lr, bs, hs in itertools.product(learning_rates, batch_sizes, hidden_sizes):
            config = {
                "learning_rate": lr,
                "batch_size": bs,
                "hidden_size": hs,
                "epochs": 5
            }
            with mlflow.start_run(nested=True):
                mlflow.log_params(config)
                model = train_one_config(config)
                val_acc = evaluate_model(model, val_loader)
                mlflow.log_metric("val_accuracy", val_acc)
                if val_acc > best_val_acc:
                    best_val_acc = val_acc
                    best_config = config

        # Record the winner on the parent run so it is visible at a glance.
        if best_config is not None:
            mlflow.log_params({f"best_{k}": v for k, v in best_config.items()})
            mlflow.log_metric("best_val_accuracy", best_val_acc)

    return best_config, best_val_acc
使用 Optuna #
python
import optuna
import mlflow
def objective(trial):
    """Optuna objective: sample hyperparameters, train, return val accuracy."""
    # Sample the search space for this trial.
    params = {
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
        "batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),
        "hidden_size": trial.suggest_int("hidden_size", 32, 256),
    }

    # Nested run: one child run per trial under the surrounding study run.
    with mlflow.start_run(nested=True):
        mlflow.log_params(params)
        config = dict(params, epochs=10)
        model = train_one_config(config)
        val_acc = evaluate_model(model, val_loader)
        mlflow.log_metric("val_accuracy", val_acc)

    return val_acc
mlflow.set_experiment("optuna-optimization")
# Parent run for the whole study; objective() opens nested child runs,
# so every trial appears under this run in the MLflow UI.
with mlflow.start_run(run_name="optuna-study"):
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=50)
    # Record the best trial's hyperparameters and score on the parent run.
    mlflow.log_params(study.best_params)
    mlflow.log_metric("best_val_accuracy", study.best_value)
模型加载与推理 #
加载模型 #
python
import mlflow.pytorch
import torch
# Load from the Model Registry by name and stage...
model = mlflow.pytorch.load_model("models:/mnist-cnn/Production")
# ...or load directly from a specific run's artifacts (replace <run_id>).
model = mlflow.pytorch.load_model("runs:/<run_id>/model")
批量推理 #
python
import torch
import numpy as np
def batch_predict(model_path, data_loader):
    """Run batched inference with a model loaded from MLflow.

    Args:
        model_path: MLflow model URI ("runs:/..." or "models:/...").
        data_loader: iterable yielding input tensors, or (inputs, target)
            tuples whose first element is the input batch.

    Returns:
        np.ndarray of predicted class indices, one per input row.
    """
    model = mlflow.pytorch.load_model(model_path)
    model.eval()

    predictions = []
    with torch.no_grad():
        for batch in data_loader:
            # DataLoaders over (X, y) datasets yield tuples; use inputs only.
            if isinstance(batch, (list, tuple)):
                inputs = batch[0]
            else:
                inputs = batch
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            # .cpu() makes this safe when the model runs on GPU:
            # calling .numpy() directly on a CUDA tensor raises.
            predictions.extend(predicted.cpu().numpy())

    return np.array(predictions)
部署为服务 #
bash
mlflow models serve -m "models:/mnist-cnn/Production" -p 5001
python
import requests
import numpy as np
# One fake 28x28 single-channel image as nested lists (JSON-serializable).
data = np.random.randn(1, 1, 28, 28).tolist()

# POST to the scoring endpoint started by `mlflow models serve`.
response = requests.post(
    "http://localhost:5001/invocations",
    json={"instances": data},
)

predictions = response.json()
print(predictions)
最佳实践 #
1. 使用签名 #
python
from mlflow.models.signature import infer_signature
# Infer a model signature from one example input/output pair so the saved
# model records its expected tensor shapes and dtypes.
example_input = torch.randn(1, 1, 28, 28)
example_output = model(example_input)

signature = infer_signature(
    example_input.numpy(),
    example_output.detach().numpy(),
)
mlflow.pytorch.log_model(model, "model", signature=signature)
2. 记录模型摘要 #
python
def log_model_summary(model, input_size):
    """Log a layer-by-layer model summary to MLflow as a text artifact.

    Args:
        model: the torch.nn.Module to summarize.
        input_size: input shape without the batch dim, e.g. (1, 28, 28).
    """
    import io
    from contextlib import redirect_stdout

    from torchsummary import summary

    # torchsummary.summary PRINTS its table to stdout (and in several
    # versions returns None), so `str(summary(...))` would log "None".
    # Capture stdout instead to get the actual table text.
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        summary(model, input_size)
    mlflow.log_text(buffer.getvalue(), "model_summary.txt")
3. 记录训练曲线 #
python
import matplotlib.pyplot as plt
def plot_training_curves(history):
    """Plot loss/accuracy curves and log the figure to MLflow.

    Args:
        history: dict with per-epoch lists under 'train_loss', 'val_loss',
            'train_acc' and 'val_acc'.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))

    # Left panel: loss curves.
    loss_ax.plot(history['train_loss'], label='Train')
    loss_ax.plot(history['val_loss'], label='Val')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    # Right panel: accuracy curves.
    acc_ax.plot(history['train_acc'], label='Train')
    acc_ax.plot(history['val_acc'], label='Val')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()

    plt.tight_layout()
    # Save to disk, attach to the active run, then free the figure.
    plt.savefig('training_curves.png')
    mlflow.log_artifact('training_curves.png')
    plt.close()
4. GPU 支持 #
python
import torch
# Pick GPU when available, otherwise CPU, and record the choice so runs
# executed on different hardware are distinguishable in the MLflow UI.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
mlflow.log_param("device", str(device))
model = model.to(device)
下一步 #
现在你已经掌握了 MLflow 与 PyTorch 的集成,接下来学习 Scikit-learn 集成,了解传统机器学习工作流!
最后更新:2026-04-04