Advanced Patterns #

Overview #

As agent applications grow more complex, more advanced design patterns are needed to organize code and logic. This chapter introduces several important advanced patterns supported by LangGraph.

text
┌─────────────────────────────────────────────────────────────┐
│                 Advanced Patterns Overview                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. Multi-Agent Systems                                     │
│     - Supervisor pattern                                    │
│     - Hierarchical structure                                │
│     - Agent communication                                   │
│                                                             │
│  2. Subgraphs                                               │
│     - Modular design                                        │
│     - Graph nesting                                         │
│     - State isolation                                       │
│                                                             │
│  3. Map-Reduce                                              │
│     - Parallel processing                                   │
│     - Dynamic branching                                     │
│     - Result aggregation                                    │
│                                                             │
│  4. Branching and Merging                                   │
│     - Conditional branches                                  │
│     - Parallel execution                                    │
│     - Result merging                                        │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Multi-Agent Systems #

Supervisor Pattern #

The Supervisor pattern is the most common multi-agent architecture: a central supervisor coordinates several specialized agents.

text
┌─────────────────────────────────────────────────────────────┐
│                     Supervisor Pattern                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│                     ┌─────────┐                             │
│                     │  START  │                             │
│                     └────┬────┘                             │
│                          │                                  │
│                          ▼                                  │
│                    ┌──────────┐                             │
│                    │Supervisor│                             │
│                    └────┬─────┘                             │
│                         │                                   │
│         ┌───────────────┼───────────────┐                   │
│         ▼               ▼               ▼                   │
│    ┌─────────┐     ┌─────────┐     ┌─────────┐             │
│    │Research │     │  Coder  │     │ Writer  │             │
│    │ Agent   │     │  Agent  │     │ Agent   │             │
│    └────┬────┘     └────┬────┘     └────┬────┘             │
│         │               │               │                   │
│         └───────────────┼───────────────┘                   │
│                         │                                   │
│                         ▼                                   │
│                    ┌──────────┐                             │
│                    │Supervisor│                             │
│                    └────┬─────┘                             │
│                         │                                   │
│                         ▼                                   │
│                    ┌─────────┐                              │
│                    │   END   │                              │
│                    └─────────┘                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Implementing the Supervisor Pattern #

python
from typing import Annotated, Literal, TypedDict
from operator import add
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

class State(TypedDict):
    messages: Annotated[list, add]  # reducer: agents append messages instead of overwriting
    next_agent: str

llm = ChatOpenAI(model="gpt-4o-mini")

def parse_agent_choice(text: str) -> str:
    # Minimal parser: first agent name found in the reply, else FINISH.
    for name in ("researcher", "coder", "writer"):
        if name in text.lower():
            return name
    return "FINISH"

def supervisor(state: State):
    response = llm.invoke([
        {"role": "system", "content": "You are a supervisor. Decide which agent to use: researcher, coder, or writer. Reply with FINISH when the task is done."},
        *state["messages"]
    ])
    next_agent = parse_agent_choice(response.content)
    return {"next_agent": next_agent}

def researcher(state: State):
    response = llm.invoke([
        {"role": "system", "content": "You are a researcher."},
        *state["messages"]
    ])
    return {"messages": [response]}

def coder(state: State):
    response = llm.invoke([
        {"role": "system", "content": "You are a coder."},
        *state["messages"]
    ])
    return {"messages": [response]}

def writer(state: State):
    response = llm.invoke([
        {"role": "system", "content": "You are a writer."},
        *state["messages"]
    ])
    return {"messages": [response]}

def route_agent(state: State) -> Literal["researcher", "coder", "writer", "__end__"]:
    # END is the string constant "__end__", which is why it can appear in the Literal.
    if state["next_agent"] == "FINISH":
        return END
    return state["next_agent"]

graph = StateGraph(State)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("coder", coder)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_agent)
graph.add_edge("researcher", "supervisor")
graph.add_edge("coder", "supervisor")
graph.add_edge("writer", "supervisor")

app = graph.compile()
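Stripped of LangGraph and the LLM, the supervisor loop is just "route, delegate, repeat until FINISH". A runnable sketch with stub agents and a hard-coded routing policy (all names here are illustrative):

```python
# Stub agents: each appends its contribution to the message list.
def researcher(messages):
    return messages + ["research notes"]

def coder(messages):
    return messages + ["code draft"]

def writer(messages):
    return messages + ["final article"]

AGENTS = {"researcher": researcher, "coder": coder, "writer": writer}

def supervisor(messages):
    """Pick the next agent based on what has been produced so far."""
    if "research notes" not in messages:
        return "researcher"
    if "code draft" not in messages:
        return "coder"
    if "final article" not in messages:
        return "writer"
    return "FINISH"

def run(task):
    messages = [task]
    while (choice := supervisor(messages)) != "FINISH":
        messages = AGENTS[choice](messages)
    return messages

print(run("write a post about sorting"))
# ['write a post about sorting', 'research notes', 'code draft', 'final article']
```

In the real graph, `supervisor` is the LLM call and the `while` loop is the supervisor → agent → supervisor edge cycle.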

Using the Prebuilt Supervisor #

python
# Prebuilt supervisor from the separate langgraph-supervisor package
# (pip install langgraph-supervisor).
from langgraph_supervisor import create_supervisor

supervisor = create_supervisor(
    agents=[researcher_agent, coder_agent, writer_agent],
    model=llm,
)

app = supervisor.compile()

Subgraphs #

Subgraphs let you break a complex graph into reusable modules.

Creating a Subgraph #

python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class SubgraphState(TypedDict):
    input: str
    output: str

def process_a(state: SubgraphState):
    return {"output": state["input"] + " processed by A"}

def process_b(state: SubgraphState):
    return {"output": state["output"] + " and B"}

subgraph = StateGraph(SubgraphState)
subgraph.add_node("a", process_a)
subgraph.add_node("b", process_b)
subgraph.add_edge(START, "a")
subgraph.add_edge("a", "b")
subgraph.add_edge("b", END)

subgraph_app = subgraph.compile()

Using a Subgraph in a Parent Graph #

python
class ParentState(TypedDict):
    data: str
    result: str

def call_subgraph(state: ParentState):
    result = subgraph_app.invoke({"input": state["data"]})
    return {"result": result["output"]}

parent_graph = StateGraph(ParentState)
parent_graph.add_node("subgraph", call_subgraph)
parent_graph.add_edge(START, "subgraph")
parent_graph.add_edge("subgraph", END)

app = parent_graph.compile()

Subgraph State Transformation #

python
def transform_to_subgraph(state: ParentState) -> SubgraphState:
    return {"input": state["data"], "output": ""}

def transform_from_subgraph(subgraph_state: SubgraphState) -> dict:
    return {"result": subgraph_state["output"]}

def call_subgraph_with_transform(state: ParentState):
    subgraph_input = transform_to_subgraph(state)
    subgraph_result = subgraph_app.invoke(subgraph_input)
    return transform_from_subgraph(subgraph_result)
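The transform pair can be exercised without LangGraph by standing in a plain function for `subgraph_app` (the stub below is illustrative):

```python
def transform_to_subgraph(state: dict) -> dict:
    # Parent state -> subgraph state
    return {"input": state["data"], "output": ""}

def transform_from_subgraph(subgraph_state: dict) -> dict:
    # Subgraph state -> parent state update
    return {"result": subgraph_state["output"]}

def stub_subgraph(state: dict) -> dict:
    # Stand-in for subgraph_app.invoke: runs steps "a" then "b".
    out = state["input"] + " processed by A"
    return {"input": state["input"], "output": out + " and B"}

parent_state = {"data": "hello", "result": ""}
update = transform_from_subgraph(stub_subgraph(transform_to_subgraph(parent_state)))
print(update)  # {'result': 'hello processed by A and B'}
```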

Navigating with Command #

python
from langgraph.types import Command

def subgraph_node(state: State) -> Command:
    result = subgraph_app.invoke(state)
    return Command(
        update={"result": result["output"]},
        goto="next_node",
        graph=Command.PARENT
    )

The Map-Reduce Pattern #

Map-Reduce is used to process many items in parallel.

Implementing with Send #

python
from typing import Annotated, TypedDict
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class State(TypedDict):
    items: list[str]
    results: Annotated[list[str], add]  # reducer: partial results are concatenated
    final: str

def map_items(state: State):
    # Conditional-edge function: return one Send per item to fan out in parallel.
    return [Send("process", {"item": item}) for item in state["items"]]

def process_item(state: dict):
    # Receives the Send payload {"item": ...}, not the full graph state.
    result = process(state["item"])  # process: your per-item work
    return {"results": [result]}

def reduce_results(state: State):
    return {"final": "\n".join(state["results"])}

graph = StateGraph(State)
graph.add_node("process", process_item)
graph.add_node("reduce", reduce_results)

# Fan out directly from START; each Send targets the "process" node.
graph.add_conditional_edges(START, map_items, ["process"])
graph.add_edge("process", "reduce")
graph.add_edge("reduce", END)

app = graph.compile()
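The same fan-out/fan-in shape can be seen in plain Python, with a thread pool standing in for `Send`-based dispatch (a sketch, not LangGraph's actual scheduler):

```python
from concurrent.futures import ThreadPoolExecutor

def process(item: str) -> str:
    # Stand-in for the per-item work a "process" node would do.
    return item.upper()

def map_reduce(items: list[str]) -> str:
    # Map: run every item in parallel, one task per item (like one Send per item).
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(process, items))
    # Reduce: aggregate the partial results into a single value.
    return "\n".join(results)

print(map_reduce(["alpha", "beta"]))
# ALPHA
# BETA
```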

Map-Reduce Example: Batch Translation #

python
from langgraph.types import Send
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

class TranslationState(TypedDict):
    texts: list[str]
    target_lang: str
    translations: Annotated[list[str], add]
    final_translations: list[str]

def distribute_translations(state: TranslationState):
    # Conditional-edge function: one Send per text to translate.
    return [
        Send("translate", {"text": text, "target_lang": state["target_lang"]})
        for text in state["texts"]
    ]

def translate(state: dict):
    # Receives the Send payload: {"text": ..., "target_lang": ...}
    response = llm.invoke(f"Translate to {state['target_lang']}: {state['text']}")
    return {"translations": [response.content]}

def combine(state: TranslationState):
    return {"final_translations": state["translations"]}

graph = StateGraph(TranslationState)
graph.add_node("translate", translate)
graph.add_node("combine", combine)

graph.add_conditional_edges(START, distribute_translations, ["translate"])
graph.add_edge("translate", "combine")
graph.add_edge("combine", END)

app = graph.compile()

Branching and Merging #

Parallel Branches #

python
from typing import TypedDict

class State(TypedDict):
    input: str
    result_a: str
    result_b: str
    result_c: str
    final: str

def branch_a(state: State):
    return {"result_a": process_a(state["input"])}  # process_a/b/c: your domain logic

def branch_b(state: State):
    return {"result_b": process_b(state["input"])}

def branch_c(state: State):
    return {"result_c": process_c(state["input"])}

def merge(state: State):
    # Each branch wrote a distinct key, so the updates never conflict.
    return {"final": combine(state["result_a"], state["result_b"], state["result_c"])}

graph = StateGraph(State)
graph.add_node("a", branch_a)
graph.add_node("b", branch_b)
graph.add_node("c", branch_c)
graph.add_node("merge", merge)

graph.add_edge(START, "a")
graph.add_edge(START, "b")
graph.add_edge(START, "c")
graph.add_edge("a", "merge")
graph.add_edge("b", "merge")
graph.add_edge("c", "merge")
graph.add_edge("merge", END)

app = graph.compile()
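The merge works because each branch writes its own state key; at the dict level the update rule is just a key-wise union:

```python
def apply_updates(state: dict, updates: list[dict]) -> dict:
    # Each branch returns a partial update touching only its own key,
    # so merging is a plain dict union (per-key, no conflicts here).
    merged = dict(state)
    for update in updates:
        merged.update(update)
    return merged

state = {"input": "x"}
updates = [{"result_a": "A"}, {"result_b": "B"}, {"result_c": "C"}]
print(apply_updates(state, updates))
# {'input': 'x', 'result_a': 'A', 'result_b': 'B', 'result_c': 'C'}
```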

Conditional Branches #

python
from typing import Literal

def router(state: State) -> Literal["path_a", "path_b", "path_c"]:
    # Assumes an earlier node has set a "type" key on the state.
    if state["type"] == "urgent":
        return "path_a"
    elif state["type"] == "normal":
        return "path_b"
    return "path_c"

# "entry" is whichever node precedes the branch point.
graph.add_conditional_edges("entry", router)
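Because the router is a pure function from state to branch name, it can be unit-tested without building a graph (plain dicts stand in for the typed state here):

```python
def router(state: dict) -> str:
    # Same routing logic as above, testable in isolation.
    if state["type"] == "urgent":
        return "path_a"
    if state["type"] == "normal":
        return "path_b"
    return "path_c"

assert router({"type": "urgent"}) == "path_a"
assert router({"type": "normal"}) == "path_b"
assert router({"type": "other"}) == "path_c"
```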

Hierarchical Agents #

Two-Layer Architecture #

python
class ManagerState(TypedDict):
    task: str
    subtasks: list[str]
    results: Annotated[list, add]
    final_result: str

def decompose(state: ManagerState):
    response = llm.invoke(f"Decompose task: {state['task']}")
    return {"subtasks": parse_subtasks(response.content)}  # parse_subtasks: your parser

def assign_subtasks(state: ManagerState):
    # Conditional-edge function: fan one Send per subtask out to "worker".
    return [
        Send("worker", {"subtask": subtask})
        for subtask in state["subtasks"]
    ]

def worker(state: dict):
    # Receives the Send payload: {"subtask": ...}
    result = execute_subtask(state["subtask"])  # execute_subtask: your worker logic
    return {"results": [result]}

def synthesize(state: ManagerState):
    final = llm.invoke(f"Combine results: {state['results']}")
    return {"final_result": final.content}
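The decompose → fan-out → synthesize flow can be sketched end to end without an LLM (the comma-split decomposition below is a stand-in for the model call):

```python
def decompose(task: str) -> list[str]:
    # Stand-in for LLM-driven decomposition: split on commas.
    return [part.strip() for part in task.split(",")]

def worker(subtask: str) -> str:
    return f"done: {subtask}"

def synthesize(results: list[str]) -> str:
    return "; ".join(results)

def manager(task: str) -> str:
    subtasks = decompose(task)
    results = [worker(s) for s in subtasks]  # in a graph, these run in parallel via Send
    return synthesize(results)

print(manager("collect data, write code"))
# done: collect data; done: write code
```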

Three-Layer Architecture #

text
┌─────────────────────────────────────────────────────────────┐
│               Three-Layer Agent Architecture                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Layer 1: Orchestrator                                      │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Task decomposition, aggregation, final decision     │   │
│  └──────────────────────────────────────────────────────┘   │
│                           │                                 │
│  Layer 2: Managers                                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ Research Mgr│  │  Code Mgr   │  │  QA Mgr     │         │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘         │
│         │                │                │                 │
│  Layer 3: Workers                                           │
│  ┌───┐ ┌───┐ ┌───┐  ┌───┐ ┌───┐ ┌───┐  ┌───┐ ┌───┐ ┌───┐  │
│  │W1 │ │W2 │ │W3 │  │W4 │ │W5 │ │W6 │  │W7 │ │W8 │ │W9 │  │
│  └───┘ └───┘ └───┘  └───┘ └───┘ └───┘  └───┘ └───┘ └───┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Error Handling Patterns #

Retry Pattern #

python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def reliable_node(state: State):
    return external_api_call(state)  # external_api_call: your flaky dependency
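If tenacity is not available, the same retry-with-exponential-backoff behavior is a few lines of stdlib Python (a simplified sketch of what the decorator does):

```python
import time

def retry(func, attempts=3, base_delay=0.01):
    """Call func, retrying with exponential backoff on any exception."""
    for i in range(attempts):
        try:
            return func()
        except Exception:
            if i == attempts - 1:
                raise  # out of attempts: propagate the last error
            time.sleep(base_delay * (2 ** i))

calls = {"n": 0}

def flaky():
    # Fails twice, then succeeds, to exercise the retry path.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(retry(flaky))  # "ok" on the third attempt
```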

Fallback Pattern #

python
def primary_node(state: State):
    try:
        result = primary_api(state)
        return {"result": result, "source": "primary"}
    except Exception:
        return {"fallback": True}

def fallback_node(state: State):
    if state.get("fallback"):
        result = fallback_api(state)
        return {"result": result, "source": "fallback"}
    return {}
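With stub APIs in place, the two nodes can be chained by hand to confirm the degradation path (the failing `primary_api` below is simulated):

```python
def primary_api(state):
    raise ConnectionError("primary service down")  # simulated outage

def fallback_api(state):
    return "cached answer"

def primary_node(state: dict) -> dict:
    try:
        return {"result": primary_api(state), "source": "primary"}
    except Exception:
        return {"fallback": True}  # signal the fallback node to take over

def fallback_node(state: dict) -> dict:
    if state.get("fallback"):
        return {"result": fallback_api(state), "source": "fallback"}
    return {}

state = {"input": "q"}
state.update(primary_node(state))
state.update(fallback_node(state))
print(state["source"])  # fallback
```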

Error Recovery Pattern #

python
def error_handler(state: State):
    if state.get("error"):
        return {"error_handled": True, "recovery_action": "retry"}
    return {}

def route_after_error(state: State):
    if state.get("error_handled"):
        if state["recovery_action"] == "retry":
            return "retry_node"
        return "abort_node"
    return "continue_node"
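Like any routing function, `route_after_error` is easy to verify in isolation:

```python
def route_after_error(state: dict) -> str:
    # Same logic as above: pick the next node based on recovery state.
    if state.get("error_handled"):
        if state["recovery_action"] == "retry":
            return "retry_node"
        return "abort_node"
    return "continue_node"

assert route_after_error({}) == "continue_node"
assert route_after_error({"error_handled": True, "recovery_action": "retry"}) == "retry_node"
assert route_after_error({"error_handled": True, "recovery_action": "abort"}) == "abort_node"
```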

Streaming Patterns #

Stream Processing #

python
from langgraph.config import get_stream_writer

async def streaming_node(state: State):
    # Nodes can't yield partial state updates; emit custom stream events instead
    # (consumed via app.astream(..., stream_mode="custom")).
    writer = get_stream_writer()
    async for chunk in llm.astream(state["messages"]):
        writer({"streaming_chunk": chunk.content})
    return {"complete": True}

Incremental Updates #

python
from langgraph.config import get_stream_writer

def incremental_node(state: State):
    writer = get_stream_writer()  # emits to stream_mode="custom"
    for item in state["items"]:
        writer({"partial_result": process_item(item)})
    return {"complete": True}
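Outside a graph, the incremental idea is just a Python generator: one partial result per item, then a completion marker:

```python
def process_item(item: str) -> str:
    # Stand-in for per-item work.
    return item.upper()

def incremental(items):
    """Yield one partial result per item, then a completion marker."""
    for item in items:
        yield {"partial_result": process_item(item)}
    yield {"complete": True}

events = list(incremental(["a", "b"]))
print(events)
# [{'partial_result': 'A'}, {'partial_result': 'B'}, {'complete': True}]
```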

Best Practices #

1. Modular Design #

python
def create_research_agent():
    graph = StateGraph(ResearchState)
    graph.add_node("search", search_node)
    graph.add_node("analyze", analyze_node)
    graph.add_node("summarize", summarize_node)
    graph.add_edge(START, "search")
    graph.add_edge("search", "analyze")
    graph.add_edge("analyze", "summarize")
    graph.add_edge("summarize", END)
    return graph.compile()

def create_code_agent():
    graph = StateGraph(CodeState)
    graph.add_node("plan", plan_node)
    graph.add_node("implement", implement_node)
    graph.add_node("test", test_node)
    graph.add_edge(START, "plan")
    graph.add_edge("plan", "implement")
    graph.add_edge("implement", "test")
    graph.add_edge("test", END)
    return graph.compile()

2. Clear Interfaces #

python
from typing import Literal, Optional, TypedDict

class AgentInterface(TypedDict):
    input: str
    output: str
    status: Literal["pending", "running", "completed", "failed"]
    error: Optional[str]

3. State Isolation #

python
class ResearchState(TypedDict):
    query: str
    results: list
    _internal_cache: dict

class CodeState(TypedDict):
    task: str
    code: str
    _internal_context: dict

4. Monitoring and Logging #

python
import logging

logger = logging.getLogger(__name__)

def monitored_node(state: State):
    logger.info(f"Node started with state: {state}")
    try:
        result = process(state)
        logger.info(f"Node completed: {result}")
        return result
    except Exception as e:
        logger.error(f"Node failed: {e}")
        raise
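With a stub `process` and the log captured to a string, the wrapper can be exercised directly (the stub and stream setup are illustrative):

```python
import io
import logging

# Capture log output in memory so the demo is self-contained.
log_stream = io.StringIO()
logging.basicConfig(stream=log_stream, level=logging.INFO, force=True)
logger = logging.getLogger("monitored")

def process(state):
    # Stub for the node's real work.
    return {"result": state["input"] * 2}

def monitored_node(state):
    logger.info("Node started with state: %s", state)
    try:
        result = process(state)
        logger.info("Node completed: %s", result)
        return result
    except Exception as e:
        logger.error("Node failed: %s", e)
        raise

out = monitored_node({"input": 21})
print(out)  # {'result': 42}
print("Node started" in log_stream.getvalue())  # True
```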

Next Steps #

Now that you have these advanced patterns under your belt, continue to Deployment and Production to learn how to take your agents into a production environment!

Last updated: 2026-03-30