回调系统(Callbacks) #
回调系统是 LangChain 的可观测性基础,让你能够监控、记录和调试 LLM 应用的执行过程。
回调系统概述 #
text
┌─────────────────────────────────────────────────────────────┐
│ 回调的作用 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 执行流程: │
│ 用户请求 ───> 链执行 ───> 模型调用 ───> 返回结果 │
│ │ │
│ ▼ │
│ 回调系统 │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ │ │ │ │
│ 日志记录 性能监控 自定义处理 │
│ │
│ 应用场景: │
│ ✅ 记录执行日志 │
│ ✅ 监控性能指标 │
│ ✅ 调试问题 │
│ ✅ 追踪 Token 使用 │
│ ✅ 集成监控系统 │
│ │
└─────────────────────────────────────────────────────────────┘
基础回调 #
使用内置回调处理器 #
python
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import StdOutCallbackHandler

# Built-in handler that echoes run events to stdout.
stdout_handler = StdOutCallbackHandler()

model = ChatOpenAI(model="gpt-4o-mini")

# Attach the handler to this one call via the run config.
response = model.invoke(
    "你好",
    config={"callbacks": [stdout_handler]},
)
自定义回调处理器 #
python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
class MyCallbackHandler(BaseCallbackHandler):
    """Custom handler that prints a trace of chain and LLM lifecycle events."""

    # --- chain lifecycle ---

    def on_chain_start(self, serialized, inputs, **kwargs):
        """Fires when a chain run begins."""
        print(f"[链开始] 输入: {inputs}")

    def on_chain_end(self, outputs, **kwargs):
        """Fires when a chain run finishes."""
        print(f"[链结束] 输出: {outputs}")

    # --- LLM lifecycle ---

    def on_llm_start(self, serialized, prompts, **kwargs):
        """Fires just before the LLM is invoked."""
        print(f"[LLM 开始] 提示: {prompts}")

    def on_llm_new_token(self, token: str, **kwargs):
        """Fires for every streamed token; echo without a newline."""
        print(token, end="", flush=True)

    def on_llm_end(self, response, **kwargs):
        """Fires once the LLM call completes."""
        print("\n[LLM 结束]")

    def on_llm_error(self, error, **kwargs):
        """Fires when the LLM call raises."""
        print(f"[LLM 错误] {error}")
# Usage: wire the custom handler into a single model call.
model = ChatOpenAI(model="gpt-4o-mini")
my_handler = MyCallbackHandler()
response = model.invoke(
    "讲一个故事",
    config={"callbacks": [my_handler]},
)
回调事件 #
完整事件列表 #
python
from langchain_core.callbacks import BaseCallbackHandler
class FullCallbackHandler(BaseCallbackHandler):
    """Handler that listens to every major callback event category."""
    # LLM events
    def on_llm_start(self, serialized, prompts, **kwargs):
        print("LLM 开始")
    def on_llm_new_token(self, token, **kwargs):
        print(f"新 Token: {token}")
    def on_llm_end(self, response, **kwargs):
        print("LLM 结束")
    def on_llm_error(self, error, **kwargs):
        print(f"LLM 错误: {error}")
    # Chain events
    def on_chain_start(self, serialized, inputs, **kwargs):
        print("链开始")
    def on_chain_end(self, outputs, **kwargs):
        print("链结束")
    def on_chain_error(self, error, **kwargs):
        print(f"链错误: {error}")
    # Tool events
    def on_tool_start(self, serialized, input_str, **kwargs):
        print(f"工具开始: {input_str}")
    def on_tool_end(self, output, **kwargs):
        print(f"工具结束: {output}")
    def on_tool_error(self, error, **kwargs):
        print(f"工具错误: {error}")
    # Retriever events
    def on_retriever_start(self, serialized, query, **kwargs):
        print(f"检索开始: {query}")
    def on_retriever_end(self, documents, **kwargs):
        print(f"检索结束: {len(documents)} 个文档")
    # Prompt events
    # NOTE(review): `on_prompt_start` / `on_prompt_end` do not appear to be
    # standard BaseCallbackHandler hooks in langchain_core — confirm against
    # the installed version's API before relying on them being dispatched.
    def on_prompt_start(self, serialized, prompts, **kwargs):
        print("提示开始")
    def on_prompt_end(self, response, **kwargs):
        print("提示结束")
实用回调示例 #
Token 使用追踪 #
python
from langchain_core.callbacks import BaseCallbackHandler
class TokenCounter(BaseCallbackHandler):
    """Accumulates token usage across every LLM call it observes."""

    def __init__(self):
        # Running totals across all observed runs.
        self.total_tokens = 0
        self.prompt_tokens = 0
        self.completion_tokens = 0

    def on_llm_end(self, response, **kwargs):
        """Fold the provider-reported token usage into the running totals."""
        llm_output = getattr(response, 'llm_output', None)
        if llm_output:
            usage = llm_output.get('token_usage', {})
            self.prompt_tokens += usage.get('prompt_tokens', 0)
            self.completion_tokens += usage.get('completion_tokens', 0)
            self.total_tokens += usage.get('total_tokens', 0)

    def get_stats(self):
        """Return a snapshot of the accumulated counts."""
        return {
            "total_tokens": self.total_tokens,
            "prompt_tokens": self.prompt_tokens,
            "completion_tokens": self.completion_tokens,
        }
# Usage: one counter instance can span several calls.
counter = TokenCounter()
model = ChatOpenAI(model="gpt-4o-mini")

first = model.invoke("你好", config={"callbacks": [counter]})
second = model.invoke("再见", config={"callbacks": [counter]})

print(f"Token 使用统计: {counter.get_stats()}")
性能监控 #
python
import time
from langchain_core.callbacks import BaseCallbackHandler
class PerformanceMonitor(BaseCallbackHandler):
    """Records wall-clock durations of LLM and chain phases.

    Uses ``time.perf_counter()`` — a monotonic, high-resolution clock — rather
    than ``time.time()``, which can jump when the system clock is adjusted.
    """

    def __init__(self):
        self.start_times = {}  # phase name -> perf_counter() stamp at start
        self.durations = {}    # phase name -> last observed duration (seconds)

    def _start(self, phase):
        # Stamp the beginning of a phase.
        self.start_times[phase] = time.perf_counter()

    def _stop(self, phase):
        # Record the elapsed time for *phase*. If no matching start was seen,
        # record 0.0 (the original fell back to "now - 0", i.e. it reported
        # seconds since the epoch as the duration).
        started = self.start_times.pop(phase, None)
        self.durations[phase] = (
            0.0 if started is None else time.perf_counter() - started
        )

    def on_llm_start(self, serialized, prompts, **kwargs):
        self._start('llm')

    def on_llm_end(self, response, **kwargs):
        self._stop('llm')

    def on_chain_start(self, serialized, inputs, **kwargs):
        self._start('chain')

    def on_chain_end(self, outputs, **kwargs):
        self._stop('chain')

    def get_report(self):
        """Return human-readable durations for the last LLM and chain runs."""
        return {
            "llm_duration": f"{self.durations.get('llm', 0):.2f}s",
            "chain_duration": f"{self.durations.get('chain', 0):.2f}s"
        }
# Usage
perf = PerformanceMonitor()
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke("讲一个故事", config={"callbacks": [perf]})
print(f"性能报告: {perf.get_report()}")
文件日志 #
python
from langchain_core.callbacks import BaseCallbackHandler
import json
from datetime import datetime
class FileLogger(BaseCallbackHandler):
    """Appends one JSON line per callback event to a log file."""

    def __init__(self, log_file: str = "langchain.log"):
        self.log_file = log_file

    def _log(self, event: str, data: dict):
        """Append a timestamped JSON record for *event* to the log file.

        The file is opened with an explicit UTF-8 encoding: entries are
        serialized with ``ensure_ascii=False`` (so non-ASCII text is written
        verbatim), and relying on the platform default encoding — e.g. a
        legacy codepage on Windows — could raise UnicodeEncodeError or
        produce mojibake.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "event": event,
            "data": data
        }
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")

    def on_llm_start(self, serialized, prompts, **kwargs):
        self._log("llm_start", {"prompts": prompts})

    def on_llm_end(self, response, **kwargs):
        self._log("llm_end", {"response": str(response)})

    def on_llm_error(self, error, **kwargs):
        self._log("llm_error", {"error": str(error)})

    def on_tool_start(self, serialized, input_str, **kwargs):
        self._log("tool_start", {"input": input_str})

    def on_tool_end(self, output, **kwargs):
        self._log("tool_end", {"output": str(output)})
# Usage
file_logger = FileLogger()
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke("你好", config={"callbacks": [file_logger]})
多回调处理器 #
python
from langchain_core.callbacks import StdOutCallbackHandler
# Several handlers can observe the same run; each receives every event.
console = StdOutCallbackHandler()
counting = TokenCounter()
timing = PerformanceMonitor()
file_log = FileLogger()

model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke(
    "讲一个故事",
    config={"callbacks": [console, counting, timing, file_log]},
)
异步回调 #
python
from langchain_core.callbacks import AsyncCallbackHandler
import asyncio


class MyAsyncCallbackHandler(AsyncCallbackHandler):
    """Async callback handler — every hook is a coroutine.

    Renamed from the original ``AsyncCallbackHandler``, which shadowed the
    very base class it imports and inherits from, making the example
    confusing and the imported name unreachable afterwards.
    """

    async def on_llm_start(self, serialized, prompts, **kwargs):
        print("异步 LLM 开始")

    async def on_llm_end(self, response, **kwargs):
        print("异步 LLM 结束")

    async def on_llm_new_token(self, token, **kwargs):
        print(token, end="", flush=True)


# Usage: async handlers pair with the async entry points (ainvoke/astream).
async def main():
    handler = MyAsyncCallbackHandler()
    model = ChatOpenAI(model="gpt-4o-mini")
    response = await model.ainvoke(
        "讲一个故事",
        config={"callbacks": [handler]}
    )

asyncio.run(main())
LangSmith 集成 #
LangSmith 是 LangChain 官方的监控平台:
python
import os

# Configure LangSmith tracing via environment variables.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

# With tracing enabled, every run is automatically reported to LangSmith.
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke("你好")

# In LangSmith you can inspect:
# - the full execution trace of each run
# - token usage
# - latency statistics
# - cost analysis
回调配置 #
全局回调 #
python
from langchain_core.callbacks import set_handler

# Register a process-wide callback handler.
# NOTE(review): `set_handler` does not appear to be a public export of
# `langchain_core.callbacks` in current releases — confirm against the
# installed version. If absent, pass callbacks via the model constructor
# or the per-call `config={"callbacks": [...]}` instead.
set_handler(MyCallbackHandler())
# After this, every subsequent invocation uses the handler.
运行时回调 #
python
# Specify callbacks per call; they apply only to this invocation.
response = model.invoke(
    "你好",
    config={"callbacks": [handler1, handler2]}
)
标签和元数据 #
python
response = model.invoke(
    "你好",
    config={
        "callbacks": [handler],
        # Tags let you filter/group runs in tracing tools.
        "tags": ["production", "chat"],
        # Arbitrary metadata attached to the run for later analysis.
        "metadata": {
            "user_id": "user-123",
            "session_id": "session-456"
        }
    }
)
完整示例 #
生产级监控 #
python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
from datetime import datetime
import json
class ProductionMonitor(BaseCallbackHandler):
    """Production-grade monitoring callback.

    Collects one record per LLM run (timing, prompts, token usage, errors)
    and aggregates them into a report via :meth:`get_report`.
    """

    def __init__(self):
        self.runs = []           # finished run records
        self.current_run = None  # record of the in-flight run, if any

    def on_llm_start(self, serialized, prompts, **kwargs):
        # Open a fresh record for this run.
        self.current_run = {
            "start_time": datetime.now().isoformat(),
            "prompts": prompts,
            "tokens": {"prompt": 0, "completion": 0, "total": 0}
        }

    def on_llm_end(self, response, **kwargs):
        # Guard: an end event without a matching start would have raised
        # AttributeError in the original; ignore it instead of crashing.
        if self.current_run is None:
            return
        self.current_run["end_time"] = datetime.now().isoformat()
        # Extract token usage from the provider metadata when available.
        if hasattr(response, 'llm_output') and response.llm_output:
            usage = response.llm_output.get('token_usage', {})
            self.current_run["tokens"] = {
                "prompt": usage.get('prompt_tokens', 0),
                "completion": usage.get('completion_tokens', 0),
                "total": usage.get('total_tokens', 0)
            }
        self.runs.append(self.current_run)
        self.current_run = None

    def on_llm_error(self, error, **kwargs):
        # Same guard as on_llm_end: only record if a run is in flight.
        if self.current_run is None:
            return
        self.current_run["error"] = str(error)
        self.runs.append(self.current_run)
        self.current_run = None

    def get_report(self):
        """Aggregate totals (runs, tokens, errors) across recorded runs."""
        total_tokens = sum(r["tokens"]["total"] for r in self.runs)
        errors = sum(1 for r in self.runs if "error" in r)
        return {
            "total_runs": len(self.runs),
            "total_tokens": total_tokens,
            "errors": errors,
            "runs": self.runs
        }
# Usage
monitor = ProductionMonitor()
model = ChatOpenAI(model="gpt-4o-mini")

# Simulate a few calls in a row.
for idx in range(3):
    response = model.invoke(
        f"问题 {idx+1}",
        config={"callbacks": [monitor]}
    )

# Dump the aggregated report.
print(json.dumps(monitor.get_report(), indent=2))
最佳实践 #
1. 选择合适的回调级别 #
python
# Development: verbose, human-readable logging to stdout.
handler = StdOutCallbackHandler()
# Production: record only the key metrics.
handler = ProductionMonitor()
2. 避免回调中的阻塞操作 #
python
# Anti-pattern: blocking work inside a callback stalls the whole run.
class BadHandler(BaseCallbackHandler):
    def on_llm_end(self, response, **kwargs):
        time.sleep(1)  # blocking call — delays the caller
        save_to_db(response)

# Preferred: hand the payload off and return immediately.
class GoodHandler(BaseCallbackHandler):
    def on_llm_end(self, response, **kwargs):
        # Defer the expensive work to a queue / background worker.
        # (`queue` and `save_to_db` are illustrative placeholders here.)
        queue.put(response)
3. 错误处理 #
python
class SafeHandler(BaseCallbackHandler):
    """Callback whose own failures never propagate into the main run."""

    def on_llm_end(self, response, **kwargs):
        try:
            # processing logic goes here
            pass
        except Exception as e:
            # Swallow callback errors so they cannot break the LLM call
            # itself; report them instead of re-raising.
            print(f"回调错误: {e}")
下一步 #
最后更新:2026-03-30