Synthesizing video: parsing progress asynchronously, without blocking
Event generator code
```python
import subprocess
import re
import json
import asyncio

# Event generator that yields ffmpeg execution progress in real time
async def ffmpeg_event_stream(image_path, audio_path, video_path):
    cmd = [
        'ffmpeg',
        '-loop',
        '1',
        '-framerate',
        '25',
        '-i',
        image_path,
        '-i',
        audio_path,
        '-vf',
        "scale=1920:1080,format=yuv420p",
        '-c:v',
        'libx264',
        '-c:a',
        'aac',
        '-shortest',
        '-preset',
        'veryfast',
        # '-preset', 'slow',
        '-crf',
        '18',
        '-pix_fmt',
        'yuv420p',
        '-y',  # overwrite the output file if it already exists
        '-movflags',
        '+faststart',  # optimize for faster playback start
        video_path,
    ]
    current_time = 0
    # Use asyncio.to_thread to launch the blocking ffmpeg process in a separate thread
    process = await asyncio.to_thread(
        subprocess.Popen,
        cmd,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        encoding='utf-8',
        errors='replace',
    )
    while True:
        # Run readline in a separate thread so it does not block the event loop
        output = await asyncio.to_thread(process.stderr.readline)
        if output == '' and process.poll() is not None:
            # Process finished: yield the final event with the video path
            msg_json = {
                "current_time": current_time,
                "done": True,
                "message": "Video created successfully",
                "video_path": video_path,
            }
            yield msg_json
            break
        if output:
            # Try to parse the timestamp from the ffmpeg output
            match = re.search(r"time=([0-9:.]+)", output)
            if match:
                time_str = match.group(1)
                # Convert the HH:MM:SS.ms time string to seconds
                hours, minutes, seconds = map(float, time_str.split(':'))
                current_time = hours * 3600 + minutes * 60 + seconds
                msg_json = {"current_time": current_time, "message": output}
                yield msg_json
```
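The generator can be exercised on its own before wiring it into FastAPI. A minimal sketch, assuming ffmpeg is installed and that the demo.jpg / demo.mp3 files from the section below exist in the working directory:
```python
# Hypothetical standalone driver for ffmpeg_event_stream (defined above / importable).
import asyncio

async def main():
    async for event in ffmpeg_event_stream('demo.jpg', 'demo.mp3', 'demo.mp4'):
        # Each event is a plain dict; print the parsed timestamp and the done flag
        print(event.get('current_time', 0), event.get('done', False))

if __name__ == '__main__':
    asyncio.run(main())
```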
Calling the event generator
- Called from FastAPI
Code for calling the event generator
```python
import asyncio
import json
import logging

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

# ffmpeg_event_stream is the generator defined in the previous section
logger = logging.getLogger(__name__)
router = APIRouter()


def get_audio_duration(audio_path):
    """
    Get the duration of an audio file.
    audio_path: path to the audio file
    """
    import soundfile as sf
    with sf.SoundFile(audio_path) as f:
        duration = f.frames / f.samplerate  # duration in seconds
    return duration


@router.post("/sse/video")
async def slide_video_sse():
    """
    Generate the video file and stream progress over SSE.
    """
    # Image path, audio path, and output video path
    image_path = 'demo.jpg'
    audio_path = 'demo.mp3'
    video_path = "demo.mp4"
    # Read the audio duration (used to compute progress)
    audio_duration = get_audio_duration(audio_path)
    # Create a queue for passing events to the SSE stream
    queue = asyncio.Queue()

    # Async task that runs the ffmpeg command and pushes progress events
    async def run_ffmpeg_task():
        try:
            # Push the start message
            await queue.put(json.dumps({
                "status": "started",
                "message": "Video generation started",
                "video_path": video_path
            }))
            # Run the ffmpeg command and forward its progress events
            async for event in ffmpeg_event_stream(image_path, audio_path, video_path):
                done = event.get('done', False)
                if done:
                    msg_json = {
                        "progress": 100,
                        "current_time": event.get('current_time', 0),
                        "audio_duration": audio_duration,
                        "done": True,
                        "message": "Video generation finished",
                        "video_path": video_path
                    }
                    logger.info(msg_json)
                    await queue.put(json.dumps(msg_json))
                    break
                # Compute progress as a percentage of the audio duration
                progress = (
                    (event.get('current_time', 0) / audio_duration) * 100
                    if audio_duration
                    else 0
                )
                progress = int(progress)
                msg_json = {
                    "progress": progress,
                    "current_time": event.get('current_time', 0),
                    "audio_duration": audio_duration,
                    "done": False,
                    "message": f"Video generation progress: {progress}%"
                }
                logger.info(msg_json)
                await queue.put(json.dumps(msg_json))
        except Exception as e:
            logger.error(f"Video generation failed: {str(e)}")
            await queue.put(json.dumps({
                "status": "error",
                "message": f"Video generation failed: {str(e)}",
                "done": True
            }))
        finally:
            # Send the end-of-stream signal
            await queue.put("done")

    # Start the ffmpeg task without awaiting it
    asyncio.create_task(run_ffmpeg_task())

    # Return the streaming response immediately; do not wait for the video to finish
    async def event_stream():
        while True:
            event = await queue.get()
            yield f"data: {event}\n\n"
            if event == "done":
                break

    return StreamingResponse(event_stream(), media_type="text/event-stream")
```
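On the client side, the SSE stream can be read line by line until the "done" sentinel arrives. A sketch using httpx; the base URL and port are assumptions (a typical local uvicorn setup with the router mounted at the application root):
```python
# Hypothetical SSE client for the /sse/video endpoint; URL and port are assumptions.
import asyncio
import json

import httpx

async def watch_progress():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", "http://127.0.0.1:8000/sse/video") as response:
            async for line in response.aiter_lines():
                # SSE events from this endpoint arrive as "data: <payload>" lines
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload == "done":
                    break
                event = json.loads(payload)
                print(event.get("progress"), event.get("message"))

asyncio.run(watch_progress())
```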