流式处理(Streaming) #

流式处理让 LLM 应用能够实时输出内容,而不是等待完整响应。这大大改善了用户体验,特别是在生成长文本时。

流式处理概述 #

text
┌─────────────────────────────────────────────────────────────┐
│                    为什么需要流式?                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  非流式输出:                                                │
│  用户发送请求 ───> 等待... ───> 完整响应                    │
│  (用户需要等待很长时间)                                    │
│                                                             │
│  流式输出:                                                  │
│  用户发送请求 ───> "人" ───> "工" ───> "智" ───> "能"      │
│  (用户可以实时看到内容生成)                                │
│                                                             │
│  优势:                                                     │
│  ✅ 更好的用户体验                                          │
│  ✅ 降低首字延迟感知                                        │
│  ✅ 适合长文本生成                                          │
│  ✅ 支持实时交互                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

基础流式输出 #

同步流式 #

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("写一篇关于{topic}的短文")
parser = StrOutputParser()

chain = prompt | model | parser

# Stream the response chunk-by-chunk instead of waiting for the full text.
for piece in chain.stream({"topic": "人工智能"}):
    print(piece, end="", flush=True)

print()  # final newline after the streamed text

异步流式 #

python
import asyncio

async def stream_example():
    """Stream a model response asynchronously, printing chunks as they arrive."""
    model = ChatOpenAI(model="gpt-4o-mini")
    prompt = ChatPromptTemplate.from_template("写一篇关于{topic}的短文")
    chain = prompt | model | StrOutputParser()

    # astream() yields chunks without blocking the event loop.
    async for piece in chain.astream({"topic": "人工智能"}):
        print(piece, end="", flush=True)

    print()

asyncio.run(stream_example())

模型级别的流式 #

python
model = ChatOpenAI(model="gpt-4o-mini")

# Chat models can stream directly; each yielded item is a message
# fragment whose text lives on .content.
for fragment in model.stream("讲一个故事"):
    print(fragment.content, end="", flush=True)

流式输出处理 #

收集流式输出 #

python
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o-mini")
chain = model | StrOutputParser()

# Print each chunk as it arrives while also keeping it for later use.
collected = []
for piece in chain.stream("讲一个故事"):
    collected.append(piece)
    print(piece, end="", flush=True)

full_response = "".join(collected)
print(f"\n\n完整响应长度: {len(full_response)}")

带回调的流式 #

python
from langchain_core.callbacks import BaseCallbackHandler

class StreamingHandler(BaseCallbackHandler):
    """Callback that prints every new token as soon as the model emits it."""

    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end="", flush=True)

model = ChatOpenAI(
    model="gpt-4o-mini",
    streaming=True,  # required so the callback receives per-token events
    callbacks=[StreamingHandler()],
)

response = model.invoke("讲一个故事")

流式与链 #

复杂链的流式 #

python
from langchain_core.runnables import RunnablePassthrough

model = ChatOpenAI(model="gpt-4o-mini")

# Multi-step chain: the raw input is passed through as "topic",
# formatted into a prompt, sent to the model, and parsed to a string.
chain = (
    {"topic": RunnablePassthrough()}
    | ChatPromptTemplate.from_template("写一个关于{topic}的标题")
    | model
    | StrOutputParser()
)

# Stream the chain's final output.
for piece in chain.stream("人工智能"):
    print(piece, end="")

带检索的流式 RAG #

python
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# Build a tiny in-memory vector store for the demo (stands in for an
# existing, real vector store).
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_texts(
    ["机器学习是AI的分支", "深度学习使用神经网络"],
    embeddings
)
retriever = vectorstore.as_retriever()

model = ChatOpenAI(model="gpt-4o-mini")

# Prompt template: {context} is filled with the retrieved documents,
# {question} with the user's query.
prompt = ChatPromptTemplate.from_template("""根据上下文回答问题:

上下文:
{context}

问题:{question}

答案:""")

def format_docs(docs):
    """Join the page contents of *docs*, separated by blank lines."""
    parts = [doc.page_content for doc in docs]
    return "\n\n".join(parts)

# Map the incoming question to both prompt variables: retrieved context
# and the original question itself.
chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

# Stream the RAG answer as it is generated.
for piece in chain.stream("什么是机器学习?"):
    print(piece, end="", flush=True)

高级流式模式 #

流式中间步骤 #

python
from langchain_core.callbacks import BaseCallbackHandler

class DetailedStreamingHandler(BaseCallbackHandler):
    """Callback reporting the start, each new token, and the end of generation."""

    def on_llm_start(self, serialized, prompts, **kwargs):
        print("开始生成...\n")

    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end="", flush=True)

    def on_llm_end(self, response, **kwargs):
        print("\n\n生成完成!")

model = ChatOpenAI(
    model="gpt-4o-mini",
    streaming=True,  # needed so on_llm_new_token fires per token
    callbacks=[DetailedStreamingHandler()],
)

response = model.invoke("讲一个故事")

并行流式处理 #

python
from langchain_core.runnables import RunnableParallel

model = ChatOpenAI(model="gpt-4o-mini")

# Run two sub-chains concurrently on the same input.
chain = RunnableParallel(
    story=ChatPromptTemplate.from_template("讲一个关于{topic}的故事") | model | StrOutputParser(),
    poem=ChatPromptTemplate.from_template("写一首关于{topic}的诗") | model | StrOutputParser()
)

# NOTE: parallel streaming needs special handling — each yielded item is a
# dict (iterated via .items() below), presumably mapping branch name
# ("story"/"poem") to that branch's latest chunk, so output from the two
# branches arrives interleaved. TODO confirm against the LangChain docs.
async def parallel_stream():
    async for event in chain.astream({"topic": "春天"}):
        for key, value in event.items():
            print(f"[{key}] {value}", end="", flush=True)
        print()

asyncio.run(parallel_stream())

流式代理 #

python
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool

@tool
def get_weather(city: str) -> str:
    # The docstring below doubles as the tool description that the model
    # sees at runtime, so it is deliberately left untranslated.
    """获取天气"""
    return f"{city}今天晴,25°C"

model = ChatOpenAI(model="gpt-4o-mini")
tools = [get_weather]

# Prebuilt ReAct-style agent that can call the tools above.
agent = create_react_agent(model, tools)

# Stream agent execution. The key checks below distinguish two event
# shapes: events with an "agent" key (model messages, printed via
# .content) and events with a "tools" key (tool results).
async def stream_agent():
    async for event in agent.astream({"messages": [("user", "北京天气怎么样?")]}):
        if "agent" in event:
            if "messages" in event["agent"]:
                for msg in event["agent"]["messages"]:
                    if hasattr(msg, "content"):
                        print(msg.content, end="", flush=True)
        elif "tools" in event:
            print(f"\n[工具调用: {event['tools']['messages']}]")

asyncio.run(stream_agent())

Web 应用中的流式 #

FastAPI 流式 #

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = FastAPI()

model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("{input}")
chain = prompt | model | StrOutputParser()

@app.post("/chat")
async def chat(input: str):
    """Stream the model's answer back to the client as Server-Sent Events."""
    async def event_source():
        # Each chunk is framed as an SSE "data:" line.
        async for piece in chain.astream({"input": input}):
            yield f"data: {piece}\n\n"

    return StreamingResponse(
        event_source(),
        media_type="text/event-stream",
    )

WebSocket 流式 #

python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Stream model output over a WebSocket, one chunk per message."""
    await websocket.accept()

    model = ChatOpenAI(model="gpt-4o-mini")
    chain = model | StrOutputParser()

    # Loop forever: each received text message starts a new streamed reply.
    # NOTE(review): no disconnect handling — a client disconnect will raise
    # out of receive_text(); confirm that is acceptable for this demo.
    while True:
        data = await websocket.receive_text()

        async for chunk in chain.astream(data):
            await websocket.send_text(chunk)

        # Sentinel telling the client this reply is complete.
        await websocket.send_text("[DONE]")

流式输出格式化 #

Markdown 流式渲染 #

python
def stream_markdown(chain, input_data):
    """Stream chain output, giving Markdown code fences their own lines.

    Bug fixed: the original re-scanned the *entire* buffer on every new
    chunk and, for each buffered non-fence line, printed the current
    ``chunk`` instead of the line — duplicating output many times over
    and never actually emitting the line text. This version buffers
    until a full line is available, prints fence lines (```...) on
    their own line, and flushes any trailing partial line at the end.

    Args:
        chain: Any object with a ``stream(input_data)`` method yielding
            string chunks (e.g. a LangChain runnable).
        input_data: Input forwarded unchanged to ``chain.stream``.
    """
    buffer = ""
    for chunk in chain.stream(input_data):
        buffer += chunk
        # Emit only completed lines; keep the partial tail buffered.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if line.startswith("```"):
                # Put fences on their own line for readable rendering.
                print("\n" + line)
            else:
                print(line)
    if buffer:
        # Trailing text that never got a final newline.
        print(buffer, end="", flush=True)

打字机效果 #

python
import time

def typewriter_effect(chain, input_data, delay: float = 0.02):
    """Print streamed chunks with a short pause after each, typewriter-style.

    Args:
        chain: Object exposing ``stream(input_data)`` yielding strings.
        input_data: Input forwarded to the chain.
        delay: Seconds to sleep after printing each chunk.
    """
    for piece in chain.stream(input_data):
        print(piece, end="", flush=True)
        time.sleep(delay)
    print()

错误处理 #

流式错误处理 #

python
async def safe_stream(chain, input_data):
    """Yield streamed chunks, converting any exception into a final error chunk."""
    stream = chain.astream(input_data)
    try:
        async for piece in stream:
            yield piece
    except Exception as exc:  # surface the failure inline instead of raising
        yield f"\n\n[错误: {str(exc)}]"

# Usage example
async def main():
    model = ChatOpenAI(model="gpt-4o-mini")
    chain = model | StrOutputParser()

    # safe_stream wraps the chain so any error arrives as a final chunk.
    async for piece in safe_stream(chain, "讲一个故事"):
        print(piece, end="", flush=True)

asyncio.run(main())

超时处理 #

python
import asyncio

async def stream_with_timeout(chain, input_data, timeout: float = 30.0):
    """Stream chunks to stdout, aborting with a message once *timeout* elapses."""
    async def _consume():
        async for piece in chain.astream(input_data):
            print(piece, end="", flush=True)

    try:
        # wait_for cancels the consumer task when the deadline passes.
        await asyncio.wait_for(_consume(), timeout)
    except asyncio.TimeoutError:
        print("\n\n[超时: 响应时间过长]")

最佳实践 #

1. 选择合适的流式方式 #

python
# Short response — a plain (non-streaming) call is simplest.
response = chain.invoke({"input": "你好"})

# Long response — stream so the user sees text immediately.
for chunk in chain.stream({"input": "写一篇长文"}):
    print(chunk, end="", flush=True)

2. 合理的缓冲策略 #

python
class BufferedStreamer:
    """Accumulate stream chunks and print them in fixed-size batches.

    Batching reduces the number of writes to stdout while keeping the
    output near-real-time.

    Fix over the original: chunks still sitting in the buffer when the
    stream ends were silently lost unless the caller remembered to call
    ``flush()``. The class is now also a context manager that flushes
    any remainder on exit; existing ``add_chunk``/``flush`` callers are
    unaffected.
    """

    def __init__(self, buffer_size: int = 10):
        # Number of chunks to accumulate before writing them out.
        self.buffer_size = buffer_size
        self.buffer: list[str] = []

    def add_chunk(self, chunk: str) -> None:
        """Queue *chunk*; print the whole batch once the buffer is full."""
        self.buffer.append(chunk)
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self) -> None:
        """Print everything buffered so far and empty the buffer."""
        print("".join(self.buffer), end="", flush=True)
        self.buffer = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Emit any trailing partial batch, even if an exception occurred.
        self.flush()
        return False

3. 优雅的终止 #

python
import signal

class GracefulStreamer:
    """Stream printer that stops cleanly when the user presses Ctrl-C.

    NOTE(review): installs a SIGINT handler at construction time, which
    only works in the process's main thread — confirm callers never
    construct this elsewhere.
    """

    def __init__(self):
        self.should_stop = False
        signal.signal(signal.SIGINT, self._handle_signal)

    def _handle_signal(self, signum, frame):
        print("\n\n[用户中断]")
        self.should_stop = True

    def stream(self, chain, input_data):
        """Print streamed chunks until the stream ends or the user interrupts."""
        for piece in chain.stream(input_data):
            if self.should_stop:
                break
            print(piece, end="", flush=True)

下一步 #

最后更新:2026-03-30