Path: blob/master/bot/helper/mirror_leech_utils/telegram_uploader.py
1641 views
from PIL import Image
from aioshutil import rmtree
from asyncio import sleep
from logging import getLogger
from natsort import natsorted
from os import walk, path as ospath
from time import time
from re import match as re_match, sub as re_sub
from pyrogram.errors import FloodWait, RPCError, FloodPremiumWait, BadRequest
from pyrogram.types import (
    InputMediaVideo,
    InputMediaDocument,
    InputMediaPhoto,
)
from aiofiles.os import (
    remove,
    path as aiopath,
    rename,
)
from tenacity import (
    retry,
    wait_exponential,
    stop_after_attempt,
    retry_if_exception_type,
    RetryError,
)

from ... import intervals
from ...core.config_manager import Config
from ...core.telegram_manager import TgClient
from ..ext_utils.bot_utils import sync_to_async
from ..ext_utils.files_utils import is_archive, get_base_name
from ..telegram_helper.message_utils import delete_message
from ..ext_utils.media_utils import (
    get_media_info,
    get_document_type,
    get_video_thumbnail,
    get_audio_thumbnail,
    get_multiple_frames_thumbnail,
)

LOGGER = getLogger(__name__)


class TelegramUploader:
    """Upload every file found under a directory to Telegram as message replies.

    The uploader walks ``path``, sends each file as a document, video, audio
    or photo reply to the previously sent message, optionally regroups split
    parts into media groups, and reports the outcome through the callbacks of
    ``listener`` (``on_upload_error`` / ``on_upload_complete``).
    """

    def __init__(self, listener, path):
        """Store the task listener and the root directory to upload.

        Args:
            listener: Task object; this class reads attributes such as
                ``thumb``, ``user_id``, ``user_transmission``, ``user_dict``,
                ``up_dest`` and ``is_cancelled`` from it.
            path: Root directory that ``upload`` walks.
        """
        # Progress bookkeeping used by ``_upload_progress`` and ``speed``.
        self._last_uploaded = 0
        self._processed_bytes = 0
        self._listener = listener
        self._path = path
        self._start_time = time()
        # Counters compared at the end of ``upload`` to decide success/failure.
        self._total_files = 0
        self._thumb = self._listener.thumb or f"thumbnails/{listener.user_id}.jpg"
        # Maps sent message link -> file name / caption, handed to the listener.
        self._msgs_dict = {}
        self._corrupted = 0
        self._is_corrupted = False
        # key ("videos"/"documents") -> part base name -> list of sent parts.
        self._media_dict = {"videos": {}, "documents": {}}
        self._last_msg_in_group = False
        self._up_path = ""
        self._lprefix = ""
        self._media_group = False
        self._is_private = False
        # Message the next upload replies to.
        self._sent_msg = None
        self._user_session = self._listener.user_transmission
        self._error = ""
        # Placeholder message sent to ``up_dest``; deleted once no longer needed.
        self._base_msg = None
        self._files_links = False

    async def _upload_progress(self, current, _):
        """Progress callback: stop on cancellation and accumulate sent bytes.

        Args:
            current: Bytes uploaded so far for the current file.
            _: Unused second callback argument.
        """
        if self._listener.is_cancelled:
            if self._user_session:
                TgClient.user.stop_transmission()
            else:
                self._listener.client.stop_transmission()
        chunk_size = current - self._last_uploaded
        self._last_uploaded = current
        self._processed_bytes += chunk_size

    async def _user_settings(self):
        """Resolve per-user settings, falling back to ``Config`` when unset.

        For each option the user's value wins when truthy; the global
        ``Config`` value is used only when the key is absent from
        ``user_dict``. Also clears ``self._thumb`` when the thumbnail file
        does not exist (unless it is the literal ``"none"``).
        """
        self._media_group = self._listener.user_dict.get("MEDIA_GROUP", False) or (
            Config.MEDIA_GROUP
            if "MEDIA_GROUP" not in self._listener.user_dict
            else False
        )
        self._lprefix = self._listener.user_dict.get("LEECH_FILENAME_PREFIX") or (
            Config.LEECH_FILENAME_PREFIX
            if "LEECH_FILENAME_PREFIX" not in self._listener.user_dict
            else ""
        )
        self._files_links = self._listener.user_dict.get("FILES_LINKS", False) or (
            Config.FILES_LINKS
            if "FILES_LINKS" not in self._listener.user_dict
            else False
        )
        if self._thumb != "none" and not await aiopath.exists(self._thumb):
            self._thumb = None

    async def _msg_to_reply(self):
        """Pick or create the message that the first upload will reply to.

        Returns:
            ``True`` when ``self._sent_msg`` is ready, ``False`` when sending
            the placeholder to ``up_dest`` failed (the listener's
            ``on_upload_error`` has then already been awaited).
        """
        if self._listener.up_dest:
            msg = (
                self._listener.message.link
                if self._listener.is_super_chat
                else self._listener.message.text.lstrip("/")
            )
            try:
                if self._user_session:
                    self._sent_msg = await TgClient.user.send_message(
                        chat_id=self._listener.up_dest,
                        text=msg,
                        message_thread_id=self._listener.chat_thread_id,
                        disable_notification=True,
                    )
                else:
                    self._sent_msg = await self._listener.client.send_message(
                        chat_id=self._listener.up_dest,
                        text=msg,
                        message_thread_id=self._listener.chat_thread_id,
                        disable_notification=True,
                    )
                    self._is_private = self._sent_msg.chat.type.name == "PRIVATE"
            except Exception as e:
                await self._listener.on_upload_error(str(e))
                return False
            finally:
                # Remember the placeholder so it can be deleted later.
                self._base_msg = self._sent_msg
        elif self._user_session:
            self._sent_msg = await TgClient.user.get_messages(
                chat_id=self._listener.message.chat.id, message_ids=self._listener.mid
            )
            if self._sent_msg is None:
                self._sent_msg = await TgClient.user.send_message(
                    chat_id=self._listener.message.chat.id,
                    text="Deleted Cmd Message! Don't delete the cmd message again!",
                    disable_notification=True,
                )
        else:
            self._sent_msg = self._listener.message
        return True

    async def _prepare_file(self, file_, dirpath):
        """Apply the filename prefix and the 60-character name limit on disk.

        Renames the file at ``self._up_path`` as needed and updates
        ``self._up_path`` to the new location.

        Args:
            file_: Original file name (no directory).
            dirpath: Directory containing the file.

        Returns:
            The caption for the upload, with the file name wrapped in
            ``<code>`` tags and preceded by the prefix when one is set.
        """
        if self._lprefix:
            cap_mono = f"{self._lprefix} <code>{file_}</code>"
            # Strip markup tags so the prefix is usable in a file name.
            self._lprefix = re_sub("<.*?>", "", self._lprefix)
            new_path = ospath.join(dirpath, f"{self._lprefix} {file_}")
            await rename(self._up_path, new_path)
            self._up_path = new_path
        else:
            cap_mono = f"<code>{file_}</code>"
        # NOTE(review): the length check and truncation work on the un-prefixed
        # name, so a prefixed file that is truncated is renamed without its
        # prefix - confirm this is intended.
        if len(file_) > 60:
            if is_archive(file_):
                name = get_base_name(file_)
                ext = file_.split(name, 1)[1]
            elif match := re_match(r".+(?=\..+\.0*\d+$)|.+(?=\.part\d+\..+$)", file_):
                name = match.group(0)
                ext = file_.split(name, 1)[1]
            elif len(fsplit := ospath.splitext(file_)) > 1:
                name = fsplit[0]
                ext = fsplit[1]
            else:
                name = file_
                ext = ""
            # Keep the extension intact and trim only the stem.
            extn = len(ext)
            remain = 60 - extn
            name = name[:remain]
            new_path = ospath.join(dirpath, f"{name}{ext}")
            await rename(self._up_path, new_path)
            self._up_path = new_path
        return cap_mono

    def _get_input_media(self, subkey, key):
        """Build the input-media list for one stored group of sent messages.

        Args:
            subkey: Part base name inside ``self._media_dict[key]``.
            key: ``"videos"`` selects ``InputMediaVideo``; anything else
                selects ``InputMediaDocument``.

        Returns:
            A list with one input-media object per stored message, reusing
            the message's file id and caption.
        """
        if key == "videos":
            return [
                InputMediaVideo(media=msg.video.file_id, caption=msg.caption)
                for msg in self._media_dict[key][subkey]
            ]
        return [
            InputMediaDocument(media=msg.document.file_id, caption=msg.caption)
            for msg in self._media_dict[key][subkey]
        ]

    async def _send_screenshots(self, dirpath, outputs):
        """Send screenshot files as photo media groups of at most ten items.

        Args:
            dirpath: Directory holding the screenshots.
            outputs: Screenshot file names relative to ``dirpath``.
        """
        inputs = [
            InputMediaPhoto(ospath.join(dirpath, p), p.rsplit("/", 1)[-1])
            for p in outputs
        ]
        for i in range(0, len(inputs), 10):
            batch = inputs[i : i + 10]
            # Continue the reply chain from the last photo of the batch.
            self._sent_msg = (
                await self._sent_msg.reply_media_group(
                    media=batch,
                    disable_notification=True,
                )
            )[-1]

    async def _send_media_group(self, subkey, key, msgs):
        """Re-send already uploaded parts as one media group, then delete them.

        Args:
            subkey: Part base name inside ``self._media_dict[key]``.
            key: ``"videos"`` or ``"documents"``.
            msgs: The list stored at ``self._media_dict[key][subkey]``; its
                ``[chat_id, message_id]`` entries are replaced in place by the
                fetched message objects.
        """
        for index, msg in enumerate(msgs):
            if self._listener.hybrid_leech or not self._user_session:
                msgs[index] = await self._listener.client.get_messages(
                    chat_id=msg[0], message_ids=msg[1]
                )
            else:
                msgs[index] = await TgClient.user.get_messages(
                    chat_id=msg[0], message_ids=msg[1]
                )
        msgs_list = await msgs[0].reply_to_message.reply_media_group(
            media=self._get_input_media(subkey, key),
            disable_notification=True,
        )
        # The individual part messages are superseded by the group.
        for msg in msgs:
            if msg.link in self._msgs_dict:
                del self._msgs_dict[msg.link]
            await delete_message(msg)
        del self._media_dict[key][subkey]
        if self._files_links and (
            self._listener.is_super_chat or self._listener.up_dest
        ):
            for m in msgs_list:
                self._msgs_dict[m.link] = m.caption
        self._sent_msg = msgs_list[-1]
        if self._base_msg:
            await delete_message(self._base_msg)
            self._base_msg = None

    async def upload(self):
        """Upload every file under ``self._path`` and report the result.

        Per-file failures are logged and counted in ``self._corrupted``; the
        task only fails when nothing was found or nothing could be uploaded.
        Returns early, without calling the listener, when the task is
        cancelled.
        """
        await self._user_settings()
        res = await self._msg_to_reply()
        if not res:
            return
        for dirpath, _, files in natsorted(await sync_to_async(walk, self._path)):
            # Thumbnails in this directory are looked up by ``_upload_file``.
            if dirpath.strip().endswith("/yt-dlp-thumb"):
                continue
            if dirpath.strip().endswith("_mltbss"):
                await self._send_screenshots(dirpath, files)
                await rmtree(dirpath, ignore_errors=True)
                continue
            for file_ in natsorted(files):
                self._error = ""
                self._up_path = f_path = ospath.join(dirpath, file_)
                if not await aiopath.exists(self._up_path):
                    if intervals["stopAll"]:
                        return
                    LOGGER.error(f"{self._up_path} not exists! Continue uploading!")
                    continue
                try:
                    f_size = await aiopath.getsize(self._up_path)
                    self._total_files += 1
                    if f_size == 0:
                        LOGGER.error(
                            f"{self._up_path} size is zero, telegram don't upload zero size files"
                        )
                        self._corrupted += 1
                        continue
                    if self._listener.is_cancelled:
                        return
                    cap_mono = await self._prepare_file(file_, dirpath)
                    if self._last_msg_in_group:
                        group_lists = [
                            x for v in self._media_dict.values() for x in v
                        ]
                        match = re_match(r".+(?=\.0*\d+$)|.+(?=\.part\d+\..+$)", f_path)
                        # This file does not continue a pending group, so
                        # flush every group that has more than one part.
                        if not match or match.group(0) not in group_lists:
                            for key, value in list(self._media_dict.items()):
                                for subkey, msgs in list(value.items()):
                                    if len(msgs) > 1:
                                        await self._send_media_group(subkey, key, msgs)
                    if self._listener.hybrid_leech and self._listener.user_transmission:
                        # 2097152000 bytes == 2000 MiB; larger files use the
                        # user session, smaller ones the listener's client.
                        self._user_session = f_size > 2097152000
                        if self._user_session:
                            self._sent_msg = await TgClient.user.get_messages(
                                chat_id=self._sent_msg.chat.id,
                                message_ids=self._sent_msg.id,
                            )
                        else:
                            self._sent_msg = await self._listener.client.get_messages(
                                chat_id=self._sent_msg.chat.id,
                                message_ids=self._sent_msg.id,
                            )
                    self._last_msg_in_group = False
                    self._last_uploaded = 0
                    await self._upload_file(cap_mono, file_, f_path)
                    if not self._sent_msg.media_group_id:
                        for ch, ch_data in list(
                            self._listener.clone_dump_chats.items()
                        ):
                            try:
                                res = await TgClient.bot.copy_message(
                                    chat_id=ch,
                                    from_chat_id=self._sent_msg.chat.id,
                                    message_id=self._sent_msg.id,
                                    message_thread_id=ch_data["thread_id"],
                                    disable_notification=True,
                                    reply_to_message_id=ch_data["last_sent_msg"],
                                )
                                self._listener.clone_dump_chats[ch][
                                    "last_sent_msg"
                                ] = res.id
                            except Exception as e:
                                # Best effort: a failed copy must not fail the upload.
                                LOGGER.error(
                                    f"Can't forward message to clone dump chat: {ch}. Error: {e}"
                                )
                    if self._listener.is_cancelled:
                        return
                    if (
                        self._files_links
                        and not self._is_corrupted
                        and (self._listener.is_super_chat or self._listener.up_dest)
                        and not self._is_private
                    ):
                        self._msgs_dict[self._sent_msg.link] = file_
                    await sleep(1)
                except Exception as err:
                    if isinstance(err, RetryError):
                        LOGGER.info(
                            f"Total Attempts: {err.last_attempt.attempt_number}"
                        )
                        # Report the underlying failure, not the retry wrapper.
                        err = err.last_attempt.exception()
                    LOGGER.error(f"{err}. Path: {self._up_path}")
                    self._error = str(err)
                    self._corrupted += 1
                    if self._listener.is_cancelled:
                        return
                if not self._listener.is_cancelled and await aiopath.exists(
                    self._up_path
                ):
                    await remove(self._up_path)
        # Flush groups still pending after the last file.
        for key, value in list(self._media_dict.items()):
            for subkey, msgs in list(value.items()):
                if len(msgs) > 1:
                    try:
                        await self._send_media_group(subkey, key, msgs)
                    except Exception as e:
                        LOGGER.info(
                            f"While sending media group at the end of task. Error: {e}"
                        )
        if self._base_msg:
            await delete_message(self._base_msg)
            self._base_msg = None
        if self._listener.is_cancelled:
            return
        if self._total_files == 0:
            await self._listener.on_upload_error(
                "No files to upload. In case you have filled EXCLUDED/INCLUDED EXTENSIONS, then check if all files have those extensions or not."
            )
            return
        if self._total_files <= self._corrupted:
            await self._listener.on_upload_error(
                f"Files Corrupted or unable to upload. {self._error or 'Check logs!'}"
            )
            return
        LOGGER.info(f"Leech Completed: {self._listener.name}")
        await self._listener.on_upload_complete(
            None, self._msgs_dict, self._total_files, self._corrupted
        )

    async def _remove_temp_thumb(self, thumb):
        """Delete ``thumb`` when it was generated for this upload only.

        A thumbnail is treated as temporary when no persistent thumbnail is
        configured (``self._thumb is None``) and the given path exists.
        """
        if (
            self._thumb is None
            and thumb is not None
            and await aiopath.exists(thumb)
        ):
            await remove(thumb)

    @retry(
        wait=wait_exponential(multiplier=2, min=4, max=8),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type(Exception),
    )
    async def _upload_file(self, cap_mono, file, o_path, force_document=False):
        """Send the file at ``self._up_path`` as a reply to ``self._sent_msg``.

        The media kind is chosen from ``get_document_type``; the result
        replaces ``self._sent_msg`` so that the next upload replies to it.

        Args:
            cap_mono: Caption for the message.
            file: File name, used to look up a matching ``.jpg`` thumbnail.
            o_path: Original path of the file, matched against the split-part
                pattern for media grouping.
            force_document: Send as a document regardless of detected type.

        Raises:
            Exception: Any upload error other than the handled flood waits
                and the one-time document fallback is re-raised (and retried
                by the ``retry`` decorator).
        """
        if (
            self._thumb is not None
            and not await aiopath.exists(self._thumb)
            and self._thumb != "none"
        ):
            self._thumb = None
        thumb = self._thumb
        self._is_corrupted = False
        # Set before ``try`` so the error handler never sees an unbound name.
        key = None
        try:
            is_video, is_audio, is_image = await get_document_type(self._up_path)

            if not is_image and thumb is None:
                file_name = ospath.splitext(file)[0]
                thumb_path = f"{self._path}/yt-dlp-thumb/{file_name}.jpg"
                if await aiopath.isfile(thumb_path):
                    thumb = thumb_path
                elif await aiopath.isfile(thumb_path.replace("/yt-dlp-thumb", "")):
                    thumb = thumb_path.replace("/yt-dlp-thumb", "")
                elif is_audio and not is_video:
                    thumb = await get_audio_thumbnail(self._up_path)

            if (
                self._listener.as_doc
                or force_document
                or (not is_video and not is_audio and not is_image)
            ):
                key = "documents"
                if is_video and thumb is None:
                    thumb = await get_video_thumbnail(self._up_path, None)

                if self._listener.is_cancelled:
                    return
                if thumb == "none":
                    thumb = None
                self._sent_msg = await self._sent_msg.reply_document(
                    document=self._up_path,
                    thumb=thumb,
                    caption=cap_mono,
                    force_document=True,
                    disable_notification=True,
                    progress=self._upload_progress,
                )
            elif is_video:
                key = "videos"
                duration = (await get_media_info(self._up_path))[0]
                if thumb is None and self._listener.thumbnail_layout:
                    thumb = await get_multiple_frames_thumbnail(
                        self._up_path,
                        self._listener.thumbnail_layout,
                        self._listener.screen_shots,
                    )
                if thumb is None:
                    thumb = await get_video_thumbnail(self._up_path, duration)
                if thumb is not None and thumb != "none":
                    # Use the thumbnail's dimensions for the video message.
                    with Image.open(thumb) as img:
                        width, height = img.size
                else:
                    width = 480
                    height = 320
                if self._listener.is_cancelled:
                    return
                if thumb == "none":
                    thumb = None
                self._sent_msg = await self._sent_msg.reply_video(
                    video=self._up_path,
                    caption=cap_mono,
                    duration=duration,
                    width=width,
                    height=height,
                    thumb=thumb,
                    supports_streaming=True,
                    disable_notification=True,
                    progress=self._upload_progress,
                )
            elif is_audio:
                key = "audios"
                duration, artist, title = await get_media_info(self._up_path)
                if self._listener.is_cancelled:
                    return
                if thumb == "none":
                    thumb = None
                self._sent_msg = await self._sent_msg.reply_audio(
                    audio=self._up_path,
                    caption=cap_mono,
                    duration=duration,
                    performer=artist,
                    title=title,
                    thumb=thumb,
                    disable_notification=True,
                    progress=self._upload_progress,
                )
            else:
                key = "photos"
                if self._listener.is_cancelled:
                    return
                self._sent_msg = await self._sent_msg.reply_photo(
                    photo=self._up_path,
                    caption=cap_mono,
                    disable_notification=True,
                    progress=self._upload_progress,
                )

            if (
                not self._listener.is_cancelled
                and self._media_group
                and (self._sent_msg.video or self._sent_msg.document)
            ):
                key = "documents" if self._sent_msg.document else "videos"
                if match := re_match(r".+(?=\.0*\d+$)|.+(?=\.part\d+\..+$)", o_path):
                    pname = match.group(0)
                    msgs = self._media_dict[key].setdefault(pname, [])
                    msgs.append([self._sent_msg.chat.id, self._sent_msg.id])
                    # Send the group as soon as it holds ten parts.
                    if len(msgs) == 10:
                        await self._send_media_group(pname, key, msgs)
                    else:
                        self._last_msg_in_group = True
            await self._remove_temp_thumb(thumb)
            if self._base_msg and not self._last_msg_in_group:
                await delete_message(self._base_msg)
                self._base_msg = None
        except (FloodWait, FloodPremiumWait) as f:
            LOGGER.warning(str(f))
            # Wait 1.3x the value carried by the flood-wait error.
            await sleep(f.value * 1.3)
            await self._remove_temp_thumb(thumb)
            # Keep ``force_document`` so a forced-document upload is not
            # silently switched back to media mode after the wait.
            return await self._upload_file(cap_mono, file, o_path, force_document)
        except Exception as err:
            await self._remove_temp_thumb(thumb)
            err_type = "RPCError: " if isinstance(err, RPCError) else ""
            LOGGER.error(f"{err_type}{err}. Path: {self._up_path}")
            # Fall back to a document upload once, and only when a media kind
            # was actually selected before the failure.
            if isinstance(err, BadRequest) and key not in (None, "documents"):
                LOGGER.error(f"Retrying As Document. Path: {self._up_path}")
                return await self._upload_file(cap_mono, file, o_path, True)
            raise err

    @property
    def speed(self):
        """Average bytes per second since construction; 0 when no time elapsed."""
        try:
            return self._processed_bytes / (time() - self._start_time)
        except ZeroDivisionError:
            return 0

    @property
    def processed_bytes(self):
        """Total bytes reported by the progress callback so far."""
        return self._processed_bytes

    async def cancel_task(self):
        """Mark the task as cancelled and notify the listener."""
        self._listener.is_cancelled = True
        LOGGER.info(f"Cancelling Upload: {self._listener.name}")
        await self._listener.on_upload_error("your upload has been stopped!")