批量处理 #
批量处理概述 #
批量处理是处理大量音频文件的常见需求,需要考虑效率、可靠性和结果管理。
text
┌─────────────────────────────────────────────────────────────┐
│ 批量处理流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 输入阶段: │
│ ├── 扫描音频文件 │
│ ├── 过滤和排序 │
│ └── 创建任务队列 │
│ │
│ 处理阶段: │
│ ├── 并行/串行处理 │
│ ├── 进度跟踪 │
│ └── 错误处理 │
│ │
│ 输出阶段: │
│ ├── 结果收集 │
│ ├── 格式转换 │
│ └── 持久化存储 │
│ │
└─────────────────────────────────────────────────────────────┘
基本批量处理 #
简单批量转录 #
python
import whisper
import os
# Load the base model once up front; it is reused for every file below.
model = whisper.load_model("base")

audio_dir = "./audio_files"
output_dir = "./transcripts"
os.makedirs(output_dir, exist_ok=True)

# Case-insensitive match against the common audio container extensions.
audio_extensions = (".mp3", ".wav", ".m4a", ".flac", ".ogg")
audio_files = []
for name in os.listdir(audio_dir):
    if name.lower().endswith(audio_extensions):
        audio_files.append(name)

for audio_file in audio_files:
    audio_path = os.path.join(audio_dir, audio_file)
    print(f"处理: {audio_file}")
    result = model.transcribe(audio_path, language="zh")
    # Transcript keeps the source file's stem, with a .txt extension.
    output_filename = os.path.splitext(audio_file)[0] + ".txt"
    output_path = os.path.join(output_dir, output_filename)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(result["text"])
    print(f"完成: {output_filename}")

print(f"\n共处理 {len(audio_files)} 个文件")
带进度显示 #
python
import whisper
import os
from tqdm import tqdm
model = whisper.load_model("base")

audio_dir = "./audio_files"
output_dir = "./transcripts"
os.makedirs(output_dir, exist_ok=True)

# Collect every supported audio file in the input directory.
audio_files = [f for f in os.listdir(audio_dir) if f.endswith((".mp3", ".wav", ".m4a"))]

# tqdm draws a live progress bar over the whole batch.
for audio_file in tqdm(audio_files, desc="转录进度"):
    audio_path = os.path.join(audio_dir, audio_file)
    result = model.transcribe(audio_path, language="zh")
    stem = os.path.splitext(audio_file)[0]
    output_path = os.path.join(output_dir, f"{stem}.txt")
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(result["text"])

print("批量处理完成!")
并行处理 #
多进程处理 #
python
import whisper
import os
from multiprocessing import Pool, cpu_count
from functools import partial
# Per-process model cache. With multiprocessing each worker previously called
# whisper.load_model() for EVERY file, so model loading dominated runtime.
# Caching by model size loads it once per worker process instead.
_MODEL_CACHE = {}


def _get_model(model_size):
    """Return a cached Whisper model for this process, loading it on first use."""
    if model_size not in _MODEL_CACHE:
        _MODEL_CACHE[model_size] = whisper.load_model(model_size)
    return _MODEL_CACHE[model_size]


def transcribe_file(audio_path, output_dir, model_size="base"):
    """Transcribe one audio file and write the text into output_dir.

    Args:
        audio_path: Path of the audio file to transcribe.
        output_dir: Directory that receives "<stem>.txt".
        model_size: Whisper model name (default "base").

    Returns:
        On success: {"file", "status": "success", "text_length"}.
        On any failure: {"file", "status": "error", "error"} — exceptions are
        captured so one bad file cannot abort the whole batch.
    """
    try:
        model = _get_model(model_size)
        result = model.transcribe(audio_path, language="zh")
        filename = os.path.basename(audio_path)
        output_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.txt")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(result["text"])
        return {"file": filename, "status": "success", "text_length": len(result["text"])}
    except Exception as e:
        return {"file": os.path.basename(audio_path), "status": "error", "error": str(e)}
def batch_transcribe_parallel(audio_dir, output_dir, model_size="base", num_workers=None):
    """Transcribe every supported audio file in audio_dir with a process pool.

    Returns the list of per-file status dicts produced by transcribe_file,
    in the same order as the input files.
    """
    os.makedirs(output_dir, exist_ok=True)

    supported = (".mp3", ".wav", ".m4a")
    audio_files = []
    for name in os.listdir(audio_dir):
        if name.endswith(supported):
            audio_files.append(os.path.join(audio_dir, name))

    # Default worker count: one per CPU, but never more than there are files.
    workers = num_workers if num_workers is not None else min(cpu_count(), len(audio_files))

    task = partial(transcribe_file, output_dir=output_dir, model_size=model_size)
    with Pool(workers) as pool:
        # imap streams results back while preserving input order.
        return list(pool.imap(task, audio_files))
# Run the batch with four worker processes.
results = batch_transcribe_parallel(
    "./audio_files",
    "./transcripts",
    model_size="base",
    num_workers=4,
)

# transcribe_file only ever reports "success" or "error", so the two
# counts can be derived from one pass over the failures.
failures = [r for r in results if r["status"] == "error"]
success_count = len(results) - len(failures)
error_count = len(failures)
print(f"成功: {success_count}, 失败: {error_count}")
for r in failures:
    print(f"错误 - {r['file']}: {r['error']}")
使用 ThreadPoolExecutor #
python
import whisper
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
def transcribe_file(audio_path, output_dir, model_size="base"):
    """Transcribe a single file, persist the text, and return name + text.

    NOTE(review): the model is reloaded on every call, which is expensive;
    any exception propagates to the caller (collected via future.result()).
    """
    whisper_model = whisper.load_model(model_size)
    transcription = whisper_model.transcribe(audio_path, language="zh")

    basename = os.path.basename(audio_path)
    stem, _ = os.path.splitext(basename)
    destination = os.path.join(output_dir, f"{stem}.txt")
    with open(destination, "w", encoding="utf-8") as out:
        out.write(transcription["text"])

    return {"file": basename, "text": transcription["text"]}
def batch_transcribe_threaded(audio_dir, output_dir, model_size="base", max_workers=4):
    """Transcribe all supported files in audio_dir using a thread pool.

    Failed files are reported and skipped; successful results are returned
    in completion order (not submission order).
    """
    os.makedirs(output_dir, exist_ok=True)

    paths = [
        os.path.join(audio_dir, name)
        for name in os.listdir(audio_dir)
        if name.endswith((".mp3", ".wav", ".m4a"))
    ]

    collected = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {
            executor.submit(transcribe_file, path, output_dir, model_size): path
            for path in paths
        }
        # as_completed yields futures as they finish, driving the progress bar.
        for done in tqdm(as_completed(future_map), total=len(future_map), desc="处理中"):
            try:
                collected.append(done.result())
            except Exception as e:
                print(f"错误: {e}")
    return collected


results = batch_transcribe_threaded("./audio_files", "./transcripts", "base", 4)
错误处理 #
完善的错误处理 #
python
import whisper
import os
import json
import traceback
from datetime import datetime
class BatchProcessor:
    """Batch transcriber with per-file error capture and a JSONL resume log.

    Every processed file appends one JSON line to ``processing_log.json``;
    ``process_batch(resume=True)`` reads that log and skips files already
    transcribed successfully, so an interrupted run can be restarted.
    """

    def __init__(self, model_size="base", output_dir="./output"):
        # Load the model once; it is reused for every file in the batch.
        self.model = whisper.load_model(model_size)
        self.output_dir = output_dir
        # JSON-lines log (one record per processed file) used for resume.
        self.log_file = os.path.join(output_dir, "processing_log.json")
        os.makedirs(output_dir, exist_ok=True)

    def process_file(self, audio_path):
        """Transcribe one file; always returns a status dict, never raises.

        The dict always has "file", "timestamp", "status", plus either the
        transcription fields (on success) or error_type / error_message
        (on error; unexpected errors also carry a "traceback").
        """
        result = {
            "file": audio_path,
            "timestamp": datetime.now().isoformat(),
            "status": "pending"
        }
        try:
            # Validate the input before handing it to the model.
            if not os.path.exists(audio_path):
                raise FileNotFoundError(f"文件不存在: {audio_path}")
            file_size = os.path.getsize(audio_path)
            if file_size == 0:
                raise ValueError(f"文件为空: {audio_path}")
            transcription = self.model.transcribe(audio_path, language="zh")
            result["status"] = "success"
            result["text"] = transcription["text"]
            result["language"] = transcription["language"]
            result["segments_count"] = len(transcription["segments"])
            # Duration is approximated by the end time of the last segment.
            result["duration"] = transcription["segments"][-1]["end"] if transcription["segments"] else 0
            self._save_transcript(audio_path, transcription)
        except FileNotFoundError as e:
            result["status"] = "error"
            result["error_type"] = "file_not_found"
            result["error_message"] = str(e)
        except ValueError as e:
            result["status"] = "error"
            result["error_type"] = "invalid_file"
            result["error_message"] = str(e)
        except Exception as e:
            # Unexpected failure: keep the traceback for post-mortem debugging.
            result["status"] = "error"
            result["error_type"] = "transcription_error"
            result["error_message"] = str(e)
            result["traceback"] = traceback.format_exc()
        return result

    def _save_transcript(self, audio_path, transcription):
        """Write the plain-text transcript as <output_dir>/<stem>.txt."""
        filename = os.path.basename(audio_path)
        output_path = os.path.join(self.output_dir, f"{os.path.splitext(filename)[0]}.txt")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(transcription["text"])

    def process_batch(self, audio_files, resume=False):
        """Process a list of audio paths; optionally skip already-done files.

        When resume=True, paths recorded with status "success" in the JSONL
        log are skipped. Each new result is appended to the log immediately,
        so progress survives a crash mid-batch.
        """
        results = []
        processed_files = set()
        if resume and os.path.exists(self.log_file):
            with open(self.log_file, "r", encoding="utf-8") as f:
                for line in f:
                    if line.strip():
                        entry = json.loads(line)
                        if entry["status"] == "success":
                            processed_files.add(entry["file"])
        for audio_path in audio_files:
            if audio_path in processed_files:
                print(f"跳过已处理: {audio_path}")
                continue
            print(f"处理: {audio_path}")
            result = self.process_file(audio_path)
            results.append(result)
            # Append-mode write keeps the log valid even if a later file crashes.
            with open(self.log_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(result, ensure_ascii=False) + "\n")
        return results

    def get_summary(self, results):
        """Aggregate results into totals, success rate, and error-type counts."""
        total = len(results)
        success = sum(1 for r in results if r["status"] == "success")
        errors = sum(1 for r in results if r["status"] == "error")
        error_types = {}
        for r in results:
            if r["status"] == "error":
                error_type = r.get("error_type", "unknown")
                error_types[error_type] = error_types.get(error_type, 0) + 1
        return {
            "total": total,
            "success": success,
            "errors": errors,
            "success_rate": success / total if total > 0 else 0,
            "error_types": error_types
        }
# Build the work list and run the batch, resuming past successful files.
processor = BatchProcessor("base", "./output")
audio_files = [
    os.path.join("./audio_files", name)
    for name in os.listdir("./audio_files")
    if name.endswith((".mp3", ".wav"))
]
results = processor.process_batch(audio_files, resume=True)

# Print a short human-readable summary of the run.
summary = processor.get_summary(results)
print(f"\n处理摘要:")
print(f"总数: {summary['total']}")
print(f"成功: {summary['success']}")
print(f"失败: {summary['errors']}")
print(f"成功率: {summary['success_rate']:.1%}")
结果导出 #
多格式导出 #
python
import whisper
import os
import json
import csv
def export_results(results, output_dir, formats=("txt", "json", "csv", "srt")):
    """Export batch transcription results in one or more formats.

    Args:
        results: List of per-file result dicts as produced by the batch
            processors above ("file", "status", and on success "text",
            optionally "language", "duration", "segments").
        output_dir: Root directory for the exported files.
        formats: Iterable containing any of "txt", "json", "csv", "srt".
            (A tuple default avoids the mutable-default-argument pitfall.)
    """
    os.makedirs(output_dir, exist_ok=True)

    if "txt" in formats:
        txt_dir = os.path.join(output_dir, "txt")
        os.makedirs(txt_dir, exist_ok=True)
        for result in results:
            if result["status"] == "success":
                filename = os.path.splitext(os.path.basename(result["file"]))[0]
                # BUG FIX: use the per-file stem in the output name; the
                # previous literal placeholder made every transcript
                # overwrite the same "(unknown).txt" file.
                with open(os.path.join(txt_dir, f"{filename}.txt"), "w", encoding="utf-8") as f:
                    f.write(result["text"])

    if "json" in formats:
        json_path = os.path.join(output_dir, "results.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

    if "csv" in formats:
        csv_path = os.path.join(output_dir, "results.csv")
        with open(csv_path, "w", encoding="utf-8", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["file", "status", "language", "duration", "text"])
            for result in results:
                writer.writerow([
                    result["file"],
                    result["status"],
                    result.get("language", ""),
                    result.get("duration", ""),
                    result.get("text", "")
                ])

    if "srt" in formats:
        srt_dir = os.path.join(output_dir, "srt")
        os.makedirs(srt_dir, exist_ok=True)
        for result in results:
            # Timestamped segments are required to build subtitles.
            if result["status"] == "success" and "segments" in result:
                filename = os.path.splitext(os.path.basename(result["file"]))[0]
                # Same fix as above: per-file stem instead of "(unknown)".
                write_srt(result["segments"], os.path.join(srt_dir, f"{filename}.srt"))
def write_srt(segments, output_path):
    """Write segments (dicts with "start", "end", "text") as an SRT file."""

    def format_timestamp(seconds):
        # SRT timestamps are HH:MM:SS,mmm — note the comma before milliseconds.
        hours, remainder = divmod(int(seconds), 3600)
        minutes, secs = divmod(remainder, 60)
        millis = int((seconds % 1) * 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

    with open(output_path, "w", encoding="utf-8") as f:
        for index, segment in enumerate(segments, 1):
            entry = (
                f"{index}\n"
                f"{format_timestamp(segment['start'])} --> {format_timestamp(segment['end'])}\n"
                f"{segment['text'].strip()}\n\n"
            )
            f.write(entry)
数据库存储 #
python
import whisper
import sqlite3
import os
from datetime import datetime
class TranscriptDatabase:
    """SQLite-backed store for transcription results.

    A fresh connection is opened per operation; connections are closed in
    ``finally`` so they are no longer leaked when a query raises (the
    original code skipped ``close()`` on any exception).
    """

    def __init__(self, db_path="transcripts.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create the transcripts table if it does not already exist."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS transcripts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    filepath TEXT NOT NULL,
                    text TEXT,
                    language TEXT,
                    duration REAL,
                    segments_count INTEGER,
                    status TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.commit()
        finally:
            conn.close()

    def save_transcript(self, filename, filepath, text, language, duration, segments_count, status):
        """Insert one transcript row (parameterized query — no SQL injection)."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
                INSERT INTO transcripts (filename, filepath, text, language, duration, segments_count, status)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (filename, filepath, text, language, duration, segments_count, status))
            conn.commit()
        finally:
            conn.close()

    def get_transcript(self, filename):
        """Return the first row matching filename, or None if absent."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute("SELECT * FROM transcripts WHERE filename = ?", (filename,))
            return cursor.fetchone()
        finally:
            conn.close()

    def get_all_transcripts(self):
        """Return all rows, newest first."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute("SELECT * FROM transcripts ORDER BY created_at DESC")
            return cursor.fetchall()
        finally:
            conn.close()
# Transcribe a single file and persist the result row to SQLite.
db = TranscriptDatabase()
model = whisper.load_model("base")
result = model.transcribe("audio.mp3")

segments = result["segments"]
db.save_transcript(
    filename="audio.mp3",
    filepath="/path/to/audio.mp3",
    text=result["text"],
    language=result["language"],
    duration=segments[-1]["end"] if segments else 0,
    segments_count=len(segments),
    status="success",
)
使用 faster-whisper 批量处理 #
python
from faster_whisper import WhisperModel
import os
from tqdm import tqdm
# float16 compute on GPU reduces memory use relative to full precision.
model = WhisperModel("base", device="cuda", compute_type="float16")


def batch_transcribe_faster(audio_dir, output_dir):
    """Transcribe every supported file in audio_dir with faster-whisper."""
    os.makedirs(output_dir, exist_ok=True)

    names = [
        name for name in os.listdir(audio_dir)
        if name.endswith((".mp3", ".wav", ".m4a"))
    ]

    for name in tqdm(names, desc="转录中"):
        source = os.path.join(audio_dir, name)
        segments, info = model.transcribe(source, language="zh")
        # Joining consumes the segment stream and concatenates the text.
        combined = " ".join(seg.text for seg in segments)
        target = os.path.join(output_dir, f"{os.path.splitext(name)[0]}.txt")
        with open(target, "w", encoding="utf-8") as f:
            f.write(combined)


batch_transcribe_faster("./audio_files", "./transcripts")
下一步 #
掌握了批量处理后,继续学习 API 集成 了解如何构建语音识别服务!
最后更新:2026-04-05