Deployment and Production #
Overview #
Deploying a LangGraph agent to production involves several concerns: API serving, persistent storage, monitoring, scalability, and security. This chapter covers the main deployment options and best practices.
text
┌─────────────────────────────────────────────────────────────┐
│             Production Deployment Architecture              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  User request ───> Load balancer ───> API service           │
│                          │                                  │
│                          ▼                                  │
│                     ┌─────────┐                             │
│                     │  Agent  │                             │
│                     │ Runtime │                             │
│                     └────┬────┘                             │
│                          │                                  │
│      ┌───────────────────┼───────────────────┐              │
│      ▼                   ▼                   ▼              │
│ ┌─────────┐         ┌─────────┐         ┌─────────┐         │
│ │   LLM   │         │  Tools/ │         │ Storage/│         │
│ │ Provider│         │   APIs  │         │ Database│         │
│ └─────────┘         └─────────┘         └─────────┘         │
│                                                             │
│  Monitoring:                                                │
│  ┌─────────────────────────────────────────────────────┐    │
│  │   Logs   │  Tracing  │  Metrics  │  Alerts          │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Deployment Options #
1. Self-Hosted API Service #
Create an API service with FastAPI or Flask:
python
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver

app = FastAPI()

class InvokeRequest(BaseModel):
    message: str
    thread_id: str

# `graph` is your agent's StateGraph builder, defined elsewhere.
# PostgresSaver.from_conn_string returns a context manager; for a
# long-lived service, enter it once at startup (simplified here).
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://localhost/langgraph"
).__enter__()
graph_app = graph.compile(checkpointer=checkpointer)

@app.post("/invoke")
async def invoke_agent(request: InvokeRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    result = graph_app.invoke(
        {"messages": [("user", request.message)]},
        config
    )
    return {"result": result}

@app.post("/stream")
async def stream_agent(request: InvokeRequest):
    config = {"configurable": {"thread_id": request.thread_id}}

    async def generate():
        async for event in graph_app.astream_events(
            {"messages": [("user", request.message)]},
            config,
            version="v2"
        ):
            # Events contain non-JSON-serializable objects; serialize
            # them properly before sending in production.
            yield f"data: {event}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

@app.get("/state/{thread_id}")
async def get_state(thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    state = graph_app.get_state(config)
    return {"state": state.values}
2. LangGraph Cloud #
LangGraph Cloud is the official managed service:
bash
langgraph-cli deploy
3. Docker Deployment #
dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
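Because the Dockerfile runs `COPY . .`, it is worth excluding secrets and build junk from the image with a `.dockerignore` (entries below are illustrative):

```text
.env
.git
__pycache__/
*.pyc
.venv/
```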
yaml
version: '3.8'

services:
  langgraph-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://postgres:password@db:5432/langgraph
    depends_on:
      - db
      - redis

  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=langgraph
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:
4. Kubernetes Deployment #
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph-api
  template:
    metadata:
      labels:
        app: langgraph-api
    spec:
      containers:
        - name: api
          image: langgraph-api:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: langgraph-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: langgraph-api
spec:
  selector:
    app: langgraph-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
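The Deployment above pins `replicas: 3`; Kubernetes can also scale it on load. A HorizontalPodAutoscaler sketch (assumes metrics-server is installed; the thresholds are illustrative):

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langgraph-api
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langgraph-api
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```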
LangGraph Cloud #
Project Configuration #
toml
[project]
name = "my-agent"
version = "0.1.0"
[langgraph]
graph = "./agent.py:graph"
[dependencies]
langgraph = ">=0.2.0"
langchain-openai = ">=0.1.0"
Deployment Commands #
bash
langgraph-cli deploy                    # deploy with default settings
langgraph-cli deploy --env production   # deploy to the production environment
langgraph-cli logs                      # view deployment logs
langgraph-cli scale --replicas 3        # scale to 3 replicas
Using the API #
python
from langgraph_sdk import get_client

# Run this inside an async function: the SDK client methods are awaited.
client = get_client(url="https://your-deployment.langgraph.cloud")

assistant = await client.assistants.create(graph_id="agent")
thread = await client.threads.create()
result = await client.runs.wait(
    thread_id=thread["thread_id"],
    assistant_id=assistant["assistant_id"],
    input={"messages": [{"role": "user", "content": "Hello"}]}
)
Persistence Configuration #
PostgreSQL Checkpointer #
python
import os

from langgraph.checkpoint.postgres import PostgresSaver

conn_string = os.environ.get("DATABASE_URL", "postgresql://localhost/langgraph")

# from_conn_string is a context manager that yields the saver.
with PostgresSaver.from_conn_string(conn_string) as checkpointer:
    checkpointer.setup()  # create the checkpoint tables on first run
    app = graph.compile(checkpointer=checkpointer)
Redis Checkpointer #
python
import os

# Requires the langgraph-checkpoint-redis package.
from langgraph.checkpoint.redis import RedisSaver

redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379")

with RedisSaver.from_conn_string(redis_url) as checkpointer:
    app = graph.compile(checkpointer=checkpointer)
Connection Pooling #
python
import os

from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool

# PostgresSaver is built on psycopg, so pool connections with
# psycopg_pool rather than SQLAlchemy.
pool = ConnectionPool(
    conninfo=os.environ["DATABASE_URL"],
    max_size=20,
    kwargs={"autocommit": True, "prepare_threshold": 0},
)
checkpointer = PostgresSaver(pool)
checkpointer.setup()
Monitoring and Observability #
LangSmith Integration #
python
import os

os.environ["LANGSMITH_API_KEY"] = "your-key"
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "production"
# All graph executions are traced automatically.
Structured Logging #
python
import json
import logging
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if hasattr(record, "thread_id"):
            log_data["thread_id"] = record.thread_id
        return json.dumps(log_data)

logger = logging.getLogger("langgraph")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)

def logged_node(state: State):
    logger.info("Processing request", extra={"thread_id": state.get("thread_id")})
    result = process(state)  # `process` is your node's actual logic
    logger.info("Processing complete", extra={"thread_id": state.get("thread_id")})
    return result
Prometheus Metrics #
python
import time

from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest

REQUEST_COUNT = Counter(
    'langgraph_requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
    'langgraph_request_latency_seconds',
    'Request latency',
    ['method', 'endpoint']
)

@app.middleware("http")
async def metrics_middleware(request, call_next):
    start_time = time.time()
    response = await call_next(request)
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(time.time() - start_time)
    return response

@app.get("/metrics")
async def metrics():
    return Response(content=generate_latest(), media_type="text/plain")
Health Checks #
python
from fastapi.responses import JSONResponse
from sqlalchemy import text

@app.get("/health")
async def health_check():
    checks = {
        "database": await check_database(),
        "redis": await check_redis(),       # defined analogously to check_database
        "llm": await check_llm_provider(),  # e.g. a cheap model-list or ping call
    }
    all_healthy = all(checks.values())
    status_code = 200 if all_healthy else 503
    return JSONResponse(
        status_code=status_code,
        content={"status": "healthy" if all_healthy else "unhealthy", "checks": checks}
    )

async def check_database():
    try:
        # `engine` is an async SQLAlchemy engine created elsewhere.
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        return True
    except Exception:
        return False
Security Configuration #
API Key Authentication #
python
import os
import secrets

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")

async def verify_api_key(api_key: str = Security(api_key_header)):
    # compare_digest avoids leaking key contents via timing differences.
    if not secrets.compare_digest(api_key, os.environ.get("API_KEY", "")):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return api_key

@app.post("/invoke", dependencies=[Depends(verify_api_key)])
async def invoke_agent(request: InvokeRequest):
    ...
Rate Limiting #
python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/invoke")
@limiter.limit("10/minute")
async def invoke_agent(request: Request, body: InvokeRequest):
    # slowapi requires the route to accept a `request: Request` argument.
    ...
Input Validation #
python
from pydantic import BaseModel, Field, field_validator

class InvokeRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)
    thread_id: str = Field(..., pattern=r"^[a-zA-Z0-9-]+$")

    @field_validator("message")
    @classmethod
    def sanitize_message(cls, v):
        if "<script>" in v.lower():
            raise ValueError("Invalid content")
        return v
Handling Sensitive Data #
python
import os

from dotenv import load_dotenv

load_dotenv()

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
DATABASE_URL = os.environ.get("DATABASE_URL")

def mask_sensitive(value: str, visible: int = 4) -> str:
    if len(value) <= visible:
        return "*" * len(value)
    return value[:visible] + "*" * (len(value) - visible)

logger.info(f"Using API key: {mask_sensitive(OPENAI_API_KEY)}")
Performance Optimization #
Async Processing #
python
import asyncio

async def async_node(state: State):
    # Fan out independent operations and await them concurrently.
    tasks = [
        async_operation(item)  # `async_operation` is your async helper
        for item in state["items"]
    ]
    results = await asyncio.gather(*tasks)
    return {"results": results}
Caching Strategies #
python
from functools import lru_cache

from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy

# Application-level cache for identical prompts (no expiry):
@lru_cache(maxsize=1000)
def cached_llm_call(prompt: str) -> str:
    return llm.invoke(prompt)

# Node-level cache: results are reused for identical node inputs.
graph.add_node("cached_llm", cached_llm_node, cache_policy=CachePolicy(ttl=3600))
app = graph.compile(cache=InMemoryCache())
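`lru_cache` never expires entries. Where stale responses matter but the graph-level cache is not in play, a small TTL decorator can be sketched with the stdlib alone (all names below are illustrative, not a LangGraph API):

```python
import time
from functools import wraps

def ttl_cache(ttl: float, maxsize: int = 1000):
    """Memoize a function; entries expire `ttl` seconds after insertion."""
    def decorator(fn):
        store = {}  # key -> (expires_at, value)

        @wraps(fn)
        def wrapper(*args):
            now = time.monotonic()
            hit = store.get(args)
            if hit and hit[0] > now:
                return hit[1]
            value = fn(*args)
            if len(store) >= maxsize:
                store.pop(next(iter(store)))  # evict oldest insertion
            store[args] = (now + ttl, value)
            return value
        return wrapper
    return decorator

@ttl_cache(ttl=3600)
def cached_llm_call_demo(prompt: str) -> str:
    # Stand-in for an LLM call; replace the body with llm.invoke(prompt).
    return prompt.upper()
```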
Concurrency Control #
python
from asyncio import Semaphore

# Cap concurrent calls to the external dependency at 10.
semaphore = Semaphore(10)

async def rate_limited_node(state: State):
    async with semaphore:
        return await external_api(state)  # `external_api` is your async client
Scalability #
Horizontal Scaling #
text
┌─────────────────────────────────────────────────────────────┐
│               Horizontal Scaling Architecture               │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ Load │ │
│ │ Balancer │ │
│ └──────┬──────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Instance 1│ │ Instance 2│ │ Instance 3│ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ │ Shared │ │
│ │ Storage │ │
│ │ (Postgres) │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Stateless Design #
python
def stateless_node(state: State, config: RunnableConfig):
    # Instances hold no local session state: everything needed is loaded
    # from shared storage keyed by thread_id, so any replica can serve
    # any request. (With a shared checkpointer, LangGraph handles this
    # persistence for you.)
    thread_id = config["configurable"]["thread_id"]
    stored = load_state_from_db(thread_id)
    result = process(stored)
    save_state_to_db(thread_id, result)
    return result
Best-Practices Checklist #
Pre-Deployment Checks #
text
┌─────────────────────────────────────────────────────────────┐
│                    Deployment Checklist                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Security:                                                  │
│  □ API keys configured                                      │
│  □ Secrets encrypted                                        │
│  □ Rate limiting enabled                                    │
│  □ Input validation implemented                             │
│                                                             │
│  Reliability:                                               │
│  □ Persistent storage configured                            │
│  □ Error handling implemented                               │
│  □ Retries added                                            │
│  □ Timeouts set                                             │
│                                                             │
│  Observability:                                             │
│  □ Logging configured                                       │
│  □ Tracing enabled                                          │
│  □ Metrics collected                                        │
│  □ Alerts configured                                        │
│                                                             │
│  Performance:                                               │
│  □ Caching implemented                                      │
│  □ Concurrency tuned                                        │
│  □ Resource limits set                                      │
│                                                             │
│  Testing:                                                   │
│  □ Unit tests passing                                       │
│  □ Integration tests passing                                │
│  □ Load tests passing                                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘
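Two checklist items, retries and timeouts, have no example elsewhere in this chapter. A minimal stdlib sketch combining a per-attempt timeout with exponential backoff (the helper name, retried exceptions, and delays are illustrative assumptions):

```python
import asyncio

async def with_retry(coro_fn, *, attempts: int = 3, timeout: float = 30.0,
                     base_delay: float = 0.5):
    """Run coro_fn() with a per-attempt timeout, retrying transient failures."""
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(coro_fn(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == attempts - 1:
                raise  # out of attempts: surface the failure
            # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * 2 ** attempt)
```

A node can then call `await with_retry(lambda: call_llm(state))`, where `call_llm` is your own coroutine.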
Production Configuration Template #
python
import os

from langgraph.checkpoint.postgres import PostgresSaver

def create_production_app():
    # from_conn_string yields the saver; entered once for the lifetime
    # of the process (simplified; see the connection-pooling section).
    checkpointer = PostgresSaver.from_conn_string(
        os.environ["DATABASE_URL"]
    ).__enter__()
    checkpointer.setup()
    # `graph` is your agent's StateGraph builder, defined elsewhere.
    app = graph.compile(
        checkpointer=checkpointer,
        interrupt_before=["sensitive_operations"],
    )
    return app

app = create_production_app()
Summary #
Congratulations on finishing the LangGraph documentation! You have now covered:
- LangGraph's core concepts and architecture
- State management and control-flow design
- Tool use and human-in-the-loop interaction
- Memory and persistence
- Advanced design patterns
- Production deployment best practices
Keep exploring and practicing to build ever more capable AI agent applications!
Last updated: 2026-03-30