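Deployment and Production #

Overview #

Deploying a LangGraph agent to production involves several concerns: the API service, persistent storage, monitoring, scalability, and security. This chapter covers the main deployment options and best practices.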

text
┌─────────────────────────────────────────────────────────────┐
│             Production deployment architecture              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│ Request ──> Load balancer ──> API service                   │
│                                    │                        │
│                                    ▼                        │
│                               ┌─────────┐                   │
│                               │ Agent   │                   │
│                               │ Runtime │                   │
│                               └────┬────┘                   │
│                                    │                        │
│                  ┌─────────────────┼─────────────────┐      │
│                  ▼                 ▼                 ▼      │
│             ┌─────────┐       ┌─────────┐       ┌─────────┐ │
│             │   LLM   │       │Tools/API│       │ Storage │ │
│             │ Provider│       │ services│       │ database│ │
│             └─────────┘       └─────────┘       └─────────┘ │
│                                                             │
│   Monitoring:                                               │
│   ┌──────────────────────────────────────────────────────┐  │
│   │  Logs │ Traces │ Metrics │ Alerts                    │  │
│   └──────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Deployment Options #

1. Self-Hosted API Service #

Create the API service with FastAPI or Flask:

python
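import json
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from pydantic import BaseModel

from agent import graph  # assumption: the StateGraph builder lives in agent.py

DB_URI = os.environ.get("DATABASE_URL", "postgresql://localhost/langgraph")
graph_app = None  # compiled graph, set during startup

@asynccontextmanager
async def lifespan(app: FastAPI):
    global graph_app
    # Keep the checkpointer connection open for the lifetime of the app
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        await checkpointer.setup()  # create the checkpoint tables if missing
        graph_app = graph.compile(checkpointer=checkpointer)
        yield

app = FastAPI(lifespan=lifespan)

class InvokeRequest(BaseModel):
    message: str
    thread_id: str

@app.post("/invoke")
async def invoke_agent(request: InvokeRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    # Use the async API so the event loop is not blocked
    result = await graph_app.ainvoke(
        {"messages": [("user", request.message)]},
        config
    )
    return {"result": result}

@app.post("/stream")
async def stream_agent(request: InvokeRequest):
    config = {"configurable": {"thread_id": request.thread_id}}

    async def generate():
        async for event in graph_app.astream_events(
            {"messages": [("user", request.message)]},
            config,
            version="v2"
        ):
            # default=str covers objects json cannot encode, such as messages
            yield f"data: {json.dumps(event, default=str)}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

@app.get("/state/{thread_id}")
async def get_state(thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    state = await graph_app.aget_state(config)
    return {"state": state.values}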
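
Each chunk sent by a `/stream` endpoint like the one above has to be a text frame, while events from `astream_events` are dictionaries that may contain objects `json` cannot encode directly. A minimal sketch of one way to frame them as Server-Sent Events, assuming a hypothetical helper `sse_frame` (not part of LangGraph) that falls back to `str()` for such values:

```python
import json
from typing import Any


def sse_frame(event: dict[str, Any]) -> str:
    """Serialize one event as a Server-Sent Events `data:` frame.

    Hypothetical helper, not part of LangGraph. Values that `json` cannot
    encode (for example message objects) are converted with `str()`.
    """
    payload = json.dumps(event, default=str, ensure_ascii=False)
    # A frame ends with a blank line. json.dumps escapes newlines inside
    # strings, so the payload always fits on a single `data:` line.
    return f"data: {payload}\n\n"


class FakeMessage:
    """Stand-in for a non-serializable object inside an event."""

    def __str__(self) -> str:
        return "FakeMessage(hello)"


frame = sse_frame({"event": "on_chat_model_stream", "data": {"chunk": FakeMessage()}})
print(frame, end="")
```

Clients then parse the text after `data: ` as JSON. Converting unknown objects with `str()` is lossy, so a real service would more likely pick out the fields it wants to expose.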

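2. LangGraph Cloud #

LangGraph Cloud is the official managed hosting service:

bash
# The langgraph-cli package installs an executable named `langgraph`;
# run `langgraph --help` to confirm the subcommands your version provides.
langgraph-cli deploy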

3. Docker Deployment #

dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

yaml
version: '3.8'

services:
  langgraph-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      # Example credentials only; supply real ones through a secret store
      - DATABASE_URL=postgresql://postgres:password@db:5432/langgraph
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=langgraph
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:
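
The Compose file passes configuration to the container through environment variables. One way for the service to consume them is to read everything once at startup and fail fast when a required value is missing. The sketch below assumes a hypothetical `Settings` dataclass; `OPENAI_API_KEY` and `DATABASE_URL` come from the Compose file, `REDIS_URL` follows the name used in the Redis checkpointer example, and its default value is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Hypothetical container for the service's environment configuration."""

    openai_api_key: str
    database_url: str
    redis_url: str

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "Settings":
        required = ("OPENAI_API_KEY", "DATABASE_URL")
        missing = [name for name in required if not env.get(name)]
        if missing:
            # Fail at startup instead of on the first request that needs the value.
            raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
        return cls(
            openai_api_key=env["OPENAI_API_KEY"],
            database_url=env["DATABASE_URL"],
            redis_url=env.get("REDIS_URL", "redis://redis:6379"),  # assumed default
        )


# Example with an explicit mapping, so the snippet runs without real secrets.
settings = Settings.from_env(
    {
        "OPENAI_API_KEY": "sk-example",
        "DATABASE_URL": "postgresql://postgres:password@db:5432/langgraph",
    }
)
print(settings.redis_url)
```

In the service itself, `Settings.from_env()` would be called with no argument so that it reads the real process environment.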

4. Kubernetes Deployment #

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph-api
  template:
    metadata:
      labels:
        app: langgraph-api
    spec:
      containers:
      - name: api
        image: langgraph-api:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: langgraph-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: langgraph-api
spec:
  selector:
    app: langgraph-api
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

LangGraph Cloud #

Project Configuration #

toml
[project]
name = "my-agent"
version = "0.1.0"

[langgraph]
graph = "./agent.py:graph"

[dependencies]
langgraph = ">=0.2.0"
langchain-openai = ">=0.1.0"
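
The LangGraph CLI conventionally reads a JSON file named `langgraph.json` with `dependencies`, `graphs`, and `env` keys. As a hedged sketch, the settings above could be expressed in that form as follows. The graph ID `agent` matches the `graph_id` used in the API usage example, the `.env` path is an assumption, and the exact keys should be verified against the CLI documentation for the installed version.

```python
import json

# Sketch of a langgraph.json carrying the same information as the settings above.
config = {
    "dependencies": ["."],                    # install the current project
    "graphs": {"agent": "./agent.py:graph"},  # graph ID -> "module path:variable"
    "env": ".env",                            # assumed location of the env file
}

text = json.dumps(config, indent=2)
print(text)
```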

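Deployment Commands #

bash
# Deploy the current project
langgraph-cli deploy

# Deploy to the production environment
langgraph-cli deploy --env production

# View deployment logs
langgraph-cli logs

# Scale to three replicas
langgraph-cli scale --replicas 3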

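API Usage #

python
import asyncio

from langgraph_sdk import get_client

async def main():
    client = get_client(url="https://your-deployment.langgraph.cloud")

    # Create an assistant bound to the deployed graph ID
    assistant = await client.assistants.create(graph_id="agent")

    # Each thread holds the state of one conversation
    thread = await client.threads.create()

    # Start a run and wait for it to finish
    result = await client.runs.wait(
        thread_id=thread["thread_id"],
        assistant_id=assistant["assistant_id"],
        input={"messages": [{"role": "user", "content": "Hello"}]}
    )
    print(result)

asyncio.run(main())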

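Persistence Configuration #

PostgreSQL Checkpointer #

python
import os

from langgraph.checkpoint.postgres import PostgresSaver

conn_string = os.environ.get("DATABASE_URL", "postgresql://localhost/langgraph")

# from_conn_string is a context manager that opens and closes the connection
with PostgresSaver.from_conn_string(conn_string) as checkpointer:
    checkpointer.setup()  # create the checkpoint tables on first use

    app = graph.compile(checkpointer=checkpointer)
    # Invoke `app` here, while the connection is still open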

Redis Checkpointer #

python
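import os

from langgraph.checkpoint.redis import RedisSaver  # langgraph-checkpoint-redis package

redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379")

with RedisSaver.from_conn_string(redis_url) as checkpointer:
    checkpointer.setup()  # create the required indices on first use

    app = graph.compile(checkpointer=checkpointer)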

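Connection Pool Configuration #

python
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

# PostgresSaver takes a psycopg connection or pool, not a SQLAlchemy engine
pool = ConnectionPool(
    conninfo=conn_string,
    min_size=10,
    max_size=30,
    # Validate a connection before handing it out (psycopg_pool 3.2+)
    check=ConnectionPool.check_connection,
    kwargs={"autocommit": True, "prepare_threshold": 0, "row_factory": dict_row},
)

checkpointer = PostgresSaver(pool)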

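Monitoring and Observability #

LangSmith Integration #

python
import os

# In production, set these in the deployment environment rather than in code
os.environ["LANGSMITH_API_KEY"] = "your-key"
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "production"

# Once these are set, every run is traced automatically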

Structured Logging #

python
import json
import logging
from datetime import datetime, timezone

from langchain_core.runnables import RunnableConfig

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            # Timezone-aware UTC timestamp of when the record was created
            "timestamp": datetime.fromtimestamp(record.created, timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        # Fields passed via `extra=` become attributes on the record
        if hasattr(record, "thread_id"):
            log_data["thread_id"] = record.thread_id
        return json.dumps(log_data)

logger = logging.getLogger("langgraph")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)

def logged_node(state: State, config: RunnableConfig):
    # The thread ID is part of the run config rather than the graph state
    thread_id = config["configurable"].get("thread_id")
    logger.info("Processing request", extra={"thread_id": thread_id})
    result = process(state)  # placeholder for the node's real work
    logger.info("Processing complete", extra={"thread_id": thread_id})
    return result
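
Passing `extra={"thread_id": ...}` on every call is easy to forget. An alternative sketch, using only the standard library, stores the thread ID in a `contextvars.ContextVar` and attaches it to each record with a logging filter. The names `current_thread_id`, `ThreadIdFilter`, and `demo_logger` are hypothetical.

```python
import contextvars
import io
import logging

# Hypothetical context variable holding the thread ID of the request in progress.
current_thread_id = contextvars.ContextVar("current_thread_id", default=None)


class ThreadIdFilter(logging.Filter):
    """Copy the context's thread ID onto every record that passes through."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.thread_id = current_thread_id.get()
        return True  # never drop the record


stream = io.StringIO()  # captured here so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s thread=%(thread_id)s %(message)s"))
handler.addFilter(ThreadIdFilter())

demo_logger = logging.getLogger("langgraph.demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False

token = current_thread_id.set("thread-123")
try:
    demo_logger.info("Processing request")
finally:
    current_thread_id.reset(token)  # restore the previous value

print(stream.getvalue(), end="")
```

Because context variables are tracked per asyncio task, concurrent requests do not see each other's thread IDs.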

Prometheus Metrics #

python
import time

from fastapi import Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest

REQUEST_COUNT = Counter(
    'langgraph_requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'langgraph_request_latency_seconds',
    'Request latency',
    ['method', 'endpoint']
)

@app.middleware("http")
async def metrics_middleware(request, call_next):
    start_time = time.perf_counter()
    response = await call_next(request)

    # Note: raw paths such as /state/<thread_id> yield one label value per
    # thread; consider labelling by route template to limit cardinality.
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(time.perf_counter() - start_time)

    return response

@app.get("/metrics")
async def metrics():
    # CONTENT_TYPE_LATEST is the content type Prometheus expects
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

Health Checks #

python
from fastapi.responses import JSONResponse
from sqlalchemy import text

@app.get("/health")
async def health_check():
    # check_redis() and check_llm_provider() are placeholders defined elsewhere
    checks = {
        "database": await check_database(),
        "redis": await check_redis(),
        "llm": await check_llm_provider(),
    }

    all_healthy = all(checks.values())
    status_code = 200 if all_healthy else 503

    return JSONResponse(
        status_code=status_code,
        content={"status": "healthy" if all_healthy else "unhealthy", "checks": checks}
    )

async def check_database():
    # Assumes `engine` is a SQLAlchemy AsyncEngine (created with create_async_engine)
    try:
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        return True
    except Exception:
        return False
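
`check_redis()` and `check_llm_provider()` are referenced above but not shown. Whatever they probe, a health endpoint should not hang when a dependency does. The sketch below wraps any probe coroutine in a timeout and treats errors as unhealthy; `run_check` and the two demo probes are hypothetical stand-ins.

```python
import asyncio
from typing import Awaitable, Callable


async def run_check(probe: Callable[[], Awaitable[None]], timeout: float = 2.0) -> bool:
    """Return True if `probe` finishes without raising within `timeout` seconds."""
    try:
        await asyncio.wait_for(probe(), timeout=timeout)
        return True
    except Exception:
        # Includes asyncio.TimeoutError: a slow dependency counts as unhealthy.
        return False


async def fast_probe() -> None:
    await asyncio.sleep(0)  # stands in for something like a Redis PING


async def slow_probe() -> None:
    await asyncio.sleep(1)  # stands in for a dependency that hangs


async def main() -> dict:
    names = ["redis", "llm"]
    # Run the probes concurrently so total latency is bounded by the longest timeout.
    results = await asyncio.gather(
        run_check(fast_probe, timeout=0.5),
        run_check(slow_probe, timeout=0.05),
    )
    return dict(zip(names, results))


checks = asyncio.run(main())
print(checks)
```

Calling a paid LLM API on every probe can be costly, so a cheap reachability check or a cached result may be a better fit for that entry.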

Security Configuration #

API Key Authentication #

python
import os
import secrets

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")

async def verify_api_key(api_key: str = Security(api_key_header)):
    expected = os.environ.get("API_KEY")
    # Reject when no key is configured, and compare in constant time
    if not expected or not secrets.compare_digest(api_key.encode(), expected.encode()):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return api_key

@app.post("/invoke", dependencies=[Depends(verify_api_key)])
async def invoke_agent(request: InvokeRequest):
    ...

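Rate Limiting #

python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# Return HTTP 429 when a client exceeds its limit
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/invoke")
@limiter.limit("10/minute")
# slowapi requires a parameter named `request` of type Request
async def invoke_agent(request: Request, body: InvokeRequest):
    ...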

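Input Validation #

python
from pydantic import BaseModel, Field, field_validator

class InvokeRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=10000)
    thread_id: str = Field(..., pattern=r"^[a-zA-Z0-9-]+$")

    # Pydantic v2 style; `pattern=` above is also a v2 argument
    @field_validator("message")
    @classmethod
    def sanitize_message(cls, v: str) -> str:
        # Naive substring check for illustration; not a substitute for output escaping
        if "<script>" in v.lower():
            raise ValueError("Invalid content")
        return v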

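Handling Sensitive Information #

python
import logging
import os
from typing import Optional

from dotenv import load_dotenv

load_dotenv()  # load variables from a local .env file, if present

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
DATABASE_URL = os.environ.get("DATABASE_URL")

logger = logging.getLogger("langgraph")

def mask_sensitive(value: Optional[str], visible: int = 4) -> str:
    """Keep the first `visible` characters and mask the rest."""
    if not value:
        return "<unset>"
    if len(value) <= visible:
        return "*" * len(value)
    return value[:visible] + "*" * (len(value) - visible)

logger.info("Using API key: %s", mask_sensitive(OPENAI_API_KEY))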

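Performance Optimization #

Asynchronous Processing #

python
import asyncio

async def async_node(state: State):
    # Start every operation concurrently, then wait for all results
    tasks = [
        async_operation(item)
        for item in state["items"]
    ]
    results = await asyncio.gather(*tasks)
    return {"results": results}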

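Caching Strategy #

python
from functools import lru_cache

from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy

# Process-local cache keyed on the exact prompt string
@lru_cache(maxsize=1000)
def cached_llm_call(prompt: str) -> str:
    # Assumes `llm` is a chat model, whose invoke() returns a message object
    return llm.invoke(prompt).content

# Node-level cache: results for identical inputs are reused for one hour
graph.add_node("cached_llm", cached_llm_node, cache_policy=CachePolicy(ttl=3600))
app = graph.compile(cache=InMemoryCache())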
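
`functools.lru_cache` evicts by size only, so entries never expire by age. Where staleness matters, a time-based cache is one option. The following is a minimal, single-threaded sketch of a TTL decorator; `ttl_cache` is a hypothetical helper, and the `clock` parameter exists only to make expiry easy to demonstrate.

```python
import time
from functools import wraps
from typing import Callable


def ttl_cache(ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
    """Cache results per positional-argument tuple for `ttl_seconds`."""

    def decorator(func):
        store = {}  # args -> (expiry_time, value); grows without bound in this sketch

        @wraps(func)
        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]  # still fresh
            value = func(*args)
            store[args] = (now + ttl_seconds, value)
            return value

        return wrapper

    return decorator


fake_now = [0.0]  # mutable fake clock for the demonstration
calls = []


@ttl_cache(ttl_seconds=60, clock=lambda: fake_now[0])
def answer(prompt: str) -> str:
    calls.append(prompt)  # record real invocations
    return prompt.upper()


answer("hi")          # miss: computes
answer("hi")          # hit: served from the cache
fake_now[0] = 61.0    # advance past the TTL
answer("hi")          # expired: computes again
print(len(calls))
```

A production version would also need a size bound and thread safety, both omitted here for brevity.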

Concurrency Control #

python
from asyncio import Semaphore

semaphore = Semaphore(10)

async def rate_limited_node(state: State):
    async with semaphore:
        return await external_api(state)
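
To see the effect of the semaphore, the sketch below runs 20 simulated calls through a limit of 3 and records the peak number in flight. `fake_external_api` is a hypothetical stand-in for the real call, and the limit and job count are arbitrary.

```python
import asyncio


async def main(limit: int = 3, jobs: int = 20) -> int:
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_external_api(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:           # at most `limit` tasks pass this point
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)   # simulated network latency
            in_flight -= 1
            return i

    results = await asyncio.gather(*(fake_external_api(i) for i in range(jobs)))
    assert results == list(range(jobs))  # gather preserves input order
    return peak


peak = asyncio.run(main())
print(peak)
```

A module-level semaphore like the one above limits concurrency per process, so with several replicas the effective limit is the per-process limit multiplied by the replica count.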

Scalability #

Horizontal Scaling #

text
┌─────────────────────────────────────────────────────────────┐
│               Horizontal scaling architecture               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│                    ┌─────────────┐                          │
│                    │ Load        │                          │
│                    │ Balancer    │                          │
│                    └──────┬──────┘                          │
│                           │                                 │
│         ┌─────────────────┼─────────────────┐               │
│         │                 │                 │               │
│         ▼                 ▼                 ▼               │
│   ┌───────────┐    ┌───────────┐    ┌───────────┐          │
│   │ Instance 1│    │ Instance 2│    │ Instance 3│          │
│   └─────┬─────┘    └─────┬─────┘    └─────┬─────┘          │
│         │                 │                 │               │
│         └─────────────────┼─────────────────┘               │
│                           │                                 │
│                    ┌──────┴──────┐                          │
│                    │  Shared     │                          │
│                    │  Storage    │                          │
│                    │  (Postgres) │                          │
│                    └─────────────┘                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘

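Stateless Design #

python
from langchain_core.runnables import RunnableConfig

def stateless_node(state: State, config: RunnableConfig):
    # Keep nothing in process memory between calls: anything the node needs
    # beyond `state` is looked up in shared storage by thread_id.
    thread_id = config["configurable"]["thread_id"]

    # load_state_from_db / save_state_to_db are placeholder helpers. With a
    # checkpointer configured, LangGraph already persists `state` per thread,
    # so explicit load/save is only needed for data kept outside the graph state.
    stored = load_state_from_db(thread_id)

    result = process(stored)

    save_state_to_db(thread_id, result)

    return result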
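
Statelessness enables horizontal scaling because any instance can serve any request once all conversation state lives in shared storage. The toy sketch below makes that concrete with two hypothetical `Worker` instances and an in-memory SQLite database standing in for the shared Postgres store; the table layout is an assumption for illustration only.

```python
import json
import sqlite3


class Worker:
    """Hypothetical API instance that keeps no per-thread state in memory."""

    def __init__(self, name: str, db: sqlite3.Connection):
        self.name = name
        self.db = db

    def handle(self, thread_id: str, message: str) -> list:
        # Load the thread's history from shared storage on every request.
        row = self.db.execute(
            "SELECT messages FROM threads WHERE thread_id = ?", (thread_id,)
        ).fetchone()
        messages = json.loads(row[0]) if row else []
        messages.append(f"{self.name}: {message}")
        # Write it back so the next request, on any instance, sees the update.
        self.db.execute(
            "INSERT OR REPLACE INTO threads (thread_id, messages) VALUES (?, ?)",
            (thread_id, json.dumps(messages)),
        )
        self.db.commit()
        return messages


shared = sqlite3.connect(":memory:")  # stand-in for the shared database
shared.execute("CREATE TABLE threads (thread_id TEXT PRIMARY KEY, messages TEXT)")

a, b = Worker("instance-1", shared), Worker("instance-2", shared)
a.handle("t1", "hello")
history = b.handle("t1", "world")  # a different instance continues the thread
print(history)
```

In a LangGraph deployment the checkpointer plays the role of this table, which is why every replica must point at the same database.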

Best Practices Checklist #

Pre-Deployment Checks #

text
┌─────────────────────────────────────────────────────────────┐
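│                    Deployment checklist                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Security:                                                  │
│  □ API keys configured                                      │
│  □ Sensitive data encrypted                                 │
│  □ Rate limiting enabled                                    │
│  □ Input validation implemented                             │
│                                                             │
│  Reliability:                                               │
│  □ Persistent storage configured                            │
│  □ Error handling implemented                               │
│  □ Retry logic added                                        │
│  □ Timeouts set                                             │
│                                                             │
│  Observability:                                             │
│  □ Logging configured                                       │
│  □ Tracing enabled                                          │
│  □ Metrics collected                                        │
│  □ Alerts configured                                        │
│                                                             │
│  Performance:                                               │
│  □ Caching implemented                                      │
│  □ Concurrency tuned                                        │
│  □ Resource limits set                                      │
│                                                             │
│  Testing:                                                   │
│  □ Unit tests passing                                       │
│  □ Integration tests passing                                │
│  □ Load tests passing                                       │
│                                                             │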
└─────────────────────────────────────────────────────────────┘
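
The reliability items for retries and timeouts are not illustrated elsewhere in this chapter. A minimal sketch of both, assuming a hypothetical `call_with_retry` helper around any async callable; the attempt count, delays, and the `flaky` demo function are illustrative only.

```python
import asyncio
import random


async def call_with_retry(func, *, attempts=3, timeout=1.0, base_delay=0.01):
    """Await `func()` with a per-attempt timeout and exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await asyncio.wait_for(func(), timeout=timeout)
        except Exception:
            # This sketch retries on any error; real code should retry only
            # errors known to be transient.
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff with jitter: base, 2x base, 4x base, ...
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay))


failures_left = 2  # the demo call fails twice, then succeeds


async def flaky() -> str:
    global failures_left
    if failures_left > 0:
        failures_left -= 1
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(call_with_retry(flaky))
print(result)
```

The jitter spreads retries out so that many clients failing at once do not all retry at the same instant.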

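Production Configuration Template #

python
import os

from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver

def create_production_app():
    # PostgresSaver takes a psycopg connection or pool; sizing is set on the pool
    pool = ConnectionPool(
        conninfo=os.environ["DATABASE_URL"],
        min_size=10,
        max_size=30,
        kwargs={"autocommit": True, "prepare_threshold": 0, "row_factory": dict_row},
    )
    checkpointer = PostgresSaver(pool)
    checkpointer.setup()  # create the checkpoint tables on first use

    # `graph` is the StateGraph builder defined elsewhere
    app = graph.compile(
        checkpointer=checkpointer,
        # Pause for human approval before this node runs
        interrupt_before=["sensitive_operations"],
    )

    return app

app = create_production_app()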

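Summary #

Congratulations on completing the LangGraph documentation! You have now covered:

  • LangGraph's core concepts and architecture
  • State management and control-flow design
  • Tool use and human-in-the-loop interaction
  • Memory and persistence mechanisms
  • Advanced design patterns
  • Production deployment best practices

Keep exploring and practicing to build more capable AI agent applications!

Last updated: 2026-03-30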