Path: blob/master/bot/helper/common.py
from aiofiles.os import path as aiopath, remove, makedirs, listdir
from asyncio import sleep, gather
from os import walk, path as ospath
from secrets import token_urlsafe
from aioshutil import move, rmtree
from pyrogram.enums import ChatAction
from re import sub, I, findall
from shlex import split
from collections import Counter
from copy import deepcopy

from .. import (
    user_data,
    multi_tags,
    LOGGER,
    task_dict_lock,
    task_dict,
    excluded_extensions,
    included_extensions,
    cpu_eater_lock,
    intervals,
    DOWNLOAD_DIR,
    cores,
)
from ..core.config_manager import Config
from ..core.telegram_manager import TgClient
from .ext_utils.bot_utils import new_task, sync_to_async, get_size_bytes
from .ext_utils.bulk_links import extract_bulk_links
from .mirror_leech_utils.gdrive_utils.list import GoogleDriveList
from .mirror_leech_utils.rclone_utils.list import RcloneList
from .mirror_leech_utils.status_utils.sevenz_status import SevenZStatus
from .mirror_leech_utils.status_utils.ffmpeg_status import FFmpegStatus
from .telegram_helper.bot_commands import BotCommands
from .ext_utils.files_utils import (
    get_base_name,
    is_first_archive_split,
    is_archive,
    is_archive_split,
    get_path_size,
    split_file,
    SevenZ,
)
from .ext_utils.links_utils import (
    is_gdrive_id,
    is_rclone_path,
    is_gdrive_link,
    is_telegram_link,
)
from .ext_utils.media_utils import (
    create_thumb,
    take_ss,
    get_document_type,
    FFMpeg,
)
from .telegram_helper.message_utils import (
    send_message,
    send_status_message,
    get_tg_link_message,
    temp_download,
)


class TaskConfig:
    def __init__(self):
        self.mid = self.message.id
        self.user = self.message.from_user or self.message.sender_chat
        self.user_id = self.user.id
        self.user_dict = user_data.get(self.user_id, {})
        self.clone_dump_chats = {}
        self.dir = f"{DOWNLOAD_DIR}{self.mid}"
        self.up_dir = ""
        self.link = ""
        self.up_dest = ""
        self.rc_flags = ""
        self.tag = ""
        self.name = ""
        self.subname = ""
        self.name_sub = ""
        self.thumbnail_layout = ""
        self.folder_name = ""
        self.split_size = 0
        self.max_split_size = 0
        self.multi = 0
        self.size = 0
        self.subsize = 0
        self.proceed_count = 0
        self.is_leech = False
        self.is_qbit = False
        self.is_nzb = False
        self.is_jd = False
        self.is_clone = False
        self.is_ytdlp = False
        self.equal_splits = False
        self.user_transmission = False
        self.hybrid_leech = False
        self.extract = False
        self.compress = False
        self.select = False
        self.seed = False
        self.join = False
        self.private_link = False
        self.stop_duplicate = False
        self.sample_video = False
        self.convert_audio = False
        self.convert_video = False
        self.screen_shots = False
        self.is_cancelled = False
        self.force_run = False
        self.force_download = False
        self.force_upload = False
        self.is_torrent = False
        self.as_med = False
        self.as_doc = False
        self.is_file = False
        self.bot_trans = False
        self.user_trans = False
        self.is_rss = False
        self.progress = True
        self.ffmpeg_cmds = None
        self.chat_thread_id = None
        self.subproc = None
        self.thumb = None
        self.excluded_extensions = []
        self.included_extensions = []
        self.files_to_proceed = []
        self.is_super_chat = self.message.chat.type.name in [
            "SUPERGROUP",
            "CHANNEL",
            "FORUM",
        ]

    def get_token_path(self, dest):
        # Pick the gdrive token or service-accounts dir based on the destination prefix.
        if dest.startswith("mtp:"):
            return f"tokens/{self.user_id}.pickle"
        elif (
            dest.startswith("sa:")
            or Config.USE_SERVICE_ACCOUNTS
            and not dest.startswith("tp:")
        ):
            return "accounts"
        else:
            return "token.pickle"

    def get_config_path(self, dest):
        return (
            f"rclone/{self.user_id}.conf" if dest.startswith("mrcc:") else "rclone.conf"
        )

    async def is_token_exists(self, path, status):
        # Validate that the rclone config or gdrive token required for this path exists.
        if is_rclone_path(path):
            config_path = self.get_config_path(path)
            if config_path != "rclone.conf" and status == "up":
                self.private_link = True
            if not await aiopath.exists(config_path):
                raise ValueError(f"Rclone config: {config_path} does not exist!")
        elif (
            status == "dl"
            and is_gdrive_link(path)
            or status == "up"
            and is_gdrive_id(path)
        ):
            token_path = self.get_token_path(path)
            if token_path.startswith("tokens/") and status == "up":
                self.private_link = True
            if not await aiopath.exists(token_path):
                raise ValueError(f"No token! {token_path} does not exist!")

    async def before_start(self):
        # Resolve per-user and global settings, then validate the link and upload destination.
        self.name_sub = (
            self.name_sub
            or self.user_dict.get("NAME_SUBSTITUTE", False)
            or (
                Config.NAME_SUBSTITUTE
                if "NAME_SUBSTITUTE" not in self.user_dict
                else ""
            )
        )
        if self.name_sub:
            self.name_sub = [x.split("/") for x in self.name_sub.split(" | ")]
        self.excluded_extensions = self.user_dict.get("EXCLUDED_EXTENSIONS") or (
            excluded_extensions
            if "EXCLUDED_EXTENSIONS" not in self.user_dict
            else ["aria2", "!qB"]
        )
        self.included_extensions = self.user_dict.get("INCLUDED_EXTENSIONS") or (
            included_extensions if "INCLUDED_EXTENSIONS" not in self.user_dict else []
        )
        if not self.rc_flags:
            if self.user_dict.get("RCLONE_FLAGS"):
                self.rc_flags = self.user_dict["RCLONE_FLAGS"]
            elif "RCLONE_FLAGS" not in self.user_dict and Config.RCLONE_FLAGS:
                self.rc_flags = Config.RCLONE_FLAGS
        if self.link not in ["rcl", "gdl"]:
            if not self.is_jd:
                if is_rclone_path(self.link):
                    if not self.link.startswith("mrcc:") and self.user_dict.get(
                        "USER_TOKENS", False
                    ):
                        self.link = f"mrcc:{self.link}"
                    await self.is_token_exists(self.link, "dl")
                elif is_gdrive_link(self.link):
                    if not self.link.startswith(
                        ("mtp:", "tp:", "sa:")
                    ) and self.user_dict.get("USER_TOKENS", False):
                        self.link = f"mtp:{self.link}"
                    await self.is_token_exists(self.link, "dl")
        elif self.link == "rcl":
            if not self.is_ytdlp and not self.is_jd:
                self.link = await RcloneList(self).get_rclone_path("rcd")
                if not is_rclone_path(self.link):
                    raise ValueError(self.link)
        elif self.link == "gdl":
            if not self.is_ytdlp and not self.is_jd:
                self.link = await GoogleDriveList(self).get_target_id("gdd")
                if not is_gdrive_id(self.link):
                    raise ValueError(self.link)

        self.user_transmission = TgClient.IS_PREMIUM_USER and (
            self.user_dict.get("USER_TRANSMISSION")
            or Config.USER_TRANSMISSION
            and "USER_TRANSMISSION" not in self.user_dict
        )

        if self.user_dict.get("UPLOAD_PATHS", False):
            if self.up_dest in self.user_dict["UPLOAD_PATHS"]:
                self.up_dest = self.user_dict["UPLOAD_PATHS"][self.up_dest]
        elif (
            "UPLOAD_PATHS" not in self.user_dict or not self.user_dict["UPLOAD_PATHS"]
        ) and Config.UPLOAD_PATHS:
            if self.up_dest in Config.UPLOAD_PATHS:
                self.up_dest = Config.UPLOAD_PATHS[self.up_dest]

        if self.ffmpeg_cmds:
            if self.user_dict.get("FFMPEG_CMDS", None):
                ffmpeg_dict = deepcopy(self.user_dict["FFMPEG_CMDS"])
            elif (
                "FFMPEG_CMDS" not in self.user_dict or not self.user_dict["FFMPEG_CMDS"]
            ) and Config.FFMPEG_CMDS:
                ffmpeg_dict = deepcopy(Config.FFMPEG_CMDS)
            else:
                ffmpeg_dict = None
            cmds = []
            for key in list(self.ffmpeg_cmds):
                if isinstance(key, tuple):
                    cmds.extend(list(key))
                elif ffmpeg_dict is not None:
                    if key in ffmpeg_dict.keys():
                        for ind, vl in enumerate(ffmpeg_dict[key]):
                            if variables := set(findall(r"\{(.*?)\}", vl)):
                                ff_values = (
                                    self.user_dict.get("FFMPEG_VARIABLES", {})
                                    .get(key, {})
                                    .get(str(ind), {})
                                )
                                if Counter(list(variables)) == Counter(
                                    list(ff_values.keys())
                                ):
                                    cmds.append(vl.format(**ff_values))
                            else:
                                cmds.append(vl)
            self.ffmpeg_cmds = cmds

        if not self.is_leech:
            self.stop_duplicate = (
                self.user_dict.get("STOP_DUPLICATE")
                or "STOP_DUPLICATE" not in self.user_dict
                and Config.STOP_DUPLICATE
            )
            default_upload = (
                self.user_dict.get("DEFAULT_UPLOAD", "") or Config.DEFAULT_UPLOAD
            )
            if (not self.up_dest and default_upload == "rc") or self.up_dest == "rc":
                self.up_dest = self.user_dict.get("RCLONE_PATH") or Config.RCLONE_PATH
            elif (not self.up_dest and default_upload == "gd") or self.up_dest == "gd":
                self.up_dest = self.user_dict.get("GDRIVE_ID") or Config.GDRIVE_ID
            if not self.up_dest:
                raise ValueError("No Upload Destination!")
            if self.up_dest not in ["rcl", "gdl"]:
                if is_gdrive_id(self.up_dest):
                    if not self.up_dest.startswith(
                        ("mtp:", "tp:", "sa:")
                    ) and self.user_dict.get("USER_TOKENS", False):
                        self.up_dest = f"mtp:{self.up_dest}"
                elif is_rclone_path(self.up_dest):
                    if not self.up_dest.startswith("mrcc:") and self.user_dict.get(
                        "USER_TOKENS", False
                    ):
                        self.up_dest = f"mrcc:{self.up_dest}"
                    self.up_dest = self.up_dest.strip("/")
                else:
                    raise ValueError("Wrong Upload Destination!")
                await self.is_token_exists(self.up_dest, "up")

            if self.up_dest == "rcl":
                if self.is_clone:
                    if not is_rclone_path(self.link):
                        raise ValueError(
                            "You can't clone from different types of tools"
                        )
                    config_path = self.get_config_path(self.link)
                else:
                    config_path = None
                self.up_dest = await RcloneList(self).get_rclone_path(
                    "rcu", config_path
                )
                if not is_rclone_path(self.up_dest):
                    raise ValueError(self.up_dest)
            elif self.up_dest == "gdl":
                if self.is_clone:
                    if not is_gdrive_link(self.link):
                        raise ValueError(
                            "You can't clone from different types of tools"
                        )
                    token_path = self.get_token_path(self.link)
                else:
                    token_path = None
                self.up_dest = await GoogleDriveList(self).get_target_id(
                    "gdu", token_path
                )
                if not is_gdrive_id(self.up_dest):
                    raise ValueError(self.up_dest)
            elif self.is_clone:
                if is_gdrive_link(self.link) and self.get_token_path(
                    self.link
                ) != self.get_token_path(self.up_dest):
                    raise ValueError("You must use the same token to clone!")
                elif is_rclone_path(self.link) and self.get_config_path(
                    self.link
                ) != self.get_config_path(self.up_dest):
                    raise ValueError("You must use the same config to clone!")
        else:
            # Leech-specific settings: dump chat, transmission mode and split size.
            self.up_dest = (
                self.up_dest
                or self.user_dict.get("LEECH_DUMP_CHAT")
                or (
                    Config.LEECH_DUMP_CHAT
                    if "LEECH_DUMP_CHAT" not in self.user_dict
                    else None
                )
            )
            self.hybrid_leech = TgClient.IS_PREMIUM_USER and (
                self.user_dict.get("HYBRID_LEECH")
                or Config.HYBRID_LEECH
                and "HYBRID_LEECH" not in self.user_dict
            )
            if self.bot_trans:
                self.user_transmission = False
                self.hybrid_leech = False
            if self.user_trans:
                self.user_transmission = TgClient.IS_PREMIUM_USER
            if self.up_dest:
                if not isinstance(self.up_dest, int):
                    if self.up_dest.startswith("b:"):
                        self.up_dest = self.up_dest.replace("b:", "", 1)
                        self.user_transmission = False
                        self.hybrid_leech = False
                    elif self.up_dest.startswith("u:"):
                        self.up_dest = self.up_dest.replace("u:", "", 1)
                        self.user_transmission = TgClient.IS_PREMIUM_USER
                    elif self.up_dest.startswith("h:"):
                        self.up_dest = self.up_dest.replace("h:", "", 1)
                        self.user_transmission = TgClient.IS_PREMIUM_USER
                        self.hybrid_leech = self.user_transmission
                    if "|" in self.up_dest:
                        self.up_dest, self.chat_thread_id = list(
                            map(
                                lambda x: int(x) if x.lstrip("-").isdigit() else x,
                                self.up_dest.split("|", 1),
                            )
                        )
                    elif self.up_dest.lstrip("-").isdigit():
                        self.up_dest = int(self.up_dest)
                    elif self.up_dest.lower() == "pm":
                        self.up_dest = self.user_id

                if self.user_transmission:
                    try:
                        chat = await TgClient.user.get_chat(self.up_dest)
                    except:
                        chat = None
                    if chat is None:
                        LOGGER.warning(
                            "The user session account can't find the destination chat!"
                        )
                        self.user_transmission = False
                        self.hybrid_leech = False
                    else:
                        if chat.type.name not in [
                            "SUPERGROUP",
                            "CHANNEL",
                            "GROUP",
                            "FORUM",
                        ]:
                            self.user_transmission = False
                            self.hybrid_leech = False
                        elif chat.is_admin:
                            member = await chat.get_member(TgClient.user.me.id)
                            if (
                                not member.privileges.can_manage_chat
                                or not member.privileges.can_delete_messages
                            ):
                                self.user_transmission = False
                                self.hybrid_leech = False
                                LOGGER.warning(
                                    "Grant the user session account the manage chat and delete messages rights in the chat's administrator settings!"
                                )
                        else:
                            LOGGER.warning(
                                "Promote the user session account to admin in the chat to benefit from user transmission!"
                            )
                            self.user_transmission = False
                            self.hybrid_leech = False

                if not self.user_transmission or self.hybrid_leech:
                    try:
                        chat = await self.client.get_chat(self.up_dest)
                    except:
                        chat = None
                    if chat is None:
                        if self.user_transmission:
                            self.hybrid_leech = False
                        else:
                            raise ValueError("Chat not found!")
                    else:
                        if chat.type.name in [
                            "SUPERGROUP",
                            "CHANNEL",
                            "GROUP",
                            "FORUM",
                        ]:
                            if not chat.is_admin:
                                raise ValueError(
                                    "Bot is not admin in the destination chat!"
                                )
                            else:
                                member = await chat.get_member(self.client.me.id)
                                if (
                                    not member.privileges.can_manage_chat
                                    or not member.privileges.can_delete_messages
                                ):
                                    if not self.user_transmission:
                                        raise ValueError(
                                            "You don't have enough privileges in this chat! Enable manage chat and delete messages for this bot!"
                                        )
                                    else:
                                        self.hybrid_leech = False
                        else:
                            try:
                                await self.client.send_chat_action(
                                    self.up_dest, ChatAction.TYPING
                                )
                            except:
                                raise ValueError("Start the bot and try again!")
            elif (
                self.user_transmission or self.hybrid_leech
            ) and not self.is_super_chat:
                self.user_transmission = False
                self.hybrid_leech = False
            if self.split_size:
                if self.split_size.isdigit():
                    self.split_size = int(self.split_size)
                else:
                    self.split_size = get_size_bytes(self.split_size)
            self.split_size = (
                self.split_size
                or self.user_dict.get("LEECH_SPLIT_SIZE")
                or Config.LEECH_SPLIT_SIZE
            )
            self.equal_splits = (
                self.user_dict.get("EQUAL_SPLITS")
                or Config.EQUAL_SPLITS
                and "EQUAL_SPLITS" not in self.user_dict
            )
            self.max_split_size = (
                TgClient.MAX_SPLIT_SIZE if self.user_transmission else 2097152000
            )
            self.split_size = min(self.split_size, self.max_split_size)

            if not self.as_doc:
                self.as_doc = (
                    not self.as_med
                    if self.as_med
                    else (
                        self.user_dict.get("AS_DOCUMENT", False)
                        or Config.AS_DOCUMENT
                        and "AS_DOCUMENT" not in self.user_dict
                    )
                )

            self.thumbnail_layout = (
                self.thumbnail_layout
                or self.user_dict.get("THUMBNAIL_LAYOUT", False)
                or (
                    Config.THUMBNAIL_LAYOUT
                    if "THUMBNAIL_LAYOUT" not in self.user_dict
                    else ""
                )
            )

        self.clone_dump_chats = self.user_dict.get("CLONE_DUMP_CHATS", {}) or (
            Config.CLONE_DUMP_CHATS
            if "CLONE_DUMP_CHATS" not in self.user_dict and Config.CLONE_DUMP_CHATS
            else {}
        )
        if self.clone_dump_chats:
            if isinstance(self.clone_dump_chats, int):
                self.clone_dump_chats = [self.clone_dump_chats]
            elif isinstance(self.clone_dump_chats, str):
                if self.clone_dump_chats.startswith(
                    "["
                ) and self.clone_dump_chats.endswith("]"):
                    self.clone_dump_chats = eval(self.clone_dump_chats)
                else:
                    self.clone_dump_chats = [self.clone_dump_chats]
            temp_dict = {}
            for ch in self.clone_dump_chats:
                if isinstance(ch, str) and "|" in ch:
                    ci, ti = map(
                        lambda x: int(x) if x.lstrip("-").isdigit() else x,
                        ch.split("|", 1),
                    )
                    temp_dict[ci] = {"thread_id": ti, "last_sent_msg": None}
                elif isinstance(ch, str):
                    if ch.lower() == "pm":
                        ci = self.user_id
                    else:
                        ci = int(ch) if ch.lstrip("-").isdigit() else ch
                    temp_dict[ci] = {"thread_id": None, "last_sent_msg": None}
                else:
                    temp_dict[ch] = {"thread_id": None, "last_sent_msg": None}
            self.clone_dump_chats = temp_dict
        if self.thumb != "none" and is_telegram_link(self.thumb):
            msg = (await get_tg_link_message(self.thumb))[0]
            self.thumb = (
                await create_thumb(msg) if msg.photo or msg.document else ""
            )

    async def get_tag(self, text: list):
        # Resolve the task owner and the mention tag used in replies (RSS tasks carry a "Tag:" line).
        if len(text) > 1 and text[1].startswith("Tag: "):
            self.is_rss = True
            user_info = text[1].split("Tag: ")
            if len(user_info) >= 3:
                id_ = user_info[-1]
                self.tag = " ".join(user_info[:-1])
            else:
                self.tag, id_ = text[1].split("Tag: ")[1].split()
            self.user = self.message.from_user = await self.client.get_users(int(id_))
            self.user_id = self.user.id
            self.user_dict = user_data.get(self.user_id, {})
            try:
                await self.message.unpin()
            except:
                pass
        if self.user:
            if username := self.user.username:
                self.tag = f"@{username}"
            elif hasattr(self.user, "mention"):
                self.tag = self.user.mention
            else:
                self.tag = self.user.title

    @new_task
    async def run_multi(self, input_list, obj):
        # Chain the next command of a multi task after a short delay.
        await sleep(7)
        if not self.multi_tag and self.multi > 1:
            self.multi_tag = token_urlsafe(3)
            multi_tags.add(self.multi_tag)
        elif self.multi <= 1:
            if self.multi_tag in multi_tags:
                multi_tags.discard(self.multi_tag)
            return
        if self.multi_tag and self.multi_tag not in multi_tags:
            await send_message(
                self.message, f"{self.tag} Multi Task has been cancelled!"
            )
            await send_status_message(self.message)
            async with task_dict_lock:
                for fd_name in self.same_dir:
                    self.same_dir[fd_name]["total"] -= self.multi
            return
        if len(self.bulk) != 0:
            msg = input_list[:1]
            msg.append(f"{self.bulk[0]} -i {self.multi - 1} {self.options}")
            msgts = " ".join(msg)
            if self.multi > 2:
                msgts += f"\nCancel Multi: <code>/{BotCommands.CancelTaskCommand[1]} {self.multi_tag}</code>"
            nextmsg = await send_message(self.message, msgts)
        else:
            msg = [s.strip() for s in input_list]
            index = msg.index("-i")
            msg[index + 1] = f"{self.multi - 1}"
            nextmsg = await self.client.get_messages(
                chat_id=self.message.chat.id,
                message_ids=self.message.reply_to_message_id + 1,
            )
            msgts = " ".join(msg)
            if self.multi > 2:
                msgts += f"\nCancel Multi: <code>/{BotCommands.CancelTaskCommand[1]} {self.multi_tag}</code>"
            nextmsg = await send_message(nextmsg, msgts)
        nextmsg = await self.client.get_messages(
            chat_id=self.message.chat.id, message_ids=nextmsg.id
        )
        if self.message.from_user:
            nextmsg.from_user = self.user
        else:
            nextmsg.sender_chat = self.user
        if intervals["stopAll"]:
            return
        await obj(
            self.client,
            nextmsg,
            self.is_qbit,
            self.is_leech,
            self.is_jd,
            self.is_nzb,
            self.same_dir,
            self.bulk,
            self.multi_tag,
            self.options,
        ).new_event()

    async def init_bulk(self, input_list, bulk_start, bulk_end, obj):
        # Start a bulk task by turning the extracted links into a multi chain.
        try:
            self.bulk = await extract_bulk_links(self.message, bulk_start, bulk_end)
            if len(self.bulk) == 0:
                raise ValueError("Bulk Empty!")
            b_msg = input_list[:1]
            self.options = input_list[1:]
            index = self.options.index("-b")
            del self.options[index]
            if bulk_start or bulk_end:
                del self.options[index]
            self.options = " ".join(self.options)
            b_msg.append(f"{self.bulk[0]} -i {len(self.bulk)} {self.options}")
            msg = " ".join(b_msg)
            if len(self.bulk) > 2:
                self.multi_tag = token_urlsafe(3)
                multi_tags.add(self.multi_tag)
                msg += f"\nCancel Multi: <code>/{BotCommands.CancelTaskCommand[1]} {self.multi_tag}</code>"
            nextmsg = await send_message(self.message, msg)
            nextmsg = await self.client.get_messages(
                chat_id=self.message.chat.id, message_ids=nextmsg.id
            )
            if self.message.from_user:
                nextmsg.from_user = self.user
            else:
                nextmsg.sender_chat = self.user
            await obj(
                self.client,
                nextmsg,
                self.is_qbit,
                self.is_leech,
                self.is_jd,
                self.is_nzb,
                self.same_dir,
                self.bulk,
                self.multi_tag,
                self.options,
            ).new_event()
        except Exception as e:
            await send_message(
                self.message,
                f"Reply to a text file or a Telegram message that has links separated by new lines! {e}",
            )

    async def proceed_extract(self, dl_path, gid):
        # Extract archives in the downloaded path with 7z, then delete the archive parts.
        pswd = self.extract if isinstance(self.extract, str) else ""
        self.files_to_proceed = []
        if self.is_file and is_archive(dl_path):
            self.files_to_proceed.append(dl_path)
        else:
            for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
                for file_ in files:
                    if (
                        is_first_archive_split(file_)
                        or is_archive(file_)
                        and not file_.strip().lower().endswith(".rar")
                    ):
                        f_path = ospath.join(dirpath, file_)
                        self.files_to_proceed.append(f_path)

        if not self.files_to_proceed:
            return dl_path
        t_path = dl_path
        sevenz = SevenZ(self)
        LOGGER.info(f"Extracting: {self.name}")
        async with task_dict_lock:
            task_dict[self.mid] = SevenZStatus(self, sevenz, gid, "Extract")
        for dirpath, _, files in await sync_to_async(
            walk, self.up_dir or self.dir, topdown=False
        ):
            code = 0
            for file_ in files:
                if self.is_cancelled:
                    return False
                if (
                    is_first_archive_split(file_)
                    or is_archive(file_)
                    and not file_.strip().lower().endswith(".rar")
                ):
                    self.proceed_count += 1
                    f_path = ospath.join(dirpath, file_)
                    t_path = get_base_name(f_path) if self.is_file else dirpath
                    if not self.is_file:
                        self.subname = file_
                    code = await sevenz.extract(f_path, t_path, pswd)
                    if self.is_cancelled:
                        return code
            if code == 0:
                for file_ in files:
                    if is_archive_split(file_) or is_archive(file_):
                        del_path = ospath.join(dirpath, file_)
                        try:
                            await remove(del_path)
                        except:
                            self.is_cancelled = True
        if self.proceed_count == 0:
            LOGGER.info("No files could be extracted!")
        return t_path if self.is_file and code == 0 else dl_path

    async def proceed_ffmpeg(self, dl_path, gid):
        # Run the configured ffmpeg commands on the downloaded file or folder.
        checked = False
        inputs = {}
        cmds = [
            [part.strip() for part in split(item) if part.strip()]
            for item in self.ffmpeg_cmds
        ]
        try:
            ffmpeg = FFMpeg(self)
            for ffmpeg_cmd in cmds:
                self.proceed_count = 0
                cmd = [
                    "taskset",
                    "-c",
                    f"{cores}",
                    "ffmpeg",
                    "-hide_banner",
                    "-loglevel",
                    "error",
                    "-progress",
                    "pipe:1",
                ] + ffmpeg_cmd
                if "-del" in cmd:
                    cmd.remove("-del")
                    delete_files = True
                else:
                    delete_files = False
                input_indexes = [
                    index for index, value in enumerate(cmd) if value == "-i"
                ]
                input_file = next(
                    (
                        cmd[index + 1]
                        for index in input_indexes
                        if cmd[index + 1].startswith("mltb")
                    ),
                    "",
                )
                if not input_file:
                    LOGGER.error("Wrong FFmpeg cmd!")
                    return dl_path
                if input_file.strip().endswith(".video"):
                    ext = "video"
                elif input_file.strip().endswith(".audio"):
                    ext = "audio"
                elif "." not in input_file:
                    ext = "all"
                else:
                    ext = ospath.splitext(input_file)[-1].lower()
                if await aiopath.isfile(dl_path):
                    is_video, is_audio, _ = await get_document_type(dl_path)
                    if not is_video and not is_audio:
                        break
                    elif is_video and ext == "audio":
                        break
                    elif is_audio and not is_video and ext == "video":
                        break
                    elif ext not in [
                        "all",
                        "audio",
                        "video",
                    ] and not dl_path.strip().lower().endswith(ext):
                        break
                    new_folder = ospath.splitext(dl_path)[0]
                    name = ospath.basename(dl_path)
                    await makedirs(new_folder, exist_ok=True)
                    file_path = f"{new_folder}/{name}"
                    await move(dl_path, file_path)
                    if not checked:
                        checked = True
                        async with task_dict_lock:
                            task_dict[self.mid] = FFmpegStatus(
                                self, ffmpeg, gid, "FFmpeg"
                            )
                        self.progress = False
                        await cpu_eater_lock.acquire()
                        self.progress = True
                    LOGGER.info(f"Running ffmpeg cmd for: {file_path}")
                    var_cmd = cmd.copy()
                    for index in input_indexes:
                        if cmd[index + 1].startswith("mltb"):
                            var_cmd[index + 1] = file_path
                        elif is_telegram_link(cmd[index + 1]):
                            msg = (await get_tg_link_message(cmd[index + 1]))[0]
                            file_dir = await temp_download(msg)
                            inputs[index + 1] = file_dir
                            var_cmd[index + 1] = file_dir
                    self.subsize = self.size
                    res = await ffmpeg.ffmpeg_cmds(var_cmd, file_path)
                    if res:
                        if delete_files:
                            await remove(file_path)
                            if len(await listdir(new_folder)) == 1:
                                folder = new_folder.rsplit("/", 1)[0]
                                self.name = ospath.basename(res[0])
                                if self.name.startswith("ffmpeg"):
                                    self.name = self.name.split(".", 1)[-1]
                                dl_path = ospath.join(folder, self.name)
                                await move(res[0], dl_path)
                                await rmtree(new_folder)
                            else:
                                dl_path = new_folder
                                self.name = new_folder.rsplit("/", 1)[-1]
                        else:
                            dl_path = new_folder
                            self.name = new_folder.rsplit("/", 1)[-1]
                    else:
                        await move(file_path, dl_path)
                        await rmtree(new_folder)
                else:
                    for dirpath, _, files in await sync_to_async(
                        walk, dl_path, topdown=False
                    ):
                        for file_ in files:
                            var_cmd = cmd.copy()
                            if self.is_cancelled:
                                return False
                            f_path = ospath.join(dirpath, file_)
                            is_video, is_audio, _ = await get_document_type(f_path)
                            if not is_video and not is_audio:
                                continue
                            elif is_video and ext == "audio":
                                continue
                            elif is_audio and not is_video and ext == "video":
                                continue
                            elif ext not in [
                                "all",
                                "audio",
                                "video",
                            ] and not f_path.strip().lower().endswith(ext):
                                continue
                            self.proceed_count += 1
                            for index in input_indexes:
                                if cmd[index + 1].startswith("mltb"):
                                    var_cmd[index + 1] = f_path
                                elif is_telegram_link(cmd[index + 1]):
                                    msg = (await get_tg_link_message(cmd[index + 1]))[0]
                                    file_dir = await temp_download(msg)
                                    inputs[index + 1] = file_dir
                                    var_cmd[index + 1] = file_dir
                            if not checked:
                                checked = True
                                async with task_dict_lock:
                                    task_dict[self.mid] = FFmpegStatus(
                                        self, ffmpeg, gid, "FFmpeg"
                                    )
                                self.progress = False
                                await cpu_eater_lock.acquire()
                                self.progress = True
                            LOGGER.info(f"Running ffmpeg cmd for: {f_path}")
                            self.subsize = await get_path_size(f_path)
                            self.subname = file_
                            res = await ffmpeg.ffmpeg_cmds(var_cmd, f_path)
                            if res and delete_files:
                                await remove(f_path)
                                if len(res) == 1:
                                    file_name = ospath.basename(res[0])
                                    if file_name.startswith("ffmpeg"):
                                        newname = file_name.split(".", 1)[-1]
                                        newres = ospath.join(dirpath, newname)
                                        await move(res[0], newres)
            # Clean up any inputs that were downloaded from Telegram links.
            for inp in inputs.values():
                if "/temp/" in inp and await aiopath.exists(inp):
                    await remove(inp)
        finally:
            if checked:
                cpu_eater_lock.release()
        return dl_path

    async def substitute(self, dl_path):
        # Apply the user's name-substitution patterns to file names.
        def perform_substitution(name, substitutions):
            for substitution in substitutions:
                sen = False
                pattern = substitution[0]
                if pattern.startswith('"') and pattern.endswith('"'):
                    pattern = pattern.strip('"')
                if len(substitution) > 1:
                    if len(substitution) > 2:
                        sen = substitution[2] == "s"
                        res = substitution[1]
                    elif len(substitution[1]) == 0:
                        res = " "
                    else:
                        res = substitution[1]
                else:
                    res = ""
                try:
                    name = sub(pattern, res, name, flags=I if sen else 0)
                except Exception as e:
                    LOGGER.error(
                        f"Substitute Error: pattern: {pattern} res: {res}. Error: {e}"
                    )
                    return False
                if len(name.encode()) > 255:
                    LOGGER.error(f"Substitute: {name} is too long")
                    return False
            return name

        if self.is_file:
            up_dir, name = dl_path.rsplit("/", 1)
            new_name = perform_substitution(name, self.name_sub)
            if not new_name:
                return dl_path
            new_path = ospath.join(up_dir, new_name)
            await move(dl_path, new_path)
            return new_path
        else:
            for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
                for file_ in files:
                    f_path = ospath.join(dirpath, file_)
                    new_name = perform_substitution(file_, self.name_sub)
                    if not new_name:
                        continue
                    await move(f_path, ospath.join(dirpath, new_name))
            return dl_path

    async def generate_screenshots(self, dl_path):
        # Take screenshots of video files in the downloaded path.
        ss_nb = int(self.screen_shots) if isinstance(self.screen_shots, str) else 10
        if self.is_file:
            if (await get_document_type(dl_path))[0]:
                LOGGER.info(f"Creating Screenshot for: {dl_path}")
                res = await take_ss(dl_path, ss_nb)
                if res:
                    new_folder = ospath.splitext(dl_path)[0]
                    name = ospath.basename(dl_path)
                    await makedirs(new_folder, exist_ok=True)
                    await gather(
                        move(dl_path, f"{new_folder}/{name}"),
                        move(res, new_folder),
                    )
                    return new_folder
        else:
            LOGGER.info(f"Creating Screenshot for: {dl_path}")
            for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
                for file_ in files:
                    f_path = ospath.join(dirpath, file_)
                    if (await get_document_type(f_path))[0]:
                        await take_ss(f_path, ss_nb)
        return dl_path

    async def convert_media(self, dl_path, gid):
        # Convert video/audio files to the requested extensions.
        fvext = []
        if self.convert_video:
            vdata = self.convert_video.split()
            vext = vdata[0].lower()
            if len(vdata) > 2:
                if "+" in vdata[1].split():
                    vstatus = "+"
                elif "-" in vdata[1].split():
                    vstatus = "-"
                else:
                    vstatus = ""
                fvext.extend(f".{ext.lower()}" for ext in vdata[2:])
            else:
                vstatus = ""
        else:
            vext = ""
            vstatus = ""

        faext = []
        if self.convert_audio:
            adata = self.convert_audio.split()
            aext = adata[0].lower()
            if len(adata) > 2:
                if "+" in adata[1].split():
                    astatus = "+"
                elif "-" in adata[1].split():
                    astatus = "-"
                else:
                    astatus = ""
                faext.extend(f".{ext.lower()}" for ext in adata[2:])
            else:
                astatus = ""
        else:
            aext = ""
            astatus = ""

        self.files_to_proceed = {}
        all_files = []
        if self.is_file:
            all_files.append(dl_path)
        else:
            for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
                for file_ in files:
                    f_path = ospath.join(dirpath, file_)
                    all_files.append(f_path)

        for f_path in all_files:
            is_video, is_audio, _ = await get_document_type(f_path)
            if (
                is_video
                and vext
                and not f_path.strip().lower().endswith(f".{vext}")
                and (
                    vstatus == "+"
                    and f_path.strip().lower().endswith(tuple(fvext))
                    or vstatus == "-"
                    and not f_path.strip().lower().endswith(tuple(fvext))
                    or not vstatus
                )
            ):
                self.files_to_proceed[f_path] = "video"
            elif (
                is_audio
                and aext
                and not is_video
                and not f_path.strip().lower().endswith(f".{aext}")
                and (
                    astatus == "+"
                    and f_path.strip().lower().endswith(tuple(faext))
                    or astatus == "-"
                    and not f_path.strip().lower().endswith(tuple(faext))
                    or not astatus
                )
            ):
                self.files_to_proceed[f_path] = "audio"
        del all_files

        if self.files_to_proceed:
            ffmpeg = FFMpeg(self)
            async with task_dict_lock:
                task_dict[self.mid] = FFmpegStatus(self, ffmpeg, gid, "Convert")
            self.progress = False
            async with cpu_eater_lock:
                self.progress = True
                for f_path, f_type in self.files_to_proceed.items():
                    self.proceed_count += 1
                    LOGGER.info(f"Converting: {f_path}")
                    if self.is_file:
                        self.subsize = self.size
                    else:
                        self.subsize = await get_path_size(f_path)
                        self.subname = ospath.basename(f_path)
                    if f_type == "video":
                        res = await ffmpeg.convert_video(f_path, vext)
                    else:
                        res = await ffmpeg.convert_audio(f_path, aext)
                    if res:
                        try:
                            await remove(f_path)
                        except:
                            self.is_cancelled = True
                            return False
                        if self.is_file:
                            return res
        return dl_path

    async def generate_sample_video(self, dl_path, gid):
        # Create short sample clips for video files.
        data = (
            self.sample_video.split(":") if isinstance(self.sample_video, str) else ""
        )
        if data:
            sample_duration = int(data[0]) if data[0] else 60
            part_duration = int(data[1]) if len(data) > 1 else 4
        else:
            sample_duration = 60
            part_duration = 4

        self.files_to_proceed = {}
        if self.is_file and (await get_document_type(dl_path))[0]:
            file_ = ospath.basename(dl_path)
            self.files_to_proceed[dl_path] = file_
        else:
            for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
                for file_ in files:
                    f_path = ospath.join(dirpath, file_)
                    if (await get_document_type(f_path))[0]:
                        self.files_to_proceed[f_path] = file_
        if self.files_to_proceed:
            ffmpeg = FFMpeg(self)
            async with task_dict_lock:
                task_dict[self.mid] = FFmpegStatus(self, ffmpeg, gid, "Sample Video")
            self.progress = False
            async with cpu_eater_lock:
                self.progress = True
                LOGGER.info(f"Creating Sample video: {self.name}")
                for f_path, file_ in self.files_to_proceed.items():
                    self.proceed_count += 1
                    if self.is_file:
                        self.subsize = self.size
                    else:
                        self.subsize = await get_path_size(f_path)
                        self.subname = file_
                    res = await ffmpeg.sample_video(
                        f_path, sample_duration, part_duration
                    )
                    if res and self.is_file:
                        new_folder = ospath.splitext(f_path)[0]
                        await makedirs(new_folder, exist_ok=True)
                        await gather(
                            move(f_path, f"{new_folder}/{file_}"),
                            move(res, f"{new_folder}/SAMPLE.{file_}"),
                        )
                        return new_folder
        return dl_path

    async def proceed_compress(self, dl_path, gid):
        # Zip the downloaded path with 7z (optionally password protected).
        pswd = self.compress if isinstance(self.compress, str) else ""
        if self.is_leech and self.is_file:
            new_folder = ospath.splitext(dl_path)[0]
            name = ospath.basename(dl_path)
            await makedirs(new_folder, exist_ok=True)
            new_dl_path = f"{new_folder}/{name}"
            await move(dl_path, new_dl_path)
            dl_path = new_dl_path
            up_path = f"{new_dl_path}.zip"
            self.is_file = False
        else:
            up_path = f"{dl_path}.zip"
        sevenz = SevenZ(self)
        async with task_dict_lock:
            task_dict[self.mid] = SevenZStatus(self, sevenz, gid, "Zip")
        return await sevenz.zip(dl_path, up_path, pswd)

    async def proceed_split(self, dl_path, gid):
        # Split files larger than the leech split size before uploading.
        self.files_to_proceed = {}
        if self.is_file:
            f_size = await get_path_size(dl_path)
            if f_size > self.split_size:
                self.files_to_proceed[dl_path] = [f_size, ospath.basename(dl_path)]
        else:
            for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
                for file_ in files:
                    f_path = ospath.join(dirpath, file_)
                    f_size = await get_path_size(f_path)
                    if f_size > self.split_size:
                        self.files_to_proceed[f_path] = [f_size, file_]
        if self.files_to_proceed:
            ffmpeg = FFMpeg(self)
            async with task_dict_lock:
                task_dict[self.mid] = FFmpegStatus(self, ffmpeg, gid, "Split")
            LOGGER.info(f"Splitting: {self.name}")
            for f_path, (f_size, file_) in self.files_to_proceed.items():
                self.proceed_count += 1
                if self.is_file:
                    self.subsize = self.size
                else:
                    self.subsize = f_size
                    self.subname = file_
                parts = -(-f_size // self.split_size)
                if self.equal_splits:
                    split_size = (f_size // parts) + (f_size % parts)
                else:
                    split_size = self.split_size
                if not self.as_doc and (await get_document_type(f_path))[0]:
                    self.progress = True
                    res = await ffmpeg.split(f_path, file_, parts, split_size)
                else:
                    self.progress = False
                    res = await split_file(f_path, split_size, self)
                if self.is_cancelled:
                    return False
                if res or f_size >= self.max_split_size:
                    try:
                        await remove(f_path)
                    except:
                        self.is_cancelled = True