Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
elebumm
GitHub Repository: elebumm/RedditVideoMakerBot
Path: blob/master/TTS/engine_wrapper.py
327 views
1
import os
2
import re
3
from pathlib import Path
4
from typing import Tuple
5
6
import numpy as np
7
import translators
8
from moviepy.audio.AudioClip import AudioClip
9
from moviepy.audio.fx.volumex import volumex
10
from moviepy.editor import AudioFileClip
11
from rich.progress import track
12
13
from utils import settings
14
from utils.console import print_step, print_substep
15
from utils.voice import sanitize_text
16
17
# Video length variable; edit this at your own risk. It should work, but it's not supported.
DEFAULT_MAX_LENGTH: int = 50
20
21
22
class TTSEngine:
    """Calls the given TTS engine to reduce code duplication and allow multiple TTS engines.

    Args:
        tts_module : The TTS module class. It is instantiated here; the instance must expose
            ``max_chars`` and a ``run(text, filepath=..., random_voice=...)`` method that
            handles the TTS itself and saves the result to the given path.
        reddit_object : The reddit object that contains the posts to read. The keys read here
            are ``thread_id``, ``thread_title``, ``thread_post`` and ``comments`` (each comment
            being a dict with a ``comment_body`` string).
        path (Optional) : The unix style path prefix to save the mp3 files under. The sanitized
            thread id and ``/mp3`` are appended directly to it, so it should end with a slash.
        max_length (Optional) : The maximum length of the mp3 files in total (compared against
            the summed clip durations).
        last_clip_length (Optional) : Initial value for the duration of the most recent clip.

    Notes:
        tts_module must take the arguments text and filepath.
    """

    # Matches links (with or without a scheme) so they can be stripped before synthesis.
    _URL_PATTERN = re.compile(
        r"((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*"
    )

    def __init__(
        self,
        tts_module,
        reddit_object: dict,
        path: str = "assets/temp/",
        max_length: int = DEFAULT_MAX_LENGTH,
        last_clip_length: int = 0,
    ):
        self.tts_module = tts_module()
        self.reddit_object = reddit_object

        # Strip everything except word characters, whitespace and dashes from the thread id
        # before using it as a directory name.
        self.redditid = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
        self.path = path + self.redditid + "/mp3"
        self.max_length = max_length
        self.length = 0  # running total of the generated clip durations
        self.last_clip_length = last_clip_length

    def add_periods(self):
        """Normalize every comment body in place so the TTS doesn't blend sentences.

        Links are removed, newlines become ". ", "AI"/"AGI" are spelled out with periods,
        a trailing period is appended when missing (people often forget them at the end of
        paragraphs) and the period runs produced by these steps are collapsed.
        """
        for comment in self.reddit_object["comments"]:
            body = self._URL_PATTERN.sub(" ", comment["comment_body"])  # remove links
            body = body.replace("\n", ". ")
            body = re.sub(r"\bAI\b", "A.I", body)
            body = re.sub(r"\bAGI\b", "A.G.I", body)
            # endswith() instead of body[-1] so an empty body does not raise IndexError.
            if not body.endswith("."):
                body += "."
            body = body.replace(". . .", ".")
            body = body.replace(".. . ", ".")
            body = body.replace(". . ", ".")
            body = re.sub(r'\."\.', '".', body)
            comment["comment_body"] = body

    def run(self) -> Tuple[int, int]:
        """Generate the mp3 files for the title and for the post or the comments.

        Returns:
            A tuple ``(length, idx)``: the accumulated clip length and the index of the last
            post part / comment that is counted.
        """
        Path(self.path).mkdir(parents=True, exist_ok=True)
        print_step("Saving Text to MP3 files...")

        self.add_periods()
        self.call_tts("title", process_text(self.reddit_object["thread_title"]))
        idx = 0

        if settings.config["settings"]["storymode"]:
            story_method = settings.config["settings"]["storymodemethod"]
            if story_method == 0:
                # The post is a single text: split it only when the engine cannot take it whole.
                if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars:
                    self.split_post(self.reddit_object["thread_post"], "postaudio")
                else:
                    self.call_tts("postaudio", process_text(self.reddit_object["thread_post"]))
            elif story_method == 1:
                # The post is iterated as a sequence of texts, one audio file per entry.
                for idx, text in track(enumerate(self.reddit_object["thread_post"])):
                    self.call_tts(f"postaudio-{idx}", process_text(text))
        else:
            for idx, comment in track(enumerate(self.reddit_object["comments"]), "Saving..."):
                # ! Stop creating mp3 files if the length is greater than max length.
                if self.length > self.max_length and idx > 1:
                    # Discount the clip that pushed the total over the limit.
                    self.length -= self.last_clip_length
                    idx -= 1
                    break
                if len(comment["comment_body"]) > self.tts_module.max_chars:
                    # Split the comment if it is too long
                    self.split_post(comment["comment_body"], idx)
                else:
                    # If the comment is not too long, just call the tts engine
                    self.call_tts(f"{idx}", process_text(comment["comment_body"]))

        print_substep("Saved Text to MP3 files successfully.", style="bold green")
        return self.length, idx

    def split_post(self, text: str, idx):
        """Synthesize a text longer than the engine limit in chunks and join them.

        The text is cut into chunks that end on a period (or at the end of the text), each
        chunk is synthesized to ``{idx}-{n}.part.mp3``, and ffmpeg's concat demuxer joins the
        parts plus a trailing ``silence.mp3`` into ``{idx}.mp3``. The part files are deleted
        afterwards.

        Args:
            text: The text to split and synthesize.
            idx: Name stem of the output file (a comment index or e.g. "postaudio").
        """
        # Local import: the module's import block is not touched by this change.
        import subprocess

        # Up to max_chars - 1 arbitrary characters followed by a period (or the last
        # character of the text), so a stripped chunk never exceeds max_chars.
        body_limit = max(self.tts_module.max_chars - 1, 0)
        chunk_pattern = r" *[\s\S]{0," + str(body_limit) + r"}(?:\.|.$)"
        split_text = [match.group().strip() for match in re.finditer(chunk_pattern, text)]
        self.create_silence_mp3()

        part_names = []
        for idy, text_cut in enumerate(split_text):
            newtext = process_text(text_cut)
            if not newtext or newtext.isspace():
                print("newtext was blank because sanitized split text resulted in none")
                continue
            self.call_tts(f"{idx}-{idy}.part", newtext)
            part_names.append(f"{idx}-{idy}.part.mp3")

        if not part_names:
            # Nothing was synthesized, so there is nothing to concatenate or clean up.
            return

        # List only the parts that were actually produced; names are relative to list.txt.
        list_path = f"{self.path}/list.txt"
        with open(list_path, "w", encoding="utf-8") as list_file:
            for part_name in part_names:
                list_file.write(f"file '{part_name}'\n")
            list_file.write("file 'silence.mp3'\n")

        # Argument list instead of a shell string, so paths containing spaces are safe.
        command = [
            "ffmpeg",
            "-f",
            "concat",
            "-y",
            "-hide_banner",
            "-loglevel",
            "panic",
            "-safe",
            "0",
            "-i",
            list_path,
            "-c",
            "copy",
            f"{self.path}/{idx}.mp3",
        ]
        try:
            subprocess.run(command, check=False)
        except OSError as e:
            # Stay best-effort (as with the previous shell call) when ffmpeg cannot be started.
            print(f"Could not run ffmpeg: {e}")

        # Remove each part independently so one failure does not leave the rest behind.
        for part_name in part_names:
            try:
                os.unlink(f"{self.path}/{part_name}")
            except FileNotFoundError as e:
                print("File not found: " + e.filename)
            except OSError:
                print("OSError")

    def call_tts(self, filename: str, text: str):
        """Synthesize ``text`` to ``{path}/{filename}.mp3`` and add its duration to the total.

        Args:
            filename: Output file name without the ".mp3" extension.
            text: The text handed to the TTS module.
        """
        mp3_path = f"{self.path}/{filename}.mp3"
        self.tts_module.run(
            text,
            filepath=mp3_path,
            random_voice=settings.config["settings"]["tts"]["random_voice"],
        )
        try:
            clip = AudioFileClip(mp3_path)
            try:
                self.last_clip_length = clip.duration
                self.length += clip.duration
            finally:
                clip.close()
        except Exception:
            # NOTE(review): an unreadable clip resets the whole running total, as before;
            # confirm that this is intended rather than just skipping the clip.
            self.length = 0

    def create_silence_mp3(self):
        """Write ``{path}/silence.mp3`` lasting the configured ``silence_duration``.

        The clip is generated as a 440 Hz sine wave whose volume is scaled to 0.
        """
        silence_duration = settings.config["settings"]["tts"]["silence_duration"]
        tone = AudioClip(
            make_frame=lambda t: np.sin(440 * 2 * np.pi * t),
            duration=silence_duration,
            fps=44100,
        )
        silence = volumex(tone, 0)
        silence.write_audiofile(f"{self.path}/silence.mp3", fps=44100, verbose=False, logger=None)
173
174
175
def process_text(text: str, clean: bool = True):
    """Prepare a piece of text for the TTS engine.

    When ``settings.config["reddit"]["thread"]["post_lang"]`` is set, the text is first
    translated to that language with the "google" translator. The result is then passed
    through ``sanitize_text`` unless ``clean`` is false.

    Args:
        text: The raw text.
        clean: Whether to sanitize the (possibly translated) text.

    Returns:
        The processed text.
    """
    lang = settings.config["reddit"]["thread"]["post_lang"]
    if lang:
        print_substep("Translating Text...")
        text = translators.translate_text(text, translator="google", to_language=lang)
    # Sanitize once, after translation, and only when the caller asked for it.
    return sanitize_text(text) if clean else text
183
184