Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
elebumm
GitHub Repository: elebumm/RedditVideoMakerBot
Path: blob/master/video_creation/final_video.py
493 views
1
import multiprocessing
2
import os
3
import re
4
import tempfile
5
import textwrap
6
import threading
7
import time
8
from os.path import exists # Needs to be imported specifically
9
from pathlib import Path
10
from typing import Dict, Final, Tuple
11
12
import ffmpeg
13
import translators
14
from PIL import Image, ImageDraw, ImageFont
15
from rich.console import Console
16
from rich.progress import track
17
18
from utils import settings
19
from utils.cleanup import cleanup
20
from utils.console import print_step, print_substep
21
from utils.fonts import getheight
22
from utils.id import extract_id
23
from utils.thumbnail import create_thumbnail
24
from utils.videos import save_data
25
26
console = Console()
27
28
29
class ProgressFfmpeg(threading.Thread):
30
def __init__(self, vid_duration_seconds, progress_update_callback):
31
threading.Thread.__init__(self, name="ProgressFfmpeg")
32
self.stop_event = threading.Event()
33
self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
34
self.vid_duration_seconds = vid_duration_seconds
35
self.progress_update_callback = progress_update_callback
36
37
def run(self):
38
while not self.stop_event.is_set():
39
latest_progress = self.get_latest_ms_progress()
40
if latest_progress is not None:
41
completed_percent = latest_progress / self.vid_duration_seconds
42
self.progress_update_callback(completed_percent)
43
time.sleep(1)
44
45
def get_latest_ms_progress(self):
46
lines = self.output_file.readlines()
47
48
if lines:
49
for line in lines:
50
if "out_time_ms" in line:
51
out_time_ms_str = line.split("=")[1].strip()
52
if out_time_ms_str.isnumeric():
53
return float(out_time_ms_str) / 1000000.0
54
else:
55
# Handle the case when "N/A" is encountered
56
return None
57
return None
58
59
def stop(self):
60
self.stop_event.set()
61
62
def __enter__(self):
63
self.start()
64
return self
65
66
def __exit__(self, *args, **kwargs):
67
self.stop()
68
69
70
def name_normalize(name: str) -> str:
71
name = re.sub(r'[?\\"%*:|<>]', "", name)
72
name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name)
73
name = re.sub(r"( [w,W]\s?\/)", r" with", name)
74
name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name)
75
name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name)
76
name = re.sub(r"\/", r"", name)
77
78
lang = settings.config["reddit"]["thread"]["post_lang"]
79
if lang:
80
print_substep("Translating filename...")
81
translated_name = translators.translate_text(name, translator="google", to_language=lang)
82
return translated_name
83
else:
84
return name
85
86
87
def prepare_background(reddit_id: str, W: int, H: int) -> str:
88
output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4"
89
output = (
90
ffmpeg.input(f"assets/temp/{reddit_id}/background.mp4")
91
.filter("crop", f"ih*({W}/{H})", "ih")
92
.output(
93
output_path,
94
an=None,
95
**{
96
"c:v": "h264_nvenc",
97
"b:v": "20M",
98
"b:a": "192k",
99
"threads": multiprocessing.cpu_count(),
100
},
101
)
102
.overwrite_output()
103
)
104
try:
105
output.run(quiet=True)
106
except ffmpeg.Error as e:
107
print(e.stderr.decode("utf8"))
108
exit(1)
109
return output_path
110
111
112
def get_text_height(draw, text, font, max_width):
113
lines = textwrap.wrap(text, width=max_width)
114
total_height = 0
115
for line in lines:
116
_, _, _, height = draw.textbbox((0, 0), line, font=font)
117
total_height += height
118
return total_height
119
120
121
def create_fancy_thumbnail(image, text, text_color, padding, wrap=35):
122
"""
123
It will take the 1px from the middle of the template and will be resized (stretched) vertically to accommodate the extra height needed for the title.
124
"""
125
print_step(f"Creating fancy thumbnail for: {text}")
126
font_title_size = 47
127
font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), font_title_size)
128
image_width, image_height = image.size
129
130
# Calculate text height to determine new image height
131
draw = ImageDraw.Draw(image)
132
text_height = get_text_height(draw, text, font, wrap)
133
lines = textwrap.wrap(text, width=wrap)
134
# This is -50 to reduce the empty space at the bottom of the image,
135
# change it as per your requirement if needed otherwise leave it.
136
new_image_height = image_height + text_height + padding * (len(lines) - 1) - 50
137
138
# Separate the image into top, middle (1px), and bottom parts
139
top_part_height = image_height // 2
140
middle_part_height = 1 # 1px height middle section
141
bottom_part_height = image_height - top_part_height - middle_part_height
142
143
top_part = image.crop((0, 0, image_width, top_part_height))
144
middle_part = image.crop((0, top_part_height, image_width, top_part_height + middle_part_height))
145
bottom_part = image.crop((0, top_part_height + middle_part_height, image_width, image_height))
146
147
# Stretch the middle part
148
new_middle_height = new_image_height - top_part_height - bottom_part_height
149
middle_part = middle_part.resize((image_width, new_middle_height))
150
151
# Create new image with the calculated height
152
new_image = Image.new("RGBA", (image_width, new_image_height))
153
154
# Paste the top, stretched middle, and bottom parts into the new image
155
new_image.paste(top_part, (0, 0))
156
new_image.paste(middle_part, (0, top_part_height))
157
new_image.paste(bottom_part, (0, top_part_height + new_middle_height))
158
159
# Draw the title text on the new image
160
draw = ImageDraw.Draw(new_image)
161
y = top_part_height + padding
162
for line in lines:
163
draw.text((120, y), line, font=font, fill=text_color, align="left")
164
y += get_text_height(draw, line, font, wrap) + padding
165
166
# Draw the username "PlotPulse" at the specific position
167
username_font = ImageFont.truetype(os.path.join("fonts", "Roboto-Bold.ttf"), 30)
168
draw.text(
169
(205, 825),
170
settings.config["settings"]["channel_name"],
171
font=username_font,
172
fill=text_color,
173
align="left",
174
)
175
176
return new_image
177
178
179
def merge_background_audio(audio: ffmpeg, reddit_id: str):
180
"""Gather an audio and merge with assets/backgrounds/background.mp3
181
Args:
182
audio (ffmpeg): The TTS final audio but without background.
183
reddit_id (str): The ID of subreddit
184
"""
185
background_audio_volume = settings.config["settings"]["background"]["background_audio_volume"]
186
if background_audio_volume == 0:
187
return audio # Return the original audio
188
else:
189
# sets volume to config
190
bg_audio = ffmpeg.input(f"assets/temp/{reddit_id}/background.mp3").filter(
191
"volume",
192
background_audio_volume,
193
)
194
# Merges audio and background_audio
195
merged_audio = ffmpeg.filter([audio, bg_audio], "amix", duration="longest")
196
return merged_audio # Return merged audio
197
198
199
def make_final_video(
200
number_of_clips: int,
201
length: int,
202
reddit_obj: dict,
203
background_config: Dict[str, Tuple],
204
):
205
"""Gathers audio clips, gathers all screenshots, stitches them together and saves the final video to assets/temp
206
Args:
207
number_of_clips (int): Index to end at when going through the screenshots'
208
length (int): Length of the video
209
reddit_obj (dict): The reddit object that contains the posts to read.
210
background_config (Tuple[str, str, str, Any]): The background config to use.
211
"""
212
# settings values
213
W: Final[int] = int(settings.config["settings"]["resolution_w"])
214
H: Final[int] = int(settings.config["settings"]["resolution_h"])
215
216
opacity = settings.config["settings"]["opacity"]
217
218
reddit_id = extract_id(reddit_obj)
219
220
allowOnlyTTSFolder: bool = (
221
settings.config["settings"]["background"]["enable_extra_audio"]
222
and settings.config["settings"]["background"]["background_audio_volume"] != 0
223
)
224
225
print_step("Creating the final video 🎥")
226
227
background_clip = ffmpeg.input(prepare_background(reddit_id, W=W, H=H))
228
229
# Gather all audio clips
230
audio_clips = list()
231
if number_of_clips == 0 and settings.config["settings"]["storymode"] == "false":
232
print(
233
"No audio clips to gather. Please use a different TTS or post."
234
) # This is to fix the TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
235
exit()
236
if settings.config["settings"]["storymode"]:
237
if settings.config["settings"]["storymodemethod"] == 0:
238
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
239
audio_clips.insert(1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3"))
240
elif settings.config["settings"]["storymodemethod"] == 1:
241
audio_clips = [
242
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
243
for i in track(range(number_of_clips + 1), "Collecting the audio files...")
244
]
245
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
246
247
else:
248
audio_clips = [
249
ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") for i in range(number_of_clips)
250
]
251
audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))
252
253
audio_clips_durations = [
254
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"]["duration"])
255
for i in range(number_of_clips)
256
]
257
audio_clips_durations.insert(
258
0,
259
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]),
260
)
261
audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
262
ffmpeg.output(
263
audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"}
264
).overwrite_output().run(quiet=True)
265
266
console.log(f"[bold green] Video Will Be: {length} Seconds Long")
267
268
screenshot_width = int((W * 45) // 100)
269
audio = ffmpeg.input(f"assets/temp/{reddit_id}/audio.mp3")
270
final_audio = merge_background_audio(audio, reddit_id)
271
272
image_clips = list()
273
274
Path(f"assets/temp/{reddit_id}/png").mkdir(parents=True, exist_ok=True)
275
276
# Credits to tim (beingbored)
277
# get the title_template image and draw a text in the middle part of it with the title of the thread
278
title_template = Image.open("assets/title_template.png")
279
280
title = reddit_obj["thread_title"]
281
282
title = name_normalize(title)
283
284
font_color = "#000000"
285
padding = 5
286
287
# create_fancy_thumbnail(image, text, text_color, padding
288
title_img = create_fancy_thumbnail(title_template, title, font_color, padding)
289
290
title_img.save(f"assets/temp/{reddit_id}/png/title.png")
291
image_clips.insert(
292
0,
293
ffmpeg.input(f"assets/temp/{reddit_id}/png/title.png")["v"].filter(
294
"scale", screenshot_width, -1
295
),
296
)
297
298
current_time = 0
299
if settings.config["settings"]["storymode"]:
300
audio_clips_durations = [
301
float(
302
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")["format"]["duration"]
303
)
304
for i in range(number_of_clips)
305
]
306
audio_clips_durations.insert(
307
0,
308
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]),
309
)
310
if settings.config["settings"]["storymodemethod"] == 0:
311
image_clips.insert(
312
1,
313
ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png").filter(
314
"scale", screenshot_width, -1
315
),
316
)
317
background_clip = background_clip.overlay(
318
image_clips[0],
319
enable=f"between(t,{current_time},{current_time + audio_clips_durations[0]})",
320
x="(main_w-overlay_w)/2",
321
y="(main_h-overlay_h)/2",
322
)
323
current_time += audio_clips_durations[0]
324
elif settings.config["settings"]["storymodemethod"] == 1:
325
for i in track(range(0, number_of_clips + 1), "Collecting the image files..."):
326
image_clips.append(
327
ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
328
"scale", screenshot_width, -1
329
)
330
)
331
background_clip = background_clip.overlay(
332
image_clips[i],
333
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
334
x="(main_w-overlay_w)/2",
335
y="(main_h-overlay_h)/2",
336
)
337
current_time += audio_clips_durations[i]
338
else:
339
for i in range(0, number_of_clips + 1):
340
image_clips.append(
341
ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")["v"].filter(
342
"scale", screenshot_width, -1
343
)
344
)
345
image_overlay = image_clips[i].filter("colorchannelmixer", aa=opacity)
346
assert (
347
audio_clips_durations is not None
348
), "Please make a GitHub issue if you see this. Ping @JasonLovesDoggo on GitHub."
349
background_clip = background_clip.overlay(
350
image_overlay,
351
enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
352
x="(main_w-overlay_w)/2",
353
y="(main_h-overlay_h)/2",
354
)
355
current_time += audio_clips_durations[i]
356
357
title = extract_id(reddit_obj, "thread_title")
358
idx = extract_id(reddit_obj)
359
title_thumb = reddit_obj["thread_title"]
360
361
filename = f"{name_normalize(title)[:251]}"
362
subreddit = settings.config["reddit"]["thread"]["subreddit"]
363
364
if not exists(f"./results/{subreddit}"):
365
print_substep("The 'results' folder could not be found so it was automatically created.")
366
os.makedirs(f"./results/{subreddit}")
367
368
if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder:
369
print_substep("The 'OnlyTTS' folder could not be found so it was automatically created.")
370
os.makedirs(f"./results/{subreddit}/OnlyTTS")
371
372
# create a thumbnail for the video
373
settingsbackground = settings.config["settings"]["background"]
374
375
if settingsbackground["background_thumbnail"]:
376
if not exists(f"./results/{subreddit}/thumbnails"):
377
print_substep(
378
"The 'results/thumbnails' folder could not be found so it was automatically created."
379
)
380
os.makedirs(f"./results/{subreddit}/thumbnails")
381
# get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
382
first_image = next(
383
(file for file in os.listdir("assets/backgrounds") if file.endswith(".png")),
384
None,
385
)
386
if first_image is None:
387
print_substep("No png files found in assets/backgrounds", "red")
388
389
else:
390
font_family = settingsbackground["background_thumbnail_font_family"]
391
font_size = settingsbackground["background_thumbnail_font_size"]
392
font_color = settingsbackground["background_thumbnail_font_color"]
393
thumbnail = Image.open(f"assets/backgrounds/{first_image}")
394
width, height = thumbnail.size
395
thumbnailSave = create_thumbnail(
396
thumbnail,
397
font_family,
398
font_size,
399
font_color,
400
width,
401
height,
402
title_thumb,
403
)
404
thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
405
print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")
406
407
text = f"Background by {background_config['video'][2]}"
408
background_clip = ffmpeg.drawtext(
409
background_clip,
410
text=text,
411
x=f"(w-text_w)",
412
y=f"(h-text_h)",
413
fontsize=5,
414
fontcolor="White",
415
fontfile=os.path.join("fonts", "Roboto-Regular.ttf"),
416
)
417
background_clip = background_clip.filter("scale", W, H)
418
print_step("Rendering the video 🎥")
419
from tqdm import tqdm
420
421
pbar = tqdm(total=100, desc="Progress: ", bar_format="{l_bar}{bar}", unit=" %")
422
423
def on_update_example(progress) -> None:
424
status = round(progress * 100, 2)
425
old_percentage = pbar.n
426
pbar.update(status - old_percentage)
427
428
defaultPath = f"results/{subreddit}"
429
with ProgressFfmpeg(length, on_update_example) as progress:
430
path = defaultPath + f"/{filename}"
431
path = (
432
path[:251] + ".mp4"
433
) # Prevent a error by limiting the path length, do not change this.
434
try:
435
ffmpeg.output(
436
background_clip,
437
final_audio,
438
path,
439
f="mp4",
440
**{
441
"c:v": "h264_nvenc",
442
"b:v": "20M",
443
"b:a": "192k",
444
"threads": multiprocessing.cpu_count(),
445
},
446
).overwrite_output().global_args("-progress", progress.output_file.name).run(
447
quiet=True,
448
overwrite_output=True,
449
capture_stdout=False,
450
capture_stderr=False,
451
)
452
except ffmpeg.Error as e:
453
print(e.stderr.decode("utf8"))
454
exit(1)
455
old_percentage = pbar.n
456
pbar.update(100 - old_percentage)
457
if allowOnlyTTSFolder:
458
path = defaultPath + f"/OnlyTTS/{filename}"
459
path = (
460
path[:251] + ".mp4"
461
) # Prevent a error by limiting the path length, do not change this.
462
print_step("Rendering the Only TTS Video 🎥")
463
with ProgressFfmpeg(length, on_update_example) as progress:
464
try:
465
ffmpeg.output(
466
background_clip,
467
audio,
468
path,
469
f="mp4",
470
**{
471
"c:v": "h264_nvenc",
472
"b:v": "20M",
473
"b:a": "192k",
474
"threads": multiprocessing.cpu_count(),
475
},
476
).overwrite_output().global_args("-progress", progress.output_file.name).run(
477
quiet=True,
478
overwrite_output=True,
479
capture_stdout=False,
480
capture_stderr=False,
481
)
482
except ffmpeg.Error as e:
483
print(e.stderr.decode("utf8"))
484
exit(1)
485
486
old_percentage = pbar.n
487
pbar.update(100 - old_percentage)
488
pbar.close()
489
save_data(subreddit, filename + ".mp4", title, idx, background_config["video"][2])
490
print_step("Removing temporary files 🗑")
491
cleanups = cleanup(reddit_id)
492
print_substep(f"Removed {cleanups} temporary files 🗑")
493
print_step("Done! 🎉 The video is in the results folder 📁")
494
495