Production Deployment #
Deploying a LangChain application to production means weighing performance, reliability, scalability, and observability. This chapter covers deployment with LangServe and related tooling.
Deployment Overview #
text
┌─────────────────────────────────────────────────────────────┐
│  Production Deployment Considerations                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Performance:                                               │
│    - Response latency                                       │
│    - Concurrent request handling                            │
│    - Resource usage                                         │
│                                                             │
│  Reliability:                                               │
│    - Error handling                                         │
│    - Retry mechanisms                                       │
│    - Health checks                                          │
│                                                             │
│  Scalability:                                               │
│    - Horizontal scaling                                     │
│    - Load balancing                                         │
│    - Caching strategy                                       │
│                                                             │
│  Observability:                                             │
│    - Logging                                                │
│    - Performance monitoring                                 │
│    - Error tracking                                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘
LangServe #
LangServe is LangChain's official deployment tool; it exposes a chain as a REST API in just a few lines of code.
Installation #
bash
pip install langserve
pip install "langserve[all]" # 安装所有依赖
Basic API Service #
python
from fastapi import FastAPI
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langserve import add_routes

# Create the FastAPI application
app = FastAPI(
    title="LangChain API",
    version="1.0",
    description="LangChain API service"
)

# Build the chain
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("{topic}")
chain = prompt | model | StrOutputParser()

# Register the chain under /chat
add_routes(app, chain, path="/chat")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
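add_routes generates several endpoints per path, not just one: alongside POST /chat/invoke you also get /chat/batch, /chat/stream, and an interactive playground at /chat/playground/. A quick sketch of the batch endpoint (payload and response shapes follow the LangServe docs; verify against your installed version):
python
import requests

# /chat/batch takes a list of inputs and returns one output per input
response = requests.post(
    "http://localhost:8000/chat/batch",
    json={"inputs": [{"topic": "Hello"}, {"topic": "Tell me a joke"}]},
)
print(response.json()["output"])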
Multiple Chain Endpoints #
python
from fastapi import FastAPI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langserve import add_routes

app = FastAPI()
model = ChatOpenAI(model="gpt-4o-mini")

# Chat chain
chat_chain = ChatPromptTemplate.from_template("Answer: {question}") | model | StrOutputParser()
add_routes(app, chat_chain, path="/chat")

# Translation chain
translate_chain = ChatPromptTemplate.from_template(
    "Translate into {lang}: {text}"
) | model | StrOutputParser()
add_routes(app, translate_chain, path="/translate")

# Summarization chain
summary_chain = ChatPromptTemplate.from_template(
    "Summarize in {words} words: {content}"
) | model | StrOutputParser()
add_routes(app, summary_chain, path="/summary")
Client Invocation #
python
from langserve import RemoteRunnable

# Connect to the remote chain
remote_chain = RemoteRunnable("http://localhost:8000/chat")

# Synchronous call
response = remote_chain.invoke({"topic": "Hello"})

# Streaming call
for chunk in remote_chain.stream({"topic": "Tell me a story"}):
    print(chunk, end="")

# Asynchronous call
import asyncio

async def async_call():
    response = await remote_chain.ainvoke({"topic": "Hello"})
    return response

result = asyncio.run(async_call())
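RemoteRunnable implements the standard Runnable interface, so client-side batching works exactly as it does on a local chain:
python
# One call, several inputs; outputs come back in the same order
responses = remote_chain.batch([{"topic": "Hello"}, {"topic": "Goodbye"}])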
Calling with requests #
python
import requests

# Call the invoke endpoint directly
response = requests.post(
    "http://localhost:8000/chat/invoke",
    json={"input": {"topic": "Hello"}}
)
print(response.json()["output"])
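The stream endpoint speaks Server-Sent Events, which plain requests can consume with stream=True. A minimal sketch:
python
import requests

# /chat/stream emits SSE; each "data:" line carries one chunk
with requests.post(
    "http://localhost:8000/chat/stream",
    json={"input": {"topic": "Tell me a story"}},
    stream=True,
) as resp:
    for line in resp.iter_lines():
        if line:
            print(line.decode("utf-8"))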
Advanced LangServe Configuration #
API with Authentication #
python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

app = FastAPI()
security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    if token != "your-secret-token":
        raise HTTPException(status_code=401, detail="Invalid token")
    return token

@app.post("/chat/invoke")
async def chat_invoke(
    input: dict,
    token: str = Depends(verify_token)
):
    return chain.invoke(input)
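If you would rather keep the auto-generated LangServe routes instead of hand-writing /chat/invoke, add_routes accepts FastAPI dependencies and applies them to every endpoint it creates (treat the exact keyword as an assumption and check your LangServe version):
python
from langserve import add_routes

# Assumption: `dependencies` guards all generated endpoints for this path
add_routes(app, chain, path="/chat", dependencies=[Depends(verify_token)])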
Custom Endpoints #
python
from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    user_id: Optional[str] = None

class ChatResponse(BaseModel):
    response: str
    tokens_used: int

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    result = chain.invoke({"topic": request.message})
    return ChatResponse(
        response=result,
        # Rough proxy only: word count, not real token usage
        tokens_used=len(result.split())
    )
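The word-count proxy above is crude. For real token counts, LangChain's OpenAI callback records the usage reported by the API; a sketch (the /chat/usage path is illustrative):
python
from langchain_community.callbacks import get_openai_callback

@app.post("/chat/usage", response_model=ChatResponse)
async def chat_with_usage(request: ChatRequest):
    # The callback tallies prompt, completion, and total tokens for OpenAI calls
    with get_openai_callback() as cb:
        result = chain.invoke({"topic": request.message})
    return ChatResponse(response=result, tokens_used=cb.total_tokens)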
Streaming Response Endpoint #
python
from fastapi.responses import StreamingResponse

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        async for chunk in chain.astream({"topic": request.message}):
            yield f"data: {chunk}\n\n"
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
FastAPI Deployment #
Complete API Service #
python
from typing import Optional

from fastapi import FastAPI, HTTPException
from langchain_openai import ChatOpenAI
from pydantic import BaseModel

app = FastAPI(title="LLM API")

# Request/response models
class GenerateRequest(BaseModel):
    prompt: str
    max_tokens: Optional[int] = 1000
    temperature: Optional[float] = 0.7

class GenerateResponse(BaseModel):
    text: str
    tokens: int
    model: str

# Initialize a default model once at import time
model = ChatOpenAI(model="gpt-4o-mini")

@app.post("/generate", response_model=GenerateResponse)
async def generate(request: GenerateRequest):
    try:
        # Per-request instantiation keeps the example simple; in practice,
        # cache one client per parameter set instead of rebuilding it each call
        model_with_params = ChatOpenAI(
            model="gpt-4o-mini",
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )
        response = await model_with_params.ainvoke(request.prompt)
        return GenerateResponse(
            text=response.content,
            tokens=response.response_metadata.get("token_usage", {}).get("total_tokens", 0),
            model="gpt-4o-mini"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Deploying with Gunicorn #
bash
# 4 worker processes, each running the app under uvicorn's ASGI worker
gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app
Docker Deployment #
Dockerfile #
dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose.yml #
yaml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./data:/app/data
    depends_on:
      - redis
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
Build and Run #
bash
# Build the image
docker build -t langchain-api .

# Run the container
docker run -p 8000:8000 -e OPENAI_API_KEY=your-key langchain-api

# Or bring everything up with docker-compose
docker-compose up -d
Kubernetes Deployment #
Deployment Configuration #
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langchain-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langchain-api
  template:
    metadata:
      labels:
        app: langchain-api
    spec:
      containers:
        - name: api
          image: langchain-api:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-secrets
                  key: openai-key
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
Service Configuration #
yaml
apiVersion: v1
kind: Service
metadata:
  name: langchain-api
spec:
  selector:
    app: langchain-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
Performance Optimization #
1. Connection Pooling #
python
import httpx

# Share one pooled HTTP client instead of opening a connection per request
async_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=100),
    timeout=httpx.Timeout(30.0)
)
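The pool does nothing until something uses it. langchain_openai lets you hand the client to the model so every OpenAI call reuses the pooled connections (parameter name per the langchain_openai docs; verify against your version):
python
from langchain_openai import ChatOpenAI

# All async OpenAI requests now go through the shared pooled client
model = ChatOpenAI(model="gpt-4o-mini", http_async_client=async_client)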
2. Caching #
python
from functools import lru_cache

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()

@lru_cache(maxsize=1000)
def get_embedding(text: str):
    # lru_cache keys on the text itself, so repeated queries skip the API call
    return embeddings.embed_query(text)
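LangChain also ships a process-wide LLM response cache that deduplicates identical calls without any wrapper function. A minimal sketch; swap InMemoryCache for a Redis-backed cache when running multiple workers:
python
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

# Identical (prompt, model parameters) pairs now return cached responses
set_llm_cache(InMemoryCache())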
3. Async Processing #
python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)

async def process_batch(items):
    # Run a blocking process_item (defined elsewhere) in worker threads
    loop = asyncio.get_running_loop()
    tasks = [
        loop.run_in_executor(executor, process_item, item)
        for item in items
    ]
    return await asyncio.gather(*tasks)
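For LangChain Runnables specifically, the thread pool is usually unnecessary: every chain exposes native async batching with a concurrency cap:
python
# Concurrent LLM calls without threads; the topics list is illustrative
results = await chain.abatch(
    [{"topic": t} for t in ["a", "b", "c"]],
    config={"max_concurrency": 10},
)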
Monitoring and Logging #
Health Checks #
python
@app.get("/health")
async def health():
checks = {
"api": True,
"redis": await check_redis(),
"model": await check_model()
}
all_healthy = all(checks.values())
return {
"status": "healthy" if all_healthy else "unhealthy",
"checks": checks
}
async def check_redis():
try:
await redis.ping()
return True
except:
return False
async def check_model():
try:
await model.ainvoke("test")
return True
except:
return False
Structured Logging #
python
import json
import logging

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module
        }
        return json.dumps(log_entry)

handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
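To put the JSON logger to work, a small FastAPI middleware can record every request with its latency (a sketch; the log fields are illustrative):
python
import time
from fastapi import Request

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    # Emitted through JSONFormatter, so each record lands as one JSON object
    logger.info(
        f"{request.method} {request.url.path} "
        f"status={response.status_code} "
        f"duration={time.perf_counter() - start:.3f}s"
    )
    return response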
Error Handling #
Global Exception Handlers #
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from openai import RateLimitError

app = FastAPI()

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error(f"Unhandled exception: {exc}")
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc) if app.debug else "An error occurred"
        }
    )

@app.exception_handler(RateLimitError)
async def rate_limit_handler(request: Request, exc: RateLimitError):
    return JSONResponse(
        status_code=429,
        content={"error": "Rate limit exceeded", "retry_after": 60}
    )
Retry Mechanism #
python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def call_model_with_retry(prompt: str):
    return await model.ainvoke(prompt)
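Runnables also carry a built-in retry wrapper, so for chains you can often skip tenacity entirely:
python
# Exponential backoff with jitter, up to 3 attempts
chain_with_retry = chain.with_retry(stop_after_attempt=3)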
Best Practices #
1. Environment Configuration #
python
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    openai_api_key: str
    model_name: str = "gpt-4o-mini"
    max_tokens: int = 1000

    class Config:
        env_file = ".env"

settings = Settings()
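Wiring the settings into the model keeps credentials out of the source; values come from environment variables or the .env file:
python
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model=settings.model_name,
    api_key=settings.openai_api_key,
    max_tokens=settings.max_tokens,
)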
2. Graceful Shutdown #
python
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # On startup
    await initialize_resources()
    yield
    # On shutdown
    await cleanup_resources()

app = FastAPI(lifespan=lifespan)
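initialize_resources and cleanup_resources are placeholders; a concrete sketch that opens and closes a shared httpx client (names are hypothetical):
python
import httpx

async def initialize_resources():
    # A shared client on app.state lives for the whole process
    app.state.http = httpx.AsyncClient()

async def cleanup_resources():
    # Close pooled connections cleanly before the worker exits
    await app.state.http.aclose()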
3. Request Validation #
python
from pydantic import BaseModel, Field, field_validator

class ChatRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)
    temperature: float = Field(0.7, ge=0, le=2)

    @field_validator("message")
    @classmethod
    def validate_message(cls, v):
        if not v.strip():
            raise ValueError("message must not be blank")
        return v
Last updated: 2026-03-30