WebSocket 实时语音 #
概述 #
WebSocket API 提供实时双向语音通信能力,适合需要低延迟响应的应用场景。
text
┌─────────────────────────────────────────────────────────────┐
│ WebSocket 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 ←──────────────────→ 服务器 │
│ │ │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ WebSocket 连接 │ │ │
│ │ │ - 双向通信 │ │ │
│ │ │ - 低延迟 │ │ │
│ │ │ - 实时流 │ │ │
│ │ └──────────────────────┘ │ │
│ │ │ │
│ 音频输入 ──────────────────→ 处理 │
│ 音频输出 ←────────────────── 生成 │
│ │
└─────────────────────────────────────────────────────────────┘
连接端点 #
文本转语音流 #
text
wss://api.elevenlabs.io/v1/text-to-speech/{voice_id}/stream-input?model_id={model_id}
对话式 AI #
text
wss://api.elevenlabs.io/v1/convai/conversation
文本转语音流 #
连接建立 #
python
import websocket
import json
import base64
def on_open(ws):
    """Send the initial configuration/auth message once the socket opens."""
    init_message = {
        "text": " ",
        "voice_settings": {"stability": 0.5, "similarity_boost": 0.75},
        "xi_api_key": "your_api_key",
    }
    ws.send(json.dumps(init_message))
def on_message(ws, message):
    """Decode an incoming audio chunk and append it to output.mp3.

    The stream-input endpoint sends JSON events such as
    {"audio": "<base64>", "isFinal": ...} rather than raw base64 frames;
    decode the "audio" field, falling back to treating the whole payload
    as base64 for robustness.
    """
    if not message:
        return
    try:
        event = json.loads(message)
        audio_b64 = event.get("audio")
    except (ValueError, AttributeError):
        audio_b64 = message  # not JSON: treat the payload as raw base64
    if not audio_b64:
        return
    audio_data = base64.b64decode(audio_b64)
    with open("output.mp3", "ab") as f:
        f.write(audio_data)
def on_error(ws, error):
    """Log any WebSocket error to stdout."""
    print("Error:", error)
def on_close(ws, close_status_code, close_msg):
    """Report that the WebSocket connection was closed."""
    farewell = "Connection closed"
    print(farewell)
# NOTE(review): the stream-input path takes a *voice* ID; the model is passed
# as the model_id query parameter. The original put the model ID
# ("eleven_multilingual_v2") in the path, which is not a voice.
ws = websocket.WebSocketApp(
    "wss://api.elevenlabs.io/v1/text-to-speech/JBFqnCBsd6RMkjVDRZzb/stream-input"
    "?model_id=eleven_multilingual_v2",
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)
ws.run_forever()  # blocks until the connection closes
发送文本 #
python
def send_text(ws, text, flush=True):
    """Send one text fragment; flush=True forces generation of buffered text."""
    payload = {"text": text, "flush": flush}
    ws.send(json.dumps(payload))
# Send several text fragments; each call streams another chunk to the server.
send_text(ws, "Hello, ")
send_text(ws, "this is ")
send_text(ws, "a test.")
关闭连接 #
python
def close_connection(ws):
    """Signal end-of-stream with an empty-text message, then close the socket."""
    eos_frame = json.dumps({"text": ""})
    ws.send(eos_frame)
    ws.close()
完整示例 #
Python 实时 TTS #
python
import websocket
import json
import base64
import threading
import queue
class RealtimeTTS:
    """Stream text to the ElevenLabs realtime TTS WebSocket endpoint.

    Incoming audio chunks are queued so a consumer can pull them with
    get_audio() while text is still being sent on the socket.
    """

    def __init__(self, api_key, voice_id, model_id="eleven_multilingual_v2"):
        self.api_key = api_key
        self.voice_id = voice_id
        self.model_id = model_id
        self.audio_queue = queue.Queue()
        self.ws = None
        self.connected = False
        # Event lets connect() block instead of busy-spinning on a flag.
        self._opened = threading.Event()

    def on_open(self, ws):
        """Send the initial configuration/auth message and mark us ready."""
        config = {
            "text": " ",
            "voice_settings": {
                "stability": 0.5,
                "similarity_boost": 0.75
            },
            "xi_api_key": self.api_key
        }
        ws.send(json.dumps(config))
        self.connected = True
        self._opened.set()

    def on_message(self, ws, message):
        """Queue the decoded audio chunk from a server event.

        The endpoint sends JSON events like {"audio": "<base64>", ...};
        fall back to treating the raw payload as base64 for robustness.
        """
        if not message:
            return
        try:
            event = json.loads(message)
            audio_b64 = event.get("audio")
        except (ValueError, AttributeError):
            audio_b64 = message
        if audio_b64:
            self.audio_queue.put(base64.b64decode(audio_b64))

    def on_error(self, ws, error):
        """Log the error and unblock any caller waiting in connect()."""
        print(f"Error: {error}")
        self.connected = False
        self._opened.set()  # let connect() return instead of hanging

    def on_close(self, ws, close_status_code, close_msg):
        self.connected = False

    def connect(self, timeout=10):
        """Open the socket in a daemon thread and wait until it is ready.

        NOTE(review): the stream-input path takes the voice ID; the model is
        a query parameter. The original used the model ID in the path and
        never used voice_id at all.
        """
        url = (
            "wss://api.elevenlabs.io/v1/text-to-speech/"
            f"{self.voice_id}/stream-input?model_id={self.model_id}"
        )
        self.ws = websocket.WebSocketApp(
            url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        thread.start()
        # Block until on_open/on_error fires; the original spun the CPU with
        # `while not self.connected: pass`.
        self._opened.wait(timeout)

    def send_text(self, text, flush=True):
        """Send a text fragment; silently ignored when not connected."""
        if self.connected and self.ws:
            self.ws.send(json.dumps({"text": text, "flush": flush}))

    def get_audio(self):
        """Block until the next audio chunk is available and return it."""
        return self.audio_queue.get()

    def close(self):
        """Send the end-of-stream marker (empty text) and close the socket."""
        if self.ws:
            if self.connected:
                self.ws.send(json.dumps({"text": ""}))
            self.ws.close()
# 使用示例
tts = RealtimeTTS("your_api_key", "JBFqnCBsd6RMkjVDRZzb")
tts.connect()
tts.send_text("Hello, this is a real-time test.")
while True:
audio = tts.get_audio()
# 处理音频数据
pass
Node.js 实时 TTS #
typescript
import WebSocket from "ws";
/** Connection settings for the realtime TTS client. */
interface TTSConfig {
  apiKey: string;   // ElevenLabs API key (sent as xi_api_key)
  voiceId: string;  // voice to synthesize with
  modelId?: string; // defaults to "eleven_multilingual_v2" in connect()
}
class RealtimeTTS {
  private ws: WebSocket | null = null;
  private config: TTSConfig;
  private onAudio: (audio: Buffer) => void;

  constructor(config: TTSConfig, onAudio: (audio: Buffer) => void) {
    this.config = config;
    this.onAudio = onAudio;
  }

  /**
   * Open the WebSocket and send the initial auth/config message.
   * Resolves once the socket is open.
   *
   * NOTE(review): the stream-input path takes the *voice* ID with the model
   * as a query parameter. The original put the model ID in the path and
   * never used config.voiceId at all.
   */
  connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      const modelId = this.config.modelId || "eleven_multilingual_v2";
      const url =
        `wss://api.elevenlabs.io/v1/text-to-speech/` +
        `${this.config.voiceId}/stream-input?model_id=${modelId}`;
      this.ws = new WebSocket(url);
      this.ws.on("open", () => {
        const initMessage = {
          text: " ",
          voice_settings: {
            stability: 0.5,
            similarity_boost: 0.75,
          },
          xi_api_key: this.config.apiKey,
        };
        this.ws!.send(JSON.stringify(initMessage));
        resolve();
      });
      this.ws.on("message", (data: Buffer) => {
        // The server sends JSON events such as {"audio": "<base64>", ...};
        // decode the audio field before handing bytes to the consumer.
        try {
          const event = JSON.parse(data.toString());
          if (event.audio) {
            this.onAudio(Buffer.from(event.audio, "base64"));
          }
        } catch {
          this.onAudio(data); // fall back to raw binary frames
        }
      });
      this.ws.on("error", (error) => {
        reject(error);
      });
    });
  }

  /** Send a text fragment; flush=true forces generation of buffered text. */
  sendText(text: string, flush = true): void {
    if (this.ws) {
      const message = JSON.stringify({ text, flush });
      this.ws.send(message);
    }
  }

  /** Send the end-of-stream marker (empty text) and close the socket. */
  close(): void {
    if (this.ws) {
      this.ws.send(JSON.stringify({ text: "" }));
      this.ws.close();
    }
  }
}
// Usage example: create the client with an audio callback, then connect.
const tts = new RealtimeTTS(
  {
    apiKey: process.env.ELEVENLABS_API_KEY!,
    voiceId: "JBFqnCBsd6RMkjVDRZzb",
  },
  (audio) => {
    console.log(`Received ${audio.length} bytes of audio`);
  }
);

async function main() {
  await tts.connect();
  tts.sendText("Hello, this is a real-time test.");
}

// Surface connection failures instead of leaving an unhandled rejection
// (the original called main() as a floating promise).
main().catch(console.error);
对话式 AI WebSocket #
配置选项 #
python
# Agent configuration sent as the first message on the conversation socket.
agent_config = {
    "agent": {
        "prompt": {
            "text": "You are a helpful assistant.",  # system prompt
            "temperature": 0.7  # LLM sampling temperature
        },
        "first_message": "Hello! How can I help you?",  # spoken greeting
        "language": "en",
        "voice": {
            "voice_id": "JBFqnCBsd6RMkjVDRZzb"  # voice used for agent speech
        }
    }
}
完整对话示例 #
python
import websocket
import json
import base64
import pyaudio
class VoiceConversation:
    """Full-duplex voice chat over the ElevenLabs conversational-AI socket.

    Microphone audio is streamed up as base64 JSON events; agent audio,
    transcripts, and text replies are handled as they arrive.
    """

    def __init__(self, api_key, agent_config):
        self.api_key = api_key
        self.agent_config = agent_config
        self.ws = None
        self.audio = pyaudio.PyAudio()
        # Separate input/output streams: the original reused one `stream`
        # attribute, so play_audio() wrote to the input-only microphone
        # stream, which fails.
        self.stream = None       # microphone (input) stream
        self.out_stream = None   # playback (output) stream

    def on_open(self, ws):
        """Send the agent configuration, then start streaming the mic."""
        ws.send(json.dumps(self.agent_config))
        self.start_recording()

    def on_message(self, ws, message):
        """Dispatch server events: audio playback, transcripts, replies."""
        event = json.loads(message)
        event_type = event.get("type")
        if event_type == "audio":
            self.play_audio(base64.b64decode(event["audio"]))
        elif event_type == "transcript":
            print(f"User: {event['transcript']}")
        elif event_type == "agent_response":
            print(f"Agent: {event['agent_response']}")

    def on_error(self, ws, error):
        print(f"Error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        self.stop_recording()

    def start_recording(self):
        """Open the microphone and stream 16 kHz mono PCM via callback."""
        self.stream = self.audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=16000,
            input=True,
            frames_per_buffer=1024,
            stream_callback=self.audio_callback
        )
        self.stream.start_stream()

    def audio_callback(self, in_data, frame_count, time_info, status):
        """PyAudio callback: forward each captured chunk as a base64 event."""
        if self.ws:
            self.ws.send(json.dumps({
                "type": "audio",
                "audio": base64.b64encode(in_data).decode()
            }))
        return (in_data, pyaudio.paContinue)

    def play_audio(self, audio_data):
        """Play an agent audio chunk on a dedicated output stream."""
        if not self.out_stream:
            # assumes agent audio is 16-bit mono PCM at 22.05 kHz — TODO confirm
            self.out_stream = self.audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=22050,
                output=True
            )
        self.out_stream.write(audio_data)

    def stop_recording(self):
        """Close both streams and release the PortAudio resources."""
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        if self.out_stream:
            self.out_stream.stop_stream()
            self.out_stream.close()
            self.out_stream = None
        self.audio.terminate()  # the original never released PyAudio

    def connect(self):
        """Open the conversation socket (blocking) with API-key auth header."""
        self.ws = websocket.WebSocketApp(
            "wss://api.elevenlabs.io/v1/convai/conversation",
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            header={"xi-api-key": self.api_key}
        )
        self.ws.run_forever()
# 使用示例
# Usage example: minimal agent config plus a blocking conversation loop.
agent_config = {
    "agent": {
        "prompt": {"text": "You are a helpful assistant."},  # system prompt
        "first_message": "Hello!",  # spoken greeting
        "language": "en"
    }
}
conv = VoiceConversation("your_api_key", agent_config)
conv.connect()  # blocks until the WebSocket closes
最佳实践 #
连接管理 #
text
┌─────────────────────────────────────────────────────────────┐
│ 连接管理建议 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 连接复用: │
│ ├── 保持长连接 │
│ ├── 避免频繁断开重连 │
│ └── 使用心跳保持连接 │
│ │
│ 错误恢复: │
│ ├── 实现自动重连 │
│ ├── 指数退避策略 │
│ └── 保存状态恢复 │
│ │
│ 资源清理: │
│ ├── 正确关闭连接 │
│ ├── 释放音频资源 │
│ └── 清理缓冲区 │
│ │
└─────────────────────────────────────────────────────────────┘
性能优化 #
python
# Audio buffering: keep at most `max_size` of the most recent audio bytes.
class AudioBuffer:
    """Bounded byte buffer that retains only the newest `max_size` bytes."""

    def __init__(self, max_size=1024 * 1024):
        self.buffer = bytearray()
        self.max_size = max_size  # capacity in bytes

    def add(self, data):
        """Append `data`, dropping the oldest bytes when over capacity.

        Fixes the original trimming logic, which could leave the buffer
        larger than `max_size` when a single chunk exceeded the limit
        (its negative slice index flipped positive and kept everything).
        """
        self.buffer.extend(data)
        overflow = len(self.buffer) - self.max_size
        if overflow > 0:
            del self.buffer[:overflow]

    def get(self):
        """Return an immutable snapshot of the buffered audio."""
        return bytes(self.buffer)
下一步 #
最后更新:2026-04-05