Files
digital_human_backend/function/miniprogram_api.py
2025-09-05 00:43:20 +08:00

524 lines
17 KiB
Python

"""
数字人小程序API路由系统
提供完整的文件上传、异步任务处理、数字人生成等功能
"""
from flask import Flask, request, jsonify, send_file
import os
import logging
from typing import Dict, Any
import json
from datetime import datetime
# 导入我们的模块
import api
from async_video_api import task_manager, TaskType, TaskStatus
from file_upload import file_manager, save_uploaded_file, get_uploaded_file_info, list_uploaded_files
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB 最大上传大小
logger = logging.getLogger(__name__)
# ================================
# 文件上传相关API
# ================================
@app.route('/api/upload/video', methods=['POST'])
def upload_video():
"""上传视频文件"""
try:
if 'file' not in request.files:
return jsonify({"error": "没有文件"}), 400
file = request.files['file']
custom_name = request.form.get('custom_name')
file_info = save_uploaded_file(file, 'video', custom_name)
return jsonify({
"success": True,
"message": "视频上传成功",
"file_info": file_info
})
except Exception as e:
logger.error(f"Video upload failed: {str(e)}")
return jsonify({"error": f"上传失败: {str(e)}"}), 500
@app.route('/api/upload/audio', methods=['POST'])
def upload_audio():
"""上传音频文件"""
try:
if 'file' not in request.files:
return jsonify({"error": "没有文件"}), 400
file = request.files['file']
custom_name = request.form.get('custom_name')
file_info = save_uploaded_file(file, 'audio', custom_name)
return jsonify({
"success": True,
"message": "音频上传成功",
"file_info": file_info
})
except Exception as e:
logger.error(f"Audio upload failed: {str(e)}")
return jsonify({"error": f"上传失败: {str(e)}"}), 500
@app.route('/api/upload/image', methods=['POST'])
def upload_image():
"""上传图片文件"""
try:
if 'file' not in request.files:
return jsonify({"error": "没有文件"}), 400
file = request.files['file']
custom_name = request.form.get('custom_name')
file_info = save_uploaded_file(file, 'image', custom_name)
return jsonify({
"success": True,
"message": "图片上传成功",
"file_info": file_info
})
except Exception as e:
logger.error(f"Image upload failed: {str(e)}")
return jsonify({"error": f"上传失败: {str(e)}"}), 500
@app.route('/api/files/list', methods=['GET'])
def list_files():
"""列出已上传的文件"""
try:
file_type = request.args.get('type') # video, audio, image 或 None (全部)
files = list_uploaded_files(file_type)
return jsonify({
"success": True,
"files": files,
"total": len(files)
})
except Exception as e:
logger.error(f"List files failed: {str(e)}")
return jsonify({"error": f"获取文件列表失败: {str(e)}"}), 500
# ================================
# 异步任务相关API
# ================================
@app.route('/api/tasks/voice/generate', methods=['POST'])
def create_voice_generation_task():
"""创建语音生成任务"""
try:
data = request.get_json()
required_fields = ['text']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.VOICE_GENERATION,
{
'text': data['text'],
'reference_audio': data.get('reference_audio'),
'reference_text': data.get('reference_text', ''),
'uuid': data.get('uuid')
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "语音生成任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create voice generation task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/tasks/digital-human/create', methods=['POST'])
def create_digital_human_task():
"""创建数字人生成任务"""
try:
data = request.get_json()
required_fields = ['speech_text', 'sample_video', 'sample_voice']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.DIGITAL_HUMAN_CREATION,
{
'speech_text': data['speech_text'],
'sample_video': data['sample_video'],
'sample_voice': data['sample_voice'],
'uuid': data.get('uuid')
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "数字人生成任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create digital human task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/tasks/template/create', methods=['POST'])
def create_template_task():
"""创建数字人模板任务"""
try:
data = request.get_json()
required_fields = ['person_image', 'background_image']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.TEMPLATE_CREATION,
{
'person_image': data['person_image'],
'background_image': data['background_image'],
'title_text': data.get('title_text', ''),
'title_position': data.get('title_position', [50, 50]),
'title_font_size': data.get('title_font_size', 48),
'video_length': data.get('video_length', 10.0)
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "数字人模板创建任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create template task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/tasks/video/compose', methods=['POST'])
def create_video_composition_task():
"""创建视频合成任务"""
try:
data = request.get_json()
required_fields = ['template_id', 'audio_file']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.VIDEO_COMPOSITION,
{
'template_id': data['template_id'],
'audio_file': data['audio_file'],
'text_content': data.get('text_content', ''),
'uuid': data.get('uuid')
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "视频合成任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create video composition task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/task/process', methods=['POST'])
def process_task():
"""处理指定任务"""
try:
# 支持 form-data 和 application/json 两种方式
if request.is_json:
task_id = request.json.get('task_id')
else:
task_id = request.form.get('task_id')
if not task_id:
return jsonify({"error": "缺少任务ID"}), 400
# 调用任务处理函数
task_manager._process_task(task_id)
return jsonify({
"success": True,
"message": f"任务 {task_id} 正在处理或已完成"
})
except Exception as e:
logger.error(f"Task processing failed: {str(e)}")
return jsonify({"error": f"任务处理失败: {str(e)}"}), 500
@app.route('/api/tasks/audio/extract', methods=['POST'])
def create_audio_extraction_task():
"""创建音频提取任务"""
try:
data = request.get_json()
required_fields = ['video_file']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.AUDIO_EXTRACTION,
{
'video_file': data['video_file']
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "音频提取任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create audio extraction task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/tasks/<task_id>/status', methods=['GET'])
def get_task_status(task_id):
"""获取任务状态"""
try:
task = task_manager.get_task(task_id)
if not task:
return jsonify({"error": "任务不存在"}), 404
return jsonify({
"success": True,
"task_id": task_id,
"status": task.status.value,
"progress": task.progress,
"result": task.result,
"error_message": task.error_message,
"created_at": task.created_at.isoformat() if task.created_at else None,
"updated_at": task.updated_at.isoformat() if task.updated_at else None
})
except Exception as e:
logger.error(f"Get task status failed: {str(e)}")
return jsonify({"error": f"获取任务状态失败: {str(e)}"}), 500
@app.route('/api/tasks/<task_id>/cancel', methods=['POST'])
def cancel_task(task_id):
"""取消任务"""
try:
task = task_manager.get_task(task_id)
if not task:
return jsonify({"error": "任务不存在"}), 404
if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]:
return jsonify({"error": "任务已完成或已取消"}), 400
task_manager.update_task_status(task_id, TaskStatus.CANCELLED)
return jsonify({
"success": True,
"message": "任务已取消"
})
except Exception as e:
logger.error(f"Cancel task failed: {str(e)}")
return jsonify({"error": f"取消任务失败: {str(e)}"}), 500
# ================================
# 文件下载相关API
# ================================
@app.route('/api/download/upload/<file_type>/<filename>', methods=['GET'])
def download_uploaded_file(file_type, filename):
"""下载上传的文件"""
try:
file_info = get_uploaded_file_info(file_type, filename)
if not file_info:
return jsonify({"error": "文件不存在"}), 404
return send_file(
file_info['file_path'],
as_attachment=True,
download_name=filename,
mimetype=file_info.get('mime_type')
)
except Exception as e:
logger.error(f"Download uploaded file failed: {str(e)}")
return jsonify({"error": f"下载文件失败: {str(e)}"}), 500
@app.route('/api/download/generated/video/<uuid>', methods=['GET'])
def download_generated_video(uuid):
"""下载生成的视频"""
try:
return api.download_generated_video(uuid)
except Exception as e:
logger.error(f"Download generated video failed: {str(e)}")
return jsonify({"error": f"下载生成视频失败: {str(e)}"}), 500
@app.route('/api/download/generated/audio/<uuid>', methods=['GET'])
def download_generated_audio(uuid):
"""下载生成的音频"""
try:
return api.download_generated_audio(uuid)
except Exception as e:
logger.error(f"Download generated audio failed: {str(e)}")
return jsonify({"error": f"下载生成音频失败: {str(e)}"}), 500
@app.route('/api/download/template/<template_id>', methods=['GET'])
def download_template(template_id):
"""下载模板文件"""
try:
template_info = task_manager._get_template_info(template_id)
if not template_info:
return jsonify({"error": "模板不存在"}), 404
template_path = template_info.get('template_path')
if not template_path or not os.path.exists(template_path):
return jsonify({"error": "模板文件不存在"}), 404
return send_file(
template_path,
as_attachment=True,
download_name=f"template_{template_id}.mp4",
mimetype='video/mp4'
)
except Exception as e:
logger.error(f"Download template failed: {str(e)}")
return jsonify({"error": f"下载模板失败: {str(e)}"}), 500
# ================================
# 模板管理相关API
# ================================
@app.route('/api/templates/list', methods=['GET'])
def list_templates():
"""列出所有模板"""
try:
templates_dir = "/mnt/docker/resource/templates"
if not os.path.exists(templates_dir):
return jsonify({"success": True, "templates": [], "total": 0})
templates = []
for filename in os.listdir(templates_dir):
if filename.endswith('.json'):
template_id = filename[:-5] # 移除.json扩展名
template_info = task_manager._get_template_info(template_id)
if template_info:
template_info['template_id'] = template_id
template_info['download_url'] = f"/api/download/template/{template_id}"
templates.append(template_info)
return jsonify({
"success": True,
"templates": templates,
"total": len(templates)
})
except Exception as e:
logger.error(f"List templates failed: {str(e)}")
return jsonify({"error": f"获取模板列表失败: {str(e)}"}), 500
@app.route('/api/templates/<template_id>', methods=['GET'])
def get_template_info(template_id):
"""获取模板信息"""
try:
template_info = task_manager._get_template_info(template_id)
if not template_info:
return jsonify({"error": "模板不存在"}), 404
template_info['template_id'] = template_id
template_info['download_url'] = f"/api/download/template/{template_id}"
return jsonify({
"success": True,
"template": template_info
})
except Exception as e:
logger.error(f"Get template info failed: {str(e)}")
return jsonify({"error": f"获取模板信息失败: {str(e)}"}), 500
# ================================
# 系统状态相关API
# ================================
@app.route('/api/system/status', methods=['GET'])
def get_system_status():
"""获取系统状态"""
try:
# 统计任务状态
task_stats = {}
for task in task_manager.tasks.values():
status = task.status.value
task_stats[status] = task_stats.get(status, 0) + 1
# 统计文件数量
file_stats = {}
for file_type in ['video', 'audio', 'image']:
files = list_uploaded_files(file_type)
file_stats[file_type] = len(files)
return jsonify({
"success": True,
"system_status": "running",
"task_stats": task_stats,
"file_stats": file_stats,
"total_tasks": len(task_manager.tasks),
"worker_running": task_manager.is_running
})
except Exception as e:
logger.error(f"Get system status failed: {str(e)}")
return jsonify({"error": f"获取系统状态失败: {str(e)}"}), 500
# ================================
# 错误处理
# ================================
@app.errorhandler(413)
def too_large(e):
return jsonify({"error": "文件过大"}), 413
@app.errorhandler(404)
def not_found(e):
return jsonify({"error": "接口不存在"}), 404
@app.errorhandler(500)
def internal_error(e):
return jsonify({"error": "服务器内部错误"}), 500
if __name__ == '__main__':
# 启动时清理旧的临时文件
file_manager.cleanup_temp_files()
# 启动Flask应用
app.run(host='0.0.0.0', port=5000, debug=True)