Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/helper/ext_utils/bot_utils.py
1630 views
1
from httpx import AsyncClient
2
from asyncio.subprocess import PIPE
3
from functools import partial, wraps
4
from concurrent.futures import ThreadPoolExecutor
5
from asyncio import (
6
create_subprocess_exec,
7
create_subprocess_shell,
8
run_coroutine_threadsafe,
9
sleep,
10
)
11
12
from ... import user_data, bot_loop
13
from ...core.config_manager import Config
14
from ..telegram_helper.button_build import ButtonMaker
15
from .telegraph_helper import telegraph
16
from .help_messages import (
17
YT_HELP_DICT,
18
MIRROR_HELP_DICT,
19
CLONE_HELP_DICT,
20
)
21
22
COMMAND_USAGE = {}
23
24
THREAD_POOL = ThreadPoolExecutor(max_workers=500)
25
26
27
class SetInterval:
28
def __init__(self, interval, action, *args, **kwargs):
29
self.interval = interval
30
self.action = action
31
self.task = bot_loop.create_task(self._set_interval(*args, **kwargs))
32
33
async def _set_interval(self, *args, **kwargs):
34
while True:
35
await sleep(self.interval)
36
await self.action(*args, **kwargs)
37
38
def cancel(self):
39
self.task.cancel()
40
41
42
def _build_command_usage(help_dict, command_key):
43
buttons = ButtonMaker()
44
for name in list(help_dict.keys())[1:]:
45
buttons.data_button(name, f"help {command_key} {name}")
46
buttons.data_button("Close", "help close")
47
COMMAND_USAGE[command_key] = [help_dict["main"], buttons.build_menu(3)]
48
buttons.reset()
49
50
51
def create_help_buttons():
52
_build_command_usage(MIRROR_HELP_DICT, "mirror")
53
_build_command_usage(YT_HELP_DICT, "yt")
54
_build_command_usage(CLONE_HELP_DICT, "clone")
55
56
57
def bt_selection_buttons(id_):
58
gid = id_[:12] if len(id_) > 25 else id_
59
pin = "".join([n for n in id_ if n.isdigit()][:4])
60
buttons = ButtonMaker()
61
if Config.WEB_PINCODE:
62
buttons.url_button("Select Files", f"{Config.BASE_URL}/app/files?gid={id_}")
63
buttons.data_button("Pincode", f"sel pin {gid} {pin}")
64
else:
65
buttons.url_button(
66
"Select Files", f"{Config.BASE_URL}/app/files?gid={id_}&pin={pin}"
67
)
68
buttons.data_button("Done Selecting", f"sel done {gid} {id_}")
69
buttons.data_button("Cancel", f"sel cancel {gid}")
70
return buttons.build_menu(2)
71
72
73
async def get_telegraph_list(telegraph_content):
74
path = [
75
(
76
await telegraph.create_page(
77
title="Mirror-Leech-Bot Drive Search", content=content
78
)
79
)["path"]
80
for content in telegraph_content
81
]
82
if len(path) > 1:
83
await telegraph.edit_telegraph(path, telegraph_content)
84
buttons = ButtonMaker()
85
buttons.url_button("🔎 VIEW", f"https://telegra.ph/{path[0]}")
86
return buttons.build_menu(1)
87
88
89
def arg_parser(items, arg_base):
90
91
if not items:
92
return
93
94
arg_start = -1
95
i = 0
96
total = len(items)
97
98
bool_arg_set = {
99
"-b",
100
"-e",
101
"-z",
102
"-s",
103
"-j",
104
"-d",
105
"-sv",
106
"-ss",
107
"-f",
108
"-fd",
109
"-fu",
110
"-sync",
111
"-hl",
112
"-doc",
113
"-med",
114
"-ut",
115
"-bt",
116
}
117
118
while i < total:
119
part = items[i]
120
121
if part in arg_base:
122
if arg_start == -1:
123
arg_start = i
124
125
if (
126
i + 1 == total
127
and part in bool_arg_set
128
or part
129
in [
130
"-s",
131
"-j",
132
"-f",
133
"-fd",
134
"-fu",
135
"-sync",
136
"-hl",
137
"-doc",
138
"-med",
139
"-ut",
140
"-bt",
141
]
142
):
143
arg_base[part] = True
144
else:
145
sub_list = []
146
for j in range(i + 1, total):
147
if items[j] in arg_base:
148
if part == "-c" and items[j] == "-c":
149
sub_list.append(items[j])
150
continue
151
if part in bool_arg_set and not sub_list:
152
arg_base[part] = True
153
break
154
if not sub_list:
155
break
156
check = " ".join(sub_list).strip()
157
if part != "-ff":
158
break
159
if check.startswith("[") and check.endswith("]"):
160
break
161
elif not check.startswith("["):
162
break
163
sub_list.append(items[j])
164
if sub_list:
165
value = " ".join(sub_list)
166
if part == "-ff":
167
if not value.strip().startswith("["):
168
arg_base[part].add(value)
169
else:
170
try:
171
arg_base[part].add(tuple(eval(value)))
172
except:
173
pass
174
else:
175
arg_base[part] = value
176
i += len(sub_list)
177
i += 1
178
if "link" in arg_base:
179
link_items = items[:arg_start] if arg_start != -1 else items
180
if link_items:
181
arg_base["link"] = " ".join(link_items)
182
183
184
def get_size_bytes(size):
185
size = size.lower()
186
if "k" in size:
187
size = int(float(size.split("k")[0]) * 1024)
188
elif "m" in size:
189
size = int(float(size.split("m")[0]) * 1048576)
190
elif "g" in size:
191
size = int(float(size.split("g")[0]) * 1073741824)
192
elif "t" in size:
193
size = int(float(size.split("t")[0]) * 1099511627776)
194
else:
195
size = 0
196
return size
197
198
199
async def get_content_type(url):
200
try:
201
async with AsyncClient() as client:
202
response = await client.get(url, allow_redirects=True, verify=False)
203
return response.headers.get("Content-Type")
204
except:
205
return None
206
207
208
def update_user_ldata(id_, key, value):
209
user_data.setdefault(id_, {})
210
user_data[id_][key] = value
211
212
213
async def cmd_exec(cmd, shell=False):
214
if shell:
215
proc = await create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
216
else:
217
proc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
218
stdout, stderr = await proc.communicate()
219
try:
220
stdout = stdout.decode().strip()
221
except:
222
stdout = "Unable to decode the response!"
223
try:
224
stderr = stderr.decode().strip()
225
except:
226
stderr = "Unable to decode the error!"
227
return stdout, stderr, proc.returncode
228
229
230
def new_task(func):
231
@wraps(func)
232
async def wrapper(*args, **kwargs):
233
task = bot_loop.create_task(func(*args, **kwargs))
234
return task
235
236
return wrapper
237
238
239
async def sync_to_async(func, *args, wait=True, **kwargs):
240
pfunc = partial(func, *args, **kwargs)
241
future = bot_loop.run_in_executor(THREAD_POOL, pfunc)
242
return await future if wait else future
243
244
245
def async_to_sync(func, *args, wait=True, **kwargs):
246
future = run_coroutine_threadsafe(func(*args, **kwargs), bot_loop)
247
return future.result() if wait else future
248
249
250
def loop_thread(func):
251
@wraps(func)
252
def wrapper(*args, wait=False, **kwargs):
253
future = run_coroutine_threadsafe(func(*args, **kwargs), bot_loop)
254
return future.result() if wait else future
255
256
return wrapper
257
258