Files
digital_human_backend/function/async_video_api.py
2025-09-05 00:43:20 +08:00

369 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
异步视频处理和数字人合成API模块
支持文件上传、异步任务处理、模板管理等功能
"""
import asyncio
import uuid as uuid_lib
from typing import Optional, Dict, Any, List
import os
import json
import time
from enum import Enum
from dataclasses import dataclass
import threading
import queue
from datetime import datetime
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def resolve_file_path(filename: str, file_type: str) -> str:
    """Resolve a file name to an accessible path under the known mount points.

    Args:
        filename: File name, optionally including a directory part. Only the
            base name is used when searching the candidate directories.
        file_type: One of 'audio', 'video' or 'image'. Any other value has no
            candidate directories and is treated as "not found".

    Returns:
        ``filename`` unchanged when it is an absolute path that exists;
        otherwise the first candidate path that is an existing regular file;
        otherwise ``filename`` unchanged, leaving error handling to the caller.
    """
    # An absolute path that already exists needs no resolution.
    if os.path.isabs(filename) and os.path.exists(filename):
        return filename

    # Candidate directories per file type, in priority order.
    search_dirs = {
        'audio': (
            "/mnt/docker/resource/uploads/audio",
            os.path.expanduser('~/heygem_data/voice/data'),
            "/mnt/docker/resource",
            "/mnt/docker/code/data/temp",
        ),
        'video': (
            "/mnt/docker/resource/uploads/video",
            os.path.expanduser('~/heygem_data/face2face/temp'),
            "/mnt/docker/resource",
            "/mnt/docker/video_resource",
        ),
        'image': (
            "/mnt/docker/resource/uploads/image",
            "/mnt/docker/resource",
        ),
    }

    base_filename = os.path.basename(filename)

    # An empty base name ("" or a path ending in "/") would turn every
    # candidate into the directory itself, so skip the search entirely.
    if base_filename:
        for directory in search_dirs.get(file_type, ()):
            path = f"{directory}/{base_filename}"
            # isfile (not exists) so a directory is never returned as a file.
            if os.path.isfile(path):
                logger.info(f"Resolved {file_type} file: {filename} -> {path}")
                return path

    # Nothing matched: hand the original value back to the caller.
    logger.warning(f"Could not resolve {file_type} file path: {filename}")
    return filename
class TaskStatus(Enum):
    """Enumeration of the states a task can be in."""

    # Waiting to be processed.
    PENDING = "pending"
    # Currently being processed.
    PROCESSING = "processing"
    # Finished successfully.
    COMPLETED = "completed"
    # Finished with an error.
    FAILED = "failed"
    # Cancelled.
    CANCELLED = "cancelled"
class TaskType(Enum):
    """Enumeration of the kinds of task that can be queued."""

    VOICE_GENERATION = "voice_generation"
    DIGITAL_HUMAN_CREATION = "digital_human_creation"
    VIDEO_COMPOSITION = "video_composition"
    TEMPLATE_CREATION = "template_creation"
    AUDIO_EXTRACTION = "audio_extraction"
@dataclass
class Task:
    """Record describing one task and its current state.

    ``created_at`` defaults to the construction time and ``updated_at``
    defaults to ``created_at``; a value passed explicitly for either field
    is kept as given.
    """

    task_id: str
    task_type: TaskType
    status: TaskStatus
    # Progress indicator; starts at 0.
    progress: int = 0
    # Result payload, if any has been recorded.
    result: Optional[Dict[str, Any]] = None
    # Error description, if any has been recorded.
    error_message: Optional[str] = None
    # Timestamps; None means "fill in during __post_init__".
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    user_id: Optional[str] = None
    # Input parameters supplied when the task was created.
    input_data: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Fill in whichever timestamps were not supplied by the caller."""
        if self.created_at is None:
            self.created_at = datetime.now()
        # Default independently so an explicit updated_at is never
        # overwritten and is never left as None.
        if self.updated_at is None:
            self.updated_at = self.created_at
class TaskManager:
    """In-memory task registry processed by one background worker thread.

    Tasks are kept in ``self.tasks`` keyed by task id. ``create_task`` puts
    the id on ``self.task_queue``; the worker thread started by
    ``start_worker`` takes ids off the queue one at a time and dispatches
    them on their ``TaskType``.
    """

    # Directory holding one ``<template_id>.json`` metadata file per template.
    _TEMPLATE_DIR = "/mnt/docker/resource/templates"

    def __init__(self):
        """Create an empty manager; the worker is not started here."""
        self.tasks: Dict[str, Task] = {}
        self.task_queue = queue.Queue()
        self.worker_thread = None
        self.is_running = False

    def start_worker(self):
        """Start the daemon worker thread; does nothing if already running."""
        if self.is_running:
            return
        self.is_running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        logger.info("Task worker started")

    def stop_worker(self):
        """Ask the worker loop to exit and wait for the thread to finish."""
        self.is_running = False
        if self.worker_thread:
            # The loop re-checks is_running at least once per second.
            self.worker_thread.join()
            logger.info("Task worker stopped")

    def create_task(self, task_type: TaskType, input_data: Dict[str, Any], user_id: Optional[str] = None) -> str:
        """Register a new pending task, enqueue it and return its id.

        Args:
            task_type: Kind of work to perform.
            input_data: Parameters for the task handler.
            user_id: Optional owner of the task.

        Returns:
            The generated task id (a UUID4 string).
        """
        task_id = str(uuid_lib.uuid4())
        self.tasks[task_id] = Task(
            task_id=task_id,
            task_type=task_type,
            status=TaskStatus.PENDING,
            input_data=input_data,
            user_id=user_id,
        )
        self.task_queue.put(task_id)
        logger.info(f"Created task {task_id} of type {task_type.value}")
        return task_id

    def get_task(self, task_id: str) -> Optional[Task]:
        """Return the task with the given id, or None if it is unknown."""
        return self.tasks.get(task_id)

    def update_task_status(self, task_id: str, status: TaskStatus, progress: Optional[int] = None, result: Optional[Dict[str, Any]] = None, error_message: Optional[str] = None):
        """Set a task's status and, when given, progress/result/error.

        Unknown task ids are ignored. Arguments left as None keep the
        task's current value for that field. ``updated_at`` is refreshed.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return
        task.status = status
        if progress is not None:
            task.progress = progress
        if result is not None:
            task.result = result
        if error_message is not None:
            task.error_message = error_message
        task.updated_at = datetime.now()
        logger.info(f"Updated task {task_id}: status={status.value}, progress={progress}")

    def _worker_loop(self):
        """Worker thread main loop: process queued task ids until stopped."""
        while self.is_running:
            try:
                # Time out so that is_running is re-checked regularly.
                task_id = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                self._process_task(task_id)
            except Exception as e:
                logger.error(f"Error in worker loop: {str(e)}", exc_info=True)
            finally:
                # Always balance the get() so queue.join() cannot hang.
                self.task_queue.task_done()

    def _process_task(self, task_id: str):
        """Run the handler for one task and record its outcome.

        Any exception raised by the handler marks the task FAILED with the
        exception text as its error message.
        """
        task = self.get_task(task_id)
        if task is None:
            logger.error(f"Task {task_id} not found")
            return
        # A task cancelled while still queued must not be run.
        if task.status == TaskStatus.CANCELLED:
            logger.info(f"Skipping cancelled task {task_id}")
            return

        handlers = {
            TaskType.VOICE_GENERATION: self._process_voice_generation,
            TaskType.DIGITAL_HUMAN_CREATION: self._process_digital_human_creation,
            TaskType.VIDEO_COMPOSITION: self._process_video_composition,
            TaskType.TEMPLATE_CREATION: self._process_template_creation,
            TaskType.AUDIO_EXTRACTION: self._process_audio_extraction,
        }
        try:
            self.update_task_status(task_id, TaskStatus.PROCESSING, 10)
            handler = handlers.get(task.task_type)
            if handler is None:
                raise ValueError(f"Unknown task type: {task.task_type}")
            result = handler(task)
            self.update_task_status(task_id, TaskStatus.COMPLETED, 100, result)
        except Exception as e:
            logger.error(f"Task {task_id} failed: {str(e)}", exc_info=True)
            self.update_task_status(task_id, TaskStatus.FAILED, error_message=str(e))

    @staticmethod
    def _checked_input(input_data: Optional[Dict[str, Any]], required) -> Dict[str, Any]:
        """Return the input dict after checking that required keys exist.

        Raises:
            ValueError: If ``input_data`` is None or lacks any required key;
                the message names the missing fields.
        """
        data = {} if input_data is None else input_data
        missing = [key for key in required if key not in data]
        if missing:
            raise ValueError(f"Missing required input field(s): {', '.join(missing)}")
        return data

    @staticmethod
    def _template_video_path(template_info: Dict[str, Any]) -> str:
        """Return the video path stored in a template metadata dict.

        Accepts both 'video_path' and the 'template_path' key written by
        ``_process_template_creation``.

        Raises:
            ValueError: If neither key holds a non-empty value.
        """
        video_path = template_info.get('video_path') or template_info.get('template_path')
        if not video_path:
            raise ValueError("Template metadata has neither 'video_path' nor 'template_path'")
        return video_path

    def _process_voice_generation(self, task: Task) -> Dict[str, Any]:
        """Handle a voice generation task.

        Requires 'text' in the task input; 'reference_audio',
        'reference_text' and 'uuid' are optional.
        """
        import digital_human_api

        input_data = self._checked_input(task.input_data, ('text',))
        text = input_data['text']
        reference_audio = input_data.get('reference_audio')
        reference_text = input_data.get('reference_text', '')
        job_uuid = input_data.get('uuid', task.task_id[:8])
        self.update_task_status(task.task_id, TaskStatus.PROCESSING, 30)

        # Reference audio without its text: train the voice model first and
        # take both values from the training result.
        if reference_audio and not reference_text:
            voice_result = digital_human_api.train_voice_v2(reference_audio)
            reference_audio = voice_result.get('asr_format_audio_url')
            reference_text = voice_result.get('reference_audio_text', '')
        self.update_task_status(task.task_id, TaskStatus.PROCESSING, 60)

        # Generate the speech audio.
        audio_path = digital_human_api.generate_voice_v2(text, reference_audio, reference_text, job_uuid)
        return {
            "audio_path": audio_path,
            "audio_url": f"/download/generated/audio/{job_uuid}",
            "uuid": job_uuid,
            "text": text
        }

    def _process_digital_human_creation(self, task: Task) -> Dict[str, Any]:
        """Handle a digital human creation task.

        Requires 'speech_text', 'sample_video' and 'sample_voice' in the
        task input; 'uuid' is optional.
        """
        import digital_human_api

        input_data = self._checked_input(
            task.input_data, ('speech_text', 'sample_video', 'sample_voice')
        )
        speech_text = input_data['speech_text']
        sample_video = input_data['sample_video']
        sample_voice = input_data['sample_voice']
        job_uuid = input_data.get('uuid', task.task_id[:8])
        self.update_task_status(task.task_id, TaskStatus.PROCESSING, 20)

        # Generate the digital human (file names are passed through as-is).
        result = digital_human_api.generate_digital_human_v2(
            speech_text,
            sample_video,
            sample_voice,
            job_uuid
        )
        self.update_task_status(task.task_id, TaskStatus.PROCESSING, 80)
        return {
            "digital_human_result": result,
            "video_url": f"/download/generated/video/{job_uuid}",
            "audio_url": f"/download/generated/audio/{job_uuid}",
            "uuid": job_uuid,
            "speech_text": speech_text
        }

    def _process_video_composition(self, task: Task) -> Dict[str, Any]:
        """Handle a video composition task.

        Requires 'template_id' and 'audio_file' in the task input; 'uuid'
        is optional.

        Raises:
            ValueError: If the template is unknown or has no video path.
        """
        import api

        input_data = self._checked_input(task.input_data, ('template_id', 'audio_file'))
        template_id = input_data['template_id']
        audio_file = input_data['audio_file']
        job_uuid = input_data.get('uuid', task.task_id[:8])
        self.update_task_status(task.task_id, TaskStatus.PROCESSING, 40)

        # Load the template metadata.
        template_info = self._get_template_info(template_id)
        if not template_info:
            raise ValueError(f"Template {template_id} not found")

        # Compose the video from the template's video and the audio file.
        result = api.generate_video(self._template_video_path(template_info), audio_file, job_uuid)
        self.update_task_status(task.task_id, TaskStatus.PROCESSING, 90)
        return {
            "composition_result": result,
            "video_url": f"/download/generated/video/{job_uuid}",
            "template_id": template_id,
            "uuid": job_uuid
        }

    def _process_template_creation(self, task: Task) -> Dict[str, Any]:
        """Handle a template creation task.

        Requires 'person_image' and 'background_image' in the task input;
        'title_text', 'title_position', 'title_font_size' and
        'video_length' are optional.
        """
        import api

        input_data = self._checked_input(task.input_data, ('person_image', 'background_image'))
        person_image = input_data['person_image']
        title_text = input_data.get('title_text', '')
        title_position = tuple(input_data.get('title_position', [50, 50]))
        title_font_size = input_data.get('title_font_size', 48)
        background_image = input_data['background_image']
        video_length = input_data.get('video_length', 10.0)
        self.update_task_status(task.task_id, TaskStatus.PROCESSING, 50)

        # Generate the titled template video.
        template_path = api.generate_video_with_title(
            person_image, title_text, title_position,
            title_font_size, background_image, video_length
        )

        # Persist the template metadata.
        template_id = self._save_template_info({
            "template_path": template_path,
            "title_text": title_text,
            "person_image": person_image,
            "background_image": background_image,
            "created_at": datetime.now().isoformat()
        })
        return {
            "template_id": template_id,
            "template_path": template_path,
            "template_url": f"/download/template/{template_id}",
            "title_text": title_text
        }

    def _process_audio_extraction(self, task: Task) -> Dict[str, Any]:
        """Handle an audio extraction task.

        Requires 'video_file' in the task input and returns the text
        produced by ``api.speech_to_text`` for it.
        """
        import api

        input_data = self._checked_input(task.input_data, ('video_file',))
        video_file = input_data['video_file']
        self.update_task_status(task.task_id, TaskStatus.PROCESSING, 50)

        # Extract the text.
        extracted_text = api.speech_to_text(video_file)
        return {
            "extracted_text": extracted_text,
            "video_file": video_file
        }

    def _get_template_info(self, template_id: str) -> Optional[Dict[str, Any]]:
        """Load a template's metadata, or return None if it does not exist.

        Ids that are empty or contain a directory component are treated as
        unknown so the lookup cannot leave the template directory.
        """
        template_name = str(template_id) if template_id is not None else ""
        if not template_name or os.path.basename(template_name) != template_name:
            return None
        template_file = f"{self._TEMPLATE_DIR}/{template_name}.json"
        if os.path.exists(template_file):
            with open(template_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return None

    def _save_template_info(self, template_data: Dict[str, Any]) -> str:
        """Write template metadata to a new JSON file and return its id."""
        template_id = str(uuid_lib.uuid4())
        template_dir = self._TEMPLATE_DIR
        os.makedirs(template_dir, exist_ok=True)
        template_file = os.path.join(template_dir, f"{template_id}.json")
        with open(template_file, 'w', encoding='utf-8') as f:
            json.dump(template_data, f, ensure_ascii=False, indent=2)
        return template_id
# Global task manager instance; its worker thread is started when this
# module is imported.
task_manager = TaskManager()
task_manager.start_worker()