人机交互 #
概述 #
人机交互(Human-in-the-Loop,HITL)是 LangGraph 的重要特性,允许人类在 Agent 执行过程中进行检查、审批和干预。这对于构建安全、可控的 AI 应用至关重要。
text
┌─────────────────────────────────────────────────────────────┐
│ 人机交互场景 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 审批敏感操作 │
│ Agent 想要发送邮件 → 人工确认 → 执行/拒绝 │
│ │
│ 2. 审核生成内容 │
│ Agent 生成报告 → 人工审核 → 接受/修改 │
│ │
│ 3. 提供额外信息 │
│ Agent 需要更多信息 → 人工输入 → 继续执行 │
│ │
│ 4. 错误纠正 │
│ Agent 出现错误 → 人工修正 → 继续执行 │
│ │
│ 5. 质量控制 │
│ Agent 完成任务 → 人工验收 → 通过/重做 │
│ │
└─────────────────────────────────────────────────────────────┘
中断机制 #
LangGraph 使用 interrupt() 函数来暂停图的执行,等待人工输入。
基本使用 #
python
from langgraph.types import interrupt
from langgraph.graph import StateGraph, MessagesState, START, END
def human_approval_node(state: MessagesState):
user_input = interrupt("Do you approve this action?")
return {"messages": [{"role": "user", "content": user_input}]}
graph = StateGraph(MessagesState)
graph.add_node("approval", human_approval_node)
graph.add_edge(START, "approval")
graph.add_edge("approval", END)
app = graph.compile(checkpointer=MemorySaver())
中断流程 #
text
┌─────────────────────────────────────────────────────────────┐
│ 中断执行流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 执行图 │
│ app.invoke({"messages": [...]}, config) │
│ │
│ 2. 遇到 interrupt() │
│ 图暂停执行,返回中断信息 │
│ │
│ 3. 人工处理 │
│ 查看中断信息,做出决定 │
│ │
│ 4. 恢复执行 │
│ app.invoke(Command(resume="approved"), config) │
│ │
│ 5. 继续执行 │
│ interrupt() 返回人工输入,图继续执行 │
│ │
└─────────────────────────────────────────────────────────────┘
完整示例 #
python
from typing import TypedDict
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
action: str
approved: bool
result: str
def plan_action(state: State):
action = "Send email to customer@example.com"
return {"action": action}
def human_review(state: State):
answer = interrupt(f"Do you approve: {state['action']}?")
approved = answer.lower() in ["yes", "y", "approve"]
return {"approved": approved}
def execute_action(state: State):
if state["approved"]:
return {"result": "Action executed successfully"}
return {"result": "Action cancelled"}
graph = StateGraph(State)
graph.add_node("plan", plan_action)
graph.add_node("review", human_review)
graph.add_node("execute", execute_action)
graph.add_edge(START, "plan")
graph.add_edge("plan", "review")
graph.add_edge("review", "execute")
graph.add_edge("execute", END)
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "review-1"}}
result = app.invoke({"action": ""}, config)
print(f"Interrupted: {result}")
result = app.invoke(Command(resume="yes"), config)
print(f"Final: {result}")
中断点配置 #
interrupt_before #
在特定节点执行前中断:
python
app = graph.compile(
checkpointer=MemorySaver(),
interrupt_before=["sensitive_operation"]
)
result = app.invoke({"messages": [...]}, config)
print("Paused before sensitive_operation")
result = app.invoke(None, config)
interrupt_after #
在特定节点执行后中断:
python
app = graph.compile(
checkpointer=MemorySaver(),
interrupt_after=["generate_report"]
)
result = app.invoke({"messages": [...]}, config)
print("Paused after generate_report")
result = app.invoke(None, config)
组合使用 #
python
app = graph.compile(
checkpointer=MemorySaver(),
interrupt_before=["send_email"],
interrupt_after=["generate_content"]
)
恢复执行 #
使用 Command 恢复 #
python
from langgraph.types import Command
result = app.invoke(Command(resume="approved"), config)
恢复值类型 #
python
result = app.invoke(Command(resume="yes"), config)
result = app.invoke(Command(resume={"approved": True, "comment": "Looks good"}), config)
result = app.invoke(Command(resume=["item1", "item2"]), config)
不提供值恢复 #
python
result = app.invoke(None, config)
人机交互模式 #
1. 审批模式 #
python
from langgraph.types import interrupt
def approval_node(state: State):
question = f"""
Proposed Action: {state['action']}
Details: {state['details']}
Do you approve? (yes/no)
"""
answer = interrupt(question)
if answer.lower() in ["yes", "y", "approve"]:
return {"approved": True}
return {"approved": False}
2. 编辑模式 #
python
def edit_node(state: State):
current_content = state["content"]
question = f"""
Current content:
{current_content}
Please edit or press Enter to keep:
"""
edited = interrupt(question)
if edited:
return {"content": edited}
return {}
3. 选择模式 #
python
def choice_node(state: State):
options = state["options"]
question = f"""
Please select an option:
{chr(10).join(f"{i+1}. {opt}" for i, opt in enumerate(options))}
"""
choice = interrupt(question)
try:
index = int(choice) - 1
return {"selected": options[index]}
except:
return {"selected": options[0]}
4. 输入模式 #
python
def input_node(state: State):
question = "Please provide additional information:"
info = interrupt(question)
return {"user_input": info}
5. 多步骤审批 #
python
def multi_step_approval(state: State):
steps = state["steps"]
approvals = []
for i, step in enumerate(steps):
answer = interrupt(f"Approve step {i+1}: {step}?")
approvals.append(answer.lower() == "yes")
return {"approvals": approvals}
检查中断状态 #
获取中断信息 #
python
config = {"configurable": {"thread_id": "task-1"}}
result = app.invoke({"messages": [...]}, config)
state = app.get_state(config)
if state.next:
print(f"Interrupted at: {state.next}")
print(f"Tasks pending: {state.values}")
检查是否在中断状态 #
python
def is_interrupted(app, config):
state = app.get_state(config)
return state.next is not None and len(state.next) > 0
获取中断历史 #
python
history = list(app.get_state_history(config))
for snapshot in history:
if snapshot.next:
print(f"Interrupted at step {snapshot.metadata['step']}")
高级模式 #
条件性中断 #
python
from langgraph.types import interrupt
def conditional_interrupt(state: State):
if state["action"].startswith("DELETE"):
answer = interrupt(f"Confirm DELETE: {state['action']}?")
return {"confirmed": answer.lower() == "yes"}
return {"confirmed": True}
超时处理 #
python
import asyncio
async def invoke_with_timeout(app, input_data, config, timeout=300):
result = app.invoke(input_data, config)
if is_interrupted(app, config):
try:
answer = await asyncio.wait_for(
get_user_input(),
timeout=timeout
)
return app.invoke(Command(resume=answer), config)
except asyncio.TimeoutError:
app.update_state(config, {"cancelled": True})
return {"error": "Timeout"}
return result
批量审批 #
python
def batch_approval(state: State):
items = state["pending_items"]
question = f"Approve all {len(items)} items? (yes/no/individual)"
answer = interrupt(question)
if answer.lower() == "yes":
return {"approved_items": items}
elif answer.lower() == "individual":
approved = []
for item in items:
ans = interrupt(f"Approve: {item}?")
if ans.lower() == "yes":
approved.append(item)
return {"approved_items": approved}
return {"approved_items": []}
工作流回退 #
python
def review_with_rollback(state: State):
while True:
answer = interrupt(f"""
Current state: {state}
Options:
1. Approve and continue
2. Modify and retry
3. Rollback to previous step
""")
if answer == "1":
return {"status": "approved"}
elif answer == "2":
new_data = interrupt("Enter modifications:")
return {"status": "modified", "data": new_data}
elif answer == "3":
return {"status": "rollback"}
Web 应用集成 #
FastAPI 示例 #
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class ResumeRequest(BaseModel):
thread_id: str
resume_value: str
@app.post("/invoke")
async def invoke_agent(message: str, thread_id: str):
config = {"configurable": {"thread_id": thread_id}}
result = langgraph_app.invoke({"messages": [message]}, config)
return {"status": "interrupted" if is_interrupted(result) else "completed", "result": result}
@app.post("/resume")
async def resume_agent(request: ResumeRequest):
config = {"configurable": {"thread_id": request.thread_id}}
result = langgraph_app.invoke(Command(resume=request.resume_value), config)
return {"result": result}
@app.get("/state/{thread_id}")
async def get_state(thread_id: str):
config = {"configurable": {"thread_id": thread_id}}
state = langgraph_app.get_state(config)
return {"state": state.values, "next": state.next}
前端集成 #
javascript
async function invokeAgent(message, threadId) {
const response = await fetch('/invoke', {
method: 'POST',
body: JSON.stringify({ message, thread_id: threadId })
});
return response.json();
}
async function resumeAgent(threadId, value) {
const response = await fetch('/resume', {
method: 'POST',
body: JSON.stringify({ thread_id: threadId, resume_value: value })
});
return response.json();
}
最佳实践 #
1. 清晰的提示 #
python
def good_interrupt(state: State):
question = f"""
╔══════════════════════════════════════════════════════════╗
║ Action Required: Approval ║
╠══════════════════════════════════════════════════════════╣
║ Type: Send Email ║
║ To: {state['recipient']:<45} ║
║ Subject: {state['subject']:<43} ║
║ ║
║ Preview: ║
║ {state['preview'][:50]}... ║
║ ║
║ Options: ║
║ - Type 'yes' to approve ║
║ - Type 'no' to reject ║
║ - Type 'edit' to modify ║
╚══════════════════════════════════════════════════════════╝
"""
return interrupt(question)
2. 默认值和验证 #
python
def validated_input(state: State):
while True:
answer = interrupt("Enter a number between 1-10:")
try:
value = int(answer)
if 1 <= value <= 10:
return {"value": value}
except ValueError:
pass
interrupt("Invalid input. Please try again.")
3. 上下文信息 #
python
def contextual_approval(state: State):
context = f"""
Previous actions: {state['history'][-5:]}
Current task: {state['current_task']}
Risk level: {state['risk_level']}
Do you want to proceed?
"""
return interrupt(context)
4. 可取消的操作 #
python
def cancellable_operation(state: State):
answer = interrupt("Starting long operation. Type 'cancel' to abort.")
if answer.lower() == "cancel":
return {"cancelled": True}
for i in range(100):
if check_cancelled():
return {"cancelled": True}
do_step(i)
return {"cancelled": False, "result": "completed"}
下一步 #
现在你已经掌握了人机交互,接下来学习 高级模式,了解更复杂的 Agent 架构!
最后更新:2026-03-30