Model Deployment in Practice #

Deployment Overview #

text
┌─────────────────────────────────────────────────────────────┐
│                ONNX Deployment Architecture                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Cloud deployment                                           │
│  ├── REST API services                                      │
│  ├── gRPC services                                          │
│  ├── Containerized (Docker/K8s)                             │
│  └── Serverless (AWS Lambda / cloud functions)              │
│                                                             │
│  Edge deployment                                            │
│  ├── Edge servers                                           │
│  ├── IoT devices                                            │
│  └── Embedded systems                                       │
│                                                             │
│  Mobile deployment                                          │
│  ├── Android                                                │
│  ├── iOS                                                    │
│  └── Cross-platform (Flutter/React Native)                  │
│                                                             │
│  Web deployment                                             │
│  ├── ONNX.js                                                │
│  └── WebAssembly                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Server Deployment #

REST API Service #

python
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import onnxruntime as ort
import numpy as np
from PIL import Image
import io

app = FastAPI(title="ONNX Model API")

class ONNXModel:
    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path)
        self.input_name = self.session.get_inputs()[0].name
        self.input_shape = self.session.get_inputs()[0].shape
    
    def preprocess(self, image_bytes: bytes) -> np.ndarray:
        image = Image.open(io.BytesIO(image_bytes)).convert('RGB')
        image = image.resize((224, 224))
        image_array = np.array(image).astype(np.float32)
        image_array = image_array / 255.0
        # ImageNet mean/std normalization; broadcasting against the float64
        # constants promotes the array, so cast back to float32 before
        # handing it to ONNX Runtime.
        image_array = (image_array - [0.485, 0.456, 0.406]) / [0.229, 0.224, 0.225]
        image_array = image_array.astype(np.float32)
        image_array = np.transpose(image_array, (2, 0, 1))
        image_array = np.expand_dims(image_array, 0)
        return image_array
    
    def predict(self, image_bytes: bytes) -> np.ndarray:
        input_data = self.preprocess(image_bytes)
        outputs = self.session.run(None, {self.input_name: input_data})
        return outputs[0]

model = ONNXModel("resnet50.onnx")

@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    image_bytes = await file.read()
    predictions = model.predict(image_bytes)
    
    top5_indices = np.argsort(predictions[0])[-5:][::-1]
    top5_scores = predictions[0][top5_indices]
    
    results = [
        {"class_id": int(idx), "score": float(score)}
        for idx, score in zip(top5_indices, top5_scores)
    ]
    
    return JSONResponse(content={"predictions": results})

@app.get("/health")
async def health():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
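
Once the server is up, a quick smoke test from Python; a minimal sketch assuming the default host/port above and a local test image (cat.jpg is a placeholder):

python
import requests

# POST one image to /predict and print the top-5 results.
# "cat.jpg" stands in for any local test image.
with open("cat.jpg", "rb") as f:
    response = requests.post(
        "http://localhost:8000/predict",
        files={"file": ("cat.jpg", f, "image/jpeg")},
    )

response.raise_for_status()
for item in response.json()["predictions"]:
    print(f"class {item['class_id']}: {item['score']:.4f}")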

gRPC Service #

python
import grpc
from concurrent import futures
import onnxruntime as ort
import numpy as np
# Stubs generated from the service .proto with grpcio-tools
# (python -m grpc_tools.protoc ...)
import inference_pb2
import inference_pb2_grpc

class InferenceServicer(inference_pb2_grpc.InferenceServicer):
    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path)
        self.input_name = self.session.get_inputs()[0].name
    
    def Predict(self, request, context):
        input_data = np.frombuffer(request.input_data, dtype=np.float32)
        input_data = input_data.reshape(request.shape)
        
        outputs = self.session.run(None, {self.input_name: input_data})
        
        response = inference_pb2.PredictResponse()
        response.output_data = outputs[0].tobytes()
        response.shape.extend(outputs[0].shape)
        
        return response

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    inference_pb2_grpc.add_InferenceServicer_to_server(
        InferenceServicer("model.onnx"),
        server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == "__main__":
    serve()
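
The client side mirrors the server; a minimal sketch, assuming the same generated stubs and a PredictRequest message carrying input_data and shape fields (the .proto itself is not shown in this section):

python
import grpc
import numpy as np
import inference_pb2
import inference_pb2_grpc

def remote_predict(input_array: np.ndarray) -> np.ndarray:
    # Plaintext channel matching the insecure server port above.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = inference_pb2_grpc.InferenceStub(channel)
        request = inference_pb2.PredictRequest(
            input_data=input_array.astype(np.float32).tobytes(),
            shape=list(input_array.shape),
        )
        response = stub.Predict(request)
        # Rebuild the output tensor from raw bytes plus shape.
        output = np.frombuffer(response.output_data, dtype=np.float32)
        return output.reshape(tuple(response.shape))

result = remote_predict(np.random.randn(1, 3, 224, 224).astype(np.float32))
print(result.shape)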

Docker Deployment #

dockerfile
# python:3.10-slim is enough for CPU-only serving with the plain
# onnxruntime package; onnxruntime-gpu additionally needs the CUDA/cuDNN
# libraries at runtime, which in practice means an nvidia/cuda base image
# and the NVIDIA Container Toolkit on the host.
FROM python:3.10-slim

WORKDIR /app

RUN pip install --no-cache-dir \
    fastapi \
    uvicorn \
    onnxruntime-gpu \
    numpy \
    pillow \
    python-multipart

COPY model.onnx /app/
COPY server.py /app/

EXPOSE 8000

CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]

bash
docker build -t onnx-server .
docker run -d -p 8000:8000 --gpus all onnx-server
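
A container started with -d needs a moment to load the model before it can serve traffic; a small readiness poll against the /health endpoint defined earlier (a sketch, assuming the port mapping above):

python
import time
import requests

def wait_until_healthy(url: str = "http://localhost:8000/health",
                       timeout_s: float = 60.0) -> None:
    # Poll /health until the server responds, or give up after timeout_s.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            if requests.get(url, timeout=2).status_code == 200:
                print("server is ready")
                return
        except requests.exceptions.ConnectionError:
            pass
        time.sleep(1.0)
    raise TimeoutError(f"server at {url} not healthy after {timeout_s}s")

wait_until_healthy()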

GPU-Accelerated Deployment #

CUDA Deployment #

python
import onnxruntime as ort
import numpy as np

providers = [
    ('CUDAExecutionProvider', {
        'device_id': 0,
        'arena_extend_strategy': 'kNextPowerOfTwo',
        'gpu_mem_limit': 2 * 1024 * 1024 * 1024,
        'cudnn_conv_algo_search': 'EXHAUSTIVE',
        'do_copy_in_default_stream': True,
    }),
    'CPUExecutionProvider'
]

sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

session = ort.InferenceSession(
    "model.onnx",
    sess_options=sess_options,
    providers=providers
)

print(f"使用提供者: {session.get_providers()}")

input_data = np.random.randn(1, 3, 224, 224).astype(np.float32)
input_name = session.get_inputs()[0].name

outputs = session.run(None, {input_name: input_data})
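
When inputs or outputs already live on the GPU, per-call host-device copies can dominate latency. ONNX Runtime's IOBinding API pins buffers to a device up front; a minimal sketch reusing the session and input from above:

python
# Place the input on GPU 0 once, and let ORT allocate the output there
# too, so run_with_iobinding() avoids per-call host<->device transfers.
input_ortvalue = ort.OrtValue.ortvalue_from_numpy(input_data, 'cuda', 0)

binding = session.io_binding()
binding.bind_ortvalue_input(input_name, input_ortvalue)
binding.bind_output(session.get_outputs()[0].name, 'cuda')

session.run_with_iobinding(binding)

# Copy results back to host memory only when actually needed.
outputs = binding.copy_outputs_to_cpu()
print(outputs[0].shape)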

TensorRT Acceleration #

python
import onnxruntime as ort

providers = [
    ('TensorrtExecutionProvider', {
        'device_id': 0,
        'trt_max_workspace_size': 1 << 30,
        'trt_fp16_enable': True,
        'trt_engine_cache_enable': True,
        'trt_engine_cache_path': './trt_cache',
    }),
    'CUDAExecutionProvider',
    'CPUExecutionProvider'
]

session = ort.InferenceSession("model.onnx", providers=providers)
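
The first inference with the TensorRT provider triggers engine building, which can take minutes for large models; the engine cache above spares later process restarts, but a fresh cache still deserves an explicit warm-up so the first real request does not pay that cost. A sketch, assuming a fixed 1x3x224x224 input:

python
import numpy as np
import time

# One throwaway run builds (or loads) the TensorRT engine so real
# traffic only ever sees steady-state latency.
dummy = np.random.randn(1, 3, 224, 224).astype(np.float32)
input_name = session.get_inputs()[0].name

start = time.time()
session.run(None, {input_name: dummy})
print(f"warm-up (engine build/load): {time.time() - start:.1f}s")

start = time.time()
session.run(None, {input_name: dummy})
print(f"steady-state: {(time.time() - start) * 1000:.2f}ms")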

Multi-GPU Deployment #

python
import onnxruntime as ort
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import threading

class MultiGPUInference:
    def __init__(self, model_path: str, num_gpus: int):
        self.sessions = []
        for gpu_id in range(num_gpus):
            providers = [
                ('CUDAExecutionProvider', {'device_id': gpu_id}),
                'CPUExecutionProvider'
            ]
            session = ort.InferenceSession(model_path, providers=providers)
            self.sessions.append(session)
        
        self.input_name = self.sessions[0].get_inputs()[0].name
        self.current_gpu = 0
        self.lock = threading.Lock()
    
    def predict(self, input_data: np.ndarray) -> np.ndarray:
        with self.lock:
            gpu_id = self.current_gpu
            self.current_gpu = (self.current_gpu + 1) % len(self.sessions)
        
        session = self.sessions[gpu_id]
        return session.run(None, {self.input_name: input_data})[0]
    
    def predict_batch(self, batch_data: list) -> list:
        with ThreadPoolExecutor(max_workers=len(self.sessions)) as executor:
            futures = [
                executor.submit(self.predict, data)
                for data in batch_data
            ]
            return [f.result() for f in futures]

inference = MultiGPUInference("model.onnx", num_gpus=2)
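
Round-robin assignment plus one thread per session keeps both devices busy; for example (the 1x3x224x224 shape is just an example):

python
# Fan eight single-image requests out across the two GPUs.
batch = [np.random.randn(1, 3, 224, 224).astype(np.float32) for _ in range(8)]
results = inference.predict_batch(batch)
print(len(results), results[0].shape)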

Batched Inference #

Dynamic Batching #

python
import onnxruntime as ort
import numpy as np
from collections import deque
import threading
import time
import uuid

class DynamicBatcher:
    def __init__(self, model_path: str, max_batch_size: int = 32, timeout_ms: int = 50):
        self.session = ort.InferenceSession(model_path)
        self.input_name = self.session.get_inputs()[0].name
        
        self.max_batch_size = max_batch_size
        self.timeout_ms = timeout_ms
        
        self.queue = deque()
        self.lock = threading.Lock()
        self.running = True
        
        # Per-request result storage and completion events.
        self.results = {}
        self.result_events = {}
        
        self.worker_thread = threading.Thread(target=self._process_batch, daemon=True)
        self.worker_thread.start()
    
    def _process_batch(self):
        while self.running:
            batch = []
            batch_ids = []
            
            start_time = time.time()
            
            # Collect requests until the batch is full or the timeout expires.
            while len(batch) < self.max_batch_size:
                elapsed = (time.time() - start_time) * 1000
                if elapsed >= self.timeout_ms and batch:
                    break
                
                item = None
                with self.lock:
                    if self.queue:
                        item = self.queue.popleft()
                
                if item is not None:
                    batch.append(item['data'])
                    batch_ids.append(item['id'])
                else:
                    # Sleep outside the lock so submitters are never blocked.
                    time.sleep(0.001)
                    if not self.running:
                        return
            
            if batch:
                # Each submitted array has batch dimension 1, so row i of
                # the output maps back to request batch_ids[i].
                batch_data = np.concatenate(batch, axis=0)
                outputs = self.session.run(None, {self.input_name: batch_data})
                
                with self.lock:
                    for i, item_id in enumerate(batch_ids):
                        self.results[item_id] = outputs[0][i:i + 1]
                        self.result_events[item_id].set()
    
    def submit(self, input_data: np.ndarray) -> str:
        request_id = str(uuid.uuid4())
        
        with self.lock:
            self.result_events[request_id] = threading.Event()
            self.queue.append({
                'id': request_id,
                'data': input_data
            })
        
        return request_id
    
    def get_result(self, request_id: str, timeout: float = 5.0) -> np.ndarray:
        # Block until the worker publishes the result for this request.
        event = self.result_events[request_id]
        if not event.wait(timeout):
            raise TimeoutError(f"request {request_id} timed out")
        with self.lock:
            del self.result_events[request_id]
            return self.results.pop(request_id)
    
    def shutdown(self):
        self.running = False
        self.worker_thread.join()
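
Callers submit individual requests and block on their own result while the worker thread batches across them; get_result is the helper added above:

python
batcher = DynamicBatcher("model.onnx", max_batch_size=32, timeout_ms=50)

# submit() returns immediately; get_result() blocks until the batch
# containing this request has been executed.
request_id = batcher.submit(np.random.randn(1, 3, 224, 224).astype(np.float32))
output = batcher.get_result(request_id)
print(output.shape)

batcher.shutdown()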

Edge Device Deployment #

Intel OpenVINO #

python
import onnxruntime as ort

providers = [
    ('OpenVINOExecutionProvider', {
        'device_type': 'CPU',
        'precision': 'FP16',
        'enable_opencl_throttling': True,
    })
]

session = ort.InferenceSession("model.onnx", providers=providers)

ARM Devices #

python
import onnxruntime as ort

sess_options = ort.SessionOptions()
sess_options.intra_op_num_threads = 4
sess_options.inter_op_num_threads = 1
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

session = ort.InferenceSession(
    "model.onnx",
    sess_options=sess_options,
    providers=['CPUExecutionProvider']
)
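
On CPU-bound ARM targets, INT8 quantization often buys more than thread tuning. ONNX Runtime ships a quantization toolkit; a minimal dynamic-quantization sketch (the file names are placeholders):

python
from onnxruntime.quantization import quantize_dynamic, QuantType

# Weights are quantized to INT8 offline; activations are quantized on
# the fly at inference time, so no calibration dataset is required.
quantize_dynamic(
    model_input="model.onnx",
    model_output="model_int8.onnx",
    weight_type=QuantType.QInt8,
)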

Web Deployment #

ONNX.js #

html
<!DOCTYPE html>
<html>
<head>
    <title>ONNX.js Inference</title>
    <script src="https://cdn.jsdelivr.net/npm/onnxjs/dist/onnx.min.js"></script>
</head>
<body>
    <input type="file" id="imageInput" accept="image/*">
    <div id="result"></div>
    
    <script>
        async function runInference(imageElement) {
            const session = new onnx.InferenceSession();
            await session.loadModel('./model.onnx');
            
            const canvas = document.createElement('canvas');
            canvas.width = 224;
            canvas.height = 224;
            const ctx = canvas.getContext('2d');
            ctx.drawImage(imageElement, 0, 0, 224, 224);
            
            // Convert RGBA canvas pixels into a planar [1, 3, 224, 224]
            // float tensor, scaling values to [0, 1].
            const imageData = ctx.getImageData(0, 0, 224, 224);
            const pixels = imageData.data;
            const floats = new Float32Array(3 * 224 * 224);
            for (let i = 0; i < 224 * 224; i++) {
                floats[i] = pixels[i * 4] / 255;                     // R plane
                floats[224 * 224 + i] = pixels[i * 4 + 1] / 255;     // G plane
                floats[2 * 224 * 224 + i] = pixels[i * 4 + 2] / 255; // B plane
            }
            const input = new onnx.Tensor(floats, 'float32', [1, 3, 224, 224]);
            
            const outputMap = await session.run([input]);
            const output = outputMap.values().next().value;
            
            return output.data;
        }
        
        document.getElementById('imageInput').addEventListener('change', async (e) => {
            const file = e.target.files[0];
            const img = new Image();
            img.src = URL.createObjectURL(file);
            img.onload = async () => {
                const result = await runInference(img);
                document.getElementById('result').textContent = 
                    'Predictions: ' + Array.from(result).slice(0, 5).join(', ');
            };
        });
    </script>
</body>
</html>

Monitoring and Logging #

Performance Monitoring #

python
import onnxruntime as ort
import numpy as np
import time
import logging
from dataclasses import dataclass
from typing import Optional
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class InferenceMetrics:
    latency_ms: float
    batch_size: int
    gpu_memory_used: Optional[float] = None
    
    def to_dict(self):
        return {
            "latency_ms": self.latency_ms,
            "batch_size": self.batch_size,
            "gpu_memory_used": self.gpu_memory_used
        }

class MonitoredInferenceSession:
    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path)
        self.input_name = self.session.get_inputs()[0].name
        self.metrics_history = []
    
    def run(self, input_data: np.ndarray) -> tuple:
        start_time = time.time()
        
        outputs = self.session.run(None, {self.input_name: input_data})
        
        latency = (time.time() - start_time) * 1000
        
        metrics = InferenceMetrics(
            latency_ms=latency,
            batch_size=input_data.shape[0]
        )
        
        self.metrics_history.append(metrics)
        
        logger.info(f"Inference latency: {latency:.2f} ms")
        
        return outputs, metrics
    
    def get_stats(self) -> dict:
        if not self.metrics_history:
            return {}
        
        latencies = [m.latency_ms for m in self.metrics_history]
        
        return {
            "total_requests": len(self.metrics_history),
            "avg_latency_ms": np.mean(latencies),
            "p50_latency_ms": np.percentile(latencies, 50),
            "p95_latency_ms": np.percentile(latencies, 95),
            "p99_latency_ms": np.percentile(latencies, 99),
            "max_latency_ms": max(latencies),
            "min_latency_ms": min(latencies)
        }

session = MonitoredInferenceSession("model.onnx")

for _ in range(100):
    input_data = np.random.randn(1, 3, 224, 224).astype(np.float32)
    outputs, metrics = session.run(input_data)

print(json.dumps(session.get_stats(), indent=2))
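
In production, the in-process metrics history usually feeds a metrics backend rather than a Python list; a sketch of exporting the same latency signal with the prometheus_client library (the library choice and port are assumptions, not something this section prescribes):

python
from prometheus_client import Histogram, start_http_server

# Latency histogram scrapeable at http://localhost:9090/metrics.
INFERENCE_LATENCY = Histogram(
    "onnx_inference_latency_seconds",
    "End-to-end ONNX inference latency",
)

start_http_server(9090)

def monitored_run(session, input_name, input_data):
    # Histogram.time() records the elapsed wall time of the block.
    with INFERENCE_LATENCY.time():
        return session.run(None, {input_name: input_data})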

Best Practices #

Deployment Checklist #

text
┌─────────────────────────────────────────────────────────────┐
│                    Deployment Checklist                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Model preparation:                                         │
│  □ Model correctness verified                               │
│  □ Model optimized (operator fusion, constant folding)      │
│  □ Model quantized (if needed)                              │
│  □ Dynamic shapes configured correctly                      │
│                                                             │
│  Performance:                                               │
│  □ Appropriate Execution Provider selected                  │
│  □ Thread counts configured                                 │
│  □ Graph optimizations enabled                              │
│  □ Batching considered                                      │
│                                                             │
│  Reliability:                                               │
│  □ Robust error handling                                    │
│  □ Complete logging                                         │
│  □ Health-check endpoint                                    │
│  □ Timeout handling                                         │
│                                                             │
│  Monitoring:                                                │
│  □ Performance metrics                                      │
│  □ Resource usage                                           │
│  □ Error rates                                              │
│  □ Alerting configured                                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Production Configuration #

python
import onnxruntime as ort
import os

def create_production_session(model_path: str) -> ort.InferenceSession:
    sess_options = ort.SessionOptions()
    
    num_cores = os.cpu_count() or 4
    sess_options.intra_op_num_threads = num_cores
    sess_options.inter_op_num_threads = 1
    
    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    
    sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
    
    sess_options.enable_mem_pattern = True
    sess_options.enable_cpu_mem_arena = True
    
    providers = ['CPUExecutionProvider']
    
    # Ask ORT which providers this build actually supports, rather than
    # inferring GPU availability from environment variables.
    if 'CUDAExecutionProvider' in ort.get_available_providers():
        providers.insert(0, 'CUDAExecutionProvider')
    
    session = ort.InferenceSession(
        model_path,
        sess_options=sess_options,
        providers=providers
    )
    
    return session
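
A quick check after construction confirms which provider actually won the fallback:

python
session = create_production_session("model.onnx")
# e.g. ['CUDAExecutionProvider', 'CPUExecutionProvider'] on a GPU build
print(session.get_providers())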

Summary #

ONNX offers flexible deployment options: from cloud servers to edge devices, and from high-performance GPUs to mobile, the same ONNX model can be served efficiently everywhere. Choosing the right approach comes down to:

  1. Performance requirements: latency and throughput targets
  2. Hardware environment: CPU, GPU, or dedicated accelerators
  3. Deployment scale: single machine, cluster, or edge
  4. Resource constraints: memory, power, and storage

With sensible configuration and tuning, ONNX Runtime delivers excellent inference performance across all of these environments.

Last updated: 2026-04-04