OpenAI API 最佳实践 #
概述 #
在生产环境中使用 OpenAI API 需要考虑成本、安全、性能、可靠性等多个方面。本章将介绍经过实践验证的最佳实践,帮助你构建稳定、高效、安全的 AI 应用。
text
┌─────────────────────────────────────────────────────────────┐
│ 最佳实践框架 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 成本优化 │
│ ├── Token 管理 │
│ ├── 模型选择 │
│ └── 缓存策略 │
│ │
│ 安全合规 │
│ ├── API Key 管理 │
│ ├── 数据隐私 │
│ └── 内容审核 │
│ │
│ 性能优化 │
│ ├── 并发处理 │
│ ├── 流式响应 │
│ └── 超时控制 │
│ │
│ 可靠性 │
│ ├── 错误处理 │
│ ├── 重试机制 │
│ └── 监控告警 │
│ │
└─────────────────────────────────────────────────────────────┘
成本优化 #
Token 管理 #
text
┌─────────────────────────────────────────────────────────────┐
│ Token 优化策略 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 精简提示词 │
│ - 删除不必要的描述 │
│ - 使用简洁的语言 │
│ - 避免重复内容 │
│ │
│ 2. 控制输出长度 │
│ - 设置合理的 max_tokens │
│ - 使用 stop 序列 │
│ │
│ 3. 对话历史管理 │
│ - 限制历史消息数量 │
│ - 使用摘要替代完整历史 │
│ │
│ 4. 选择合适的模型 │
│ - 简单任务用 gpt-4o-mini │
│ - 复杂任务用 gpt-4o │
│ │
└─────────────────────────────────────────────────────────────┘
Token 计算与监控 #
python
import tiktoken
from openai import OpenAI
client = OpenAI()
class TokenManager:
    """Count tokens, truncate text, and estimate request cost for one model."""

    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model
        # FIX: encoding_for_model raises KeyError for model names tiktoken
        # does not know; fall back to the general-purpose cl100k_base encoding
        # so the manager still works for new/unknown models.
        try:
            self.encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            self.encoding = tiktoken.get_encoding("cl100k_base")
        self.total_tokens = 0  # running total available for external bookkeeping

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens in *text*."""
        return len(self.encoding.encode(text))

    def count_messages_tokens(self, messages: list) -> int:
        """Approximate the prompt tokens used by a chat *messages* list.

        Heuristic: ~4 overhead tokens per message plus 2 tokens priming the
        assistant reply; a "name" field costs one token less than usual.
        """
        total = 0
        for message in messages:
            total += 4
            for key, value in message.items():
                total += len(self.encoding.encode(str(value)))
                if key == "name":
                    total -= 1
        total += 2
        return total

    def truncate_text(self, text: str, max_tokens: int) -> str:
        """Return *text* cut to at most *max_tokens* tokens (unchanged if shorter)."""
        tokens = self.encoding.encode(text)
        if len(tokens) <= max_tokens:
            return text
        return self.encoding.decode(tokens[:max_tokens])

    def estimate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Estimate USD cost for the given token counts.

        Prices are USD per 1M tokens; unknown models fall back to
        gpt-4o-mini pricing.
        """
        prices = {
            "gpt-4o-mini": {"input": 0.15, "output": 0.60},
            "gpt-4o": {"input": 2.50, "output": 10.00},
            "gpt-4-turbo": {"input": 10.00, "output": 30.00}
        }
        price = prices.get(self.model, prices["gpt-4o-mini"])
        input_cost = (input_tokens / 1_000_000) * price["input"]
        output_cost = (output_tokens / 1_000_000) * price["output"]
        return input_cost + output_cost
# Quick demonstration of TokenManager usage.
manager = TokenManager()

sample = "这是一段测试文本"
print(f"Token 数量: {manager.count_tokens(sample)}")

messages = [
    {"role": "system", "content": "你是一个助手"},
    {"role": "user", "content": "你好"},
]
print(f"消息 Token: {manager.count_messages_tokens(messages)}")
缓存策略 #
python
import hashlib
import json
import time
from typing import Optional

import redis
class ResponseCache:
    """Cache chat responses in Redis when available, else in-process.

    Keys hash the messages, the model, and the generation parameters that
    affect output (temperature, max_tokens), so different settings never
    collide.
    """

    def __init__(self, redis_client=None, ttl: int = 3600):
        self.redis = redis_client
        self.ttl = ttl  # seconds before a cached entry expires
        # FIX: the local fallback previously ignored ttl, so entries lived
        # forever and the dict grew without bound. Entries are now stored
        # as (expires_at, response) and dropped once stale.
        self.local_cache = {}

    def _get_cache_key(self, messages: list, model: str, **kwargs) -> str:
        """Derive a deterministic key from messages, model and parameters."""
        cache_data = {
            "messages": messages,
            "model": model,
            **{k: v for k, v in kwargs.items() if k in ["temperature", "max_tokens"]}
        }
        return hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest()

    def get(self, messages: list, model: str, **kwargs) -> Optional[str]:
        """Return the cached response, or None on a miss or after expiry."""
        key = self._get_cache_key(messages, model, **kwargs)
        if self.redis:
            cached = self.redis.get(key)
            if cached:
                return cached.decode()
        else:
            entry = self.local_cache.get(key)
            if entry is not None:
                expires_at, response = entry
                if time.time() < expires_at:
                    return response
                # Expired: drop it so the cache cannot grow without bound.
                del self.local_cache[key]
        return None

    def set(self, messages: list, model: str, response: str, **kwargs):
        """Store *response* under the derived key, honoring the TTL."""
        key = self._get_cache_key(messages, model, **kwargs)
        if self.redis:
            self.redis.setex(key, self.ttl, response)
        else:
            self.local_cache[key] = (time.time() + self.ttl, response)
cache = ResponseCache()


def cached_chat(messages: list, model: str = "gpt-4o-mini", **kwargs) -> str:
    """Answer via the API, reusing an identical earlier response when cached."""
    hit = cache.get(messages, model, **kwargs)
    if hit:
        return hit
    completion = client.chat.completions.create(
        model=model,
        messages=messages,
        **kwargs
    )
    answer = completion.choices[0].message.content
    cache.set(messages, model, answer, **kwargs)
    return answer
模型选择策略 #
python
def select_model(task_type: str, complexity: str = "medium") -> str:
    """Pick a model for a task.

    Args:
        task_type: one of "chat", "code", "analysis", "creative",
            "reasoning" (general conversation, code generation, data
            analysis, creative writing, complex reasoning).
        complexity: "low", "medium" or "high".

    Returns:
        A model name; unknown task types or complexity levels fall back
        to "gpt-4o-mini".
    """
    fallback = "gpt-4o-mini"
    model_matrix = {
        "chat": {"low": "gpt-4o-mini", "medium": "gpt-4o-mini", "high": "gpt-4o"},
        "code": {"low": "gpt-4o-mini", "medium": "gpt-4o", "high": "gpt-4o"},
        "analysis": {"low": "gpt-4o-mini", "medium": "gpt-4o", "high": "gpt-4-turbo"},
        "creative": {"low": "gpt-4o-mini", "medium": "gpt-4o", "high": "gpt-4o"},
        "reasoning": {"low": "gpt-4o-mini", "medium": "o1-mini", "high": "o1"},
    }
    return model_matrix.get(task_type, {}).get(complexity, fallback)
安全合规 #
API Key 管理 #
python
import os
from dotenv import load_dotenv

# Pull variables from a local .env file, then fail fast if the key is absent
# rather than letting the first API call fail with a confusing auth error.
load_dotenv()

api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise ValueError("OPENAI_API_KEY 环境变量未设置")

client = OpenAI(api_key=api_key)
敏感信息过滤 #
python
import re
class SensitiveFilter:
    """Detect and mask PII: emails, CN phone numbers, ID/card numbers, API keys."""

    def __init__(self):
        self.patterns = {
            # FIX: the TLD character class was [A-Z|a-z], which also matched
            # a literal '|' character; corrected to [A-Za-z].
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            "phone": r'\b\d{11}\b',          # 11-digit mainland-China mobile number
            "id_card": r'\b\d{17}[\dXx]\b',  # 18-char resident ID (last may be X)
            "credit_card": r'\b\d{16}\b',
            "api_key": r'sk-[a-zA-Z0-9]{48,}'
        }

    def mask(self, text: str) -> str:
        """Return *text* with every match replaced by a [TYPE_MASKED] placeholder."""
        for pattern_type, pattern in self.patterns.items():
            text = re.sub(pattern, f'[{pattern_type.upper()}_MASKED]', text)
        return text

    def contains_sensitive(self, text: str) -> bool:
        """Return True if any sensitive pattern occurs in *text*."""
        return any(re.search(pattern, text) for pattern in self.patterns.values())
# FIX: renamed from `filter`, which shadowed the builtin of the same name.
sensitive_filter = SensitiveFilter()

text = "我的邮箱是 test@example.com,电话是 13800138000"
print(sensitive_filter.mask(text))
内容审核 #
python
from openai import OpenAI
client = OpenAI()


def moderate_content(text: str) -> dict:
    """Run *text* through the moderation endpoint.

    Returns a dict with the overall "flagged" verdict plus a per-category
    boolean breakdown for the categories listed below.
    """
    response = client.moderations.create(input=text)
    result = response.results[0]
    category_names = [
        "hate", "harassment", "self_harm",
        "sexual", "violence", "sexual_minors"
    ]
    return {
        "flagged": result.flagged,
        "categories": {
            name: getattr(result.categories, name) for name in category_names
        }
    }
def safe_chat(messages: list) -> str:
    """Moderate each user message before answering; refuse flagged input."""
    # any() short-circuits on the first flagged user message, matching the
    # early-return behavior of a plain loop.
    blocked = any(
        moderate_content(m["content"])["flagged"]
        for m in messages
        if m["role"] == "user"
    )
    if blocked:
        return "抱歉,您的消息包含不适当的内容。"
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content
性能优化 #
并发处理 #
python
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()


async def async_chat(messages: list, model: str = "gpt-4o-mini") -> str:
    """Send one chat request on the shared async client and return the text."""
    completion = await async_client.chat.completions.create(
        model=model,
        messages=messages
    )
    return completion.choices[0].message.content
async def batch_chat(requests: list) -> list:
    """Fan out one async_chat per message list; results keep request order."""
    return await asyncio.gather(*(async_chat(msgs) for msgs in requests))
async def main():
    """Demo: answer three questions concurrently and print each result."""
    requests = [
        [{"role": "user", "content": "问题1"}],
        [{"role": "user", "content": "问题2"}],
        [{"role": "user", "content": "问题3"}],
    ]
    for i, result in enumerate(await batch_chat(requests), start=1):
        print(f"结果 {i}: {result}")


asyncio.run(main())
超时与重试 #
python
from openai import OpenAI
import time
from typing import Optional
# Module-level client with a 60-second request timeout.
client = OpenAI(timeout=60.0)


class RobustChat:
    """Chat wrapper that retries failures with exponential backoff."""

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0
    ):
        self.client = OpenAI(timeout=60.0)
        self.max_retries = max_retries
        self.base_delay = base_delay  # first backoff, doubled every attempt
        self.max_delay = max_delay    # ceiling for the backoff, in seconds

    def chat(
        self,
        messages: list,
        model: str = "gpt-4o-mini",
        **kwargs
    ) -> Optional[str]:
        """Return the reply text, or None once all retries are exhausted."""
        last_error = None
        attempt = 0
        while attempt < self.max_retries:
            try:
                response = self.client.chat.completions.create(
                    model=model,
                    messages=messages,
                    **kwargs
                )
                return response.choices[0].message.content
            except Exception as exc:
                last_error = exc
                if attempt < self.max_retries - 1:
                    # Exponential backoff capped at max_delay.
                    delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                    print(f"请求失败,{delay}秒后重试... (尝试 {attempt + 1}/{self.max_retries})")
                    time.sleep(delay)
            attempt += 1
        print(f"所有重试失败: {last_error}")
        return None


robust = RobustChat()
result = robust.chat([{"role": "user", "content": "你好"}])
连接池管理 #
python
import httpx
from openai import OpenAI

# Share one pooled HTTP client across requests: up to 100 concurrent
# connections, 20 kept alive, 60s total timeout with a 10s connect budget.
http_client = httpx.Client(
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    timeout=httpx.Timeout(60.0, connect=10.0)
)

client = OpenAI(http_client=http_client)
错误处理 #
错误类型处理 #
python
from openai import (
OpenAI,
APIError,
APIConnectionError,
RateLimitError,
APITimeoutError,
AuthenticationError,
BadRequestError
)
client = OpenAI()


def handle_chat(messages: list) -> str:
    """Chat once, translating each SDK exception into a user-facing message.

    Specific exception types are listed before their bases (e.g.
    APITimeoutError before APIConnectionError) so the most precise
    handler wins.
    """
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
    except AuthenticationError:
        return "API Key 无效,请检查配置"
    except RateLimitError:
        return "请求频率超限,请稍后重试"
    except APITimeoutError:
        return "请求超时,请检查网络连接"
    except APIConnectionError:
        return "无法连接到 OpenAI 服务器"
    except BadRequestError as e:
        return f"请求参数错误: {e}"
    except APIError as e:
        return f"API 错误: {e}"
    except Exception as e:
        return f"未知错误: {e}"
    return response.choices[0].message.content
优雅降级 #
python
class FallbackChat:
    """Try progressively cheaper models until one answers."""

    def __init__(self):
        # Ordered best-first; later entries serve as fallbacks.
        self.models = ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"]
        self.client = OpenAI()

    def chat(self, messages: list) -> str:
        """Return the first successful reply, or a fixed apology string."""
        for candidate in self.models:
            try:
                completion = self.client.chat.completions.create(
                    model=candidate,
                    messages=messages
                )
            except Exception as exc:
                print(f"模型 {candidate} 失败: {exc}")
                continue
            return completion.choices[0].message.content
        return "所有模型都不可用,请稍后重试"


fallback = FallbackChat()
监控与日志 #
请求日志 #
python
import logging
import json
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("openai_api")


class LoggingChat:
    """Chat wrapper that emits one structured JSON log line per request."""

    def __init__(self):
        self.client = OpenAI()

    def chat(self, messages: list, **kwargs) -> str:
        """Send a chat request; log usage on success, the error on failure.

        Re-raises the original exception after logging so callers still
        see the failure.
        """
        start_time = datetime.now()
        # `model` is pulled out of kwargs so it is not passed twice.
        model = kwargs.pop("model", "gpt-4o-mini")
        try:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
        except Exception as e:
            elapsed = (datetime.now() - start_time).total_seconds()
            logger.error(json.dumps({
                "timestamp": start_time.isoformat(),
                "elapsed_seconds": elapsed,
                "status": "error",
                "error": str(e)
            }))
            raise
        elapsed = (datetime.now() - start_time).total_seconds()
        logger.info(json.dumps({
            "timestamp": start_time.isoformat(),
            "model": response.model,
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
            "total_tokens": response.usage.total_tokens,
            "elapsed_seconds": elapsed,
            "status": "success"
        }))
        return response.choices[0].message.content


logging_chat = LoggingChat()
指标收集 #
python
from dataclasses import dataclass, field
from typing import List
import statistics
@dataclass
class Metrics:
    """Aggregate request counters, token/cost totals, and latency stats."""

    requests: int = 0
    successes: int = 0
    failures: int = 0
    total_tokens: int = 0
    total_cost: float = 0.0
    latencies: List[float] = field(default_factory=list)

    def record_success(self, tokens: int, cost: float, latency: float):
        """Count one successful call and accumulate its usage."""
        self.requests += 1
        self.successes += 1
        self.total_tokens += tokens
        self.total_cost += cost
        self.latencies.append(latency)

    def record_failure(self):
        """Count one failed call."""
        self.requests += 1
        self.failures += 1

    def get_stats(self) -> dict:
        """Summarize: success rate, totals, mean and p95 latency.

        statistics.quantiles needs at least two samples, so p95 reports 0
        with fewer; rate and mean likewise report 0 on empty data.
        """
        n = self.requests
        lat = self.latencies
        return {
            "total_requests": n,
            "success_rate": self.successes / n if n > 0 else 0,
            "total_tokens": self.total_tokens,
            "total_cost": self.total_cost,
            "avg_latency": statistics.mean(lat) if lat else 0,
            "p95_latency": statistics.quantiles(lat, n=20)[18] if len(lat) > 1 else 0
        }


metrics = Metrics()
生产部署清单 #
部署前检查 #
text
┌─────────────────────────────────────────────────────────────┐
│ 生产部署检查清单 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 安全 │
│ □ API Key 存储在安全的位置 │
│ □ 敏感数据已加密 │
│ □ 实施了输入验证 │
│ □ 配置了内容审核 │
│ │
│ 可靠性 │
│ □ 实现了错误处理 │
│ □ 配置了重试机制 │
│ □ 设置了合理的超时 │
│ □ 准备了降级方案 │
│ │
│ 性能 │
│ □ 优化了 Token 使用 │
│ □ 实现了缓存策略 │
│ □ 配置了并发限制 │
│ □ 选择了合适的模型 │
│ │
│ 监控 │
│ □ 配置了日志记录 │
│ □ 设置了指标收集 │
│ □ 配置了告警通知 │
│ □ 准备了仪表板 │
│ │
│ 成本 │
│ □ 设置了预算限制 │
│ □ 实现了用量监控 │
│ □ 配置了成本告警 │
│ │
└─────────────────────────────────────────────────────────────┘
完整的生产级客户端 #
python
from openai import OpenAI, AsyncOpenAI
import tiktoken
import hashlib
import json
import time
import logging
from typing import Optional, List, Dict
from dataclasses import dataclass
@dataclass
class ChatResult:
    """Outcome of one chat call, including usage and timing metadata."""
    content: str            # assistant reply text
    model: str              # model that actually served the request
    prompt_tokens: int      # 0 when served from cache
    completion_tokens: int  # 0 when served from cache
    total_tokens: int       # 0 when served from cache
    latency: float          # wall-clock seconds (0 when cached)
    cached: bool            # True when answered from the local cache


class ProductionOpenAI:
    """Production-oriented client: caching, retries, token accounting.

    FIX: cache_ttl was accepted but never applied — cached entries lived
    forever. Entries are now stored as (expires_at, content) tuples and
    expire after cache_ttl seconds; stale entries are evicted on lookup.
    """

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        max_retries: int = 3,
        timeout: float = 60.0,
        cache_ttl: int = 3600
    ):
        self.client = OpenAI(timeout=timeout)
        self.async_client = AsyncOpenAI(timeout=timeout)
        self.model = model
        self.max_retries = max_retries
        self.encoding = tiktoken.encoding_for_model(model)
        # key -> (expires_at, content)
        self.cache: Dict[str, tuple] = {}
        self.cache_ttl = cache_ttl
        self.logger = logging.getLogger(__name__)

    def _get_cache_key(self, messages: List[Dict], **kwargs) -> str:
        """Derive a deterministic cache key from messages + parameters."""
        data = {"messages": messages, **kwargs}
        return hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()

    def _count_tokens(self, text: str) -> int:
        """Return the token count of *text* under this model's encoding."""
        return len(self.encoding.encode(text))

    def _cache_lookup(self, key: str) -> Optional[str]:
        """Return a live cached response, evicting the entry if expired."""
        entry = self.cache.get(key)
        if entry is None:
            return None
        expires_at, content = entry
        if time.time() >= expires_at:
            del self.cache[key]
            return None
        return content

    def chat(
        self,
        messages: List[Dict],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        use_cache: bool = True,
        **kwargs
    ) -> ChatResult:
        """Chat with exponential-backoff retries and TTL-bound caching.

        Cached hits report zero token usage and latency. Raises the last
        API error when every retry fails.
        """
        cache_key = self._get_cache_key(messages, temperature=temperature, **kwargs)
        if use_cache:
            cached = self._cache_lookup(cache_key)
            if cached is not None:
                return ChatResult(
                    content=cached,
                    model=self.model,
                    prompt_tokens=0,
                    completion_tokens=0,
                    total_tokens=0,
                    latency=0,
                    cached=True
                )
        start_time = time.time()
        last_error = None
        for attempt in range(self.max_retries):
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    **kwargs
                )
                latency = time.time() - start_time
                content = response.choices[0].message.content
                if use_cache:
                    self.cache[cache_key] = (time.time() + self.cache_ttl, content)
                return ChatResult(
                    content=content,
                    model=response.model,
                    prompt_tokens=response.usage.prompt_tokens,
                    completion_tokens=response.usage.completion_tokens,
                    total_tokens=response.usage.total_tokens,
                    latency=latency,
                    cached=False
                )
            except Exception as e:
                last_error = e
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s...
        raise last_error

    async def async_chat(
        self,
        messages: List[Dict],
        **kwargs
    ) -> ChatResult:
        """Async single-shot chat (no caching or retries)."""
        start_time = time.time()
        response = await self.async_client.chat.completions.create(
            model=self.model,
            messages=messages,
            **kwargs
        )
        latency = time.time() - start_time
        return ChatResult(
            content=response.choices[0].message.content,
            model=response.model,
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            total_tokens=response.usage.total_tokens,
            latency=latency,
            cached=False
        )
# Demo: one request through the production client.
prod_client = ProductionOpenAI()

result = prod_client.chat([{"role": "user", "content": "你好"}])

print(f"回复: {result.content}")
print(f"Token: {result.total_tokens}")
print(f"延迟: {result.latency:.2f}s")
print(f"缓存: {result.cached}")
总结 #
通过本系列文档的学习,你已经掌握了 OpenAI API 的核心功能:
- 基础概念:了解 OpenAI API 的工作原理和核心特点
- 快速入门:学会配置环境和发送第一个请求
- 对话补全:掌握 Chat Completions API 的各种用法
- 参数调优:理解如何通过参数控制模型输出
- 流式响应:实现实时输出的用户体验
- 函数调用:扩展 GPT 的能力边界
- 图像生成:使用 DALL·E 创造视觉内容
- 文本嵌入:构建语义搜索和推荐系统
- Assistants API:构建功能完整的 AI 助手
- 最佳实践:生产环境部署的专业知识
继续探索和实践,你将能够构建出更加强大和智能的 AI 应用!
最后更新:2026-03-29