Model Serving with Ray Serve #

What is Ray Serve? #

Ray Serve is Ray's scalable model-serving framework for deploying machine learning models as production-grade online services. It supports autoscaling, version management, and composing models into complex inference graphs.

text
┌──────────────────────────────────────────────────────────────┐
│                    Ray Serve Architecture                    │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  Client request                                              │
│       │                                                      │
│       ▼                                                      │
│  ┌──────────────────────────────────────────────────────┐    │
│  │                     HTTP Server                      │    │
│  │                 (FastAPI + Uvicorn)                  │    │
│  └──────────────────────────────────────────────────────┘    │
│                             │                                │
│                             ▼                                │
│  ┌──────────────────────────────────────────────────────┐    │
│  │                   Serve Controller                   │    │
│  │  ├── Routing                                         │    │
│  │  ├── Load balancing                                  │    │
│  │  └── Version management                              │    │
│  └──────────────────────────────────────────────────────┘    │
│                             │                                │
│                             ▼                                │
│  ┌──────────────────────────────────────────────────────┐    │
│  │                      Deployment                      │    │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐               │    │
│  │  │ Replica │  │ Replica │  │ Replica │               │    │
│  │  │    1    │  │    2    │  │    3    │               │    │
│  │  └─────────┘  └─────────┘  └─────────┘               │    │
│  └──────────────────────────────────────────────────────┘    │
│                                                              │
└──────────────────────────────────────────────────────────────┘
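
All three layers in the diagram (HTTP proxy, controller, replicas) are started for you by `serve.run`. If you need to control the HTTP host and port yourself, `serve.start` accepts HTTP options; a minimal sketch (option names can differ slightly across Ray versions):

python
from ray import serve

# Start Serve explicitly and bind the HTTP proxy to all interfaces on port 8000.
serve.start(http_options={"host": "0.0.0.0", "port": 8000})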

Basic Usage #

Simple Deployment #

python
from ray import serve

@serve.deployment
class MyModel:
    def __init__(self):
        # Load the model once per replica at startup.
        self.model = self._load_model()

    def _load_model(self):
        return "dummy_model"

    async def __call__(self, request):
        # `request` is a Starlette Request, so .json() must be awaited.
        data = await request.json()
        return {"prediction": f"processed {data}"}

app = MyModel.bind()
serve.run(app)

HTTP Requests #

python
import requests

response = requests.post(
    "http://localhost:8000",
    json={"input": "test"}
)
print(response.json())

Function Deployments #

python
from ray import serve

@serve.deployment
async def predict(request):
    # Function deployments also receive the raw HTTP request; await its JSON body.
    data = await request.json()
    return {"result": data["input"] * 2}

serve.run(predict.bind())

Deployment Configuration #

Resource Configuration #

python
from ray import serve

@serve.deployment(
    num_replicas=3,
    ray_actor_options={"num_cpus": 2, "num_gpus": 1}
)
class ModelDeployment:
    def __init__(self):
        self.model = self._load_model()

    def _load_model(self):
        # Placeholder for real model-loading logic.
        return "model"

    def __call__(self, request):
        return {"prediction": "result"}

serve.run(ModelDeployment.bind())

Configuration Options #

text
┌──────────────────────────────────────────────────────────────┐
│                   Deployment Configuration                   │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  Replica management:                                         │
│  ├── num_replicas: number of replicas                        │
│  ├── max_concurrent_queries: per-replica concurrency limit   │
│  └── autoscaling_config: autoscaling settings                │
│                                                              │
│  Resources:                                                  │
│  ├── ray_actor_options: per-replica actor resources          │
│  │   ├── num_cpus: number of CPUs                            │
│  │   ├── num_gpus: number of GPUs                            │
│  │   └── memory: memory size                                 │
│  └── placement_group: placement group configuration          │
│                                                              │
│  Version management:                                         │
│  ├── version: deployment version                             │
│  ├── prev_version: previous version                          │
│  └── traffic: traffic split                                  │
│                                                              │
│  Health checks:                                              │
│  ├── health_check_period_s: health check interval            │
│  └── health_check_timeout_s: health check timeout            │
│                                                              │
└──────────────────────────────────────────────────────────────┘
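
A sketch combining several of these options in a single decorator (which parameters are accepted varies by Ray Serve version, and the values here are illustrative):

python
from ray import serve

@serve.deployment(
    num_replicas=2,
    max_concurrent_queries=100,                                 # per-replica concurrency cap
    ray_actor_options={"num_cpus": 1, "memory": 2 * 1024**3},  # 1 CPU and 2 GiB per replica
    health_check_period_s=10,
    health_check_timeout_s=30,
)
class ConfiguredModel:
    def __call__(self, request):
        return {"status": "ok"}

serve.run(ConfiguredModel.bind())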

Autoscaling #

python
from ray import serve

@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 10,
        # Serve adds or removes replicas to hold this per-replica load target.
        "target_num_ongoing_requests_per_replica": 5,
    }
)
class AutoScalingModel:
    def __call__(self, request):
        return {"result": "prediction"}

serve.run(AutoScalingModel.bind())

Inference Graphs #

Model Composition #

python
from ray import serve

@serve.deployment
class Preprocessor:
    def __call__(self, data: dict):
        # Invoked through a deployment handle, so it receives plain Python data.
        return {"preprocessed": data["input"].upper()}

@serve.deployment
class Model:
    def __init__(self, preprocessor):
        # `preprocessor` is a handle to the Preprocessor deployment.
        self.preprocessor = preprocessor

    async def __call__(self, request):
        data = await request.json()
        preprocessed = await self.preprocessor.remote(data)
        return {"prediction": f"model({preprocessed['preprocessed']})"}

app = Model.bind(Preprocessor.bind())
serve.run(app)

Conditional Routing #

python
from ray import serve

@serve.deployment
class ModelA:
    def __call__(self, data: dict):
        return {"model": "A", "result": "prediction_a"}

@serve.deployment
class ModelB:
    def __call__(self, data: dict):
        return {"model": "B", "result": "prediction_b"}

@serve.deployment
class Router:
    def __init__(self, model_a, model_b):
        # Handles to the two downstream model deployments.
        self.model_a = model_a
        self.model_b = model_b

    async def __call__(self, request):
        data = await request.json()
        if data.get("use_model_b"):
            return await self.model_b.remote(data)
        return await self.model_a.remote(data)

app = Router.bind(ModelA.bind(), ModelB.bind())
serve.run(app)
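
With the router as the ingress deployment, a flag in the request body selects the backend:

python
import requests

# Routed to ModelB because of the flag; without it, ModelA handles the request.
print(requests.post("http://localhost:8000", json={"use_model_b": True}).json())
print(requests.post("http://localhost:8000", json={}).json())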

Parallel Inference #

python
from ray import serve
import asyncio

@serve.deployment
class FeatureExtractor:
    def __call__(self, data):
        return f"features({data})"

@serve.deployment
class Model:
    def __init__(self, feature_extractor):
        self.feature_extractor = feature_extractor

    async def __call__(self, request):
        data = (await request.json())["input"]

        # Issue both handle calls before awaiting so they run in parallel.
        features_ref = self.feature_extractor.remote(data)
        other_ref = self.feature_extractor.remote(data + "_other")

        features, other = await asyncio.gather(features_ref, other_ref)

        return {"features": features, "other": other}

app = Model.bind(FeatureExtractor.bind())
serve.run(app)

Version Management #

Blue-Green Deployment #

python
from ray import serve

# Deploy v1 first.
@serve.deployment(version="v1")
class Model:
    def __init__(self):
        self.version = "v1"

    def __call__(self, request):
        return {"version": self.version}

serve.run(Model.bind())

# Redeploying with a new version rolls the running replicas over to v2.
@serve.deployment(version="v2")
class Model:
    def __init__(self):
        self.version = "v2"

    def __call__(self, request):
        return {"version": self.version}

serve.run(Model.bind())
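
A quick check against the default local endpoint confirms which version is live after the rollover:

python
import requests

# Expected to return {"version": "v2"} once the second serve.run has completed.
print(requests.post("http://localhost:8000", json={}).json())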

Traffic Splitting #

Ray Serve's modern `serve.run` API has no `traffic=` option, so a common pattern is a small router deployment that splits requests between the two versions by weight (a sketch; `ModelV1` and `ModelV2` are hypothetical names for the two model deployments):

python
from ray import serve
import random

@serve.deployment
class TrafficSplitter:
    def __init__(self, model_v1, model_v2, v2_weight: float = 0.7):
        self.model_v1 = model_v1    # handle to the v1 deployment
        self.model_v2 = model_v2    # handle to the v2 deployment
        self.v2_weight = v2_weight  # fraction of traffic routed to v2

    async def __call__(self, request):
        data = await request.json()
        if random.random() < self.v2_weight:
            return await self.model_v2.remote(data)
        return await self.model_v1.remote(data)

# ModelV1 / ModelV2 stand in for the two model deployments defined elsewhere.
app = TrafficSplitter.bind(ModelV1.bind(), ModelV2.bind())
serve.run(app)

FastAPI Integration #

Custom Routes #

python
from ray import serve
from fastapi import FastAPI

app = FastAPI()

@serve.deployment
@serve.ingress(app)
class FastAPIDeployment:
    def __init__(self):
        self.model = "model"
    
    @app.get("/")
    def root(self):
        return {"message": "Hello"}
    
    @app.post("/predict")
    def predict(self, data: dict):
        return {"prediction": f"processed {data}"}
    
    @app.get("/health")
    def health(self):
        return {"status": "healthy"}

serve.run(FastAPIDeployment.bind())
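
The FastAPI routes are served under the deployment's route prefix ("/" by default), so they can be exercised like any HTTP API:

python
import requests

print(requests.get("http://localhost:8000/").json())        # {"message": "Hello"}
print(requests.post("http://localhost:8000/predict", json={"x": 1}).json())
print(requests.get("http://localhost:8000/health").json())  # {"status": "healthy"}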

Request Validation #

python
from ray import serve
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class PredictRequest(BaseModel):
    input: str
    threshold: float = 0.5

class PredictResponse(BaseModel):
    prediction: str
    confidence: float

@serve.deployment
@serve.ingress(app)
class ValidatedModel:
    @app.post("/predict", response_model=PredictResponse)
    def predict(self, request: PredictRequest):
        if len(request.input) == 0:
            raise HTTPException(status_code=400, detail="Input cannot be empty")
        
        return PredictResponse(
            prediction=f"result for {request.input}",
            confidence=request.threshold
        )

serve.run(ValidatedModel.bind())
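
Pydantic rejects malformed bodies before `predict` runs (FastAPI returns a 422), while the explicit empty-input check returns a 400:

python
import requests

# Valid request.
r = requests.post("http://localhost:8000/predict",
                  json={"input": "hello", "threshold": 0.8})
print(r.status_code, r.json())

# Empty input trips the HTTPException and comes back as a 400.
r = requests.post("http://localhost:8000/predict", json={"input": ""})
print(r.status_code, r.json())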

Production Practices #

Health Checks #

python
from ray import serve

@serve.deployment(
    health_check_period_s=10,   # how often Serve calls check_health()
    health_check_timeout_s=30   # how long a check may take before it counts as failed
)
class HealthyModel:
    def __init__(self):
        self.healthy = True
        self.model = self._load_model()

    def _load_model(self):
        return "model"

    def check_health(self):
        # Serve calls this periodically; raising marks the replica unhealthy
        # so the controller can restart it.
        if not self.healthy:
            raise RuntimeError("Unhealthy")
    
    def __call__(self, request):
        return {"status": "ok"}
    
    def set_unhealthy(self):
        self.healthy = False

serve.run(HealthyModel.bind())

Graceful Shutdown #

python
from ray import serve
import time

@serve.deployment(graceful_shutdown_timeout_s=30)
class GracefulModel:
    def __init__(self):
        self.processing = False

    def __call__(self, request):
        # Simulate a slow request; on shutdown, in-flight work gets up to
        # 30 seconds to finish before the replica is force-killed.
        self.processing = True
        time.sleep(5)
        self.processing = False
        return {"result": "done"}

    def reconfigure(self, config):
        # Called when the deployment's user_config is updated on redeploy.
        pass

serve.run(GracefulModel.bind())
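
To tear the application down cleanly (for example at the end of a test script), `serve.shutdown()` removes all deployments; in-flight requests are given the configured graceful-shutdown window first (exact behavior can vary across Ray versions):

python
from ray import serve

# Stop the Serve application; pending requests get up to
# graceful_shutdown_timeout_s to finish before replicas are stopped.
serve.shutdown()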

Monitoring Metrics #

python
from ray import serve
import time

@serve.deployment
class MonitoredModel:
    def __init__(self):
        self.request_count = 0
        self.total_latency = 0
    
    def __call__(self, request):
        start = time.time()
        
        result = self._predict(request.json())
        
        latency = time.time() - start
        self.request_count += 1
        self.total_latency += latency
        
        return {
            "result": result,
            "metrics": {
                "request_count": self.request_count,
                "avg_latency": self.total_latency / self.request_count
            }
        }
    
    def _predict(self, data):
        return "prediction"

serve.run(MonitoredModel.bind())
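
Counters stored on the replica only reflect that one replica. For cluster-wide metrics, Ray also exposes a metrics API (here `ray.util.metrics`, exported through Ray's Prometheus endpoint); a sketch under that assumption:

python
from ray import serve
from ray.util import metrics
import time

@serve.deployment
class InstrumentedModel:
    def __init__(self):
        # Custom application-level metrics, aggregated across replicas by Prometheus.
        self.request_counter = metrics.Counter(
            "model_requests_total",
            description="Total requests handled by this deployment.",
        )
        self.latency_hist = metrics.Histogram(
            "model_latency_s",
            description="End-to-end request latency in seconds.",
            boundaries=[0.01, 0.05, 0.1, 0.5, 1.0],
        )

    async def __call__(self, request):
        start = time.time()
        data = await request.json()
        result = "prediction"
        self.request_counter.inc()
        self.latency_hist.observe(time.time() - start)
        return {"result": result, "input": data}

serve.run(InstrumentedModel.bind())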

Best Practices #

1. Set a Sensible Replica Count #

python
from ray import serve

# Fixed replica count for steady, predictable traffic.
@serve.deployment(num_replicas=3)
class ProductionModel:
    pass

# Autoscaling for bursty traffic.
@serve.deployment(
    autoscaling_config={"min_replicas": 1, "max_replicas": 10}
)
class AutoScalingModel:
    pass

2. Use GPUs for Acceleration #

python
from ray import serve

@serve.deployment(
    ray_actor_options={"num_gpus": 1}
)
class GPUModel:
    def __init__(self):
        import torch
        # Ray sets CUDA_VISIBLE_DEVICES for the replica, so cuda:0 is the assigned GPU.
        self.device = torch.device("cuda:0")
        self.model = self._load_model()

    def _load_model(self):
        # Placeholder: load weights and move them to self.device here.
        return "model"

    def __call__(self, request):
        return {"prediction": "gpu_result"}

serve.run(GPUModel.bind())
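
If the model is small enough, fractional GPU allocation lets several replicas share one device (a sketch; GPU memory isolation is up to you):

python
from ray import serve

@serve.deployment(
    num_replicas=2,
    ray_actor_options={"num_gpus": 0.5},  # two replicas packed onto a single GPU
)
class SmallGPUModel:
    def __call__(self, request):
        return {"prediction": "gpu_result"}

serve.run(SmallGPUModel.bind())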

3. Batch Requests #

`max_concurrent_queries` only caps per-replica concurrency; the actual batching is done by Ray Serve's `@serve.batch` decorator, which groups concurrent calls into a single list:

python
from ray import serve

@serve.deployment(max_concurrent_queries=10)
class BatchModel:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
    async def predict_batch(self, inputs):
        # Serve collects concurrent calls into `inputs`; return one result per item.
        return [f"prediction for {item}" for item in inputs]

    async def __call__(self, request):
        data = await request.json()
        # Each call passes a single item; batching happens behind the decorator.
        return {"result": await self.predict_batch(data)}

serve.run(BatchModel.bind())

Next Steps #

With Ray Serve under your belt, continue to Ray Tune for hyperparameter tuning and learn how to optimize model performance!

Last updated: 2026-04-05