API 服务 #

服务概述 #

Coqui TTS 提供了内置的 HTTP 服务器,可以快速搭建 TTS API 服务。

text
┌─────────────────────────────────────────────────────────────┐
│                     TTS API 架构                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │   Client    │ ←→ │  TTS Server │ ←→ │    Model    │     │
│  │   客户端     │    │   API 服务   │    │    模型     │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
│         │                  │                  │             │
│         │                  │                  │             │
│         ▼                  ▼                  ▼             │
│    HTTP 请求          REST API           语音合成           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

启动服务 #

基础启动 #

bash
# 使用默认配置启动
tts-server --model_name tts_models/en/ljspeech/vits

# 指定端口
tts-server --model_name tts_models/en/ljspeech/vits --port 5002

# 指定主机地址
tts-server --model_name tts_models/en/ljspeech/vits --host 0.0.0.0 --port 5002

使用 GPU #

bash
# Pin the process to GPU 0; --use_cuda is required, otherwise the server stays on CPU
CUDA_VISIBLE_DEVICES=0 tts-server --model_name tts_models/en/ljspeech/vits --use_cuda true

# Multiple GPUs: tts-server has no --gpus option and serves one model per process,
# so start one instance per GPU on its own port and put a reverse proxy
# (e.g. nginx) in front of them for load balancing
CUDA_VISIBLE_DEVICES=0 tts-server --model_name tts_models/en/ljspeech/vits --use_cuda true --port 5002 &
CUDA_VISIBLE_DEVICES=1 tts-server --model_name tts_models/en/ljspeech/vits --use_cuda true --port 5003 &

完整启动参数 #

bash
tts-server \
    --model_name tts_models/en/ljspeech/vits \
    --vocoder_name vocoder_models/en/ljspeech/hifigan_v2 \
    --host 0.0.0.0 \
    --port 5002 \
    --use_cuda true \
    --debug false

API 接口 #

核心接口 #

text
┌─────────────────────────────────────────────────────────────┐
│                     API 接口列表                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  GET  /                     服务信息                        │
│  GET  /api/tts              文本转语音                       │
│  GET  /api/tts?text=xxx     合成指定文本                     │
│  POST /api/tts              POST 方式合成                    │
│  GET  /api/speakers         获取说话人列表                   │
│  GET  /api/languages        获取语言列表                     │
│  GET  /api/models           获取模型信息                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

GET /api/tts #

bash
# 基础请求
curl "http://localhost:5002/api/tts?text=Hello%20world" --output output.wav

# 指定说话人
curl "http://localhost:5002/api/tts?text=Hello&speaker_id=0" --output output.wav

# Select a language; -G with --data-urlencode percent-encodes the non-ASCII text
curl -G "http://localhost:5002/api/tts" --data-urlencode "text=你好" --data-urlencode "language_id=zh-cn" --output output.wav

# 声音克隆
curl "http://localhost:5002/api/tts?text=Hello&speaker_wav=reference.wav" --output output.wav

POST /api/tts #

bash
# POST 请求
curl -X POST "http://localhost:5002/api/tts" \
    -H "Content-Type: application/json" \
    -d '{"text": "Hello world", "speaker_id": 0}' \
    --output output.wav

获取说话人列表 #

bash
curl "http://localhost:5002/api/speakers"

# 响应示例
# ["speaker_0", "speaker_1", "speaker_2"]

获取语言列表 #

bash
curl "http://localhost:5002/api/languages"

# 响应示例
# ["en", "zh-cn", "ja", "ko", "fr", "de", "es"]

Python 客户端 #

基础客户端 #

python
import requests
from pathlib import Path

class TTSClient:
    """Minimal HTTP client for a running TTS server.

    Args:
        base_url: Root URL of the server, without the ``/api/...`` suffix.
        timeout: Seconds to wait for each HTTP response. Without a timeout
            ``requests`` waits forever when the server stalls.
    """

    def __init__(self, base_url="http://localhost:5002", timeout=60):
        self.base_url = base_url
        self.timeout = timeout

    def synthesize(self, text, output_path="output.wav", **kwargs):
        """Synthesize ``text`` through ``GET /api/tts`` and save the audio.

        Args:
            text: Text to synthesize.
            output_path: File the returned audio bytes are written to.
            **kwargs: Extra query parameters sent along with ``text``.

        Returns:
            ``output_path``.

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        params = {"text": text, **kwargs}
        response = requests.get(
            f"{self.base_url}/api/tts", params=params, timeout=self.timeout
        )
        if response.status_code != 200:
            raise RuntimeError(f"合成失败: {response.text}")

        with open(output_path, "wb") as f:
            f.write(response.content)
        return output_path

    def _get_json(self, endpoint):
        """Fetch ``endpoint`` and decode its JSON body, raising on HTTP errors."""
        response = requests.get(f"{self.base_url}{endpoint}", timeout=self.timeout)
        # Surface 4xx/5xx as an exception instead of decoding an error page.
        response.raise_for_status()
        return response.json()

    def get_speakers(self):
        """Return the decoded JSON body of ``GET /api/speakers``."""
        return self._get_json("/api/speakers")

    def get_languages(self):
        """Return the decoded JSON body of ``GET /api/languages``."""
        return self._get_json("/api/languages")

# Usage: connect to the default URL, save one utterance, then print the speaker list
client = TTSClient()
client.synthesize("Hello world", "hello.wav")
print(client.get_speakers())

高级客户端 #

python
import requests
import base64
from typing import Optional, List

class AdvancedTTSClient:
    """HTTP client for a TTS server that reuses a single ``requests.Session``.

    Args:
        base_url: Root URL of the server, without the ``/api/...`` suffix.
    """

    def __init__(self, base_url="http://localhost:5002"):
        self.base_url = base_url
        self.session = requests.Session()

    def _request_audio(
        self,
        text: str,
        speaker_id: Optional[int] = None,
        language_id: Optional[str] = None,
        speaker_wav: Optional[str] = None,
        style_wav: Optional[str] = None,
    ) -> bytes:
        """Call ``GET /api/tts`` and return the raw audio bytes.

        Options left as ``None`` are omitted from the query string.

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        options = {
            "speaker_id": speaker_id,
            "language_id": language_id,
            "speaker_wav": speaker_wav,
            "style_wav": style_wav,
        }
        params = {"text": text}
        params.update(
            {name: value for name, value in options.items() if value is not None}
        )

        response = self.session.get(
            f"{self.base_url}/api/tts",
            params=params,
            timeout=60,
        )
        if response.status_code != 200:
            raise RuntimeError(
                f"合成失败: {response.status_code} - {response.text}"
            )
        return response.content

    def synthesize(
        self,
        text: str,
        speaker_id: Optional[int] = None,
        language_id: Optional[str] = None,
        speaker_wav: Optional[str] = None,
        style_wav: Optional[str] = None,
        output_path: str = "output.wav"
    ) -> str:
        """Synthesize ``text`` and write the audio to ``output_path``.

        Returns:
            ``output_path``.

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        audio = self._request_audio(
            text, speaker_id, language_id, speaker_wav, style_wav
        )
        with open(output_path, "wb") as f:
            f.write(audio)
        return output_path

    def synthesize_base64(self, text: str, **kwargs) -> str:
        """Return the synthesized audio as a base64-encoded string.

        ``kwargs`` accepts the same options as :meth:`synthesize`. The audio is
        encoded straight from the response, so no scratch file is created.
        """
        # No file is written here, so an output path has nothing to apply to.
        kwargs.pop("output_path", None)
        audio = self._request_audio(text, **kwargs)
        return base64.b64encode(audio).decode()

    def batch_synthesize(
        self,
        texts: List[str],
        output_dir: str = "output"
    ) -> List[str]:
        """Synthesize every text into ``output_dir`` as ``audio_NNN.wav``.

        Returns:
            The written file paths, in the order of ``texts``.
        """
        from pathlib import Path

        target_dir = Path(output_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

        results = []
        for i, text in enumerate(texts):
            wav_path = str(target_dir / f"audio_{i:03d}.wav")
            # Must be passed by keyword: the second positional slot of
            # synthesize() is speaker_id, not the output path.
            results.append(self.synthesize(text, output_path=wav_path))
        return results

    def health_check(self) -> bool:
        """Return True when ``GET /`` answers 200 within five seconds."""
        try:
            response = self.session.get(f"{self.base_url}/", timeout=5)
        except requests.RequestException:
            # Connection refused, timeout, DNS failure: the service is not usable.
            return False
        return response.status_code == 200

# Usage
client = AdvancedTTSClient()

# Probe the server root before sending work
if client.health_check():
    print("服务正常运行")

# Synthesize a single utterance
client.synthesize(
    text="Hello, this is a test.",
    speaker_id=0,
    output_path="test.wav"
)

# Batch synthesis
texts = ["First sentence.", "Second sentence.", "Third sentence."]
results = client.batch_synthesize(texts, "batch_output")

自定义 API 服务 #

使用 FastAPI #

python
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel
from typing import Optional
import torch
from TTS.api import TTS
import tempfile
import io

app = FastAPI(title="TTS API", version="1.0.0")

# Module-level model handle and device name; both are None until load_model() assigns them
tts = None
device = None

class SynthesisRequest(BaseModel):
    """JSON body accepted by the POST synthesis endpoints."""
    # Text to synthesize (required)
    text: str
    # Optional speaker selector
    speaker_id: Optional[int] = None
    # Optional language selector
    language_id: Optional[str] = None
    # NOTE(review): no endpoint in this snippet reads speaker_wav — confirm it is used elsewhere
    speaker_wav: Optional[str] = None

@app.on_event("startup")
async def load_model():
    """Pick CUDA when available, load the VITS model onto it, and log the device."""
    global tts, device
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    model = TTS("tts_models/en/ljspeech/vits")
    tts = model.to(device)
    print(f"模型已加载,使用设备: {device}")

@app.get("/")
async def root():
    """Report the service name, a fixed status string, and the active device."""
    info = {"service": "TTS API", "status": "running"}
    info["device"] = device
    return info

@app.get("/api/speakers")
async def get_speakers():
    """Return the model's speakers, or an empty list when it reports none."""
    speakers = tts.speakers
    if not speakers:
        return []
    return speakers

@app.get("/api/languages")
async def get_languages():
    """Return the model's languages, or an empty list when it reports none."""
    languages = tts.languages
    if not languages:
        return []
    return languages

def _synthesize_wav_response(text, speaker_id=None, language_id=None):
    """Synthesize ``text`` and return the audio as a WAV download.

    Args:
        text: Text to synthesize.
        speaker_id: Index into ``tts.speakers``; ``None`` sends no speaker.
        language_id: Language passed through to the model unchanged.

    Raises:
        HTTPException: 400 for a speaker index outside ``tts.speakers``,
            500 when synthesis fails.
    """
    import os

    from fastapi import Response

    # tts_to_file() takes a speaker *name*, so resolve the numeric id first.
    speaker = None
    if speaker_id is not None:
        speakers = tts.speakers or []
        if not 0 <= speaker_id < len(speakers):
            raise HTTPException(
                status_code=400, detail=f"Unknown speaker_id: {speaker_id}"
            )
        speaker = speakers[speaker_id]

    # mkstemp + close gives the model an unopened path to write to.
    fd, wav_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        tts.tts_to_file(
            text=text,
            speaker=speaker,
            language=language_id,
            file_path=wav_path,
        )
        with open(wav_path, "rb") as wav_file:
            audio = wav_file.read()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # The bytes are already in memory (or synthesis failed), so the
        # scratch file is removed on every path instead of piling up.
        os.remove(wav_path)

    return Response(
        content=audio,
        media_type="audio/wav",
        headers={"Content-Disposition": 'attachment; filename="output.wav"'},
    )


@app.get("/api/tts")
async def synthesize_get(
    text: str = Query(..., description="要合成的文本"),
    speaker_id: Optional[int] = Query(None),
    language_id: Optional[str] = Query(None)
):
    """Synthesize the ``text`` query parameter and return it as ``output.wav``."""
    return _synthesize_wav_response(text, speaker_id, language_id)


@app.post("/api/tts")
async def synthesize_post(request: SynthesisRequest):
    """Synthesize the text of a JSON request body and return it as ``output.wav``."""
    return _synthesize_wav_response(
        request.text, request.speaker_id, request.language_id
    )

@app.post("/api/tts/stream")
async def synthesize_stream(request: SynthesisRequest):
    """Synthesize in memory and return the WAV bytes as a streaming response.

    Raises:
        HTTPException: 400 for a speaker index outside ``tts.speakers``,
            500 when synthesis or encoding fails.
    """
    # tts() takes a speaker *name*, so resolve the numeric id first.
    speaker = None
    if request.speaker_id is not None:
        speakers = tts.speakers or []
        if not 0 <= request.speaker_id < len(speakers):
            raise HTTPException(
                status_code=400,
                detail=f"Unknown speaker_id: {request.speaker_id}",
            )
        speaker = speakers[request.speaker_id]

    try:
        import soundfile as sf

        wav = tts.tts(
            text=request.text,
            speaker=speaker,
            language=request.language_id,
        )

        # Use the model's own output rate; 22050 is only the fallback that
        # matches the VITS LJSpeech model loaded by this server.
        synthesizer = getattr(tts, "synthesizer", None)
        sample_rate = getattr(synthesizer, "output_sample_rate", 22050)

        # Encode the samples as WAV into an in-memory buffer
        buffer = io.BytesIO()
        sf.write(buffer, wav, sample_rate, format="WAV")
        buffer.seek(0)

        return StreamingResponse(
            buffer,
            media_type="audio/wav"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

# 启动命令: uvicorn api_server:app --host 0.0.0.0 --port 8000

使用 Flask #

python
from flask import Flask, request, jsonify, send_file
import torch
from TTS.api import TTS
import tempfile
import io
import soundfile as sf

app = Flask(__name__)

# Model handle shared by every request handler
tts = None


def load_model():
    """Load the VITS model onto the GPU when available, otherwise the CPU."""
    global tts
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tts = TTS("tts_models/en/ljspeech/vits").to(device)


# Flask 2.3 removed ``app.before_first_request``, so the model is loaded
# eagerly at import time; this also keeps the first request from paying the
# model-loading cost.
load_model()

@app.route("/")
def index():
    """Return a small JSON document identifying the service."""
    payload = {"service": "TTS API", "status": "running"}
    return jsonify(payload)

@app.route("/api/tts", methods=["GET", "POST"])
def synthesize():
    """Synthesize text from query parameters (GET) or a JSON body (POST).

    Returns:
        The audio as an ``output.wav`` attachment, a 400 JSON error for missing
        text or an unknown speaker index, or a 500 JSON error when synthesis
        fails.
    """
    import os

    if request.method == "GET":
        text = request.args.get("text", "")
        speaker_id = request.args.get("speaker_id", type=int)
        language_id = request.args.get("language_id")
    else:
        # silent=True yields None instead of aborting on a missing or invalid
        # JSON body, so the "Text is required" reply below still applies.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        text = data.get("text", "")
        speaker_id = data.get("speaker_id")
        language_id = data.get("language_id")

    if not text:
        return jsonify({"error": "Text is required"}), 400

    # tts_to_file() takes a speaker *name*, so resolve the numeric id first.
    speaker = None
    if speaker_id is not None:
        speakers = tts.speakers or []
        if not isinstance(speaker_id, int) or not 0 <= speaker_id < len(speakers):
            return jsonify({"error": f"Unknown speaker_id: {speaker_id}"}), 400
        speaker = speakers[speaker_id]

    # mkstemp + close gives the model an unopened path to write to.
    fd, wav_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        tts.tts_to_file(
            text=text,
            speaker=speaker,
            language=language_id,
            file_path=wav_path
        )
        with open(wav_path, "rb") as wav_file:
            audio = io.BytesIO(wav_file.read())
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        # The audio is already in memory (or synthesis failed), so the
        # scratch file is removed on every path instead of piling up.
        os.remove(wav_path)

    return send_file(
        audio,
        mimetype="audio/wav",
        as_attachment=True,
        download_name="output.wav"
    )

@app.route("/api/speakers")
def get_speakers():
    """Return the model's speakers as JSON, or an empty list when it reports none."""
    speakers = tts.speakers
    if not speakers:
        speakers = []
    return jsonify(speakers)

@app.route("/api/languages")
def get_languages():
    """Return the model's languages as JSON, or an empty list when it reports none."""
    languages = tts.languages
    if not languages:
        languages = []
    return jsonify(languages)

# Script entry point: start Flask's built-in server on all interfaces, port 5002
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5002)

高级功能 #

请求队列 #

python
from fastapi import FastAPI
from fastapi.responses import FileResponse
import torch
from TTS.api import TTS
import asyncio
from queue import Queue
from threading import Thread
import tempfile

# Application instance the queue endpoints below are registered on
app = FastAPI()

class TTSWorker:
    """Background synthesis queue served by daemon threads.

    Jobs are submitted with :meth:`submit` and collected with
    :meth:`get_result`. A finished job is stored as the path of its WAV file,
    a failed job as the text of the exception.

    Args:
        model_name: Model identifier passed to ``TTS``.
        num_workers: Number of daemon threads consuming the queue.
    """

    def __init__(self, model_name, num_workers=2):
        self.queue = Queue()
        self.results = {}
        self.request_id = 0
        self.tts = TTS(model_name)

        for _ in range(num_workers):
            Thread(target=self._worker, daemon=True).start()

    def _worker(self):
        """Consume queued jobs forever, marking each one done on the queue."""
        while True:
            request_id, text, params = self.queue.get()
            try:
                self._process(request_id, text, params)
            finally:
                # Keeps Queue.join() usable by callers that wait for a drain.
                self.queue.task_done()

    def _process(self, request_id, text, params):
        """Synthesize one job and record its WAV path or its error text."""
        import os

        # mkstemp + close gives the model an unopened path to write to.
        fd, wav_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)
        try:
            self.tts.tts_to_file(
                text=text,
                file_path=wav_path,
                **params
            )
        except Exception as e:
            # A failed job must not leave its scratch file behind.
            try:
                os.remove(wav_path)
            except OSError:
                pass  # best effort: the file may already be gone
            self.results[request_id] = str(e)
        else:
            self.results[request_id] = wav_path

    def submit(self, text, **params):
        """Queue ``text`` for synthesis and return the id to poll with.

        ``params`` are forwarded to ``tts_to_file`` as keyword arguments.
        """
        self.request_id += 1
        self.queue.put((self.request_id, text, params))
        return self.request_id

    def get_result(self, request_id):
        """Return and forget the result for ``request_id``.

        Returns:
            The WAV path or error text, or ``None`` when no result is stored
            under that id.
        """
        # A single pop avoids the check-then-pop gap between two pollers.
        return self.results.pop(request_id, None)

worker = TTSWorker("tts_models/en/ljspeech/vits")


@app.post("/api/tts/async")
async def synthesize_async(text: str):
    """Queue ``text`` for background synthesis and return the id to poll with."""
    ticket = worker.submit(text)
    return {"request_id": ticket}

@app.get("/api/tts/result/{request_id}")
async def get_result(request_id: int):
    """Return the audio for a finished job, or its pending/error status.

    The worker forgets a result once it has been fetched, so the audio can be
    delivered only once and its scratch file is deleted after being read.
    """
    import os

    from fastapi import Response

    result = worker.get_result(request_id)
    if result is None:
        return {"status": "pending"}

    # Require a real file so that error text which happens to end in ".wav"
    # is reported as an error instead of being opened as a path.
    is_audio = (
        isinstance(result, str)
        and result.endswith(".wav")
        and os.path.isfile(result)
    )
    if not is_audio:
        return {"status": "error", "message": result}

    try:
        with open(result, "rb") as wav_file:
            audio = wav_file.read()
    finally:
        os.remove(result)
    return Response(content=audio, media_type="audio/wav")

缓存机制 #

python
import hashlib
from functools import lru_cache
from pathlib import Path
import json

class TTSCache:
    """File-system cache that maps (text, options) to a WAV file.

    Args:
        cache_dir: Directory holding the cached files. It is created, with any
            missing parent directories, when it does not exist.
    """

    def __init__(self, cache_dir="tts_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, text, **kwargs):
        """Return the MD5 hex digest of the JSON-encoded text and options.

        Keys are sorted before encoding so the digest does not depend on the
        order in which options were passed.
        """
        key_data = {"text": text, **kwargs}
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.md5(key_str.encode()).hexdigest()

    def _cache_path(self, text, **kwargs):
        """Return the path a given text/options pair is cached under."""
        return self.cache_dir / f"{self._get_cache_key(text, **kwargs)}.wav"

    def get(self, text, **kwargs):
        """Return the cached file path as a string, or ``None`` on a miss."""
        cache_path = self._cache_path(text, **kwargs)
        if cache_path.exists():
            return str(cache_path)
        return None

    def set(self, text, audio_path, **kwargs):
        """Copy ``audio_path`` into the cache and return the cached path.

        The copy goes to a scratch file inside the cache directory and is then
        renamed into place, so a concurrent :meth:`get` never returns a
        partly written entry.
        """
        import os
        import shutil
        import tempfile

        cache_path = self._cache_path(text, **kwargs)
        fd, tmp_path = tempfile.mkstemp(dir=self.cache_dir, suffix=".tmp")
        os.close(fd)
        try:
            shutil.copy(audio_path, tmp_path)
            os.replace(tmp_path, cache_path)
        except Exception:
            # Do not leave the scratch file behind when the copy fails.
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # best effort: nothing more can be done here
            raise
        return str(cache_path)

# Use the cache inside the API
cache = TTSCache()


@app.get("/api/tts")
async def synthesize_cached(text: str):
    """Serve ``text`` from the cache, synthesizing and caching it on a miss."""
    import os

    # Cache hit: serve the stored file directly
    cached = cache.get(text)
    if cached:
        return FileResponse(cached, media_type="audio/wav")

    # Cache miss: synthesize into a scratch file and copy it into the cache.
    # mkstemp + close gives the model an unopened path to write to.
    fd, wav_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        tts.tts_to_file(text=text, file_path=wav_path)
        cached = cache.set(text, wav_path)
    finally:
        # The cache holds its own copy, so the scratch file is always removed.
        os.remove(wav_path)

    return FileResponse(cached, media_type="audio/wav")

下一步 #

掌握了 API 服务后,继续学习 实战案例,了解完整的项目实现!

最后更新:2026-04-05