Files
digital_human_backend/file_upload.py
2025-09-05 00:40:39 +08:00

280 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
文件上传和管理模块
支持多种文件类型的上传、验证和存储
"""
import os
import uuid
from werkzeug.utils import secure_filename
from flask import request, jsonify
import mimetypes
from typing import Dict, List, Optional, Tuple
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
class FileManager:
"""文件管理器 - 支持多目录文件保存"""
# 允许的文件扩展名
ALLOWED_EXTENSIONS = {
'video': {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm'},
'audio': {'.wav', '.mp3', '.aac', '.flac', '.ogg', '.m4a'},
'image': {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'}
}
# 文件大小限制 (MB)
MAX_FILE_SIZES = {
'video': 500, # 500MB
'audio': 50, # 50MB
'image': 10 # 10MB
}
def __init__(self, upload_dir: str = "/mnt/docker/resource/uploads"):
self.upload_dir = upload_dir
# Docker容器挂载目录配置
self.docker_dirs = {
'voice_data': os.path.expanduser("~/heygem_data/voice/data"),
'face2face_data': os.path.expanduser("~/heygem_data/face2face/temp")
}
self._ensure_upload_directories()
def _ensure_upload_directories(self):
"""确保所有上传目录存在"""
# 确保本地上传目录存在
for file_type in ['video', 'audio', 'image', 'temp']:
dir_path = os.path.join(self.upload_dir, file_type)
os.makedirs(dir_path, exist_ok=True)
# 确保Docker挂载目录存在
for dir_path in self.docker_dirs.values():
os.makedirs(dir_path, exist_ok=True)
logger.info(f"Ensured Docker mount directory exists: {dir_path}")
def copy_audio_for_tts(self, filename: str, uuid: str) -> str:
"""为TTS服务复制音频文件"""
import shutil
source_path = os.path.join(self.upload_dir, 'audio', filename)
if not os.path.exists(source_path):
raise FileNotFoundError(f"Source audio file not found: {source_path}")
# 复制到TTS服务目录使用UUID命名
tts_filename = f"{uuid}.wav"
tts_dest = os.path.join(self.docker_dirs['voice_data'], tts_filename)
try:
shutil.copy2(source_path, tts_dest)
logger.info(f"Copied audio for TTS: {source_path} -> {tts_dest}")
return tts_filename
except Exception as e:
logger.error(f"Failed to copy audio for TTS: {e}")
raise
def copy_files_for_face2face(self, video_filename: str, audio_filename: str, uuid: str) -> Tuple[str, str]:
"""为Face2Face服务复制视频和音频文件"""
import shutil
# 复制视频文件
video_source = os.path.join(self.upload_dir, 'video', video_filename)
if not os.path.exists(video_source):
raise FileNotFoundError(f"Source video file not found: {video_source}")
video_ext = os.path.splitext(video_filename)[1]
face2face_video = f"{uuid}{video_ext}"
video_dest = os.path.join(self.docker_dirs['face2face_data'], face2face_video)
# 复制音频文件
audio_source = os.path.join(self.upload_dir, 'audio', audio_filename)
if not os.path.exists(audio_source):
raise FileNotFoundError(f"Source audio file not found: {audio_source}")
audio_ext = os.path.splitext(audio_filename)[1]
face2face_audio = f"{uuid}{audio_ext}"
audio_dest = os.path.join(self.docker_dirs['face2face_data'], face2face_audio)
try:
shutil.copy2(video_source, video_dest)
shutil.copy2(audio_source, audio_dest)
logger.info(f"Copied files for Face2Face: video={face2face_video}, audio={face2face_audio}")
return face2face_video, face2face_audio
except Exception as e:
logger.error(f"Failed to copy files for Face2Face: {e}")
raise
def copy_generated_file_to_resource(self, source_path: str, filename: str, file_type: str = 'output') -> str:
"""将生成的文件复制到资源目录"""
import shutil
# 确保资源目录存在
resource_dir = "/mnt/docker/resource"
os.makedirs(resource_dir, exist_ok=True)
dest_path = os.path.join(resource_dir, filename)
try:
shutil.copy2(source_path, dest_path)
logger.info(f"Copied generated file to resource: {source_path} -> {dest_path}")
return dest_path
except Exception as e:
logger.error(f"Failed to copy generated file: {e}")
raise
def validate_file(self, file, file_type: str) -> Tuple[bool, str]:
"""验证文件"""
if not file or not file.filename:
return False, "没有选择文件"
# 检查文件扩展名
filename = secure_filename(file.filename)
file_ext = os.path.splitext(filename)[1].lower()
if file_ext not in self.ALLOWED_EXTENSIONS.get(file_type, set()):
return False, f"不支持的{file_type}文件格式: {file_ext}"
# 检查文件大小
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0) # 重置文件指针
max_size = self.MAX_FILE_SIZES.get(file_type, 10) * 1024 * 1024 # 转换为字节
if file_size > max_size:
return False, f"文件大小超出限制 ({self.MAX_FILE_SIZES.get(file_type)}MB)"
return True, "文件验证通过"
def save_file(self, file, file_type: str, custom_filename: str = None) -> Dict[str, str]:
is_valid, message = self.validate_file(file, file_type)
if not is_valid:
raise ValueError(message)
# 确保目录存在
save_dir = os.path.join(self.upload_dir, file_type)
os.makedirs(save_dir, exist_ok=True)
# 生成文件名
if custom_filename:
filename = secure_filename(custom_filename)
else:
if not file.filename:
raise ValueError("上传的文件缺少文件名")
original_filename = secure_filename(file.filename)
file_ext = os.path.splitext(original_filename)[1]
filename = f"{uuid.uuid4().hex}{file_ext}"
# 拼接完整路径
file_path = os.path.join(save_dir, filename)
print(file_path)
# 保存文件
file.save(file_path)
# 获取文件信息
file_size = os.path.getsize(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
logger.info(f"Saved {file_type} file to backup storage: {filename} ({file_size} bytes)")
return {
"filename": filename,
"file_path": file_path,
"file_type": file_type,
"file_size": file_size,
"mime_type": mime_type,
"relative_path": f"uploads/{file_type}/{filename}",
"download_url": f"/download/upload/{file_type}/{filename}"
}
def delete_file(self, file_path: str) -> bool:
"""删除文件"""
try:
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"Deleted file: {file_path}")
return True
return False
except Exception as e:
logger.error(f"Failed to delete file {file_path}: {str(e)}")
return False
def get_file_info(self, file_type: str, filename: str) -> Optional[Dict[str, str]]:
"""获取文件信息"""
file_path = os.path.join(self.upload_dir, file_type, filename)
if not os.path.exists(file_path):
return None
file_size = os.path.getsize(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
return {
"filename": filename,
"file_path": file_path,
"file_type": file_type,
"file_size": file_size,
"mime_type": mime_type,
"relative_path": f"uploads/{file_type}/{filename}",
"download_url": f"/download/upload/{file_type}/{filename}"
}
def list_files(self, file_type: str = None) -> List[Dict[str, str]]:
"""列出文件"""
files = []
if file_type:
file_types = [file_type]
else:
file_types = ['video', 'audio', 'image']
for ft in file_types:
type_dir = os.path.join(self.upload_dir, ft)
if os.path.exists(type_dir):
for filename in os.listdir(type_dir):
file_info = self.get_file_info(ft, filename)
if file_info:
files.append(file_info)
return files
def cleanup_temp_files(self, older_than_hours: int = 24):
"""清理临时文件"""
import time
temp_dir = os.path.join(self.upload_dir, 'temp')
if not os.path.exists(temp_dir):
return
current_time = time.time()
cutoff_time = current_time - (older_than_hours * 3600)
for filename in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, filename)
if os.path.isfile(file_path):
file_mtime = os.path.getmtime(file_path)
if file_mtime < cutoff_time:
self.delete_file(file_path)
logger.info(f"Cleaned up temp file: {filename}")
# 全局文件管理器实例
file_manager = FileManager()
def save_uploaded_file(file, file_type: str, custom_filename: str = None) -> Dict[str, str]:
"""保存上传的文件"""
return file_manager.save_file(file, file_type, custom_filename)
def get_uploaded_file_info(file_type: str, filename: str) -> Optional[Dict[str, str]]:
"""获取上传文件信息"""
return file_manager.get_file_info(file_type, filename)
def list_uploaded_files(file_type: str = None) -> List[Dict[str, str]]:
"""列出上传的文件"""
return file_manager.list_files(file_type)
def validate_uploaded_file(file, file_type: str) -> Tuple[bool, str]:
"""验证上传的文件"""
return file_manager.validate_file(file, file_type)