Best Practices #

This chapter collects best practices for LangChain development to help you build high-quality, maintainable, and performant LLM applications.

Architecture Design #

Modular Design #

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

class QABot:
    """Question-answering bot module."""
    
    def __init__(self, model_name: str = "gpt-4o-mini"):
        self.model = ChatOpenAI(model=model_name)
        self._setup_chains()
    
    def _setup_chains(self):
        """Initialize the chain."""
        prompt = ChatPromptTemplate.from_template("""
        You are a professional Q&A assistant. Please answer the following question:
        
        Question: {question}
        
        Answer:
        """)
        
        self.chain = prompt | self.model | StrOutputParser()
    
    def answer(self, question: str) -> str:
        """Answer a question."""
        return self.chain.invoke({"question": question})

# Usage
bot = QABot()
answer = bot.answer("What is machine learning?")

Configuration Management #

python
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache
from langchain_openai import ChatOpenAI

class Settings(BaseSettings):
    openai_api_key: str
    model_name: str = "gpt-4o-mini"
    temperature: float = 0.7
    max_tokens: int = 1000
    
    model_config = SettingsConfigDict(env_file=".env")

@lru_cache()
def get_settings() -> Settings:
    return Settings()

# Usage
settings = get_settings()
model = ChatOpenAI(
    model=settings.model_name,
    temperature=settings.temperature,
    api_key=settings.openai_api_key
)

Performance Optimization #

1. Batch Processing #

python
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

# Process multiple requests in one batch
questions = ["Question 1", "Question 2", "Question 3"]
responses = model.batch(questions)

# Or batch asynchronously
import asyncio

async def batch_process(items):
    tasks = [model.ainvoke(item) for item in items]
    return await asyncio.gather(*tasks)

2. Caching Strategy #

python
from langchain_community.cache import InMemoryCache
from langchain.globals import set_llm_cache

# Use an in-memory cache
set_llm_cache(InMemoryCache())

# Or use a Redis-backed cache
from langchain_community.cache import RedisCache
import redis

redis_client = redis.Redis(host="localhost", port=6379)
set_llm_cache(RedisCache(redis_client))
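
With a cache configured, repeated identical prompts (with the same model parameters) are served without another API call; a quick sketch, assuming the `model` instance from the earlier examples:

python
# The second identical call is answered from the cache, not the API
model.invoke("Tell me a joke")  # hits the API and stores the result
model.invoke("Tell me a joke")  # served from the cache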

3. Embedding Cache #

python
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain_openai import OpenAIEmbeddings

# Create an embedding model backed by a local file cache
store = LocalFileStore("./embedding_cache")
cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
    OpenAIEmbeddings(),
    store,
    namespace="openai"
)
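
To see the cache in action, a minimal usage sketch (the texts are made up): the first call computes and persists the vectors, and repeating it with the same inputs is served from the local store.

python
texts = ["LangChain can cache embeddings", "to avoid recomputing them"]
vectors = cached_embeddings.embed_documents(texts)        # computed via the OpenAI API
vectors_again = cached_embeddings.embed_documents(texts)  # read back from ./embedding_cache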

4. Async Processing #

python
import asyncio
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

async def process_concurrent(requests):
    """Process multiple requests concurrently."""
    tasks = [model.ainvoke(req) for req in requests]
    return await asyncio.gather(*tasks)

# Use a semaphore to cap concurrency
async def process_with_limit(requests, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def limited_process(req):
        async with semaphore:
            return await model.ainvoke(req)
    
    tasks = [limited_process(req) for req in requests]
    return await asyncio.gather(*tasks)
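
A hypothetical driver for the helper above, capping concurrency at 5 while processing a batch of illustrative prompts:

python
# Run from a synchronous entry point
prompts = [f"Summarize item {i}" for i in range(100)]
results = asyncio.run(process_with_limit(prompts, max_concurrent=5))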

Cost Control #

1. Token Estimation #

python
import tiktoken

def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Count the number of tokens in a piece of text."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def estimate_cost(input_tokens: int, output_tokens: int, model: str) -> float:
    """Estimate the cost in USD."""
    pricing = {
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},  # USD per 1M tokens
        "gpt-4o": {"input": 2.50, "output": 10.00},
    }
    
    price = pricing.get(model, {"input": 0, "output": 0})
    input_cost = (input_tokens / 1_000_000) * price["input"]
    output_cost = (output_tokens / 1_000_000) * price["output"]
    
    return input_cost + output_cost
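
A small sketch tying the two helpers together; the prompt and the expected output length are illustrative assumptions:

python
# Estimate the cost of a single call before sending it
prompt = "Summarize the following text in 50 words or fewer: ..."
input_tokens = count_tokens(prompt)
expected_output_tokens = 100  # rough guess for a short summary
cost = estimate_cost(input_tokens, expected_output_tokens, "gpt-4o-mini")
print(f"Estimated cost: ${cost:.6f}")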

2. Model Selection #

python
def get_model_for_task(task_type: str) -> str:
    """Pick an appropriate model for the task type."""
    model_mapping = {
        "simple_qa": "gpt-4o-mini",      # simple Q&A
        "code_gen": "gpt-4o",             # code generation
        "creative": "gpt-4o",             # creative writing
        "classification": "gpt-4o-mini",  # classification
        "summarization": "gpt-4o-mini",   # summarization
    }
    return model_mapping.get(task_type, "gpt-4o-mini")
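
A usage sketch: route each request to the cheapest model that can handle its task type (imports as in the earlier examples).

python
from langchain_openai import ChatOpenAI

classifier = ChatOpenAI(model=get_model_for_task("classification"))  # gpt-4o-mini
coder = ChatOpenAI(model=get_model_for_task("code_gen"))             # gpt-4o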

3. Prompt Optimization #

python
# Concise prompt
concise_prompt = """Summarize the following text in no more than 50 words:
{text}
"""

# Verbose prompt (avoid)
verbose_prompt = """
Please read the following text carefully, then perform an in-depth analysis,
extract the key information, and finally produce a detailed summary...
"""

Error Handling #

1. Retry Mechanism #

python
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from openai import RateLimitError, APIError

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((RateLimitError, APIError))
)
async def call_with_retry(model, prompt):
    return await model.ainvoke(prompt)

2. Graceful Degradation #

python
from langchain_openai import ChatOpenAI

class RobustLLM:
    """LLM call with a fallback model."""
    
    def __init__(self):
        self.primary_model = ChatOpenAI(model="gpt-4o")
        self.fallback_model = ChatOpenAI(model="gpt-4o-mini")
    
    async def generate(self, prompt: str) -> str:
        try:
            response = await self.primary_model.ainvoke(prompt)
        except Exception as e:
            print(f"Primary model failed: {e}; switching to the fallback model")
            response = await self.fallback_model.ainvoke(prompt)
        return response.content

3. Input Validation #

python
from pydantic import BaseModel, Field, field_validator

class UserInput(BaseModel):
    question: str = Field(..., min_length=1, max_length=5000)
    
    @field_validator('question')
    @classmethod
    def validate_question(cls, v):
        if not v.strip():
            raise ValueError('Question must not be empty')
        # Check for sensitive terms
        sensitive_words = ['password', 'credit card']
        for word in sensitive_words:
            if word in v:
                raise ValueError(f'Question contains a sensitive term: {word}')
        return v
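
A sketch of how the model might be used at an API boundary; raw_question is a placeholder for user input:

python
from pydantic import ValidationError

raw_question = "What is machine learning?"  # placeholder user input
try:
    user_input = UserInput(question=raw_question)
except ValidationError as exc:
    # Reject the request instead of forwarding invalid input to the LLM
    print(f"Invalid input: {exc}")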

Security Considerations #

1. API Key Management #

python
import os
from dotenv import load_dotenv

# Load from environment variables
load_dotenv()

# Never hard-code keys
# BAD: api_key = "sk-xxx"

# GOOD: read from the environment
api_key = os.getenv("OPENAI_API_KEY")

2. Input Filtering #

python
import re

def sanitize_input(text: str) -> str:
    """Sanitize user input."""
    # Remove potentially dangerous characters
    text = re.sub(r'<[^>]*>', '', text)  # strip HTML tags
    text = text.strip()
    
    # Cap the length
    max_length = 10000
    if len(text) > max_length:
        text = text[:max_length]
    
    return text

3. Output Filtering #

python
import re

def sanitize_output(text: str) -> str:
    """Sanitize model output."""
    # Mask patterns that may contain sensitive information
    patterns = [
        r'\b\d{16}\b',  # credit card numbers
        r'\b[A-Z]{2}\d{6}\b',  # passport numbers
    ]
    
    for pattern in patterns:
        text = re.sub(pattern, '[REDACTED]', text)
    
    return text

4. Rate Limiting #

python
from fastapi import Depends, FastAPI
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
from pydantic import BaseModel

class ChatRequest(BaseModel):
    message: str

app = FastAPI()

# Note: FastAPILimiter.init(redis_client) must be awaited at startup before the limiter takes effect

@app.post("/chat", dependencies=[Depends(RateLimiter(times=10, seconds=60))])
async def chat(request: ChatRequest):
    return {"response": "..."}

Prompt Engineering #

1. Clear Instructions #

python
# Good prompt
good_prompt = """
You are a professional technical writer.
Answer the question in the following format:

1. Overview (1-2 sentences)
2. Detailed explanation
3. Example code (if applicable)
4. Caveats

Question: {question}
"""

# Poor prompt
bad_prompt = "Answer this question: {question}"

2. Few-shot Examples #

python
few_shot_prompt = """
Classify the text according to the examples:

Example 1:
Text: The battery life on this phone is excellent
Label: positive

Example 2:
Text: The customer service was terrible
Label: negative

Example 3:
Text: The product quality is average
Label: neutral

Classify the following text:
Text: {text}
Label:
"""

3. Structured Output #

python
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel

class AnalysisResult(BaseModel):
    sentiment: str
    confidence: float
    keywords: list[str]

parser = PydanticOutputParser(pydantic_object=AnalysisResult)

prompt = ChatPromptTemplate.from_messages([
    ("system", "Analyze the sentiment of the text. {format_instructions}"),
    ("human", "{text}")
]).partial(format_instructions=parser.get_format_instructions())

chain = prompt | model | parser
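
Provided the model follows the format instructions, the chain returns a typed AnalysisResult rather than raw text; a usage sketch with made-up input:

python
result = chain.invoke({"text": "The new release is fast and pleasant to use"})
print(result.sentiment, result.confidence, result.keywords)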

Testing #

1. Unit Tests #

python
import pytest
from unittest.mock import patch

def test_qa_bot():
    """Test the QA bot."""
    # Assumes OPENAI_API_KEY is set (a dummy value works, since the model is mocked)
    bot = QABot()
    
    with patch.object(bot.model, 'invoke') as mock_invoke:
        mock_invoke.return_value = "This is a test answer"
        
        result = bot.answer("test question")
        
        assert result == "This is a test answer"
        mock_invoke.assert_called_once()

2. Integration Tests #

python
import pytest
from fastapi.testclient import TestClient

def test_chat_endpoint():
    """Test the chat endpoint."""
    client = TestClient(app)
    
    response = client.post(
        "/chat",
        json={"message": "Hello"}
    )
    )
    
    assert response.status_code == 200
    assert "response" in response.json()

3. Evaluation Tests #

python
def evaluate_rag_accuracy(questions, expected_answers, rag_chain):
    """Evaluate RAG accuracy with a simple keyword match."""
    correct = 0
    
    for question, expected in zip(questions, expected_answers):
        answer = rag_chain.invoke(question)
        
        # Naive keyword matching
        if any(keyword in answer.lower() for keyword in expected.lower().split()):
            correct += 1
    
    return correct / len(questions)

Monitoring and Logging #

1. Structured Logging #

python
import logging
import json
from datetime import datetime

class StructuredLogger:
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
    
    def log(self, level: str, message: str, **kwargs):
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "level": level,
            "message": message,
            **kwargs
        }
        self.logger.log(
            getattr(logging, level),
            json.dumps(log_entry)
        )

logger = StructuredLogger(__name__)
logger.log("INFO", "Request processed", user_id="123", tokens=100)

2. Performance Monitoring #

python
import time
from functools import wraps

def timing_decorator(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        result = await func(*args, **kwargs)
        duration = time.time() - start
        
        logger.log(
            "INFO",
            f"{func.__name__} completed",
            duration_seconds=duration
        )
        
        return result
    return wrapper
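
A hypothetical usage example: wrap an async LLM call so every invocation logs its duration (the model instance is assumed from the earlier examples).

python
@timing_decorator
async def answer_question(question: str) -> str:
    # Logs "answer_question completed" with duration_seconds on every call
    response = await model.ainvoke(question)
    return response.content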

Checklist #

Development Phase #

  • [ ] Manage sensitive information via environment variables
  • [ ] Implement proper error handling
  • [ ] Add input validation
  • [ ] Write unit tests
  • [ ] Use type hints

Deployment Phase #

  • [ ] Configure health checks
  • [ ] Set up rate limiting
  • [ ] Enable logging
  • [ ] Configure monitoring and alerting
  • [ ] Prepare a rollback plan

Production Phase #

  • [ ] Monitor token usage
  • [ ] Track response latency
  • [ ] Record error rates
  • [ ] Back up data regularly
  • [ ] Keep dependencies up to date

Last updated: 2026-03-30