机器学习流水线实战 #
概述 #
本案例将构建一个完整的机器学习流水线,涵盖数据处理、模型训练、超参数调优和模型服务部署。
text
┌─────────────────────────────────────────────────────────────┐
│ ML 流水线架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 数据加载 │ ──►│ 数据处理 │ ──►│ 模型训练 │ ──►│ 模型服务 │ │
│ │Ray Data │ │Ray Data │ │Ray Train │ │Ray Serve│ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Ray Tune │ │
│ │ 超参数调优 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
1. 数据处理 #
数据加载 #
python
import ray
from ray import data
# Connect to the Ray cluster (or start a local one if none is running).
ray.init()
# Lazily create Ray Datasets from the CSV splits on S3.
train_ds = ray.data.read_csv("s3://bucket/data/train.csv")
val_ds = ray.data.read_csv("s3://bucket/data/val.csv")
test_ds = ray.data.read_csv("s3://bucket/data/test.csv")
# NOTE(review): count() forces a read of each dataset — fine for a demo,
# potentially expensive on large data.
print(f"Train: {train_ds.count()}, Val: {val_ds.count()}, Test: {test_ds.count()}")
数据预处理 #
python
def preprocess_batch(batch):
    """Standardize the feature column of a single batch.

    Features are shifted and scaled by the batch-wide mean and standard
    deviation (a small epsilon keeps the divisor non-zero); labels pass
    through unchanged as a numpy array.
    """
    import numpy as np

    raw = np.array(batch["features"])
    centered = raw - raw.mean()
    standardized = centered / (raw.std() + 1e-8)
    return {"features": standardized, "label": np.array(batch["label"])}
# Apply the batch standardization to the train and validation splits.
train_ds = train_ds.map_batches(preprocess_batch)
val_ds = val_ds.map_batches(preprocess_batch)
# Shuffle training rows so mini-batches are not ordered like the source CSV.
train_ds = train_ds.random_shuffle()
特征工程 #
python
def feature_engineering(row):
    """Augment one row with summary statistics of its feature vector.

    Adds the per-row mean and population standard deviation of the
    features while keeping the original features and label intact.

    Args:
        row: Mapping with a "features" sequence of numbers and a "label".

    Returns:
        New dict with keys "features", "feature_mean", "feature_std",
        "label".
    """
    features = row["features"]
    n = len(features)
    # Hoist the mean: the original recomputed sum(features)/len(features)
    # inside the variance generator, making each row O(n^2).
    mean = sum(features) / n
    variance = sum((x - mean) ** 2 for x in features) / n
    return {
        "features": features,
        "feature_mean": mean,
        "feature_std": variance ** 0.5,
        "label": row["label"],
    }
# Per-row feature augmentation on the training split.
train_ds = train_ds.map(feature_engineering)
2. 模型训练 #
定义训练函数 #
python
import torch
import torch.nn as nn
import torch.optim as optim
from ray import train
def train_fn(config):
    """Per-worker training loop for Ray Train.

    Builds a two-layer MLP from `config`, trains it on the "train"
    dataset shard, and after every epoch evaluates on the "val" shard,
    reporting epoch number, validation loss and accuracy via
    train.report().
    """
    net = nn.Sequential(
        nn.Linear(config["input_size"], config["hidden_size"]),
        nn.ReLU(),
        nn.Dropout(config["dropout"]),
        nn.Linear(config["hidden_size"], config["num_classes"]),
    )
    # Wrap for distributed execution (device placement, DDP).
    net = train.torch.prepare_model(net)

    opt = optim.Adam(net.parameters(), lr=config["lr"])
    loss_fn = nn.CrossEntropyLoss()

    train_shard = train.get_dataset_shard("train")
    val_shard = train.get_dataset_shard("val")

    for epoch in range(config["epochs"]):
        # --- one training pass over this worker's shard ---
        net.train()
        for batch in train_shard.iter_batches(batch_size=config["batch_size"]):
            x = torch.tensor(batch["features"], dtype=torch.float32)
            y = torch.tensor(batch["label"], dtype=torch.long)
            opt.zero_grad()
            loss_fn(net(x), y).backward()
            opt.step()

        # --- validation pass ---
        net.eval()
        epoch_loss = 0
        n_correct = 0
        n_seen = 0
        with torch.no_grad():
            for batch in val_shard.iter_batches(batch_size=config["batch_size"]):
                x = torch.tensor(batch["features"], dtype=torch.float32)
                y = torch.tensor(batch["label"], dtype=torch.long)
                logits = net(x)
                epoch_loss += loss_fn(logits, y).item()
                preds = logits.argmax(1)
                n_seen += y.size(0)
                n_correct += (preds == y).sum().item()

        train.report({
            "epoch": epoch,
            "val_loss": epoch_loss,
            "val_accuracy": n_correct / n_seen,
        })
运行训练 #
python
from ray.train import ScalingConfig

# Distributed training: 4 GPU workers, each running train_fn with the
# same fixed hyperparameter configuration.
trainer = train.torch.TorchTrainer(
    train_loop_per_worker=train_fn,
    train_loop_config={
        "input_size": 100,
        "hidden_size": 256,
        "num_classes": 10,
        "lr": 0.001,
        "batch_size": 32,
        "epochs": 20,
        "dropout": 0.2,
    },
    # Each worker receives an automatically split shard of these datasets.
    datasets={"train": train_ds, "val": val_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()
# result.metrics holds the metrics from the final train.report() call.
print(f"Best validation accuracy: {result.metrics['val_accuracy']}")
3. 超参数调优 #
定义搜索空间 #
python
from ray import tune

# Mixed search space: fixed model I/O sizes, tunable hidden width,
# log-scaled learning rate, batch size, and dropout probability.
search_space = {
    "input_size": 100,
    "hidden_size": tune.choice([128, 256, 512]),
    "num_classes": 10,
    "lr": tune.loguniform(1e-4, 1e-2),
    "batch_size": tune.choice([16, 32, 64]),
    "epochs": 20,
    "dropout": tune.uniform(0.1, 0.5),
}
配置调优器 #
python
from ray.tune.schedulers import AsyncHyperBandScheduler

# Early-stopping scheduler: promising trials run up to max_t iterations,
# underperformers are stopped after at least grace_period iterations.
# metric/mode are declared once on TuneConfig below and propagated here,
# rather than being duplicated on the scheduler.
scheduler = AsyncHyperBandScheduler(
    max_t=20,
    grace_period=5,
)

tuner = tune.Tuner(
    train_fn,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="val_accuracy",
        mode="max",
        num_samples=50,
        scheduler=scheduler,
        # NOTE(review): the original passed search_alg=BayesOptSearch(),
        # but bayes_opt supports only continuous parameters and fails on
        # the categorical tune.choice() entries in search_space. The
        # default searcher (random/variant generation) handles mixed
        # spaces correctly.
    ),
    run_config=tune.RunConfig(
        name="ml_pipeline_tuning"
    ),
)
执行调优 #
python
# Launch all trials; blocks until the tuning run completes.
results = tuner.fit()
# Select the trial with the highest reported validation accuracy.
best_result = results.get_best_result(metric="val_accuracy", mode="max")
print(f"Best config: {best_result.config}")
print(f"Best accuracy: {best_result.metrics['val_accuracy']}")
4. 模型服务 #
定义服务 #
python
from ray import serve
import torch
@serve.deployment(
    num_replicas=2,
    ray_actor_options={"num_gpus": 0.5},
)
class MLPredictionService:
    """HTTP inference endpoint serving a trained torch model."""

    def __init__(self, model_path):
        # Load once per replica and freeze into inference mode.
        self.model = self._load_model(model_path)
        self.model.eval()

    def _load_model(self, path):
        # NOTE(review): torch.load expects a pickled model object at
        # `path` — confirm the checkpoint actually stores one.
        return torch.load(path)

    async def __call__(self, request):
        payload = await request.json()
        inputs = torch.tensor(payload["features"], dtype=torch.float32)
        with torch.no_grad():
            logits = self.model(inputs)
            probs = torch.softmax(logits, dim=1)
            labels = logits.argmax(dim=1)
        return {
            "predictions": labels.tolist(),
            "probabilities": probs.tolist(),
        }
部署服务 #
python
# Bind the Serve deployment to the best checkpoint produced by tuning.
best_checkpoint = best_result.checkpoint
# NOTE(review): checkpoint.path is a directory; _load_model's torch.load
# may need the specific weights file inside it — verify before deploying.
app = MLPredictionService.bind(best_checkpoint.path)
serve.run(app, host="0.0.0.0", port=8000)
测试服务 #
python
import requests
import numpy as np
# Random 1x100 feature vector matching the model's input_size.
test_features = np.random.randn(1, 100).tolist()
# POST to the deployment's HTTP endpoint and print the JSON response.
response = requests.post(
"http://localhost:8000",
json={"features": test_features}
)
print(response.json())
5. 完整流水线 #
流水线编排 #
python
import ray
from ray import data, train, tune, serve
def run_ml_pipeline(config):
    """End-to-end pipeline: load → preprocess → tune → train → deploy."""
    ray.init()

    print("Step 1: Loading data...")
    train_ds = data.read_csv(config["train_path"])
    val_ds = data.read_csv(config["val_path"])

    print("Step 2: Preprocessing...")
    train_ds = preprocess_dataset(train_ds)
    val_ds = preprocess_dataset(val_ds)

    print("Step 3: Hyperparameter tuning...")
    best_config = tune_hyperparameters(train_ds, val_ds, config)

    print("Step 4: Training with best config...")
    model = train_model(train_ds, val_ds, best_config)

    print("Step 5: Deploying model...")
    deploy_model(model, config)

    print("Pipeline completed!")
    ray.shutdown()
def preprocess_dataset(ds):
    """Apply the shared batch-standardization step to a dataset."""
    return ds.map_batches(preprocess_batch)
def tune_hyperparameters(train_ds, val_ds, config):
    """Search config["search_space"] and return the best trial's config.

    Fix: the original called results.get_best_result() with no metric,
    which raises unless metric/mode are declared on TuneConfig — so
    declare them here, matching the "val_accuracy" metric reported by
    train_fn.

    NOTE(review): train_ds/val_ds are accepted but not passed to the
    Tuner here — confirm whether trials are meant to see these datasets.
    """
    tuner = tune.Tuner(
        train_fn,
        param_space=config["search_space"],
        tune_config=tune.TuneConfig(
            metric="val_accuracy",
            mode="max",
            num_samples=config["num_samples"],
        ),
    )
    results = tuner.fit()
    return results.get_best_result().config
def train_model(train_ds, val_ds, config):
    """Fit a TorchTrainer on the given splits and return its Result."""
    # NOTE(review): no scaling_config is given — runs with Ray Train's
    # default worker/GPU settings.
    trainer = train.torch.TorchTrainer(
        train_loop_per_worker=train_fn,
        train_loop_config=config,
        datasets={"train": train_ds, "val": val_ds},
    )
    return trainer.fit()
def deploy_model(model, config):
    """Stand up the Serve deployment from the training result's checkpoint."""
    deployment = MLPredictionService.bind(model.checkpoint.path)
    serve.run(deployment)
执行流水线 #
python
# Pipeline configuration: data locations, the tunable search space,
# and the tuning budget (number of trials).
config = {
"train_path": "s3://bucket/data/train.csv",
"val_path": "s3://bucket/data/val.csv",
"search_space": {
"lr": tune.loguniform(1e-4, 1e-2),
"hidden_size": tune.choice([128, 256, 512]),
"dropout": tune.uniform(0.1, 0.5)
},
"num_samples": 20
}
run_ml_pipeline(config)
6. 监控与运维 #
性能监控 #
python
import ray
ray.init()
# Cluster-wide free resources (CPUs/GPUs/memory) as seen by the scheduler.
print(ray.available_resources())
# NOTE(review): ray._private is internal API and may break between Ray
# releases — prefer the `ray memory` CLI or the dashboard for this.
print(ray._private.internal_api.memory_summary())
ray.shutdown()
日志管理 #
python
import logging
# Root-logger setup: timestamped records at INFO level and above.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def train_fn(config):
    """Skeleton training function illustrating config logging on entry."""
    logger.info(f"Starting training with config: {config}")
告警配置 #
python
def check_model_health():
    """Ping the serving endpoint and alert when it is not healthy."""
    status = requests.get("http://localhost:8000/health").status_code
    if status != 200:
        send_alert("Model service unhealthy!")
总结 #
本案例展示了如何使用 Ray 构建完整的机器学习流水线:
text
┌─────────────────────────────────────────────────────────────┐
│ 流水线总结 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 数据处理 (Ray Data) │
│ ├── 大规模数据加载 │
│ ├── 并行预处理 │
│ └── 特征工程 │
│ │
│ 2. 模型训练 (Ray Train) │
│ ├── 分布式训练 │
│ ├── 检查点管理 │
│ └── 多框架支持 │
│ │
│ 3. 超参数调优 (Ray Tune) │
│ ├── 搜索算法 │
│ ├── 早停策略 │
│ └── 分布式执行 │
│ │
│ 4. 模型服务 (Ray Serve) │
│ ├── 在线推理 │
│ ├── 自动扩缩容 │
│ └── 版本管理 │
│ │
└─────────────────────────────────────────────────────────────┘
恭喜你完成了 Ray 文档的学习!现在你已经掌握了构建分布式机器学习系统的核心技能。
最后更新:2026-04-05