Model Deployment in Practice #
Deployment Overview #
text
┌────────────────────────────────────────────────────────┐
│ ONNX Deployment Architecture                           │
├────────────────────────────────────────────────────────┤
│                                                        │
│ Cloud deployment                                       │
│ ├── REST API service                                   │
│ ├── gRPC service                                       │
│ ├── Containerized deployment (Docker/K8s)              │
│ └── Serverless (AWS Lambda / cloud functions)          │
│                                                        │
│ Edge deployment                                        │
│ ├── Edge servers                                       │
│ ├── IoT devices                                        │
│ └── Embedded systems                                   │
│                                                        │
│ Mobile deployment                                      │
│ ├── Android                                            │
│ ├── iOS                                                │
│ └── Cross-platform (Flutter/React Native)              │
│                                                        │
│ Web deployment                                         │
│ ├── ONNX.js                                            │
│ └── WebAssembly                                        │
│                                                        │
└────────────────────────────────────────────────────────┘
Server Deployment #
REST API Service #
python
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import onnxruntime as ort
import numpy as np
from PIL import Image
import io
app = FastAPI(title="ONNX Model API")

class ONNXModel:
    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path)
        self.input_name = self.session.get_inputs()[0].name
        self.input_shape = self.session.get_inputs()[0].shape

    def preprocess(self, image_bytes: bytes) -> np.ndarray:
        # Resize to 224x224, normalize with ImageNet mean/std, convert to NCHW
        image = Image.open(io.BytesIO(image_bytes)).convert('RGB')
        image = image.resize((224, 224))
        image_array = np.array(image).astype(np.float32)
        image_array = image_array / 255.0
        image_array = (image_array - [0.485, 0.456, 0.406]) / [0.229, 0.224, 0.225]
        image_array = np.transpose(image_array, (2, 0, 1))
        # Cast back to float32: subtracting the Python lists promotes to float64
        image_array = np.expand_dims(image_array, 0).astype(np.float32)
        return image_array

    def predict(self, image_bytes: bytes) -> np.ndarray:
        input_data = self.preprocess(image_bytes)
        outputs = self.session.run(None, {self.input_name: input_data})
        return outputs[0]

model = ONNXModel("resnet50.onnx")

@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    image_bytes = await file.read()
    predictions = model.predict(image_bytes)
    top5_indices = np.argsort(predictions[0])[-5:][::-1]
    top5_scores = predictions[0][top5_indices]
    results = [
        {"class_id": int(idx), "score": float(score)}
        for idx, score in zip(top5_indices, top5_scores)
    ]
    return JSONResponse(content={"predictions": results})

@app.get("/health")
async def health():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
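To exercise the endpoint, a small client can post an image and read back the top-5 predictions. The sketch below assumes the service is reachable at http://localhost:8000 and that a local file named test.jpg exists; both are placeholders.
python
import requests

# Hypothetical smoke test for the /predict endpoint defined above;
# adjust the URL and image path to your environment.
with open("test.jpg", "rb") as f:
    resp = requests.post(
        "http://localhost:8000/predict",
        files={"file": ("test.jpg", f, "image/jpeg")},
    )
resp.raise_for_status()
for item in resp.json()["predictions"]:
    print(f"class {item['class_id']}: {item['score']:.4f}")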
gRPC Service #
python
import grpc
from concurrent import futures
import onnxruntime as ort
import numpy as np
import inference_pb2
import inference_pb2_grpc
class InferenceServicer(inference_pb2_grpc.InferenceServicer):
    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path)
        self.input_name = self.session.get_inputs()[0].name

    def Predict(self, request, context):
        # Rebuild the input tensor from raw bytes and the shape carried in the request
        input_data = np.frombuffer(request.input_data, dtype=np.float32)
        input_data = input_data.reshape(request.shape)
        outputs = self.session.run(None, {self.input_name: input_data})
        response = inference_pb2.PredictResponse()
        response.output_data = outputs[0].tobytes()
        response.shape.extend(outputs[0].shape)
        return response

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    inference_pb2_grpc.add_InferenceServicer_to_server(
        InferenceServicer("model.onnx"),
        server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == "__main__":
    serve()
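The generated inference_pb2 modules imply a service definition along the lines of Predict(PredictRequest) returns (PredictResponse), with input_data, shape, and output_data fields. A minimal client sketch, assuming that proto layout and a server on localhost:50051 (service and field names are assumptions since the .proto file is not shown):
python
import grpc
import numpy as np
import inference_pb2
import inference_pb2_grpc

# Sketch of a client for the servicer above; message and service names are
# assumed to match the (not shown) inference.proto definition.
def predict(input_array: np.ndarray) -> np.ndarray:
    channel = grpc.insecure_channel("localhost:50051")
    stub = inference_pb2_grpc.InferenceStub(channel)
    request = inference_pb2.PredictRequest(
        input_data=input_array.astype(np.float32).tobytes(),
        shape=list(input_array.shape),
    )
    response = stub.Predict(request)
    output = np.frombuffer(response.output_data, dtype=np.float32)
    return output.reshape(tuple(response.shape))

result = predict(np.random.randn(1, 3, 224, 224).astype(np.float32))
print(result.shape)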
Docker Deployment #
dockerfile
# Note: onnxruntime-gpu needs CUDA/cuDNN libraries at runtime; in practice this
# usually means building from an nvidia/cuda base image (or swapping in the
# CPU-only onnxruntime package when no GPU is available).
FROM python:3.10-slim
WORKDIR /app
RUN pip install --no-cache-dir \
    fastapi \
    uvicorn \
    onnxruntime-gpu \
    numpy \
    pillow \
    python-multipart
COPY model.onnx /app/
COPY server.py /app/
EXPOSE 8000
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
bash
docker build -t onnx-server .
docker run -d -p 8000:8000 --gpus all onnx-server
GPU-Accelerated Deployment #
CUDA Deployment #
python
import onnxruntime as ort
import numpy as np
providers = [
    ('CUDAExecutionProvider', {
        'device_id': 0,
        'arena_extend_strategy': 'kNextPowerOfTwo',
        'gpu_mem_limit': 2 * 1024 * 1024 * 1024,
        'cudnn_conv_algo_search': 'EXHAUSTIVE',
        'do_copy_in_default_stream': True,
    }),
    'CPUExecutionProvider'
]

sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

session = ort.InferenceSession(
    "model.onnx",
    sess_options=sess_options,
    providers=providers
)
print(f"Active providers: {session.get_providers()}")

input_data = np.random.randn(1, 3, 224, 224).astype(np.float32)
input_name = session.get_inputs()[0].name
outputs = session.run(None, {input_name: input_data})
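When inputs and outputs should stay off the Python heap as much as possible, ONNX Runtime's IOBinding API lets you bind them explicitly and avoid redundant host-device copies. A minimal sketch reusing the CUDA session and input from above:
python
# Minimal IOBinding sketch: bind a CPU input, let ORT allocate the output,
# and copy results back to host memory once per run.
io_binding = session.io_binding()
io_binding.bind_cpu_input(input_name, input_data)
io_binding.bind_output(session.get_outputs()[0].name)
session.run_with_iobinding(io_binding)
outputs = io_binding.copy_outputs_to_cpu()
print(outputs[0].shape)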
TensorRT Acceleration #
python
import onnxruntime as ort
providers = [
    ('TensorrtExecutionProvider', {
        'device_id': 0,
        'trt_max_workspace_size': 1 << 30,
        'trt_fp16_enable': True,
        'trt_engine_cache_enable': True,
        'trt_engine_cache_path': './trt_cache',
    }),
    'CUDAExecutionProvider',
    'CPUExecutionProvider'
]

session = ort.InferenceSession("model.onnx", providers=providers)
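The first run through the TensorRT provider triggers engine building, which can take a long time for large models; the engine cache configured above keeps later process starts fast. It is therefore common to warm the session up before accepting traffic, for example (assuming a 1x3x224x224 input, a placeholder shape):
python
import numpy as np

# Warm-up run: builds (or loads from cache) the TensorRT engine before serving.
dummy = np.random.randn(1, 3, 224, 224).astype(np.float32)
input_name = session.get_inputs()[0].name
session.run(None, {input_name: dummy})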
Multi-GPU Deployment #
python
import onnxruntime as ort
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import threading
class MultiGPUInference:
    def __init__(self, model_path: str, num_gpus: int):
        self.sessions = []
        for gpu_id in range(num_gpus):
            providers = [
                ('CUDAExecutionProvider', {'device_id': gpu_id}),
                'CPUExecutionProvider'
            ]
            session = ort.InferenceSession(model_path, providers=providers)
            self.sessions.append(session)
        self.input_name = self.sessions[0].get_inputs()[0].name
        self.current_gpu = 0
        self.lock = threading.Lock()

    def predict(self, input_data: np.ndarray) -> np.ndarray:
        with self.lock:
            gpu_id = self.current_gpu
            self.current_gpu = (self.current_gpu + 1) % len(self.sessions)
        session = self.sessions[gpu_id]
        return session.run(None, {self.input_name: input_data})[0]

    def predict_batch(self, batch_data: list) -> list:
        with ThreadPoolExecutor(max_workers=len(self.sessions)) as executor:
            futures = [
                executor.submit(self.predict, data)
                for data in batch_data
            ]
            return [f.result() for f in futures]

inference = MultiGPUInference("model.onnx", num_gpus=2)
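Each call to predict is dispatched round-robin across the per-GPU sessions, and predict_batch fans a list of inputs out across them in parallel. A short usage sketch (the 1x3x224x224 input shape is a placeholder):
python
import numpy as np

# Fan eight single-image requests out across the two GPU sessions.
batch = [np.random.randn(1, 3, 224, 224).astype(np.float32) for _ in range(8)]
results = inference.predict_batch(batch)
print(len(results), results[0].shape)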
Batch Inference #
Dynamic Batching #
python
import onnxruntime as ort
import numpy as np
from collections import deque
import threading
import time
class DynamicBatcher:
    def __init__(self, model_path: str, max_batch_size: int = 32, timeout_ms: int = 50):
        self.session = ort.InferenceSession(model_path)
        self.input_name = self.session.get_inputs()[0].name
        self.max_batch_size = max_batch_size
        self.timeout_ms = timeout_ms
        self.queue = deque()
        self.results = {}  # request_id -> output array
        self.lock = threading.Lock()
        self.running = True
        self.worker_thread = threading.Thread(target=self._process_batch)
        self.worker_thread.start()

    def _process_batch(self):
        while self.running:
            batch = []
            batch_ids = []
            start_time = time.time()
            # Collect requests until the batch is full or the timeout expires
            while len(batch) < self.max_batch_size:
                if not self.running:
                    break
                elapsed = (time.time() - start_time) * 1000
                if elapsed >= self.timeout_ms and len(batch) > 0:
                    break
                with self.lock:
                    if self.queue:
                        item = self.queue.popleft()
                        batch.append(item['data'])
                        batch_ids.append(item['id'])
                    else:
                        time.sleep(0.001)
            if batch:
                batch_data = np.concatenate(batch, axis=0)
                outputs = self.session.run(None, {self.input_name: batch_data})
                # Split the batched output back into per-request results
                with self.lock:
                    for i, item_id in enumerate(batch_ids):
                        self.results[item_id] = outputs[0][i]

    def submit(self, input_data: np.ndarray) -> str:
        import uuid
        request_id = str(uuid.uuid4())
        with self.lock:
            self.queue.append({
                'id': request_id,
                'data': input_data
            })
        return request_id

    def get_result(self, request_id: str):
        # Non-blocking: returns None until the batched inference has completed
        with self.lock:
            return self.results.pop(request_id, None)

    def shutdown(self):
        self.running = False
        self.worker_thread.join()
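A caller submits a request and then polls for its result once the worker thread has flushed the batch. A small usage sketch (the polling interval and input shape are arbitrary placeholders):
python
import time
import numpy as np

batcher = DynamicBatcher("model.onnx", max_batch_size=32, timeout_ms=50)

# Submit a single request and poll until its result has been published.
request_id = batcher.submit(np.random.randn(1, 3, 224, 224).astype(np.float32))
result = None
while result is None:
    result = batcher.get_result(request_id)
    time.sleep(0.005)
print(result.shape)

batcher.shutdown()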
Edge Device Deployment #
Intel OpenVINO #
python
import onnxruntime as ort
providers = [
    ('OpenVINOExecutionProvider', {
        'device_type': 'CPU',
        'precision': 'FP16',
        'enable_opencl_throttling': True,
    })
]

session = ort.InferenceSession("model.onnx", providers=providers)
ARM Devices #
python
import onnxruntime as ort
sess_options = ort.SessionOptions()
sess_options.intra_op_num_threads = 4
sess_options.inter_op_num_threads = 1
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

session = ort.InferenceSession(
    "model.onnx",
    sess_options=sess_options,
    providers=['CPUExecutionProvider']
)
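On ARM CPUs the best intra_op_num_threads value depends on the core layout (big.LITTLE clusters, cache sizes), so it is worth measuring rather than guessing. A small sweep like the following can pick the setting empirically (model path and input shape are placeholders):
python
import time
import numpy as np
import onnxruntime as ort

# Measure average latency for a few intra-op thread counts.
dummy = np.random.randn(1, 3, 224, 224).astype(np.float32)
for n_threads in (1, 2, 4):
    opts = ort.SessionOptions()
    opts.intra_op_num_threads = n_threads
    sess = ort.InferenceSession("model.onnx", sess_options=opts,
                                providers=['CPUExecutionProvider'])
    name = sess.get_inputs()[0].name
    sess.run(None, {name: dummy})  # warm-up
    start = time.time()
    for _ in range(20):
        sess.run(None, {name: dummy})
    print(f"{n_threads} threads: {(time.time() - start) / 20 * 1000:.1f} ms")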
Web Deployment #
ONNX.js #
html
<!DOCTYPE html>
<html>
<head>
<title>ONNX.js Inference</title>
<script src="https://cdn.jsdelivr.net/npm/onnxjs/dist/onnx.min.js"></script>
</head>
<body>
<input type="file" id="imageInput" accept="image/*">
<div id="result"></div>
<script>
    async function runInference(imageElement) {
        const session = new onnx.InferenceSession();
        await session.loadModel('./model.onnx');

        // Draw the image onto a 224x224 canvas to get raw RGBA pixels
        const canvas = document.createElement('canvas');
        canvas.width = 224;
        canvas.height = 224;
        const ctx = canvas.getContext('2d');
        ctx.drawImage(imageElement, 0, 0, 224, 224);
        const imageData = ctx.getImageData(0, 0, 224, 224);

        // Convert RGBA pixels into a normalized NCHW float32 tensor
        // (same ImageNet mean/std as the server-side preprocessing above)
        const pixels = imageData.data;
        const floatData = new Float32Array(3 * 224 * 224);
        const mean = [0.485, 0.456, 0.406];
        const std = [0.229, 0.224, 0.225];
        for (let i = 0; i < 224 * 224; i++) {
            for (let c = 0; c < 3; c++) {
                floatData[c * 224 * 224 + i] = (pixels[i * 4 + c] / 255 - mean[c]) / std[c];
            }
        }
        const input = new onnx.Tensor(floatData, 'float32', [1, 3, 224, 224]);

        const outputMap = await session.run([input]);
        const output = outputMap.values().next().value;
        return output.data;
    }

    document.getElementById('imageInput').addEventListener('change', async (e) => {
        const file = e.target.files[0];
        const img = new Image();
        img.src = URL.createObjectURL(file);
        img.onload = async () => {
            const result = await runInference(img);
            document.getElementById('result').textContent =
                'Predictions: ' + Array.from(result).slice(0, 5).join(', ');
        };
    });
</script>
</body>
</html>
Monitoring and Logging #
Performance Monitoring #
python
import onnxruntime as ort
import numpy as np
import time
import logging
from dataclasses import dataclass
from typing import Optional
import json
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class InferenceMetrics:
    latency_ms: float
    batch_size: int
    gpu_memory_used: Optional[float] = None

    def to_dict(self):
        return {
            "latency_ms": self.latency_ms,
            "batch_size": self.batch_size,
            "gpu_memory_used": self.gpu_memory_used
        }

class MonitoredInferenceSession:
    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path)
        self.input_name = self.session.get_inputs()[0].name
        self.metrics_history = []

    def run(self, input_data: np.ndarray) -> tuple:
        start_time = time.time()
        outputs = self.session.run(None, {self.input_name: input_data})
        latency = (time.time() - start_time) * 1000
        metrics = InferenceMetrics(
            latency_ms=latency,
            batch_size=input_data.shape[0]
        )
        self.metrics_history.append(metrics)
        logger.info(f"Inference latency: {latency:.2f} ms")
        return outputs, metrics

    def get_stats(self) -> dict:
        if not self.metrics_history:
            return {}
        latencies = [m.latency_ms for m in self.metrics_history]
        return {
            "total_requests": len(self.metrics_history),
            "avg_latency_ms": np.mean(latencies),
            "p50_latency_ms": np.percentile(latencies, 50),
            "p95_latency_ms": np.percentile(latencies, 95),
            "p99_latency_ms": np.percentile(latencies, 99),
            "max_latency_ms": max(latencies),
            "min_latency_ms": min(latencies)
        }

session = MonitoredInferenceSession("model.onnx")
for _ in range(100):
    input_data = np.random.randn(1, 3, 224, 224).astype(np.float32)
    outputs, metrics = session.run(input_data)
print(json.dumps(session.get_stats(), indent=2))
Best Practices #
Deployment Checklist #
text
┌────────────────────────────────────────────────────────┐
│ Deployment Checklist                                    │
├────────────────────────────────────────────────────────┤
│                                                         │
│ Model preparation:                                      │
│ □ Model correctness verified                            │
│ □ Model optimized (operator fusion, constant folding)   │
│ □ Model quantized (if needed)                           │
│ □ Dynamic shapes configured correctly                   │
│                                                         │
│ Performance optimization:                               │
│ □ Appropriate Execution Provider selected               │
│ □ Thread counts configured                              │
│ □ Graph optimizations enabled                           │
│ □ Batching considered                                   │
│                                                         │
│ Reliability:                                            │
│ □ Robust error handling                                 │
│ □ Complete logging                                      │
│ □ Health-check endpoint                                 │
│ □ Timeout handling                                      │
│                                                         │
│ Monitoring:                                             │
│ □ Performance metrics tracked                           │
│ □ Resource usage tracked                                │
│ □ Error rate tracked                                    │
│ □ Alerting configured                                   │
│                                                         │
└────────────────────────────────────────────────────────┘
Production Configuration #
python
import onnxruntime as ort
import os
def create_production_session(model_path: str) -> ort.InferenceSession:
    sess_options = ort.SessionOptions()

    # Use all available cores for intra-op parallelism, a single inter-op thread
    num_cores = os.cpu_count() or 4
    sess_options.intra_op_num_threads = num_cores
    sess_options.inter_op_num_threads = 1

    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
    sess_options.enable_mem_pattern = True
    sess_options.enable_cpu_mem_arena = True

    # Prefer CUDA when a GPU is exposed to the process
    providers = ['CPUExecutionProvider']
    if 'CUDA_VISIBLE_DEVICES' in os.environ:
        providers.insert(0, 'CUDAExecutionProvider')

    session = ort.InferenceSession(
        model_path,
        sess_options=sess_options,
        providers=providers
    )
    return session
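At startup it is worth creating the session once and running a warm-up inference so memory arenas and kernels are initialized off the hot path. A short sketch, assuming a 1x3x224x224 input (a placeholder shape):
python
import numpy as np

session = create_production_session("model.onnx")

# Warm-up inference at startup so the first real request is not penalized.
warmup = np.random.randn(1, 3, 224, 224).astype(np.float32)
input_name = session.get_inputs()[0].name
session.run(None, {input_name: warmup})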
Summary #
ONNX offers flexible deployment options: from cloud servers to edge devices, and from high-performance GPUs to mobile, a single ONNX model can be served efficiently everywhere. Choosing the right deployment approach means weighing:
- Performance requirements: latency and throughput targets
- Hardware environment: CPU, GPU, dedicated accelerators
- Deployment scale: single machine, cluster, edge
- Resource constraints: memory, power, storage
With sensible configuration and optimization, ONNX Runtime delivers excellent inference performance across all of these environments.
Last updated: 2026-04-04