""" 数字人小程序API路由系统 提供完整的文件上传、异步任务处理、数字人生成等功能 """ from flask import Flask, request, jsonify, send_file import os import logging from typing import Dict, Any import json from datetime import datetime # 导入我们的模块 import api from async_video_api import task_manager, TaskType, TaskStatus from file_upload import file_manager, save_uploaded_file, get_uploaded_file_info, list_uploaded_files app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB 最大上传大小 logger = logging.getLogger(__name__) # ================================ # 文件上传相关API # ================================ @app.route('/api/upload/video', methods=['POST']) def upload_video(): """上传视频文件""" try: if 'file' not in request.files: return jsonify({"error": "没有文件"}), 400 file = request.files['file'] custom_name = request.form.get('custom_name') file_info = save_uploaded_file(file, 'video', custom_name) return jsonify({ "success": True, "message": "视频上传成功", "file_info": file_info }) except Exception as e: logger.error(f"Video upload failed: {str(e)}") return jsonify({"error": f"上传失败: {str(e)}"}), 500 @app.route('/api/upload/audio', methods=['POST']) def upload_audio(): """上传音频文件""" try: if 'file' not in request.files: return jsonify({"error": "没有文件"}), 400 file = request.files['file'] custom_name = request.form.get('custom_name') file_info = save_uploaded_file(file, 'audio', custom_name) return jsonify({ "success": True, "message": "音频上传成功", "file_info": file_info }) except Exception as e: logger.error(f"Audio upload failed: {str(e)}") return jsonify({"error": f"上传失败: {str(e)}"}), 500 @app.route('/api/upload/image', methods=['POST']) def upload_image(): """上传图片文件""" try: if 'file' not in request.files: return jsonify({"error": "没有文件"}), 400 file = request.files['file'] custom_name = request.form.get('custom_name') file_info = save_uploaded_file(file, 'image', custom_name) return jsonify({ "success": True, "message": "图片上传成功", "file_info": file_info }) except Exception as e: logger.error(f"Image upload failed: {str(e)}") return jsonify({"error": f"上传失败: {str(e)}"}), 500 @app.route('/api/files/list', methods=['GET']) def list_files(): """列出已上传的文件""" try: file_type = request.args.get('type') # video, audio, image 或 None (全部) files = list_uploaded_files(file_type) return jsonify({ "success": True, "files": files, "total": len(files) }) except Exception as e: logger.error(f"List files failed: {str(e)}") return jsonify({"error": f"获取文件列表失败: {str(e)}"}), 500 # ================================ # 异步任务相关API # ================================ @app.route('/api/tasks/voice/generate', methods=['POST']) def create_voice_generation_task(): """创建语音生成任务""" try: data = request.get_json() required_fields = ['text'] if not all(field in data for field in required_fields): return jsonify({"error": "缺少必要字段"}), 400 # 创建任务 task_id = task_manager.create_task( TaskType.VOICE_GENERATION, { 'text': data['text'], 'reference_audio': data.get('reference_audio'), 'reference_text': data.get('reference_text', ''), 'uuid': data.get('uuid') }, user_id=data.get('user_id') ) return jsonify({ "success": True, "task_id": task_id, "message": "语音生成任务已创建", "status_url": f"/api/tasks/{task_id}/status" }) except Exception as e: logger.error(f"Create voice generation task failed: {str(e)}") return jsonify({"error": f"创建任务失败: {str(e)}"}), 500 @app.route('/api/tasks/digital-human/create', methods=['POST']) def create_digital_human_task(): """创建数字人生成任务""" try: data = request.get_json() required_fields = ['speech_text', 'sample_video', 'sample_voice'] if not all(field in data for field in required_fields): return jsonify({"error": "缺少必要字段"}), 400 # 创建任务 task_id = task_manager.create_task( TaskType.DIGITAL_HUMAN_CREATION, { 'speech_text': data['speech_text'], 'sample_video': data['sample_video'], 'sample_voice': data['sample_voice'], 'uuid': data.get('uuid') }, user_id=data.get('user_id') ) return jsonify({ "success": True, "task_id": task_id, "message": "数字人生成任务已创建", "status_url": f"/api/tasks/{task_id}/status" }) except Exception as e: logger.error(f"Create digital human task failed: {str(e)}") return jsonify({"error": f"创建任务失败: {str(e)}"}), 500 @app.route('/api/tasks/template/create', methods=['POST']) def create_template_task(): """创建数字人模板任务""" try: data = request.get_json() required_fields = ['person_image', 'background_image'] if not all(field in data for field in required_fields): return jsonify({"error": "缺少必要字段"}), 400 # 创建任务 task_id = task_manager.create_task( TaskType.TEMPLATE_CREATION, { 'person_image': data['person_image'], 'background_image': data['background_image'], 'title_text': data.get('title_text', ''), 'title_position': data.get('title_position', [50, 50]), 'title_font_size': data.get('title_font_size', 48), 'video_length': data.get('video_length', 10.0) }, user_id=data.get('user_id') ) return jsonify({ "success": True, "task_id": task_id, "message": "数字人模板创建任务已创建", "status_url": f"/api/tasks/{task_id}/status" }) except Exception as e: logger.error(f"Create template task failed: {str(e)}") return jsonify({"error": f"创建任务失败: {str(e)}"}), 500 @app.route('/api/tasks/video/compose', methods=['POST']) def create_video_composition_task(): """创建视频合成任务""" try: data = request.get_json() required_fields = ['template_id', 'audio_file'] if not all(field in data for field in required_fields): return jsonify({"error": "缺少必要字段"}), 400 # 创建任务 task_id = task_manager.create_task( TaskType.VIDEO_COMPOSITION, { 'template_id': data['template_id'], 'audio_file': data['audio_file'], 'text_content': data.get('text_content', ''), 'uuid': data.get('uuid') }, user_id=data.get('user_id') ) return jsonify({ "success": True, "task_id": task_id, "message": "视频合成任务已创建", "status_url": f"/api/tasks/{task_id}/status" }) except Exception as e: logger.error(f"Create video composition task failed: {str(e)}") return jsonify({"error": f"创建任务失败: {str(e)}"}), 500 @app.route('/api/task/process', methods=['POST']) def process_task(): """处理指定任务""" try: # 支持 form-data 和 application/json 两种方式 if request.is_json: task_id = request.json.get('task_id') else: task_id = request.form.get('task_id') if not task_id: return jsonify({"error": "缺少任务ID"}), 400 # 调用任务处理函数 task_manager._process_task(task_id) return jsonify({ "success": True, "message": f"任务 {task_id} 正在处理或已完成" }) except Exception as e: logger.error(f"Task processing failed: {str(e)}") return jsonify({"error": f"任务处理失败: {str(e)}"}), 500 @app.route('/api/tasks/audio/extract', methods=['POST']) def create_audio_extraction_task(): """创建音频提取任务""" try: data = request.get_json() required_fields = ['video_file'] if not all(field in data for field in required_fields): return jsonify({"error": "缺少必要字段"}), 400 # 创建任务 task_id = task_manager.create_task( TaskType.AUDIO_EXTRACTION, { 'video_file': data['video_file'] }, user_id=data.get('user_id') ) return jsonify({ "success": True, "task_id": task_id, "message": "音频提取任务已创建", "status_url": f"/api/tasks/{task_id}/status" }) except Exception as e: logger.error(f"Create audio extraction task failed: {str(e)}") return jsonify({"error": f"创建任务失败: {str(e)}"}), 500 @app.route('/api/tasks//status', methods=['GET']) def get_task_status(task_id): """获取任务状态""" try: task = task_manager.get_task(task_id) if not task: return jsonify({"error": "任务不存在"}), 404 return jsonify({ "success": True, "task_id": task_id, "status": task.status.value, "progress": task.progress, "result": task.result, "error_message": task.error_message, "created_at": task.created_at.isoformat() if task.created_at else None, "updated_at": task.updated_at.isoformat() if task.updated_at else None }) except Exception as e: logger.error(f"Get task status failed: {str(e)}") return jsonify({"error": f"获取任务状态失败: {str(e)}"}), 500 @app.route('/api/tasks//cancel', methods=['POST']) def cancel_task(task_id): """取消任务""" try: task = task_manager.get_task(task_id) if not task: return jsonify({"error": "任务不存在"}), 404 if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]: return jsonify({"error": "任务已完成或已取消"}), 400 task_manager.update_task_status(task_id, TaskStatus.CANCELLED) return jsonify({ "success": True, "message": "任务已取消" }) except Exception as e: logger.error(f"Cancel task failed: {str(e)}") return jsonify({"error": f"取消任务失败: {str(e)}"}), 500 # ================================ # 文件下载相关API # ================================ @app.route('/api/download/upload//', methods=['GET']) def download_uploaded_file(file_type, filename): """下载上传的文件""" try: file_info = get_uploaded_file_info(file_type, filename) if not file_info: return jsonify({"error": "文件不存在"}), 404 return send_file( file_info['file_path'], as_attachment=True, download_name=filename, mimetype=file_info.get('mime_type') ) except Exception as e: logger.error(f"Download uploaded file failed: {str(e)}") return jsonify({"error": f"下载文件失败: {str(e)}"}), 500 @app.route('/api/download/generated/video/', methods=['GET']) def download_generated_video(uuid): """下载生成的视频""" try: return api.download_generated_video(uuid) except Exception as e: logger.error(f"Download generated video failed: {str(e)}") return jsonify({"error": f"下载生成视频失败: {str(e)}"}), 500 @app.route('/api/download/generated/audio/', methods=['GET']) def download_generated_audio(uuid): """下载生成的音频""" try: return api.download_generated_audio(uuid) except Exception as e: logger.error(f"Download generated audio failed: {str(e)}") return jsonify({"error": f"下载生成音频失败: {str(e)}"}), 500 @app.route('/api/download/template/', methods=['GET']) def download_template(template_id): """下载模板文件""" try: template_info = task_manager._get_template_info(template_id) if not template_info: return jsonify({"error": "模板不存在"}), 404 template_path = template_info.get('template_path') if not template_path or not os.path.exists(template_path): return jsonify({"error": "模板文件不存在"}), 404 return send_file( template_path, as_attachment=True, download_name=f"template_{template_id}.mp4", mimetype='video/mp4' ) except Exception as e: logger.error(f"Download template failed: {str(e)}") return jsonify({"error": f"下载模板失败: {str(e)}"}), 500 # ================================ # 模板管理相关API # ================================ @app.route('/api/templates/list', methods=['GET']) def list_templates(): """列出所有模板""" try: templates_dir = "/mnt/docker/resource/templates" if not os.path.exists(templates_dir): return jsonify({"success": True, "templates": [], "total": 0}) templates = [] for filename in os.listdir(templates_dir): if filename.endswith('.json'): template_id = filename[:-5] # 移除.json扩展名 template_info = task_manager._get_template_info(template_id) if template_info: template_info['template_id'] = template_id template_info['download_url'] = f"/api/download/template/{template_id}" templates.append(template_info) return jsonify({ "success": True, "templates": templates, "total": len(templates) }) except Exception as e: logger.error(f"List templates failed: {str(e)}") return jsonify({"error": f"获取模板列表失败: {str(e)}"}), 500 @app.route('/api/templates/', methods=['GET']) def get_template_info(template_id): """获取模板信息""" try: template_info = task_manager._get_template_info(template_id) if not template_info: return jsonify({"error": "模板不存在"}), 404 template_info['template_id'] = template_id template_info['download_url'] = f"/api/download/template/{template_id}" return jsonify({ "success": True, "template": template_info }) except Exception as e: logger.error(f"Get template info failed: {str(e)}") return jsonify({"error": f"获取模板信息失败: {str(e)}"}), 500 # ================================ # 系统状态相关API # ================================ @app.route('/api/system/status', methods=['GET']) def get_system_status(): """获取系统状态""" try: # 统计任务状态 task_stats = {} for task in task_manager.tasks.values(): status = task.status.value task_stats[status] = task_stats.get(status, 0) + 1 # 统计文件数量 file_stats = {} for file_type in ['video', 'audio', 'image']: files = list_uploaded_files(file_type) file_stats[file_type] = len(files) return jsonify({ "success": True, "system_status": "running", "task_stats": task_stats, "file_stats": file_stats, "total_tasks": len(task_manager.tasks), "worker_running": task_manager.is_running }) except Exception as e: logger.error(f"Get system status failed: {str(e)}") return jsonify({"error": f"获取系统状态失败: {str(e)}"}), 500 # ================================ # 错误处理 # ================================ @app.errorhandler(413) def too_large(e): return jsonify({"error": "文件过大"}), 413 @app.errorhandler(404) def not_found(e): return jsonify({"error": "接口不存在"}), 404 @app.errorhandler(500) def internal_error(e): return jsonify({"error": "服务器内部错误"}), 500 if __name__ == '__main__': # 启动时清理旧的临时文件 file_manager.cleanup_temp_files() # 启动Flask应用 app.run(host='0.0.0.0', port=5000, debug=True)