Skip to content

合成视频.解析进度.异步不阻塞

事件生成器代码

事件生成器代码
python
import subprocess
import re
import json
import asyncio

# 事件生成器,用于实时返回 ffmpeg 执行进度
async def ffmpeg_event_stream(image_path, audio_path, video_path):
    cmd = [
        'ffmpeg',
        '-loop',
        '1',
        '-framerate',
        '25',
        '-i',
        image_path,
        '-i',
        audio_path,
        '-vf',
        "scale=1920:1080,format=yuv420p",
        '-c:v',
        'libx264',
        '-c:a',
        'aac',
        '-shortest',
        '-preset',
        'veryfast',
        # '-preset', 'slow',
        '-crf',
        '18',
        '-pix_fmt',
        'yuv420p',
        '-y',  # 覆盖已存在文件
        '-movflags',
        '+faststart',  # 优化播放
        video_path,
    ]
    import subprocess

    current_time = 0

    # 使用 asyncio.to_thread 在单独的线程中运行阻塞的 ffmpeg 进程
    process = await asyncio.to_thread(
        subprocess.Popen,
        cmd,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        encoding='utf-8',
        errors='replace',
    )

    while True:
        # 使用 asyncio.to_thread 在单独的线程中运行 readline,避免阻塞事件循环
        output = await asyncio.to_thread(process.stderr.readline)
        if output == '' and process.poll() is not None:
            # 进程结束
            # 进程结束,返回视频路径
            msg_json = {
                "current_time": current_time,
                "done": True,
                "message": "视频创建完成",
                "video_path": video_path,
            }
            yield msg_json
            break
        if output:
            # 尝试从 ffmpeg 输出中解析时间信息
            match = re.search(r"time=([0-9:.]+)", output)
            if match:
                time_str = match.group(1)
                # 将时间字符串转换为秒数
                hours, minutes, seconds = map(float, time_str.split(':'))
                current_time = hours * 3600 + minutes * 60 + seconds
                msg_json = {"current_time": current_time, "message": output}
                yield msg_json

调用事件生成器

  • fastapi 调用
调用事件生成器代码
python

def get_audio_duration(audio_path):
    """
    获取音频文件的时长
    audio_path: 音频文件路径
    """
    import soundfile as sf

    with sf.SoundFile(audio_path) as f:
        duration = f.frames / f.samplerate  # 时长(秒)

    return duration


@router.post("/sse/video")
async def slide_video_sse():
    """
    生成视频文件
    """
    # 图片地址、音频地址、视频地址
    image_path = 'demo.jpg'
    audio_path = 'demo.mp3'
    video_path = "demo.mp4"

    # 读音频长度
    audio_duration = get_audio_duration(audio_path)

    # 创建一个队列用于通信
    queue = asyncio.Queue()

    # 定义一个异步函数来执行 ffmpeg 命令
    async def run_ffmpeg_task():
        try:
            
            # 推送开始消息
            await queue.put(json.dumps({
                "status": "started",
                "message": "视频生成已开始",
                "video_path": video_path
            }))
            
            # 执行 ffmpeg 命令
            async for event in ffmpeg_event_stream(image_path, audio_path, video_path):
                done = event.get('done', False)
                if done:
                    msg_json = {
                        "progress": 100,
                        "current_time": event.get('current_time', 0),
                        'audio_duration': audio_duration,
                        "done": True,
                        "message": "视频生成完成",
                        "video_path": video_path
                    }
                    logger.info(msg_json)
                    await queue.put(json.dumps(msg_json))
                    break

                # 计算进度
                progress = (
                    (event.get('current_time', 0) / audio_duration) * 100
                    if audio_duration
                    else 0
                )
                progress = int(progress)
                
                msg_json = {
                    "progress": progress,
                    "current_time": event.get('current_time', 0),
                    'audio_duration': audio_duration,
                    "done": False,
                    "message": f"视频生成进度: {progress}%"
                }
                logger.info(msg_json)
                await queue.put(json.dumps(msg_json))
        except Exception as e:
            logger.error(f"视频生成失败: {str(e)}")
            await queue.put(json.dumps({
                "status": "error",
                "message": f"视频生成失败: {str(e)}",
                "done": True
            }))
        finally:
            # 发送结束信号
            await queue.put("done")
    
    # 启动 ffmpeg 任务但不等待
    asyncio.create_task(run_ffmpeg_task())
    
    # 立即返回流式响应,不等待视频生成完成
    async def event_stream():
        while True:
            event = await queue.get()
            yield f"data: {event}\n\n"
            if event == "done":
                break
    
    return StreamingResponse(event_stream(), media_type="text/event-stream")