Ray Serve Model Serving #
What is Ray Serve? #
Ray Serve is the scalable model serving framework that ships with Ray, used to deploy machine learning models as production-grade online services. It supports autoscaling, version management, and composing complex inference graphs.
text
┌─────────────────────────────────────────────────────────────┐
│                   Ray Serve Architecture                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Client request                                             │
│       │                                                     │
│       ▼                                                     │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                     HTTP Server                     │    │
│  │                 (FastAPI + Uvicorn)                 │    │
│  └─────────────────────────────────────────────────────┘    │
│       │                                                     │
│       ▼                                                     │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                  Serve Controller                   │    │
│  │   ├── Routing                                       │    │
│  │   ├── Load balancing                                │    │
│  │   └── Version management                            │    │
│  └─────────────────────────────────────────────────────┘    │
│       │                                                     │
│       ▼                                                     │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                     Deployment                      │    │
│  │   ┌─────────┐   ┌─────────┐   ┌─────────┐           │    │
│  │   │ Replica │   │ Replica │   │ Replica │           │    │
│  │   │    1    │   │    2    │   │    3    │           │    │
│  │   └─────────┘   └─────────┘   └─────────┘           │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
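A hedged sketch of bringing this stack up on a local machine (assumes Ray is installed, e.g. via `pip install "ray[serve]"`):
python
import ray
from ray import serve

ray.init()             # start or connect to a local Ray cluster
serve.start()          # launch the Serve controller and HTTP proxy
print(serve.status())  # applications, proxies, and replica states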
Basic Usage #
Simple Deployment #
python
from ray import serve

@serve.deployment
class MyModel:
    def __init__(self):
        # Load the model once per replica, at startup
        self.model = self._load_model()

    def _load_model(self):
        return "dummy_model"

    async def __call__(self, request):
        # `request` is a Starlette Request; json() is a coroutine
        data = await request.json()
        return {"prediction": f"processed {data}"}

app = MyModel.bind()
serve.run(app)
HTTP Requests #
python
import requests

response = requests.post(
    "http://localhost:8000",
    json={"input": "test"},
)
print(response.json())
Function Deployment #
python
from ray import serve

@serve.deployment
async def predict(request):
    data = await request.json()
    return {"result": data["input"] * 2}

serve.run(predict.bind())
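Posting `{"input": 21}` to the function above should return `{"result": 42}`:
python
import requests

resp = requests.post("http://localhost:8000", json={"input": 21})
print(resp.json())  # {'result': 42}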
Deployment Configuration #
Resource Configuration #
python
from ray import serve

@serve.deployment(
    num_replicas=3,                                    # fixed number of replicas
    ray_actor_options={"num_cpus": 2, "num_gpus": 1},  # resources per replica
)
class ModelDeployment:
    def __init__(self):
        self.model = self._load_model()

    def _load_model(self):
        return "model"

    def __call__(self, request):
        return {"prediction": "result"}

serve.run(ModelDeployment.bind())
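Resource requests can also be fractional, which lets several replicas share one device; a minimal sketch (the 0.25 split is an arbitrary example):
python
from ray import serve

# Four replicas fit on a single GPU when each requests a quarter of it
@serve.deployment(
    num_replicas=4,
    ray_actor_options={"num_gpus": 0.25},
)
class SharedGPUModel:
    def __call__(self, request):
        return {"prediction": "result"}

serve.run(SharedGPUModel.bind())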
Configuration Options #
text
┌─────────────────────────────────────────────────────────────┐
│                  Deployment Configuration                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Replica management:                                        │
│    ├── num_replicas: number of replicas                     │
│    ├── max_concurrent_queries: max concurrency per replica  │
│    └── autoscaling_config: autoscaling settings             │
│                                                             │
│  Resources:                                                 │
│    ├── ray_actor_options: per-replica actor resources       │
│    │     ├── num_cpus: number of CPUs                       │
│    │     ├── num_gpus: number of GPUs                       │
│    │     └── memory: memory size                            │
│    └── placement_group: placement group settings            │
│                                                             │
│  Updates:                                                   │
│    ├── version: deployment version                          │
│    └── user_config: passed to reconfigure()                 │
│                                                             │
│  Health checks:                                             │
│    ├── health_check_period_s: health check interval         │
│    └── health_check_timeout_s: health check timeout         │
│                                                             │
└─────────────────────────────────────────────────────────────┘
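As a hedged sketch of how several of these options combine on one deployment (the `threshold` field is a made-up example), `user_config` is delivered to the deployment's `reconfigure` method at deploy time and on every config update:
python
from ray import serve

@serve.deployment(
    num_replicas=2,
    max_concurrent_queries=16,
    user_config={"threshold": 0.5},  # passed to reconfigure()
    health_check_period_s=10,
)
class ConfiguredModel:
    def __init__(self):
        self.threshold = 0.5

    def reconfigure(self, config: dict):
        # Runs at deployment time and whenever user_config changes
        self.threshold = config["threshold"]

    def __call__(self, request):
        return {"threshold": self.threshold}

serve.run(ConfiguredModel.bind())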
Autoscaling #
python
from ray import serve
from ray.serve.config import AutoscalingConfig

@serve.deployment(
    autoscaling_config=AutoscalingConfig(
        min_replicas=1,
        max_replicas=10,
        # Scale up when replicas average more than 5 in-flight requests
        target_num_ongoing_requests_per_replica=5,
    )
)
class AutoScalingModel:
    def __call__(self, request):
        return {"result": "prediction"}

serve.run(AutoScalingModel.bind())
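The same settings can also be passed as a plain dict, which avoids the extra import; a minimal sketch:
python
from ray import serve

@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 10,
        "target_num_ongoing_requests_per_replica": 5,
    }
)
class AutoScalingModelDict:
    def __call__(self, request):
        return {"result": "prediction"}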
Inference Graphs #
Model Composition #
python
from ray import serve

@serve.deployment
class Preprocessor:
    def __call__(self, text):
        return {"preprocessed": text.upper()}

@serve.deployment
class Model:
    def __init__(self, preprocessor):
        # Handle to the Preprocessor deployment, injected via bind()
        self.preprocessor = preprocessor

    async def __call__(self, request):
        data = await request.json()
        # Pass the parsed payload downstream, not the raw Request object
        result = await self.preprocessor.remote(data["input"])
        return {"prediction": f"model({result['preprocessed']})"}

app = Model.bind(Preprocessor.bind())
serve.run(app)
Conditional Routing #
python
from ray import serve

@serve.deployment
class ModelA:
    def __call__(self, data):
        return {"model": "A", "result": "prediction_a"}

@serve.deployment
class ModelB:
    def __call__(self, data):
        return {"model": "B", "result": "prediction_b"}

@serve.deployment
class Router:
    def __init__(self, model_a, model_b):
        self.model_a = model_a
        self.model_b = model_b

    async def __call__(self, request):
        data = await request.json()
        # Route on a request field; default to ModelA
        if data.get("use_model_b"):
            return await self.model_b.remote(data)
        return await self.model_a.remote(data)

app = Router.bind(ModelA.bind(), ModelB.bind())
serve.run(app)
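Exercising the router with and without the flag (assuming the app above is running):
python
import requests

print(requests.post("http://localhost:8000", json={}).json())
# {'model': 'A', 'result': 'prediction_a'}
print(requests.post("http://localhost:8000", json={"use_model_b": True}).json())
# {'model': 'B', 'result': 'prediction_b'}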
Parallel Inference #
python
from ray import serve
import asyncio

@serve.deployment
class FeatureExtractor:
    def __call__(self, data):
        return f"features({data})"

@serve.deployment
class Model:
    def __init__(self, feature_extractor):
        self.feature_extractor = feature_extractor

    async def __call__(self, request):
        data = (await request.json())["input"]
        # Fire both calls before awaiting so they run concurrently
        features_ref = self.feature_extractor.remote(data)
        other_ref = self.feature_extractor.remote(data + "_other")
        features, other = await asyncio.gather(features_ref, other_ref)
        return {"features": features, "other": other}

app = Model.bind(FeatureExtractor.bind())
serve.run(app)
Version Management #
Blue-Green Deployment #
Calling `serve.run` again with updated code rolls the running replicas over to the new version:
python
from ray import serve

@serve.deployment(version="v1")
class Model:
    def __init__(self):
        self.version = "v1"

    def __call__(self, request):
        return {"version": self.version}

serve.run(Model.bind())

# Later: redeploy with updated code; serve.run replaces the replicas

@serve.deployment(version="v2")
class Model:
    def __init__(self):
        self.version = "v2"

    def __call__(self, request):
        return {"version": self.version}

serve.run(Model.bind())
Traffic Splitting #
Ray Serve 2.x has no per-deployment `traffic` option, so a percentage split is typically implemented in a router deployment; a minimal sketch:
python
import random
from ray import serve

@serve.deployment
class ModelV1:
    def __call__(self, data):
        return {"version": "v1"}

@serve.deployment
class ModelV2:
    def __call__(self, data):
        return {"version": "v2"}

@serve.deployment
class CanaryRouter:
    def __init__(self, v1, v2):
        self.v1, self.v2 = v1, v2

    async def __call__(self, request):
        data = await request.json()
        # Send roughly 30% of requests to v2, the rest to v1
        if random.random() < 0.3:
            return await self.v2.remote(data)
        return await self.v1.remote(data)

serve.run(CanaryRouter.bind(ModelV1.bind(), ModelV2.bind()))
FastAPI Integration #
Custom Routes #
python
from ray import serve
from fastapi import FastAPI

app = FastAPI()

# Note the order: @serve.deployment wraps @serve.ingress(app)
@serve.deployment
@serve.ingress(app)
class FastAPIDeployment:
    def __init__(self):
        self.model = "model"

    @app.get("/")
    def root(self):
        return {"message": "Hello"}

    @app.post("/predict")
    def predict(self, data: dict):
        return {"prediction": f"processed {data}"}

    @app.get("/health")
    def health(self):
        return {"status": "healthy"}

serve.run(FastAPIDeployment.bind())
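Each route is then reachable under the application's root path (assuming the app above is running):
python
import requests

print(requests.get("http://localhost:8000/").json())        # {'message': 'Hello'}
print(requests.post("http://localhost:8000/predict", json={"x": 1}).json())
print(requests.get("http://localhost:8000/health").json())  # {'status': 'healthy'}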
Request Validation #
python
from ray import serve
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class PredictRequest(BaseModel):
    input: str
    threshold: float = 0.5

class PredictResponse(BaseModel):
    prediction: str
    confidence: float

@serve.deployment
@serve.ingress(app)
class ValidatedModel:
    @app.post("/predict", response_model=PredictResponse)
    def predict(self, request: PredictRequest):
        if len(request.input) == 0:
            raise HTTPException(status_code=400, detail="Input cannot be empty")
        return PredictResponse(
            prediction=f"result for {request.input}",
            confidence=request.threshold,
        )

serve.run(ValidatedModel.bind())
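Pydantic enforces the schema before the handler runs; a quick check (assuming the app above is running):
python
import requests

# Valid request
r = requests.post("http://localhost:8000/predict", json={"input": "hello"})
print(r.status_code, r.json())

# Empty input hits the explicit HTTPException (400); omitting `input`
# entirely would fail pydantic validation with a 422 instead.
r = requests.post("http://localhost:8000/predict", json={"input": ""})
print(r.status_code)  # 400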
Production Practices #
Health Checks #
python
from ray import serve

@serve.deployment(
    health_check_period_s=10,   # how often Serve calls check_health()
    health_check_timeout_s=30,  # how long before a check counts as failed
)
class HealthyModel:
    def __init__(self):
        self.healthy = True
        self.model = self._load_model()

    def _load_model(self):
        return "model"

    def check_health(self):
        # Serve calls this periodically; raising marks the replica
        # unhealthy so it can be restarted.
        if not self.healthy:
            raise RuntimeError("Unhealthy")

    def __call__(self, request):
        return {"status": "ok"}

    def set_unhealthy(self):
        self.healthy = False

serve.run(HealthyModel.bind())
Graceful Shutdown #
python
from ray import serve
import time

@serve.deployment(
    # Give in-flight requests up to 30s to finish before the replica is killed
    graceful_shutdown_timeout_s=30,
)
class GracefulModel:
    def __call__(self, request):
        time.sleep(5)  # simulate a slow request that shutdown must wait for
        return {"result": "done"}

    def reconfigure(self, config):
        # Called when user_config changes; no-op here
        pass

serve.run(GracefulModel.bind())
Monitoring Metrics #
python
from ray import serve
import time

@serve.deployment
class MonitoredModel:
    def __init__(self):
        # Simple per-replica counters; each replica keeps its own
        self.request_count = 0
        self.total_latency = 0.0

    async def __call__(self, request):
        start = time.time()
        result = self._predict(await request.json())
        latency = time.time() - start
        self.request_count += 1
        self.total_latency += latency
        return {
            "result": result,
            "metrics": {
                "request_count": self.request_count,
                "avg_latency": self.total_latency / self.request_count,
            },
        }

    def _predict(self, data):
        return "prediction"

serve.run(MonitoredModel.bind())
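For production monitoring, a hedged alternative is Ray's metrics API, which is exported to Prometheus when metrics export is enabled; the metric name below is an arbitrary example:
python
from ray import serve
from ray.util import metrics

@serve.deployment
class InstrumentedModel:
    def __init__(self):
        # Aggregated across replicas by the metrics backend
        self.requests = metrics.Counter(
            "model_requests_total",
            description="Number of requests served.",
        )

    def __call__(self, request):
        self.requests.inc()
        return {"result": "prediction"}

serve.run(InstrumentedModel.bind())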
Best Practices #
1. Set Replica Counts Sensibly #
python
from ray import serve
from ray.serve.config import AutoscalingConfig

# Fixed replica count for predictable load
@serve.deployment(num_replicas=3)
class ProductionModel:
    def __call__(self, request):
        return {"result": "prediction"}

# Autoscaling for variable load
@serve.deployment(
    autoscaling_config=AutoscalingConfig(min_replicas=1, max_replicas=10)
)
class AutoScalingModel:
    def __call__(self, request):
        return {"result": "prediction"}
2. Use GPU Acceleration #
python
from ray import serve

@serve.deployment(ray_actor_options={"num_gpus": 1})
class GPUModel:
    def __init__(self):
        import torch
        self.device = torch.device("cuda:0")
        self.model = self._load_model()

    def _load_model(self):
        # Placeholder: load your model and move it to self.device
        return "model"

    def __call__(self, request):
        return {"prediction": "gpu_result"}

serve.run(GPUModel.bind())
3. Batch Requests #
The `@serve.batch` decorator gathers concurrent calls into a single batched invocation:
python
from ray import serve

@serve.deployment
class BatchModel:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
    async def _predict_batch(self, inputs):
        # Receives a list of inputs gathered from concurrent requests;
        # must return one result per input, in the same order.
        return [f"prediction for {x}" for x in inputs]

    async def __call__(self, request):
        data = await request.json()
        return {"result": await self._predict_batch(data["input"])}

serve.run(BatchModel.bind())
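Batching only pays off when requests overlap in time; a quick way to exercise it (assuming the app above is running):
python
import concurrent.futures
import requests

def call(i):
    return requests.post("http://localhost:8000", json={"input": i}).json()

# Concurrent requests arrive together, so the batcher can group them;
# sequential requests would each form a batch of one.
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    print(list(pool.map(call, range(8))))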
Next Steps #
With Ray Serve under your belt, continue on to Ray Tune hyperparameter tuning to learn how to optimize model performance!
Last updated: 2026-04-05