OpenAI 流式响应 #

什么是流式响应? #

流式响应(Streaming)是一种数据传输方式,服务器将响应分成多个小块逐步发送,而不是等待全部内容生成完毕后一次性返回。

text
┌─────────────────────────────────────────────────────────────┐
│                    传统响应 vs 流式响应                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  传统响应:                                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 用户请求 ───> 等待... ───> 等待... ───> 完整响应    │   │
│  └─────────────────────────────────────────────────────┘   │
│  特点:用户需要等待全部内容生成完毕                         │
│                                                             │
│  流式响应:                                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 用户请求 ───> "你" ───> "好" ───> "!" ───> ...     │   │
│  └─────────────────────────────────────────────────────┘   │
│  特点:实时显示,用户立即看到输出                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

流式响应的优势 #

text
┌─────────────────────────────────────────────────────────────┐
│                    流式响应优势                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ✅ 更好的用户体验                                          │
│     - 立即看到响应开始                                      │
│     - 类似打字机的效果                                      │
│     - 减少等待焦虑                                          │
│                                                             │
│  ✅ 降低首字延迟                                            │
│     - 用户更快看到第一个字                                  │
│     - 感知响应更快                                          │
│                                                             │
│  ✅ 支持长文本生成                                          │
│     - 不需要等待全部生成                                    │
│     - 可以提前显示内容                                      │
│                                                             │
│  ✅ 更好的资源利用                                          │
│     - 边生成边处理                                          │
│     - 可以提前中断                                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘

基本用法 #

Python 流式响应 #

python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "user", "content": "写一个关于人工智能的故事"}
    ],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)

Node.js 流式响应 #

javascript
import OpenAI from 'openai';

const client = new OpenAI();

async function streamChat() {
  const stream = await client.chat.completions.create({
    model: 'gpt-4o-mini',
    messages: [
      { role: 'user', content: '写一个关于人工智能的故事' }
    ],
    stream: true
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    process.stdout.write(content);
  }
}

streamChat();

cURL 流式请求 #

bash
curl https://api.openai.com/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -d '{
    "model": "gpt-4o-mini",
    "messages": [{"role": "user", "content": "你好"}],
    "stream": true
  }'

流式响应结构 #

SSE 数据格式 #

text
┌─────────────────────────────────────────────────────────────┐
│                    Server-Sent Events 格式                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  data: {"id":"chatcmpl-xxx","choices":[{"delta":{"role":   │
│         "assistant"},"index":0}]}}                          │
│                                                             │
│  data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":│
│         "你"},"index":0}]}}                                 │
│                                                             │
│  data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":│
│         "好"},"index":0}]}}                                 │
│                                                             │
│  data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":│
│         "!"},"index":0}]}}                                 │
│                                                             │
│  data: [DONE]                                               │
│                                                             │
└─────────────────────────────────────────────────────────────┘

响应块结构 #

python
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "你好"}],
    stream=True
)

for chunk in stream:
    print(f"ID: {chunk.id}")
    print(f"Object: {chunk.object}")
    print(f"Created: {chunk.created}")
    print(f"Model: {chunk.model}")
    print(f"Delta: {chunk.choices[0].delta}")
    print(f"Finish Reason: {chunk.choices[0].finish_reason}")
    print("---")

Delta 对象 #

text
┌─────────────────────────────────────────────────────────────┐
│                    Delta 对象结构                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  第一个 chunk(角色信息):                                  │
│  {                                                          │
│    "role": "assistant",                                     │
│    "content": null                                          │
│  }                                                          │
│                                                             │
│  中间 chunks(内容增量):                                   │
│  {                                                          │
│    "content": "部分文本"                                    │
│  }                                                          │
│                                                             │
│  最后一个 chunk(结束标记):                                │
│  {                                                          │
│    "content": null,                                         │
│    "finish_reason": "stop"                                  │
│  }                                                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘

高级用法 #

收集完整响应 #

python
from openai import OpenAI

client = OpenAI()

def stream_and_collect(messages: list) -> str:
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True
    )
    
    full_content = ""
    
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_content += content
    
    print()
    return full_content

result = stream_and_collect([
    {"role": "user", "content": "介绍一下 Python"}
])
print(f"\n完整内容长度: {len(result)}")

带进度显示 #

python
import sys
import time

def stream_with_progress(messages: list):
    client = OpenAI()
    
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True
    )
    
    print("开始生成...", end="\n\n")
    start_time = time.time()
    char_count = 0
    
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            char_count += len(content)
    
    elapsed = time.time() - start_time
    print(f"\n\n---")
    print(f"生成字符: {char_count}")
    print(f"耗时: {elapsed:.2f} 秒")
    print(f"速度: {char_count/elapsed:.1f} 字符/秒")

stream_with_progress([
    {"role": "user", "content": "写一篇关于人工智能的短文"}
])

异步流式处理 #

python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def async_stream_chat():
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "user", "content": "写一个故事"}
        ],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(async_stream_chat())

并发流式请求 #

python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def stream_one(prompt: str, index: int):
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    print(f"\n=== 任务 {index} ===")
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()

async def concurrent_streams():
    prompts = [
        "写一首关于春天的诗",
        "写一首关于夏天的诗",
        "写一首关于秋天的诗"
    ]
    
    tasks = [
        stream_one(prompt, i+1) 
        for i, prompt in enumerate(prompts)
    ]
    
    await asyncio.gather(*tasks)

asyncio.run(concurrent_streams())

Web 应用集成 #

Flask 流式响应 #

python
from flask import Flask, Response, stream_with_context
from openai import OpenAI

app = Flask(__name__)
client = OpenAI()

@app.route('/chat')
def chat():
    def generate():
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "user", "content": "讲一个笑话"}
            ],
            stream=True
        )
        
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                yield f"data: {chunk.choices[0].delta.content}\n\n"
        
        yield "data: [DONE]\n\n"
    
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)

FastAPI 流式响应 #

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.get("/chat")
async def chat():
    async def generate():
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "user", "content": "你好"}
            ],
            stream=True
        )
        
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                data = json.dumps({
                    "content": chunk.choices[0].delta.content
                })
                yield f"data: {data}\n\n"
        
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

前端接收流式数据 #

html
<!DOCTYPE html>
<html>
<head>
    <title>流式聊天</title>
</head>
<body>
    <div id="output"></div>
    <script>
        const output = document.getElementById('output');
        
        const eventSource = new EventSource('/chat');
        
        eventSource.onmessage = (event) => {
            if (event.data === '[DONE]') {
                eventSource.close();
                return;
            }
            
            const data = JSON.parse(event.data);
            output.textContent += data.content;
        };
        
        eventSource.onerror = (error) => {
            console.error('SSE Error:', error);
            eventSource.close();
        };
    </script>
</body>
</html>

流式响应处理技巧 #

处理中断 #

python
import signal
import sys

client = OpenAI()

interrupted = False

def signal_handler(sig, frame):
    global interrupted
    print("\n\n[用户中断]")
    interrupted = True
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

def stream_with_interrupt(messages: list):
    global interrupted
    
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True
    )
    
    for chunk in stream:
        if interrupted:
            break
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)

超时处理 #

python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def stream_with_timeout(messages: list, timeout: float = 30.0):
    try:
        stream = await asyncio.wait_for(
            async_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                stream=True
            ),
            timeout=timeout
        )
        
        async for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                print(chunk.choices[0].delta.content, end="", flush=True)
                
    except asyncio.TimeoutError:
        print("\n[请求超时]")

asyncio.run(stream_with_timeout([
    {"role": "user", "content": "写一篇长文"}
], timeout=10.0))

错误重试 #

python
from openai import OpenAI, APIError, RateLimitError
import time

client = OpenAI()

def stream_with_retry(messages: list, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                stream=True
            )
            
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content
            return
            
        except RateLimitError:
            wait = 2 ** attempt
            print(f"\n[速率限制,{wait}秒后重试...]")
            time.sleep(wait)
            
        except APIError as e:
            print(f"\n[API错误: {e}]")
            if attempt == max_retries - 1:
                raise

for content in stream_with_retry([
    {"role": "user", "content": "你好"}
]):
    print(content, end="", flush=True)

流式响应与 Token 统计 #

计算 Token 使用量 #

python
from openai import OpenAI

client = OpenAI()

def stream_with_usage(messages: list):
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True,
        stream_options={"include_usage": True}
    )
    
    full_content = ""
    usage_info = None
    
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_content += content
        
        if hasattr(chunk, 'usage') and chunk.usage:
            usage_info = chunk.usage
    
    print("\n")
    if usage_info:
        print(f"输入 Token: {usage_info.prompt_tokens}")
        print(f"输出 Token: {usage_info.completion_tokens}")
        print(f"总 Token: {usage_info.total_tokens}")
    
    return full_content

stream_with_usage([
    {"role": "user", "content": "介绍一下 Python"}
])

最佳实践 #

1. 选择合适的场景 #

text
┌─────────────────────────────────────────────────────────────┐
│                    流式响应适用场景                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ✅ 推荐使用流式:                                          │
│  - 聊天应用界面                                             │
│  - 长文本生成                                               │
│  - 实时交互场景                                             │
│  - 需要快速反馈的场景                                       │
│                                                             │
│  ❌ 不推荐使用流式:                                        │
│  - 需要完整响应后处理                                       │
│  - 批量处理任务                                             │
│  - 需要精确 Token 统计                                      │
│  - 后端 API 调用                                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. 完整的流式聊天类 #

python
from openai import OpenAI
from typing import List, Dict, Generator, Optional

class StreamingChat:
    def __init__(self, model: str = "gpt-4o-mini"):
        self.client = OpenAI()
        self.model = model
        self.messages: List[Dict] = []
    
    def set_system_prompt(self, prompt: str):
        self.messages.insert(0, {
            "role": "system",
            "content": prompt
        })
    
    def chat(
        self, 
        user_input: str, 
        stream: bool = True
    ) -> Generator[str, None, None]:
        self.messages.append({
            "role": "user",
            "content": user_input
        })
        
        if stream:
            return self._stream_response()
        else:
            return self._sync_response()
    
    def _stream_response(self) -> Generator[str, None, None]:
        full_content = ""
        
        stream = self.client.chat.completions.create(
            model=self.model,
            messages=self.messages,
            stream=True
        )
        
        for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                full_content += content
                yield content
        
        self.messages.append({
            "role": "assistant",
            "content": full_content
        })
    
    def _sync_response(self) -> str:
        response = self.client.chat.completions.create(
            model=self.model,
            messages=self.messages
        )
        
        content = response.choices[0].message.content
        self.messages.append({
            "role": "assistant",
            "content": content
        })
        
        return content
    
    def get_history(self) -> List[Dict]:
        return self.messages.copy()
    
    def clear_history(self):
        system_prompt = None
        if self.messages and self.messages[0]["role"] == "system":
            system_prompt = self.messages[0]
        self.messages = [system_prompt] if system_prompt else []


chat = StreamingChat()
chat.set_system_prompt("你是一个友好的助手。")

for content in chat.chat("你好"):
    print(content, end="", flush=True)

print("\n")

for content in chat.chat("我刚才说了什么?"):
    print(content, end="", flush=True)

3. 前端完整示例 #

html
<!DOCTYPE html>
<html>
<head>
    <title>AI 聊天</title>
    <style>
        #chat-container {
            max-width: 600px;
            margin: 20px auto;
            border: 1px solid #ccc;
            border-radius: 8px;
            height: 400px;
            overflow-y: auto;
            padding: 10px;
        }
        .message {
            margin: 10px 0;
            padding: 8px 12px;
            border-radius: 4px;
        }
        .user { background: #e3f2fd; text-align: right; }
        .assistant { background: #f5f5f5; }
        #input-container {
            max-width: 600px;
            margin: 0 auto;
            display: flex;
            gap: 10px;
        }
        #user-input { flex: 1; padding: 10px; }
        #send-btn { padding: 10px 20px; }
    </style>
</head>
<body>
    <div id="chat-container"></div>
    <div id="input-container">
        <input type="text" id="user-input" placeholder="输入消息...">
        <button id="send-btn">发送</button>
    </div>

    <script>
        const chatContainer = document.getElementById('chat-container');
        const userInput = document.getElementById('user-input');
        const sendBtn = document.getElementById('send-btn');

        function addMessage(role, content) {
            const div = document.createElement('div');
            div.className = `message ${role}`;
            div.textContent = content;
            chatContainer.appendChild(div);
            chatContainer.scrollTop = chatContainer.scrollHeight;
            return div;
        }

        async function sendMessage() {
            const message = userInput.value.trim();
            if (!message) return;

            addMessage('user', message);
            userInput.value = '';

            const assistantDiv = addMessage('assistant', '');

            const response = await fetch('/chat', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ message })
            });

            const reader = response.body.getReader();
            const decoder = new TextDecoder();

            while (true) {
                const { done, value } = await reader.read();
                if (done) break;

                const text = decoder.decode(value);
                const lines = text.split('\n');

                for (const line of lines) {
                    if (line.startsWith('data: ')) {
                        const data = line.slice(6);
                        if (data === '[DONE]') continue;
                        
                        try {
                            const json = JSON.parse(data);
                            assistantDiv.textContent += json.content;
                            chatContainer.scrollTop = chatContainer.scrollHeight;
                        } catch (e) {}
                    }
                }
            }
        }

        sendBtn.onclick = sendMessage;
        userInput.onkeypress = (e) => {
            if (e.key === 'Enter') sendMessage();
        };
    </script>
</body>
</html>

下一步 #

现在你已经掌握了流式响应的使用方法,接下来学习 函数调用,了解如何让 GPT 调用外部函数!

最后更新:2026-03-29