Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
elebumm
GitHub Repository: elebumm/RedditVideoMakerBot
Path: blob/master/video_creation/final_video.py
327 views
1
import multiprocessing
2
import os
3
import re
4
import tempfile
5
import textwrap
6
import threading
7
import time
8
from os.path import exists # Needs to be imported specifically
9
from pathlib import Path
10
from typing import Dict, Final, Tuple
11
12
import ffmpeg
13
import translators
14
from PIL import Image, ImageDraw, ImageFont
15
from rich.console import Console
16
from rich.progress import track
17
18
from utils import settings
19
from utils.cleanup import cleanup
20
from utils.console import print_step, print_substep
21
from utils.fonts import getheight
22
from utils.thumbnail import create_thumbnail
23
from utils.videos import save_data
24
25
# Module-level rich console; make_final_video uses it for styled log output.
console = Console()
26
27
28
class ProgressFfmpeg(threading.Thread):
    """Poll an ffmpeg ``-progress`` file in the background and report completion.

    The instance owns a named temporary file (``output_file``). The caller hands
    ``output_file.name`` to ffmpeg through ``-progress``; while the thread runs,
    it re-reads whatever was appended to that file about once per second and
    passes ``out_time / vid_duration_seconds`` (a fraction, not a percentage)
    to ``progress_update_callback``.

    Use it as a context manager around the ffmpeg call: entering starts the
    thread, leaving stops it and removes the temporary file.

    Args:
        vid_duration_seconds: Expected duration of the rendered video, in seconds.
        progress_update_callback: Callable receiving the completed fraction.
    """

    def __init__(self, vid_duration_seconds, progress_update_callback):
        super().__init__(name="ProgressFfmpeg")
        self.stop_event = threading.Event()
        # delete=False: the file must outlive this handle's bookkeeping so that
        # another process can open it by name; it is removed in __exit__.
        self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
        self.vid_duration_seconds = vid_duration_seconds
        self.progress_update_callback = progress_update_callback

    def run(self):
        """Report progress roughly once per second until stop() is called."""
        while not self.stop_event.is_set():
            latest_progress = self.get_latest_ms_progress()
            # Skip the update when the duration is 0/None; dividing would
            # otherwise kill the thread with ZeroDivisionError.
            if latest_progress is not None and self.vid_duration_seconds:
                completed_percent = latest_progress / self.vid_duration_seconds
                self.progress_update_callback(completed_percent)
            # Event.wait instead of time.sleep so stop() wakes the loop at once.
            self.stop_event.wait(1)

    def get_latest_ms_progress(self):
        """Return the newest ``out_time_ms`` value read since the last call, in seconds.

        Only the lines appended since the previous call are inspected. They are
        scanned newest-first so that the most recent numeric value wins;
        non-numeric entries such as "N/A" are skipped.

        Returns:
            float | None: The value divided by 1,000,000, or None when the new
            lines contain no numeric ``out_time_ms`` entry.
        """
        new_lines = self.output_file.readlines()
        for line in reversed(new_lines):
            if "out_time_ms" not in line:
                continue
            _, _, raw_value = line.partition("=")
            raw_value = raw_value.strip()
            if raw_value.isnumeric():
                return float(raw_value) / 1000000.0
        return None

    def stop(self):
        """Ask the polling loop to finish."""
        self.stop_event.set()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.stop()
        # Wait for the polling loop before closing the file it reads from.
        if self.is_alive():
            self.join()
        self.output_file.close()
        try:
            os.remove(self.output_file.name)
        except OSError:
            # Best effort: the file may already be gone or still be held open.
            pass
67
68
69
def name_normalize(name: str) -> str:
    """Turn a thread title into a string usable as a file name.

    Characters ``? \\ " % * : | < >`` are removed, then slashes are rewritten:
    " w/o" becomes " without", " w/" becomes " with", "3/4" becomes "3 of 4",
    "a/b" becomes "a or b", and any remaining "/" is dropped. If
    ``settings.config["reddit"]["thread"]["post_lang"]`` is set, the result is
    translated to that language with the Google translator.

    Args:
        name (str): The raw title.

    Returns:
        str: The cleaned (and possibly translated) name.
    """
    name = re.sub(r'[?\\"%*:|<>]', "", name)
    # The classes are [wW] / [oO0]: a comma inside a character class is a
    # literal comma, which made " ,/" read as "with"/"without".
    name = re.sub(r"( [wW]\s?\/\s?[oO0])", r" without", name)
    name = re.sub(r"( [wW]\s?\/)", r" with", name)
    name = re.sub(r"(\d+)\s?\/\s?(\d+)", r"\1 of \2", name)
    name = re.sub(r"(\w+)\s?\/\s?(\w+)", r"\1 or \2", name)
    name = re.sub(r"\/", r"", name)

    lang = settings.config["reddit"]["thread"]["post_lang"]
    if not lang:
        return name

    print_substep("Translating filename...")
    return translators.translate_text(name, translator="google", to_language=lang)
84
85
86
def prepare_background(reddit_id: str, W: int, H: int) -> str:
    """Crop the thread's background video to the W:H aspect ratio, without audio.

    Reads assets/temp/<reddit_id>/background.mp4 and writes
    assets/temp/<reddit_id>/background_noaudio.mp4, overwriting any existing file.
    If ffmpeg fails, its stderr is printed and the process exits with status 1.

    Args:
        reddit_id (str): Name of the per-thread folder under assets/temp.
        W (int): Target width; only its ratio to H is used here.
        H (int): Target height.

    Returns:
        str: Path of the written video.
    """
    source_path = f"assets/temp/{reddit_id}/background.mp4"
    output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4"
    encoder_options = {
        "c:v": "h264",
        "b:v": "20M",
        "b:a": "192k",
        "threads": multiprocessing.cpu_count(),
    }

    # Keep the full input height and cut the width to ih * W / H.
    cropped = ffmpeg.input(source_path).filter("crop", f"ih*({W}/{H})", "ih")
    # an=None is passed through to ffmpeg as the bare "-an" flag.
    job = cropped.output(output_path, an=None, **encoder_options).overwrite_output()

    try:
        job.run(quiet=True)
    except ffmpeg.Error as err:
        print(err.stderr.decode("utf8"))
        exit(1)
    return output_path
109
110
111
def create_fancy_thumbnail(image, text, text_color, padding, wrap=35):
    """Draw the channel name and the wrapped title onto ``image`` and return it.

    The title starts at 47 pt wrapped to ``wrap`` characters. When that yields
    3, 4 or more than 4 lines, it is re-wrapped to ``wrap + 10`` characters and
    drawn at 40, 35 or 30 pt respectively. The image is modified in place.

    Args:
        image: PIL image used as the template.
        text: Title to draw.
        text_color: Fill colour for both the channel name and the title.
        padding: Extra vertical space added after each title line.
        wrap: Initial wrap width in characters.

    Returns:
        The same ``image`` object, with the text drawn on it.
    """
    print_step(f"Creating fancy thumbnail for: {text}")
    bold_font_file = os.path.join("fonts", "Roboto-Bold.ttf")

    def top_of_block(active_font, wrapped, nudge):
        # Vertical start of the title block: image centre minus half the
        # estimated block height, shifted down by ``nudge`` pixels.
        count = len(wrapped)
        per_line = getheight(active_font, text) + (count * padding) / count
        return (image_height / 2) - ((per_line * count) / 2) + nudge

    font_title_size = 47
    font = ImageFont.truetype(bold_font_file, font_title_size)
    image_width, image_height = image.size
    lines = textwrap.wrap(text, width=wrap)
    y = top_of_block(font, lines, 30)
    draw = ImageDraw.Draw(image)

    # Channel name sits at a fixed spot on the template.
    username_font = ImageFont.truetype(bold_font_file, 30)
    draw.text(
        (205, 825),
        settings.config["settings"]["channel_name"],
        font=username_font,
        fill=text_color,
        align="left",
    )

    # (font size, vertical nudge) to use for long titles; None keeps the defaults.
    line_count = len(lines)
    if line_count == 3:
        restyle = (40, 35)
    elif line_count == 4:
        restyle = (35, 40)
    elif line_count > 4:
        restyle = (30, 30)
    else:
        restyle = None

    if restyle is not None:
        font_title_size, nudge = restyle
        lines = textwrap.wrap(text, width=wrap + 10)
        font = ImageFont.truetype(bold_font_file, font_title_size)
        y = top_of_block(font, lines, nudge)

    for line in lines:
        draw.text((120, y), line, font=font, fill=text_color, align="left")
        y += getheight(font, line) + padding

    return image
166
167
168
def merge_background_audio(audio: ffmpeg, reddit_id: str):
    """Mix the narration with assets/temp/<reddit_id>/background.mp3 when enabled.

    Args:
        audio (ffmpeg): The TTS final audio but without background.
        reddit_id (str): Name of the per-thread folder under assets/temp.

    Returns:
        ``audio`` itself when the configured background_audio_volume is 0;
        otherwise an ffmpeg stream mixing ``audio`` with the background track
        scaled to that volume, lasting as long as the longer input.
    """
    volume = settings.config["settings"]["background"]["background_audio_volume"]
    if volume == 0:
        # Background muted: hand back the narration untouched.
        return audio

    # Apply the configured volume to the background track, then mix.
    backing_track = ffmpeg.input(f"assets/temp/{reddit_id}/background.mp3")
    backing_track = backing_track.filter("volume", volume)
    return ffmpeg.filter([audio, backing_track], "amix", duration="longest")
186
187
188
def make_final_video(
    number_of_clips: int,
    length: int,
    reddit_obj: dict,
    background_config: Dict[str, Tuple],
):
    """Gather audio clips and screenshots, stitch them over the background and render the video.

    The function concatenates the TTS clips into assets/temp/<id>/audio.mp3,
    draws the title card, overlays each image for the duration of its audio
    clip, optionally builds a thumbnail, renders results/<subreddit>/<name>.mp4
    (plus an OnlyTTS variant when extra background audio is enabled), records
    the video with save_data and finally removes the temporary files.

    Args:
        number_of_clips (int): Index to end at when going through the screenshots.
        length (int): Length of the video in seconds; used for the progress bar.
        reddit_obj (dict): The reddit object; "thread_id" and "thread_title" are read.
        background_config (Dict[str, Tuple]): The background config to use; only
            background_config["video"][2] is read, as the credit text.
    """
    # settings values
    W: Final[int] = int(settings.config["settings"]["resolution_w"])
    H: Final[int] = int(settings.config["settings"]["resolution_h"])

    opacity = settings.config["settings"]["opacity"]

    reddit_id = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])

    allowOnlyTTSFolder: bool = (
        settings.config["settings"]["background"]["enable_extra_audio"]
        and settings.config["settings"]["background"]["background_audio_volume"] != 0
    )

    print_step("Creating the final video 🎥")

    background_clip = ffmpeg.input(prepare_background(reddit_id, W=W, H=H))

    # Gather all audio clips
    storymode = settings.config["settings"]["storymode"]
    audio_clips = list()
    # storymode is used as a boolean everywhere else in this function, so the
    # guard must fire for a falsy value; the literal "false" is still accepted
    # for configs that store the flag as a string.
    if number_of_clips == 0 and (not storymode or storymode == "false"):
        print(
            "No audio clips to gather. Please use a different TTS or post."
        )  # This is to fix the TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
        exit()
    if storymode:
        if settings.config["settings"]["storymodemethod"] == 0:
            audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
            audio_clips.insert(1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3"))
        elif settings.config["settings"]["storymodemethod"] == 1:
            audio_clips = [
                ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")
                for i in track(range(number_of_clips + 1), "Collecting the audio files...")
            ]
            audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))

    else:
        audio_clips = [
            ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{i}.mp3") for i in range(number_of_clips)
        ]
        audio_clips.insert(0, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3"))

        # Index 0 is the title, index i + 1 is comment clip i.
        audio_clips_durations = [
            float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{i}.mp3")["format"]["duration"])
            for i in range(number_of_clips)
        ]
        audio_clips_durations.insert(
            0,
            float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]),
        )
    audio_concat = ffmpeg.concat(*audio_clips, a=1, v=0)
    ffmpeg.output(
        audio_concat, f"assets/temp/{reddit_id}/audio.mp3", **{"b:a": "192k"}
    ).overwrite_output().run(quiet=True)

    console.log(f"[bold green] Video Will Be: {length} Seconds Long")

    screenshot_width = int((W * 45) // 100)
    audio = ffmpeg.input(f"assets/temp/{reddit_id}/audio.mp3")
    final_audio = merge_background_audio(audio, reddit_id)

    image_clips = list()

    Path(f"assets/temp/{reddit_id}/png").mkdir(parents=True, exist_ok=True)

    # Credits to tim (beingbored)
    # get the title_template image and draw a text in the middle part of it with the title of the thread
    title_template = Image.open("assets/title_template.png")

    title = reddit_obj["thread_title"]

    title = name_normalize(title)

    font_color = "#000000"
    padding = 5

    # create_fancy_thumbnail(image, text, text_color, padding
    title_img = create_fancy_thumbnail(title_template, title, font_color, padding)

    title_img.save(f"assets/temp/{reddit_id}/png/title.png")
    image_clips.insert(
        0,
        ffmpeg.input(f"assets/temp/{reddit_id}/png/title.png")["v"].filter(
            "scale", screenshot_width, -1
        ),
    )

    # Each image is shown from current_time for the duration of its audio clip.
    current_time = 0
    if storymode:
        # NOTE(review): these durations are probed from postaudio-{i}.mp3 even
        # for storymodemethod 0, whose audio above is postaudio.mp3 - confirm
        # that number_of_clips is 0 in that mode.
        audio_clips_durations = [
            float(
                ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")["format"]["duration"]
            )
            for i in range(number_of_clips)
        ]
        audio_clips_durations.insert(
            0,
            float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"]),
        )
        if settings.config["settings"]["storymodemethod"] == 0:
            image_clips.insert(
                1,
                ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png").filter(
                    "scale", screenshot_width, -1
                ),
            )
            background_clip = background_clip.overlay(
                image_clips[0],
                enable=f"between(t,{current_time},{current_time + audio_clips_durations[0]})",
                x="(main_w-overlay_w)/2",
                y="(main_h-overlay_h)/2",
            )
            current_time += audio_clips_durations[0]
        elif settings.config["settings"]["storymodemethod"] == 1:
            # image_clips[0] is the title card, so image_clips[i] is always the
            # clip appended on the previous iteration.
            for i in track(range(0, number_of_clips + 1), "Collecting the image files..."):
                image_clips.append(
                    ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
                        "scale", screenshot_width, -1
                    )
                )
                background_clip = background_clip.overlay(
                    image_clips[i],
                    enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
                    x="(main_w-overlay_w)/2",
                    y="(main_h-overlay_h)/2",
                )
                current_time += audio_clips_durations[i]
    else:
        # Same indexing as above: image_clips[0] is the title card and
        # image_clips[i] pairs with audio_clips_durations[i].
        for i in range(0, number_of_clips + 1):
            image_clips.append(
                ffmpeg.input(f"assets/temp/{reddit_id}/png/comment_{i}.png")["v"].filter(
                    "scale", screenshot_width, -1
                )
            )
            image_overlay = image_clips[i].filter("colorchannelmixer", aa=opacity)
            assert (
                audio_clips_durations is not None
            ), "Please make a GitHub issue if you see this. Ping @JasonLovesDoggo on GitHub."
            background_clip = background_clip.overlay(
                image_overlay,
                enable=f"between(t,{current_time},{current_time + audio_clips_durations[i]})",
                x="(main_w-overlay_w)/2",
                y="(main_h-overlay_h)/2",
            )
            current_time += audio_clips_durations[i]

    title = re.sub(r"[^\w\s-]", "", reddit_obj["thread_title"])
    idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
    title_thumb = reddit_obj["thread_title"]

    filename = f"{name_normalize(title)[:251]}"
    subreddit = settings.config["reddit"]["thread"]["subreddit"]

    if not exists(f"./results/{subreddit}"):
        print_substep("The 'results' folder could not be found so it was automatically created.")
        os.makedirs(f"./results/{subreddit}")

    if not exists(f"./results/{subreddit}/OnlyTTS") and allowOnlyTTSFolder:
        print_substep("The 'OnlyTTS' folder could not be found so it was automatically created.")
        os.makedirs(f"./results/{subreddit}/OnlyTTS")

    # create a thumbnail for the video
    settingsbackground = settings.config["settings"]["background"]

    if settingsbackground["background_thumbnail"]:
        if not exists(f"./results/{subreddit}/thumbnails"):
            print_substep(
                "The 'results/thumbnails' folder could not be found so it was automatically created."
            )
            os.makedirs(f"./results/{subreddit}/thumbnails")
        # get the first file with the .png extension from assets/backgrounds and use it as a background for the thumbnail
        first_image = next(
            (file for file in os.listdir("assets/backgrounds") if file.endswith(".png")),
            None,
        )
        if first_image is None:
            print_substep("No png files found in assets/backgrounds", "red")

        else:
            font_family = settingsbackground["background_thumbnail_font_family"]
            font_size = settingsbackground["background_thumbnail_font_size"]
            font_color = settingsbackground["background_thumbnail_font_color"]
            thumbnail = Image.open(f"assets/backgrounds/{first_image}")
            width, height = thumbnail.size
            thumbnailSave = create_thumbnail(
                thumbnail,
                font_family,
                font_size,
                font_color,
                width,
                height,
                title_thumb,
            )
            thumbnailSave.save(f"./assets/temp/{reddit_id}/thumbnail.png")
            print_substep(f"Thumbnail - Building Thumbnail in assets/temp/{reddit_id}/thumbnail.png")

    # Small credit line in the bottom-right corner.
    text = f"Background by {background_config['video'][2]}"
    background_clip = ffmpeg.drawtext(
        background_clip,
        text=text,
        x="(w-text_w)",
        y="(h-text_h)",
        fontsize=5,
        fontcolor="White",
        fontfile=os.path.join("fonts", "Roboto-Regular.ttf"),
    )
    background_clip = background_clip.filter("scale", W, H)
    print_step("Rendering the video 🎥")
    from tqdm import tqdm

    pbar = tqdm(total=100, desc="Progress: ", bar_format="{l_bar}{bar}", unit=" %")

    def on_update_example(progress) -> None:
        # progress is a fraction; move the bar to the matching percentage.
        status = round(progress * 100, 2)
        old_percentage = pbar.n
        pbar.update(status - old_percentage)

    def render(audio_stream, destination) -> None:
        # Encode background_clip plus audio_stream to destination while a
        # ProgressFfmpeg thread feeds the progress bar; exits with status 1 on
        # an ffmpeg error.
        with ProgressFfmpeg(length, on_update_example) as progress:
            try:
                ffmpeg.output(
                    background_clip,
                    audio_stream,
                    destination,
                    f="mp4",
                    **{
                        "c:v": "h264",
                        "b:v": "20M",
                        "b:a": "192k",
                        "threads": multiprocessing.cpu_count(),
                    },
                ).overwrite_output().global_args("-progress", progress.output_file.name).run(
                    quiet=True,
                    overwrite_output=True,
                    capture_stdout=False,
                    capture_stderr=False,
                )
            except ffmpeg.Error as e:
                print(e.stderr.decode("utf8"))
                exit(1)

    defaultPath = f"results/{subreddit}"
    path = defaultPath + f"/{filename}"
    path = (
        path[:251] + ".mp4"
    )  # Prevent a error by limiting the path length, do not change this.
    render(final_audio, path)
    old_percentage = pbar.n
    pbar.update(100 - old_percentage)
    if allowOnlyTTSFolder:
        path = defaultPath + f"/OnlyTTS/{filename}"
        path = (
            path[:251] + ".mp4"
        )  # Prevent a error by limiting the path length, do not change this.
        print_step("Rendering the Only TTS Video 🎥")
        # Same picture, but with the narration only (no background track).
        render(audio, path)

        old_percentage = pbar.n
        pbar.update(100 - old_percentage)
    pbar.close()
    save_data(subreddit, filename + ".mp4", title, idx, background_config["video"][2])
    print_step("Removing temporary files 🗑")
    cleanups = cleanup(reddit_id)
    print_substep(f"Removed {cleanups} temporary files 🗑")
    print_step("Done! 🎉 The video is in the results folder 📁")
483
484