# Path: blob/master/bot/helper/ext_utils/bot_utils.py
# 1630 views
"""Shared helpers: help-menu buttons, argument parsing, subprocess and
sync/async bridging utilities used across the bot."""

from httpx import AsyncClient
from asyncio.subprocess import PIPE
from functools import partial, wraps
from concurrent.futures import ThreadPoolExecutor
from asyncio import (
    create_subprocess_exec,
    create_subprocess_shell,
    run_coroutine_threadsafe,
    sleep,
)

from ... import user_data, bot_loop
from ...core.config_manager import Config
from ..telegram_helper.button_build import ButtonMaker
from .telegraph_helper import telegraph
from .help_messages import (
    YT_HELP_DICT,
    MIRROR_HELP_DICT,
    CLONE_HELP_DICT,
)

# Maps a command key ("mirror", "yt", "clone") to [main help text, button menu];
# filled in by create_help_buttons().
COMMAND_USAGE = {}

# Executor used by sync_to_async() to run blocking callables off the event loop.
THREAD_POOL = ThreadPoolExecutor(max_workers=500)


class SetInterval:
    """Repeatedly await ``action(*args, **kwargs)`` every ``interval`` seconds.

    The loop is scheduled on ``bot_loop`` as soon as the object is created and
    runs until :meth:`cancel` is called. The first call happens after one full
    interval has elapsed, not immediately.
    """

    def __init__(self, interval, action, *args, **kwargs):
        self.interval = interval
        self.action = action
        self.task = bot_loop.create_task(self._set_interval(*args, **kwargs))

    async def _set_interval(self, *args, **kwargs):
        """Sleep, run the action, repeat forever (until the task is cancelled)."""
        while True:
            await sleep(self.interval)
            await self.action(*args, **kwargs)

    def cancel(self):
        """Cancel the background task driving the interval loop."""
        self.task.cancel()


def _build_command_usage(help_dict, command_key):
    """Store the help text and option buttons for one command in COMMAND_USAGE.

    Every key of ``help_dict`` except the first one gets a button whose
    callback data is ``"help <command_key> <name>"``; the entry's text is
    ``help_dict["main"]``.
    """
    buttons = ButtonMaker()
    # The first key is skipped (presumably "main", the overview text - confirm
    # against help_messages).
    for name in list(help_dict.keys())[1:]:
        buttons.data_button(name, f"help {command_key} {name}")
    buttons.data_button("Close", "help close")
    COMMAND_USAGE[command_key] = [help_dict["main"], buttons.build_menu(3)]
    buttons.reset()


def create_help_buttons():
    """Populate COMMAND_USAGE for the mirror, yt and clone commands."""
    _build_command_usage(MIRROR_HELP_DICT, "mirror")
    _build_command_usage(YT_HELP_DICT, "yt")
    _build_command_usage(CLONE_HELP_DICT, "clone")


def bt_selection_buttons(id_):
    """Build the file-selection button menu for the download identified by ``id_``.

    ``gid`` is the first 12 characters of ``id_`` when ``id_`` is longer than
    25 characters, otherwise ``id_`` itself. ``pin`` is made of the first four
    digits found in ``id_``. When ``Config.WEB_PINCODE`` is set the pin is
    offered through a separate button; otherwise it is appended to the URL.
    """
    gid = id_[:12] if len(id_) > 25 else id_
    pin = "".join([n for n in id_ if n.isdigit()][:4])
    buttons = ButtonMaker()
    if Config.WEB_PINCODE:
        buttons.url_button("Select Files", f"{Config.BASE_URL}/app/files?gid={id_}")
        buttons.data_button("Pincode", f"sel pin {gid} {pin}")
    else:
        buttons.url_button(
            "Select Files", f"{Config.BASE_URL}/app/files?gid={id_}&pin={pin}"
        )
    buttons.data_button("Done Selecting", f"sel done {gid} {id_}")
    buttons.data_button("Cancel", f"sel cancel {gid}")
    return buttons.build_menu(2)


async def get_telegraph_list(telegraph_content):
    """Create one Telegraph page per item and return a button to the first page.

    Pages are created sequentially, in the order of ``telegraph_content``.
    When more than one page was created, ``telegraph.edit_telegraph`` is called
    with all paths and contents (presumably to cross-link the pages - confirm
    in telegraph_helper).
    """
    path = [
        (
            await telegraph.create_page(
                title="Mirror-Leech-Bot Drive Search", content=content
            )
        )["path"]
        for content in telegraph_content
    ]
    if len(path) > 1:
        await telegraph.edit_telegraph(path, telegraph_content)
    buttons = ButtonMaker()
    buttons.url_button("🔎 VIEW", f"https://telegra.ph/{path[0]}")
    return buttons.build_menu(1)


def arg_parser(items, arg_base):
    """Parse the token list ``items`` into ``arg_base``, mutating it in place.

    ``arg_base`` maps every recognised flag (e.g. ``"-z"``) to its default
    value; only tokens that are keys of ``arg_base`` are treated as flags.
    Tokens that follow a flag are joined with spaces and stored as its value.
    Flags in ``bool_arg_set`` become ``True`` when no value follows them, and
    the flags in ``always_bool`` are always ``True``. ``"-ff"`` values are
    added to the existing ``arg_base["-ff"]`` set (a bracketed value is
    evaluated and stored as a tuple). If ``arg_base`` has a ``"link"`` key,
    every token before the first flag is joined and stored there.

    Returns ``None``; does nothing when ``items`` is empty.
    """
    if not items:
        return

    arg_start = -1
    i = 0
    total = len(items)

    # Flags that turn into True when they are not followed by a value.
    bool_arg_set = {
        "-b",
        "-e",
        "-z",
        "-s",
        "-j",
        "-d",
        "-sv",
        "-ss",
        "-f",
        "-fd",
        "-fu",
        "-sync",
        "-hl",
        "-doc",
        "-med",
        "-ut",
        "-bt",
    }
    # Flags that never consume a value.
    always_bool = {
        "-s",
        "-j",
        "-f",
        "-fd",
        "-fu",
        "-sync",
        "-hl",
        "-doc",
        "-med",
        "-ut",
        "-bt",
    }

    while i < total:
        part = items[i]

        if part in arg_base:
            if arg_start == -1:
                arg_start = i

            # Parenthesised to make the original and/or precedence explicit.
            if (i + 1 == total and part in bool_arg_set) or part in always_bool:
                arg_base[part] = True
            else:
                sub_list = []
                for j in range(i + 1, total):
                    if items[j] in arg_base:
                        # "-c" may legitimately appear inside the value of "-c".
                        if part == "-c" and items[j] == "-c":
                            sub_list.append(items[j])
                            continue
                        if part in bool_arg_set and not sub_list:
                            arg_base[part] = True
                            break
                        if not sub_list:
                            break
                        check = " ".join(sub_list).strip()
                        # Only "-ff" may contain flag-like tokens, and only
                        # while its bracketed list is still open.
                        if part != "-ff":
                            break
                        if check.startswith("[") and check.endswith("]"):
                            break
                        elif not check.startswith("["):
                            break
                    sub_list.append(items[j])
                if sub_list:
                    value = " ".join(sub_list)
                    if part == "-ff":
                        if not value.strip().startswith("["):
                            arg_base[part].add(value)
                        else:
                            # SECURITY NOTE(review): eval() executes arbitrary
                            # Python. If `items` can come from untrusted input,
                            # this should become ast.literal_eval - confirm
                            # that all accepted "-ff" values are plain literals
                            # before switching.
                            try:
                                arg_base[part].add(tuple(eval(value)))
                            except Exception:
                                # Best effort: a malformed list is ignored.
                                pass
                    else:
                        arg_base[part] = value
                    # Skip the tokens consumed as this flag's value.
                    i += len(sub_list)
        i += 1
    if "link" in arg_base:
        link_items = items[:arg_start] if arg_start != -1 else items
        if link_items:
            arg_base["link"] = " ".join(link_items)


def get_size_bytes(size):
    """Convert a size string such as ``"1.5gb"`` to a whole number of bytes.

    The unit is detected by the first of the letters ``k``, ``m``, ``g``,
    ``t`` (checked in that order, case-insensitively) contained in ``size``;
    the text before that letter is parsed as a float. Returns ``0`` when no
    unit letter is present. Raises ``ValueError`` if the numeric part is not a
    valid float.
    """
    size = size.lower()
    units = (
        ("k", 1024),
        ("m", 1048576),
        ("g", 1073741824),
        ("t", 1099511627776),
    )
    for letter, factor in units:
        if letter in size:
            return int(float(size.split(letter)[0]) * factor)
    return 0


async def get_content_type(url):
    """Return the ``Content-Type`` header of ``url``, or ``None`` on failure.

    Redirects are followed and TLS certificate verification is disabled.
    """
    try:
        # `verify` and `follow_redirects` are client-level options in httpx;
        # passing `verify=`/`allow_redirects=` to .get() raises TypeError,
        # which previously made this function always return None.
        async with AsyncClient(follow_redirects=True, verify=False) as client:
            response = await client.get(url)
            return response.headers.get("Content-Type")
    except Exception:
        # Not a bare except: task cancellation must still propagate.
        return None


def update_user_ldata(id_, key, value):
    """Set ``user_data[id_][key] = value``, creating the user's dict if needed."""
    user_data.setdefault(id_, {})[key] = value


async def cmd_exec(cmd, shell=False):
    """Run a command and return ``(stdout, stderr, returncode)``.

    With ``shell=True`` ``cmd`` is a single string run through the shell;
    otherwise it is a sequence of program arguments. Both streams are decoded
    and stripped; a stream that cannot be decoded is replaced by a fixed
    placeholder message.
    """
    if shell:
        proc = await create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    else:
        proc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
    stdout, stderr = await proc.communicate()
    try:
        stdout = stdout.decode().strip()
    except UnicodeDecodeError:
        stdout = "Unable to decode the response!"
    try:
        stderr = stderr.decode().strip()
    except UnicodeDecodeError:
        stderr = "Unable to decode the error!"
    return stdout, stderr, proc.returncode


def new_task(func):
    """Decorator: schedule the coroutine function ``func`` as a task on ``bot_loop``.

    Awaiting the decorated function returns the created task without waiting
    for ``func`` itself to finish.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        task = bot_loop.create_task(func(*args, **kwargs))
        return task

    return wrapper


async def sync_to_async(func, *args, wait=True, **kwargs):
    """Run the blocking callable ``func`` in THREAD_POOL from async code.

    Returns the result of ``func`` when ``wait`` is true, otherwise the
    pending future.
    """
    pfunc = partial(func, *args, **kwargs)
    future = bot_loop.run_in_executor(THREAD_POOL, pfunc)
    return await future if wait else future


def async_to_sync(func, *args, wait=True, **kwargs):
    """Run the coroutine function ``func`` on ``bot_loop`` from another thread.

    Blocks and returns the result when ``wait`` is true, otherwise returns the
    ``concurrent.futures.Future``.
    """
    future = run_coroutine_threadsafe(func(*args, **kwargs), bot_loop)
    return future.result() if wait else future


def loop_thread(func):
    """Decorator form of :func:`async_to_sync` that does not wait by default.

    The wrapper accepts a keyword-only ``wait`` flag (default ``False``).
    """

    @wraps(func)
    def wrapper(*args, wait=False, **kwargs):
        return async_to_sync(func, *args, wait=wait, **kwargs)

    return wrapper