487 lines
17 KiB
Python
487 lines
17 KiB
Python
"""
|
||
Digital Human API Module
|
||
处理语音生成、视频生成、数字人创建等功能的API模块
|
||
"""
|
||
|
||
import requests
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
from moviepy import ImageClip, VideoFileClip, TextClip, CompositeVideoClip
|
||
import tempfile
|
||
import os
|
||
import subprocess
|
||
import whisper
|
||
from flask import send_file, abort
|
||
import mimetypes
|
||
import logging
|
||
from typing import Optional, Dict, Any, Tuple, List
|
||
import json
|
||
from pathlib import Path
|
||
import time
|
||
# 假设此模块用于处理文件上传和管理
|
||
|
||
# Configure logging: root logger at INFO level, plus a module-level logger
# named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
||
|
||
# 配置常量
|
||
class Config:
    """Service endpoints, filesystem locations, and default request parameters."""

    # Base URLs of the voice and video services.
    VOICE_SERVICE_URL = "http://127.0.0.1:18180"
    VIDEO_SERVICE_URL = "http://127.0.0.1:8383"

    # Filesystem locations; os.path.expanduser resolves a leading "~" where present.
    RESOURCE_DIR = os.path.expanduser("/mnt/docker/resource")
    TEMP_DIR = os.path.expanduser("/mnt/docker/code/data/temp")
    VOICE_DATA_DIR = os.path.expanduser("~/heygem_data/voice/data")
    FACE2FACE_TEMP_DIR = os.path.expanduser("~/heygem_data/face2face/temp")

    # File extensions (lower-case, with leading dot) recognised per media type.
    VIDEO_EXTENSIONS = [
        '.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm',
    ]
    AUDIO_EXTENSIONS = [
        '.wav', '.mp3', '.aac', '.flac', '.ogg', '.m4a', '.wma',
    ]

    # Default parameters merged into every voice-synthesis request body.
    DEFAULT_VOICE_PARAMS = dict(
        format="wav",
        topP=0.7,
        max_new_tokens=1024,
        chunk_length=100,
        repetition_penalty=1.2,
        temperature=0.7,
        need_asr=False,
        streaming=False,
        is_fixed_seed=0,
        is_norm=0,
    )
|
||
|
||
class APIException(Exception):
    """Error raised by this module's helpers, carrying an HTTP-style status code.

    Attributes:
        message: Human-readable description of the failure.
        status_code: Status code associated with the failure (defaults to 500).
    """

    def __init__(self, message: str, status_code: int = 500):
        super().__init__(message)
        self.message, self.status_code = message, status_code
|
||
|
||
def _make_request(url: str, data: Dict[str, Any], method: str = "POST") -> Dict[str, Any]:
    """
    Send an HTTP request and normalise the response.

    Args:
        url: Target URL.
        data: Payload; sent as the JSON body for POST, as query parameters otherwise.
        method: "POST" (case-insensitive) issues a POST; any other value issues a GET.

    Returns:
        The decoded JSON body when the response content type starts with
        "application/json", otherwise ``{"content": <raw response bytes>}``.

    Raises:
        APIException: On a non-2xx status (carrying that status code) or on any
            other requests-level failure (status 500).
    """
    # One timeout for both verbs; the original POST value carried an extra zero.
    # NOTE(review): requests timeouts are in seconds, so this is roughly a week -
    # confirm that such a long wait is really intended.
    timeout_seconds = 600000

    try:
        if method.upper() == "POST":
            response = requests.post(url, json=data, timeout=timeout_seconds)
        else:
            response = requests.get(url, params=data, timeout=timeout_seconds)

        response.raise_for_status()  # turn 4xx/5xx responses into HTTPError

        if response.headers.get('content-type', '').startswith('application/json'):
            return response.json()
        return {"content": response.content}

    except requests.exceptions.HTTPError as e:
        # HTTPError.response may be None when raised without a response attached.
        failed = e.response
        status_code = failed.status_code if failed is not None else 500
        detail = failed.text if failed is not None else str(e)
        logger.error(f"HTTP request failed to {url}: {status_code} - {detail}")
        raise APIException(f"Request failed: {detail}", status_code) from e
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error during request to {url}: {str(e)}")
        raise APIException(f"Network error: {str(e)}", 500) from e
|
||
|
||
|
||
def _ensure_directory(directory: str) -> str:
    """
    Make sure a directory exists, creating it and any missing parents.

    Args:
        directory: Directory path; a leading "~" is expanded.

    Returns:
        The expanded directory path.
    """
    target = os.path.expanduser(directory)
    # exist_ok=True makes repeated calls on an existing directory a no-op.
    os.makedirs(target, exist_ok=True)
    return target
|
||
|
||
|
||
def _copy_file(source: str, destination: str) -> None:
    """
    Copy a file, reporting any failure as an APIException.

    Args:
        source: Path of the file to copy.
        destination: Target file path (or an existing directory to copy into).

    Raises:
        APIException: If the copy fails for any reason (status 500).
    """
    # Function-scope import so this block does not depend on the file's import list.
    import shutil

    try:
        # shutil.copy replaces the external `cp` process: no dependency on a
        # POSIX binary, and file names are never parsed as command-line options.
        shutil.copy(source, destination)
        logger.info(f"File copied from {source} to {destination}")
    except OSError as e:
        # Covers missing files, permission errors and shutil.SameFileError.
        logger.error(f"Failed to copy file: {str(e)}")
        raise APIException(f"Failed to copy file: {str(e)}", 500) from e
    except Exception as e:
        logger.error(f"Unexpected error during file copy: {str(e)}")
        raise APIException(f"Unexpected error during file copy: {str(e)}", 500) from e
|
||
|
||
|
||
def _convert_file_format(input_path: str, output_format: str) -> str:
    """
    Convert a media file to the given format with ffmpeg.

    The output is written to the system temporary directory, using the input's
    base name with the new extension.

    Args:
        input_path: Path of the input file.
        output_format: Target format; "wav" and "mp4" are supported.

    Returns:
        Path of the converted file.

    Raises:
        APIException: If the format is unsupported or the conversion fails (status 500).
    """
    input_filename = os.path.basename(input_path)
    output_filename = f"{os.path.splitext(input_filename)[0]}.{output_format}"
    output_path = os.path.join(tempfile.gettempdir(), output_filename)

    logger.info(f"Converting {input_path} to {output_format} format...")

    # ffmpeg arguments that differ per target format.
    format_args = {
        "wav": ["-acodec", "pcm_s16le", "-ar", "16000"],  # 16-bit PCM at 16 kHz
        "mp4": ["-c", "copy"],  # remux only, streams are not re-encoded
    }

    try:
        if output_format not in format_args:
            raise ValueError(f"Unsupported output format: {output_format}")

        # -y overwrites an existing output file without prompting.
        command = ["ffmpeg", "-y", "-i", input_path, *format_args[output_format], output_path]
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        logger.info(f"File successfully converted to: {output_path}")
        return output_path

    except subprocess.CalledProcessError as e:
        # Decode once and leniently: ffmpeg output is not guaranteed to be valid
        # UTF-8, and a decode error here would hide the real failure.
        stderr_text = (e.stderr or b"").decode('utf-8', errors='replace')
        logger.error(f"FFmpeg conversion failed: {stderr_text}")
        raise APIException(f"File conversion failed: {stderr_text}", 500) from e
    except Exception as e:
        logger.error(f"Unexpected error during file conversion: {str(e)}")
        raise APIException(f"Unexpected error during file conversion: {str(e)}", 500) from e
|
||
|
||
|
||
def _split_video_and_audio(video_path: str, audio_path: str) -> str:
    """
    Split a video into a silent video and a separate audio file.

    Args:
        video_path: Path of the input video.
        audio_path: Path the extracted audio is written to.

    Returns:
        Path of the silent video, created as a temporary ".mp4" file.

    Raises:
        APIException: If the video has no audio track or the split fails (status 500).
    """
    video_clip = None
    silent_video_path = None
    try:
        video_clip = VideoFileClip(video_path)
        if video_clip.audio is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(f"Video has no audio track: {video_path}")

        # Extract the audio track.
        # NOTE(review): `verbose=` is no longer passed. The file imports with the
        # MoviePy 2.x style (`from moviepy import ...`), and 2.x appears to have
        # dropped that keyword; `logger=None` already silences progress output.
        video_clip.audio.write_audiofile(audio_path, logger=None)

        # mkstemp creates the file atomically, unlike the deprecated mktemp which
        # only returns a name that another process could claim first.
        fd, silent_video_path = tempfile.mkstemp(suffix=".mp4")
        os.close(fd)
        video_clip.write_videofile(silent_video_path, audio=False, logger=None)

        logger.info(f"Video separated. Audio saved to {audio_path}, silent video to {silent_video_path}")
        return silent_video_path

    except Exception as e:
        # Do not leave a partial temporary file behind on failure.
        if silent_video_path and os.path.exists(silent_video_path):
            try:
                os.remove(silent_video_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {silent_video_path}: {cleanup_error}")
        logger.error(f"Failed to split video and audio: {str(e)}")
        raise APIException(f"Failed to split video and audio: {str(e)}", 500) from e
    finally:
        # Close the clip on success and on failure alike.
        if video_clip is not None:
            video_clip.close()
|
||
|
||
|
||
def _preprocess_voice(voice_file_name: str) -> Dict[str, Any]:
    """
    Ask the voice service to preprocess and train on a reference audio file.

    Args:
        voice_file_name: Reference audio file name, sent as "reference_audio".

    Returns:
        The voice service response, as returned by _make_request.
    """
    logger.info(f"Preprocessing voice model with file: {voice_file_name}")
    payload = dict(format="wav", reference_audio=voice_file_name, lang="zh")
    endpoint = f"{Config.VOICE_SERVICE_URL}/v1/preprocess_and_tran"
    return _make_request(endpoint, payload)
|
||
|
||
|
||
def _synthesize_voice(text: str, uuid: str, preprocess_result: Dict[str, Any]) -> str:
    """
    Call the voice service to synthesize speech and save it as a WAV file.

    Args:
        text: Text to synthesize.
        uuid: Identifier sent as the speaker and used as the output file name prefix.
        preprocess_result: Preprocessing response; its 'asr_format_audio_url' and
            'reference_audio_text' entries are forwarded to the service.

    Returns:
        Path of the generated audio file, "<uuid>output.wav" under Config.TEMP_DIR.

    Raises:
        APIException: On a non-2xx response (carrying that status code) or any
            other requests-level failure (status 500).
    """
    logger.info(f"Synthesizing voice for UUID: {uuid}")
    url = f"{Config.VOICE_SERVICE_URL}/v1/invoke"
    request_body = {
        "speaker": uuid,
        "text": text,
        "reference_audio": preprocess_result.get('asr_format_audio_url'),
        "reference_text": preprocess_result.get('reference_audio_text'),
        **Config.DEFAULT_VOICE_PARAMS
    }

    # The response bytes are written straight to disk, so requests is called
    # directly; failures are reported the same way _make_request reports them.
    try:
        response = requests.post(url, json=request_body, timeout=60)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        failed = e.response
        status_code = failed.status_code if failed is not None else 500
        detail = failed.text if failed is not None else str(e)
        logger.error(f"HTTP request failed to {url}: {status_code} - {detail}")
        raise APIException(f"Request failed: {detail}", status_code) from e
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error during request to {url}: {str(e)}")
        raise APIException(f"Network error: {str(e)}", 500) from e

    # Create the output directory on first use rather than failing in open().
    os.makedirs(Config.TEMP_DIR, exist_ok=True)
    output_filename = f"{uuid}output.wav"
    output_path = os.path.join(Config.TEMP_DIR, output_filename)
    with open(output_path, "wb") as f:
        f.write(response.content)
    logger.info(f"Generated voice saved to: {output_path}")
    return output_path
|
||
|
||
|
||
def _submit_video_generation(audio_file_name: str, video_file_name: str, task_id: str) -> Dict[str, Any]:
    """
    Submit a video synthesis task to the video service.

    Args:
        audio_file_name: Sent to the service as "audio_url".
        video_file_name: Sent to the service as "video_url".
        task_id: Sent to the service as "code".

    Returns:
        The video service response, as returned by _make_request.
    """
    logger.info(f"Submitting video generation for task: {task_id}")
    payload = dict(
        audio_url=audio_file_name,
        video_url=video_file_name,
        code=task_id,
        chaofen=0,
        watermark_switch=0,
        pn=1,
    )
    return _make_request(f"{Config.VIDEO_SERVICE_URL}/easy/submit", payload)
|
||
|
||
|
||
def _require_plain_filename(name: str, label: str) -> str:
    """Return *name* if it is a bare file name; raise APIException(400) otherwise."""
    if not name or name in (".", "..") or os.path.basename(name) != name:
        raise APIException(f"Invalid {label} file name: {name!r}", 400)
    return name


def _stage_converted_media(source_path: str, output_format: str, work_dir: str) -> str:
    """Convert *source_path*, copy the result into *work_dir*, and return its file name."""
    converted_path = _convert_file_format(source_path, output_format)
    file_name = os.path.basename(converted_path)
    destination = os.path.join(work_dir, file_name)
    try:
        _copy_file(converted_path, destination)
    finally:
        # The converted file is only an intermediate; remove it so repeated runs
        # do not pile up files in the system temp directory.
        if os.path.abspath(converted_path) != os.path.abspath(destination):
            try:
                os.remove(converted_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {converted_path}: {cleanup_error}")
    return file_name


def generate_digital_human(speech_text: str, sample_video: str, sample_voice: str, uuid: str) -> Dict[str, Any]:
    """
    Generate a digital-human video: clone the voice, synthesize speech, submit the video task.

    Args:
        speech_text: Text to be spoken.
        sample_video: File name of the sample video under "<RESOURCE_DIR>/uploads/video".
        sample_voice: File name of the sample voice under "<RESOURCE_DIR>/uploads/audio".
        uuid: Unique identifier used for the speaker, the output names and the task code.

    Returns:
        The video service's response to the submitted task.

    Raises:
        APIException: If a file name contains path components (status 400) or any
            step of the pipeline fails.
    """
    logger.info(f"Starting digital human generation for UUID: {uuid}")

    try:
        # Validate before any expensive work: these names are joined into
        # filesystem paths, so "../" segments or absolute paths must not pass.
        _require_plain_filename(sample_voice, "sample voice")
        _require_plain_filename(sample_video, "sample video")

        # Step 1: convert the sample voice to wav, stage it for the voice
        # service, then run preprocessing.
        voice_dir = _ensure_directory(Config.VOICE_DATA_DIR)
        voice_source_path = os.path.join(Config.RESOURCE_DIR, 'uploads', "audio", sample_voice)
        converted_voice_filename = _stage_converted_media(voice_source_path, "wav", voice_dir)
        voice_preprocess_result = _preprocess_voice(converted_voice_filename)

        # Step 2: synthesize the new audio.
        generated_audio_path = _synthesize_voice(speech_text, uuid, voice_preprocess_result)

        # Step 3: convert the sample video to mp4 and stage it for the video service.
        video_dir = _ensure_directory(Config.FACE2FACE_TEMP_DIR)
        video_source_path = os.path.join(Config.RESOURCE_DIR, "uploads", "video", sample_video)
        converted_video_filename = _stage_converted_media(video_source_path, "mp4", video_dir)

        # Step 4: copy the generated audio into the video service's working directory.
        generated_audio_filename = os.path.basename(generated_audio_path)
        _copy_file(generated_audio_path, os.path.join(video_dir, generated_audio_filename))

        # Step 5: submit the task under its own, time-stamped task code.
        task_code = f"{uuid}_video_{int(time.time())}"
        result = _submit_video_generation(generated_audio_filename, converted_video_filename, task_code)

        logger.info(f"Digital human generation submitted successfully with task code: {task_code}")
        return result

    except APIException:
        raise
    except Exception as e:
        logger.error(f"Failed to generate digital human: {str(e)}")
        raise APIException(f"Failed to generate digital human: {str(e)}", 500) from e
|
||
|
||
|
||
def get_video_generate_process(task_id: str) -> Dict[str, Any]:
    """
    Query the video service for the progress of a generation task.

    Args:
        task_id: Task code, sent as the "code" query parameter.

    Returns:
        The video service response, as returned by _make_request.
    """
    logger.info(f"Checking video generation progress for task: {task_id}")
    query = {"code": task_id}
    return _make_request(f"{Config.VIDEO_SERVICE_URL}/easy/query", query, method="GET")
|
||
|
||
def download_generated_video(uuid: str, task_id: Optional[str] = None) -> Any:
    """
    Send the generated video for a UUID as a file download.

    Candidate locations are tried in this order:
      1. "<RESOURCE_DIR>/<uuid>output.mp4" (legacy)
      2. "<TEMP_DIR>/<uuid>output.mp4" (legacy)
      3. "<FACE2FACE_TEMP_DIR>/<task_id>_result.mp4" (only when task_id is given)
      4. the newest "<FACE2FACE_TEMP_DIR>/<uuid>_video_*_result.mp4"

    Args:
        uuid: Unique identifier the video was generated for.
        task_id: Optional task code of the generation task.

    Returns:
        The Flask response produced by send_file.

    Raises:
        APIException: 404 if no video is found, 500 on any other failure.
    """
    # Function-scope import so this block does not depend on the file's import list.
    import glob

    try:
        exact_candidates = [
            # Legacy locations
            os.path.join(Config.RESOURCE_DIR, f"{uuid}output.mp4"),
            os.path.join(Config.TEMP_DIR, f"{uuid}output.mp4"),
        ]
        if task_id:
            # Current naming, keyed by task code
            exact_candidates.append(os.path.join(Config.FACE2FACE_TEMP_DIR, f"{task_id}_result.mp4"))

        file_path = None
        for candidate in exact_candidates:
            expanded = os.path.expanduser(candidate)
            if os.path.exists(expanded):
                file_path = expanded
                break

        if file_path is None:
            # Wildcard fallback. The uuid is escaped so glob metacharacters in
            # it cannot match other users' files; only the timestamp is wild.
            pattern = os.path.expanduser(
                os.path.join(Config.FACE2FACE_TEMP_DIR, f"{glob.escape(uuid)}_video_*_result.mp4")
            )
            matches = [
                p for p in Path(os.path.dirname(pattern)).glob(os.path.basename(pattern))
                if p.is_file()
            ]
            if matches:
                # Several runs may exist for one uuid; serve the most recent
                # rather than whichever the directory listing yields first.
                file_path = str(max(matches, key=lambda p: p.stat().st_mtime))

        if not file_path:
            raise APIException(f"Generated video not found for UUID: {uuid}", 404)

        filename = os.path.basename(file_path)
        logger.info(f"Downloading generated video: {filename}")

        return send_file(
            file_path,
            as_attachment=True,
            download_name=filename,
            mimetype='video/mp4'
        )

    except APIException:
        raise
    except Exception as e:
        logger.error(f"Error downloading generated video for UUID {uuid}: {str(e)}")
        raise APIException(f"Error downloading generated video: {str(e)}", 500) from e
|
||
|
||
# ... (其他保持不变的函数,如 download_audio, list_available_files 等)
|
||
|
||
def list_available_files(directory: str = None, file_type: str = "all") -> Dict[str, Any]:
    """
    List the media files found directly inside a directory.

    Args:
        directory: Directory to scan; defaults to Config.RESOURCE_DIR.
        file_type: "video" or "audio" keeps only that kind, "all" keeps both
            kinds; any other value disables filtering.

    Returns:
        A dict with "directory", "total_files" and "files" (one info dict per file).

    Raises:
        APIException: 404 if the directory does not exist, 500 on any other failure.
    """
    if directory is None:
        directory = Config.RESOURCE_DIR

    try:
        if not os.path.exists(directory):
            raise APIException(f"Directory '{directory}' not found", 404)

        files_info = []
        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)
            if not os.path.isfile(file_path):
                continue  # sub-directories and other non-files are skipped

            extension = os.path.splitext(filename)[1].lower()
            is_video = extension in Config.VIDEO_EXTENSIONS
            is_audio = extension in Config.AUDIO_EXTENSIONS

            # Whether this file passes each known filter; an unknown filter
            # value lets every file through.
            passes_filter = {
                "video": is_video,
                "audio": is_audio,
                "all": is_video or is_audio,
            }
            if not passes_filter.get(file_type, True):
                continue

            if is_video:
                kind = "video"
            elif is_audio:
                kind = "audio"
            else:
                kind = "other"

            file_stats = os.stat(file_path)
            files_info.append({
                "filename": filename,
                "size": file_stats.st_size,
                "type": kind,
                "modified": file_stats.st_mtime,
                "download_url_video": f"/download/video/{filename}" if is_video else None,
                "download_url_audio": f"/download/audio/{filename}" if is_audio else None
            })

        total = len(files_info)
        logger.info(f"Listed {total} files from {directory}")
        return {"directory": directory, "total_files": total, "files": files_info}

    except APIException:
        raise
    except Exception as e:
        logger.error(f"Error listing files in {directory}: {str(e)}")
        raise APIException(f"Error listing files: {str(e)}", 500)
|
||
|
||
|
||
def _describe_generated_file(path: str, media_type: str, uuid: str) -> Dict[str, Any]:
    """Build the info dict for one generated file of the given media type."""
    file_stats = os.stat(path)
    return {
        "type": media_type,
        "filename": os.path.basename(path),
        "path": path,
        "size": file_stats.st_size,
        "modified": file_stats.st_mtime,
        "download_url": f"/download/generated/{media_type}/{uuid}"
    }


def find_generated_files(uuid: str) -> List[Dict[str, Any]]:
    """
    Find the generated audio and video files for a UUID.

    At most one audio entry and one video entry are returned: the first
    existing candidate of each kind. For video, if no fixed-name candidate
    exists, the newest "<uuid>_video_*_result.mp4" under
    Config.FACE2FACE_TEMP_DIR is used, matching the naming that
    download_generated_video looks for.

    Args:
        uuid: Unique identifier.

    Returns:
        A list of info dicts (type, filename, path, size, modified, download_url).
    """
    # Function-scope import so this block does not depend on the file's import list.
    import glob

    def first_existing(candidates: List[str]) -> Optional[str]:
        """Return the first candidate path that exists, after "~" expansion."""
        for candidate in candidates:
            expanded = os.path.expanduser(candidate)
            if os.path.exists(expanded):
                return expanded
        return None

    generated_files = []

    # Generated audio
    audio_path = first_existing([
        f"{Config.RESOURCE_DIR}/{uuid}output.wav",
        f"{Config.TEMP_DIR}/{uuid}output.wav",
        f"{Config.VOICE_DATA_DIR}/{uuid}output.wav"
    ])
    if audio_path is not None:
        generated_files.append(_describe_generated_file(audio_path, "audio", uuid))

    # Generated video: fixed-name candidates first
    video_path = first_existing([
        f"{Config.RESOURCE_DIR}/{uuid}_output.mp4",
        f"{Config.RESOURCE_DIR}/{uuid}output.mp4",
        f"{Config.TEMP_DIR}/{uuid}output.mp4",
        f"{Config.FACE2FACE_TEMP_DIR}/{uuid}_result.mp4",
        f"{Config.FACE2FACE_TEMP_DIR}/{uuid}result.mp4"
    ])
    if video_path is None:
        # Task-code naming "<uuid>_video_<timestamp>_result.mp4". The uuid is
        # escaped so glob metacharacters in it are matched literally.
        pattern = os.path.join(
            os.path.expanduser(Config.FACE2FACE_TEMP_DIR),
            f"{glob.escape(uuid)}_video_*_result.mp4",
        )
        matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]
        if matches:
            video_path = max(matches, key=os.path.getmtime)  # newest run
    if video_path is not None:
        generated_files.append(_describe_generated_file(video_path, "video", uuid))

    return generated_files