# Path: blob/master/bot/modules/clone.py
# 1619 views
from asyncio import gather
from json import loads
from secrets import token_urlsafe
from aiofiles.os import remove

from .. import LOGGER, task_dict, task_dict_lock, bot_loop
from ..helper.ext_utils.bot_utils import (
    sync_to_async,
    cmd_exec,
    arg_parser,
    COMMAND_USAGE,
)
from ..helper.ext_utils.exceptions import DirectDownloadLinkException
from ..helper.ext_utils.links_utils import (
    is_gdrive_link,
    is_share_link,
    is_rclone_path,
    is_gdrive_id,
)
from ..helper.ext_utils.task_manager import stop_duplicate_check
from ..helper.listeners.task_listener import TaskListener
from ..helper.mirror_leech_utils.download_utils.direct_link_generator import (
    direct_link_generator,
)
from ..helper.mirror_leech_utils.gdrive_utils.clone import GoogleDriveClone
from ..helper.mirror_leech_utils.gdrive_utils.count import GoogleDriveCount
from ..helper.mirror_leech_utils.rclone_utils.transfer import RcloneTransferHelper
from ..helper.mirror_leech_utils.status_utils.gdrive_status import GoogleDriveStatus
from ..helper.mirror_leech_utils.status_utils.rclone_status import RcloneStatus
from ..helper.telegram_helper.message_utils import (
    send_message,
    delete_message,
    send_status_message,
)


class Clone(TaskListener):
    """Task listener that handles the clone command.

    Depending on the link it clones either a Google Drive link/id through
    ``GoogleDriveClone`` or an rclone path through ``RcloneTransferHelper``.
    """

    def __init__(
        self,
        client,
        message,
        _=None,
        __=None,
        ___=None,
        ____=None,
        _____=None,
        bulk=None,
        multi_tag=None,
        options="",
    ):
        """Store the request context and mark this listener as a clone task.

        The underscore-named positional parameters are accepted and ignored.
        NOTE(review): presumably they keep the signature aligned with other
        listeners used by the bulk/multi helpers - confirm against callers.
        """
        # A fresh list per instance; never share a mutable default.
        if bulk is None:
            bulk = []
        self.message = message
        self.client = client
        self.multi_tag = multi_tag
        self.options = options
        self.same_dir = {}
        self.bulk = bulk
        super().__init__()
        self.is_clone = True

    async def new_event(self):
        """Parse the command message and start the clone flow.

        Only the first line of the message is parsed for arguments. When the
        ``-b`` (bulk) option is set the work is handed to ``init_bulk`` and
        this call returns. If no link can be determined the command usage is
        sent back instead.
        """
        text = self.message.text.split("\n")
        input_list = text[0].split(" ")

        args = {
            "link": "",
            "-i": 0,
            "-b": False,
            "-n": "",
            "-up": "",
            "-rcf": "",
            "-sync": False,
        }

        arg_parser(input_list[1:], args)

        # A non-numeric "-i" value falls back to 0. Only conversion errors
        # are caught so cancellation/interrupts are not swallowed.
        try:
            self.multi = int(args["-i"])
        except (TypeError, ValueError):
            self.multi = 0

        self.up_dest = args["-up"]
        self.rc_flags = args["-rcf"]
        self.link = args["link"]
        self.name = args["-n"]

        is_bulk = args["-b"]
        sync = args["-sync"]
        bulk_start = 0
        bulk_end = 0

        # A non-bool "-b" value is treated as "start:end" bounds.
        if not isinstance(is_bulk, bool):
            dargs = is_bulk.split(":")
            bulk_start = dargs[0] or 0
            if len(dargs) == 2:
                bulk_end = dargs[1] or 0
            is_bulk = True

        if is_bulk:
            await self.init_bulk(input_list, bulk_start, bulk_end, Clone)
            return

        await self.get_tag(text)

        # Fall back to the first line of the replied-to message. The text
        # guard avoids an AttributeError when the reply carries no text
        # (NOTE(review): e.g. a media-only message - confirm with the client
        # library); the usage message below is sent in that case.
        if (
            not self.link
            and (reply_to := self.message.reply_to_message)
            and reply_to.text
        ):
            self.link = reply_to.text.split("\n", 1)[0].strip()

        await self.run_multi(input_list, Clone)

        if not self.link:
            await send_message(
                self.message, COMMAND_USAGE["clone"][0], COMMAND_USAGE["clone"][1]
            )
            return
        LOGGER.info(self.link)
        try:
            await self.before_start()
        except Exception as e:
            # Any failure from before_start() is reported to the user.
            await send_message(self.message, e)
            return
        await self._proceed_to_clone(sync)

    async def _proceed_to_clone(self, sync):
        """Run the clone for ``self.link``.

        Args:
            sync: When truthy the rclone transfer uses the ``sync`` method,
                otherwise ``copy``. Not used for Google Drive clones.

        Share links are first resolved through ``direct_link_generator``.
        Links that are neither Google Drive links/ids nor rclone paths get
        the command usage message.
        """
        if is_share_link(self.link):
            try:
                self.link = await sync_to_async(direct_link_generator, self.link)
                LOGGER.info(f"Generated link: {self.link}")
            except DirectDownloadLinkException as e:
                LOGGER.error(str(e))
                # Only messages prefixed with "ERROR:" abort the task; other
                # failures fall through with the link unchanged.
                if str(e).startswith("ERROR:"):
                    await send_message(self.message, str(e))
                    return
        if is_gdrive_link(self.link) or is_gdrive_id(self.link):
            self.name, mime_type, self.size, files, _ = await sync_to_async(
                GoogleDriveCount().count, self.link, self.user_id
            )
            # With no mime type, self.name is sent to the user as the message.
            if mime_type is None:
                await send_message(self.message, self.name)
                return
            msg, button = await stop_duplicate_check(self)
            if msg:
                await send_message(self.message, msg, button)
                return
            await self.on_download_start()
            LOGGER.info(f"Clone Started: Name: {self.name} - Source: {self.link}")
            drive = GoogleDriveClone(self)
            if files <= 10:
                # Small clones get a plain notice instead of a status entry.
                msg = await send_message(
                    self.message, f"Cloning: <code>{self.link}</code>"
                )
            else:
                msg = ""
                gid = token_urlsafe(12)
                async with task_dict_lock:
                    task_dict[self.mid] = GoogleDriveStatus(self, drive, gid, "cl")
                if self.multi <= 1:
                    await send_status_message(self.message)
            flink, mime_type, files, folders, dir_id = await sync_to_async(drive.clone)
            if msg:
                await delete_message(msg)
            if not flink:
                return
            await self.on_upload_complete(
                flink, files, folders, mime_type, dir_id=dir_id
            )
            LOGGER.info(f"Cloning Done: {self.name}")
        elif is_rclone_path(self.link):
            # The "mrcc:" prefix selects the per-user rclone config file.
            if self.link.startswith("mrcc:"):
                self.link = self.link.replace("mrcc:", "", 1)
                self.up_dest = self.up_dest.replace("mrcc:", "", 1)
                config_path = f"rclone/{self.user_id}.conf"
            else:
                config_path = "rclone.conf"

            remote, src_path = self.link.split(":", 1)
            self.link = src_path.strip("/")
            if self.link.startswith("rclone_select"):
                mime_type = "Folder"
                src_path = ""
                if not self.name:
                    self.name = self.link
            else:
                src_path = self.link
                cmd = [
                    "rclone",
                    "lsjson",
                    "--fast-list",
                    "--stat",
                    "--no-modtime",
                    "--config",
                    config_path,
                    f"{remote}:{src_path}",
                ]
                res = await cmd_exec(cmd)
                if res[2] != 0:
                    # NOTE(review): -9 presumably means the process was
                    # killed (task cancelled); no error is reported then.
                    if res[2] != -9:
                        msg = f"Error: While getting rclone stat. Path: {remote}:{src_path}. Stderr: {res[1][:4000]}"
                        await send_message(self.message, msg)
                    return
                rstat = loads(res[0])
                if rstat["IsDir"]:
                    if not self.name:
                        self.name = src_path.rsplit("/", 1)[-1] if src_path else remote
                    # Directories are cloned into a same-named folder at the
                    # destination.
                    self.up_dest += (
                        self.name if self.up_dest.endswith(":") else f"/{self.name}"
                    )
                    mime_type = "Folder"
                else:
                    if not self.name:
                        self.name = src_path.rsplit("/", 1)[-1]
                    mime_type = rstat["MimeType"]

            await self.on_download_start()

            RCTransfer = RcloneTransferHelper(self)
            LOGGER.info(
                f"Clone Started: Name: {self.name} - Source: {self.link} - Destination: {self.up_dest}"
            )
            gid = token_urlsafe(12)
            async with task_dict_lock:
                task_dict[self.mid] = RcloneStatus(self, RCTransfer, gid, "cl")
            if self.multi <= 1:
                await send_status_message(self.message)
            method = "sync" if sync else "copy"
            flink, destination = await RCTransfer.clone(
                config_path,
                remote,
                src_path,
                mime_type,
                method,
            )
            # For "rclone_select" sources self.link is a local file path that
            # is removed once the transfer has returned.
            if self.link.startswith("rclone_select"):
                await remove(self.link)
            if not destination:
                return
            LOGGER.info(f"Cloning Done: {self.name}")
            cmd1 = [
                "rclone",
                "lsf",
                "--fast-list",
                "-R",
                "--files-only",
                "--config",
                config_path,
                destination,
            ]
            cmd2 = [
                "rclone",
                "lsf",
                "--fast-list",
                "-R",
                "--dirs-only",
                "--config",
                config_path,
                destination,
            ]
            cmd3 = [
                "rclone",
                "size",
                "--fast-list",
                "--json",
                "--config",
                config_path,
                destination,
            ]
            # Gather file list, folder list and total size concurrently.
            res1, res2, res3 = await gather(
                cmd_exec(cmd1),
                cmd_exec(cmd2),
                cmd_exec(cmd3),
            )
            if res1[2] != 0 or res2[2] != 0 or res3[2] != 0:
                if res1[2] == -9:
                    return
                files = None
                folders = None
                self.size = 0
                error = res1[1] or res2[1] or res3[1]
                msg = f"Error: While getting rclone stat. Path: {destination}. Stderr: {error[:4000]}"
                await self.on_upload_error(msg)
            else:
                # Empty output means zero entries; "".split("\n") would
                # otherwise count as one. Same guard for files and folders.
                files = len(res1[0].strip().split("\n")) if res1[0] else 0
                folders = len(res2[0].strip().split("\n")) if res2[0] else 0
                rsize = loads(res3[0])
                self.size = rsize["bytes"]
                await self.on_upload_complete(
                    flink, files, folders, mime_type, destination
                )
        else:
            await send_message(
                self.message, COMMAND_USAGE["clone"][0], COMMAND_USAGE["clone"][1]
            )


async def clone_node(client, message):
    """Schedule a new clone task for ``message`` on ``bot_loop``."""
    bot_loop.create_task(Clone(client, message).new_event())