状态管理 #

概述 #

状态是 LangGraph 的核心概念之一。状态在图的节点之间共享,记录应用程序的当前快照。理解状态管理对于构建复杂的 Agent 应用至关重要。

text
┌─────────────────────────────────────────────────────────────┐
│                    状态管理架构                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                    State                             │   │
│   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │   │
│   │  │  Schema     │  │  Reducer    │  │  Channel    │  │   │
│   │  │  数据结构   │  │  更新逻辑   │  │  通信通道   │  │   │
│   │  └─────────────┘  └─────────────┘  └─────────────┘  │   │
│   └─────────────────────────────────────────────────────┘   │
│                           │                                 │
│              ┌────────────┼────────────┐                    │
│              ▼            ▼            ▼                    │
│         ┌────────┐   ┌────────┐   ┌────────┐               │
│         │ Node A │   │ Node B │   │ Node C │               │
│         │ 读取   │   │ 更新   │   │ 读取   │               │
│         └────────┘   └────────┘   └────────┘               │
│                                                             │
└─────────────────────────────────────────────────────────────┘

定义状态 Schema #

使用 TypedDict #

TypedDict 是定义状态最常用的方式:

python
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    messages: Sequence[BaseMessage]
    documents: list[str]
    current_step: str
    retry_count: int

使用 Pydantic #

Pydantic 提供数据验证和默认值:

python
from pydantic import BaseModel, Field
from typing import Optional

class AgentState(BaseModel):
    messages: list = Field(default_factory=list)
    documents: list[str] = Field(default_factory=list)
    current_step: str = "init"
    retry_count: int = 0
    error: Optional[str] = None

使用 dataclass #

dataclass 提供默认值和简洁语法:

python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AgentState:
    messages: list = field(default_factory=list)
    documents: list[str] = field(default_factory=list)
    current_step: str = "init"
    retry_count: int = 0
    error: Optional[str] = None

Schema 类型对比 #

text
┌─────────────────────────────────────────────────────────────┐
│                    Schema 类型对比                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  TypedDict:                                                 │
│  ✅ 类型提示清晰                                            │
│  ✅ 性能最好                                                │
│  ⚠️ 无默认值                                                │
│                                                             │
│  Pydantic:                                                  │
│  ✅ 数据验证                                                │
│  ✅ 默认值支持                                              │
│  ⚠️ 性能稍低                                                │
│                                                             │
│  dataclass:                                                 │
│  ✅ 默认值支持                                              │
│  ✅ 简洁语法                                                │
│  ⚠️ 无验证                                                  │
│                                                             │
│  推荐:                                                     │
│  - 简单场景:TypedDict                                      │
│  - 需要验证:Pydantic                                       │
│  - 需要默认值:dataclass                                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Reducer 函数 #

Reducer 定义了如何将节点的更新应用到状态上。每个状态键可以有独立的 Reducer。

默认 Reducer(覆盖) #

python
from typing import TypedDict

class State(TypedDict):
    count: int
    name: str

def node_a(state: State):
    return {"count": 1}

def node_b(state: State):
    return {"count": 2}

result: {"count": 2}

使用 operator.add(追加) #

python
from typing import TypedDict, Annotated
from operator import add

class State(TypedDict):
    items: Annotated[list, add]

def node_a(state: State):
    return {"items": [1, 2]}

def node_b(state: State):
    return {"items": [3, 4]}

result: {"items": [1, 2, 3, 4]}

使用 add_messages(消息专用) #

python
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

自定义 Reducer #

python
from typing import TypedDict, Annotated

def merge_dicts(left: dict, right: dict) -> dict:
    result = left.copy()
    result.update(right)
    return result

class State(TypedDict):
    config: Annotated[dict, merge_dicts]

def node_a(state: State):
    return {"config": {"theme": "dark"}}

def node_b(state: State):
    return {"config": {"language": "zh"}}

result: {"config": {"theme": "dark", "language": "zh"}}

Reducer 工作流程 #

text
┌─────────────────────────────────────────────────────────────┐
│                    Reducer 工作流程                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  初始状态:                                                  │
│  state = {"items": [1, 2]}                                  │
│                                                             │
│  节点返回更新:                                              │
│  update = {"items": [3]}                                    │
│                                                             │
│  Reducer 处理:                                              │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  reducer(state["items"], update["items"])           │   │
│  │                                                     │   │
│  │  默认:update["items"]  -> [3]                      │   │
│  │  add: [1,2] + [3]      -> [1, 2, 3]                │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  最终状态:                                                  │
│  state = {"items": [1, 2, 3]}  (使用 add)                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

MessagesState #

MessagesState 是一个预构建的状态类型,专门用于处理消息。

基本使用 #

python
from langgraph.graph import MessagesState

class State(MessagesState):
    documents: list[str]

graph = StateGraph(State)

MessagesState 定义 #

python
from typing import Annotated, Sequence
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

class MessagesState(TypedDict):
    messages: Annotated[Sequence[AnyMessage], add_messages]

add_messages 功能 #

text
┌─────────────────────────────────────────────────────────────┐
│                    add_messages 功能                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 追加新消息:                                             │
│     state = {"messages": [msg1]}                            │
│     update = {"messages": [msg2]}                           │
│     result = {"messages": [msg1, msg2]}                     │
│                                                             │
│  2. 更新已有消息(通过 ID):                                │
│     state = {"messages": [msg_with_id_1]}                   │
│     update = {"messages": [updated_msg_with_id_1]}          │
│     result = {"messages": [updated_msg_with_id_1]}          │
│                                                             │
│  3. 删除消息:                                               │
│     update = {"messages": [RemoveMessage(id="msg_1")]}      │
│                                                             │
│  4. 自动反序列化:                                           │
│     支持字典格式自动转换为 Message 对象                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

消息操作示例 #

python
from langchain_core.messages import AIMessage, HumanMessage, RemoveMessage
from langgraph.graph import MessagesState

def add_message(state: MessagesState):
    return {"messages": [AIMessage(content="Hello!")]}

def update_message(state: MessagesState):
    return {"messages": [AIMessage(content="Updated!", id=state["messages"][-1].id)]}

def delete_message(state: MessagesState):
    return {"messages": [RemoveMessage(id=state["messages"][0].id)]}

多 Schema 设计 #

输入/输出 Schema #

分离输入和输出 Schema,提供更清晰的接口:

python
from typing import TypedDict

class InputState(TypedDict):
    user_query: str

class OutputState(TypedDict):
    answer: str
    sources: list[str]

class InternalState(TypedDict):
    user_query: str
    answer: str
    sources: list[str]
    internal_notes: str
    processing_steps: list[str]

graph = StateGraph(
    InternalState,
    input_schema=InputState,
    output_schema=OutputState
)

私有状态通道 #

节点可以使用私有状态通道进行内部通信:

python
from typing import TypedDict

class PublicState(TypedDict):
    user_input: str
    output: str

class PrivateState(TypedDict):
    internal_data: str

def node_a(state: PublicState) -> PrivateState:
    return {"internal_data": process(state["user_input"])}

def node_b(state: PrivateState) -> PublicState:
    return {"output": format(state["internal_data"])}

graph = StateGraph(PublicState)
graph.add_node("a", node_a)
graph.add_node("b", node_b)

Schema 组合示例 #

text
┌─────────────────────────────────────────────────────────────┐
│                    多 Schema 数据流                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Input Schema:                                              │
│  { "user_input": "query" }                                  │
│                                                             │
│           │                                                 │
│           ▼                                                 │
│  ┌─────────────────┐                                        │
│  │    Node A       │                                        │
│  │  读取 Input     │                                        │
│  │  写入 Private   │                                        │
│  └────────┬────────┘                                        │
│           │                                                 │
│           ▼                                                 │
│  Private Schema:                                            │
│  { "internal_data": "..." }                                 │
│                                                             │
│           │                                                 │
│           ▼                                                 │
│  ┌─────────────────┐                                        │
│  │    Node B       │                                        │
│  │  读取 Private   │                                        │
│  │  写入 Output    │                                        │
│  └────────┬────────┘                                        │
│           │                                                 │
│           ▼                                                 │
│  Output Schema:                                             │
│  { "answer": "...", "sources": [...] }                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

状态访问与更新 #

在节点中访问状态 #

python
def my_node(state: State):
    messages = state["messages"]
    count = state.get("count", 0)
    
    if "documents" in state:
        docs = state["documents"]
    
    return {"count": count + 1}

返回状态更新 #

节点返回一个字典,包含要更新的状态键:

python
def my_node(state: State):
    return {
        "messages": [AIMessage(content="Done")],
        "count": state["count"] + 1,
        "current_step": "completed"
    }

部分更新 #

节点只需返回需要更新的键:

python
class State(TypedDict):
    messages: list
    count: int
    name: str
    status: str

def update_count(state: State):
    return {"count": state["count"] + 1}

使用 Command 更新 #

python
from langgraph.types import Command

def my_node(state: State) -> Command:
    return Command(
        update={"count": state["count"] + 1},
        goto="next_node"
    )

状态初始化 #

默认值 #

python
from dataclasses import dataclass, field

@dataclass
class State:
    messages: list = field(default_factory=list)
    count: int = 0
    status: str = "initialized"

初始化函数 #

python
def initialize_state(user_input: str) -> State:
    return {
        "messages": [HumanMessage(content=user_input)],
        "count": 0,
        "status": "initialized"
    }

result = app.invoke(initialize_state("Hello"))

使用 RunnableConfig #

python
from langchain_core.runnables import RunnableConfig

def entry_node(state: State, config: RunnableConfig):
    user_id = config["configurable"].get("user_id", "anonymous")
    return {
        "messages": [HumanMessage(content=state["messages"][0].content)],
        "user_id": user_id
    }

状态验证 #

Pydantic 验证 #

python
from pydantic import BaseModel, Field, validator

class State(BaseModel):
    messages: list = Field(default_factory=list)
    count: int = Field(ge=0, default=0)
    
    @validator("messages")
    def validate_messages(cls, v):
        if len(v) > 100:
            raise ValueError("Too many messages")
        return v

自定义验证节点 #

python
def validate_state(state: State):
    if len(state["messages"]) > 100:
        raise ValueError("Message limit exceeded")
    if state["retry_count"] > 5:
        raise ValueError("Retry limit exceeded")
    return state

状态快照与历史 #

获取当前状态 #

python
config = {"configurable": {"thread_id": "conversation-1"}}

result = app.invoke({"messages": [...]}, config)
current_state = app.get_state(config)

获取状态历史 #

python
history = app.get_state_history(config)
for snapshot in history:
    print(f"Step: {snapshot.metadata['step']}")
    print(f"State: {snapshot.values}")

状态快照结构 #

python
snapshot = app.get_state(config)

print(snapshot.values)      # 状态值
print(snapshot.next)        # 下一个要执行的节点
print(snapshot.config)      # 配置
print(snapshot.metadata)    # 元数据
print(snapshot.created_at)  # 创建时间
print(snapshot.parent_config)  # 父状态配置

状态更新 API #

update_state #

手动更新状态:

python
app.update_state(
    config,
    {"messages": [HumanMessage(content="Override")]}
)

从特定状态恢复 #

python
history = list(app.get_state_history(config))
target_state = history[-5]

app.invoke(None, target_state.config)

最佳实践 #

1. 状态设计原则 #

text
┌─────────────────────────────────────────────────────────────┐
│                    状态设计原则                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  最小化:                                                    │
│  - 只存储必要的数据                                         │
│  - 避免冗余信息                                             │
│                                                             │
│  类型安全:                                                  │
│  - 使用 TypedDict 或 Pydantic                               │
│  - 添加类型注解                                             │
│                                                             │
│  清晰命名:                                                  │
│  - 使用描述性的键名                                         │
│  - 遵循一致的命名规范                                       │
│                                                             │
│  合理 Reducer:                                              │
│  - 列表用 add                                               │
│  - 消息用 add_messages                                      │
│  - 标量用默认(覆盖)                                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. 避免状态膨胀 #

python
class BadState(TypedDict):
    messages: list
    full_history: list
    all_intermediate_results: list
    all_metadata: dict

class GoodState(TypedDict):
    messages: Annotated[list, add_messages]
    current_context: list
    result: str

3. 使用私有通道 #

python
class PublicState(TypedDict):
    query: str
    answer: str

class InternalState(TypedDict):
    query: str
    answer: str
    _internal_cache: dict
    _debug_info: list

下一步 #

现在你已经掌握了状态管理,接下来学习 节点与边,深入了解如何定义 Agent 的控制流!

最后更新:2026-03-30