function script

This commit is contained in:
2025-09-05 00:43:20 +08:00
parent 96c7f5b347
commit 6c6b19e03e
5 changed files with 1864 additions and 0 deletions

View File

@@ -0,0 +1,528 @@
# 数字人小程序API文档
## 概述
这套API系统专为数字人小程序设计提供完整的文件上传、异步任务处理、数字人生成、模板管理等功能。
## 基础URL
```
http://aigc.yanhan.cn:5000/api
```
## 认证
目前系统支持可选的用户ID识别后续可扩展为完整的认证系统。
---
## 📁 文件上传API
### 1. 上传视频文件
**POST** `/upload/video`
**请求格式:** `multipart/form-data`
**参数:**
- `file` (file, 必需): 视频文件 (支持mp4, avi, mov等格式最大500MB)
- `custom_name` (string, 可选): 自定义文件名
**响应示例:**
```json
{
"success": true,
"message": "视频上传成功",
"file_info": {
"filename": "abc123.mp4",
"file_path": "/mnt/docker/resource/uploads/video/abc123.mp4",
"file_type": "video",
"file_size": 15728640,
"mime_type": "video/mp4",
"relative_path": "uploads/video/abc123.mp4",
"download_url": "/download/upload/video/abc123.mp4"
}
}
```
### 2. 上传音频文件
**POST** `/upload/audio`
**请求格式:** `multipart/form-data`
**参数:**
- `file` (file, 必需): 音频文件 (支持wav, mp3, aac等格式最大50MB)
- `custom_name` (string, 可选): 自定义文件名
### 3. 上传图片文件
**POST** `/upload/image`
**请求格式:** `multipart/form-data`
**参数:**
- `file` (file, 必需): 图片文件 (支持jpg, png, gif等格式最大10MB)
- `custom_name` (string, 可选): 自定义文件名
### 4. 列出已上传文件
**GET** `/files/list?type={file_type}`
**参数:**
- `type` (string, 可选): 文件类型过滤 (`video`, `audio`, `image`)
**响应示例:**
```json
{
"success": true,
"files": [
{
"filename": "sample.mp4",
"file_type": "video",
"file_size": 15728640,
"download_url": "/download/upload/video/sample.mp4"
}
],
"total": 1
}
```
---
## 🎯 异步任务API
### 1. 创建语音生成任务
**POST** `/tasks/voice/generate`
**请求体:**
```json
{
"text": "要转换的文本内容",
"reference_audio": "参考音频文件名(可选)",
"reference_text": "参考音频对应文本(可选)",
"uuid": "自定义UUID可选",
"user_id": "用户ID可选"
}
```
**响应示例:**
```json
{
"success": true,
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "语音生成任务已创建",
"status_url": "/api/tasks/550e8400-e29b-41d4-a716-446655440000/status"
}
```
### 2. 创建数字人生成任务
**POST** `/tasks/digital-human/create`
**请求体:**
```json
{
"speech_text": "数字人要说的文本",
"sample_video": "样本视频文件名",
"sample_voice": "样本音频文件名",
"uuid": "自定义UUID可选",
"user_id": "用户ID可选"
}
```
### 3. 创建数字人模板任务
**POST** `/tasks/template/create`
**请求体:**
```json
{
"person_image": "人物图片文件名",
"background_image": "背景图片文件名",
"title_text": "标题文字(可选)",
"title_position": [50, 50],
"title_font_size": 48,
"video_length": 10.0,
"user_id": "用户ID可选"
}
```
### 4. 创建视频合成任务
**POST** `/tasks/video/compose`
**请求体:**
```json
{
"template_id": "模板ID",
"audio_file": "音频文件名",
"text_content": "文本内容(可选)",
"uuid": "自定义UUID可选",
"user_id": "用户ID可选"
}
```
### 5. 创建音频提取任务
**POST** `/tasks/audio/extract`
**请求体:**
```json
{
"video_file": "视频文件名",
"user_id": "用户ID可选"
}
```
### 6. 查询任务状态
**GET** `/tasks/{task_id}/status`
**响应示例:**
```json
{
"success": true,
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "completed",
"progress": 100,
"result": {
"audio_path": "/path/to/generated/audio.wav",
"audio_url": "/download/generated/audio/abc123",
"uuid": "abc123"
},
"error_message": null,
"created_at": "2025-08-04T10:30:00",
"updated_at": "2025-08-04T10:35:00"
}
```
**任务状态说明:**
- `pending`: 等待处理
- `processing`: 正在处理
- `completed`: 已完成
- `failed`: 处理失败
- `cancelled`: 已取消
### 7. 取消任务
**POST** `/tasks/{task_id}/cancel`
---
## 📥 文件下载API
### 1. 下载上传的文件
**GET** `/download/upload/{file_type}/{filename}`
### 2. 下载生成的视频
**GET** `/download/generated/video/{uuid}`
### 3. 下载生成的音频
**GET** `/download/generated/audio/{uuid}`
### 4. 下载模板文件
**GET** `/download/template/{template_id}`
---
## 🎨 模板管理API
### 1. 列出所有模板
**GET** `/templates/list`
**响应示例:**
```json
{
"success": true,
"templates": [
{
"template_id": "template123",
"title_text": "示例标题",
"person_image": "person.jpg",
"background_image": "bg.jpg",
"created_at": "2025-08-04T10:00:00",
"download_url": "/api/download/template/template123"
}
],
"total": 1
}
```
### 2. 获取模板信息
**GET** `/templates/{template_id}`
---
## 📊 系统状态API
### 获取系统状态
**GET** `/system/status`
**响应示例:**
```json
{
"success": true,
"system_status": "running",
"task_stats": {
"pending": 2,
"processing": 1,
"completed": 15,
"failed": 1
},
"file_stats": {
"video": 10,
"audio": 8,
"image": 12
},
"total_tasks": 19,
"worker_running": true
}
```
---
## 🔄 典型使用流程
### 流程1基础语音生成
1. **上传音频文件**(作为声音模板)
```bash
POST /api/upload/audio
```
2. **创建语音生成任务**
```bash
POST /api/tasks/voice/generate
{
"text": "你好,欢迎使用数字人系统",
"reference_audio": "uploaded_audio.wav"
}
```
3. **查询任务状态**
```bash
GET /api/tasks/{task_id}/status
```
4. **下载生成的音频**
```bash
GET /api/download/generated/audio/{uuid}
```
### 流程2数字人视频生成
1. **上传素材文件**
```bash
POST /api/upload/video # 上传样本视频
POST /api/upload/audio # 上传样本音频
```
2. **创建数字人生成任务**
```bash
POST /api/tasks/digital-human/create
{
"speech_text": "大家好,我是数字人小助手",
"sample_video": "sample_video.mp4",
"sample_voice": "sample_audio.wav"
}
```
进行任务
POST /api/task/process
{
"task_id": 任务ID
}
3. **监控任务进度**
```bash
GET /api/tasks/{task_id}/status
```
4. **下载生成的数字人视频**
```bash
GET /api/download/generated/video/{uuid}
```
### 流程3模板化视频制作
1. **上传图片素材**
```bash
POST /api/upload/image # 上传人物图片
POST /api/upload/image # 上传背景图片
```
2. **创建数字人模板**
```bash
POST /api/tasks/template/create
{
"person_image": "person.jpg",
"background_image": "background.jpg",
"title_text": "AI助手",
"title_position": [100, 50],
"video_length": 15.0
}
```
3. **使用模板合成视频**
```bash
POST /api/tasks/video/compose
{
"template_id": "template123",
"audio_file": "voice_content.wav"
}
```
---
## ⚠️ 错误处理
所有API都遵循统一的错误响应格式
```json
{
"error": "错误描述信息"
}
```
**常见HTTP状态码**
- `200`: 成功
- `400`: 请求参数错误
- `404`: 资源不存在
- `413`: 文件过大
- `500`: 服务器内部错误
---
## 📱 小程序集成建议
### 1. 文件上传组件
```javascript
// 小程序文件上传示例
wx.chooseMedia({
count: 1,
mediaType: ['video'],
success(res) {
const tempFilePath = res.tempFiles[0].tempFilePath;
wx.uploadFile({
url: 'https://your-api.com/api/upload/video',
filePath: tempFilePath,
name: 'file',
success(uploadRes) {
const data = JSON.parse(uploadRes.data);
console.log('上传成功', data.file_info);
}
});
}
});
```
### 2. 任务状态轮询
```javascript
// 任务状态查询
function checkTaskStatus(taskId) {
return new Promise((resolve, reject) => {
const timer = setInterval(() => {
wx.request({
url: `https://your-api.com/api/tasks/${taskId}/status`,
success(res) {
const { status, progress, result } = res.data;
if (status === 'completed') {
clearInterval(timer);
resolve(result);
} else if (status === 'failed') {
clearInterval(timer);
reject(new Error('任务失败'));
}
// 更新进度条
console.log(`任务进度: ${progress}%`);
}
});
}, 2000); // 每2秒查询一次
});
}
```
### 3. 完整的数字人生成流程
```javascript
// 完整流程示例
async function createDigitalHuman(speechText, videoFile, audioFile) {
try {
// 1. 上传文件
const videoInfo = await uploadFile(videoFile, 'video');
const audioInfo = await uploadFile(audioFile, 'audio');
// 2. 创建任务
const taskResponse = await wx.request({
url: 'https://your-api.com/api/tasks/digital-human/create',
method: 'POST',
data: {
speech_text: speechText,
sample_video: videoInfo.filename,
sample_voice: audioInfo.filename
}
});
// 3. 等待完成
const result = await checkTaskStatus(taskResponse.data.task_id);
// 4. 下载结果
const downloadUrl = result.video_url;
console.log('数字人视频生成完成:', downloadUrl);
return result;
} catch (error) {
console.error('生成失败:', error);
throw error;
}
}
```
---
## 🚀 部署说明
### 启动API服务
```bash
cd /mnt/docker/code
python miniprogram_api.py
```
服务将在 `http://aigc.yanhan.cn:5000` 启动
### 环境要求
- Python 3.8+
- Flask
- 相关依赖包详见requirements.txt
- 足够的磁盘空间存储上传文件和生成结果
### 配置文件
可在各模块中修改配置:
- 文件上传限制:`file_upload.py` 中的 `MAX_FILE_SIZES`
- 存储路径:各模块中的路径常量
- 服务端口API配置中的端口设置
这套API系统提供了完整的数字人生成能力支持异步处理适合小程序等前端应用集成使用。

View File

@@ -0,0 +1,234 @@
# Download API Documentation
This document describes the download APIs added to your Flask application for downloading video and audio files from the system.
## Available Endpoints
### 1. Download Video File
**Endpoint:** `GET /download/video/<filename>`
**Description:** Download a video file from the system
**Parameters:**
- `filename` (path parameter): The name of the video file to download
- `directory` (query parameter, optional): Custom directory path (default: `/mnt/docker/resource`)
**Example:**
```bash
curl -O "http://localhost:5000/download/video/scene_one.mp4"
curl -O "http://localhost:5000/download/video/scene_one.mp4?directory=/custom/path"
```
**Response:**
- Success: File download with appropriate MIME type
- Error: JSON with error message and HTTP status code
---
### 2. Download Audio File
**Endpoint:** `GET /download/audio/<filename>`
**Description:** Download an audio file from the system
**Parameters:**
- `filename` (path parameter): The name of the audio file to download
- `directory` (query parameter, optional): Custom directory path (default: `/mnt/docker/resource`)
**Example:**
```bash
curl -O "http://localhost:5000/download/audio/output.wav"
curl -O "http://localhost:5000/download/audio/output.wav?directory=/custom/path"
```
---
### 3. Download Generated Video
**Endpoint:** `GET /download/generated/video/<uuid>`
**Description:** Download a generated video file by UUID
**Parameters:**
- `uuid` (path parameter): The unique identifier for the generated video
- `task_id` (query parameter, optional): Alternative task identifier
**Example:**
```bash
curl -O "http://localhost:5000/download/generated/video/5"
curl -O "http://localhost:5000/download/generated/video/5?task_id=123"
```
---
### 4. Download Generated Audio
**Endpoint:** `GET /download/generated/audio/<uuid>`
**Description:** Download a generated audio file by UUID
**Parameters:**
- `uuid` (path parameter): The unique identifier for the generated audio
**Example:**
```bash
curl -O "http://localhost:5000/download/generated/audio/5"
```
---
### 5. List Available Files
**Endpoint:** `GET /list/files`
**Description:** List available files in the system for download
**Parameters:**
- `directory` (query parameter, optional): Directory to list files from (default: `/mnt/docker/resource`)
- `type` (query parameter, optional): Filter by file type - `video`, `audio`, or `all` (default: `all`)
**Example:**
```bash
curl "http://localhost:5000/list/files"
curl "http://localhost:5000/list/files?type=video"
curl "http://localhost:5000/list/files?type=audio&directory=/custom/path"
```
**Response:**
```json
{
"directory": "/mnt/docker/resource",
"total_files": 3,
"files": [
{
"filename": "scene_one.mp4",
"size": 1048576,
"type": "video",
"modified": 1691234567,
"download_url_video": "/download/video/scene_one.mp4",
"download_url_audio": null
},
{
"filename": "output.wav",
"size": 524288,
"type": "audio",
"modified": 1691234568,
"download_url_video": null,
"download_url_audio": "/download/audio/output.wav"
}
]
}
```
---
### 6. List Generated Files by UUID
**Endpoint:** `GET /list/generated/<uuid>`
**Description:** List generated files for a specific UUID
**Parameters:**
- `uuid` (path parameter): The unique identifier to search for generated files
**Example:**
```bash
curl "http://localhost:5000/list/generated/5"
```
**Response:**
```json
{
"uuid": "5",
"total_files": 2,
"generated_files": [
{
"type": "audio",
"filename": "5output.wav",
"path": "/mnt/docker/resource/5output.wav",
"download_url": "/download/generated/audio/5"
},
{
"type": "video",
"filename": "5output.mp4",
"path": "/mnt/docker/resource/5output.mp4",
"download_url": "/download/generated/video/5"
}
]
}
```
## Error Responses
All endpoints return JSON error responses with appropriate HTTP status codes:
```json
{
"error": "Error message describing what went wrong"
}
```
Common HTTP status codes:
- `200`: Success
- `400`: Bad request (invalid file type, etc.)
- `404`: File not found
- `500`: Internal server error
## File Type Support
**Video formats:** `.mp4`, `.avi`, `.mov`, `.mkv`, `.wmv`, `.flv`, `.webm`
**Audio formats:** `.wav`, `.mp3`, `.aac`, `.flac`, `.ogg`, `.m4a`, `.wma`
## Security Notes
- File paths are validated to prevent directory traversal attacks
- Only files with recognized video/audio extensions are downloadable
- Files must exist in the specified directory to be downloaded
## Usage Examples
### Python Example
```python
import requests
# List all files
response = requests.get("http://localhost:5000/list/files")
files = response.json()
# Download a specific video
response = requests.get("http://localhost:5000/download/video/scene_one.mp4")
if response.status_code == 200:
with open("downloaded_video.mp4", "wb") as f:
f.write(response.content)
# Download generated audio by UUID
response = requests.get("http://localhost:5000/download/generated/audio/5")
if response.status_code == 200:
with open("generated_audio.wav", "wb") as f:
f.write(response.content)
```
### JavaScript Example
```javascript
// List files
fetch('/list/files')
.then(response => response.json())
.then(data => console.log(data));
// Download file
fetch('/download/video/scene_one.mp4')
.then(response => response.blob())
.then(blob => {
const url = window.URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = 'scene_one.mp4';
a.click();
});
```
## Testing
Use the provided test script to verify the APIs:
```bash
python test_download_api.py
```
Make sure your Flask server is running on port 5000 before testing.

368
function/async_video_api.py Normal file
View File

@@ -0,0 +1,368 @@
"""
异步视频处理和数字人合成API模块
支持文件上传、异步任务处理、模板管理等功能
"""
import asyncio
import uuid as uuid_lib
from typing import Optional, Dict, Any, List
import os
import json
import time
from enum import Enum
from dataclasses import dataclass
import threading
import queue
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
def resolve_file_path(filename: str, file_type: str) -> str:
"""
解析文件路径支持Docker容器挂载目录
Args:
filename: 文件名(可能包含路径)
file_type: 文件类型 ('audio', 'video', 'image')
Returns:
可访问的文件路径
"""
# 如果是完整路径,直接返回
if os.path.isabs(filename) and os.path.exists(filename):
return filename
# 仅文件名的情况,尝试多个可能的路径
base_filename = os.path.basename(filename)
# 定义搜索路径优先级
search_paths = []
if file_type == 'audio':
search_paths = [
f"/mnt/docker/resource/uploads/audio/{base_filename}",
f"{os.path.expanduser('~/heygem_data/voice/data')}/{base_filename}",
f"/mnt/docker/resource/{base_filename}",
f"/mnt/docker/code/data/temp/{base_filename}"
]
elif file_type == 'video':
search_paths = [
f"/mnt/docker/resource/uploads/video/{base_filename}",
f"{os.path.expanduser('~/heygem_data/face2face/temp')}/{base_filename}",
f"/mnt/docker/resource/{base_filename}",
f"/mnt/docker/video_resource/{base_filename}"
]
elif file_type == 'image':
search_paths = [
f"/mnt/docker/resource/uploads/image/{base_filename}",
f"/mnt/docker/resource/{base_filename}"
]
# 查找第一个存在的文件
for path in search_paths:
if os.path.exists(path):
logger.info(f"Resolved {file_type} file: {filename} -> {path}")
return path
# 如果都没找到,返回原始文件名(让调用者处理错误)
logger.warning(f"Could not resolve {file_type} file path: {filename}")
return filename
class TaskStatus(Enum):
"""任务状态枚举"""
PENDING = "pending" # 等待中
PROCESSING = "processing" # 处理中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
CANCELLED = "cancelled" # 已取消
class TaskType(Enum):
"""任务类型枚举"""
VOICE_GENERATION = "voice_generation"
DIGITAL_HUMAN_CREATION = "digital_human_creation"
VIDEO_COMPOSITION = "video_composition"
TEMPLATE_CREATION = "template_creation"
AUDIO_EXTRACTION = "audio_extraction"
@dataclass
class Task:
"""任务数据类"""
task_id: str
task_type: TaskType
status: TaskStatus
progress: int = 0
result: Optional[Dict[str, Any]] = None
error_message: Optional[str] = None
created_at: datetime = None
updated_at: datetime = None
user_id: Optional[str] = None
input_data: Optional[Dict[str, Any]] = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
self.updated_at = self.created_at
class TaskManager:
"""任务管理器"""
def __init__(self):
self.tasks: Dict[str, Task] = {}
self.task_queue = queue.Queue()
self.worker_thread = None
self.is_running = False
def start_worker(self):
"""启动工作线程"""
if not self.is_running:
self.is_running = True
self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
self.worker_thread.start()
logger.info("Task worker started")
def stop_worker(self):
"""停止工作线程"""
self.is_running = False
if self.worker_thread:
self.worker_thread.join()
logger.info("Task worker stopped")
def create_task(self, task_type: TaskType, input_data: Dict[str, Any], user_id: str = None) -> str:
"""创建新任务"""
task_id = str(uuid_lib.uuid4())
task = Task(
task_id=task_id,
task_type=task_type,
status=TaskStatus.PENDING,
input_data=input_data,
user_id=user_id
)
self.tasks[task_id] = task
self.task_queue.put(task_id)
logger.info(f"Created task {task_id} of type {task_type.value}")
return task_id
def get_task(self, task_id: str) -> Optional[Task]:
"""获取任务信息"""
return self.tasks.get(task_id)
def update_task_status(self, task_id: str, status: TaskStatus, progress: int = None, result: Dict[str, Any] = None, error_message: str = None):
"""更新任务状态"""
if task_id in self.tasks:
task = self.tasks[task_id]
task.status = status
if progress is not None:
task.progress = progress
if result is not None:
task.result = result
if error_message is not None:
task.error_message = error_message
task.updated_at = datetime.now()
logger.info(f"Updated task {task_id}: status={status.value}, progress={progress}")
def _worker_loop(self):
"""工作线程主循环"""
while self.is_running:
try:
task_id = self.task_queue.get(timeout=1)
self._process_task(task_id)
self.task_queue.task_done()
except queue.Empty:
continue
except Exception as e:
logger.error(f"Error in worker loop: {str(e)}")
def _process_task(self, task_id: str):
"""处理具体任务"""
task = self.get_task(task_id)
if not task:
logger.error(f"Task {task_id} not found")
return
try:
self.update_task_status(task_id, TaskStatus.PROCESSING, 10)
if task.task_type == TaskType.VOICE_GENERATION:
result = self._process_voice_generation(task)
elif task.task_type == TaskType.DIGITAL_HUMAN_CREATION:
result = self._process_digital_human_creation(task)
elif task.task_type == TaskType.VIDEO_COMPOSITION:
result = self._process_video_composition(task)
elif task.task_type == TaskType.TEMPLATE_CREATION:
result = self._process_template_creation(task)
elif task.task_type == TaskType.AUDIO_EXTRACTION:
result = self._process_audio_extraction(task)
else:
raise ValueError(f"Unknown task type: {task.task_type}")
self.update_task_status(task_id, TaskStatus.COMPLETED, 100, result)
except Exception as e:
logger.error(f"Task {task_id} failed: {str(e)}")
self.update_task_status(task_id, TaskStatus.FAILED, error_message=str(e))
def _process_voice_generation(self, task: Task) -> Dict[str, Any]:
"""处理语音生成任务"""
import digital_human_api
input_data = task.input_data
text = input_data['text']
reference_audio = input_data.get('reference_audio')
reference_text = input_data.get('reference_text', '')
uuid = input_data.get('uuid', task.task_id[:8])
self.update_task_status(task.task_id, TaskStatus.PROCESSING, 30)
# 如果需要先训练声音模型
if reference_audio and not reference_text:
voice_result = digital_human_api.train_voice_v2(reference_audio)
reference_audio = voice_result.get('asr_format_audio_url')
reference_text = voice_result.get('reference_audio_text', '')
self.update_task_status(task.task_id, TaskStatus.PROCESSING, 60)
# 生成语音
audio_path = digital_human_api.generate_voice_v2(text, reference_audio, reference_text, uuid)
return {
"audio_path": audio_path,
"audio_url": f"/download/generated/audio/{uuid}",
"uuid": uuid,
"text": text
}
def _process_digital_human_creation(self, task: Task) -> Dict[str, Any]:
"""处理数字人创建任务"""
import digital_human_api
input_data = task.input_data
speech_text = input_data['speech_text']
sample_video = input_data['sample_video']
sample_voice = input_data['sample_voice']
uuid = input_data.get('uuid', task.task_id[:8])
self.update_task_status(task.task_id, TaskStatus.PROCESSING, 20)
# 生成数字人(直接传递文件名)
result = digital_human_api.generate_digital_human_v2(
speech_text,
sample_video,
sample_voice,
uuid
)
self.update_task_status(task.task_id, TaskStatus.PROCESSING, 80)
return {
"digital_human_result": result,
"video_url": f"/download/generated/video/{uuid}",
"audio_url": f"/download/generated/audio/{uuid}",
"uuid": uuid,
"speech_text": speech_text
}
def _process_video_composition(self, task: Task) -> Dict[str, Any]:
"""处理视频合成任务"""
import api
input_data = task.input_data
template_id = input_data['template_id']
audio_file = input_data['audio_file']
text_content = input_data.get('text_content', '')
uuid = input_data.get('uuid', task.task_id[:8])
self.update_task_status(task.task_id, TaskStatus.PROCESSING, 40)
# 获取模板信息
template_info = self._get_template_info(template_id)
if not template_info:
raise ValueError(f"Template {template_id} not found")
# 使用模板合成视频
result = api.generate_video(template_info['video_path'], audio_file, uuid)
self.update_task_status(task.task_id, TaskStatus.PROCESSING, 90)
return {
"composition_result": result,
"video_url": f"/download/generated/video/{uuid}",
"template_id": template_id,
"uuid": uuid
}
def _process_template_creation(self, task: Task) -> Dict[str, Any]:
"""处理模板创建任务"""
import api
input_data = task.input_data
person_image = input_data['person_image']
title_text = input_data.get('title_text', '')
title_position = tuple(input_data.get('title_position', [50, 50]))
title_font_size = input_data.get('title_font_size', 48)
background_image = input_data['background_image']
video_length = input_data.get('video_length', 10.0)
self.update_task_status(task.task_id, TaskStatus.PROCESSING, 50)
# 生成带标题的模板视频
template_path = api.generate_video_with_title(
person_image, title_text, title_position,
title_font_size, background_image, video_length
)
# 保存模板信息
template_id = self._save_template_info({
"template_path": template_path,
"title_text": title_text,
"person_image": person_image,
"background_image": background_image,
"created_at": datetime.now().isoformat()
})
return {
"template_id": template_id,
"template_path": template_path,
"template_url": f"/download/template/{template_id}",
"title_text": title_text
}
def _process_audio_extraction(self, task: Task) -> Dict[str, Any]:
"""处理音频提取任务"""
import api
input_data = task.input_data
video_file = input_data['video_file']
self.update_task_status(task.task_id, TaskStatus.PROCESSING, 50)
# 提取文本
extracted_text = api.speech_to_text(video_file)
return {
"extracted_text": extracted_text,
"video_file": video_file
}
def _get_template_info(self, template_id: str) -> Optional[Dict[str, Any]]:
"""获取模板信息"""
template_file = f"/mnt/docker/resource/templates/{template_id}.json"
if os.path.exists(template_file):
with open(template_file, 'r', encoding='utf-8') as f:
return json.load(f)
return None
def _save_template_info(self, template_data: Dict[str, Any]) -> str:
"""保存模板信息"""
template_id = str(uuid_lib.uuid4())
template_dir = "/mnt/docker/resource/templates"
os.makedirs(template_dir, exist_ok=True)
template_file = os.path.join(template_dir, f"{template_id}.json")
with open(template_file, 'w', encoding='utf-8') as f:
json.dump(template_data, f, ensure_ascii=False, indent=2)
return template_id
# 全局任务管理器实例
task_manager = TaskManager()
task_manager.start_worker()

View File

@@ -0,0 +1,211 @@
"""
优化后的数字人生成API
按照新的文件管理架构设计
"""
import requests
import os
import time
import logging
from typing import Dict, Any
from api import Config, APIException, _make_request, _copy_file
logger = logging.getLogger(__name__)
def generate_digital_human_v2(speech_text: str, sample_video: str, sample_voice: str, uuid: str) -> Dict[str, Any]:
"""
生成数字人 (优化版本)
Args:
speech_text: 语音文本
sample_video: 样本视频文件名(在上传目录中)
sample_voice: 样本语音文件名(在上传目录中)
uuid: 唯一标识符
Returns:
生成结果
Raises:
APIException: 生成失败时抛出异常
"""
logger.info(f"Generating digital human for UUID: {uuid}")
try:
from file_upload import file_manager
# 步骤1: 将音频文件复制到TTS服务目录进行预处理
tts_audio_filename = file_manager.copy_audio_for_tts(sample_voice, f"dh_{uuid}")
# 步骤2: 预处理和训练语音模型
url = f"{Config.VOICE_SERVICE_URL}/v1/preprocess_and_tran"
request_body_one = {
"format": "wav",
"reference_audio": tts_audio_filename, # 使用TTS目录中的文件名
"lang": "zh"
}
response_one = _make_request(url, request_body_one)
# 检查响应中是否包含必要字段
if 'asr_format_audio_url' not in response_one:
logger.error(f"Voice preprocessing response missing asr_format_audio_url: {response_one}")
raise APIException(f"Voice preprocessing failed: missing asr_format_audio_url in response", 500)
asr_format_audio_url = response_one['asr_format_audio_url']
reference_audio_text = response_one.get('reference_audio_text', '')
# 步骤3: 生成语音
url = f"{Config.VOICE_SERVICE_URL}/v1/invoke"
request_body_two = {
"speaker": uuid,
"text": speech_text,
"reference_audio": asr_format_audio_url,
"reference_text": reference_audio_text,
**Config.DEFAULT_VOICE_PARAMS
}
response = requests.post(url, json=request_body_two, timeout=60)
logger.info(f"Voice generation response status: {response.status_code}")
if response.status_code != 200:
logger.error(f"Voice generation failed: {response.text}")
raise APIException(f"Voice generation failed: {response.text}", response.status_code)
# 保存生成的音频文件到resource目录
generated_audio_filename = f"{uuid}output.wav"
generated_audio_path = os.path.join(Config.RESOURCE_DIR, generated_audio_filename)
with open(generated_audio_path, "wb") as f:
f.write(response.content)
logger.info(f"Generated audio saved: {generated_audio_path}")
# 步骤4: 将视频和生成的音频复制到Face2Face服务目录
# 首先将生成的音频从resource目录复制到uploads目录临时
upload_audio_dir = "/mnt/docker/resource/uploads/audio"
os.makedirs(upload_audio_dir, exist_ok=True)
upload_audio_path = os.path.join(upload_audio_dir, generated_audio_filename)
_copy_file(generated_audio_path, upload_audio_path)
face2face_video, face2face_audio = file_manager.copy_files_for_face2face(
sample_video, generated_audio_filename, uuid
)
# 步骤5: 提交视频生成任务
url = f"{Config.VIDEO_SERVICE_URL}/easy/submit"
request_body = {
"audio_url": face2face_audio,
"video_url": face2face_video,
"code": str(int(time.time())), # 使用时间戳作为唯一任务ID
"chaofen": 0,
"watermark_switch": 0,
"pn": 1
}
result = _make_request(url, request_body)
logger.info(f"Digital human generation submitted successfully: {result}")
# 清理临时文件
try:
os.remove(upload_audio_path)
except:
pass
return result
except Exception as e:
logger.error(f"Failed to generate digital human: {str(e)}")
if isinstance(e, APIException):
raise
raise APIException(f"Failed to generate digital human: {str(e)}", 500)
def generate_voice_v2(text: str, reference_audio: str, reference_text: str, uuid: str) -> str:
"""
生成语音 (优化版本)
Args:
text: 要转换的文本
reference_audio: 参考音频文件名(在上传目录中)
reference_text: 参考文本
uuid: 唯一标识符
Returns:
生成的音频文件路径在resource目录中
Raises:
APIException: 生成失败时抛出异常
"""
logger.info(f"Generating voice for UUID: {uuid}")
try:
from file_upload import file_manager
# 先将参考音频复制到TTS服务目录
tts_audio_filename = file_manager.copy_audio_for_tts(reference_audio, uuid)
url = f"{Config.VOICE_SERVICE_URL}/v1/invoke"
request_body = {
"speaker": uuid,
"text": text,
"reference_audio": tts_audio_filename, # 使用TTS目录中的文件名
"reference_text": reference_text,
**Config.DEFAULT_VOICE_PARAMS
}
response = requests.post(url, json=request_body, timeout=60)
logger.info(f"Voice generation response status: {response.status_code}")
if response.status_code == 200:
# 保存生成的音频到resource目录
output_filename = f"{uuid}output.wav"
output_path = os.path.join(Config.RESOURCE_DIR, output_filename)
with open(output_path, "wb") as f:
f.write(response.content)
logger.info(f"Generated voice saved to: {output_path}")
return output_path
else:
logger.error(f"Voice generation failed: {response.text}")
raise APIException(f"Voice generation failed: {response.text}", response.status_code)
except Exception as e:
if isinstance(e, APIException):
raise
logger.error(f"Network error during voice generation: {str(e)}")
raise APIException(f"Voice generation error: {str(e)}", 500)
def train_voice_v2(voice_file_name: str) -> Dict[str, Any]:
"""
训练语音模型 (优化版本)
Args:
voice_file_name: 语音文件名(在上传目录中)
Returns:
训练结果
Raises:
APIException: 训练失败时抛出异常
"""
logger.info(f"Training voice model with file: {voice_file_name}")
try:
from file_upload import file_manager
# 将音频文件复制到TTS服务目录
temp_uuid = "train_" + str(int(time.time()))
tts_audio_filename = file_manager.copy_audio_for_tts(voice_file_name, temp_uuid)
url = f"{Config.VOICE_SERVICE_URL}/v1/preprocess_and_tran"
request_body = {
"format": "wav",
"reference_audio": tts_audio_filename, # 使用TTS目录中的文件名
"lang": "zh"
}
return _make_request(url, request_body)
except Exception as e:
if isinstance(e, APIException):
raise
logger.error(f"Voice training error: {str(e)}")
raise APIException(f"Voice training error: {str(e)}", 500)

523
function/miniprogram_api.py Normal file
View File

@@ -0,0 +1,523 @@
"""
数字人小程序API路由系统
提供完整的文件上传、异步任务处理、数字人生成等功能
"""
from flask import Flask, request, jsonify, send_file
import os
import logging
from typing import Dict, Any
import json
from datetime import datetime
# 导入我们的模块
import api
from async_video_api import task_manager, TaskType, TaskStatus
from file_upload import file_manager, save_uploaded_file, get_uploaded_file_info, list_uploaded_files
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB 最大上传大小
logger = logging.getLogger(__name__)
# ================================
# 文件上传相关API
# ================================
@app.route('/api/upload/video', methods=['POST'])
def upload_video():
"""上传视频文件"""
try:
if 'file' not in request.files:
return jsonify({"error": "没有文件"}), 400
file = request.files['file']
custom_name = request.form.get('custom_name')
file_info = save_uploaded_file(file, 'video', custom_name)
return jsonify({
"success": True,
"message": "视频上传成功",
"file_info": file_info
})
except Exception as e:
logger.error(f"Video upload failed: {str(e)}")
return jsonify({"error": f"上传失败: {str(e)}"}), 500
@app.route('/api/upload/audio', methods=['POST'])
def upload_audio():
"""上传音频文件"""
try:
if 'file' not in request.files:
return jsonify({"error": "没有文件"}), 400
file = request.files['file']
custom_name = request.form.get('custom_name')
file_info = save_uploaded_file(file, 'audio', custom_name)
return jsonify({
"success": True,
"message": "音频上传成功",
"file_info": file_info
})
except Exception as e:
logger.error(f"Audio upload failed: {str(e)}")
return jsonify({"error": f"上传失败: {str(e)}"}), 500
@app.route('/api/upload/image', methods=['POST'])
def upload_image():
"""上传图片文件"""
try:
if 'file' not in request.files:
return jsonify({"error": "没有文件"}), 400
file = request.files['file']
custom_name = request.form.get('custom_name')
file_info = save_uploaded_file(file, 'image', custom_name)
return jsonify({
"success": True,
"message": "图片上传成功",
"file_info": file_info
})
except Exception as e:
logger.error(f"Image upload failed: {str(e)}")
return jsonify({"error": f"上传失败: {str(e)}"}), 500
@app.route('/api/files/list', methods=['GET'])
def list_files():
"""列出已上传的文件"""
try:
file_type = request.args.get('type') # video, audio, image 或 None (全部)
files = list_uploaded_files(file_type)
return jsonify({
"success": True,
"files": files,
"total": len(files)
})
except Exception as e:
logger.error(f"List files failed: {str(e)}")
return jsonify({"error": f"获取文件列表失败: {str(e)}"}), 500
# ================================
# 异步任务相关API
# ================================
@app.route('/api/tasks/voice/generate', methods=['POST'])
def create_voice_generation_task():
"""创建语音生成任务"""
try:
data = request.get_json()
required_fields = ['text']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.VOICE_GENERATION,
{
'text': data['text'],
'reference_audio': data.get('reference_audio'),
'reference_text': data.get('reference_text', ''),
'uuid': data.get('uuid')
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "语音生成任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create voice generation task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/tasks/digital-human/create', methods=['POST'])
def create_digital_human_task():
"""创建数字人生成任务"""
try:
data = request.get_json()
required_fields = ['speech_text', 'sample_video', 'sample_voice']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.DIGITAL_HUMAN_CREATION,
{
'speech_text': data['speech_text'],
'sample_video': data['sample_video'],
'sample_voice': data['sample_voice'],
'uuid': data.get('uuid')
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "数字人生成任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create digital human task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/tasks/template/create', methods=['POST'])
def create_template_task():
"""创建数字人模板任务"""
try:
data = request.get_json()
required_fields = ['person_image', 'background_image']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.TEMPLATE_CREATION,
{
'person_image': data['person_image'],
'background_image': data['background_image'],
'title_text': data.get('title_text', ''),
'title_position': data.get('title_position', [50, 50]),
'title_font_size': data.get('title_font_size', 48),
'video_length': data.get('video_length', 10.0)
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "数字人模板创建任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create template task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/tasks/video/compose', methods=['POST'])
def create_video_composition_task():
"""创建视频合成任务"""
try:
data = request.get_json()
required_fields = ['template_id', 'audio_file']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.VIDEO_COMPOSITION,
{
'template_id': data['template_id'],
'audio_file': data['audio_file'],
'text_content': data.get('text_content', ''),
'uuid': data.get('uuid')
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "视频合成任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create video composition task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/task/process', methods=['POST'])
def process_task():
"""处理指定任务"""
try:
# 支持 form-data 和 application/json 两种方式
if request.is_json:
task_id = request.json.get('task_id')
else:
task_id = request.form.get('task_id')
if not task_id:
return jsonify({"error": "缺少任务ID"}), 400
# 调用任务处理函数
task_manager._process_task(task_id)
return jsonify({
"success": True,
"message": f"任务 {task_id} 正在处理或已完成"
})
except Exception as e:
logger.error(f"Task processing failed: {str(e)}")
return jsonify({"error": f"任务处理失败: {str(e)}"}), 500
@app.route('/api/tasks/audio/extract', methods=['POST'])
def create_audio_extraction_task():
"""创建音频提取任务"""
try:
data = request.get_json()
required_fields = ['video_file']
if not all(field in data for field in required_fields):
return jsonify({"error": "缺少必要字段"}), 400
# 创建任务
task_id = task_manager.create_task(
TaskType.AUDIO_EXTRACTION,
{
'video_file': data['video_file']
},
user_id=data.get('user_id')
)
return jsonify({
"success": True,
"task_id": task_id,
"message": "音频提取任务已创建",
"status_url": f"/api/tasks/{task_id}/status"
})
except Exception as e:
logger.error(f"Create audio extraction task failed: {str(e)}")
return jsonify({"error": f"创建任务失败: {str(e)}"}), 500
@app.route('/api/tasks/<task_id>/status', methods=['GET'])
def get_task_status(task_id):
"""获取任务状态"""
try:
task = task_manager.get_task(task_id)
if not task:
return jsonify({"error": "任务不存在"}), 404
return jsonify({
"success": True,
"task_id": task_id,
"status": task.status.value,
"progress": task.progress,
"result": task.result,
"error_message": task.error_message,
"created_at": task.created_at.isoformat() if task.created_at else None,
"updated_at": task.updated_at.isoformat() if task.updated_at else None
})
except Exception as e:
logger.error(f"Get task status failed: {str(e)}")
return jsonify({"error": f"获取任务状态失败: {str(e)}"}), 500
@app.route('/api/tasks/<task_id>/cancel', methods=['POST'])
def cancel_task(task_id):
"""取消任务"""
try:
task = task_manager.get_task(task_id)
if not task:
return jsonify({"error": "任务不存在"}), 404
if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]:
return jsonify({"error": "任务已完成或已取消"}), 400
task_manager.update_task_status(task_id, TaskStatus.CANCELLED)
return jsonify({
"success": True,
"message": "任务已取消"
})
except Exception as e:
logger.error(f"Cancel task failed: {str(e)}")
return jsonify({"error": f"取消任务失败: {str(e)}"}), 500
# ================================
# 文件下载相关API
# ================================
@app.route('/api/download/upload/<file_type>/<filename>', methods=['GET'])
def download_uploaded_file(file_type, filename):
"""下载上传的文件"""
try:
file_info = get_uploaded_file_info(file_type, filename)
if not file_info:
return jsonify({"error": "文件不存在"}), 404
return send_file(
file_info['file_path'],
as_attachment=True,
download_name=filename,
mimetype=file_info.get('mime_type')
)
except Exception as e:
logger.error(f"Download uploaded file failed: {str(e)}")
return jsonify({"error": f"下载文件失败: {str(e)}"}), 500
@app.route('/api/download/generated/video/<uuid>', methods=['GET'])
def download_generated_video(uuid):
"""下载生成的视频"""
try:
return api.download_generated_video(uuid)
except Exception as e:
logger.error(f"Download generated video failed: {str(e)}")
return jsonify({"error": f"下载生成视频失败: {str(e)}"}), 500
@app.route('/api/download/generated/audio/<uuid>', methods=['GET'])
def download_generated_audio(uuid):
"""下载生成的音频"""
try:
return api.download_generated_audio(uuid)
except Exception as e:
logger.error(f"Download generated audio failed: {str(e)}")
return jsonify({"error": f"下载生成音频失败: {str(e)}"}), 500
@app.route('/api/download/template/<template_id>', methods=['GET'])
def download_template(template_id):
"""下载模板文件"""
try:
template_info = task_manager._get_template_info(template_id)
if not template_info:
return jsonify({"error": "模板不存在"}), 404
template_path = template_info.get('template_path')
if not template_path or not os.path.exists(template_path):
return jsonify({"error": "模板文件不存在"}), 404
return send_file(
template_path,
as_attachment=True,
download_name=f"template_{template_id}.mp4",
mimetype='video/mp4'
)
except Exception as e:
logger.error(f"Download template failed: {str(e)}")
return jsonify({"error": f"下载模板失败: {str(e)}"}), 500
# ================================
# 模板管理相关API
# ================================
@app.route('/api/templates/list', methods=['GET'])
def list_templates():
"""列出所有模板"""
try:
templates_dir = "/mnt/docker/resource/templates"
if not os.path.exists(templates_dir):
return jsonify({"success": True, "templates": [], "total": 0})
templates = []
for filename in os.listdir(templates_dir):
if filename.endswith('.json'):
template_id = filename[:-5] # 移除.json扩展名
template_info = task_manager._get_template_info(template_id)
if template_info:
template_info['template_id'] = template_id
template_info['download_url'] = f"/api/download/template/{template_id}"
templates.append(template_info)
return jsonify({
"success": True,
"templates": templates,
"total": len(templates)
})
except Exception as e:
logger.error(f"List templates failed: {str(e)}")
return jsonify({"error": f"获取模板列表失败: {str(e)}"}), 500
@app.route('/api/templates/<template_id>', methods=['GET'])
def get_template_info(template_id):
"""获取模板信息"""
try:
template_info = task_manager._get_template_info(template_id)
if not template_info:
return jsonify({"error": "模板不存在"}), 404
template_info['template_id'] = template_id
template_info['download_url'] = f"/api/download/template/{template_id}"
return jsonify({
"success": True,
"template": template_info
})
except Exception as e:
logger.error(f"Get template info failed: {str(e)}")
return jsonify({"error": f"获取模板信息失败: {str(e)}"}), 500
# ================================
# 系统状态相关API
# ================================
@app.route('/api/system/status', methods=['GET'])
def get_system_status():
"""获取系统状态"""
try:
# 统计任务状态
task_stats = {}
for task in task_manager.tasks.values():
status = task.status.value
task_stats[status] = task_stats.get(status, 0) + 1
# 统计文件数量
file_stats = {}
for file_type in ['video', 'audio', 'image']:
files = list_uploaded_files(file_type)
file_stats[file_type] = len(files)
return jsonify({
"success": True,
"system_status": "running",
"task_stats": task_stats,
"file_stats": file_stats,
"total_tasks": len(task_manager.tasks),
"worker_running": task_manager.is_running
})
except Exception as e:
logger.error(f"Get system status failed: {str(e)}")
return jsonify({"error": f"获取系统状态失败: {str(e)}"}), 500
# ================================
# 错误处理
# ================================
@app.errorhandler(413)
def too_large(e):
return jsonify({"error": "文件过大"}), 413
@app.errorhandler(404)
def not_found(e):
return jsonify({"error": "接口不存在"}), 404
@app.errorhandler(500)
def internal_error(e):
return jsonify({"error": "服务器内部错误"}), 500
if __name__ == '__main__':
# 启动时清理旧的临时文件
file_manager.cleanup_temp_files()
# 启动Flask应用
app.run(host='0.0.0.0', port=5000, debug=True)