状态管理 #
概述 #
状态是 LangGraph 的核心概念之一。状态在图的节点之间共享,记录应用程序的当前快照。理解状态管理对于构建复杂的 Agent 应用至关重要。
text
┌─────────────────────────────────────────────────────────────┐
│ 状态管理架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ State │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Schema │ │ Reducer │ │ Channel │ │ │
│ │ │ 数据结构 │ │ 更新逻辑 │ │ 通信通道 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Node A │ │ Node B │ │ Node C │ │
│ │ 读取 │ │ 更新 │ │ 读取 │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
定义状态 Schema #
使用 TypedDict #
TypedDict 是定义状态最常用的方式:
python
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
messages: Sequence[BaseMessage]
documents: list[str]
current_step: str
retry_count: int
使用 Pydantic #
Pydantic 提供数据验证和默认值:
python
from pydantic import BaseModel, Field
from typing import Optional
class AgentState(BaseModel):
messages: list = Field(default_factory=list)
documents: list[str] = Field(default_factory=list)
current_step: str = "init"
retry_count: int = 0
error: Optional[str] = None
使用 dataclass #
dataclass 提供默认值和简洁语法:
python
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AgentState:
messages: list = field(default_factory=list)
documents: list[str] = field(default_factory=list)
current_step: str = "init"
retry_count: int = 0
error: Optional[str] = None
Schema 类型对比 #
text
┌─────────────────────────────────────────────────────────────┐
│ Schema 类型对比 │
├─────────────────────────────────────────────────────────────┤
│ │
│ TypedDict: │
│ ✅ 类型提示清晰 │
│ ✅ 性能最好 │
│ ⚠️ 无默认值 │
│ │
│ Pydantic: │
│ ✅ 数据验证 │
│ ✅ 默认值支持 │
│ ⚠️ 性能稍低 │
│ │
│ dataclass: │
│ ✅ 默认值支持 │
│ ✅ 简洁语法 │
│ ⚠️ 无验证 │
│ │
│ 推荐: │
│ - 简单场景:TypedDict │
│ - 需要验证:Pydantic │
│ - 需要默认值:dataclass │
│ │
└─────────────────────────────────────────────────────────────┘
Reducer 函数 #
Reducer 定义了如何将节点的更新应用到状态上。每个状态键可以有独立的 Reducer。
默认 Reducer(覆盖) #
python
from typing import TypedDict
class State(TypedDict):
count: int
name: str
def node_a(state: State):
return {"count": 1}
def node_b(state: State):
return {"count": 2}
result: {"count": 2}
使用 operator.add(追加) #
python
from typing import TypedDict, Annotated
from operator import add
class State(TypedDict):
items: Annotated[list, add]
def node_a(state: State):
return {"items": [1, 2]}
def node_b(state: State):
return {"items": [3, 4]}
result: {"items": [1, 2, 3, 4]}
使用 add_messages(消息专用) #
python
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
自定义 Reducer #
python
from typing import TypedDict, Annotated
def merge_dicts(left: dict, right: dict) -> dict:
result = left.copy()
result.update(right)
return result
class State(TypedDict):
config: Annotated[dict, merge_dicts]
def node_a(state: State):
return {"config": {"theme": "dark"}}
def node_b(state: State):
return {"config": {"language": "zh"}}
result: {"config": {"theme": "dark", "language": "zh"}}
Reducer 工作流程 #
text
┌─────────────────────────────────────────────────────────────┐
│ Reducer 工作流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 初始状态: │
│ state = {"items": [1, 2]} │
│ │
│ 节点返回更新: │
│ update = {"items": [3]} │
│ │
│ Reducer 处理: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ reducer(state["items"], update["items"]) │ │
│ │ │ │
│ │ 默认:update["items"] -> [3] │ │
│ │ add: [1,2] + [3] -> [1, 2, 3] │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 最终状态: │
│ state = {"items": [1, 2, 3]} (使用 add) │
│ │
└─────────────────────────────────────────────────────────────┘
MessagesState #
MessagesState 是一个预构建的状态类型,专门用于处理消息。
基本使用 #
python
from langgraph.graph import MessagesState
class State(MessagesState):
documents: list[str]
graph = StateGraph(State)
MessagesState 定义 #
python
from typing import Annotated, Sequence
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
class MessagesState(TypedDict):
messages: Annotated[Sequence[AnyMessage], add_messages]
add_messages 功能 #
text
┌─────────────────────────────────────────────────────────────┐
│ add_messages 功能 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 追加新消息: │
│ state = {"messages": [msg1]} │
│ update = {"messages": [msg2]} │
│ result = {"messages": [msg1, msg2]} │
│ │
│ 2. 更新已有消息(通过 ID): │
│ state = {"messages": [msg_with_id_1]} │
│ update = {"messages": [updated_msg_with_id_1]} │
│ result = {"messages": [updated_msg_with_id_1]} │
│ │
│ 3. 删除消息: │
│ update = {"messages": [RemoveMessage(id="msg_1")]} │
│ │
│ 4. 自动反序列化: │
│ 支持字典格式自动转换为 Message 对象 │
│ │
└─────────────────────────────────────────────────────────────┘
消息操作示例 #
python
from langchain_core.messages import AIMessage, HumanMessage, RemoveMessage
from langgraph.graph import MessagesState
def add_message(state: MessagesState):
return {"messages": [AIMessage(content="Hello!")]}
def update_message(state: MessagesState):
return {"messages": [AIMessage(content="Updated!", id=state["messages"][-1].id)]}
def delete_message(state: MessagesState):
return {"messages": [RemoveMessage(id=state["messages"][0].id)]}
多 Schema 设计 #
输入/输出 Schema #
分离输入和输出 Schema,提供更清晰的接口:
python
from typing import TypedDict
class InputState(TypedDict):
user_query: str
class OutputState(TypedDict):
answer: str
sources: list[str]
class InternalState(TypedDict):
user_query: str
answer: str
sources: list[str]
internal_notes: str
processing_steps: list[str]
graph = StateGraph(
InternalState,
input_schema=InputState,
output_schema=OutputState
)
私有状态通道 #
节点可以使用私有状态通道进行内部通信:
python
from typing import TypedDict
class PublicState(TypedDict):
user_input: str
output: str
class PrivateState(TypedDict):
internal_data: str
def node_a(state: PublicState) -> PrivateState:
return {"internal_data": process(state["user_input"])}
def node_b(state: PrivateState) -> PublicState:
return {"output": format(state["internal_data"])}
graph = StateGraph(PublicState)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
Schema 组合示例 #
text
┌─────────────────────────────────────────────────────────────┐
│ 多 Schema 数据流 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Input Schema: │
│ { "user_input": "query" } │
│ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Node A │ │
│ │ 读取 Input │ │
│ │ 写入 Private │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ Private Schema: │
│ { "internal_data": "..." } │
│ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Node B │ │
│ │ 读取 Private │ │
│ │ 写入 Output │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ Output Schema: │
│ { "answer": "...", "sources": [...] } │
│ │
└─────────────────────────────────────────────────────────────┘
状态访问与更新 #
在节点中访问状态 #
python
def my_node(state: State):
messages = state["messages"]
count = state.get("count", 0)
if "documents" in state:
docs = state["documents"]
return {"count": count + 1}
返回状态更新 #
节点返回一个字典,包含要更新的状态键:
python
def my_node(state: State):
return {
"messages": [AIMessage(content="Done")],
"count": state["count"] + 1,
"current_step": "completed"
}
部分更新 #
节点只需返回需要更新的键:
python
class State(TypedDict):
messages: list
count: int
name: str
status: str
def update_count(state: State):
return {"count": state["count"] + 1}
使用 Command 更新 #
python
from langgraph.types import Command
def my_node(state: State) -> Command:
return Command(
update={"count": state["count"] + 1},
goto="next_node"
)
状态初始化 #
默认值 #
python
from dataclasses import dataclass, field
@dataclass
class State:
messages: list = field(default_factory=list)
count: int = 0
status: str = "initialized"
初始化函数 #
python
def initialize_state(user_input: str) -> State:
return {
"messages": [HumanMessage(content=user_input)],
"count": 0,
"status": "initialized"
}
result = app.invoke(initialize_state("Hello"))
使用 RunnableConfig #
python
from langchain_core.runnables import RunnableConfig
def entry_node(state: State, config: RunnableConfig):
user_id = config["configurable"].get("user_id", "anonymous")
return {
"messages": [HumanMessage(content=state["messages"][0].content)],
"user_id": user_id
}
状态验证 #
Pydantic 验证 #
python
from pydantic import BaseModel, Field, validator
class State(BaseModel):
messages: list = Field(default_factory=list)
count: int = Field(ge=0, default=0)
@validator("messages")
def validate_messages(cls, v):
if len(v) > 100:
raise ValueError("Too many messages")
return v
自定义验证节点 #
python
def validate_state(state: State):
if len(state["messages"]) > 100:
raise ValueError("Message limit exceeded")
if state["retry_count"] > 5:
raise ValueError("Retry limit exceeded")
return state
状态快照与历史 #
获取当前状态 #
python
config = {"configurable": {"thread_id": "conversation-1"}}
result = app.invoke({"messages": [...]}, config)
current_state = app.get_state(config)
获取状态历史 #
python
history = app.get_state_history(config)
for snapshot in history:
print(f"Step: {snapshot.metadata['step']}")
print(f"State: {snapshot.values}")
状态快照结构 #
python
snapshot = app.get_state(config)
print(snapshot.values) # 状态值
print(snapshot.next) # 下一个要执行的节点
print(snapshot.config) # 配置
print(snapshot.metadata) # 元数据
print(snapshot.created_at) # 创建时间
print(snapshot.parent_config) # 父状态配置
状态更新 API #
update_state #
手动更新状态:
python
app.update_state(
config,
{"messages": [HumanMessage(content="Override")]}
)
从特定状态恢复 #
python
history = list(app.get_state_history(config))
target_state = history[-5]
app.invoke(None, target_state.config)
最佳实践 #
1. 状态设计原则 #
text
┌─────────────────────────────────────────────────────────────┐
│ 状态设计原则 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 最小化: │
│ - 只存储必要的数据 │
│ - 避免冗余信息 │
│ │
│ 类型安全: │
│ - 使用 TypedDict 或 Pydantic │
│ - 添加类型注解 │
│ │
│ 清晰命名: │
│ - 使用描述性的键名 │
│ - 遵循一致的命名规范 │
│ │
│ 合理 Reducer: │
│ - 列表用 add │
│ - 消息用 add_messages │
│ - 标量用默认(覆盖) │
│ │
└─────────────────────────────────────────────────────────────┘
2. 避免状态膨胀 #
python
class BadState(TypedDict):
messages: list
full_history: list
all_intermediate_results: list
all_metadata: dict
class GoodState(TypedDict):
messages: Annotated[list, add_messages]
current_context: list
result: str
3. 使用私有通道 #
python
class PublicState(TypedDict):
query: str
answer: str
class InternalState(TypedDict):
query: str
answer: str
_internal_cache: dict
_debug_info: list
下一步 #
现在你已经掌握了状态管理,接下来学习 节点与边,深入了解如何定义 Agent 的控制流!
最后更新:2026-03-30