人机交互 #

概述 #

人机交互(Human-in-the-Loop,HITL)是 LangGraph 的重要特性,允许人类在 Agent 执行过程中进行检查、审批和干预。这对于构建安全、可控的 AI 应用至关重要。

text
┌─────────────────────────────────────────────────────────────┐
│                    人机交互场景                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   1. 审批敏感操作                                           │
│      Agent 想要发送邮件 → 人工确认 → 执行/拒绝              │
│                                                             │
│   2. 审核生成内容                                           │
│      Agent 生成报告 → 人工审核 → 接受/修改                  │
│                                                             │
│   3. 提供额外信息                                           │
│      Agent 需要更多信息 → 人工输入 → 继续执行               │
│                                                             │
│   4. 错误纠正                                               │
│      Agent 出现错误 → 人工修正 → 继续执行                   │
│                                                             │
│   5. 质量控制                                               │
│      Agent 完成任务 → 人工验收 → 通过/重做                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

中断机制 #

LangGraph 使用 interrupt() 函数来暂停图的执行,等待人工输入。

基本使用 #

python
from langgraph.types import interrupt
from langgraph.graph import StateGraph, MessagesState, START, END

def human_approval_node(state: MessagesState):
    user_input = interrupt("Do you approve this action?")
    return {"messages": [{"role": "user", "content": user_input}]}

graph = StateGraph(MessagesState)
graph.add_node("approval", human_approval_node)
graph.add_edge(START, "approval")
graph.add_edge("approval", END)

app = graph.compile(checkpointer=MemorySaver())

中断流程 #

text
┌─────────────────────────────────────────────────────────────┐
│                    中断执行流程                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 执行图                                                   │
│     app.invoke({"messages": [...]}, config)                 │
│                                                             │
│  2. 遇到 interrupt()                                         │
│     图暂停执行,返回中断信息                                 │
│                                                             │
│  3. 人工处理                                                 │
│     查看中断信息,做出决定                                   │
│                                                             │
│  4. 恢复执行                                                 │
│     app.invoke(Command(resume="approved"), config)          │
│                                                             │
│  5. 继续执行                                                 │
│     interrupt() 返回人工输入,图继续执行                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

完整示例 #

python
from typing import TypedDict
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class State(TypedDict):
    action: str
    approved: bool
    result: str

def plan_action(state: State):
    action = "Send email to customer@example.com"
    return {"action": action}

def human_review(state: State):
    answer = interrupt(f"Do you approve: {state['action']}?")
    approved = answer.lower() in ["yes", "y", "approve"]
    return {"approved": approved}

def execute_action(state: State):
    if state["approved"]:
        return {"result": "Action executed successfully"}
    return {"result": "Action cancelled"}

graph = StateGraph(State)
graph.add_node("plan", plan_action)
graph.add_node("review", human_review)
graph.add_node("execute", execute_action)

graph.add_edge(START, "plan")
graph.add_edge("plan", "review")
graph.add_edge("review", "execute")
graph.add_edge("execute", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "review-1"}}

result = app.invoke({"action": ""}, config)
print(f"Interrupted: {result}")

result = app.invoke(Command(resume="yes"), config)
print(f"Final: {result}")

中断点配置 #

interrupt_before #

在特定节点执行前中断:

python
app = graph.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["sensitive_operation"]
)

result = app.invoke({"messages": [...]}, config)
print("Paused before sensitive_operation")

result = app.invoke(None, config)

interrupt_after #

在特定节点执行后中断:

python
app = graph.compile(
    checkpointer=MemorySaver(),
    interrupt_after=["generate_report"]
)

result = app.invoke({"messages": [...]}, config)
print("Paused after generate_report")

result = app.invoke(None, config)

组合使用 #

python
app = graph.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["send_email"],
    interrupt_after=["generate_content"]
)

恢复执行 #

使用 Command 恢复 #

python
from langgraph.types import Command

result = app.invoke(Command(resume="approved"), config)

恢复值类型 #

python
result = app.invoke(Command(resume="yes"), config)

result = app.invoke(Command(resume={"approved": True, "comment": "Looks good"}), config)

result = app.invoke(Command(resume=["item1", "item2"]), config)

不提供值恢复 #

python
result = app.invoke(None, config)

人机交互模式 #

1. 审批模式 #

python
from langgraph.types import interrupt

def approval_node(state: State):
    question = f"""
    Proposed Action: {state['action']}
    Details: {state['details']}
    
    Do you approve? (yes/no)
    """
    answer = interrupt(question)
    
    if answer.lower() in ["yes", "y", "approve"]:
        return {"approved": True}
    return {"approved": False}

2. 编辑模式 #

python
def edit_node(state: State):
    current_content = state["content"]
    
    question = f"""
    Current content:
    {current_content}
    
    Please edit or press Enter to keep:
    """
    edited = interrupt(question)
    
    if edited:
        return {"content": edited}
    return {}

3. 选择模式 #

python
def choice_node(state: State):
    options = state["options"]
    
    question = f"""
    Please select an option:
    {chr(10).join(f"{i+1}. {opt}" for i, opt in enumerate(options))}
    """
    choice = interrupt(question)
    
    try:
        index = int(choice) - 1
        return {"selected": options[index]}
    except:
        return {"selected": options[0]}

4. 输入模式 #

python
def input_node(state: State):
    question = "Please provide additional information:"
    info = interrupt(question)
    return {"user_input": info}

5. 多步骤审批 #

python
def multi_step_approval(state: State):
    steps = state["steps"]
    approvals = []
    
    for i, step in enumerate(steps):
        answer = interrupt(f"Approve step {i+1}: {step}?")
        approvals.append(answer.lower() == "yes")
    
    return {"approvals": approvals}

检查中断状态 #

获取中断信息 #

python
config = {"configurable": {"thread_id": "task-1"}}

result = app.invoke({"messages": [...]}, config)

state = app.get_state(config)

if state.next:
    print(f"Interrupted at: {state.next}")
    print(f"Tasks pending: {state.values}")

检查是否在中断状态 #

python
def is_interrupted(app, config):
    state = app.get_state(config)
    return state.next is not None and len(state.next) > 0

获取中断历史 #

python
history = list(app.get_state_history(config))

for snapshot in history:
    if snapshot.next:
        print(f"Interrupted at step {snapshot.metadata['step']}")

高级模式 #

条件性中断 #

python
from langgraph.types import interrupt

def conditional_interrupt(state: State):
    if state["action"].startswith("DELETE"):
        answer = interrupt(f"Confirm DELETE: {state['action']}?")
        return {"confirmed": answer.lower() == "yes"}
    return {"confirmed": True}

超时处理 #

python
import asyncio

async def invoke_with_timeout(app, input_data, config, timeout=300):
    result = app.invoke(input_data, config)
    
    if is_interrupted(app, config):
        try:
            answer = await asyncio.wait_for(
                get_user_input(),
                timeout=timeout
            )
            return app.invoke(Command(resume=answer), config)
        except asyncio.TimeoutError:
            app.update_state(config, {"cancelled": True})
            return {"error": "Timeout"}
    
    return result

批量审批 #

python
def batch_approval(state: State):
    items = state["pending_items"]
    
    question = f"Approve all {len(items)} items? (yes/no/individual)"
    answer = interrupt(question)
    
    if answer.lower() == "yes":
        return {"approved_items": items}
    elif answer.lower() == "individual":
        approved = []
        for item in items:
            ans = interrupt(f"Approve: {item}?")
            if ans.lower() == "yes":
                approved.append(item)
        return {"approved_items": approved}
    
    return {"approved_items": []}

工作流回退 #

python
def review_with_rollback(state: State):
    while True:
        answer = interrupt(f"""
        Current state: {state}
        
        Options:
        1. Approve and continue
        2. Modify and retry
        3. Rollback to previous step
        """)
        
        if answer == "1":
            return {"status": "approved"}
        elif answer == "2":
            new_data = interrupt("Enter modifications:")
            return {"status": "modified", "data": new_data}
        elif answer == "3":
            return {"status": "rollback"}

Web 应用集成 #

FastAPI 示例 #

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class ResumeRequest(BaseModel):
    thread_id: str
    resume_value: str

@app.post("/invoke")
async def invoke_agent(message: str, thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    result = langgraph_app.invoke({"messages": [message]}, config)
    return {"status": "interrupted" if is_interrupted(result) else "completed", "result": result}

@app.post("/resume")
async def resume_agent(request: ResumeRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    result = langgraph_app.invoke(Command(resume=request.resume_value), config)
    return {"result": result}

@app.get("/state/{thread_id}")
async def get_state(thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    state = langgraph_app.get_state(config)
    return {"state": state.values, "next": state.next}

前端集成 #

javascript
async function invokeAgent(message, threadId) {
    const response = await fetch('/invoke', {
        method: 'POST',
        body: JSON.stringify({ message, thread_id: threadId })
    });
    return response.json();
}

async function resumeAgent(threadId, value) {
    const response = await fetch('/resume', {
        method: 'POST',
        body: JSON.stringify({ thread_id: threadId, resume_value: value })
    });
    return response.json();
}

最佳实践 #

1. 清晰的提示 #

python
def good_interrupt(state: State):
    question = f"""
    ╔══════════════════════════════════════════════════════════╗
    ║  Action Required: Approval                               ║
    ╠══════════════════════════════════════════════════════════╣
    ║  Type: Send Email                                        ║
    ║  To: {state['recipient']:<45} ║
    ║  Subject: {state['subject']:<43} ║
    ║                                                          ║
    ║  Preview:                                                ║
    ║  {state['preview'][:50]}...                               ║
    ║                                                          ║
    ║  Options:                                                ║
    ║  - Type 'yes' to approve                                 ║
    ║  - Type 'no' to reject                                   ║
    ║  - Type 'edit' to modify                                 ║
    ╚══════════════════════════════════════════════════════════╝
    """
    return interrupt(question)

2. 默认值和验证 #

python
def validated_input(state: State):
    while True:
        answer = interrupt("Enter a number between 1-10:")
        try:
            value = int(answer)
            if 1 <= value <= 10:
                return {"value": value}
        except ValueError:
            pass
        interrupt("Invalid input. Please try again.")

3. 上下文信息 #

python
def contextual_approval(state: State):
    context = f"""
    Previous actions: {state['history'][-5:]}
    Current task: {state['current_task']}
    Risk level: {state['risk_level']}
    
    Do you want to proceed?
    """
    return interrupt(context)

4. 可取消的操作 #

python
def cancellable_operation(state: State):
    answer = interrupt("Starting long operation. Type 'cancel' to abort.")
    
    if answer.lower() == "cancel":
        return {"cancelled": True}
    
    for i in range(100):
        if check_cancelled():
            return {"cancelled": True}
        do_step(i)
    
    return {"cancelled": False, "result": "completed"}

下一步 #

现在你已经掌握了人机交互,接下来学习 高级模式,了解更复杂的 Agent 架构!

最后更新:2026-03-30