EdgeTTS 语音合成
示例代码
示例代码
./edgeTTS_demo.py
py
"""
EdgeTTS 演示服务
环境要求:
- Python 3.8+
- 依赖包: fastapi, uvicorn, edge-tts
安装依赖:
pip install fastapi uvicorn edge-tts
启动服务:
uvicorn edgeTTS.demo:app --reload --port 8003
使用示例:
1. WebSocket测试:
- 连接地址: ws://localhost:8003/ws/edge_tts
- 【或者】连接地址: ws://127.0.0.1:8003/ws/edge_tts
- 发送任意文本消息测试连接
2. HTTP接口测试:
- POST请求: http://localhost:8003/edge_tts
- 参数: {"text": "要转换的文本", "format": "stream"或"json"}
"""
# 标准库导入
import asyncio
import base64
import os
import tempfile
import uuid
from typing import Dict, List
import json
# 第三方库导入
import edge_tts
# FastAPI相关导入
from fastapi import (
BackgroundTasks,
FastAPI,
Response,
WebSocket,
WebSocketDisconnect
)
from fastapi.responses import JSONResponse, StreamingResponse
app = FastAPI()
class ConnectionManager:
"""WebSocket连接管理器"""
def __init__(self):
"""初始化连接管理器"""
self.active_connections: Dict[str, WebSocket] = {}
self.client_user_map: Dict[str, str] = {}
async def connect(self, websocket: WebSocket, client_id: str = None) -> str:
await websocket.accept()
if client_id is None:
client_id = str(uuid.uuid4())
self.active_connections[client_id] = websocket
return client_id
def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
if client_id in self.client_user_map:
del self.client_user_map[client_id]
async def send_personal_message(self, message: str, client_id: str):
if client_id in self.active_connections:
await self.active_connections[client_id].send_text(message)
def bind_user(self, client_id: str, user_id: str):
self.client_user_map[client_id] = user_id
async def send_to_user(self, message: str, user_id: str):
"""
通过user_id发送消息到对应的客户端
:param message: 要发送的消息内容
:param user_id: 目标用户ID
"""
for client_id, ws_user_id in self.client_user_map.items():
if ws_user_id == user_id and client_id in self.active_connections:
await self.active_connections[client_id].send_text(message)
async def broadcast_message(self, message: str):
"""
向所有活跃连接广播消息
:param message: 要广播的消息内容
"""
for client_id, websocket in self.active_connections.items():
await websocket.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws")
async def websocket_endpoint_test(websocket: WebSocket, user_id: str = None):
await websocket.accept()
print(f"WebSocket 连接建立,user_id: {user_id}")
try:
while True:
data = await websocket.receive_text()
# 这里可以处理接收到的消息
await websocket.send_text(data)
except WebSocketDisconnect:
print(f"Client disconnected")
except Exception as e:
print(f"An error occurred for client : {str(e)}")
@app.websocket("/ws/edge_tts")
async def websocket_endpoint(websocket: WebSocket, user_id: str = None):
client_id = await manager.connect(websocket)
manager.bind_user(client_id, user_id)
try:
while True:
data = await websocket.receive_text()
# 这里可以处理接收到的消息
await manager.send_personal_message(f"Message received: {data}", client_id)
except WebSocketDisconnect:
manager.disconnect(client_id)
print(f"Client {client_id} disconnected")
from pydantic import BaseModel
import pathlib
async def tts_batch(user_id: str, texts: List[str], output_dir: str = None, return_format: str = "path") -> List[str]:
"""
批量将文本转换为语音
:param texts: 要转换的文本数组
:param output_dir: 可选输出目录路径,如果为None则按当前时分秒生成目录
:param return_format: 返回格式,可选值: path(文件路径), stream(文件流), base64(base64编码)
:return: 根据return_format返回对应格式的数据列表
"""
if output_dir is None:
# 生成批次号目录
import datetime
now = datetime.datetime.now()
date_str = now.strftime("%Y%m%d")
time_str = now.strftime("%H%M%S")
batch_dir = f"{date_str}/{time_str}"
output_dir = os.path.join(output_dir, batch_dir)
print(f"output_dir: {output_dir}")
# 创建输出目录
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
print(f"输出目录: {output_dir}")
results = []
for i, text in enumerate(texts):
try:
output_path = os.path.join(output_dir, f"{i}.wav")
result = await tts_one(text, output_path, return_format)
res_json = {
'status': 'success',
'result': result,
'text': text
}
results.append(res_json)
await manager.send_to_user(json.dumps(res_json), user_id)
await manager.broadcast_message('result')
except Exception as e:
results.append({"status": "error", "error": str(e), "text": text})
return results
async def tts_one(text: str, output_path: str = None, return_format: str = "path"):
"""
将文本转换为语音并返回指定格式
:param text: 要转换的文本
:param output_path: 可选的文件输出路径,如果为None则按日期时间生成路径
:param return_format: 返回格式,可选值: path(文件路径), stream(文件流), base64(base64编码)
:return: 根据return_format返回对应格式的数据
"""
try:
print(f"output_path - 1: {output_path}")
communicate = edge_tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
# 确保输出目录存在,如果不存在,则创建
if output_path is None:
import datetime
now = datetime.datetime.now()
date_str = now.strftime("%Y%m%d")
time_str = now.strftime("%H%M%S")
output_dir = os.path.join("test", date_str)
output_path = os.path.join(output_dir, f"tts_{time_str}.wav")
print(f"output_path - 2: {output_path}")
pathlib.Path(output_path).parent.mkdir(parents=True, exist_ok=True)
await communicate.save(str(output_path))
# 根据返回格式处理
if return_format == "path":
return output_path
elif return_format == "stream":
def iterfile():
with open(output_path, "rb") as f:
while chunk := f.read(1024 * 1024):
yield chunk
return StreamingResponse(iterfile(), media_type="audio/wav")
elif return_format == "base64":
with open(output_path, "rb") as f:
audio_data = f.read()
base64_data = base64.b64encode(audio_data).decode('utf-8')
return { "file_path": output_path, "audio": base64_data}
else:
raise ValueError(f"不支持的返回格式: {return_format}")
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": str(e)}
)
from pydantic import BaseModel
class TextToSpeechRequest(BaseModel):
text: str
format: str = "stream"
class AsyncTextToSpeechRequest(BaseModel):
user_id: str
texts: List[str]
@app.post("/edge_tts")
async def text_to_speech(request: TextToSpeechRequest):
"""
将文本转换为语音
:param text: 要转换的文本
:param format: 返回格式,stream(音频流)或json(base64编码)
"""
try:
if request.format == "stream":
audio_stream = await tts_one(request.text, None, "stream")
return audio_stream
else:
res = await tts_one(request.text, None, request.format)
return JSONResponse(content=res)
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": str(e)}
)
@app.post("/async_edge_tts")
async def async_text_to_speech(request: AsyncTextToSpeechRequest, background_tasks: BackgroundTasks):
"""
异步批量语音合成接口
:param request: 包含要转换的文本数组的请求体
:param user_id: 可选的用户ID,用于WebSocket通知
:return: 任务ID
"""
# 生成批次号目录
import datetime
now = datetime.datetime.now()
date_str = now.strftime("%Y%m%d")
time_str = now.strftime("%H%M%S")
batch_dir = f"{date_str}/{time_str}"
output_dir = os.path.join('./test', batch_dir)
print(f"output_dir: {output_dir}")
background_tasks.add_task(tts_batch, request.user_id, request.texts, output_dir)
return JSONResponse({"code": 200, "msg": "任务处理中", "output_dir": output_dir})
if __name__ == "__main__":
import uvicorn
uvicorn.run("edgeTTS_demo:app", host="0.0.0.0", port=8003, reload=True, log_level='info')
# 也可以执行命令来启动
# uvicorn edgeTTS_demo:app --reload --port 8003Docker Compose 部署
docker-compose.yml
./docker-compose.yml
yml
version: '3.8'
services:
edge-tts:
build:
context: .
dockerfile: Dockerfile.edgetts
ports:
- "8003:8003"
volumes:
- ./:/app
# - ./test:/app/test
environment:
- TZ=Asia/Shanghai
restart: unless-stopped使用说明:
- 确保已安装Docker和Docker Compose
- 在项目目录下创建docker-compose.yml文件
- 执行
docker-compose up -d启动服务 - 服务将在 8003 端口运行
Dockerfile.edgetts
./Dockerfile.edgetts
edgetts
# 使用官方Python镜像作为基础镜像
FROM python:3.8-slim
# 设置工作目录
WORKDIR /app
# 复制当前目录内容到容器中的/app目录
# COPY . /app
# 安装依赖
RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ \
&& pip install --no-cache-dir fastapi uvicorn edge-tts websockets pathlib
# 暴露端口
EXPOSE 8003
# 启动命令
CMD ["uvicorn", "edgeTTS_demo:app", "--host", "0.0.0.0", "--port", "8003", "--reload"]