批量处理 #

批量处理概述 #

批量处理是处理大量音频文件的常见需求,需要考虑效率、可靠性和结果管理。

text
┌─────────────────────────────────────────────────────────────┐
│                    批量处理流程                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  输入阶段:                                                   │
│  ├── 扫描音频文件                                           │
│  ├── 过滤和排序                                             │
│  └── 创建任务队列                                           │
│                                                             │
│  处理阶段:                                                   │
│  ├── 并行/串行处理                                          │
│  ├── 进度跟踪                                               │
│  └── 错误处理                                               │
│                                                             │
│  输出阶段:                                                   │
│  ├── 结果收集                                               │
│  ├── 格式转换                                               │
│  └── 持久化存储                                             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

基本批量处理 #

简单批量转录 #

python
import whisper
import os

# Transcribe every audio file in a directory, writing one .txt per input.
model = whisper.load_model("base")

audio_dir = "./audio_files"
output_dir = "./transcripts"
os.makedirs(output_dir, exist_ok=True)

SUPPORTED_EXTENSIONS = (".mp3", ".wav", ".m4a", ".flac", ".ogg")

audio_files = []
for name in os.listdir(audio_dir):
    # Case-insensitive extension check so FILE.MP3 is picked up too.
    if name.lower().endswith(SUPPORTED_EXTENSIONS):
        audio_files.append(name)

for audio_file in audio_files:
    print(f"处理: {audio_file}")

    result = model.transcribe(os.path.join(audio_dir, audio_file), language="zh")

    stem, _ = os.path.splitext(audio_file)
    output_filename = stem + ".txt"

    with open(os.path.join(output_dir, output_filename), "w", encoding="utf-8") as f:
        f.write(result["text"])

    print(f"完成: {output_filename}")

print(f"\n共处理 {len(audio_files)} 个文件")

带进度显示 #

python
import whisper
import os
from tqdm import tqdm

# Same batch transcription as above, but with a tqdm progress bar.
model = whisper.load_model("base")

audio_dir = "./audio_files"
output_dir = "./transcripts"
os.makedirs(output_dir, exist_ok=True)

wanted = (".mp3", ".wav", ".m4a")
audio_files = [name for name in os.listdir(audio_dir) if name.endswith(wanted)]

for audio_file in tqdm(audio_files, desc="转录进度"):
    result = model.transcribe(os.path.join(audio_dir, audio_file), language="zh")

    stem = os.path.splitext(audio_file)[0]
    with open(os.path.join(output_dir, stem + ".txt"), "w", encoding="utf-8") as f:
        f.write(result["text"])

print("批量处理完成!")

并行处理 #

多进程处理 #

python
import whisper
import os
from multiprocessing import Pool, cpu_count
from functools import partial

# Per-process model cache: without it, every file processed by a worker
# reloads the (large) Whisper model, which dominates total runtime.
_MODEL_CACHE = {}


def _get_model(model_size):
    """Return a cached Whisper model for this process, loading it on first use."""
    if model_size not in _MODEL_CACHE:
        _MODEL_CACHE[model_size] = whisper.load_model(model_size)
    return _MODEL_CACHE[model_size]


def transcribe_file(audio_path, output_dir, model_size="base"):
    """Transcribe one audio file and write <stem>.txt into output_dir.

    Returns a status dict — {"file", "status", "text_length"} on success,
    {"file", "status", "error"} on failure. All exceptions are captured so
    a single bad file cannot abort the whole batch.
    """
    try:
        model = _get_model(model_size)

        result = model.transcribe(audio_path, language="zh")

        filename = os.path.basename(audio_path)
        output_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.txt")

        with open(output_path, "w", encoding="utf-8") as f:
            f.write(result["text"])

        return {"file": filename, "status": "success", "text_length": len(result["text"])}

    except Exception as e:
        return {"file": os.path.basename(audio_path), "status": "error", "error": str(e)}

def batch_transcribe_parallel(audio_dir, output_dir, model_size="base", num_workers=None):
    """Transcribe every supported audio file in audio_dir using a process pool.

    Args:
        audio_dir: directory scanned (non-recursively) for audio files.
        output_dir: created if missing; one .txt is written per input file.
        model_size: Whisper model name passed to each worker.
        num_workers: pool size; defaults to min(cpu_count(), number of files).

    Returns:
        List of per-file status dicts from transcribe_file (empty list when
        no audio files are found).
    """
    os.makedirs(output_dir, exist_ok=True)

    extensions = (".mp3", ".wav", ".m4a")
    audio_files = [
        os.path.join(audio_dir, f)
        for f in os.listdir(audio_dir)
        # lower() keeps the filter case-insensitive (FILE.MP3 etc.).
        if f.lower().endswith(extensions)
    ]

    # Guard the empty case: min(cpu_count(), 0) == 0 and Pool(0) raises.
    if not audio_files:
        return []

    if num_workers is None:
        num_workers = min(cpu_count(), len(audio_files))

    transcribe_func = partial(transcribe_file, output_dir=output_dir, model_size=model_size)

    with Pool(num_workers) as pool:
        results = list(pool.imap(transcribe_func, audio_files))

    return results

results = batch_transcribe_parallel(
    "./audio_files",
    "./transcripts",
    model_size="base",
    num_workers=4,
)

# Tally outcomes, then surface the details of every failure.
success_count = len([r for r in results if r["status"] == "success"])
error_count = len([r for r in results if r["status"] == "error"])

print(f"成功: {success_count}, 失败: {error_count}")

for item in results:
    if item["status"] == "error":
        print(f"错误 - {item['file']}: {item['error']}")

使用 ThreadPoolExecutor #

python
import whisper
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# Cache loaded models so worker threads reuse one instance per model size
# instead of reloading the model on every call. A benign race on first load
# may load the model twice; the cache then settles on one instance.
_model_cache = {}


def _get_cached_model(model_size):
    """Load a Whisper model once and reuse it on subsequent calls."""
    if model_size not in _model_cache:
        _model_cache[model_size] = whisper.load_model(model_size)
    return _model_cache[model_size]


def transcribe_file(audio_path, output_dir, model_size="base"):
    """Transcribe one file, write <stem>.txt into output_dir, return file+text.

    Unlike the multiprocessing variant, this version propagates exceptions;
    the caller (batch_transcribe_threaded) catches them per future.
    """
    model = _get_cached_model(model_size)

    result = model.transcribe(audio_path, language="zh")

    filename = os.path.basename(audio_path)
    output_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.txt")

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(result["text"])

    return {"file": filename, "text": result["text"]}

def batch_transcribe_threaded(audio_dir, output_dir, model_size="base", max_workers=4):
    """Fan transcription jobs out over a thread pool and collect their results.

    Failed files are reported on stdout and omitted from the returned list.
    """
    os.makedirs(output_dir, exist_ok=True)

    supported = (".mp3", ".wav", ".m4a")
    audio_files = [
        os.path.join(audio_dir, name)
        for name in os.listdir(audio_dir)
        if name.endswith(supported)
    ]

    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {}
        for path in audio_files:
            future = executor.submit(transcribe_file, path, output_dir, model_size)
            future_to_file[future] = path

        # as_completed yields futures as they finish, so the bar tracks
        # real progress rather than submission order.
        for future in tqdm(as_completed(future_to_file), total=len(future_to_file), desc="处理中"):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"错误: {e}")

    return results

results = batch_transcribe_threaded(
    "./audio_files",
    "./transcripts",
    model_size="base",
    max_workers=4,
)

错误处理 #

完善的错误处理 #

python
import whisper
import os
import json
import traceback
from datetime import datetime

class BatchProcessor:
    """Batch transcription driver with per-file error capture and a
    JSON-lines log that allows resuming an interrupted run."""

    def __init__(self, model_size="base", output_dir="./output"):
        self.model = whisper.load_model(model_size)
        self.output_dir = output_dir
        self.log_file = os.path.join(output_dir, "processing_log.json")
        os.makedirs(output_dir, exist_ok=True)

    def process_file(self, audio_path):
        """Transcribe a single file and return a result dict.

        Never raises: failures are recorded in the dict under
        status / error_type / error_message (plus traceback for
        unexpected errors).
        """
        result = {
            "file": audio_path,
            "timestamp": datetime.now().isoformat(),
            "status": "pending",
        }

        try:
            # Cheap sanity checks before handing the file to the model.
            if not os.path.exists(audio_path):
                raise FileNotFoundError(f"文件不存在: {audio_path}")
            if os.path.getsize(audio_path) == 0:
                raise ValueError(f"文件为空: {audio_path}")

            transcription = self.model.transcribe(audio_path, language="zh")

            segments = transcription["segments"]
            result["status"] = "success"
            result["text"] = transcription["text"]
            result["language"] = transcription["language"]
            result["segments_count"] = len(segments)
            # Duration is approximated by the end time of the last segment.
            result["duration"] = segments[-1]["end"] if segments else 0

            self._save_transcript(audio_path, transcription)

        except FileNotFoundError as e:
            result["status"] = "error"
            result["error_type"] = "file_not_found"
            result["error_message"] = str(e)

        except ValueError as e:
            result["status"] = "error"
            result["error_type"] = "invalid_file"
            result["error_message"] = str(e)

        except Exception as e:
            result["status"] = "error"
            result["error_type"] = "transcription_error"
            result["error_message"] = str(e)
            result["traceback"] = traceback.format_exc()

        return result

    def _save_transcript(self, audio_path, transcription):
        """Write the plain-text transcript as <stem>.txt in the output dir."""
        stem = os.path.splitext(os.path.basename(audio_path))[0]
        with open(os.path.join(self.output_dir, stem + ".txt"), "w", encoding="utf-8") as f:
            f.write(transcription["text"])

    def _load_completed(self):
        """Return the set of files the log records as successfully processed."""
        completed = set()
        if os.path.exists(self.log_file):
            with open(self.log_file, "r", encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue
                    entry = json.loads(line)
                    if entry["status"] == "success":
                        completed.add(entry["file"])
        return completed

    def process_batch(self, audio_files, resume=False):
        """Process files in order, appending one JSON line per result to the log.

        With resume=True, files already logged as successful are skipped.
        """
        already_done = self._load_completed() if resume else set()

        results = []
        for audio_path in audio_files:
            if audio_path in already_done:
                print(f"跳过已处理: {audio_path}")
                continue

            print(f"处理: {audio_path}")
            result = self.process_file(audio_path)
            results.append(result)

            # Append immediately so progress survives a crash mid-batch.
            with open(self.log_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(result, ensure_ascii=False) + "\n")

        return results

    def get_summary(self, results):
        """Aggregate counts and a per-error-type breakdown for a result list."""
        statuses = [r["status"] for r in results]
        total = len(results)
        success = statuses.count("success")
        errors = statuses.count("error")

        error_types = {}
        for r in results:
            if r["status"] == "error":
                kind = r.get("error_type", "unknown")
                error_types[kind] = error_types.get(kind, 0) + 1

        return {
            "total": total,
            "success": success,
            "errors": errors,
            "success_rate": success / total if total > 0 else 0,
            "error_types": error_types,
        }

processor = BatchProcessor(model_size="base", output_dir="./output")

audio_dir = "./audio_files"
audio_files = [
    os.path.join(audio_dir, name)
    for name in os.listdir(audio_dir)
    if name.endswith((".mp3", ".wav"))
]

results = processor.process_batch(audio_files, resume=True)

summary = processor.get_summary(results)
print(f"\n处理摘要:")
print(f"总数: {summary['total']}")
print(f"成功: {summary['success']}")
print(f"失败: {summary['errors']}")
print(f"成功率: {summary['success_rate']:.1%}")

结果导出 #

多格式导出 #

python
import whisper
import os
import json
import csv

def export_results(results, output_dir, formats=("txt", "json", "csv", "srt")):
    """Export batch transcription results in one or more formats.

    Args:
        results: list of result dicts; successful entries carry "file" and
            "text", and optionally "segments", "language", "duration".
        output_dir: root directory for all exported artifacts (created
            if missing).
        formats: any subset of "txt", "json", "csv", "srt". (A tuple
            default avoids the mutable-default-argument pitfall.)

    txt/srt write one file per successful result, named after the audio
    file's stem; json/csv each write a single combined file.
    """
    os.makedirs(output_dir, exist_ok=True)

    if "txt" in formats:
        txt_dir = os.path.join(output_dir, "txt")
        os.makedirs(txt_dir, exist_ok=True)
        for result in results:
            if result["status"] == "success":
                filename = os.path.splitext(os.path.basename(result["file"]))[0]
                # Bug fix: use the per-file stem in the output name — the
                # previous literal "(unknown).txt" made every transcript
                # overwrite the same file.
                with open(os.path.join(txt_dir, f"{filename}.txt"), "w", encoding="utf-8") as f:
                    f.write(result["text"])

    if "json" in formats:
        json_path = os.path.join(output_dir, "results.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

    if "csv" in formats:
        csv_path = os.path.join(output_dir, "results.csv")
        with open(csv_path, "w", encoding="utf-8", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["file", "status", "language", "duration", "text"])
            for result in results:
                writer.writerow([
                    result["file"],
                    result["status"],
                    result.get("language", ""),
                    result.get("duration", ""),
                    result.get("text", ""),
                ])

    if "srt" in formats:
        srt_dir = os.path.join(output_dir, "srt")
        os.makedirs(srt_dir, exist_ok=True)
        for result in results:
            if result["status"] == "success" and "segments" in result:
                filename = os.path.splitext(os.path.basename(result["file"]))[0]
                # Same fix as the txt branch: stem-based name, not "(unknown)".
                write_srt(result["segments"], os.path.join(srt_dir, f"{filename}.srt"))

def write_srt(segments, output_path):
    """Write segments to *output_path* in SubRip (.srt) format."""

    def format_timestamp(seconds):
        # SRT wants HH:MM:SS,mmm — note the comma before the milliseconds.
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        millis = int((seconds % 1) * 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

    with open(output_path, "w", encoding="utf-8") as f:
        for index, segment in enumerate(segments, 1):
            span = f"{format_timestamp(segment['start'])} --> {format_timestamp(segment['end'])}"
            # One SRT cue: index line, time span line, text, blank separator.
            f.write(f"{index}\n{span}\n{segment['text'].strip()}\n\n")

数据库存储 #

python
import whisper
import sqlite3
import os
from datetime import datetime

class TranscriptDatabase:
    """SQLite-backed store for transcription results.

    Each method opens a short-lived connection and closes it in a
    ``finally`` block, so connections are no longer leaked when a
    query raises (the original skipped ``close()`` on any exception).
    """

    def __init__(self, db_path="transcripts.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create the transcripts table if it does not exist (idempotent)."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS transcripts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    filepath TEXT NOT NULL,
                    text TEXT,
                    language TEXT,
                    duration REAL,
                    segments_count INTEGER,
                    status TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.commit()
        finally:
            conn.close()

    def save_transcript(self, filename, filepath, text, language, duration, segments_count, status):
        """Insert one transcript row; created_at defaults to the current time."""
        conn = sqlite3.connect(self.db_path)
        try:
            # Parameterized placeholders — never build SQL with string formatting.
            conn.execute(
                """
                INSERT INTO transcripts
                    (filename, filepath, text, language, duration, segments_count, status)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (filename, filepath, text, language, duration, segments_count, status),
            )
            conn.commit()
        finally:
            conn.close()

    def get_transcript(self, filename):
        """Return the first row matching *filename*, or None if absent."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(
                "SELECT * FROM transcripts WHERE filename = ?", (filename,)
            )
            return cursor.fetchone()
        finally:
            conn.close()

    def get_all_transcripts(self):
        """Return every stored row, newest first."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute("SELECT * FROM transcripts ORDER BY created_at DESC")
            return cursor.fetchall()
        finally:
            conn.close()

db = TranscriptDatabase()

model = whisper.load_model("base")
result = model.transcribe("audio.mp3")

segments = result["segments"]
db.save_transcript(
    filename="audio.mp3",
    filepath="/path/to/audio.mp3",
    text=result["text"],
    language=result["language"],
    duration=segments[-1]["end"] if segments else 0,
    segments_count=len(segments),
    status="success",
)

使用 faster-whisper 批量处理 #

python
from faster_whisper import WhisperModel
import os
from tqdm import tqdm

model = WhisperModel("base", device="cuda", compute_type="float16")


def batch_transcribe_faster(audio_dir, output_dir):
    """Batch-transcribe a directory with faster-whisper, one .txt per input."""
    os.makedirs(output_dir, exist_ok=True)

    names = [
        name
        for name in os.listdir(audio_dir)
        if name.endswith((".mp3", ".wav", ".m4a"))
    ]

    for name in tqdm(names, desc="转录中"):
        segments, info = model.transcribe(os.path.join(audio_dir, name), language="zh")

        # faster-whisper yields segments lazily; the join consumes the stream.
        text = " ".join(segment.text for segment in segments)

        stem = os.path.splitext(name)[0]
        with open(os.path.join(output_dir, stem + ".txt"), "w", encoding="utf-8") as f:
            f.write(text)


batch_transcribe_faster("./audio_files", "./transcripts")

下一步 #

掌握了批量处理后,继续学习「API 集成」,了解如何构建语音识别服务!

最后更新:2026-04-05