高级模式 #
概述 #
随着 Agent 应用复杂度的增加,需要更高级的设计模式来组织代码和逻辑。本章介绍 LangGraph 支持的几种重要高级模式。
text
┌─────────────────────────────────────────────────────────────┐
│                        高级模式概览                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 多 Agent 系统                                           │
│     - Supervisor 模式                                       │
│     - 层级结构                                              │
│     - Agent 通信                                            │
│                                                             │
│  2. 子图                                                    │
│     - 模块化设计                                            │
│     - 图嵌套                                                │
│     - 状态隔离                                              │
│                                                             │
│  3. Map-Reduce                                              │
│     - 并行处理                                              │
│     - 动态分支                                              │
│     - 结果聚合                                              │
│                                                             │
│  4. 分支与合并                                              │
│     - 条件分支                                              │
│     - 并行执行                                              │
│     - 结果合并                                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘
多 Agent 系统 #
Supervisor 模式 #
Supervisor 模式是最常见的多 Agent 架构,由一个中央 Supervisor 协调多个专业 Agent。
text
┌─────────────────────────────────────────────────────────────┐
│                       Supervisor 模式                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│                         ┌─────────┐                         │
│                         │  START  │                         │
│                         └────┬────┘                         │
│                              │                              │
│                              ▼                              │
│                         ┌──────────┐                        │
│                         │Supervisor│                        │
│                         └────┬─────┘                        │
│                              │                              │
│              ┌───────────────┼───────────────┐              │
│              ▼               ▼               ▼              │
│         ┌─────────┐     ┌─────────┐     ┌─────────┐         │
│         │Research │     │  Coder  │     │ Writer  │         │
│         │  Agent  │     │  Agent  │     │  Agent  │         │
│         └────┬────┘     └────┬────┘     └────┬────┘         │
│              │               │               │              │
│              └───────────────┼───────────────┘              │
│                              │                              │
│                              ▼                              │
│                         ┌──────────┐                        │
│                         │Supervisor│                        │
│                         └────┬─────┘                        │
│                              │                              │
│                              ▼                              │
│                         ┌─────────┐                         │
│                         │   END   │                         │
│                         └─────────┘                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘
实现 Supervisor 模式 #
python
from operator import add
from typing import Annotated, Literal, Optional, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
    """Shared state of the supervisor graph."""

    # Conversation history. The `add` reducer concatenates each node's returned
    # list onto the existing one; without a reducer a node returning
    # {"messages": [response]} would replace the whole history.
    messages: Annotated[list, add]
    # Routing decision written by the supervisor node and read by route_agent.
    next_agent: str
llm = ChatOpenAI(model="gpt-4o-mini")


def supervisor(state: State):
    """Ask the LLM which agent should act next, or whether the work is done.

    Args:
        state: Graph state; ``state["messages"]`` is appended to the system prompt.

    Returns:
        A partial state update ``{"next_agent": <choice>}``.
    """
    response = llm.invoke([
        {
            "role": "system",
            # route_agent only ends the graph on "FINISH", so the model has to
            # be told that this answer exists.
            "content": (
                "You are a supervisor. Decide which agent to use: "
                "researcher, coder, or writer. "
                "Reply with FINISH when the task is complete."
            ),
        },
        *state["messages"],
    ])
    # parse_agent_choice is not defined in this snippet; it is expected to map
    # the reply text to a node name or "FINISH".
    next_agent = parse_agent_choice(response.content)
    return {"next_agent": next_agent}
def researcher(state: State):
    """Run the LLM with the researcher system prompt over the message history.

    Returns:
        A partial state update carrying the model response under ``messages``.
    """
    system_prompt = {"role": "system", "content": "You are a researcher."}
    prompt = [system_prompt, *state["messages"]]
    reply = llm.invoke(prompt)
    return {"messages": [reply]}
def coder(state: State):
    """Run the LLM with the coder system prompt over the message history.

    Returns:
        A partial state update carrying the model response under ``messages``.
    """
    system_prompt = {"role": "system", "content": "You are a coder."}
    prompt = [system_prompt, *state["messages"]]
    reply = llm.invoke(prompt)
    return {"messages": [reply]}
def writer(state: State):
    """Run the LLM with the writer system prompt over the message history.

    Returns:
        A partial state update carrying the model response under ``messages``.
    """
    system_prompt = {"role": "system", "content": "You are a writer."}
    prompt = [system_prompt, *state["messages"]]
    reply = llm.invoke(prompt)
    return {"messages": [reply]}
def route_agent(state: State) -> Literal["researcher", "coder", "writer", END]:
    """Translate the supervisor's decision into the next node to run.

    Returns ``END`` when ``next_agent`` is "FINISH"; otherwise returns the
    stored agent name unchanged.
    """
    return END if state["next_agent"] == "FINISH" else state["next_agent"]
# Assemble the supervisor graph: every worker reports back to the supervisor,
# which then routes again through route_agent.
_WORKERS = (("researcher", researcher), ("coder", coder), ("writer", writer))

graph = StateGraph(State)
graph.add_node("supervisor", supervisor)
for _name, _node in _WORKERS:
    graph.add_node(_name, _node)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_agent)
for _name, _ in _WORKERS:
    graph.add_edge(_name, "supervisor")

app = graph.compile()
使用预构建 Supervisor #
python
# The prebuilt supervisor ships in the separate `langgraph-supervisor`
# distribution, whose import name is `langgraph_supervisor`.
from langgraph_supervisor import create_supervisor

# create_supervisor takes the list of worker agents first; the model is passed
# as a keyword argument.
supervisor = create_supervisor(
    [researcher_agent, coder_agent, writer_agent],
    model=llm,
)
app = supervisor.compile()
子图(Subgraphs) #
子图允许将复杂的图分解为可重用的模块。
创建子图 #
python
from langgraph.graph import StateGraph, START, END
class SubgraphState(TypedDict):
    """State schema used inside the subgraph."""

    # Text the subgraph starts from.
    input: str
    # Text produced by the subgraph's nodes.
    output: str
def process_a(state: SubgraphState):
    """First subgraph step: derive ``output`` from ``input``."""
    processed = state["input"] + " processed by A"
    return {"output": processed}
def process_b(state: SubgraphState):
    """Second subgraph step: extend the ``output`` already in the state."""
    extended = state["output"] + " and B"
    return {"output": extended}
# Linear subgraph: START -> a -> b -> END.
subgraph = StateGraph(SubgraphState)
for _name, _node in (("a", process_a), ("b", process_b)):
    subgraph.add_node(_name, _node)
for _src, _dst in ((START, "a"), ("a", "b"), ("b", END)):
    subgraph.add_edge(_src, _dst)
subgraph_app = subgraph.compile()
在父图中使用子图 #
python
class ParentState(TypedDict):
    """State schema of the parent graph."""

    # Value forwarded to the subgraph as its "input".
    data: str
    # Value copied back from the subgraph's "output".
    result: str
def call_subgraph(state: ParentState):
    """Parent node that runs the compiled subgraph and maps its output back.

    The parent's ``data`` becomes the subgraph's ``input``; the subgraph's
    ``output`` is returned as the parent's ``result``.
    """
    subgraph_input = {"input": state["data"]}
    subgraph_output = subgraph_app.invoke(subgraph_input)
    return {"result": subgraph_output["output"]}
# Parent graph with a single node that wraps the subgraph call.
parent_graph = StateGraph(ParentState)
parent_graph.add_node("subgraph", call_subgraph)
for _src, _dst in ((START, "subgraph"), ("subgraph", END)):
    parent_graph.add_edge(_src, _dst)
app = parent_graph.compile()
子图状态转换 #
python
def transform_to_subgraph(state: ParentState) -> SubgraphState:
    """Build the subgraph's initial state from the parent state.

    ``data`` is copied into ``input`` and ``output`` starts as an empty string.
    """
    subgraph_state = dict(input=state["data"], output="")
    return subgraph_state
def transform_from_subgraph(subgraph_state: SubgraphState) -> dict:
    """Convert a finished subgraph state into a parent-state update."""
    parent_update = {"result": subgraph_state["output"]}
    return parent_update
def call_subgraph_with_transform(state: ParentState):
    """Run the subgraph, converting state on the way in and on the way out."""
    return transform_from_subgraph(
        subgraph_app.invoke(transform_to_subgraph(state))
    )
使用 Command 导航 #
python
from langgraph.types import Command
def subgraph_node(state: State) -> Command:
    """Run the subgraph, then return a Command addressed to the parent graph.

    The command updates ``result`` with the subgraph's ``output`` and jumps to
    the node named "next_node"; ``graph=Command.PARENT`` targets the parent.
    """
    subgraph_output = subgraph_app.invoke(state)
    parent_update = {"result": subgraph_output["output"]}
    return Command(
        graph=Command.PARENT,
        goto="next_node",
        update=parent_update,
    )
Map-Reduce 模式 #
Map-Reduce 模式用于并行处理多个项目。
使用 Send 实现 #
python
from langgraph.types import Send
from typing import Annotated
from operator import add
class State(TypedDict):
    """Overall state of the map-reduce graph."""

    # Items to fan out over.
    items: list[str]
    # Per-item results; the `add` reducer concatenates the lists returned by
    # the parallel "process" tasks.
    results: Annotated[list[str], add]
    # Joined output written by reduce_results. It must be declared here so the
    # update is a known state key.
    final: str


def map_items(state: State):
    """Fan out: create one ``Send`` to the "process" node per item.

    This is a conditional-edge function, not a node: it returns ``Send``
    objects rather than a state update.
    """
    return [Send("process", {"item": item}) for item in state["items"]]


def process_item(state: dict):
    """Process a single item.

    Args:
        state: The payload passed through ``Send`` (``{"item": ...}``), not the
            full graph state.

    Returns:
        A one-element ``results`` list for the reducer to merge.
    """
    # `process` is not defined in this snippet; it is the per-item work function.
    result = process(state["item"])
    return {"results": [result]}


def reduce_results(state: State):
    """Join all collected results into a single newline-separated string."""
    return {"final": "\n".join(state["results"])}


graph = StateGraph(State)
graph.add_node("process", process_item)
graph.add_node("reduce", reduce_results)
# map_items is attached as the routing function leaving START; registering it
# as a node would make a node return a list of Send objects, which is not a
# valid state update.
graph.add_conditional_edges(START, map_items, ["process"])
graph.add_edge("process", "reduce")
graph.add_edge("reduce", END)
Map-Reduce 示例:批量翻译 #
python
from langgraph.types import Send
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")


class TranslationState(TypedDict):
    """Overall state of the batch-translation graph."""

    # Source texts to translate.
    texts: list[str]
    # Language name interpolated into the translation prompt.
    target_lang: str
    # Per-text translations, concatenated by the `add` reducer.
    translations: Annotated[list[str], add]
    # Output key written by combine. It must be declared here so the update
    # is a known state key.
    final_translations: list[str]


def distribute_translations(state: TranslationState):
    """Fan out: create one ``Send`` to the "translate" node per source text.

    This is a conditional-edge function, not a node: it returns ``Send``
    objects rather than a state update.
    """
    return [
        Send("translate", {"text": text, "target_lang": state["target_lang"]})
        for text in state["texts"]
    ]


def translate(state: dict):
    """Translate one text.

    Args:
        state: The ``Send`` payload (``{"text": ..., "target_lang": ...}``),
            not the full graph state.

    Returns:
        A one-element ``translations`` list for the reducer to merge.
    """
    response = llm.invoke(f"Translate to {state['target_lang']}: {state['text']}")
    return {"translations": [response.content]}


def combine(state: TranslationState):
    """Expose the collected translations under ``final_translations``."""
    return {"final_translations": state["translations"]}


graph = StateGraph(TranslationState)
graph.add_node("translate", translate)
graph.add_node("combine", combine)
# distribute_translations is attached as the routing function leaving START;
# registering it as a node would make a node return a list of Send objects,
# which is not a valid state update.
graph.add_conditional_edges(START, distribute_translations, ["translate"])
graph.add_edge("translate", "combine")
graph.add_edge("combine", END)
分支与合并 #
并行分支 #
python
from typing import TypedDict
class State(TypedDict):
    """State for the parallel-branch example."""

    # Shared input read by every branch.
    input: str
    # One output slot per branch, so the branches never write the same key.
    result_a: str
    result_b: str
    result_c: str
    # Combined value written by the merge node.
    final: str
def branch_a(state: State):
    """Branch A: pass ``input`` to ``process_a`` and store it as ``result_a``."""
    outcome = process_a(state["input"])
    return {"result_a": outcome}
def branch_b(state: State):
    """Branch B: pass ``input`` to ``process_b`` and store it as ``result_b``."""
    outcome = process_b(state["input"])
    return {"result_b": outcome}
def branch_c(state: State):
    """Branch C: pass ``input`` to ``process_c`` and store it as ``result_c``."""
    outcome = process_c(state["input"])
    return {"result_c": outcome}
def merge(state: State):
    """Combine the three branch results into ``final``."""
    branch_outputs = (state["result_a"], state["result_b"], state["result_c"])
    return {"final": combine(*branch_outputs)}
# Fan out from START to the three branches, then fan back in to "merge".
_BRANCHES = (("a", branch_a), ("b", branch_b), ("c", branch_c))

graph = StateGraph(State)
for _name, _node in _BRANCHES:
    graph.add_node(_name, _node)
graph.add_node("merge", merge)

for _name, _ in _BRANCHES:
    graph.add_edge(START, _name)
for _name, _ in _BRANCHES:
    graph.add_edge(_name, "merge")
graph.add_edge("merge", END)
条件分支 #
python
from typing import Literal
def router(state: State) -> Literal["path_a", "path_b", "path_c"]:
    """Pick the next path from ``state["type"]``.

    "urgent" maps to "path_a", "normal" to "path_b", and anything else to
    "path_c".
    """
    return (
        "path_a" if state["type"] == "urgent"
        else "path_b" if state["type"] == "normal"
        else "path_c"
    )
graph.add_conditional_edges("entry", router)
层级 Agent #
两层架构 #
python
class ManagerState(TypedDict):
    """State of the manager in the two-level (manager/worker) architecture."""

    # Task text handed to the manager.
    task: str
    # Subtasks produced by decompose.
    subtasks: list[str]
    # Worker outputs, concatenated by the `add` reducer.
    results: Annotated[list, add]
    # Output key written by synthesize. It must be declared here so the update
    # is a known state key.
    final_result: str


def decompose(state: ManagerState):
    """Ask the LLM to break the task down and store the parsed subtasks."""
    subtasks = llm.invoke(f"Decompose task: {state['task']}")
    # parse_subtasks is not defined in this snippet; it receives the raw LLM
    # response object.
    return {"subtasks": parse_subtasks(subtasks)}


def assign_subtasks(state: ManagerState):
    """Fan out: create one ``Send`` to the "worker" node per subtask.

    Returns ``Send`` objects, so it is meant to be used as a conditional-edge
    function rather than registered as a node.
    """
    return [
        Send("worker", {"subtask": subtask})
        for subtask in state["subtasks"]
    ]


def worker(state: dict):
    """Execute one subtask.

    Args:
        state: The ``Send`` payload (``{"subtask": ...}``), not the full
            manager state.
    """
    # execute_subtask is not defined in this snippet.
    result = execute_subtask(state["subtask"])
    return {"results": [result]}


def synthesize(state: ManagerState):
    """Ask the LLM to merge all worker results into the final answer."""
    final = llm.invoke(f"Combine results: {state['results']}")
    # Store the text content, as translate() does, not the message object.
    return {"final_result": final.content}
三层架构 #
text
┌─────────────────────────────────────────────────────────────┐
│                       三层 Agent 架构                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Layer 1: Orchestrator                                      │
│  ┌─────────────────────────────────────────────────────┐    │
│  │            任务分解、结果整合、最终决策             │    │
│  └─────────────────────────────────────────────────────┘    │
│                             │                               │
│  Layer 2: Managers                                          │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐    │
│  │ Research Mgr│     │   Code Mgr  │     │   QA Mgr    │    │
│  └──────┬──────┘     └──────┬──────┘     └──────┬──────┘    │
│         │                   │                   │           │
│  Layer 3: Workers                                           │
│   ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐     │
│   │W1 │ │W2 │ │W3 │ │W4 │ │W5 │ │W6 │ │W7 │ │W8 │ │W9 │     │
│   └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ └───┘     │
│                                                             │
└─────────────────────────────────────────────────────────────┘
错误处理模式 #
重试模式 #
python
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def reliable_node(state: State):
    """Call the external API, retried by tenacity when it raises.

    The decorator stops after 3 attempts and takes its waits from
    ``wait_exponential(multiplier=1, min=4, max=10)``.
    """
    api_result = external_api_call(state)
    return api_result
降级模式 #
python
def primary_node(state: State):
    """Try the primary API; on any failure, flag the state for fallback.

    Returns:
        ``{"result": ..., "source": "primary"}`` on success, or
        ``{"fallback": True}`` when ``primary_api`` raised.
    """
    # Local import: this snippet has no logging import of its own.
    import logging

    try:
        result = primary_api(state)
    except Exception:
        # Deliberate best-effort: do not crash the graph, but record why the
        # primary path failed instead of discarding the error silently.
        logging.getLogger(__name__).exception(
            "primary_api failed; switching to fallback"
        )
        return {"fallback": True}
    return {"result": result, "source": "primary"}
def fallback_node(state: State):
    """Call the fallback API only when the ``fallback`` flag is set.

    Returns an empty update when no fallback was requested.
    """
    if not state.get("fallback"):
        return {}
    fallback_result = fallback_api(state)
    return {"result": fallback_result, "source": "fallback"}
错误恢复模式 #
python
def error_handler(state: State):
    """Mark a truthy ``error`` as handled and request a retry.

    Returns an empty update when ``error`` is missing or falsy.
    """
    has_error = bool(state.get("error"))
    return {"error_handled": True, "recovery_action": "retry"} if has_error else {}
def route_after_error(state: State):
    """Choose the node to run after error handling.

    Returns "continue_node" when nothing was handled; otherwise "retry_node"
    if the recovery action is "retry", and "abort_node" for any other action.
    """
    if not state.get("error_handled"):
        return "continue_node"
    return "retry_node" if state["recovery_action"] == "retry" else "abort_node"
流式模式 #
流式处理 #
python
async def streaming_node(state: State):
    """Yield one update per LLM chunk, then a final completion marker.

    NOTE(review): confirm how the installed LangGraph version treats
    async-generator nodes; ``get_stream_writer`` with ``stream_mode="custom"``
    may be the more reliable way to emit intermediate data.
    """
    chunk_stream = llm.astream(state["messages"])
    async for piece in chunk_stream:
        yield {"streaming_chunk": piece.content}
    yield {"complete": True}
增量更新 #
python
def incremental_node(state: State):
    """Yield one partial-result update per item, then a completion marker.

    NOTE(review): confirm how the installed LangGraph version merges the
    intermediate yields of a generator node into graph state.
    """
    pending = iter(state["items"])
    for entry in pending:
        yield {"partial_results": [process_item(entry)]}
    yield {"complete": True}
最佳实践 #
1. 模块化设计 #
python
def create_research_agent():
    """Build and compile the research agent graph.

    Returns:
        The compiled graph with nodes "search", "analyze" and "summarize".
    """
    graph = StateGraph(ResearchState)
    graph.add_node("search", search_node)
    graph.add_node("analyze", analyze_node)
    graph.add_node("summarize", summarize_node)
    # A graph needs an entry point and edges before it can be compiled.
    # NOTE(review): the linear order below is assumed from the node names.
    graph.add_edge(START, "search")
    graph.add_edge("search", "analyze")
    graph.add_edge("analyze", "summarize")
    graph.add_edge("summarize", END)
    return graph.compile()
def create_code_agent():
    """Build and compile the coding agent graph.

    Returns:
        The compiled graph with nodes "plan", "implement" and "test".
    """
    graph = StateGraph(CodeState)
    graph.add_node("plan", plan_node)
    graph.add_node("implement", implement_node)
    graph.add_node("test", test_node)
    # A graph needs an entry point and edges before it can be compiled.
    # NOTE(review): the linear order below is assumed from the node names.
    graph.add_edge(START, "plan")
    graph.add_edge("plan", "implement")
    graph.add_edge("implement", "test")
    graph.add_edge("test", END)
    return graph.compile()
2. 清晰的接口 #
python
class AgentInterface(TypedDict):
    """Common input/output contract shared by agents."""

    # Request handed to the agent.
    input: str
    # Result produced by the agent.
    output: str
    # Lifecycle state; restricted to the four literal values below.
    status: Literal["pending", "running", "completed", "failed"]
    # Error description, or None. `Optional` must be imported from typing for
    # this class body to evaluate.
    error: Optional[str]
3. 状态隔离 #
python
class ResearchState(TypedDict):
    """State owned by the research agent."""

    # Research query.
    query: str
    # Collected results.
    results: list
    # Leading underscore marks this field as internal to the agent.
    _internal_cache: dict
class CodeState(TypedDict):
    """State owned by the coding agent."""

    # Coding task description.
    task: str
    # Code text.
    code: str
    # Leading underscore marks this field as internal to the agent.
    _internal_context: dict
4. 监控和日志 #
python
import logging
logger = logging.getLogger(__name__)


def monitored_node(state: State):
    """Run ``process`` on the state, logging start, completion and failure.

    Returns:
        Whatever ``process(state)`` returns.

    Raises:
        Re-raises any exception from ``process`` after logging it.
    """
    # Lazy %-style arguments: the message is only formatted if it is emitted.
    logger.info("Node started with state: %s", state)
    try:
        result = process(state)
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback,
        # which logger.error with a formatted message does not.
        logger.exception("Node failed: %s", e)
        raise
    else:
        logger.info("Node completed: %s", result)
        return result
下一步 #
现在你已经掌握了高级模式,接下来学习「部署与生产」,了解如何将 Agent 部署到生产环境!
最后更新:2026-03-30