机器学习流水线实战 #

概述 #

本案例将构建一个完整的机器学习流水线,涵盖数据处理、模型训练、超参数调优和模型服务部署。

text
┌─────────────────────────────────────────────────────────────┐
│                    ML 流水线架构                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐ │
│  │ 数据加载 │ ──►│ 数据处理 │ ──►│ 模型训练 │ ──►│ 模型服务 │ │
│  │Ray Data │    │Ray Data │    │Ray Train │    │Ray Serve│ │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘ │
│       │              │              │              │       │
│       ▼              ▼              ▼              ▼       │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   Ray Tune                           │   │
│  │              超参数调优                               │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1. 数据处理 #

数据加载 #

python
import ray
from ray import data

# Initialize Ray before using any Ray Data API.
ray.init()

# Each split lives in its own CSV object under the same bucket prefix.
train_path = "s3://bucket/data/train.csv"
val_path = "s3://bucket/data/val.csv"
test_path = "s3://bucket/data/test.csv"

train_ds = ray.data.read_csv(train_path)
val_ds = ray.data.read_csv(val_path)
test_ds = ray.data.read_csv(test_path)

# Report the row count of every split.
print(f"Train: {train_ds.count()}, Val: {val_ds.count()}, Test: {test_ds.count()}")

数据预处理 #

python
def preprocess_batch(batch):
    """Standardize the ``features`` column of one batch.

    The mean and standard deviation are computed over every value in the
    batch's ``features`` array (a single scalar each), and a small epsilon is
    added to the standard deviation to avoid division by zero.

    Args:
        batch: Mapping with ``"features"`` and ``"label"`` entries.

    Returns:
        A dict with the standardized ``"features"`` array and the ``"label"``
        values converted to a NumPy array.
    """
    import numpy as np

    raw = np.array(batch["features"])
    targets = np.array(batch["label"])

    center = raw.mean()
    spread = raw.std() + 1e-8
    scaled = (raw - center) / spread

    return {"features": scaled, "label": targets}

# Both splits go through the same batch-level normalization.
val_ds = val_ds.map_batches(preprocess_batch)

# Only the training split is shuffled, and only after it has been normalized.
train_ds = train_ds.map_batches(preprocess_batch).random_shuffle()

特征工程 #

python
def feature_engineering(row):
    """Augment one row with the mean and population std of its features.

    Args:
        row: Mapping with a ``"features"`` sequence of numbers and a
            ``"label"`` value.

    Returns:
        A dict containing the original ``"features"`` and ``"label"`` plus
        ``"feature_mean"`` and ``"feature_std"`` (population standard
        deviation, i.e. divided by ``len(features)``).

    Raises:
        ZeroDivisionError: If ``features`` is empty.
    """
    features = row["features"]
    count = len(features)

    # Compute the mean once; recomputing it for every element inside the
    # variance sum would make this function quadratic in len(features).
    mean = sum(features) / count
    variance = sum((x - mean) ** 2 for x in features) / count

    return {
        "features": features,
        "feature_mean": mean,
        "feature_std": variance ** 0.5,
        "label": row["label"]
    }

# Apply the row-wise feature_engineering function to every training row.
train_ds = train_ds.map(feature_engineering)

2. 模型训练 #

定义训练函数 #

python
import torch
import torch.nn as nn
import torch.optim as optim
from ray import train

def train_fn(config):
    """Per-worker training loop: fit an MLP classifier and report metrics.

    Args:
        config: Hyperparameter dict. Reads ``input_size``, ``hidden_size``,
            ``dropout``, ``num_classes``, ``lr``, ``epochs`` and
            ``batch_size``.

    Reports once per epoch through ``train.report``: ``epoch``, ``val_loss``
    (mean per-sample cross-entropy over the validation shard) and
    ``val_accuracy``.
    """
    # Import the torch integration explicitly: `from ray import train` alone
    # does not guarantee that the `train.torch` submodule has been loaded.
    from ray.train.torch import get_device, prepare_model

    model = nn.Sequential(
        nn.Linear(config["input_size"], config["hidden_size"]),
        nn.ReLU(),
        nn.Dropout(config["dropout"]),
        nn.Linear(config["hidden_size"], config["num_classes"])
    )

    model = prepare_model(model)
    # prepare_model may place the model on an accelerator, so every batch has
    # to be created on the same device to avoid a device-mismatch error.
    device = get_device()

    optimizer = optim.Adam(model.parameters(), lr=config["lr"])
    criterion = nn.CrossEntropyLoss()

    train_ds = train.get_dataset_shard("train")
    val_ds = train.get_dataset_shard("val")

    def to_tensors(batch):
        # Convert one batch into (features, labels) tensors on `device`.
        features = torch.tensor(
            batch["features"], dtype=torch.float32, device=device
        )
        labels = torch.tensor(batch["label"], dtype=torch.long, device=device)
        return features, labels

    for epoch in range(config["epochs"]):
        model.train()
        for batch in train_ds.iter_batches(batch_size=config["batch_size"]):
            features, labels = to_tensors(batch)

            optimizer.zero_grad()
            loss = criterion(model(features), labels)
            loss.backward()
            optimizer.step()

        model.eval()
        loss_sum = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for batch in val_ds.iter_batches(batch_size=config["batch_size"]):
                features, labels = to_tensors(batch)
                outputs = model(features)
                batch_count = labels.size(0)
                # The criterion returns the batch mean; weight it by the batch
                # size so the reported loss does not depend on how many
                # batches there are or on a short final batch.
                loss_sum += criterion(outputs, labels).item() * batch_count
                total += batch_count
                correct += outputs.argmax(dim=1).eq(labels).sum().item()

        # Guard against an empty validation shard instead of dividing by zero.
        train.report({
            "epoch": epoch,
            "val_loss": loss_sum / total if total else float("nan"),
            "val_accuracy": correct / total if total else 0.0
        })

运行训练 #

python
from ray.train import ScalingConfig

# Import the trainer class directly: `from ray import train` alone does not
# guarantee that the `train.torch` submodule is loaded on the driver.
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_loop_per_worker=train_fn,
    train_loop_config={
        "input_size": 100,
        "hidden_size": 256,
        "num_classes": 10,
        "lr": 0.001,
        "batch_size": 32,
        "epochs": 20,
        "dropout": 0.2
    },
    datasets={"train": train_ds, "val": val_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)

result = trainer.fit()
# result.metrics holds the most recently reported metrics (the last epoch),
# which is not necessarily the best epoch.
print(f"Final validation accuracy: {result.metrics['val_accuracy']}")

3. 超参数调优 #

定义搜索空间 #

python
from ray import tune

# Parameter space for tuning: plain values are passed unchanged to every
# trial, while tune.* entries are sampled per trial.
search_space = {
    "input_size": 100,  # fixed
    "hidden_size": tune.choice([128, 256, 512]),  # categorical
    "num_classes": 10,  # fixed
    "lr": tune.loguniform(1e-4, 1e-2),  # sampled on a log scale
    "batch_size": tune.choice([16, 32, 64]),  # categorical
    "epochs": 20,  # fixed
    "dropout": tune.uniform(0.1, 0.5)  # sampled uniformly
}

配置调优器 #

python
from ray.tune.search.bayesopt import BayesOptSearch
from ray.tune.schedulers import AsyncHyperBandScheduler

# Early-stopping scheduler: trials are compared on val_accuracy (higher is
# better), run for at least 5 and at most 20 reported iterations.
scheduler = AsyncHyperBandScheduler(
    metric="val_accuracy",
    mode="max",
    max_t=20,
    grace_period=5
)

# No search_alg is set on purpose: BayesOptSearch only accepts continuous
# float domains and raises on the tune.choice entries used for hidden_size
# and batch_size, so Tune's default variant generation is used instead. Pick
# a searcher that supports categorical parameters if model-based search is
# required.
# NOTE(review): train_fn calls train.get_dataset_shard(), but no datasets are
# passed to this Tuner - confirm how trials are expected to receive their data.
tuner = tune.Tuner(
    train_fn,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        num_samples=50,
        scheduler=scheduler
    ),
    run_config=tune.RunConfig(
        name="ml_pipeline_tuning"
    )
)

执行调优 #

python
# Run the tuning job and collect the per-trial results.
results = tuner.fit()

# Select the trial with the highest reported val_accuracy.
best_result = results.get_best_result(metric="val_accuracy", mode="max")
print(f"Best config: {best_result.config}")
print(f"Best accuracy: {best_result.metrics['val_accuracy']}")

4. 模型服务 #

定义服务 #

python
from ray import serve
import torch

@serve.deployment(
    num_replicas=2,
    ray_actor_options={"num_gpus": 0.5}
)
class MLPredictionService:
    """Ray Serve deployment that serves class predictions from a torch model."""

    def __init__(self, model_path):
        """Load the model from ``model_path`` and switch it to eval mode."""
        # Keep the model and the request tensors on the same device; fall back
        # to CPU when the replica has no CUDA device available.
        self.device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu"
        )
        self.model = self._load_model(model_path)
        self.model.eval()

    def _load_model(self, path):
        """Deserialize the model stored at ``path`` onto ``self.device``."""
        # SECURITY: torch.load unpickles arbitrary objects - only load model
        # files that come from a trusted source.
        # NOTE(review): newer PyTorch releases may restrict torch.load to
        # weights only by default - confirm the stored format loads as a
        # full module.
        model = torch.load(path, map_location=self.device)
        return model

    async def __call__(self, request):
        """Return predictions and class probabilities for a JSON request.

        The request body must be a JSON object whose ``"features"`` entry is
        either one sample (a flat list) or a batch (a list of lists).
        """
        data = await request.json()
        features = torch.tensor(
            data["features"], dtype=torch.float32, device=self.device
        )
        # softmax/argmax below operate on dim=1, so promote a single flat
        # sample to a batch of one.
        if features.dim() == 1:
            features = features.unsqueeze(0)

        with torch.no_grad():
            outputs = self.model(features)
            probabilities = torch.softmax(outputs, dim=1)
            predictions = outputs.argmax(dim=1)

        return {
            "predictions": predictions.tolist(),
            "probabilities": probabilities.tolist()
        }

部署服务 #

python
best_checkpoint = best_result.checkpoint
if best_checkpoint is None:
    # Fail with a clear message instead of an AttributeError on `.path`.
    raise RuntimeError(
        "The best trial did not report a checkpoint, so there is no model to deploy."
    )

# The HTTP host and port are Serve-wide proxy settings; they are configured
# through serve.start(http_options=...) rather than passed to serve.run().
serve.start(http_options={"host": "0.0.0.0", "port": 8000})

app = MLPredictionService.bind(best_checkpoint.path)
serve.run(app)

测试服务 #

python
import requests
import numpy as np

# One random sample with 100 features, matching the input_size of 100 used
# when the model was configured.
test_features = np.random.randn(1, 100).tolist()

# Without a timeout the call can block forever if the service never answers.
response = requests.post(
    "http://localhost:8000",
    json={"features": test_features},
    timeout=10
)
# Surface HTTP errors explicitly instead of trying to parse an error body.
response.raise_for_status()

print(response.json())

5. 完整流水线 #

流水线编排 #

python
import ray
from ray import data, train, tune, serve

def run_ml_pipeline(config):
    """Run the pipeline end to end: load, preprocess, tune, train and deploy.

    Args:
        config: Pipeline settings. Reads ``train_path`` and ``val_path`` here;
            the whole dict is also handed to ``tune_hyperparameters`` and
            ``deploy_model``.
    """
    # Ray may already be initialized by earlier code in the same process; a
    # second plain ray.init() would raise, so tolerate re-initialization.
    ray.init(ignore_reinit_error=True)

    try:
        print("Step 1: Loading data...")
        train_ds = data.read_csv(config["train_path"])
        val_ds = data.read_csv(config["val_path"])

        print("Step 2: Preprocessing...")
        train_ds = preprocess_dataset(train_ds)
        val_ds = preprocess_dataset(val_ds)

        print("Step 3: Hyperparameter tuning...")
        best_config = tune_hyperparameters(train_ds, val_ds, config)

        print("Step 4: Training with best config...")
        model = train_model(train_ds, val_ds, best_config)

        print("Step 5: Deploying model...")
        deploy_model(model, config)

        print("Pipeline completed!")
    finally:
        # Always release Ray, even when one of the steps above fails.
        # NOTE(review): if Ray was started by this process, shutting it down
        # here may also stop the Serve application started in step 5 -
        # confirm the intended service lifecycle.
        ray.shutdown()

def preprocess_dataset(ds):
    """Return ``ds`` with ``preprocess_batch`` applied to each of its batches."""
    normalized = ds.map_batches(preprocess_batch)
    return normalized

def tune_hyperparameters(train_ds, val_ds, config):
    """Search ``config["search_space"]`` and return the best trial's config.

    Args:
        train_ds: Training dataset (not used by this function at present).
        val_ds: Validation dataset (not used by this function at present).
        config: Pipeline settings. Reads ``search_space`` and ``num_samples``;
            optional ``metric`` (default ``"val_accuracy"``) and ``mode``
            (default ``"max"``) choose how trials are ranked.

    Returns:
        The hyperparameter dict of the best trial.
    """
    # NOTE(review): train_fn reads its data through train.get_dataset_shard(),
    # but train_ds/val_ds are never handed to the Tuner - confirm how the
    # trials are expected to receive their data.
    tuner = tune.Tuner(
        train_fn,
        param_space=config["search_space"],
        tune_config=tune.TuneConfig(
            # get_best_result() below needs a metric and mode to rank trials;
            # without them it raises instead of returning a result.
            metric=config.get("metric", "val_accuracy"),
            mode=config.get("mode", "max"),
            num_samples=config["num_samples"]
        )
    )
    results = tuner.fit()
    return results.get_best_result().config

def train_model(train_ds, val_ds, config):
    """Train with ``train_fn`` on the given datasets and return the result.

    Args:
        train_ds: Dataset registered under the ``"train"`` key.
        val_ds: Dataset registered under the ``"val"`` key.
        config: Hyperparameters passed to ``train_fn`` as its config.

    Returns:
        Whatever ``TorchTrainer.fit()`` returns for the finished run.
    """
    # Import the trainer class directly: `from ray import train` alone does
    # not guarantee that the `train.torch` submodule has been loaded.
    from ray.train.torch import TorchTrainer

    trainer = TorchTrainer(
        train_loop_per_worker=train_fn,
        train_loop_config=config,
        datasets={"train": train_ds, "val": val_ds}
    )
    return trainer.fit()

def deploy_model(model, config):
    """Deploy the checkpoint of a finished training run with Ray Serve.

    Args:
        model: Object exposing a ``checkpoint`` attribute whose ``path`` is
            handed to ``MLPredictionService``.
        config: Pipeline settings (not used by this function at present).

    Raises:
        RuntimeError: If ``model.checkpoint`` is ``None``.
    """
    checkpoint = model.checkpoint
    if checkpoint is None:
        # Fail with a clear message instead of an AttributeError on `.path`.
        raise RuntimeError(
            "Training produced no checkpoint, so there is no model to deploy."
        )

    app = MLPredictionService.bind(checkpoint.path)
    serve.run(app)

执行流水线 #

python
config = {
    "train_path": "s3://bucket/data/train.csv",
    "val_path": "s3://bucket/data/val.csv",
    "search_space": {
        # Fixed values: train_fn reads all of these from its config, so they
        # must be present in every trial even though they are not tuned.
        "input_size": 100,
        "num_classes": 10,
        "batch_size": 32,
        "epochs": 20,
        # Values sampled per trial.
        "lr": tune.loguniform(1e-4, 1e-2),
        "hidden_size": tune.choice([128, 256, 512]),
        "dropout": tune.uniform(0.1, 0.5)
    },
    "num_samples": 20
}

run_ml_pipeline(config)

6. 监控与运维 #

性能监控 #

python
import ray

ray.init()

# Resources that are currently available in the cluster.
free_resources = ray.available_resources()
print(free_resources)

# Memory report; note that this goes through a private Ray module
# (ray._private).
memory_report = ray._private.internal_api.memory_summary()
print(memory_report)

ray.shutdown()

日志管理 #

python
import logging

# Configure logging once: INFO level, with timestamp, logger name and level
# in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

def train_fn(config):
    """Skeleton training function that logs its configuration on start."""
    # Pass config as a logging argument so the message is only formatted when
    # the record is actually emitted.
    logger.info("Starting training with config: %s", config)
    # Training logic goes here.

告警配置 #

python
def check_model_health():
    """Probe the model service and send an alert when it is unhealthy.

    The service counts as unhealthy when the request fails (connection
    error, timeout, ...) or when it answers with a status other than 200.
    """
    try:
        # A timeout keeps the health check itself from hanging forever.
        response = requests.get("http://localhost:8000/health", timeout=5)
    except requests.RequestException:
        # An unreachable service is the clearest sign of an unhealthy one;
        # alert instead of letting the exception escape the check.
        send_alert("Model service unhealthy!")
        return

    if response.status_code != 200:
        send_alert("Model service unhealthy!")

总结 #

本案例展示了如何使用 Ray 构建完整的机器学习流水线:

text
┌─────────────────────────────────────────────────────────────┐
│                    流水线总结                                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 数据处理 (Ray Data)                                     │
│     ├── 大规模数据加载                                      │
│     ├── 并行预处理                                          │
│     └── 特征工程                                            │
│                                                             │
│  2. 模型训练 (Ray Train)                                    │
│     ├── 分布式训练                                          │
│     ├── 检查点管理                                          │
│     └── 多框架支持                                          │
│                                                             │
│  3. 超参数调优 (Ray Tune)                                   │
│     ├── 搜索算法                                            │
│     ├── 早停策略                                            │
│     └── 分布式执行                                          │
│                                                             │
│  4. 模型服务 (Ray Serve)                                    │
│     ├── 在线推理                                            │
│     ├── 自动扩缩容                                          │
│     └── 版本管理                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

恭喜你完成了 Ray 文档的学习!现在你已经掌握了构建分布式机器学习系统的核心技能。

最后更新:2026-04-05