API 服务 #
服务概述 #
Coqui TTS 提供了内置的 HTTP 服务器,可以快速搭建 TTS API 服务。
text
┌─────────────────────────────────────────────────────────────┐
│ TTS API 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Client │ ←→ │ TTS Server │ ←→ │ Model │ │
│ │ 客户端 │ │ API 服务 │ │ 模型 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ HTTP 请求 REST API 语音合成 │
│ │
└─────────────────────────────────────────────────────────────┘
启动服务 #
基础启动 #
bash
# 使用默认配置启动
tts-server --model_name tts_models/en/ljspeech/vits
# 指定端口
tts-server --model_name tts_models/en/ljspeech/vits --port 5002
# 指定主机地址
tts-server --model_name tts_models/en/ljspeech/vits --host 0.0.0.0 --port 5002
使用 GPU #
bash
# 指定 GPU
CUDA_VISIBLE_DEVICES=0 tts-server --model_name tts_models/en/ljspeech/vits
# 多 GPU(负载均衡)
tts-server --model_name tts_models/en/ljspeech/vits --gpus 0,1
完整启动参数 #
bash
tts-server \
--model_name tts_models/en/ljspeech/vits \
--vocoder_name vocoder_models/en/ljspeech/hifigan_v2 \
--host 0.0.0.0 \
--port 5002 \
--use_cuda true \
--debug false
API 接口 #
核心接口 #
text
┌─────────────────────────────────────────────────────────────┐
│ API 接口列表 │
├─────────────────────────────────────────────────────────────┤
│ │
│ GET / 服务信息 │
│ GET /api/tts 文本转语音 │
│ GET /api/tts?text=xxx 合成指定文本 │
│ POST /api/tts POST 方式合成 │
│ GET /api/speakers 获取说话人列表 │
│ GET /api/languages 获取语言列表 │
│ GET /api/models 获取模型信息 │
│ │
└─────────────────────────────────────────────────────────────┘
GET /api/tts #
bash
# 基础请求
curl "http://localhost:5002/api/tts?text=Hello%20world" --output output.wav
# 指定说话人
curl "http://localhost:5002/api/tts?text=Hello&speaker_id=0" --output output.wav
# 指定语言
curl "http://localhost:5002/api/tts?text=你好&language_id=zh-cn" --output output.wav
# 声音克隆
curl "http://localhost:5002/api/tts?text=Hello&speaker_wav=reference.wav" --output output.wav
POST /api/tts #
bash
# POST 请求
curl -X POST "http://localhost:5002/api/tts" \
-H "Content-Type: application/json" \
-d '{"text": "Hello world", "speaker_id": 0}' \
--output output.wav
获取说话人列表 #
bash
curl "http://localhost:5002/api/speakers"
# 响应示例
# ["speaker_0", "speaker_1", "speaker_2"]
获取语言列表 #
bash
curl "http://localhost:5002/api/languages"
# 响应示例
# ["en", "zh-cn", "ja", "ko", "fr", "de", "es"]
Python 客户端 #
基础客户端 #
python
import requests
from pathlib import Path
class TTSClient:
    """Minimal HTTP client for the TTS server's REST endpoints.

    Args:
        base_url: Root URL of the TTS server.
        timeout: Seconds to wait for each HTTP response. ``requests`` applies
            no timeout unless one is passed, so a stalled server would
            otherwise block the caller indefinitely.
    """

    def __init__(self, base_url="http://localhost:5002", timeout=60):
        self.base_url = base_url
        self.timeout = timeout

    def synthesize(self, text, output_path="output.wav", **kwargs):
        """Synthesize ``text`` via ``GET /api/tts`` and save the audio.

        Args:
            text: Text to synthesize.
            output_path: File the response body is written to.
            **kwargs: Extra query parameters sent alongside ``text``.

        Returns:
            ``output_path``.

        Raises:
            RuntimeError: If the server answers with a non-200 status.
        """
        params = {"text": text, **kwargs}
        response = requests.get(
            f"{self.base_url}/api/tts",
            params=params,
            timeout=self.timeout,
        )
        if response.status_code != 200:
            raise RuntimeError(f"合成失败: {response.text}")
        with open(output_path, "wb") as f:
            f.write(response.content)
        return output_path

    def _get_json(self, path):
        """GET ``path`` and return the decoded JSON body, raising on HTTP errors."""
        response = requests.get(f"{self.base_url}{path}", timeout=self.timeout)
        # Fail loudly on 4xx/5xx instead of trying to JSON-decode an error page.
        response.raise_for_status()
        return response.json()

    def get_speakers(self):
        """Return the decoded JSON response of ``GET /api/speakers``."""
        return self._get_json("/api/speakers")

    def get_languages(self):
        """Return the decoded JSON response of ``GET /api/languages``."""
        return self._get_json("/api/languages")
# 使用
client = TTSClient()
client.synthesize("Hello world", "hello.wav")
print(client.get_speakers())
高级客户端 #
python
import requests
import base64
from typing import Optional, List
class AdvancedTTSClient:
    """Session-based client for the TTS server with batch and base64 helpers.

    Args:
        base_url: Root URL of the TTS server.
    """

    def __init__(self, base_url="http://localhost:5002"):
        self.base_url = base_url
        # One Session so repeated requests share a connection pool.
        self.session = requests.Session()

    def _fetch_audio(
        self,
        text,
        speaker_id=None,
        language_id=None,
        speaker_wav=None,
        style_wav=None,
    ) -> bytes:
        """Request ``GET /api/tts`` and return the raw response body.

        Only options that are not ``None`` are sent as query parameters.

        Raises:
            Exception: If the server answers with a non-200 status.
        """
        optional = {
            "speaker_id": speaker_id,
            "language_id": language_id,
            "speaker_wav": speaker_wav,
            "style_wav": style_wav,
        }
        params = {"text": text}
        params.update({k: v for k, v in optional.items() if v is not None})
        response = self.session.get(
            f"{self.base_url}/api/tts",
            params=params,
            timeout=60,
        )
        if response.status_code != 200:
            raise Exception(f"合成失败: {response.status_code} - {response.text}")
        return response.content

    def synthesize(
        self,
        text: str,
        speaker_id: Optional[int] = None,
        language_id: Optional[str] = None,
        speaker_wav: Optional[str] = None,
        style_wav: Optional[str] = None,
        output_path: str = "output.wav"
    ) -> str:
        """Synthesize ``text`` and write the audio to ``output_path``.

        Returns:
            ``output_path``.

        Raises:
            Exception: If the server answers with a non-200 status.
        """
        audio = self._fetch_audio(
            text,
            speaker_id=speaker_id,
            language_id=language_id,
            speaker_wav=speaker_wav,
            style_wav=style_wav,
        )
        with open(output_path, "wb") as f:
            f.write(audio)
        return output_path

    def synthesize_base64(self, text: str, **kwargs) -> str:
        """Return the synthesized audio as a base64-encoded string.

        ``kwargs`` accepts the same options as :meth:`synthesize`. The audio
        is encoded straight from the response body; no scratch file is
        written, so nothing in the working directory is created or removed.
        """
        # An output path is meaningless here; accept and ignore it so callers
        # that forward synthesize()-style kwargs keep working.
        kwargs.pop("output_path", None)
        audio = self._fetch_audio(text, **kwargs)
        return base64.b64encode(audio).decode()

    def batch_synthesize(
        self,
        texts: List[str],
        output_dir: str = "output"
    ) -> List[str]:
        """Synthesize each text into ``output_dir/audio_NNN.wav``.

        Returns:
            The written file paths, in the order of ``texts``.
        """
        from pathlib import Path

        target_dir = Path(output_dir)
        target_dir.mkdir(parents=True, exist_ok=True)
        results = []
        for i, text in enumerate(texts):
            wav_path = str(target_dir / f"audio_{i:03d}.wav")
            # Must be passed by keyword: the second positional parameter of
            # synthesize() is speaker_id, not output_path.
            self.synthesize(text, output_path=wav_path)
            results.append(wav_path)
        return results

    def health_check(self) -> bool:
        """Return True when ``GET /`` answers 200 within five seconds."""
        try:
            response = self.session.get(f"{self.base_url}/", timeout=5)
        except requests.RequestException:
            # Connection refused, timeout, DNS failure: service unavailable.
            return False
        return response.status_code == 200
# 使用
client = AdvancedTTSClient()
# 检查服务状态
if client.health_check():
print("服务正常运行")
# 合成语音
client.synthesize(
text="Hello, this is a test.",
speaker_id=0,
output_path="test.wav"
)
# 批量合成
texts = ["First sentence.", "Second sentence.", "Third sentence."]
results = client.batch_synthesize(texts, "batch_output")
自定义 API 服务 #
使用 FastAPI #
python
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel
from typing import Optional
import torch
from TTS.api import TTS
import tempfile
import io
app = FastAPI(title="TTS API", version="1.0.0")
# 全局模型
tts = None
device = None
class SynthesisRequest(BaseModel):
    """JSON body accepted by the POST synthesis endpoints."""

    # Text to synthesize; the only required field.
    text: str
    # Optional speaker selector; the POST handler forwards it as ``speaker``.
    speaker_id: Optional[int] = None
    # Optional language selector; the POST handler forwards it as ``language``.
    language_id: Optional[str] = None
    # Optional reference-audio value. NOTE(review): none of the handlers in
    # this section read it - confirm whether it is meant to be wired up.
    speaker_wav: Optional[str] = None
@app.on_event("startup")
async def load_model():
    """Load the TTS model once at startup and publish it via module globals.

    Sets ``device`` to ``"cuda"`` when CUDA is available, otherwise ``"cpu"``,
    and moves the model onto that device.
    """
    global tts, device
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    model = TTS("tts_models/en/ljspeech/vits")
    tts = model.to(device)
    print(f"模型已加载,使用设备: {device}")
@app.get("/")
async def root():
    """Return a small status document including the active device."""
    info = {"service": "TTS API", "status": "running"}
    info["device"] = device
    return info
@app.get("/api/speakers")
async def get_speakers():
    """Return the model's speakers, or an empty list when it has none."""
    return tts.speakers or []
@app.get("/api/languages")
async def get_languages():
    """Return the model's languages, or an empty list when it has none."""
    return tts.languages or []
@app.get("/api/tts")
async def synthesize_get(
    text: str = Query(..., description="要合成的文本"),
    speaker_id: Optional[int] = Query(None),
    language_id: Optional[str] = Query(None)
):
    """Synthesize ``text`` and return it as a WAV download.

    The audio is rendered into a temporary file that is removed once the
    response has been sent, or immediately if synthesis fails.

    Raises:
        HTTPException: 500 with the error text when synthesis fails.
    """
    import contextlib
    import os

    from starlette.background import BackgroundTask

    # Reserve a unique path and close the handle before the model writes to it.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        wav_path = f.name
    try:
        # NOTE(review): speaker_id is an int but is passed as ``speaker``;
        # confirm the model accepts an index rather than a speaker name.
        tts.tts_to_file(
            text=text,
            speaker=speaker_id,
            language=language_id,
            file_path=wav_path
        )
    except Exception as e:
        with contextlib.suppress(OSError):
            os.remove(wav_path)
        raise HTTPException(status_code=500, detail=str(e)) from e
    return FileResponse(
        wav_path,
        media_type="audio/wav",
        filename="output.wav",
        # Runs after the body is sent, so the temp file does not accumulate.
        background=BackgroundTask(os.remove, wav_path)
    )
@app.post("/api/tts")
async def synthesize_post(request: SynthesisRequest):
    """Synthesize the text in a JSON request and return it as a WAV download.

    The audio is rendered into a temporary file that is removed once the
    response has been sent, or immediately if synthesis fails.

    Raises:
        HTTPException: 500 with the error text when synthesis fails.
    """
    import contextlib
    import os

    from starlette.background import BackgroundTask

    # Reserve a unique path and close the handle before the model writes to it.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        wav_path = f.name
    try:
        # NOTE(review): speaker_id is an int but is passed as ``speaker``;
        # confirm the model accepts an index rather than a speaker name.
        tts.tts_to_file(
            text=request.text,
            speaker=request.speaker_id,
            language=request.language_id,
            file_path=wav_path
        )
    except Exception as e:
        with contextlib.suppress(OSError):
            os.remove(wav_path)
        raise HTTPException(status_code=500, detail=str(e)) from e
    return FileResponse(
        wav_path,
        media_type="audio/wav",
        filename="output.wav",
        # Runs after the body is sent, so the temp file does not accumulate.
        background=BackgroundTask(os.remove, wav_path)
    )
@app.post("/api/tts/stream")
async def synthesize_stream(request: SynthesisRequest):
    """Synthesize in memory and return the WAV bytes as a streaming response.

    Unlike the file-based endpoints, nothing is written to disk. The request's
    speaker and language selectors are forwarded so this endpoint behaves like
    its siblings.

    Raises:
        HTTPException: 500 with the error text when synthesis or encoding fails.
    """
    try:
        import soundfile as sf

        wav = tts.tts(
            text=request.text,
            speaker=request.speaker_id,
            language=request.language_id
        )
        # Use the model's own output rate when it exposes one; a hard-coded
        # 22050 plays back at the wrong speed for models with other rates.
        # NOTE(review): assumes ``tts.synthesizer.output_sample_rate`` exists
        # on the loaded wrapper - falls back to 22050 when it does not.
        synthesizer = getattr(tts, "synthesizer", None)
        sample_rate = getattr(synthesizer, "output_sample_rate", None) or 22050
        # Encode the samples as WAV into an in-memory buffer.
        buffer = io.BytesIO()
        sf.write(buffer, wav, sample_rate, format="WAV")
        buffer.seek(0)
        return StreamingResponse(
            buffer,
            media_type="audio/wav"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# 启动命令: uvicorn api_server:app --host 0.0.0.0 --port 8000
使用 Flask #
python
from flask import Flask, request, jsonify, send_file
import torch
from TTS.api import TTS
import tempfile
import io
import soundfile as sf
app = Flask(__name__)
# 全局模型
tts = None
def load_model():
    """Load the TTS model onto CUDA when available, else CPU, into ``tts``."""
    global tts
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tts = TTS("tts_models/en/ljspeech/vits").to(device)


# ``Flask.before_first_request`` was removed in Flask 2.3, so the decorator
# form raises AttributeError at import time on current releases. Loading
# eagerly works on every Flask version and keeps the first request fast.
load_model()
@app.route("/")
def index():
    """Return a small JSON status document."""
    status = {"service": "TTS API", "status": "running"}
    return jsonify(status)
@app.route("/api/tts", methods=["GET", "POST"])
def synthesize():
    """Synthesize text from query parameters (GET) or a JSON body (POST).

    Returns the audio as a WAV attachment, a 400 JSON error when no text is
    supplied, or a 500 JSON error when synthesis fails.
    """
    import contextlib
    import os

    if request.method == "GET":
        fields = request.args
        speaker_id = fields.get("speaker_id", type=int)
    else:
        # A missing, malformed or non-object JSON body must produce the 400
        # below, not an AttributeError on ``None.get``.
        fields = request.get_json(silent=True)
        if not isinstance(fields, dict):
            fields = {}
        speaker_id = fields.get("speaker_id")
    text = fields.get("text", "")
    language_id = fields.get("language_id")

    if not text:
        return jsonify({"error": "Text is required"}), 400

    # Reserve a unique path and close the handle before the model writes to it.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        wav_path = f.name
    try:
        tts.tts_to_file(
            text=text,
            speaker=speaker_id,
            language=language_id,
            file_path=wav_path
        )
        # Read the audio into memory so the temp file can be deleted before
        # the response is streamed.
        with open(wav_path, "rb") as wav_file:
            payload = io.BytesIO(wav_file.read())
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        with contextlib.suppress(OSError):
            os.remove(wav_path)
    return send_file(
        payload,
        mimetype="audio/wav",
        as_attachment=True,
        download_name="output.wav"
    )
@app.route("/api/speakers")
def get_speakers():
    """Return the model's speakers as JSON, or an empty list when it has none."""
    speakers = tts.speakers or []
    return jsonify(speakers)
@app.route("/api/languages")
def get_languages():
    """Return the model's languages as JSON, or an empty list when it has none."""
    languages = tts.languages or []
    return jsonify(languages)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5002)
高级功能 #
请求队列 #
python
from fastapi import FastAPI
from fastapi.responses import FileResponse
import torch
from TTS.api import TTS
import asyncio
from queue import Queue
from threading import Thread
import tempfile
app = FastAPI()
class TTSWorker:
    """Background synthesis queue served by daemon worker threads.

    Requests are submitted with :meth:`submit`, which returns an integer id;
    :meth:`get_result` later yields either the path of the generated WAV file
    or the error message of a failed synthesis.

    Args:
        model_name: Model identifier passed to ``TTS``.
        num_workers: Number of daemon threads consuming the queue.
    """

    def __init__(self, model_name, num_workers=2):
        self.queue = Queue()
        self.results = {}
        self.request_id = 0
        # NOTE(review): every worker thread shares this one model instance;
        # confirm the model tolerates concurrent ``tts_to_file`` calls.
        self.tts = TTS(model_name)
        for _ in range(num_workers):
            Thread(target=self._worker, daemon=True).start()

    def _worker(self):
        """Consume queued requests forever, recording a path or error per id."""
        import os

        while True:
            request_id, text, params = self.queue.get()
            wav_path = None
            try:
                # Reserve a unique path and close the handle before the model
                # writes to it.
                with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                    wav_path = f.name
                self.tts.tts_to_file(
                    text=text,
                    file_path=wav_path,
                    **params
                )
                self.results[request_id] = wav_path
            except Exception as e:
                # Do not leave an orphaned temp file behind a failed request.
                if wav_path is not None and os.path.exists(wav_path):
                    os.remove(wav_path)
                self.results[request_id] = str(e)
            finally:
                # Keeps ``queue.join()`` usable by callers that want to drain.
                self.queue.task_done()

    def submit(self, text, **params):
        """Queue ``text`` for synthesis and return the new request id.

        ``params`` are forwarded to ``tts_to_file`` as keyword arguments.
        """
        self.request_id += 1
        self.queue.put((self.request_id, text, params))
        return self.request_id

    def get_result(self, request_id):
        """Remove and return the stored result for ``request_id``.

        Returns ``None`` when no result is stored for that id.
        """
        # Single pop with a default: no window between the membership test
        # and the removal for another thread to slip into.
        return self.results.pop(request_id, None)
worker = TTSWorker("tts_models/en/ljspeech/vits")
@app.post("/api/tts/async")
async def synthesize_async(text: str):
    """Queue ``text`` for background synthesis and return its request id."""
    ticket = worker.submit(text)
    return dict(request_id=ticket)
@app.get("/api/tts/result/{request_id}")
async def get_result(request_id: int):
    """Return the outcome of a queued synthesis request.

    Responds with ``{"status": "pending"}`` while no result is stored, the WAV
    file when synthesis succeeded, or an error document otherwise.
    """
    import os

    from starlette.background import BackgroundTask

    result = worker.get_result(request_id)
    if result is None:
        return {"status": "pending"}
    # The worker stores either a file path or an error message; requiring the
    # file to exist keeps an error text that merely ends in ".wav" from being
    # served as a path.
    if isinstance(result, str) and result.endswith(".wav") and os.path.isfile(result):
        return FileResponse(
            result,
            media_type="audio/wav",
            # The result was popped from the worker, so this is the only
            # delivery: delete the temp file once it has been sent.
            background=BackgroundTask(os.remove, result)
        )
    return {"status": "error", "message": result}
缓存机制 #
python
import hashlib
from functools import lru_cache
from pathlib import Path
import json
class TTSCache:
    """File-based cache of synthesized audio keyed by text and options.

    Each entry is stored as ``<md5>.wav`` inside ``cache_dir``, where the hash
    covers the text and every keyword option.

    Args:
        cache_dir: Directory holding the cached files; created (including
            missing parents) when absent.
    """

    def __init__(self, cache_dir="tts_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, text, **kwargs):
        """Return the hex MD5 of the JSON-serialized text and options.

        Keys are sorted before hashing so the option order does not matter.
        MD5 serves only as a filename fingerprint here, not for security.
        """
        key_data = {"text": text, **kwargs}
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.md5(key_str.encode()).hexdigest()

    def _cache_path(self, text, **kwargs):
        """Return the path an entry for these inputs lives at."""
        return self.cache_dir / f"{self._get_cache_key(text, **kwargs)}.wav"

    def get(self, text, **kwargs):
        """Return the cached file path for these inputs, or ``None`` if absent."""
        cache_path = self._cache_path(text, **kwargs)
        return str(cache_path) if cache_path.is_file() else None

    def set(self, text, audio_path, **kwargs):
        """Copy ``audio_path`` into the cache and return the cached path.

        The copy goes to a scratch file first and is then renamed into place,
        so a concurrent :meth:`get` never sees a partially written entry.
        """
        import os
        import shutil
        import tempfile

        cache_path = self._cache_path(text, **kwargs)
        # Scratch file in the same directory so os.replace stays a rename.
        fd, tmp_name = tempfile.mkstemp(dir=self.cache_dir, suffix=".tmp")
        os.close(fd)
        try:
            shutil.copy(audio_path, tmp_name)
            os.replace(tmp_name, cache_path)
        finally:
            # Only still present when the copy or rename failed.
            if os.path.exists(tmp_name):
                os.remove(tmp_name)
        return str(cache_path)
# 在 API 中使用缓存
cache = TTSCache()
@app.get("/api/tts")
async def synthesize_cached(text: str):
    """Serve ``text`` as WAV audio, synthesizing only on a cache miss.

    On a miss the audio is rendered to a temporary file, copied into the
    cache, and the cached copy is served; the temporary file is always
    removed.
    """
    import os

    # Cache hit: serve the stored file directly.
    cached = cache.get(text)
    if cached:
        return FileResponse(cached, media_type="audio/wav")

    # Cache miss: reserve a unique path and close the handle before the model
    # writes to it.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        tmp_path = f.name
    try:
        tts.tts_to_file(text=text, file_path=tmp_path)
        cached = cache.set(text, tmp_path)
    finally:
        # The cache holds its own copy, so the scratch file is never needed.
        os.remove(tmp_path)
    return FileResponse(cached, media_type="audio/wav")
下一步 #
掌握了 API 服务后,继续学习 实战案例,了解完整的项目实现!
最后更新:2026-04-05