回调系统(Callbacks) #

回调系统是 LangChain 的可观测性基础,让你能够监控、记录和调试 LLM 应用的执行过程。

回调系统概述 #

text
┌─────────────────────────────────────────────────────────────┐
│                    回调的作用                                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  执行流程:                                                  │
│  用户请求 ───> 链执行 ───> 模型调用 ───> 返回结果           │
│                    │                                        │
│                    ▼                                        │
│              回调系统                                        │
│                    │                                        │
│     ┌──────────────┼──────────────┐                         │
│     │              │              │                         │
│   日志记录      性能监控       自定义处理                    │
│                                                             │
│  应用场景:                                                  │
│  ✅ 记录执行日志                                            │
│  ✅ 监控性能指标                                            │
│  ✅ 调试问题                                                │
│  ✅ 追踪 Token 使用                                         │
│  ✅ 集成监控系统                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

基础回调 #

使用内置回调处理器 #

python
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import StdOutCallbackHandler

model = ChatOpenAI(model="gpt-4o-mini")

# Use the built-in standard-output callback handler, which prints
# run lifecycle events to stdout.
handler = StdOutCallbackHandler()

# Callbacks are attached per invocation through the `config` dict.
response = model.invoke(
    "你好",
    config={"callbacks": [handler]}
)

自定义回调处理器 #

python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage

class MyCallbackHandler(BaseCallbackHandler):
    """Custom callback handler that logs LLM and chain lifecycle events."""

    def on_llm_start(self, serialized, prompts, **kwargs):
        """Called when the LLM starts running."""
        print("[LLM 开始] 提示: %s" % (prompts,))

    def on_llm_new_token(self, token: str, **kwargs):
        """Called for each newly generated token (streaming runs only)."""
        print(token, flush=True, end="")

    def on_llm_end(self, response, **kwargs):
        """Called when the LLM finishes successfully."""
        print("\n[LLM 结束]")

    def on_llm_error(self, error, **kwargs):
        """Called when the LLM raises an error."""
        print("[LLM 错误] %s" % (error,))

    def on_chain_start(self, serialized, inputs, **kwargs):
        """Called when a chain starts running."""
        print("[链开始] 输入: %s" % (inputs,))

    def on_chain_end(self, outputs, **kwargs):
        """Called when a chain finishes."""
        print("[链结束] 输出: %s" % (outputs,))

# Usage: pass the custom handler in the per-call `config`.
model = ChatOpenAI(model="gpt-4o-mini")
handler = MyCallbackHandler()

response = model.invoke(
    "讲一个故事",
    config={"callbacks": [handler]}
)

回调事件 #

完整事件列表 #

python
from langchain_core.callbacks import BaseCallbackHandler

class FullCallbackHandler(BaseCallbackHandler):
    """Handler that logs every event category shown in this tutorial.

    Covers LLM, chain, tool and retriever events.
    NOTE(review): `on_prompt_start`/`on_prompt_end` do not appear in the
    standard `BaseCallbackHandler` interface of current langchain-core
    releases — confirm they are actually dispatched before relying on them.
    """

    # --- LLM events ---
    def on_llm_start(self, serialized, prompts, **kwargs):
        print("LLM 开始")

    def on_llm_new_token(self, token, **kwargs):
        print("新 Token: {}".format(token))

    def on_llm_end(self, response, **kwargs):
        print("LLM 结束")

    def on_llm_error(self, error, **kwargs):
        print("LLM 错误: {}".format(error))

    # --- Chain events ---
    def on_chain_start(self, serialized, inputs, **kwargs):
        print("链开始")

    def on_chain_end(self, outputs, **kwargs):
        print("链结束")

    def on_chain_error(self, error, **kwargs):
        print("链错误: {}".format(error))

    # --- Tool events ---
    def on_tool_start(self, serialized, input_str, **kwargs):
        print("工具开始: {}".format(input_str))

    def on_tool_end(self, output, **kwargs):
        print("工具结束: {}".format(output))

    def on_tool_error(self, error, **kwargs):
        print("工具错误: {}".format(error))

    # --- Retriever events ---
    def on_retriever_start(self, serialized, query, **kwargs):
        print("检索开始: {}".format(query))

    def on_retriever_end(self, documents, **kwargs):
        print("检索结束: {} 个文档".format(len(documents)))

    # --- Prompt events (see NOTE above) ---
    def on_prompt_start(self, serialized, prompts, **kwargs):
        print("提示开始")

    def on_prompt_end(self, response, **kwargs):
        print("提示结束")

实用回调示例 #

Token 使用追踪 #

python
from langchain_core.callbacks import BaseCallbackHandler

class TokenCounter(BaseCallbackHandler):
    """Accumulates token usage across every LLM run it observes."""

    def __init__(self):
        # Running totals, updated on each `on_llm_end`.
        self.total_tokens = 0
        self.prompt_tokens = 0
        self.completion_tokens = 0

    def on_llm_end(self, response, **kwargs):
        """Fold the provider-reported `token_usage` into the totals."""
        llm_output = getattr(response, 'llm_output', None)
        if llm_output:
            usage = llm_output.get('token_usage', {})
            self.prompt_tokens += usage.get('prompt_tokens', 0)
            self.completion_tokens += usage.get('completion_tokens', 0)
            self.total_tokens += usage.get('total_tokens', 0)

    def get_stats(self):
        """Return the accumulated counts as a plain dict."""
        return {
            "total_tokens": self.total_tokens,
            "prompt_tokens": self.prompt_tokens,
            "completion_tokens": self.completion_tokens
        }

# Usage: the same counter instance accumulates across multiple calls.
counter = TokenCounter()
model = ChatOpenAI(model="gpt-4o-mini")

response1 = model.invoke("你好", config={"callbacks": [counter]})
response2 = model.invoke("再见", config={"callbacks": [counter]})

print(f"Token 使用统计: {counter.get_stats()}")

性能监控 #

python
import time
from langchain_core.callbacks import BaseCallbackHandler

class PerformanceMonitor(BaseCallbackHandler):
    """Records wall-clock durations of LLM and chain runs.

    Fix over the original: the end handlers fell back to a start time
    of ``0`` when no matching start event had been seen, producing a
    "duration" equal to the current Unix timestamp (billions of
    seconds). A missing start now yields a duration of 0.0.
    """

    def __init__(self):
        self.start_times = {}   # event key -> start timestamp
        self.durations = {}     # event key -> elapsed seconds

    def _stop(self, key):
        """Record elapsed time for `key`; 0.0 if no start was recorded."""
        started = self.start_times.get(key)
        self.durations[key] = time.time() - started if started is not None else 0.0

    def on_llm_start(self, serialized, prompts, **kwargs):
        self.start_times['llm'] = time.time()

    def on_llm_end(self, response, **kwargs):
        self._stop('llm')

    def on_chain_start(self, serialized, inputs, **kwargs):
        self.start_times['chain'] = time.time()

    def on_chain_end(self, outputs, **kwargs):
        self._stop('chain')

    def get_report(self):
        """Return the measured durations formatted to two decimals."""
        return {
            "llm_duration": f"{self.durations.get('llm', 0):.2f}s",
            "chain_duration": f"{self.durations.get('chain', 0):.2f}s"
        }

# Usage
monitor = PerformanceMonitor()
model = ChatOpenAI(model="gpt-4o-mini")

response = model.invoke("讲一个故事", config={"callbacks": [monitor]})
print(f"性能报告: {monitor.get_report()}")

文件日志 #

python
from langchain_core.callbacks import BaseCallbackHandler
import json
from datetime import datetime

class FileLogger(BaseCallbackHandler):
    """Appends callback events to a file as JSON Lines.

    Each event becomes one JSON object per line carrying a timestamp,
    the event name, and an event-specific payload.
    """

    def __init__(self, log_file: str = "langchain.log"):
        self.log_file = log_file

    def _log(self, event: str, data: dict):
        """Append one JSON-encoded event record to the log file.

        Fix: open with an explicit UTF-8 encoding. Since the payload is
        serialised with ``ensure_ascii=False`` it may contain non-ASCII
        text, which would raise on platforms whose default file encoding
        is not UTF-8 (e.g. cp936/cp1252 on Windows).
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "event": event,
            "data": data
        }
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")

    def on_llm_start(self, serialized, prompts, **kwargs):
        self._log("llm_start", {"prompts": prompts})

    def on_llm_end(self, response, **kwargs):
        self._log("llm_end", {"response": str(response)})

    def on_llm_error(self, error, **kwargs):
        self._log("llm_error", {"error": str(error)})

    def on_tool_start(self, serialized, input_str, **kwargs):
        self._log("tool_start", {"input": input_str})

    def on_tool_end(self, output, **kwargs):
        self._log("tool_end", {"output": str(output)})

# Usage: events are appended to "langchain.log" in the working directory.
logger = FileLogger()
model = ChatOpenAI(model="gpt-4o-mini")

response = model.invoke("你好", config={"callbacks": [logger]})

多回调处理器 #

python
from langchain_core.callbacks import StdOutCallbackHandler

# Multiple callbacks can be attached at once; each one receives
# every event for the run.
handlers = [
    StdOutCallbackHandler(),
    TokenCounter(),
    PerformanceMonitor(),
    FileLogger()
]

model = ChatOpenAI(model="gpt-4o-mini")

response = model.invoke(
    "讲一个故事",
    config={"callbacks": handlers}
)

异步回调 #

python
from langchain_core.callbacks import AsyncCallbackHandler
import asyncio

class MyAsyncHandler(AsyncCallbackHandler):
    """Async callback handler for use with `ainvoke`/`astream`.

    Fix over the original: the example class was itself named
    ``AsyncCallbackHandler``, shadowing the base class it inherits
    from — confusing, and it breaks any later subclassing of the real
    base in the same module.
    """

    async def on_llm_start(self, serialized, prompts, **kwargs):
        print("异步 LLM 开始")

    async def on_llm_end(self, response, **kwargs):
        print("异步 LLM 结束")

    async def on_llm_new_token(self, token, **kwargs):
        print(token, end="", flush=True)

# Usage
async def main():
    handler = MyAsyncHandler()
    model = ChatOpenAI(model="gpt-4o-mini")

    response = await model.ainvoke(
        "讲一个故事",
        config={"callbacks": [handler]}
    )

asyncio.run(main())

LangSmith 集成 #

LangSmith 是 LangChain 官方的监控平台:

python
import os

# Configure LangSmith tracing via environment variables.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

# Every execution is now traced to LangSmith automatically.
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke("你好")

# In LangSmith you can inspect:
# - the full execution trace
# - token usage
# - latency statistics
# - cost analysis

回调配置 #

全局回调 #

python
from langchain_core.callbacks import set_handler

# Register a process-global callback handler.
# NOTE(review): `set_handler` does not appear to be exported by
# `langchain_core.callbacks` in current releases — verify this API
# exists for your installed version before using it.
set_handler(MyCallbackHandler())

# All subsequent calls will then use this callback.

运行时回调 #

python
# Specify callbacks per invocation through `config`.
response = model.invoke(
    "你好",
    config={"callbacks": [handler1, handler2]}
)

标签和元数据 #

python
# `tags` and `metadata` are forwarded to callbacks/tracers along with the run.
response = model.invoke(
    "你好",
    config={
        "callbacks": [handler],
        "tags": ["production", "chat"],
        "metadata": {
            "user_id": "user-123",
            "session_id": "session-456"
        }
    }
)

完整示例 #

生产级监控 #

python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
from datetime import datetime
import json

class ProductionMonitor(BaseCallbackHandler):
    """Production-grade monitoring callback.

    Collects one record per LLM run (timing, prompts, token usage and
    any error) and aggregates them into a summary report.

    Fixes over the original:
    - ``current_run`` is initialised in ``__init__``; previously an
      error (or end) event arriving before any start event raised
      ``AttributeError``.
    - end/error handlers guard against a missing in-flight run and
      reset it once the record is stored.
    """

    def __init__(self):
        self.runs = []           # finished run records
        self.current_run = None  # record for the run in flight, if any

    def on_llm_start(self, serialized, prompts, **kwargs):
        self.current_run = {
            "start_time": datetime.now().isoformat(),
            "prompts": prompts,
            "tokens": {"prompt": 0, "completion": 0, "total": 0}
        }

    def on_llm_end(self, response, **kwargs):
        if self.current_run is None:
            return  # end event without a matching start; nothing to record
        self.current_run["end_time"] = datetime.now().isoformat()

        # Extract token usage when the provider reports it.
        if hasattr(response, 'llm_output') and response.llm_output:
            usage = response.llm_output.get('token_usage', {})
            self.current_run["tokens"] = {
                "prompt": usage.get('prompt_tokens', 0),
                "completion": usage.get('completion_tokens', 0),
                "total": usage.get('total_tokens', 0)
            }

        self.runs.append(self.current_run)
        self.current_run = None

    def on_llm_error(self, error, **kwargs):
        if self.current_run is None:
            # Error before any start event: record a minimal run entry.
            self.current_run = {"tokens": {"prompt": 0, "completion": 0, "total": 0}}
        self.current_run["error"] = str(error)
        self.runs.append(self.current_run)
        self.current_run = None

    def get_report(self):
        """Aggregate all recorded runs into a summary dict."""
        total_tokens = sum(r["tokens"]["total"] for r in self.runs)
        errors = sum(1 for r in self.runs if "error" in r)

        return {
            "total_runs": len(self.runs),
            "total_tokens": total_tokens,
            "errors": errors,
            "runs": self.runs
        }

# Usage
monitor = ProductionMonitor()
model = ChatOpenAI(model="gpt-4o-mini")

# Simulate several calls.
for i in range(3):
    response = model.invoke(
        f"问题 {i+1}",
        config={"callbacks": [monitor]}
    )

# Print the aggregated report.
# NOTE: pass ensure_ascii=False to keep non-ASCII text readable.
print(json.dumps(monitor.get_report(), indent=2))

最佳实践 #

1. 选择合适的回调级别 #

python
# Development — verbose logging
handler = StdOutCallbackHandler()

# Production — key metrics only
handler = ProductionMonitor()

2. 避免回调中的阻塞操作 #

python
# Bad: blocking work inside a callback delays the main flow
class BadHandler(BaseCallbackHandler):
    def on_llm_end(self, response, **kwargs):
        time.sleep(1)  # blocking operation
        save_to_db(response)

# Good: hand the work off to a queue / background worker
class GoodHandler(BaseCallbackHandler):
    def on_llm_end(self, response, **kwargs):
        # Asynchronous / queued processing.
        # NOTE: `queue` and `save_to_db` are illustrative — not defined here.
        queue.put(response)

3. 错误处理 #

python
class SafeHandler(BaseCallbackHandler):
    def on_llm_end(self, response, **kwargs):
        try:
            # handling logic goes here
            pass
        except Exception as e:
            # Never let a callback failure break the main execution flow.
            print(f"回调错误: {e}")

下一步 #

最后更新:2026-03-30