import mysql.connector def connect_to_db(host="localhost", user="root", password="password", database="media_db"): return mysql.connector.connect( host=host, user=user, password=password, database=database ) # ------------------------------ # Insert or Update Video Entry # ------------------------------ def insert_or_update_video(cursor, uuid, video_path, silenced_video_path=None, model_name=None): cursor.execute(""" INSERT INTO videos (uuid, video_path, silenced_video_path, model_name) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE video_path = VALUES(video_path), silenced_video_path = VALUES(silenced_video_path), model_name = VALUES(model_name) """, (uuid, video_path, silenced_video_path, model_name)) # ------------------------------ # Insert or Update Audio Entry # ------------------------------ def insert_or_update_audio(cursor, uuid, audio_type, file_path): if audio_type not in ('original', 'formatted'): raise ValueError("audio_type must be 'original' or 'formatted'") cursor.execute(""" SELECT id FROM audios WHERE uuid = %s AND type = %s """, (uuid, audio_type)) if cursor.fetchone(): cursor.execute(""" UPDATE audios SET file_path = %s WHERE uuid = %s AND type = %s """, (file_path, uuid, audio_type)) else: cursor.execute(""" INSERT INTO audios (uuid, type, file_path) VALUES (%s, %s, %s) """, (uuid, audio_type, file_path)) # ------------------------------ # Insert or Update Reference Text # ------------------------------ def insert_or_update_ref_text(cursor, uuid, ref_text): cursor.execute(""" INSERT INTO reference_texts (uuid, ref_text) VALUES (%s, %s) ON DUPLICATE KEY UPDATE ref_text = VALUES(ref_text) """, (uuid, ref_text)) # ------------------------------ # Delete All Entries for a UUID # ------------------------------ def delete_entry(db, uuid): cursor = db.cursor() cursor.execute("DELETE FROM reference_texts WHERE uuid = %s", (uuid,)) cursor.execute("DELETE FROM audios WHERE uuid = %s", (uuid,)) cursor.execute("DELETE FROM videos WHERE uuid = %s", (uuid,)) db.commit() cursor.close() # ------------------------------ # Insert New Media Record # ------------------------------ def insert_entry(db, uuid, video_path, silenced_video_path=None, model_name=None, audio_paths=None, ref_text=None): cursor = db.cursor() insert_or_update_video(cursor, uuid, video_path, silenced_video_path, model_name) if audio_paths: for typ, path in audio_paths.items(): if path: insert_or_update_audio(cursor, uuid, typ, path) if ref_text: insert_or_update_ref_text(cursor, uuid, ref_text) db.commit() cursor.close() # ------------------------------ # Update Entry (Selective) # ------------------------------ def update_entry(db, uuid, video_path=None, silenced_video_path=None, model_name=None, audio_paths=None, ref_text=None): cursor = db.cursor() if video_path or silenced_video_path or model_name: insert_or_update_video(cursor, uuid, video_path or get_existing_value(cursor, uuid, 'video_path'), silenced_video_path or get_existing_value(cursor, uuid, 'silenced_video_path'), model_name or get_existing_value(cursor, uuid, 'model_name')) if audio_paths: for typ, path in audio_paths.items(): if path: insert_or_update_audio(cursor, uuid, typ, path) if ref_text: insert_or_update_ref_text(cursor, uuid, ref_text) db.commit() cursor.close() def get_existing_value(cursor, uuid, field): cursor.execute(f"SELECT {field} FROM videos WHERE uuid = %s", (uuid,)) result = cursor.fetchone() return result[0] if result else None