流式处理(Streaming) #
流式处理让 LLM 应用能够实时输出内容,而不是等待完整响应。这大大改善了用户体验,特别是在生成长文本时。
流式处理概述 #
text
┌─────────────────────────────────────────────────────────────┐
│ 为什么需要流式? │
├─────────────────────────────────────────────────────────────┤
│ │
│ 非流式输出: │
│ 用户发送请求 ───> 等待... ───> 完整响应 │
│ (用户需要等待很长时间) │
│ │
│ 流式输出: │
│ 用户发送请求 ───> "人" ───> "工" ───> "智" ───> "能" │
│ (用户可以实时看到内容生成) │
│ │
│ 优势: │
│ ✅ 更好的用户体验 │
│ ✅ 降低首字延迟感知 │
│ ✅ 适合长文本生成 │
│ ✅ 支持实时交互 │
│ │
└─────────────────────────────────────────────────────────────┘
基础流式输出 #
同步流式 #
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Build the pipeline: prompt -> model -> string parser.
model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("写一篇关于{topic}的短文")
parser = StrOutputParser()
chain = prompt | model | parser

# Stream the response instead of blocking until it is complete.
for chunk in chain.stream({"topic": "人工智能"}):
    print(chunk, end="", flush=True)
print()  # final newline
异步流式 #
python
import asyncio


async def stream_example():
    """Asynchronously stream a model response, printing chunks as they arrive."""
    model = ChatOpenAI(model="gpt-4o-mini")
    prompt = ChatPromptTemplate.from_template("写一篇关于{topic}的短文")
    chain = prompt | model | StrOutputParser()
    async for chunk in chain.astream({"topic": "人工智能"}):
        print(chunk, end="", flush=True)
    print()


asyncio.run(stream_example())
模型级别的流式 #
python
model = ChatOpenAI(model="gpt-4o-mini")

# Stream straight from the model; each chunk is a message chunk with .content.
for piece in model.stream("讲一个故事"):
    print(piece.content, end="", flush=True)
流式输出处理 #
收集流式输出 #
python
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o-mini")
chain = model | StrOutputParser()

# Echo every chunk while also keeping it for later assembly.
collected = []
for piece in chain.stream("讲一个故事"):
    collected.append(piece)
    print(piece, end="", flush=True)

full_response = "".join(collected)
print(f"\n\n完整响应长度: {len(full_response)}")
带回调的流式 #
python
from langchain_core.callbacks import BaseCallbackHandler


class StreamingHandler(BaseCallbackHandler):
    """Callback handler that echoes each newly generated token to stdout."""

    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end="", flush=True)


# streaming=True makes the model emit tokens through the callback even
# when it is called via the blocking invoke().
model = ChatOpenAI(
    model="gpt-4o-mini",
    streaming=True,
    callbacks=[StreamingHandler()],
)
response = model.invoke("讲一个故事")
流式与链 #
复杂链的流式 #
python
from langchain_core.runnables import RunnablePassthrough

model = ChatOpenAI(model="gpt-4o-mini")

# Multi-step chain: wrap the raw input into a dict, format the prompt,
# call the model, then parse to a plain string.
chain = (
    {"topic": RunnablePassthrough()}
    | ChatPromptTemplate.from_template("写一个关于{topic}的标题")
    | model
    | StrOutputParser()
)

# Streaming propagates through the whole pipeline.
for chunk in chain.stream("人工智能"):
    print(chunk, end="")
带检索的流式 RAG #
python
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# Build a tiny demo vector store (normally you would load a real one).
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_texts(
    ["机器学习是AI的分支", "深度学习使用神经网络"],
    embeddings,
)
retriever = vectorstore.as_retriever()

model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("""根据上下文回答问题:
上下文:
{context}
问题:{question}
答案:""")


def format_docs(docs):
    """Join the retrieved documents into a single context string."""
    return "\n\n".join(doc.page_content for doc in docs)


chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

# Retrieval happens up front; the answer then streams token by token.
for chunk in chain.stream("什么是机器学习?"):
    print(chunk, end="", flush=True)
高级流式模式 #
流式中间步骤 #
python
from langchain_core.callbacks import BaseCallbackHandler


class DetailedStreamingHandler(BaseCallbackHandler):
    """Callback handler that brackets the token stream with start/end notices."""

    def on_llm_start(self, serialized, prompts, **kwargs):
        print("开始生成...\n")

    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end="", flush=True)

    def on_llm_end(self, response, **kwargs):
        print("\n\n生成完成!")


model = ChatOpenAI(
    model="gpt-4o-mini",
    streaming=True,
    callbacks=[DetailedStreamingHandler()],
)
response = model.invoke("讲一个故事")
并行流式处理 #
python
from langchain_core.runnables import RunnableParallel

model = ChatOpenAI(model="gpt-4o-mini")

# Run two generations concurrently under distinct keys.
chain = RunnableParallel(
    story=ChatPromptTemplate.from_template("讲一个关于{topic}的故事") | model | StrOutputParser(),
    poem=ChatPromptTemplate.from_template("写一首关于{topic}的诗") | model | StrOutputParser(),
)


# NOTE: a parallel runnable streams dict chunks (one entry per branch),
# so the two outputs arrive interleaved and are demultiplexed by key.
async def parallel_stream():
    async for event in chain.astream({"topic": "春天"}):
        for key, value in event.items():
            print(f"[{key}] {value}", end="", flush=True)
    print()


asyncio.run(parallel_stream())
流式代理 #
python
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool


@tool
def get_weather(city: str) -> str:
    """获取天气"""
    return f"{city}今天晴,25°C"


model = ChatOpenAI(model="gpt-4o-mini")
tools = [get_weather]
agent = create_react_agent(model, tools)


async def stream_agent():
    """Stream agent steps: print model text as it arrives, announce tool calls."""
    async for event in agent.astream({"messages": [("user", "北京天气怎么样?")]}):
        if "agent" in event:
            # Model step — print any message content it produced.
            if "messages" in event["agent"]:
                for msg in event["agent"]["messages"]:
                    if hasattr(msg, "content"):
                        print(msg.content, end="", flush=True)
        elif "tools" in event:
            # Tool step — show what the tool returned.
            print(f"\n[工具调用: {event['tools']['messages']}]")


asyncio.run(stream_agent())
Web 应用中的流式 #
FastAPI 流式 #
python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = FastAPI()

model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("{input}")
chain = prompt | model | StrOutputParser()


@app.post("/chat")
async def chat(input: str):
    """Server-Sent Events endpoint that relays chain output as it is generated."""

    async def generate():
        # NOTE(review): chunks containing newlines would need escaping to be
        # strictly SSE-conformant — confirm the client tolerates raw chunks.
        async for chunk in chain.astream({"input": input}):
            yield f"data: {chunk}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )
WebSocket 流式 #
python
from fastapi import FastAPI, WebSocket

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Relay streamed model output over a WebSocket, one chunk per message."""
    await websocket.accept()
    model = ChatOpenAI(model="gpt-4o-mini")
    chain = model | StrOutputParser()
    while True:
        data = await websocket.receive_text()
        async for chunk in chain.astream(data):
            await websocket.send_text(chunk)
        # Sentinel so the client knows this response is finished.
        await websocket.send_text("[DONE]")
流式输出格式化 #
Markdown 流式渲染 #
python
def stream_markdown(chain, input_data):
    """Stream a chain's output to stdout and return the complete text.

    Bug fix: the original implementation kept every chunk in a growing
    buffer and, once a ``` fence appeared anywhere in it, re-printed the
    *current* chunk once per buffered line on every subsequent iteration —
    duplicating output more and more as the stream progressed (and
    re-printing fence lines each time). Each chunk is now printed exactly
    once; the buffer is kept only to return the full response.

    Args:
        chain: any object whose ``stream(input_data)`` yields str chunks.
        input_data: forwarded unchanged to ``chain.stream``.

    Returns:
        The concatenated response text (useful for post-hoc Markdown
        rendering). Callers that ignored the original ``None`` return are
        unaffected.
    """
    parts = []
    for chunk in chain.stream(input_data):
        parts.append(chunk)
        print(chunk, end="", flush=True)
    return "".join(parts)
打字机效果 #
python
import time


def typewriter_effect(chain, input_data, delay: float = 0.02):
    """Print streamed chunks with a short pause after each, like a typewriter."""
    for piece in chain.stream(input_data):
        print(piece, end="", flush=True)
        time.sleep(delay)  # pacing only; does not alter the output
    print()
错误处理 #
流式错误处理 #
python
async def safe_stream(chain, input_data):
    """Yield chunks from chain.astream, turning any error into a final text chunk."""
    try:
        async for chunk in chain.astream(input_data):
            yield chunk
    except Exception as e:
        # Surface the failure in-band instead of crashing the consumer's loop.
        yield f"\n\n[错误: {str(e)}]"


# Usage
async def main():
    model = ChatOpenAI(model="gpt-4o-mini")
    chain = model | StrOutputParser()
    async for chunk in safe_stream(chain, "讲一个故事"):
        print(chunk, end="", flush=True)


asyncio.run(main())
超时处理 #
python
import asyncio
async def stream_with_timeout(chain, input_data, timeout: float = 30.0):
"""带超时的流式输出"""
try:
async with asyncio.timeout(timeout):
async for chunk in chain.astream(input_data):
print(chunk, end="", flush=True)
except asyncio.TimeoutError:
print("\n\n[超时: 响应时间过长]")
最佳实践 #
1. 选择合适的流式方式 #
python
# Short responses — a plain blocking call is fine.
response = chain.invoke({"input": "你好"})

# Long responses — stream so the user sees text immediately.
for chunk in chain.stream({"input": "写一篇长文"}):
    print(chunk, end="", flush=True)
2. 合理的缓冲策略 #
python
class BufferedStreamer:
    """Collect stream chunks and print them in batches.

    Batching cuts down terminal writes when individual chunks are tiny.
    Call flush() at the end of the stream to emit any remaining partial batch.
    """

    def __init__(self, buffer_size: int = 10):
        # Number of chunks to accumulate before each automatic flush.
        self.buffer_size = buffer_size
        self.buffer = []

    def add_chunk(self, chunk: str):
        """Queue one chunk; auto-flush once the batch reaches buffer_size."""
        self.buffer.append(chunk)
        if len(self.buffer) < self.buffer_size:
            return
        self.flush()

    def flush(self):
        """Print everything queued so far and reset the batch."""
        print("".join(self.buffer), end="", flush=True)
        self.buffer = []
3. 优雅的终止 #
python
import signal


class GracefulStreamer:
    """Stream chunks until completion or until the user presses Ctrl-C."""

    def __init__(self):
        self.should_stop = False
        # Replace the default KeyboardInterrupt with a cooperative stop flag.
        signal.signal(signal.SIGINT, self._handle_signal)

    def _handle_signal(self, signum, frame):
        print("\n\n[用户中断]")
        self.should_stop = True

    def stream(self, chain, input_data):
        """Print chunks from chain.stream, stopping early once interrupted."""
        for chunk in chain.stream(input_data):
            if self.should_stop:
                break
            print(chunk, end="", flush=True)
下一步 #
最后更新:2026-03-30