Chains #

Chains are the core way to compose components in LangChain. With chains, you can string models, prompts, parsers, and other components together into complex workflows.

LCEL Overview #

LangChain Expression Language (LCEL) is a declarative syntax for composing components.

text
┌─────────────────────────────────────────────────────────────┐
│                     LCEL core concepts                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Component = Runnable                                       │
│                                                             │
│  Every component implements the Runnable interface:         │
│  - invoke: synchronous call                                 │
│  - stream: streaming output                                 │
│  - batch: batch processing                                  │
│  - ainvoke/astream/abatch: async variants                   │
│                                                             │
│  Compose with the | operator:                               │
│  chain = prompt | model | parser                            │
│                                                             │
│  Features:                                                  │
│  ✅ Automatic type inference                                │
│  ✅ Streaming support                                       │
│  ✅ Async support                                           │
│  ✅ Parallel execution                                      │
│  ✅ Error handling                                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘
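The composition model above can be illustrated with a small plain-Python sketch. This is not LangChain's actual implementation; `MiniRunnable` is a hypothetical toy class showing how the `|` operator can chain `invoke` calls:

```python
# Illustrative only: a toy runnable whose | operator composes invoke calls,
# mimicking the shape of LCEL composition (not LangChain internals).
class MiniRunnable:
    def __init__(self, fn):
        self.fn = fn

    def invoke(self, x):
        return self.fn(x)

    def batch(self, xs):
        return [self.invoke(x) for x in xs]

    def __or__(self, other):
        # Composing two runnables yields a new runnable that feeds
        # the output of the first into the second.
        return MiniRunnable(lambda x: other.invoke(self.invoke(x)))

double = MiniRunnable(lambda x: x * 2)
exclaim = MiniRunnable(lambda x: f"{x}!")

chain = double | exclaim
print(chain.invoke(3))      # "6!"
print(chain.batch([1, 2]))  # ["2!", "4!"]
```

The real `Runnable` also layers streaming and async variants on top of this same composition idea.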

Basic chains #

A simple chain #

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Define the components
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("Tell a joke about {topic}")
parser = StrOutputParser()

# Compose them into a chain
chain = prompt | model | parser

# Invoke the chain
result = chain.invoke({"topic": "programmers"})
print(result)
text
┌─────────────────────────────────────────────────────────────┐
│                   Simple chain data flow                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  {"topic": "programmers"}                                   │
│        │                                                    │
│        ▼                                                    │
│  ┌─────────┐                                                │
│  │ prompt  │  "Tell a joke about programmers"               │
│  └────┬────┘                                                │
│       │                                                     │
│       ▼                                                     │
│  ┌─────────┐                                                │
│  │  model  │  AIMessage(content="...")                      │
│  └────┬────┘                                                │
│       │                                                     │
│       ▼                                                     │
│  ┌─────────┐                                                │
│  │ parser  │  "Why do programmers..."                       │
│  └─────────┘                                                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

A multi-step chain #

python
from langchain_core.runnables import RunnablePassthrough

# Define the steps
model = ChatOpenAI(model="gpt-4o-mini")

# Step 1: generate a topic
topic_prompt = ChatPromptTemplate.from_template("Generate a topic about {domain}")
topic_chain = topic_prompt | model | StrOutputParser()

# Step 2: generate content
content_prompt = ChatPromptTemplate.from_template("Write an introduction to {topic}")

# Compose
chain = (
    {"topic": topic_chain, "domain": RunnablePassthrough()}
    | content_prompt
    | model
    | StrOutputParser()
)

result = chain.invoke("artificial intelligence")

The Runnable interface #

All LCEL components implement the Runnable interface, which provides a uniform way to call them.

Invocation methods #

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("Hello, {name}")
chain = prompt | model

# Synchronous call
result = chain.invoke({"name": "Zhang San"})

# Streaming output
for chunk in chain.stream({"name": "Zhang San"}):
    print(chunk.content, end="")

# Batch processing
results = chain.batch([
    {"name": "Zhang San"},
    {"name": "Li Si"},
    {"name": "Wang Wu"}
])

# Async calls
import asyncio

async def main():
    result = await chain.ainvoke({"name": "Zhang San"})

    async for chunk in chain.astream({"name": "Zhang San"}):
        print(chunk.content, end="")

    results = await chain.abatch([
        {"name": "Zhang San"},
        {"name": "Li Si"}
    ])

asyncio.run(main())

RunnablePassthrough #

python
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# Pass the input through unchanged
chain = RunnablePassthrough()
result = chain.invoke("hello")  # "hello"

# In combination (a bare dict has no .invoke(); wrap it in RunnableParallel)
chain = RunnableParallel(
    original=RunnablePassthrough(),
    processed=RunnablePassthrough() | (lambda x: x.upper()),
)

result = chain.invoke("hello")
# {"original": "hello", "processed": "HELLO"}

RunnableLambda #

python
from langchain_core.runnables import RunnableLambda

# A custom function
def parse_length(text: str) -> int:
    return len(text)

chain = RunnableLambda(parse_length)
result = chain.invoke("hello")  # 5

# Within a chain
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("{topic}")

chain = (
    prompt
    | model
    | StrOutputParser()
    | RunnableLambda(lambda x: f"Length: {len(x)}")
)
)

RunnableParallel #

python
from langchain_core.runnables import RunnableParallel

model = ChatOpenAI(model="gpt-4o-mini")

# Run several tasks in parallel on the same input
chain = RunnableParallel(
    summary=ChatPromptTemplate.from_template("Summarize: {text}") | model | StrOutputParser(),
    keywords=ChatPromptTemplate.from_template("Extract keywords: {text}") | model | StrOutputParser(),
    sentiment=ChatPromptTemplate.from_template("Analyze the sentiment: {text}") | model | StrOutputParser(),
)

result = chain.invoke({"text": "The weather is lovely today and I feel great!"})
# {
#     "summary": "...",
#     "keywords": "...",
#     "sentiment": "..."
# }
text
┌─────────────────────────────────────────────────────────────┐
│                     Parallel execution                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Input: {"text": "..."}                                     │
│              │                                              │
│              ▼                                              │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                 RunnableParallel                     │   │
│  │                                                      │   │
│  │   summary ──────> [chain1] ──────> summary result    │   │
│  │                                                      │   │
│  │   keywords ─────> [chain2] ──────> keyword result    │   │
│  │                                                      │   │
│  │   sentiment ────> [chain3] ──────> sentiment result  │   │
│  │                                                      │   │
│  └─────────────────────────────────────────────────────┘   │
│              │                                              │
│              ▼                                              │
│  {"summary": "...", "keywords": "...", "sentiment": "..."}  │
│                                                             │
└─────────────────────────────────────────────────────────────┘
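The fan-out shown in the diagram can be sketched in plain Python with a thread pool. This is an illustrative analogy, not LangChain's internals; `run_parallel` is a hypothetical helper:

```python
from concurrent.futures import ThreadPoolExecutor

# Illustrative sketch: one input fanned out to several independent
# steps, with the results gathered into a dict keyed like the steps.
def run_parallel(steps: dict, x):
    with ThreadPoolExecutor(max_workers=len(steps)) as pool:
        futures = {name: pool.submit(fn, x) for name, fn in steps.items()}
        return {name: f.result() for name, f in futures.items()}

result = run_parallel(
    {"upper": str.upper, "length": len, "first": lambda s: s[0]},
    "hello",
)
print(result)  # {'upper': 'HELLO', 'length': 5, 'first': 'h'}
```

Like RunnableParallel, every step sees the same input and the output keys mirror the step names.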

Common chain patterns #

1. Question answering #

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o-mini")

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a {role}. Answer questions in professional but accessible language."),
    ("human", "{question}")
])

qa_chain = prompt | model | StrOutputParser()

result = qa_chain.invoke({
    "role": "Python expert",
    "question": "What is a decorator?"
})

2. Translation #

python
translate_prompt = ChatPromptTemplate.from_template(
    "Translate the following {source_lang} text into {target_lang}:\n\n{text}"
)

translate_chain = translate_prompt | model | StrOutputParser()

result = translate_chain.invoke({
    "source_lang": "Chinese",
    "target_lang": "English",
    "text": "你好,世界!"
})

3. Summarization #

python
summary_prompt = ChatPromptTemplate.from_template(
    "Summarize the following content in {length} words:\n\n{content}"
)

summary_chain = summary_prompt | model | StrOutputParser()

result = summary_chain.invoke({
    "length": "100",
    "content": "A long piece of text..."
})

4. Classification #

python
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel
from typing import Literal

class Classification(BaseModel):
    category: Literal["tech", "entertainment", "sports", "finance", "other"]
    confidence: float
    keywords: list[str]

parser = JsonOutputParser(pydantic_object=Classification)

classify_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a text classification expert. {format_instructions}"),
    ("human", "Classify the following text:\n\n{text}")
])

classify_chain = classify_prompt | model | parser

result = classify_chain.invoke({
    "text": "Apple has released its latest iPhone...",
    "format_instructions": parser.get_format_instructions()
})

5. Information extraction #

python
from pydantic import BaseModel, Field

class PersonInfo(BaseModel):
    name: str = Field(description="name")
    age: int | None = Field(description="age")
    occupation: str | None = Field(description="occupation")
    location: str | None = Field(description="location")

parser = JsonOutputParser(pydantic_object=PersonInfo)

extract_prompt = ChatPromptTemplate.from_messages([
    ("system", "Extract person details from the text. {format_instructions}"),
    ("human", "{text}")
])

extract_chain = extract_prompt | model | parser

result = extract_chain.invoke({
    "text": "Zhang San is 28 years old, works as a software engineer, and currently lives in Beijing.",
    "format_instructions": parser.get_format_instructions()
})

Conditional routing #

RunnableBranch #

python
from langchain_core.runnables import RunnableBranch

model = ChatOpenAI(model="gpt-4o-mini")

# Handler chains for different input types
code_prompt = ChatPromptTemplate.from_template(
    "You are a code expert. Explain this code:\n{input}"
)
text_prompt = ChatPromptTemplate.from_template(
    "Analyze this text:\n{input}"
)
math_prompt = ChatPromptTemplate.from_template(
    "Solve this math problem:\n{input}"
)

code_chain = code_prompt | model | StrOutputParser()
text_chain = text_prompt | model | StrOutputParser()
math_chain = math_prompt | model | StrOutputParser()

# Route on a simple classification of the input
def classify_input(input_dict):
    text = input_dict["input"]
    if "def " in text or "function" in text or "class " in text:
        return "code"
    elif any(c in text for c in "+-*/=∫∑"):
        return "math"
    else:
        return "text"

branch = RunnableBranch(
    (lambda x: classify_input(x) == "code", code_chain),
    (lambda x: classify_input(x) == "math", math_chain),
    text_chain,  # default
)

chain = branch

result = chain.invoke({"input": "def hello(): print('hello')"})
text
┌─────────────────────────────────────────────────────────────┐
│                  Conditional routing flow                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Input: {"input": "def hello(): ..."}                       │
│              │                                              │
│              ▼                                              │
│  ┌─────────────────────┐                                    │
│  │  Decision           │                                    │
│  │                     │                                    │
│  │  contains "def "? ──┼──> code_chain                      │
│  │                     │                                    │
│  │  math symbols? ─────┼──> math_chain                      │
│  │                     │                                    │
│  │  otherwise ─────────┼──> text_chain                      │
│  │                     │                                    │
│  └─────────────────────┘                                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
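The routing above boils down to "first matching predicate wins, otherwise run the default". A minimal plain-Python sketch of that semantics (`run_branch` is a hypothetical helper, not a LangChain API):

```python
# Illustrative sketch of RunnableBranch semantics: try each
# (predicate, handler) pair in order; the first predicate that
# returns True wins, and the default handler runs otherwise.
def run_branch(branches, default, x):
    for predicate, handler in branches:
        if predicate(x):
            return handler(x)
    return default(x)

def route(s: str) -> str:
    return run_branch(
        [
            (lambda t: "def " in t, lambda t: "code"),
            (lambda t: any(c in t for c in "+-*/="), lambda t: "math"),
        ],
        lambda t: "text",
        s,
    )

print(route("def hello(): pass"))  # "code"
print(route("1 + 1"))              # "math"
print(route("nice weather"))       # "text"
```

Note that branch order matters: `"def f(x): return x + 1"` matches the code predicate before the math one ever runs.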

Chain configuration #

Runtime configuration #

python
from langchain_core.runnables import ConfigurableField

model = ChatOpenAI(model="gpt-4o-mini").configurable_fields(
    temperature=ConfigurableField(
        id="temperature",
        name="Temperature",
        description="Model temperature"
    )
)

prompt = ChatPromptTemplate.from_template("{topic}")
chain = prompt | model | StrOutputParser()

# With the default configuration
result = chain.invoke({"topic": "Tell me a joke"})

# Override the configuration at runtime
result = chain.invoke(
    {"topic": "Tell me a joke"},
    config={"configurable": {"temperature": 0.9}}
)

Fallbacks #

python
primary_model = ChatOpenAI(model="gpt-4o-mini")
fallback_model = ChatOpenAI(model="gpt-3.5-turbo")

prompt = ChatPromptTemplate.from_template("{input}")

# with_fallbacks is a method on every Runnable, so no extra import is needed
chain = prompt | primary_model.with_fallbacks([fallback_model]) | StrOutputParser()

# If primary_model fails, fallback_model takes over automatically
result = chain.invoke({"input": "Hello"})
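Conceptually, a fallback list just tries each runnable in order until one succeeds. A minimal plain-Python sketch of that behavior (illustrative only; `invoke_with_fallbacks` is a hypothetical helper):

```python
# Illustrative sketch of fallback semantics: try each callable in
# order, return the first success, re-raise the last error if all fail.
def invoke_with_fallbacks(fns, x):
    last_error = None
    for fn in fns:
        try:
            return fn(x)
        except Exception as err:
            last_error = err
    raise last_error

def flaky(x):
    # Stands in for a primary model that is currently failing.
    raise RuntimeError("primary model unavailable")

result = invoke_with_fallbacks([flaky, lambda x: f"fallback: {x}"], "Hello")
print(result)  # "fallback: Hello"
```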

Retries #

python
chain = (
    prompt 
    | model.with_retry(
        stop_after_attempt=3,
        wait_exponential_jitter=True,
        retry_if_exception_type=(Exception,)
    ) 
    | StrOutputParser()
)
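The retry settings above map to a simple loop: retry on failure, wait exponentially longer with a bit of jitter between attempts, and re-raise once the attempts are exhausted. A plain-Python sketch of that idea (a hypothetical helper, not LangChain's implementation):

```python
import random
import time

# Illustrative sketch of retry with exponential backoff plus jitter:
# up to `attempts` tries, re-raising after the final failure.
def invoke_with_retry(fn, x, attempts=3, base_delay=0.1):
    for i in range(attempts):
        try:
            return fn(x)
        except Exception:
            if i == attempts - 1:
                raise
            # Exponential backoff (base * 2^i) with random jitter.
            time.sleep(base_delay * (2 ** i) + random.uniform(0, base_delay))

calls = {"n": 0}

def unstable(x):
    # Fails twice, then succeeds — a stand-in for a transient API error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return f"ok: {x}"

result = invoke_with_retry(unstable, "Hello", attempts=3, base_delay=0.01)
print(result)  # "ok: Hello"
```

Jitter spreads out the retries from many concurrent clients so they do not hammer a recovering service in lockstep.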

A complete example #

A document-processing chain #

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from pydantic import BaseModel
from typing import List

model = ChatOpenAI(model="gpt-4o-mini")

# Define the target output structure
class DocumentAnalysis(BaseModel):
    summary: str
    key_points: List[str]
    sentiment: str
    word_count: int

# Step 1: preprocessing
def preprocess(text: str) -> dict:
    return {
        "text": text.strip(),
        "length": len(text)
    }

# Step 2: parallel analysis (a plain dict, so it can be unpacked into assign() below)
analysis_steps = {
    "summary": ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
               | model | StrOutputParser(),

    "key_points": ChatPromptTemplate.from_template("Extract 3 key points: {text}")
                  | model | StrOutputParser(),

    "sentiment": ChatPromptTemplate.from_template("Analyze the sentiment (positive/negative/neutral): {text}")
                 | model | StrOutputParser(),
}

# Step 3: format the output
def format_output(data: dict) -> dict:
    return {
        "summary": data["summary"],
        "key_points": data["key_points"].split("\n"),
        "sentiment": data["sentiment"],
        "word_count": data["length"]
    }

# The full chain
chain = (
    RunnableLambda(preprocess)
    | RunnablePassthrough.assign(**analysis_steps)
    | RunnableLambda(format_output)
)

# Usage
result = chain.invoke("A long piece of text...")

A multi-turn conversation chain #

python
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

model = ChatOpenAI(model="gpt-4o-mini")

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a friendly assistant"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])

# Simple in-memory history management
history = []

def chat(input_text: str) -> str:
    chain = prompt | model | StrOutputParser()

    result = chain.invoke({
        "history": history,
        "input": input_text
    })

    # Update the history
    history.append(HumanMessage(content=input_text))
    history.append(AIMessage(content=result))

    return result

# Usage
print(chat("Hello!"))
print(chat("What did I just say?"))

Best practices #

1. Modular design #

python
# Wrap frequently used chains in factory functions
def create_summary_chain(model):
    prompt = ChatPromptTemplate.from_template("Summarize: {text}")
    return prompt | model | StrOutputParser()

def create_translate_chain(model):
    prompt = ChatPromptTemplate.from_template(
        "Translate into {lang}: {text}"
    )
    return prompt | model | StrOutputParser()

2. Error handling #

python
import json

from langchain_core.runnables import RunnableLambda

def safe_parse(output):
    try:
        return json.loads(output)
    except json.JSONDecodeError:
        return {"error": "failed to parse", "raw": output}

chain = (
    prompt
    | model
    | StrOutputParser()
    | RunnableLambda(safe_parse)
)

3. Logging #

python
from langchain_core.callbacks import StdOutCallbackHandler

chain.invoke(
    {"input": "Hello"},
    config={"callbacks": [StdOutCallbackHandler()]}
)

Next steps #

Last updated: 2026-03-30