This commit is contained in:
2025-09-05 00:40:39 +08:00
commit d092984a85
4 changed files with 1233 additions and 0 deletions

279
file_upload.py Normal file
View File

@@ -0,0 +1,279 @@
"""
文件上传和管理模块
支持多种文件类型的上传、验证和存储
"""
import os
import uuid
from werkzeug.utils import secure_filename
from flask import request, jsonify
import mimetypes
from typing import Dict, List, Optional, Tuple
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
class FileManager:
"""文件管理器 - 支持多目录文件保存"""
# 允许的文件扩展名
ALLOWED_EXTENSIONS = {
'video': {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm'},
'audio': {'.wav', '.mp3', '.aac', '.flac', '.ogg', '.m4a'},
'image': {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'}
}
# 文件大小限制 (MB)
MAX_FILE_SIZES = {
'video': 500, # 500MB
'audio': 50, # 50MB
'image': 10 # 10MB
}
def __init__(self, upload_dir: str = "/mnt/docker/resource/uploads"):
self.upload_dir = upload_dir
# Docker容器挂载目录配置
self.docker_dirs = {
'voice_data': os.path.expanduser("~/heygem_data/voice/data"),
'face2face_data': os.path.expanduser("~/heygem_data/face2face/temp")
}
self._ensure_upload_directories()
def _ensure_upload_directories(self):
"""确保所有上传目录存在"""
# 确保本地上传目录存在
for file_type in ['video', 'audio', 'image', 'temp']:
dir_path = os.path.join(self.upload_dir, file_type)
os.makedirs(dir_path, exist_ok=True)
# 确保Docker挂载目录存在
for dir_path in self.docker_dirs.values():
os.makedirs(dir_path, exist_ok=True)
logger.info(f"Ensured Docker mount directory exists: {dir_path}")
def copy_audio_for_tts(self, filename: str, uuid: str) -> str:
"""为TTS服务复制音频文件"""
import shutil
source_path = os.path.join(self.upload_dir, 'audio', filename)
if not os.path.exists(source_path):
raise FileNotFoundError(f"Source audio file not found: {source_path}")
# 复制到TTS服务目录使用UUID命名
tts_filename = f"{uuid}.wav"
tts_dest = os.path.join(self.docker_dirs['voice_data'], tts_filename)
try:
shutil.copy2(source_path, tts_dest)
logger.info(f"Copied audio for TTS: {source_path} -> {tts_dest}")
return tts_filename
except Exception as e:
logger.error(f"Failed to copy audio for TTS: {e}")
raise
def copy_files_for_face2face(self, video_filename: str, audio_filename: str, uuid: str) -> Tuple[str, str]:
"""为Face2Face服务复制视频和音频文件"""
import shutil
# 复制视频文件
video_source = os.path.join(self.upload_dir, 'video', video_filename)
if not os.path.exists(video_source):
raise FileNotFoundError(f"Source video file not found: {video_source}")
video_ext = os.path.splitext(video_filename)[1]
face2face_video = f"{uuid}{video_ext}"
video_dest = os.path.join(self.docker_dirs['face2face_data'], face2face_video)
# 复制音频文件
audio_source = os.path.join(self.upload_dir, 'audio', audio_filename)
if not os.path.exists(audio_source):
raise FileNotFoundError(f"Source audio file not found: {audio_source}")
audio_ext = os.path.splitext(audio_filename)[1]
face2face_audio = f"{uuid}{audio_ext}"
audio_dest = os.path.join(self.docker_dirs['face2face_data'], face2face_audio)
try:
shutil.copy2(video_source, video_dest)
shutil.copy2(audio_source, audio_dest)
logger.info(f"Copied files for Face2Face: video={face2face_video}, audio={face2face_audio}")
return face2face_video, face2face_audio
except Exception as e:
logger.error(f"Failed to copy files for Face2Face: {e}")
raise
def copy_generated_file_to_resource(self, source_path: str, filename: str, file_type: str = 'output') -> str:
"""将生成的文件复制到资源目录"""
import shutil
# 确保资源目录存在
resource_dir = "/mnt/docker/resource"
os.makedirs(resource_dir, exist_ok=True)
dest_path = os.path.join(resource_dir, filename)
try:
shutil.copy2(source_path, dest_path)
logger.info(f"Copied generated file to resource: {source_path} -> {dest_path}")
return dest_path
except Exception as e:
logger.error(f"Failed to copy generated file: {e}")
raise
def validate_file(self, file, file_type: str) -> Tuple[bool, str]:
"""验证文件"""
if not file or not file.filename:
return False, "没有选择文件"
# 检查文件扩展名
filename = secure_filename(file.filename)
file_ext = os.path.splitext(filename)[1].lower()
if file_ext not in self.ALLOWED_EXTENSIONS.get(file_type, set()):
return False, f"不支持的{file_type}文件格式: {file_ext}"
# 检查文件大小
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0) # 重置文件指针
max_size = self.MAX_FILE_SIZES.get(file_type, 10) * 1024 * 1024 # 转换为字节
if file_size > max_size:
return False, f"文件大小超出限制 ({self.MAX_FILE_SIZES.get(file_type)}MB)"
return True, "文件验证通过"
def save_file(self, file, file_type: str, custom_filename: str = None) -> Dict[str, str]:
is_valid, message = self.validate_file(file, file_type)
if not is_valid:
raise ValueError(message)
# 确保目录存在
save_dir = os.path.join(self.upload_dir, file_type)
os.makedirs(save_dir, exist_ok=True)
# 生成文件名
if custom_filename:
filename = secure_filename(custom_filename)
else:
if not file.filename:
raise ValueError("上传的文件缺少文件名")
original_filename = secure_filename(file.filename)
file_ext = os.path.splitext(original_filename)[1]
filename = f"{uuid.uuid4().hex}{file_ext}"
# 拼接完整路径
file_path = os.path.join(save_dir, filename)
print(file_path)
# 保存文件
file.save(file_path)
# 获取文件信息
file_size = os.path.getsize(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
logger.info(f"Saved {file_type} file to backup storage: {filename} ({file_size} bytes)")
return {
"filename": filename,
"file_path": file_path,
"file_type": file_type,
"file_size": file_size,
"mime_type": mime_type,
"relative_path": f"uploads/{file_type}/{filename}",
"download_url": f"/download/upload/{file_type}/{filename}"
}
def delete_file(self, file_path: str) -> bool:
"""删除文件"""
try:
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"Deleted file: {file_path}")
return True
return False
except Exception as e:
logger.error(f"Failed to delete file {file_path}: {str(e)}")
return False
def get_file_info(self, file_type: str, filename: str) -> Optional[Dict[str, str]]:
"""获取文件信息"""
file_path = os.path.join(self.upload_dir, file_type, filename)
if not os.path.exists(file_path):
return None
file_size = os.path.getsize(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
return {
"filename": filename,
"file_path": file_path,
"file_type": file_type,
"file_size": file_size,
"mime_type": mime_type,
"relative_path": f"uploads/{file_type}/{filename}",
"download_url": f"/download/upload/{file_type}/{filename}"
}
def list_files(self, file_type: str = None) -> List[Dict[str, str]]:
"""列出文件"""
files = []
if file_type:
file_types = [file_type]
else:
file_types = ['video', 'audio', 'image']
for ft in file_types:
type_dir = os.path.join(self.upload_dir, ft)
if os.path.exists(type_dir):
for filename in os.listdir(type_dir):
file_info = self.get_file_info(ft, filename)
if file_info:
files.append(file_info)
return files
def cleanup_temp_files(self, older_than_hours: int = 24):
"""清理临时文件"""
import time
temp_dir = os.path.join(self.upload_dir, 'temp')
if not os.path.exists(temp_dir):
return
current_time = time.time()
cutoff_time = current_time - (older_than_hours * 3600)
for filename in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, filename)
if os.path.isfile(file_path):
file_mtime = os.path.getmtime(file_path)
if file_mtime < cutoff_time:
self.delete_file(file_path)
logger.info(f"Cleaned up temp file: {filename}")
# 全局文件管理器实例
file_manager = FileManager()
def save_uploaded_file(file, file_type: str, custom_filename: str = None) -> Dict[str, str]:
"""保存上传的文件"""
return file_manager.save_file(file, file_type, custom_filename)
def get_uploaded_file_info(file_type: str, filename: str) -> Optional[Dict[str, str]]:
"""获取上传文件信息"""
return file_manager.get_file_info(file_type, filename)
def list_uploaded_files(file_type: str = None) -> List[Dict[str, str]]:
"""列出上传的文件"""
return file_manager.list_files(file_type)
def validate_uploaded_file(file, file_type: str) -> Tuple[bool, str]:
"""验证上传的文件"""
return file_manager.validate_file(file, file_type)