import subprocess
import random
import re
from pathlib import Path
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
class VerticalVideoGenerator:
def __init__(self, source_folder, temp_folder, preview_generator, shorts_resolution=(1080, 1920), target_duration=59):
"""
Ініціалізація генератора вертикальних відео (Shorts)
Args:
source_folder: Папка з фоновими відео
temp_folder: Папка для тимчасових файлів
preview_generator: Об'єкт PreviewGenerator для створення превью
shorts_resolution: Розмір Shorts відео (ширина, висота)
target_duration: Цільова тривалість Shorts у секундах
"""
self.source_folder = Path(source_folder)
self.temp_folder = Path(temp_folder)
self.preview_generator = preview_generator
self.shorts_resolution = shorts_resolution
self.target_duration = target_duration
def apply_safe_fade_filters(self, use_dur, base_filters):
"""Застосовує fade фільтри безпечно для вертикального відео"""
fade_in_duration = min(0.3, use_dur * 0.3)
fade_out_duration = min(0.3, use_dur * 0.3)
if fade_in_duration + fade_out_duration > use_dur:
fade_in_duration = use_dur * 0.4
fade_out_duration = use_dur * 0.4
fade_out_start = max(0, use_dur - fade_out_duration)
fade_filters = []
if fade_in_duration > 0.05:
fade_filters.append(f"fade=t=in:st=0:d={fade_in_duration}")
if fade_out_duration > 0.05 and fade_out_start > fade_in_duration:
fade_filters.append(f"fade=t=out:st={fade_out_start}:d={fade_out_duration}")
all_filters = [base_filters] + fade_filters
return ",".join(all_filters)
def truncate_story_for_duration(self, story_text, target_duration, preview_duration):
"""Обрізає історію до цільової тривалості, завершуючи на кінці речення"""
# Розділяємо на речення
sentences = re.split(r'[.!?]+', story_text)
sentences = [s.strip() for s in sentences if s.strip()]
truncated_text = ""
current_duration = preview_duration + 0.5 # preview + пауза
for sentence in sentences:
# Приблизно розраховуємо тривалість речення (середня швидкість мовлення ~150 слів/хв)
words_count = len(sentence.split())
sentence_duration = words_count / 2.5 # приблизно 2.5 слова в секунду
if current_duration + sentence_duration > target_duration - 5: # залишаємо 5 секунд на кінцівку
break
truncated_text += sentence + ". "
current_duration += sentence_duration
return truncated_text.strip()
def create_preview_segments_vertical(self, story_id, preview_duration):
"""Створює вертикальні фонові сегменти для preview"""
bg_videos = list(self.source_folder.glob("*.mp4"))
random.shuffle(bg_videos)
bg_iter = iter(bg_videos)
preview_clips = []
preview_duration_collected = 0
while preview_duration_collected < preview_duration:
try:
bg_video = next(bg_iter)
except StopIteration:
bg_videos_reset = list(self.source_folder.glob("*.mp4"))
random.shuffle(bg_videos_reset)
bg_iter = iter(bg_videos_reset)
bg_video = next(bg_iter)
clip = VideoFileClip(str(bg_video))
remaining_preview_duration = preview_duration - preview_duration_collected
use_duration = min(clip.duration, remaining_preview_duration)
clip.close()
if use_duration <= 0:
break
preview_clip_tmp = self.temp_folder / f"{story_id}_shorts_preview_bg_{len(preview_clips):02d}.mp4"
# Створюємо вертикальний кліп з потрібною тривалістю
subprocess.run([
"ffmpeg", "-y", "-i", str(bg_video), "-ss", "0", "-t", str(use_duration),
"-vf", f"scale='max({self.shorts_resolution[0]},iw*{self.shorts_resolution[1]}/ih)':'max({self.shorts_resolution[1]},ih*{self.shorts_resolution[0]}/iw)',crop={self.shorts_resolution[0]}:{self.shorts_resolution[1]},fps=30",
"-an", "-c:v", "h264_nvenc", "-preset", "p4", "-b:v", "5M", str(preview_clip_tmp)
], check=True)
preview_clips.append(preview_clip_tmp)
preview_duration_collected += use_duration
return preview_clips, bg_iter
def create_preview_with_overlay_vertical(self, story_id, preview_clips, horizontal_preview_path, preview_duration):
"""Створює preview відео з накладеним вертикальним preview зображенням"""
# Об'єднуємо всі preview кліпи
if len(preview_clips) > 1:
preview_concat_list = self.temp_folder / f"{story_id}_shorts_preview_concat.txt"
with open(preview_concat_list, "w", encoding="utf-8") as f:
for clip in preview_clips:
f.write(f"file '{clip.as_posix()}'\n")
preview_combined = self.temp_folder / f"{story_id}_shorts_preview_combined.mp4"
subprocess.run([
"ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(preview_concat_list),
"-c:v", "h264_nvenc", "-b:v", "5M", "-preset", "p4", str(preview_combined)
], check=True)
else:
preview_combined = preview_clips[0]
# Створюємо вертикальне preview зображення
preview_img_path = self.temp_folder / f"{story_id}_preview_vertical.png"
self.preview_generator.draw_vertical_preview(horizontal_preview_path, preview_img_path)
# Накладаємо preview зображення на відео
preview_video = self.temp_folder / f"{story_id}_shorts_preview_video.mp4"
subprocess.run([
"ffmpeg", "-y", "-i", str(preview_combined), "-i", str(preview_img_path),
"-filter_complex",
"[1:v]format=rgba[overlay];[0:v][overlay]overlay=(W-w)/2:(H-h)/2:format=auto",
"-t", str(preview_duration), "-c:v", "h264_nvenc", "-preset", "p4", "-b:v", "5M", str(preview_video)
], check=True)
return preview_video
def create_story_segments_vertical(self, story_id, bg_iter, story_duration):
"""Створює вертикальні відео сегменти для основної частини історії"""
used_clips = []
remaining_story_duration = story_duration
for idx, bg in enumerate(bg_iter):
if remaining_story_duration <= 0:
break
clip = VideoFileClip(str(bg))
use_dur = min(clip.duration, remaining_story_duration)
clip.close()
if use_dur <= 0:
break
temp_out = self.temp_folder / f"shorts_clip_{idx:02d}.mp4"
base_filters = f"scale='max({self.shorts_resolution[0]},iw*{self.shorts_resolution[1]}/ih)':'max({self.shorts_resolution[1]},ih*{self.shorts_resolution[0]}/iw)',crop={self.shorts_resolution[0]}:{self.shorts_resolution[1]},fps=30"
filter_string = self.apply_safe_fade_filters(use_dur, base_filters)
subprocess.run([
"ffmpeg", "-y", "-i", str(bg), "-ss", "0", "-t", str(use_dur),
"-vf", filter_string,
"-an", "-c:v", "h264_nvenc", "-preset", "p4", "-b:v", "5M", str(temp_out)
], check=True)
used_clips.append(temp_out)
remaining_story_duration -= use_dur
# Якщо все ще не достатньо відео для story, додаємо ще
if remaining_story_duration > 0:
bg_videos_extra = list(self.source_folder.glob("*.mp4"))
random.shuffle(bg_videos_extra)
for idx, bg in enumerate(bg_videos_extra):
if remaining_story_duration <= 0:
break
clip = VideoFileClip(str(bg))
use_dur = min(clip.duration, remaining_story_duration)
clip.close()
if use_dur <= 0:
break
temp_out = self.temp_folder / f"shorts_clip_extra_{idx:02d}.mp4"
base_filters = f"scale='max({self.shorts_resolution[0]},iw*{self.shorts_resolution[1]}/ih)':'max({self.shorts_resolution[1]},ih*{self.shorts_resolution[0]}/iw)',crop={self.shorts_resolution[0]}:{self.shorts_resolution[1]},fps=30"
filter_string = self.apply_safe_fade_filters(use_dur, base_filters)
subprocess.run([
"ffmpeg", "-y", "-i", str(bg), "-ss", "0", "-t", str(use_dur),
"-vf", filter_string,
"-an", "-c:v", "h264_nvenc", "-preset", "p4", "-b:v", "5M", str(temp_out)
], check=True)
used_clips.append(temp_out)
remaining_story_duration -= use_dur
return used_clips
def create_ending_segment(self, story_id, bg_iter, ending_duration):
"""Створює сегмент кінцівки з фоновим відео"""
ending_bg = self.temp_folder / f"{story_id}_ending_bg.mp4"
# Беремо наступне фонове відео для кінцівки
try:
ending_bg_source = next(bg_iter)
except StopIteration:
# Якщо відео закінчились, беремо перше знову
bg_videos_reset = list(self.source_folder.glob("*.mp4"))
random.shuffle(bg_videos_reset)
ending_bg_source = bg_videos_reset[0]
# Створюємо фонове відео для кінцівки з правильним масштабуванням
subprocess.run([
"ffmpeg", "-y", "-i", str(ending_bg_source), "-ss", "0", "-t", str(ending_duration),
"-vf", f"scale='max({self.shorts_resolution[0]},iw*{self.shorts_resolution[1]}/ih)':'max({self.shorts_resolution[1]},ih*{self.shorts_resolution[0]}/iw)',crop={self.shorts_resolution[0]}:{self.shorts_resolution[1]},fps=30",
"-an", "-c:v", "h264_nvenc", "-preset", "p4", "-b:v", "5M", str(ending_bg)
], check=True)
return ending_bg
def combine_shorts_segments(self, story_id, preview_video, story_clips, ending_clip):
"""Об'єднує всі сегменти Shorts відео"""
used_clips = [preview_video] + story_clips + [ending_clip]
concat_list = self.temp_folder / f"{story_id}_shorts_concat.txt"
with open(concat_list, "w", encoding="utf-8") as f:
for clip in used_clips:
f.write(f"file '{clip.as_posix()}'\n")
combined_path = self.temp_folder / f"{story_id}_shorts_combined.mp4"
subprocess.run([
"ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_list),
"-c:v", "h264_nvenc", "-b:v", "5M", "-preset", "p4", str(combined_path)
], check=True)
return combined_path
def create_shorts_video(self, story_txt, preview_txt, story_id, horizontal_preview_path, coqui_tts):
"""
Створює вертикальне відео для Shorts
Args:
story_txt: Текст історії
preview_txt: Текст preview
story_id: ID історії
horizontal_preview_path: Шлях до горизонтального preview
coqui_tts: Функція TTS для генерації аудіо
Returns:
Path: Шлях до створеного Shorts відео
"""
print(f"???? Створення Shorts відео...")
# Генеруємо аудіо
audio_preview = coqui_tts(preview_txt)
# Обрізаємо історію для цільової тривалості
truncated_story = self.truncate_story_for_duration(story_txt, self.target_duration, audio_preview.duration_seconds)
audio_story = coqui_tts(truncated_story)
# Створюємо кінцівку англійською з паузою ПЕРЕД нею
ending_text = "Full video on the channel. Subscribe for more stories"
audio_ending = coqui_tts(ending_text)
# Додаємо паузу ПЕРЕД кінцівкою (0.5 сек) та в кінці (1 сек)
audio_ending = AudioSegment.silent(duration=500) + audio_ending + AudioSegment.silent(duration=1000)
# Збираємо фінальне аудіо
final_audio = audio_preview + AudioSegment.silent(duration=500) + audio_story + audio_ending
audio_path = self.temp_folder / f"{story_id}_shorts_voice.wav"
final_audio.export(audio_path, format="wav")
# Створюємо preview сегменти
preview_duration = audio_preview.duration_seconds
story_duration = audio_story.duration_seconds
ending_duration = audio_ending.duration_seconds
preview_clips, bg_iter = self.create_preview_segments_vertical(story_id, preview_duration)
preview_video = self.create_preview_with_overlay_vertical(story_id, preview_clips, horizontal_preview_path, preview_duration)
# Створюємо story сегменти
story_clips = self.create_story_segments_vertical(story_id, bg_iter, story_duration)
# Створюємо кінцівку
ending_clip = self.create_ending_segment(story_id, bg_iter, ending_duration)
# Об'єднуємо всі сегменти
combined_video = self.combine_shorts_segments(story_id, preview_video, story_clips, ending_clip)
return combined_video, audio_path
def finalize_shorts_video(self, combined_video, audio_path, output_path):
"""
Фінальна збірка Shorts відео з аудіо
Args:
combined_video: Шлях до відео без аудіо
audio_path: Шлях до аудіо файлу
output_path: Шлях для збереження фінального відео
"""
print(f"???? Фінальна збірка Shorts відео...")
subprocess.run([
"ffmpeg", "-y", "-i", str(combined_video), "-i", str(audio_path),
"-c:v", "copy", "-c:a", "aac", "-shortest", "-movflags", "+faststart",
str(output_path)
], check=True)