Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/helper/mirror_leech_utils/telegram_uploader.py
1641 views
1
from PIL import Image
2
from aioshutil import rmtree
3
from asyncio import sleep
4
from logging import getLogger
5
from natsort import natsorted
6
from os import walk, path as ospath
7
from time import time
8
from re import match as re_match, sub as re_sub
9
from pyrogram.errors import FloodWait, RPCError, FloodPremiumWait, BadRequest
10
from pyrogram.types import (
11
InputMediaVideo,
12
InputMediaDocument,
13
InputMediaPhoto,
14
)
15
from aiofiles.os import (
16
remove,
17
path as aiopath,
18
rename,
19
)
20
from tenacity import (
21
retry,
22
wait_exponential,
23
stop_after_attempt,
24
retry_if_exception_type,
25
RetryError,
26
)
27
28
from ... import intervals
29
from ...core.config_manager import Config
30
from ...core.telegram_manager import TgClient
31
from ..ext_utils.bot_utils import sync_to_async
32
from ..ext_utils.files_utils import is_archive, get_base_name
33
from ..telegram_helper.message_utils import delete_message
34
from ..ext_utils.media_utils import (
35
get_media_info,
36
get_document_type,
37
get_video_thumbnail,
38
get_audio_thumbnail,
39
get_multiple_frames_thumbnail,
40
)
41
42
LOGGER = getLogger(__name__)
43
44
45
class TelegramUploader:
46
def __init__(self, listener, path):
47
self._last_uploaded = 0
48
self._processed_bytes = 0
49
self._listener = listener
50
self._path = path
51
self._start_time = time()
52
self._total_files = 0
53
self._thumb = self._listener.thumb or f"thumbnails/{listener.user_id}.jpg"
54
self._msgs_dict = {}
55
self._corrupted = 0
56
self._is_corrupted = False
57
self._media_dict = {"videos": {}, "documents": {}}
58
self._last_msg_in_group = False
59
self._up_path = ""
60
self._lprefix = ""
61
self._media_group = False
62
self._is_private = False
63
self._sent_msg = None
64
self._user_session = self._listener.user_transmission
65
self._error = ""
66
self._base_msg = None
67
self._files_links = False
68
69
async def _upload_progress(self, current, _):
70
if self._listener.is_cancelled:
71
if self._user_session:
72
TgClient.user.stop_transmission()
73
else:
74
self._listener.client.stop_transmission()
75
chunk_size = current - self._last_uploaded
76
self._last_uploaded = current
77
self._processed_bytes += chunk_size
78
79
async def _user_settings(self):
80
self._media_group = self._listener.user_dict.get("MEDIA_GROUP", False) or (
81
Config.MEDIA_GROUP
82
if "MEDIA_GROUP" not in self._listener.user_dict
83
else False
84
)
85
self._lprefix = self._listener.user_dict.get("LEECH_FILENAME_PREFIX") or (
86
Config.LEECH_FILENAME_PREFIX
87
if "LEECH_FILENAME_PREFIX" not in self._listener.user_dict
88
else ""
89
)
90
self._files_links = self._listener.user_dict.get("FILES_LINKS", False) or (
91
Config.FILES_LINKS
92
if "FILES_LINKS" not in self._listener.user_dict
93
else False
94
)
95
if self._thumb != "none" and not await aiopath.exists(self._thumb):
96
self._thumb = None
97
98
async def _msg_to_reply(self):
99
if self._listener.up_dest:
100
msg = (
101
self._listener.message.link
102
if self._listener.is_super_chat
103
else self._listener.message.text.lstrip("/")
104
)
105
try:
106
if self._user_session:
107
self._sent_msg = await TgClient.user.send_message(
108
chat_id=self._listener.up_dest,
109
text=msg,
110
message_thread_id=self._listener.chat_thread_id,
111
disable_notification=True,
112
)
113
else:
114
self._sent_msg = await self._listener.client.send_message(
115
chat_id=self._listener.up_dest,
116
text=msg,
117
message_thread_id=self._listener.chat_thread_id,
118
disable_notification=True,
119
)
120
self._is_private = self._sent_msg.chat.type.name == "PRIVATE"
121
except Exception as e:
122
await self._listener.on_upload_error(str(e))
123
return False
124
finally:
125
self._base_msg = self._sent_msg
126
elif self._user_session:
127
self._sent_msg = await TgClient.user.get_messages(
128
chat_id=self._listener.message.chat.id, message_ids=self._listener.mid
129
)
130
if self._sent_msg is None:
131
self._sent_msg = await TgClient.user.send_message(
132
chat_id=self._listener.message.chat.id,
133
text="Deleted Cmd Message! Don't delete the cmd message again!",
134
disable_notification=True,
135
)
136
else:
137
self._sent_msg = self._listener.message
138
return True
139
140
async def _prepare_file(self, file_, dirpath):
141
if self._lprefix:
142
cap_mono = f"{self._lprefix} <code>{file_}</code>"
143
self._lprefix = re_sub("<.*?>", "", self._lprefix)
144
new_path = ospath.join(dirpath, f"{self._lprefix} {file_}")
145
await rename(self._up_path, new_path)
146
self._up_path = new_path
147
else:
148
cap_mono = f"<code>{file_}</code>"
149
if len(file_) > 60:
150
if is_archive(file_):
151
name = get_base_name(file_)
152
ext = file_.split(name, 1)[1]
153
elif match := re_match(r".+(?=\..+\.0*\d+$)|.+(?=\.part\d+\..+$)", file_):
154
name = match.group(0)
155
ext = file_.split(name, 1)[1]
156
elif len(fsplit := ospath.splitext(file_)) > 1:
157
name = fsplit[0]
158
ext = fsplit[1]
159
else:
160
name = file_
161
ext = ""
162
extn = len(ext)
163
remain = 60 - extn
164
name = name[:remain]
165
new_path = ospath.join(dirpath, f"{name}{ext}")
166
await rename(self._up_path, new_path)
167
self._up_path = new_path
168
return cap_mono
169
170
def _get_input_media(self, subkey, key):
171
rlist = []
172
for msg in self._media_dict[key][subkey]:
173
if key == "videos":
174
input_media = InputMediaVideo(
175
media=msg.video.file_id, caption=msg.caption
176
)
177
else:
178
input_media = InputMediaDocument(
179
media=msg.document.file_id, caption=msg.caption
180
)
181
rlist.append(input_media)
182
return rlist
183
184
async def _send_screenshots(self, dirpath, outputs):
185
inputs = [
186
InputMediaPhoto(ospath.join(dirpath, p), p.rsplit("/", 1)[-1])
187
for p in outputs
188
]
189
for i in range(0, len(inputs), 10):
190
batch = inputs[i : i + 10]
191
self._sent_msg = (
192
await self._sent_msg.reply_media_group(
193
media=batch,
194
disable_notification=True,
195
)
196
)[-1]
197
198
async def _send_media_group(self, subkey, key, msgs):
199
for index, msg in enumerate(msgs):
200
if self._listener.hybrid_leech or not self._user_session:
201
msgs[index] = await self._listener.client.get_messages(
202
chat_id=msg[0], message_ids=msg[1]
203
)
204
else:
205
msgs[index] = await TgClient.user.get_messages(
206
chat_id=msg[0], message_ids=msg[1]
207
)
208
msgs_list = await msgs[0].reply_to_message.reply_media_group(
209
media=self._get_input_media(subkey, key),
210
disable_notification=True,
211
)
212
for msg in msgs:
213
if msg.link in self._msgs_dict:
214
del self._msgs_dict[msg.link]
215
await delete_message(msg)
216
del self._media_dict[key][subkey]
217
if self._files_links and (
218
self._listener.is_super_chat or self._listener.up_dest
219
):
220
for m in msgs_list:
221
self._msgs_dict[m.link] = m.caption
222
self._sent_msg = msgs_list[-1]
223
if self._base_msg:
224
await delete_message(self._base_msg)
225
self._base_msg = None
226
227
async def upload(self):
228
await self._user_settings()
229
res = await self._msg_to_reply()
230
if not res:
231
return
232
for dirpath, _, files in natsorted(await sync_to_async(walk, self._path)):
233
if dirpath.strip().endswith("/yt-dlp-thumb"):
234
continue
235
if dirpath.strip().endswith("_mltbss"):
236
await self._send_screenshots(dirpath, files)
237
await rmtree(dirpath, ignore_errors=True)
238
continue
239
for file_ in natsorted(files):
240
self._error = ""
241
self._up_path = f_path = ospath.join(dirpath, file_)
242
if not await aiopath.exists(self._up_path):
243
if intervals["stopAll"]:
244
return
245
LOGGER.error(f"{self._up_path} not exists! Continue uploading!")
246
continue
247
try:
248
f_size = await aiopath.getsize(self._up_path)
249
self._total_files += 1
250
if f_size == 0:
251
LOGGER.error(
252
f"{self._up_path} size is zero, telegram don't upload zero size files"
253
)
254
self._corrupted += 1
255
continue
256
if self._listener.is_cancelled:
257
return
258
cap_mono = await self._prepare_file(file_, dirpath)
259
if self._last_msg_in_group:
260
group_lists = [
261
x for v in self._media_dict.values() for x in v.keys()
262
]
263
match = re_match(r".+(?=\.0*\d+$)|.+(?=\.part\d+\..+$)", f_path)
264
if not match or match and match.group(0) not in group_lists:
265
for key, value in list(self._media_dict.items()):
266
for subkey, msgs in list(value.items()):
267
if len(msgs) > 1:
268
await self._send_media_group(subkey, key, msgs)
269
if self._listener.hybrid_leech and self._listener.user_transmission:
270
self._user_session = f_size > 2097152000
271
if self._user_session:
272
self._sent_msg = await TgClient.user.get_messages(
273
chat_id=self._sent_msg.chat.id,
274
message_ids=self._sent_msg.id,
275
)
276
else:
277
self._sent_msg = await self._listener.client.get_messages(
278
chat_id=self._sent_msg.chat.id,
279
message_ids=self._sent_msg.id,
280
)
281
self._last_msg_in_group = False
282
self._last_uploaded = 0
283
await self._upload_file(cap_mono, file_, f_path)
284
if not self._sent_msg.media_group_id:
285
for ch, ch_data in list(
286
self._listener.clone_dump_chats.items()
287
):
288
try:
289
res = await TgClient.bot.copy_message(
290
chat_id=ch,
291
from_chat_id=self._sent_msg.chat.id,
292
message_id=self._sent_msg.id,
293
message_thread_id=ch_data["thread_id"],
294
disable_notification=True,
295
reply_to_message_id=ch_data["last_sent_msg"],
296
)
297
self._listener.clone_dump_chats[ch][
298
"last_sent_msg"
299
] = res.id
300
except Exception as e:
301
LOGGER.error(
302
f"Can't forward message to clone dump chat: {ch}. Error: {e}"
303
)
304
if self._listener.is_cancelled:
305
return
306
if (
307
self._files_links
308
and not self._is_corrupted
309
and (self._listener.is_super_chat or self._listener.up_dest)
310
and not self._is_private
311
):
312
self._msgs_dict[self._sent_msg.link] = file_
313
await sleep(1)
314
except Exception as err:
315
if isinstance(err, RetryError):
316
LOGGER.info(
317
f"Total Attempts: {err.last_attempt.attempt_number}"
318
)
319
err = err.last_attempt.exception()
320
LOGGER.error(f"{err}. Path: {self._up_path}")
321
self._error = str(err)
322
self._corrupted += 1
323
if self._listener.is_cancelled:
324
return
325
if not self._listener.is_cancelled and await aiopath.exists(
326
self._up_path
327
):
328
await remove(self._up_path)
329
for key, value in list(self._media_dict.items()):
330
for subkey, msgs in list(value.items()):
331
if len(msgs) > 1:
332
try:
333
await self._send_media_group(subkey, key, msgs)
334
except Exception as e:
335
LOGGER.info(
336
f"While sending media group at the end of task. Error: {e}"
337
)
338
if self._base_msg:
339
await delete_message(self._base_msg)
340
self._base_msg = None
341
if self._listener.is_cancelled:
342
return
343
if self._total_files == 0:
344
await self._listener.on_upload_error(
345
"No files to upload. In case you have filled EXCLUDED/INCLUDED EXTENSIONS, then check if all files have those extensions or not."
346
)
347
return
348
if self._total_files <= self._corrupted:
349
await self._listener.on_upload_error(
350
f"Files Corrupted or unable to upload. {self._error or 'Check logs!'}"
351
)
352
return
353
LOGGER.info(f"Leech Completed: {self._listener.name}")
354
await self._listener.on_upload_complete(
355
None, self._msgs_dict, self._total_files, self._corrupted
356
)
357
return
358
359
@retry(
360
wait=wait_exponential(multiplier=2, min=4, max=8),
361
stop=stop_after_attempt(3),
362
retry=retry_if_exception_type(Exception),
363
)
364
async def _upload_file(self, cap_mono, file, o_path, force_document=False):
365
if (
366
self._thumb is not None
367
and not await aiopath.exists(self._thumb)
368
and self._thumb != "none"
369
):
370
self._thumb = None
371
thumb = self._thumb
372
self._is_corrupted = False
373
try:
374
is_video, is_audio, is_image = await get_document_type(self._up_path)
375
376
if not is_image and thumb is None:
377
file_name = ospath.splitext(file)[0]
378
thumb_path = f"{self._path}/yt-dlp-thumb/{file_name}.jpg"
379
if await aiopath.isfile(thumb_path):
380
thumb = thumb_path
381
elif await aiopath.isfile(thumb_path.replace("/yt-dlp-thumb", "")):
382
thumb = thumb_path.replace("/yt-dlp-thumb", "")
383
elif is_audio and not is_video:
384
thumb = await get_audio_thumbnail(self._up_path)
385
386
if (
387
self._listener.as_doc
388
or force_document
389
or (not is_video and not is_audio and not is_image)
390
):
391
key = "documents"
392
if is_video and thumb is None:
393
thumb = await get_video_thumbnail(self._up_path, None)
394
395
if self._listener.is_cancelled:
396
return
397
if thumb == "none":
398
thumb = None
399
self._sent_msg = await self._sent_msg.reply_document(
400
document=self._up_path,
401
thumb=thumb,
402
caption=cap_mono,
403
force_document=True,
404
disable_notification=True,
405
progress=self._upload_progress,
406
)
407
elif is_video:
408
key = "videos"
409
duration = (await get_media_info(self._up_path))[0]
410
if thumb is None and self._listener.thumbnail_layout:
411
thumb = await get_multiple_frames_thumbnail(
412
self._up_path,
413
self._listener.thumbnail_layout,
414
self._listener.screen_shots,
415
)
416
if thumb is None:
417
thumb = await get_video_thumbnail(self._up_path, duration)
418
if thumb is not None and thumb != "none":
419
with Image.open(thumb) as img:
420
width, height = img.size
421
else:
422
width = 480
423
height = 320
424
if self._listener.is_cancelled:
425
return
426
if thumb == "none":
427
thumb = None
428
self._sent_msg = await self._sent_msg.reply_video(
429
video=self._up_path,
430
caption=cap_mono,
431
duration=duration,
432
width=width,
433
height=height,
434
thumb=thumb,
435
supports_streaming=True,
436
disable_notification=True,
437
progress=self._upload_progress,
438
)
439
elif is_audio:
440
key = "audios"
441
duration, artist, title = await get_media_info(self._up_path)
442
if self._listener.is_cancelled:
443
return
444
if thumb == "none":
445
thumb = None
446
self._sent_msg = await self._sent_msg.reply_audio(
447
audio=self._up_path,
448
caption=cap_mono,
449
duration=duration,
450
performer=artist,
451
title=title,
452
thumb=thumb,
453
disable_notification=True,
454
progress=self._upload_progress,
455
)
456
else:
457
key = "photos"
458
if self._listener.is_cancelled:
459
return
460
self._sent_msg = await self._sent_msg.reply_photo(
461
photo=self._up_path,
462
caption=cap_mono,
463
disable_notification=True,
464
progress=self._upload_progress,
465
)
466
467
if (
468
not self._listener.is_cancelled
469
and self._media_group
470
and (self._sent_msg.video or self._sent_msg.document)
471
):
472
key = "documents" if self._sent_msg.document else "videos"
473
if match := re_match(r".+(?=\.0*\d+$)|.+(?=\.part\d+\..+$)", o_path):
474
475
pname = match.group(0)
476
if pname in self._media_dict[key].keys():
477
self._media_dict[key][pname].append(
478
[self._sent_msg.chat.id, self._sent_msg.id]
479
)
480
else:
481
self._media_dict[key][pname] = [
482
[self._sent_msg.chat.id, self._sent_msg.id]
483
]
484
msgs = self._media_dict[key][pname]
485
if len(msgs) == 10:
486
await self._send_media_group(pname, key, msgs)
487
else:
488
self._last_msg_in_group = True
489
if (
490
self._thumb is None
491
and thumb is not None
492
and await aiopath.exists(thumb)
493
):
494
await remove(thumb)
495
if self._base_msg and not self._last_msg_in_group:
496
await delete_message(self._base_msg)
497
self._base_msg = None
498
except (FloodWait, FloodPremiumWait) as f:
499
LOGGER.warning(str(f))
500
await sleep(f.value * 1.3)
501
if (
502
self._thumb is None
503
and thumb is not None
504
and await aiopath.exists(thumb)
505
):
506
await remove(thumb)
507
return await self._upload_file(cap_mono, file, o_path)
508
except Exception as err:
509
if (
510
self._thumb is None
511
and thumb is not None
512
and await aiopath.exists(thumb)
513
):
514
await remove(thumb)
515
err_type = "RPCError: " if isinstance(err, RPCError) else ""
516
LOGGER.error(f"{err_type}{err}. Path: {self._up_path}")
517
if isinstance(err, BadRequest) and key != "documents":
518
LOGGER.error(f"Retrying As Document. Path: {self._up_path}")
519
return await self._upload_file(cap_mono, file, o_path, True)
520
raise err
521
522
@property
523
def speed(self):
524
try:
525
return self._processed_bytes / (time() - self._start_time)
526
except:
527
return 0
528
529
@property
530
def processed_bytes(self):
531
return self._processed_bytes
532
533
async def cancel_task(self):
534
self._listener.is_cancelled = True
535
LOGGER.info(f"Cancelling Upload: {self._listener.name}")
536
await self._listener.on_upload_error("your upload has been stopped!")
537
538