OpenAI 流式响应 #
什么是流式响应? #
流式响应(Streaming)是一种数据传输方式,服务器将响应分成多个小块逐步发送,而不是等待全部内容生成完毕后一次性返回。
text
┌─────────────────────────────────────────────────────────────┐
│ 传统响应 vs 流式响应 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 传统响应: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 用户请求 ───> 等待... ───> 等待... ───> 完整响应 │ │
│ └─────────────────────────────────────────────────────┘ │
│ 特点:用户需要等待全部内容生成完毕 │
│ │
│ 流式响应: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 用户请求 ───> "你" ───> "好" ───> "!" ───> ... │ │
│ └─────────────────────────────────────────────────────┘ │
│ 特点:实时显示,用户立即看到输出 │
│ │
└─────────────────────────────────────────────────────────────┘
流式响应的优势 #
text
┌─────────────────────────────────────────────────────────────┐
│ 流式响应优势 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ✅ 更好的用户体验 │
│ - 立即看到响应开始 │
│ - 类似打字机的效果 │
│ - 减少等待焦虑 │
│ │
│ ✅ 降低首字延迟 │
│ - 用户更快看到第一个字 │
│ - 感知响应更快 │
│ │
│ ✅ 支持长文本生成 │
│ - 不需要等待全部生成 │
│ - 可以提前显示内容 │
│ │
│ ✅ 更好的资源利用 │
│ - 边生成边处理 │
│ - 可以提前中断 │
│ │
└─────────────────────────────────────────────────────────────┘
基本用法 #
Python 流式响应 #
python
from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "user", "content": "写一个关于人工智能的故事"}
],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="", flush=True)
Node.js 流式响应 #
javascript
import OpenAI from 'openai';
const client = new OpenAI();
async function streamChat() {
const stream = await client.chat.completions.create({
model: 'gpt-4o-mini',
messages: [
{ role: 'user', content: '写一个关于人工智能的故事' }
],
stream: true
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
process.stdout.write(content);
}
}
streamChat();
cURL 流式请求 #
bash
curl https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "你好"}],
"stream": true
}'
流式响应结构 #
SSE 数据格式 #
text
┌─────────────────────────────────────────────────────────────┐
│ Server-Sent Events 格式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ data: {"id":"chatcmpl-xxx","choices":[{"delta":{"role": │
│ "assistant"},"index":0}]}} │
│ │
│ data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":│
│ "你"},"index":0}]}} │
│ │
│ data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":│
│ "好"},"index":0}]}} │
│ │
│ data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":│
│ "!"},"index":0}]}} │
│ │
│ data: [DONE] │
│ │
└─────────────────────────────────────────────────────────────┘
响应块结构 #
python
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "你好"}],
stream=True
)
for chunk in stream:
print(f"ID: {chunk.id}")
print(f"Object: {chunk.object}")
print(f"Created: {chunk.created}")
print(f"Model: {chunk.model}")
print(f"Delta: {chunk.choices[0].delta}")
print(f"Finish Reason: {chunk.choices[0].finish_reason}")
print("---")
Delta 对象 #
text
┌─────────────────────────────────────────────────────────────┐
│ Delta 对象结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 第一个 chunk(角色信息): │
│ { │
│ "role": "assistant", │
│ "content": null │
│ } │
│ │
│ 中间 chunks(内容增量): │
│ { │
│ "content": "部分文本" │
│ } │
│ │
│ 最后一个 chunk(结束标记): │
│ { │
│ "content": null, │
│ "finish_reason": "stop" │
│ } │
│ │
└─────────────────────────────────────────────────────────────┘
高级用法 #
收集完整响应 #
python
from openai import OpenAI
client = OpenAI()
def stream_and_collect(messages: list) -> str:
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True
)
full_content = ""
for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
full_content += content
print()
return full_content
result = stream_and_collect([
{"role": "user", "content": "介绍一下 Python"}
])
print(f"\n完整内容长度: {len(result)}")
带进度显示 #
python
import sys
import time
def stream_with_progress(messages: list):
client = OpenAI()
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True
)
print("开始生成...", end="\n\n")
start_time = time.time()
char_count = 0
for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
char_count += len(content)
elapsed = time.time() - start_time
print(f"\n\n---")
print(f"生成字符: {char_count}")
print(f"耗时: {elapsed:.2f} 秒")
print(f"速度: {char_count/elapsed:.1f} 字符/秒")
stream_with_progress([
{"role": "user", "content": "写一篇关于人工智能的短文"}
])
异步流式处理 #
python
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def async_stream_chat():
stream = await async_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "user", "content": "写一个故事"}
],
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="", flush=True)
asyncio.run(async_stream_chat())
并发流式请求 #
python
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def stream_one(prompt: str, index: int):
stream = await async_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
stream=True
)
print(f"\n=== 任务 {index} ===")
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="", flush=True)
print()
async def concurrent_streams():
prompts = [
"写一首关于春天的诗",
"写一首关于夏天的诗",
"写一首关于秋天的诗"
]
tasks = [
stream_one(prompt, i+1)
for i, prompt in enumerate(prompts)
]
await asyncio.gather(*tasks)
asyncio.run(concurrent_streams())
Web 应用集成 #
Flask 流式响应 #
python
from flask import Flask, Response, stream_with_context
from openai import OpenAI
app = Flask(__name__)
client = OpenAI()
@app.route('/chat')
def chat():
def generate():
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "user", "content": "讲一个笑话"}
],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
yield f"data: {chunk.choices[0].delta.content}\n\n"
yield "data: [DONE]\n\n"
return Response(
stream_with_context(generate()),
mimetype='text/event-stream'
)
if __name__ == '__main__':
app.run(debug=True)
FastAPI 流式响应 #
python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json
app = FastAPI()
client = OpenAI()
@app.get("/chat")
async def chat():
async def generate():
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "user", "content": "你好"}
],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
data = json.dumps({
"content": chunk.choices[0].delta.content
})
yield f"data: {data}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
前端接收流式数据 #
html
<!DOCTYPE html>
<html>
<head>
<title>流式聊天</title>
</head>
<body>
<div id="output"></div>
<script>
const output = document.getElementById('output');
const eventSource = new EventSource('/chat');
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const data = JSON.parse(event.data);
output.textContent += data.content;
};
eventSource.onerror = (error) => {
console.error('SSE Error:', error);
eventSource.close();
};
</script>
</body>
</html>
流式响应处理技巧 #
处理中断 #
python
import signal
import sys
client = OpenAI()
interrupted = False
def signal_handler(sig, frame):
global interrupted
print("\n\n[用户中断]")
interrupted = True
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
def stream_with_interrupt(messages: list):
global interrupted
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True
)
for chunk in stream:
if interrupted:
break
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="", flush=True)
超时处理 #
python
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def stream_with_timeout(messages: list, timeout: float = 30.0):
try:
stream = await asyncio.wait_for(
async_client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True
),
timeout=timeout
)
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="", flush=True)
except asyncio.TimeoutError:
print("\n[请求超时]")
asyncio.run(stream_with_timeout([
{"role": "user", "content": "写一篇长文"}
], timeout=10.0))
错误重试 #
python
from openai import OpenAI, APIError, RateLimitError
import time
client = OpenAI()
def stream_with_retry(messages: list, max_retries: int = 3):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
yield chunk.choices[0].delta.content
return
except RateLimitError:
wait = 2 ** attempt
print(f"\n[速率限制,{wait}秒后重试...]")
time.sleep(wait)
except APIError as e:
print(f"\n[API错误: {e}]")
if attempt == max_retries - 1:
raise
for content in stream_with_retry([
{"role": "user", "content": "你好"}
]):
print(content, end="", flush=True)
流式响应与 Token 统计 #
计算 Token 使用量 #
python
from openai import OpenAI
client = OpenAI()
def stream_with_usage(messages: list):
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True,
stream_options={"include_usage": True}
)
full_content = ""
usage_info = None
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
full_content += content
if hasattr(chunk, 'usage') and chunk.usage:
usage_info = chunk.usage
print("\n")
if usage_info:
print(f"输入 Token: {usage_info.prompt_tokens}")
print(f"输出 Token: {usage_info.completion_tokens}")
print(f"总 Token: {usage_info.total_tokens}")
return full_content
stream_with_usage([
{"role": "user", "content": "介绍一下 Python"}
])
最佳实践 #
1. 选择合适的场景 #
text
┌─────────────────────────────────────────────────────────────┐
│ 流式响应适用场景 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ✅ 推荐使用流式: │
│ - 聊天应用界面 │
│ - 长文本生成 │
│ - 实时交互场景 │
│ - 需要快速反馈的场景 │
│ │
│ ❌ 不推荐使用流式: │
│ - 需要完整响应后处理 │
│ - 批量处理任务 │
│ - 需要精确 Token 统计 │
│ - 后端 API 调用 │
│ │
└─────────────────────────────────────────────────────────────┘
2. 完整的流式聊天类 #
python
from openai import OpenAI
from typing import List, Dict, Generator, Optional
class StreamingChat:
def __init__(self, model: str = "gpt-4o-mini"):
self.client = OpenAI()
self.model = model
self.messages: List[Dict] = []
def set_system_prompt(self, prompt: str):
self.messages.insert(0, {
"role": "system",
"content": prompt
})
def chat(
self,
user_input: str,
stream: bool = True
) -> Generator[str, None, None]:
self.messages.append({
"role": "user",
"content": user_input
})
if stream:
return self._stream_response()
else:
return self._sync_response()
def _stream_response(self) -> Generator[str, None, None]:
full_content = ""
stream = self.client.chat.completions.create(
model=self.model,
messages=self.messages,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_content += content
yield content
self.messages.append({
"role": "assistant",
"content": full_content
})
def _sync_response(self) -> str:
response = self.client.chat.completions.create(
model=self.model,
messages=self.messages
)
content = response.choices[0].message.content
self.messages.append({
"role": "assistant",
"content": content
})
return content
def get_history(self) -> List[Dict]:
return self.messages.copy()
def clear_history(self):
system_prompt = None
if self.messages and self.messages[0]["role"] == "system":
system_prompt = self.messages[0]
self.messages = [system_prompt] if system_prompt else []
chat = StreamingChat()
chat.set_system_prompt("你是一个友好的助手。")
for content in chat.chat("你好"):
print(content, end="", flush=True)
print("\n")
for content in chat.chat("我刚才说了什么?"):
print(content, end="", flush=True)
3. 前端完整示例 #
html
<!DOCTYPE html>
<html>
<head>
<title>AI 聊天</title>
<style>
#chat-container {
max-width: 600px;
margin: 20px auto;
border: 1px solid #ccc;
border-radius: 8px;
height: 400px;
overflow-y: auto;
padding: 10px;
}
.message {
margin: 10px 0;
padding: 8px 12px;
border-radius: 4px;
}
.user { background: #e3f2fd; text-align: right; }
.assistant { background: #f5f5f5; }
#input-container {
max-width: 600px;
margin: 0 auto;
display: flex;
gap: 10px;
}
#user-input { flex: 1; padding: 10px; }
#send-btn { padding: 10px 20px; }
</style>
</head>
<body>
<div id="chat-container"></div>
<div id="input-container">
<input type="text" id="user-input" placeholder="输入消息...">
<button id="send-btn">发送</button>
</div>
<script>
const chatContainer = document.getElementById('chat-container');
const userInput = document.getElementById('user-input');
const sendBtn = document.getElementById('send-btn');
function addMessage(role, content) {
const div = document.createElement('div');
div.className = `message ${role}`;
div.textContent = content;
chatContainer.appendChild(div);
chatContainer.scrollTop = chatContainer.scrollHeight;
return div;
}
async function sendMessage() {
const message = userInput.value.trim();
if (!message) return;
addMessage('user', message);
userInput.value = '';
const assistantDiv = addMessage('assistant', '');
const response = await fetch('/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') continue;
try {
const json = JSON.parse(data);
assistantDiv.textContent += json.content;
chatContainer.scrollTop = chatContainer.scrollHeight;
} catch (e) {}
}
}
}
}
sendBtn.onclick = sendMessage;
userInput.onkeypress = (e) => {
if (e.key === 'Enter') sendMessage();
};
</script>
</body>
</html>
下一步 #
现在你已经掌握了流式响应的使用方法,接下来学习 函数调用,了解如何让 GPT 调用外部函数!
最后更新:2026-03-29