链(Chains) #
链是 LangChain 中组合多个组件的核心方式。通过链,你可以将模型、提示、解析器等组件串联成复杂的工作流。
LCEL 概述 #
LangChain Expression Language (LCEL) 是一种声明式语法,用于组合组件。
text
┌─────────────────────────────────────────────────────────────┐
│ LCEL 核心概念 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 组件 = Runnable(可运行对象) │
│ │
│ 所有组件都实现 Runnable 接口: │
│ - invoke: 同步调用 │
│ - stream: 流式输出 │
│ - batch: 批量处理 │
│ - ainvoke/astream/abatch: 异步版本 │
│ │
│ 使用 | 操作符组合: │
│ chain = prompt | model | parser │
│ │
│ 特性: │
│ ✅ 自动类型推断 │
│ ✅ 流式支持 │
│ ✅ 异步支持 │
│ ✅ 并行执行 │
│ ✅ 错误处理 │
│ │
└─────────────────────────────────────────────────────────────┘
基础链 #
简单链 #
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 定义组件
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话")
parser = StrOutputParser()
# 组合成链
chain = prompt | model | parser
# 调用
result = chain.invoke({"topic": "程序员"})
print(result)
text
┌─────────────────────────────────────────────────────────────┐
│ 简单链数据流 │
├─────────────────────────────────────────────────────────────┤
│ │
│ {"topic": "程序员"} │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │ prompt │ "讲一个关于程序员的笑话" │
│ └────┬────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │ model │ AIMessage(content="...") │
│ └────┬────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │ parser │ "为什么程序员..." │
│ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
多步骤链 #
python
from langchain_core.runnables import RunnablePassthrough
# 定义多个步骤
model = ChatOpenAI(model="gpt-4o-mini")
# 步骤1:生成主题
topic_prompt = ChatPromptTemplate.from_template("生成一个关于{domain}的主题")
topic_chain = topic_prompt | model | StrOutputParser()
# 步骤2:生成内容
content_prompt = ChatPromptTemplate.from_template("写一段关于{topic}的介绍")
# 组合
chain = (
{"topic": topic_chain, "domain": RunnablePassthrough()}
| content_prompt
| model
| StrOutputParser()
)
result = chain.invoke("人工智能")
Runnable 接口 #
所有 LCEL 组件都实现 Runnable 接口,提供统一的调用方式。
调用方法 #
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("你好,{name}")
chain = prompt | model
# 同步调用
result = chain.invoke({"name": "张三"})
# 流式输出
for chunk in chain.stream({"name": "张三"}):
print(chunk.content, end="")
# 批量处理
results = chain.batch([
{"name": "张三"},
{"name": "李四"},
{"name": "王五"}
])
# 异步调用
import asyncio
async def main():
result = await chain.ainvoke({"name": "张三"})
async for chunk in chain.astream({"name": "张三"}):
print(chunk.content, end="")
results = await chain.abatch([
{"name": "张三"},
{"name": "李四"}
])
asyncio.run(main())
RunnablePassthrough #
python
from langchain_core.runnables import RunnablePassthrough
# 传递输入
chain = RunnablePassthrough()
result = chain.invoke("hello") # "hello"
# 组合使用
chain = {
"original": RunnablePassthrough(),
"processed": RunnablePassthrough() | (lambda x: x.upper())
}
result = chain.invoke("hello")
# {"original": "hello", "processed": "HELLO"}
RunnableLambda #
python
from langchain_core.runnables import RunnableLambda
# 自定义函数
def parse_length(text: str) -> int:
return len(text)
chain = RunnableLambda(parse_length)
result = chain.invoke("hello") # 5
# 在链中使用
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("{topic}")
chain = (
prompt
| model
| StrOutputParser()
| RunnableLambda(lambda x: f"长度: {len(x)}")
)
RunnableParallel #
python
from langchain_core.runnables import RunnableParallel
model = ChatOpenAI(model="gpt-4o-mini")
# 并行执行多个任务
chain = RunnableParallel(
summary=ChatPromptTemplate.from_template("总结: {text}") | model | StrOutputParser(),
keywords=ChatPromptTemplate.from_template("提取关键词: {text}") | model | StrOutputParser(),
sentiment=ChatPromptTemplate.from_template("情感分析: {text}") | model | StrOutputParser(),
)
result = chain.invoke({"text": "今天天气真好,心情很愉快!"})
# {
# "summary": "...",
# "keywords": "...",
# "sentiment": "..."
# }
text
┌─────────────────────────────────────────────────────────────┐
│ 并行执行示意 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 输入: {"text": "..."} │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ RunnableParallel │ │
│ │ │ │
│ │ summary ──────> [chain1] ──────> "摘要结果" │ │
│ │ │ │
│ │ keywords ─────> [chain2] ──────> "关键词结果" │ │
│ │ │ │
│ │ sentiment ────> [chain3] ──────> "情感结果" │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ {"summary": "...", "keywords": "...", "sentiment": "..."} │
│ │
└─────────────────────────────────────────────────────────────┘
常用链模式 #
1. 问答链 #
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个{role},请用专业但易懂的语言回答问题"),
("human", "{question}")
])
qa_chain = prompt | model | StrOutputParser()
result = qa_chain.invoke({
"role": "Python 专家",
"question": "什么是装饰器?"
})
2. 翻译链 #
python
translate_prompt = ChatPromptTemplate.from_template(
"将以下{source_lang}文本翻译成{target_lang}:\n\n{text}"
)
translate_chain = translate_prompt | model | StrOutputParser()
result = translate_chain.invoke({
"source_lang": "中文",
"target_lang": "英文",
"text": "你好,世界!"
})
3. 摘要链 #
python
summary_prompt = ChatPromptTemplate.from_template(
"请用{length}字总结以下内容:\n\n{content}"
)
summary_chain = summary_prompt | model | StrOutputParser()
result = summary_chain.invoke({
"length": "100",
"content": "长文本内容..."
})
4. 分类链 #
python
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel
from typing import Literal
class Classification(BaseModel):
category: Literal["技术", "娱乐", "体育", "财经", "其他"]
confidence: float
keywords: list[str]
parser = JsonOutputParser(pydantic_object=Classification)
classify_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个文本分类专家。{format_instructions}"),
("human", "请分类以下文本:\n\n{text}")
])
classify_chain = classify_prompt | model | parser
result = classify_chain.invoke({
"text": "苹果公司发布了最新款 iPhone...",
"format_instructions": parser.get_format_instructions()
})
5. 信息提取链 #
python
from pydantic import BaseModel, Field
class PersonInfo(BaseModel):
name: str = Field(description="姓名")
age: int | None = Field(description="年龄")
occupation: str | None = Field(description="职业")
location: str | None = Field(description="地点")
parser = JsonOutputParser(pydantic_object=PersonInfo)
extract_prompt = ChatPromptTemplate.from_messages([
("system", "从文本中提取人物信息。{format_instructions}"),
("human", "{text}")
])
extract_chain = extract_prompt | model | parser
result = extract_chain.invoke({
"text": "张三今年28岁,是一名软件工程师,目前在北京工作。",
"format_instructions": parser.get_format_instructions()
})
条件路由 #
RunnableBranch #
python
from langchain_core.runnables import RunnableBranch
model = ChatOpenAI(model="gpt-4o-mini")
# 不同类型的处理链
code_prompt = ChatPromptTemplate.from_template(
"你是代码专家,请解释这段代码:\n{input}"
)
text_prompt = ChatPromptTemplate.from_template(
"请分析这段文本:\n{input}"
)
math_prompt = ChatPromptTemplate.from_template(
"请解答这个数学问题:\n{input}"
)
code_chain = code_prompt | model | StrOutputParser()
text_chain = text_prompt | model | StrOutputParser()
math_chain = math_prompt | model | StrOutputParser()
# 条件路由
def classify_input(input_dict):
text = input_dict["input"]
if "def " in text or "function" in text or "class " in text:
return "code"
elif any(c in text for c in "+-*/=∫∑"):
return "math"
else:
return "text"
branch = RunnableBranch(
(lambda x: classify_input(x) == "code", code_chain),
(lambda x: classify_input(x) == "math", math_chain),
text_chain, # 默认
)
chain = branch
result = chain.invoke({"input": "def hello(): print('hello')"})
text
┌─────────────────────────────────────────────────────────────┐
│ 条件路由流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 输入: {"input": "def hello(): ..."} │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ 条件判断 │ │
│ │ │ │
│ │ 包含 "def "? ──────┼──> code_chain │
│ │ │ │
│ │ 包含数学符号? ─────┼──> math_chain │
│ │ │ │
│ │ 其他 ──────────────┼──> text_chain │
│ │ │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
链的配置 #
运行时配置 #
python
from langchain_core.runnables import ConfigurableField
model = ChatOpenAI(model="gpt-4o-mini").configurable_fields(
temperature=ConfigurableField(
id="temperature",
name="Temperature",
description="模型温度"
)
)
prompt = ChatPromptTemplate.from_template("{topic}")
chain = prompt | model | StrOutputParser()
# 使用默认配置
result = chain.invoke({"topic": "讲个笑话"})
# 运行时修改配置
result = chain.invoke(
{"topic": "讲个笑话"},
config={"configurable": {"temperature": 0.9}}
)
备选方案 #
python
from langchain_core.runnables import RunnableFallback
primary_model = ChatOpenAI(model="gpt-4o-mini")
fallback_model = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_template("{input}")
chain = prompt | primary_model.with_fallback([fallback_model]) | StrOutputParser()
# 如果 primary_model 失败,自动切换到 fallback_model
result = chain.invoke({"input": "Hello"})
重试机制 #
python
chain = (
prompt
| model.with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True,
retry_if_exception_type=(Exception,)
)
| StrOutputParser()
)
完整示例 #
文档处理链 #
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from pydantic import BaseModel
from typing import List
model = ChatOpenAI(model="gpt-4o-mini")
# 定义输出结构
class DocumentAnalysis(BaseModel):
summary: str
key_points: List[str]
sentiment: str
word_count: int
# 步骤1:预处理
def preprocess(text: str) -> dict:
return {
"text": text.strip(),
"length": len(text)
}
# 步骤2:并行分析
analysis_chain = RunnableParallel(
summary=ChatPromptTemplate.from_template("用一句话总结:{text}")
| model | StrOutputParser(),
key_points=ChatPromptTemplate.from_template("提取3个要点:{text}")
| model | StrOutputParser(),
sentiment=ChatPromptTemplate.from_template("分析情感(积极/消极/中性):{text}")
| model | StrOutputParser()
)
# 步骤3:格式化输出
def format_output(data: dict) -> dict:
return {
"summary": data["summary"],
"key_points": data["key_points"].split("\n"),
"sentiment": data["sentiment"],
"word_count": data["length"]
}
# 完整链
chain = (
RunnablePassthrough()
| RunnableLambda(preprocess)
| RunnablePassthrough.assign(**analysis_chain)
| RunnableLambda(format_output)
)
# 使用
result = chain.invoke("长文本内容...")
多轮对话链 #
python
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个友好的助手"),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
# 简单的记忆管理
history = []
def chat(input_text: str) -> str:
chain = prompt | model | StrOutputParser()
result = chain.invoke({
"history": history,
"input": input_text
})
# 更新历史
history.append(HumanMessage(content=input_text))
history.append(AIMessage(content=result))
return result
# 使用
print(chat("你好!"))
print(chat("我刚才说了什么?"))
最佳实践 #
1. 模块化设计 #
python
# 将常用链封装成函数
def create_summary_chain(model):
prompt = ChatPromptTemplate.from_template("总结:{text}")
return prompt | model | StrOutputParser()
def create_translate_chain(model):
prompt = ChatPromptTemplate.from_template(
"翻译成{lang}:{text}"
)
return prompt | model | StrOutputParser()
2. 错误处理 #
python
from langchain_core.runnables import RunnableLambda
def safe_parse(output):
try:
return json.loads(output)
except:
return {"error": "解析失败", "raw": output}
chain = (
prompt
| model
| StrOutputParser()
| RunnableLambda(safe_parse)
)
3. 日志记录 #
python
from langchain_core.callbacks import StdOutCallbackHandler
chain.invoke(
{"input": "Hello"},
config={"callbacks": [StdOutCallbackHandler()]}
)
下一步 #
- 记忆(Memory) - 管理对话状态
- 代理(Agents) - 让模型自主决策
- 检索增强生成 - 构建知识库应用
最后更新:2026-03-30