Files
2025-09-05 00:40:39 +08:00

487 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Digital Human API Module
处理语音生成、视频生成、数字人创建等功能的API模块
"""
import requests
from PIL import Image, ImageDraw, ImageFont
from moviepy import ImageClip, VideoFileClip, TextClip, CompositeVideoClip
import tempfile
import os
import subprocess
import whisper
from flask import send_file, abort
import mimetypes
import logging
from typing import Optional, Dict, Any, Tuple, List
import json
from pathlib import Path
import time
# Assumption: this module is also used for file upload and file management.
# Configure logging: set up the root logger at INFO level as an import-time
# side effect, then create this module's own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 配置常量
class Config:
    """Service endpoints, filesystem locations and default request parameters."""

    # Base URLs of the voice service and the video service.
    VOICE_SERVICE_URL = "http://127.0.0.1:18180"
    VIDEO_SERVICE_URL = "http://127.0.0.1:8383"

    # Filesystem locations. expanduser() resolves a leading "~"; it leaves the
    # absolute paths below unchanged.
    RESOURCE_DIR = os.path.expanduser("/mnt/docker/resource")
    TEMP_DIR = os.path.expanduser("/mnt/docker/code/data/temp")
    VOICE_DATA_DIR = os.path.expanduser("~/heygem_data/voice/data")
    FACE2FACE_TEMP_DIR = os.path.expanduser("~/heygem_data/face2face/temp")

    # File extensions (lower-case, with leading dot) treated as video / audio.
    VIDEO_EXTENSIONS = [".mp4", ".avi", ".mov", ".mkv", ".wmv", ".flv", ".webm"]
    AUDIO_EXTENSIONS = [".wav", ".mp3", ".aac", ".flac", ".ogg", ".m4a", ".wma"]

    # Default parameters merged into every voice synthesis request.
    DEFAULT_VOICE_PARAMS = dict(
        format="wav",
        topP=0.7,
        max_new_tokens=1024,
        chunk_length=100,
        repetition_penalty=1.2,
        temperature=0.7,
        need_asr=False,
        streaming=False,
        is_fixed_seed=0,
        is_norm=0,
    )
class APIException(Exception):
    """Module-specific error that carries a message and a numeric status code.

    Attributes:
        message: Human-readable description of the failure.
        status_code: Status code associated with the failure (defaults to 500).
    """

    def __init__(self, message: str, status_code: int = 500):
        # Initialise the base exception first so str(exc) yields the message.
        super().__init__(message)
        self.message = message
        self.status_code = status_code
def _make_request(url: str, data: Dict[str, Any], method: str = "POST") -> Dict[str, Any]:
    """Send an HTTP request and normalise the response and errors.

    Args:
        url: Target URL.
        data: Payload; sent as the JSON body for POST, as query parameters otherwise.
        method: "POST" (case-insensitive) issues a POST; any other value issues a GET.

    Returns:
        The decoded JSON body when the response content type is JSON,
        otherwise ``{"content": <raw response bytes>}``.

    Raises:
        APIException: On a non-2xx status (carrying that status code), on any
            other requests-level failure (500), or on an undecodable JSON body (500).
    """
    try:
        # NOTE(review): the POST timeout is ten times the GET timeout and both
        # are extremely large; values are kept as-is -- confirm they are intended.
        if method.upper() == "POST":
            response = requests.post(url, json=data, timeout=6000000)
        else:
            response = requests.get(url, params=data, timeout=600000)
        response.raise_for_status()  # raises HTTPError on 4xx/5xx
        content_type = response.headers.get('content-type', '')
        # Media types are case-insensitive, so compare in lower case.
        if content_type.lower().startswith('application/json'):
            return response.json()
        return {"content": response.content}
    except requests.exceptions.HTTPError as e:
        # Compare with None explicitly: a Response object is falsy for error statuses.
        has_response = e.response is not None
        status_code = e.response.status_code if has_response else 500
        body_text = e.response.text if has_response else str(e)
        logger.error(f"HTTP request failed to {url}: {status_code} - {body_text}")
        raise APIException(f"Request failed: {body_text}", status_code) from e
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error during request to {url}: {str(e)}")
        raise APIException(f"Network error: {str(e)}", 500) from e
    except ValueError as e:
        # Older requests versions raise a plain ValueError for an invalid JSON body.
        logger.error(f"Invalid JSON in response from {url}: {str(e)}")
        raise APIException(f"Invalid JSON in response: {str(e)}", 500) from e
def _ensure_directory(directory: str) -> str:
    """Create ``directory`` (and any missing parents) if it does not exist.

    Args:
        directory: Directory path; a leading "~" is expanded.

    Returns:
        The user-expanded directory path.
    """
    target = os.path.expanduser(directory)
    # exist_ok makes the call a no-op when the directory is already present.
    os.makedirs(target, exist_ok=True)
    return target
def _copy_file(source: str, destination: str) -> None:
    """Copy ``source`` to ``destination`` (a file path or an existing directory).

    Uses the standard library instead of spawning ``cp`` so the copy works on
    every platform and the underlying OS error text is preserved.

    Args:
        source: Path of the file to copy.
        destination: Target file path or directory.

    Raises:
        APIException: If the copy fails for any reason (status 500).
    """
    # Function-scope import keeps this block self-contained.
    import shutil

    try:
        shutil.copy(source, destination)
    except OSError as e:
        # Covers missing source, permission problems, same-file copies, etc.
        logger.error(f"Failed to copy file: {str(e)}")
        raise APIException(f"Failed to copy file: {str(e)}", 500) from e
    except Exception as e:
        logger.error(f"Unexpected error during file copy: {str(e)}")
        raise APIException(f"Unexpected error during file copy: {str(e)}", 500) from e
    else:
        logger.info(f"File copied from {source} to {destination}")
def _convert_file_format(input_path: str, output_format: str) -> str:
    """Convert a media file to the given format with ffmpeg.

    The output is written to the system temporary directory and is named after
    the input file's base name with the new extension.

    Args:
        input_path: Path of the input file.
        output_format: Target format, either "mp4" or "wav".

    Returns:
        Path of the converted file.

    Raises:
        APIException: If the format is unsupported or the conversion fails (status 500).
    """
    stem = os.path.splitext(os.path.basename(input_path))[0]
    output_path = os.path.join(tempfile.gettempdir(), f"{stem}.{output_format}")
    logger.info(f"Converting {input_path} to {output_format} format...")
    try:
        if output_format == "wav":
            # 16-bit PCM at a 16 kHz sample rate.
            command = ["ffmpeg", "-y", "-i", input_path, "-acodec", "pcm_s16le", "-ar", "16000", output_path]
        elif output_format == "mp4":
            # Stream copy: re-mux into an mp4 container without re-encoding.
            command = ["ffmpeg", "-y", "-i", input_path, "-c", "copy", output_path]
        else:
            raise ValueError(f"Unsupported output format: {output_format}")
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        logger.info(f"File successfully converted to: {output_path}")
        return output_path
    except subprocess.CalledProcessError as e:
        # errors="replace" guarantees that undecodable ffmpeg output cannot raise
        # inside this handler and hide the real conversion failure.
        detail = e.stderr.decode('utf-8', errors='replace') if e.stderr else ''
        logger.error(f"FFmpeg conversion failed: {detail}")
        raise APIException(f"File conversion failed: {detail}", 500) from e
    except Exception as e:
        logger.error(f"Unexpected error during file conversion: {str(e)}")
        raise APIException(f"Unexpected error during file conversion: {str(e)}", 500) from e
def _split_video_and_audio(video_path: str, audio_path: str) -> str:
    """Extract a video's audio track and write a silent copy of the video.

    Args:
        video_path: Path of the input video.
        audio_path: Path the extracted audio is written to.

    Returns:
        Path of the silent video, created as a new temporary ``.mp4`` file.

    Raises:
        APIException: With status 400 when the video has no audio track,
            with status 500 when the split fails for any other reason.
    """
    silent_video_path = None
    try:
        # The context manager closes the clip even when a write fails.
        with VideoFileClip(video_path) as video_clip:
            if video_clip.audio is None:
                raise APIException(f"Video has no audio track: {video_path}", 400)
            # Extract the audio. The ``verbose`` keyword is intentionally not
            # passed: MoviePy 2.x writers no longer accept it.
            video_clip.audio.write_audiofile(audio_path, logger=None)
            # mkstemp creates the file atomically, unlike the race-prone mktemp.
            fd, silent_video_path = tempfile.mkstemp(suffix=".mp4")
            os.close(fd)
            # Write the video stream without audio.
            video_clip.write_videofile(silent_video_path, audio=False, logger=None)
        logger.info(f"Video separated. Audio saved to {audio_path}, silent video to {silent_video_path}")
        return silent_video_path
    except Exception as e:
        # Best-effort removal of a partially written temporary video.
        if silent_video_path and os.path.exists(silent_video_path):
            try:
                os.remove(silent_video_path)
            except OSError:
                logger.warning(f"Could not remove temporary file: {silent_video_path}")
        if isinstance(e, APIException):
            raise
        logger.error(f"Failed to split video and audio: {str(e)}")
        raise APIException(f"Failed to split video and audio: {str(e)}", 500) from e
def _preprocess_voice(voice_file_name: str) -> Dict[str, Any]:
    """Ask the voice service to preprocess a reference audio file.

    Posts to the service's ``/v1/preprocess_and_tran`` endpoint.

    Args:
        voice_file_name: Name of the reference audio file, sent as ``reference_audio``.

    Returns:
        The response dictionary produced by ``_make_request``.
    """
    logger.info(f"Preprocessing voice model with file: {voice_file_name}")
    payload = dict(format="wav", reference_audio=voice_file_name, lang="zh")
    endpoint = f"{Config.VOICE_SERVICE_URL}/v1/preprocess_and_tran"
    return _make_request(endpoint, payload)
def _synthesize_voice(text: str, uuid: str, preprocess_result: Dict[str, Any]) -> str:
    """Synthesize speech through the voice service and save it as a file.

    Posts to the service's ``/v1/invoke`` endpoint and writes the raw response
    body to ``<TEMP_DIR>/<uuid>output.wav``.

    Args:
        text: Text to synthesize.
        uuid: Identifier sent as the speaker and used in the output file name.
        preprocess_result: Preprocessing result; its ``asr_format_audio_url`` and
            ``reference_audio_text`` entries are forwarded to the service.

    Returns:
        Path of the written audio file.

    Raises:
        APIException: On a non-2xx response (carrying that status code) or on
            any other requests-level failure (500).
    """
    logger.info(f"Synthesizing voice for UUID: {uuid}")
    url = f"{Config.VOICE_SERVICE_URL}/v1/invoke"
    request_body = {
        "speaker": uuid,
        "text": text,
        "reference_audio": preprocess_result.get('asr_format_audio_url'),
        "reference_text": preprocess_result.get('reference_audio_text'),
        **Config.DEFAULT_VOICE_PARAMS
    }
    # Convert transport errors to APIException, matching _make_request, so
    # callers see one error type and the upstream status code is preserved.
    try:
        response = requests.post(url, json=request_body, timeout=60)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP request failed to {url}: {e.response.status_code} - {e.response.text}")
        raise APIException(f"Request failed: {e.response.text}", e.response.status_code) from e
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error during request to {url}: {str(e)}")
        raise APIException(f"Network error: {str(e)}", 500) from e
    # Make sure the output directory exists before writing into it.
    os.makedirs(Config.TEMP_DIR, exist_ok=True)
    output_filename = f"{uuid}output.wav"
    output_path = os.path.join(Config.TEMP_DIR, output_filename)
    with open(output_path, "wb") as f:
        f.write(response.content)
    logger.info(f"Generated voice saved to: {output_path}")
    return output_path
def _submit_video_generation(audio_file_name: str, video_file_name: str, task_id: str) -> Dict[str, Any]:
    """Submit a video synthesis task to the video service.

    Posts to the service's ``/easy/submit`` endpoint.

    Args:
        audio_file_name: Sent as ``audio_url``.
        video_file_name: Sent as ``video_url``.
        task_id: Sent as ``code``.

    Returns:
        The response dictionary produced by ``_make_request``.
    """
    logger.info(f"Submitting video generation for task: {task_id}")
    payload = dict(
        audio_url=audio_file_name,
        video_url=video_file_name,
        code=task_id,
        chaofen=0,
        watermark_switch=0,
        pn=1,
    )
    endpoint = f"{Config.VIDEO_SERVICE_URL}/easy/submit"
    return _make_request(endpoint, payload)
def _resolve_upload_path(category: str, filename: str) -> str:
    """Return the path of an uploaded file, rejecting names that escape its upload directory."""
    base_dir = os.path.abspath(os.path.join(Config.RESOURCE_DIR, "uploads", category))
    # abspath collapses ".." segments; join lets an absolute filename override the base.
    candidate = os.path.abspath(os.path.join(base_dir, filename or ""))
    try:
        inside_base = os.path.commonpath([base_dir, candidate]) == base_dir
    except ValueError:
        # commonpath raises when the two paths cannot be compared (e.g. different drives).
        inside_base = False
    if not filename or candidate == base_dir or not inside_base:
        raise APIException(f"Invalid file name: {filename}", 400)
    return candidate


def generate_digital_human(speech_text: str, sample_video: str, sample_voice: str, uuid: str) -> Dict[str, Any]:
    """Run the digital human pipeline and submit the video synthesis task.

    Steps: convert the sample voice to wav and hand it to the voice service,
    synthesize new speech from ``speech_text``, convert the sample video to mp4,
    stage both files in the video service directory, then submit the task.

    Args:
        speech_text: Text to be spoken.
        sample_video: File name of the sample video under ``uploads/video``.
        sample_voice: File name of the sample voice under ``uploads/audio``.
        uuid: Unique identifier used for the generated audio and the task code.

    Returns:
        The response of the video service's submit call.

    Raises:
        APIException: With status 400 when a file name is empty or points
            outside its upload directory; otherwise when any step fails.
    """
    logger.info(f"Starting digital human generation for UUID: {uuid}")
    try:
        # Validate both caller-supplied names up front so a bad name fails
        # before any conversion or service call is made.
        voice_source_path = _resolve_upload_path("audio", sample_voice)
        video_source_path = _resolve_upload_path("video", sample_video)

        # Step 1: convert the sample voice to wav and place it in the voice data directory.
        _ensure_directory(Config.VOICE_DATA_DIR)
        converted_voice_path = _convert_file_format(voice_source_path, "wav")
        converted_voice_filename = os.path.basename(converted_voice_path)
        voice_dest_path = os.path.join(Config.VOICE_DATA_DIR, converted_voice_filename)
        _copy_file(converted_voice_path, voice_dest_path)
        voice_preprocess_result = _preprocess_voice(converted_voice_filename)

        # Step 2: synthesize the new audio.
        generated_audio_path = _synthesize_voice(speech_text, uuid, voice_preprocess_result)

        # Step 3: convert the sample video to mp4 and place it in the video working directory.
        _ensure_directory(Config.FACE2FACE_TEMP_DIR)
        converted_video_path = _convert_file_format(video_source_path, "mp4")
        converted_video_filename = os.path.basename(converted_video_path)
        video_dest_path = os.path.join(Config.FACE2FACE_TEMP_DIR, converted_video_filename)
        _copy_file(converted_video_path, video_dest_path)

        # Step 4: copy the generated audio into the video working directory.
        generated_audio_filename = os.path.basename(generated_audio_path)
        video_service_audio_path = os.path.join(Config.FACE2FACE_TEMP_DIR, generated_audio_filename)
        _copy_file(generated_audio_path, video_service_audio_path)

        # Step 5: submit the synthesis task under a task code derived from uuid and time.
        task_code = f"{uuid}_video_{int(time.time())}"
        result = _submit_video_generation(generated_audio_filename, converted_video_filename, task_code)
        logger.info(f"Digital human generation submitted successfully with task code: {task_code}")
        return result
    except APIException:
        raise
    except Exception as e:
        logger.error(f"Failed to generate digital human: {str(e)}")
        raise APIException(f"Failed to generate digital human: {str(e)}", 500) from e
def get_video_generate_process(task_id: str) -> Dict[str, Any]:
    """Query the video service for the progress of a generation task.

    Issues a GET to the service's ``/easy/query`` endpoint.

    Args:
        task_id: Task code, sent as the ``code`` query parameter.

    Returns:
        The response dictionary produced by ``_make_request``.
    """
    logger.info(f"Checking video generation progress for task: {task_id}")
    query = {"code": task_id}
    endpoint = f"{Config.VIDEO_SERVICE_URL}/easy/query"
    return _make_request(endpoint, query, method="GET")
def download_generated_video(uuid: str, task_id: Optional[str] = None) -> Any:
    """Locate a generated video and return it as a file download.

    Candidate locations are tried in this order:
      1. ``<RESOURCE_DIR>/<uuid>output.mp4`` (legacy)
      2. ``<TEMP_DIR>/<uuid>output.mp4`` (legacy)
      3. ``<FACE2FACE_TEMP_DIR>/<task_id>_result.mp4`` (only when ``task_id`` is given)
      4. the most recently modified ``<FACE2FACE_TEMP_DIR>/<uuid>_video_*_result.mp4``

    Args:
        uuid: Identifier the video was generated for.
        task_id: Optional task code of the generation task.

    Returns:
        The value returned by Flask's ``send_file`` for the located video.

    Raises:
        APIException: With status 404 when no video is found, 500 on any other failure.
    """
    try:
        exact_candidates = [
            os.path.join(Config.RESOURCE_DIR, f"{uuid}output.mp4"),
            os.path.join(Config.TEMP_DIR, f"{uuid}output.mp4"),
        ]
        if task_id:
            exact_candidates.append(os.path.join(Config.FACE2FACE_TEMP_DIR, f"{task_id}_result.mp4"))

        file_path = None
        for candidate in exact_candidates:
            expanded = os.path.expanduser(candidate)
            # isfile (not exists) so a directory with a matching name is never served.
            if os.path.isfile(expanded):
                file_path = expanded
                break

        if file_path is None:
            # Several renders may exist for one uuid; serve the newest one
            # instead of whichever entry the directory listing yields first.
            search_dir = Path(os.path.expanduser(Config.FACE2FACE_TEMP_DIR))
            matches = [p for p in search_dir.glob(f"{uuid}_video_*_result.mp4") if p.is_file()]
            if matches:
                file_path = str(max(matches, key=lambda p: p.stat().st_mtime))

        if not file_path:
            raise APIException(f"Generated video not found for UUID: {uuid}", 404)

        filename = os.path.basename(file_path)
        logger.info(f"Downloading generated video: {filename}")
        return send_file(
            file_path,
            as_attachment=True,
            download_name=filename,
            mimetype='video/mp4'
        )
    except APIException:
        raise
    except Exception as e:
        logger.error(f"Error downloading generated video for UUID {uuid}: {str(e)}")
        raise APIException(f"Error downloading generated video: {str(e)}", 500) from e
# ... (其他保持不变的函数,如 download_audio, list_available_files 等)
def list_available_files(directory: str = None, file_type: str = "all") -> Dict[str, Any]:
    """List the media files found directly inside a directory.

    Args:
        directory: Directory to scan; defaults to ``Config.RESOURCE_DIR``.
        file_type: "video" keeps only video files, "audio" keeps only audio
            files, "all" keeps both kinds; any other value applies no filter.

    Returns:
        A dictionary with the scanned ``directory``, the ``total_files`` count
        and the ``files`` list of per-file information dictionaries.

    Raises:
        APIException: With status 404 when the directory does not exist,
            500 when listing fails for another reason.
    """
    if directory is None:
        directory = Config.RESOURCE_DIR
    try:
        if not os.path.exists(directory):
            raise APIException(f"Directory '{directory}' not found", 404)

        collected = []
        for entry_name in os.listdir(directory):
            entry_path = os.path.join(directory, entry_name)
            # Only regular files are reported; sub-directories are skipped.
            if not os.path.isfile(entry_path):
                continue

            extension = os.path.splitext(entry_name)[1].lower()
            is_video = extension in Config.VIDEO_EXTENSIONS
            is_audio = extension in Config.AUDIO_EXTENSIONS

            # Decide whether this entry passes the requested type filter.
            if file_type == "video":
                wanted = is_video
            elif file_type == "audio":
                wanted = is_audio
            elif file_type == "all":
                wanted = is_video or is_audio
            else:
                wanted = True
            if not wanted:
                continue

            if is_video:
                kind = "video"
            elif is_audio:
                kind = "audio"
            else:
                kind = "other"

            stats = os.stat(entry_path)
            collected.append({
                "filename": entry_name,
                "size": stats.st_size,
                "type": kind,
                "modified": stats.st_mtime,
                "download_url_video": f"/download/video/{entry_name}" if is_video else None,
                "download_url_audio": f"/download/audio/{entry_name}" if is_audio else None
            })

        logger.info(f"Listed {len(collected)} files from {directory}")
        return {
            "directory": directory,
            "total_files": len(collected),
            "files": collected
        }
    except APIException:
        raise
    except Exception as e:
        logger.error(f"Error listing files in {directory}: {str(e)}")
        raise APIException(f"Error listing files: {str(e)}", 500)
def _first_existing_path(candidates: List[str]) -> Optional[str]:
    """Return the first user-expanded candidate path that exists, or None."""
    for candidate in candidates:
        expanded = os.path.expanduser(candidate)
        if os.path.exists(expanded):
            return expanded
    return None


def _describe_generated_file(path: str, file_type: str, download_url: str) -> Dict[str, Any]:
    """Build the information dictionary reported for one generated file."""
    file_stats = os.stat(path)
    return {
        "type": file_type,
        "filename": os.path.basename(path),
        "path": path,
        "size": file_stats.st_size,
        "modified": file_stats.st_mtime,
        "download_url": download_url
    }


def find_generated_files(uuid: str) -> List[Dict[str, Any]]:
    """Find the generated audio and video files belonging to a UUID.

    At most one audio entry and one video entry are returned: the first
    candidate path that exists for each kind. When none of the fixed video
    names exists, the newest ``<uuid>_video_*_result.mp4`` in the face2face
    temp directory is used, matching the task-code naming used when a
    generation task is submitted and downloaded.

    Args:
        uuid: Unique identifier.

    Returns:
        List of information dictionaries for the files that were found.
    """
    generated_files = []

    # Generated audio: fixed candidate names, first hit wins.
    audio_path = _first_existing_path([
        f"{Config.RESOURCE_DIR}/{uuid}output.wav",
        f"{Config.TEMP_DIR}/{uuid}output.wav",
        f"{Config.VOICE_DATA_DIR}/{uuid}output.wav"
    ])
    if audio_path:
        generated_files.append(
            _describe_generated_file(audio_path, "audio", f"/download/generated/audio/{uuid}")
        )

    # Generated video: fixed candidate names first.
    video_path = _first_existing_path([
        f"{Config.RESOURCE_DIR}/{uuid}_output.mp4",
        f"{Config.RESOURCE_DIR}/{uuid}output.mp4",
        f"{Config.TEMP_DIR}/{uuid}output.mp4",
        f"{Config.FACE2FACE_TEMP_DIR}/{uuid}_result.mp4",
        f"{Config.FACE2FACE_TEMP_DIR}/{uuid}result.mp4"
    ])
    if video_path is None:
        # Fallback: results named after a "<uuid>_video_<timestamp>" task code.
        search_dir = Path(os.path.expanduser(Config.FACE2FACE_TEMP_DIR))
        matches = [p for p in search_dir.glob(f"{uuid}_video_*_result.mp4") if p.is_file()]
        if matches:
            video_path = str(max(matches, key=lambda p: p.stat().st_mtime))
    if video_path:
        generated_files.append(
            _describe_generated_file(video_path, "video", f"/download/generated/video/{uuid}")
        )

    return generated_files