WebSocket 实时语音 #

概述 #

WebSocket API 提供实时双向语音通信能力,适合需要低延迟响应的应用场景。

text
┌─────────────────────────────────────────────────────────────┐
│                    WebSocket 架构                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   客户端 ←──────────────────→ 服务器                        │
│     │                            │                          │
│     │  ┌──────────────────────┐  │                          │
│     │  │  WebSocket 连接       │  │                          │
│     │  │  - 双向通信           │  │                          │
│     │  │  - 低延迟             │  │                          │
│     │  │  - 实时流             │  │                          │
│     │  └──────────────────────┘  │                          │
│     │                            │                          │
│   音频输入 ──────────────────→ 处理                         │
│   音频输出 ←────────────────── 生成                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

连接端点 #

文本转语音流 #

text
wss://api.elevenlabs.io/v1/text-to-speech/{voice_id}/stream-input?model_id={model_id}

对话式 AI #

text
wss://api.elevenlabs.io/v1/convai/conversation

文本转语音流 #

连接建立 #

python
import websocket
import json
import base64

def on_open(ws):
    """Send the mandatory initialisation frame once the socket is open.

    The first frame carries a single-space ``text`` to prime the stream,
    the voice settings, and the API key.
    """
    initial_payload = {
        "text": " ",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.75
        },
        "xi_api_key": "your_api_key"
    }
    ws.send(json.dumps(initial_payload))

def on_message(ws, message):
    """Append the audio payload of one server frame to output.mp3.

    The stream-input endpoint sends JSON text frames such as
    ``{"audio": "<base64>", "isFinal": ...}`` — base64-decoding the whole
    frame (as the original code did) would write corrupt bytes. Frames
    without an ``audio`` field (keep-alive / final markers) are skipped.
    """
    event = json.loads(message)
    audio_b64 = event.get("audio")
    if audio_b64:
        audio_data = base64.b64decode(audio_b64)
        with open("output.mp3", "ab") as f:
            f.write(audio_data)

def on_error(ws, error):
    """Report a transport-level failure to stdout."""
    print("Error: {}".format(error))

def on_close(ws, close_status_code, close_msg):
    """Callback fired when the WebSocket session ends.

    The status code and message are intentionally unused; the closure is
    simply noted on stdout.
    """
    print("Connection closed")

# Build the client; the callbacks above fire on the socket's lifecycle events.
# NOTE(review): the path segment here is a *model* id — the ElevenLabs
# stream-input endpoint normally takes a voice id in the path with the model
# as a query parameter; confirm against the current API reference.
ws = websocket.WebSocketApp(
    "wss://api.elevenlabs.io/v1/text-to-speech/eleven_multilingual_v2/stream-input",
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close
)

# Blocking event loop; returns only when the connection closes.
ws.run_forever()

发送文本 #

python
def send_text(ws, text, flush=True):
    """Queue a text fragment for synthesis on an open stream.

    ``flush=True`` asks the server to synthesise immediately instead of
    waiting for more text.
    """
    payload = {"text": text, "flush": flush}
    ws.send(json.dumps(payload))

# Send several short fragments; the server stitches them into one audio stream.
send_text(ws, "Hello, ")
send_text(ws, "this is ")
send_text(ws, "a test.")

关闭连接 #

python
def close_connection(ws):
    """Flush the stream by sending the end-of-input sentinel, then disconnect."""
    end_of_input = {"text": ""}
    ws.send(json.dumps(end_of_input))
    ws.close()

完整示例 #

Python 实时 TTS #

python
import websocket
import json
import base64
import threading
import queue

class RealtimeTTS:
    """Minimal real-time TTS client for the ElevenLabs stream-input WebSocket.

    Decoded audio chunks are pushed onto ``audio_queue`` as they arrive;
    consumers pop them with :meth:`get_audio`.
    """

    def __init__(self, api_key, voice_id, model_id="eleven_multilingual_v2"):
        self.api_key = api_key
        self.voice_id = voice_id
        self.model_id = model_id
        self.audio_queue = queue.Queue()  # decoded audio bytes, FIFO
        self.ws = None
        self.connected = False
        # Event instead of a busy-wait flag: connect() can block without
        # spinning and with a bounded timeout.
        self._ready = threading.Event()

    def on_open(self, ws):
        """Send the mandatory initialisation frame (settings + API key)."""
        config = {
            "text": " ",
            "voice_settings": {
                "stability": 0.5,
                "similarity_boost": 0.75
            },
            "xi_api_key": self.api_key
        }
        ws.send(json.dumps(config))
        self.connected = True
        self._ready.set()

    def on_message(self, ws, message):
        """Decode one server frame and enqueue its audio payload.

        Frames are JSON objects with a base64 ``audio`` field (the original
        code base64-decoded the whole frame, which corrupts the audio).
        Non-JSON frames are treated as raw base64 as a fallback.
        """
        if not message:
            return
        try:
            event = json.loads(message)
        except (TypeError, ValueError):
            self.audio_queue.put(base64.b64decode(message))
            return
        audio_b64 = event.get("audio") if isinstance(event, dict) else None
        if audio_b64:
            self.audio_queue.put(base64.b64decode(audio_b64))

    def on_error(self, ws, error):
        """Record the failure and unblock any waiter in connect()."""
        print(f"Error: {error}")
        self.connected = False
        self._ready.set()

    def on_close(self, ws, close_status_code, close_msg):
        self.connected = False
        self._ready.set()

    def connect(self, timeout=10.0):
        """Open the socket in a daemon thread and wait until it is ready.

        The voice id is the path segment and the model id a query parameter,
        matching the ElevenLabs stream-input API (the original interpolated
        the model id into the voice-id slot and never used ``voice_id``).

        Raises TimeoutError if the socket is not ready within ``timeout``
        seconds, or ConnectionError if the handshake failed.
        """
        url = (
            f"wss://api.elevenlabs.io/v1/text-to-speech/{self.voice_id}"
            f"/stream-input?model_id={self.model_id}"
        )

        self.ws = websocket.WebSocketApp(
            url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        thread.start()

        # Event.wait() replaces the original `while not self.connected: pass`
        # busy loop, which pinned a CPU core and hung forever on failure.
        if not self._ready.wait(timeout):
            raise TimeoutError("WebSocket connection timed out")
        if not self.connected:
            raise ConnectionError("WebSocket connection failed")

    def send_text(self, text, flush=True):
        """Send a text fragment; ``flush=True`` forces immediate synthesis."""
        if self.connected and self.ws:
            message = json.dumps({
                "text": text,
                "flush": flush
            })
            self.ws.send(message)

    def get_audio(self, timeout=None):
        """Pop the next decoded audio chunk (blocks; optional timeout in s)."""
        return self.audio_queue.get(timeout=timeout)

    def close(self):
        """Send the end-of-input sentinel (if connected) and close the socket."""
        if self.ws:
            if self.connected:
                self.ws.send(json.dumps({"text": ""}))
            self.ws.close()

# 使用示例
tts = RealtimeTTS("your_api_key", "JBFqnCBsd6RMkjVDRZzb")
tts.connect()

tts.send_text("Hello, this is a real-time test.")

while True:
    audio = tts.get_audio()
    # 处理音频数据
    pass

Node.js 实时 TTS #

typescript
import WebSocket from "ws";

/** Connection settings for the ElevenLabs real-time TTS client. */
interface TTSConfig {
  apiKey: string;   // xi_api_key credential sent in the init frame
  voiceId: string;  // ElevenLabs voice identifier
  modelId?: string; // defaults to "eleven_multilingual_v2" when omitted
}

/**
 * Minimal real-time TTS client over the ElevenLabs stream-input WebSocket.
 * Decoded audio chunks are delivered to the `onAudio` callback.
 */
class RealtimeTTS {
  private ws: WebSocket | null = null;
  private config: TTSConfig;
  private onAudio: (audio: Buffer) => void;

  constructor(config: TTSConfig, onAudio: (audio: Buffer) => void) {
    this.config = config;
    this.onAudio = onAudio;
  }

  /**
   * Open the socket and send the initialisation frame.
   * Resolves once the init frame has been sent.
   */
  connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      const modelId = this.config.modelId || "eleven_multilingual_v2";
      // The path segment is the *voice* id; the model is a query parameter.
      // (The original put the model id in the voice slot and never used
      // config.voiceId at all.)
      const url = `wss://api.elevenlabs.io/v1/text-to-speech/${this.config.voiceId}/stream-input?model_id=${modelId}`;

      this.ws = new WebSocket(url);

      this.ws.on("open", () => {
        const initMessage = {
          text: " ",
          voice_settings: {
            stability: 0.5,
            similarity_boost: 0.75,
          },
          xi_api_key: this.config.apiKey,
        };
        this.ws!.send(JSON.stringify(initMessage));
        resolve();
      });

      this.ws.on("message", (data: Buffer) => {
        // Frames are JSON text: { audio: "<base64>", isFinal: ... }.
        // Decode the audio field; fall back to the raw frame if parsing fails.
        try {
          const event = JSON.parse(data.toString("utf8"));
          if (event.audio) {
            this.onAudio(Buffer.from(event.audio, "base64"));
          }
        } catch {
          this.onAudio(data);
        }
      });

      this.ws.on("error", (error) => {
        reject(error);
      });
    });
  }

  /** Send a text fragment; flush=true forces immediate synthesis. */
  sendText(text: string, flush = true): void {
    if (this.ws) {
      const message = JSON.stringify({ text, flush });
      this.ws.send(message);
    }
  }

  /** Send the end-of-input sentinel and close the socket. */
  close(): void {
    if (this.ws) {
      this.ws.send(JSON.stringify({ text: "" }));
      this.ws.close();
    }
  }
}

// 使用示例
const tts = new RealtimeTTS(
  {
    apiKey: process.env.ELEVENLABS_API_KEY!,
    voiceId: "JBFqnCBsd6RMkjVDRZzb",
  },
  (audio) => {
    console.log(`Received ${audio.length} bytes of audio`);
  }
);

async function main() {
  await tts.connect();
  tts.sendText("Hello, this is a real-time test.");
}

main();

对话式 AI WebSocket #

配置选项 #

python
# Agent definition sent as the first frame on the conversational-AI socket.
agent_config = {
    "agent": {
        "prompt": {
            "text": "You are a helpful assistant.",  # system prompt
            "temperature": 0.7
        },
        "first_message": "Hello! How can I help you?",  # agent speaks first
        "language": "en",
        "voice": {
            "voice_id": "JBFqnCBsd6RMkjVDRZzb"
        }
    }
}

完整对话示例 #

python
import websocket
import json
import base64
import pyaudio

class VoiceConversation:
    """Bidirectional voice chat over the ElevenLabs conversational-AI socket.

    Microphone audio is streamed up as base64 ``audio`` frames; agent audio
    frames are decoded and played back locally on a separate output stream.
    """

    def __init__(self, api_key, agent_config):
        self.api_key = api_key
        self.agent_config = agent_config
        self.ws = None
        self.audio = pyaudio.PyAudio()
        self.stream = None           # microphone (input) stream
        self.playback_stream = None  # speaker (output) stream

    def on_open(self, ws):
        """Send the agent configuration, then start streaming the mic."""
        ws.send(json.dumps(self.agent_config))
        self.start_recording()

    def on_message(self, ws, message):
        """Dispatch one server event by its ``type`` field."""
        event = json.loads(message)

        if event.get("type") == "audio":
            audio_data = base64.b64decode(event["audio"])
            self.play_audio(audio_data)

        elif event.get("type") == "transcript":
            print(f"User: {event['transcript']}")

        elif event.get("type") == "agent_response":
            print(f"Agent: {event['agent_response']}")

    def on_error(self, ws, error):
        print(f"Error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        self.stop_recording()

    def start_recording(self):
        """Open the microphone at 16 kHz mono and stream frames via callback."""
        self.stream = self.audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=16000,
            input=True,
            frames_per_buffer=1024,
            stream_callback=self.audio_callback
        )
        self.stream.start_stream()

    def audio_callback(self, in_data, frame_count, time_info, status):
        """PyAudio callback: forward each mic chunk as a base64 audio frame."""
        if self.ws:
            message = json.dumps({
                "type": "audio",
                "audio": base64.b64encode(in_data).decode()
            })
            self.ws.send(message)
        return (in_data, pyaudio.paContinue)

    def play_audio(self, audio_data):
        """Play one decoded audio chunk on a dedicated output stream.

        The original implementation reused ``self.stream`` — which
        start_recording() had opened as the *input* (microphone) stream —
        so writing playback audio to it would fail. Playback now uses its
        own output-only stream.
        """
        if not self.playback_stream:
            # NOTE(review): 22050 Hz assumed for agent audio — confirm the
            # agent's configured output format.
            self.playback_stream = self.audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=22050,
                output=True
            )
        self.playback_stream.write(audio_data)

    def stop_recording(self):
        """Stop and release both the microphone and playback streams."""
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        if self.playback_stream:
            self.playback_stream.stop_stream()
            self.playback_stream.close()
            self.playback_stream = None

    def connect(self):
        """Open the conversational socket and block until it closes."""
        self.ws = websocket.WebSocketApp(
            "wss://api.elevenlabs.io/v1/convai/conversation",
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            header={"xi-api-key": self.api_key}
        )
        self.ws.run_forever()

# 使用示例
# Usage example: minimal agent definition, then start the conversation loop.
agent_config = {
    "agent": {
        "prompt": {"text": "You are a helpful assistant."},
        "first_message": "Hello!",
        "language": "en"
    }
}

conv = VoiceConversation("your_api_key", agent_config)
conv.connect()  # blocks until the socket closes

最佳实践 #

连接管理 #

text
┌─────────────────────────────────────────────────────────────┐
│                    连接管理建议                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  连接复用:                                                  │
│  ├── 保持长连接                                             │
│  ├── 避免频繁断开重连                                       │
│  └── 使用心跳保持连接                                       │
│                                                             │
│  错误恢复:                                                  │
│  ├── 实现自动重连                                           │
│  ├── 指数退避策略                                           │
│  └── 保存状态恢复                                           │
│                                                             │
│  资源清理:                                                  │
│  ├── 正确关闭连接                                           │
│  ├── 释放音频资源                                           │
│  └── 清理缓冲区                                             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

性能优化 #

python
# 音频缓冲
class AudioBuffer:
    """Bounded byte buffer that keeps only the most recent audio data.

    When appending would exceed ``max_size``, the oldest bytes are evicted
    so the buffer never grows past the limit.
    """

    def __init__(self, max_size=1024 * 1024):
        self.buffer = bytearray()
        self.max_size = max_size  # hard cap in bytes

    def add(self, data):
        """Append ``data``, evicting the oldest bytes to stay within max_size.

        The original slice ``self.buffer[-self.max_size + len(data):]``
        breaks when ``len(data) >= max_size``: the start index becomes
        non-negative, too little is evicted, and the buffer overflows the
        cap. That case now keeps only the tail of ``data`` itself.
        """
        if len(data) >= self.max_size:
            # One chunk fills (or exceeds) the whole buffer: keep its tail.
            self.buffer = bytearray(data[-self.max_size:])
        elif len(self.buffer) + len(data) > self.max_size:
            # Drop just enough old bytes to make room, then append.
            self.buffer = self.buffer[-(self.max_size - len(data)):]
            self.buffer.extend(data)
        else:
            self.buffer.extend(data)

    def get(self):
        """Return an immutable snapshot of the buffered bytes."""
        return bytes(self.buffer)

下一步 #

最后更新:2026-04-05