Production Deployment #

Deploying a LangChain application to production involves performance, reliability, scalability, and several other concerns. This chapter covers how to deploy with LangServe and related tooling.

Deployment Overview #

text
┌─────────────────────────────────────────────────────────────┐
│               Production Deployment Concerns                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Performance:                                               │
│  - Response latency                                         │
│  - Concurrency                                              │
│  - Resource usage                                           │
│                                                             │
│  Reliability:                                               │
│  - Error handling                                           │
│  - Retries                                                  │
│  - Health checks                                            │
│                                                             │
│  Scalability:                                               │
│  - Horizontal scaling                                       │
│  - Load balancing                                           │
│  - Caching strategy                                         │
│                                                             │
│  Observability:                                             │
│  - Logging                                                  │
│  - Performance monitoring                                   │
│  - Error tracking                                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

LangServe #

LangServe is LangChain's official deployment tool; it exposes a chain as a REST API with a few lines of code.

Installation #

bash
pip install langserve
pip install "langserve[all]"  # 安装所有依赖

Basic API Service #

python
from fastapi import FastAPI
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langserve import add_routes

# Create the app
app = FastAPI(
    title="LangChain API",
    version="1.0",
    description="LangChain API service"
)

# Build the chain
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("{topic}")
chain = prompt | model | StrOutputParser()

# Expose the chain at /chat
add_routes(app, chain, path="/chat")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
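
Per LangServe's conventions, add_routes generates a set of endpoints under the given path: POST /chat/invoke, /chat/batch, and /chat/stream, plus an interactive playground at /chat/playground/ and the usual FastAPI docs at /docs.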

Multiple Chain Endpoints #

python
from fastapi import FastAPI
from langserve import add_routes

app = FastAPI()

# Chat chain (model is the ChatOpenAI instance from the previous example)
chat_chain = ChatPromptTemplate.from_template("Answer: {question}") | model | StrOutputParser()
add_routes(app, chat_chain, path="/chat")

# Translation chain
translate_chain = ChatPromptTemplate.from_template(
    "Translate into {lang}: {text}"
) | model | StrOutputParser()
add_routes(app, translate_chain, path="/translate")

# Summarization chain
summary_chain = ChatPromptTemplate.from_template(
    "Summarize in {words} words: {content}"
) | model | StrOutputParser()
add_routes(app, summary_chain, path="/summary")

Client Usage #

python
from langserve import RemoteRunnable

# Connect to the remote chain
remote_chain = RemoteRunnable("http://localhost:8000/chat")

# Synchronous call
response = remote_chain.invoke({"topic": "Hello"})

# Streaming call
for chunk in remote_chain.stream({"topic": "Tell me a story"}):
    print(chunk, end="")

# Asynchronous call
import asyncio

async def async_call():
    response = await remote_chain.ainvoke({"topic": "Hello"})
    return response

result = asyncio.run(async_call())

Calling with requests #

python
import requests

# Call the auto-generated invoke endpoint
response = requests.post(
    "http://localhost:8000/chat/invoke",
    json={"input": {"topic": "Hello"}}
)

print(response.json()["output"])
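
The generated /batch endpoint works the same way (request shapes follow LangServe's conventions), accepting several inputs in one request:

python
# Batch: one request, several inputs; returns one output per input
response = requests.post(
    "http://localhost:8000/chat/batch",
    json={"inputs": [{"topic": "Hello"}, {"topic": "Hi"}]}
)
print(response.json()["output"])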

Advanced LangServe Configuration #

API with Authentication #

python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

app = FastAPI()
security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    if token != "your-secret-token":
        raise HTTPException(status_code=401, detail="Invalid token")
    return token

@app.post("/chat/invoke")
async def chat_invoke(
    input: dict,
    token: str = Depends(verify_token)
):
    return chain.invoke(input)
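
Rather than hand-writing each protected endpoint, the same check can cover every auto-generated route: add_routes accepts FastAPI dependencies for this.

python
from fastapi import Depends
from langserve import add_routes

# All generated endpoints under /chat now require a valid token
add_routes(app, chain, path="/chat", dependencies=[Depends(verify_token)])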

Custom Endpoints #

python
from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    user_id: Optional[str] = None

class ChatResponse(BaseModel):
    response: str
    tokens_used: int

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    result = chain.invoke({"topic": request.message})
    
    return ChatResponse(
        response=result,
        tokens_used=len(result.split())  # rough word count, not true token usage
    )

Streaming Response Endpoint #

python
from fastapi.responses import StreamingResponse

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        async for chunk in chain.astream({"topic": request.message}):
            yield f"data: {chunk}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
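
On the client side the stream can be consumed line by line; a minimal sketch using httpx against the endpoint above:

python
import httpx

async def read_stream():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"message": "Tell me a story"},
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):], end="", flush=True)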

FastAPI Deployment #

A Complete API Service #

python
from fastapi import FastAPI, HTTPException
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from typing import Optional

app = FastAPI(title="LLM API")

# Request and response models
class GenerateRequest(BaseModel):
    prompt: str
    max_tokens: Optional[int] = 1000
    temperature: Optional[float] = 0.7

class GenerateResponse(BaseModel):
    text: str
    tokens: int
    model: str

# Initialize the default model
model = ChatOpenAI(model="gpt-4o-mini")

@app.post("/generate", response_model=GenerateResponse)
async def generate(request: GenerateRequest):
    try:
        # Build a per-request model so max_tokens and temperature can vary per call
        model_with_params = ChatOpenAI(
            model="gpt-4o-mini",
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )
        
        response = await model_with_params.ainvoke(request.prompt)
        
        return GenerateResponse(
            text=response.content,
            tokens=response.response_metadata.get("token_usage", {}).get("total_tokens", 0),
            model="gpt-4o-mini"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Deploying with Gunicorn #

bash
gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app
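
The same options can live in a gunicorn.conf.py (a minimal sketch; Gunicorn config files are plain Python, and the worker count should be tuned to your CPUs):

python
# gunicorn.conf.py
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
timeout = 120          # LLM calls can be slow; give workers headroom
graceful_timeout = 30  # time to finish in-flight requests on shutdown

Start it with gunicorn -c gunicorn.conf.py main:app.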

Docker Deployment #

Dockerfile #

dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

docker-compose.yml #

yaml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./data:/app/data
    depends_on:
      - redis
  
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

Build and Run #

bash
# Build the image
docker build -t langchain-api .

# Run the container
docker run -p 8000:8000 -e OPENAI_API_KEY=your-key langchain-api

# Or bring up the full stack with docker-compose
docker-compose up -d

Kubernetes Deployment #

Deployment Configuration #

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langchain-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langchain-api
  template:
    metadata:
      labels:
        app: langchain-api
    spec:
      containers:
      - name: api
        image: langchain-api:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-secrets
              key: openai-key
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

Service Configuration #

yaml
apiVersion: v1
kind: Service
metadata:
  name: langchain-api
spec:
  selector:
    app: langchain-api
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer
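
HorizontalPodAutoscaler Configuration #

Horizontal scaling (one of the concerns in the overview) can be automated with an HPA; a sketch targeting the Deployment above:

yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langchain-api
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langchain-api
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70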

Performance Optimization #

1. Connection Pooling #

python
import httpx

# Shared async client with a bounded connection pool
async_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=100),
    timeout=httpx.Timeout(30.0)
)
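
The pooled client can then be shared by every model call; langchain_openai's ChatOpenAI accepts it via http_async_client:

python
from langchain_openai import ChatOpenAI

# All async LLM requests reuse the pooled connections
model = ChatOpenAI(model="gpt-4o-mini", http_async_client=async_client)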

2. Caching #

python
from functools import lru_cache

# Cache embeddings keyed by the input text (strings are hashable, so the
# text itself works as the cache key)
@lru_cache(maxsize=1000)
def get_embedding(text: str):
    # embeddings is an embedding model instance, e.g. OpenAIEmbeddings()
    return embeddings.embed_query(text)
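
For model calls (as opposed to embeddings), LangChain also ships a global LLM cache; a minimal in-memory sketch (langchain_community offers a Redis-backed cache that pairs well with the redis service in the compose file above):

python
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

# Repeated identical prompts are answered from the cache instead of the API
set_llm_cache(InMemoryCache())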

3. Async Processing #

python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)

async def process_batch(items):
    # Offload a blocking function (process_item, a placeholder for your own
    # synchronous work) to the thread pool and await all results
    loop = asyncio.get_event_loop()
    tasks = [
        loop.run_in_executor(executor, process_item, item)
        for item in items
    ]
    return await asyncio.gather(*tasks)
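
For LangChain runnables themselves the thread pool is usually unnecessary: any chain supports native async batching with bounded concurrency.

python
import asyncio

async def run_batch():
    # chain is the runnable from the earlier examples;
    # max_concurrency bounds how many calls run in parallel
    return await chain.abatch(
        [{"topic": "a"}, {"topic": "b"}, {"topic": "c"}],
        config={"max_concurrency": 5},
    )

results = asyncio.run(run_batch())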

Monitoring and Logging #

Health Checks #

python
@app.get("/health")
async def health():
    checks = {
        "api": True,
        "redis": await check_redis(),
        "model": await check_model()
    }
    
    all_healthy = all(checks.values())
    
    return {
        "status": "healthy" if all_healthy else "unhealthy",
        "checks": checks
    }

async def check_redis():
    # redis is an async client instance (e.g. from redis.asyncio)
    try:
        await redis.ping()
        return True
    except Exception:
        return False

async def check_model():
    try:
        await model.ainvoke("test")
        return True
    except Exception:
        return False

Structured Logging #

python
import logging
import json

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module
        }
        return json.dumps(log_entry)

handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())

logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
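
With the JSON handler attached, a small HTTP middleware can emit one structured log line per request (a sketch using FastAPI's middleware hook):

python
import time
from fastapi import Request

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = (time.perf_counter() - start) * 1000
    logger.info(
        f"{request.method} {request.url.path} "
        f"{response.status_code} {elapsed_ms:.1f}ms"
    )
    return response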

Error Handling #

Global Exception Handlers #

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from openai import RateLimitError

app = FastAPI()

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error(f"Unhandled exception: {exc}")
    
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc) if app.debug else "An error occurred"
        }
    )

@app.exception_handler(RateLimitError)
async def rate_limit_handler(request: Request, exc: RateLimitError):
    return JSONResponse(
        status_code=429,
        content={"error": "Rate limit exceeded", "retry_after": 60}
    )

Retry Mechanism #

python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def call_model_with_retry(prompt: str):
    return await model.ainvoke(prompt)
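
LangChain runnables also have retry support built in; model.with_retry wraps the model with exponential backoff without an extra dependency:

python
# Built-in alternative: retry up to 3 attempts with exponential backoff
model_with_retry = model.with_retry(stop_after_attempt=3)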

Best Practices #

1. Environment Configuration #

python
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    openai_api_key: str
    model_name: str = "gpt-4o-mini"
    max_tokens: int = 1000
    
    model_config = SettingsConfigDict(env_file=".env")

settings = Settings()

2. Graceful Shutdown #

python
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # On startup (initialize_resources / cleanup_resources are placeholders
    # for your own setup and teardown logic)
    await initialize_resources()
    yield
    # On shutdown
    await cleanup_resources()

app = FastAPI(lifespan=lifespan)

3. Request Validation #

python
from pydantic import BaseModel, Field, field_validator

class ChatRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)
    temperature: float = Field(0.7, ge=0, le=2)

    @field_validator('message')
    @classmethod
    def validate_message(cls, v):
        if not v.strip():
            raise ValueError('message must not be empty')
        return v
