Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/helper/common.py
1630 views
1
from aiofiles.os import path as aiopath, remove, makedirs, listdir
2
from asyncio import sleep, gather
3
from os import walk, path as ospath
4
from secrets import token_urlsafe
5
from aioshutil import move, rmtree
6
from pyrogram.enums import ChatAction
7
from re import sub, I, findall
8
from shlex import split
9
from collections import Counter
10
from copy import deepcopy
11
12
from .. import (
13
user_data,
14
multi_tags,
15
LOGGER,
16
task_dict_lock,
17
task_dict,
18
excluded_extensions,
19
included_extensions,
20
cpu_eater_lock,
21
intervals,
22
DOWNLOAD_DIR,
23
cores,
24
)
25
from ..core.config_manager import Config
26
from ..core.telegram_manager import TgClient
27
from .ext_utils.bot_utils import new_task, sync_to_async, get_size_bytes
28
from .ext_utils.bulk_links import extract_bulk_links
29
from .mirror_leech_utils.gdrive_utils.list import GoogleDriveList
30
from .mirror_leech_utils.rclone_utils.list import RcloneList
31
from .mirror_leech_utils.status_utils.sevenz_status import SevenZStatus
32
from .mirror_leech_utils.status_utils.ffmpeg_status import FFmpegStatus
33
from .telegram_helper.bot_commands import BotCommands
34
from .ext_utils.files_utils import (
35
get_base_name,
36
is_first_archive_split,
37
is_archive,
38
is_archive_split,
39
get_path_size,
40
split_file,
41
SevenZ,
42
)
43
from .ext_utils.links_utils import (
44
is_gdrive_id,
45
is_rclone_path,
46
is_gdrive_link,
47
is_telegram_link,
48
)
49
from .ext_utils.media_utils import (
50
create_thumb,
51
take_ss,
52
get_document_type,
53
FFMpeg,
54
)
55
from .telegram_helper.message_utils import (
56
send_message,
57
send_status_message,
58
get_tg_link_message,
59
temp_download,
60
)
61
62
63
class TaskConfig:
64
def __init__(self):
    """Initialize the per-task state shared by every task type.

    NOTE(review): this class looks like a mixin — it reads ``self.message``
    (and later ``self.client``), which must already be set by the inheriting
    listener before this ``__init__`` runs; confirm against the subclasses.
    """
    self.mid = self.message.id
    # A message may come from a regular user or an anonymous/channel sender.
    self.user = self.message.from_user or self.message.sender_chat
    self.user_id = self.user.id
    # Per-user settings dict; empty when the user has no saved settings.
    self.user_dict = user_data.get(self.user_id, {})
    self.clone_dump_chats = {}
    # Each task downloads into its own directory keyed by the message id.
    self.dir = f"{DOWNLOAD_DIR}{self.mid}"
    self.up_dir = ""
    self.link = ""
    self.up_dest = ""
    self.rc_flags = ""
    self.tag = ""
    self.name = ""
    self.subname = ""
    self.name_sub = ""
    self.thumbnail_layout = ""
    self.folder_name = ""
    self.split_size = 0
    self.max_split_size = 0
    self.multi = 0
    self.size = 0
    self.subsize = 0
    self.proceed_count = 0
    # Task-type flags.
    self.is_leech = False
    self.is_qbit = False
    self.is_nzb = False
    self.is_jd = False
    self.is_clone = False
    self.is_ytdlp = False
    self.equal_splits = False
    self.user_transmission = False
    self.hybrid_leech = False
    # NOTE: the original code assigned ``extract``/``compress`` twice;
    # the redundant duplicates were removed (no behavior change).
    self.extract = False
    self.compress = False
    self.select = False
    self.seed = False
    self.join = False
    self.private_link = False
    self.stop_duplicate = False
    self.sample_video = False
    self.convert_audio = False
    self.convert_video = False
    self.screen_shots = False
    self.is_cancelled = False
    self.force_run = False
    self.force_download = False
    self.force_upload = False
    self.is_torrent = False
    self.as_med = False
    self.as_doc = False
    self.is_file = False
    self.bot_trans = False
    self.user_trans = False
    self.is_rss = False
    self.progress = True
    self.ffmpeg_cmds = None
    self.chat_thread_id = None
    self.subproc = None
    self.thumb = None
    self.excluded_extensions = []
    self.included_extensions = []
    self.files_to_proceed = []
    # "Super chat" covers every chat type where status messages make sense.
    self.is_super_chat = self.message.chat.type.name in [
        "SUPERGROUP",
        "CHANNEL",
        "FORUM",
    ]
133
134
def get_token_path(self, dest):
135
if dest.startswith("mtp:"):
136
return f"tokens/{self.user_id}.pickle"
137
elif (
138
dest.startswith("sa:")
139
or Config.USE_SERVICE_ACCOUNTS
140
and not dest.startswith("tp:")
141
):
142
return "accounts"
143
else:
144
return "token.pickle"
145
146
def get_config_path(self, dest):
147
return (
148
f"rclone/{self.user_id}.conf" if dest.startswith("mrcc:") else "rclone.conf"
149
)
150
151
async def is_token_exists(self, path, status):
    """Verify that the credentials needed for *path* exist on disk.

    Also sets ``self.private_link`` when an upload ("up") uses user-owned
    credentials (a per-user rclone config or a per-user Drive token).

    Raises:
        ValueError: when the required config/token file is missing.
    """
    if is_rclone_path(path):
        conf = self.get_config_path(path)
        if status == "up" and conf != "rclone.conf":
            self.private_link = True
        if not await aiopath.exists(conf):
            raise ValueError(f"Rclone Config: {conf} not Exists!")
    elif (status == "dl" and is_gdrive_link(path)) or (
        status == "up" and is_gdrive_id(path)
    ):
        token = self.get_token_path(path)
        if status == "up" and token.startswith("tokens/"):
            self.private_link = True
        if not await aiopath.exists(token):
            raise ValueError(f"NO TOKEN! {token} not Exists!")
169
170
async def before_start(self):
    """Resolve all task options before the download starts.

    Merges per-user settings (``self.user_dict``) with global ``Config``
    defaults — the recurring pattern below is: a user value wins; the
    global value applies only when the key is absent from the user dict
    (an explicitly-empty user value disables the feature).
    Validates the download link and upload destination, resolves leech
    transmission mode, split sizes, thumbnails and clone dump chats.

    Raises:
        ValueError: on a missing/invalid upload destination, missing
            credentials, insufficient chat privileges, or mismatched
            clone source/destination credentials.
    """
    # --- name substitution rules: "pat/res/flag | pat2/res2" -> list of lists
    self.name_sub = (
        self.name_sub
        or self.user_dict.get("NAME_SUBSTITUTE", False)
        or (
            Config.NAME_SUBSTITUTE
            if "NAME_SUBSTITUTE" not in self.user_dict
            else ""
        )
    )
    if self.name_sub:
        self.name_sub = [x.split("/") for x in self.name_sub.split(" | ")]
    self.excluded_extensions = self.user_dict.get("EXCLUDED_EXTENSIONS") or (
        excluded_extensions
        if "EXCLUDED_EXTENSIONS" not in self.user_dict
        else ["aria2", "!qB"]
    )
    self.included_extensions = self.user_dict.get("INCLUDED_EXTENSIONS") or (
        included_extensions if "INCLUDED_EXTENSIONS" not in self.user_dict else []
    )
    if not self.rc_flags:
        if self.user_dict.get("RCLONE_FLAGS"):
            self.rc_flags = self.user_dict["RCLONE_FLAGS"]
        elif "RCLONE_FLAGS" not in self.user_dict and Config.RCLONE_FLAGS:
            self.rc_flags = Config.RCLONE_FLAGS
    # --- download link: prefix with the user's own credentials when enabled,
    # or let the user pick interactively ("rcl"/"gdl" sentinels).
    if self.link not in ["rcl", "gdl"]:
        if not self.is_jd:
            if is_rclone_path(self.link):
                if not self.link.startswith("mrcc:") and self.user_dict.get(
                    "USER_TOKENS", False
                ):
                    self.link = f"mrcc:{self.link}"
                await self.is_token_exists(self.link, "dl")
            elif is_gdrive_link(self.link):
                if not self.link.startswith(
                    ("mtp:", "tp:", "sa:")
                ) and self.user_dict.get("USER_TOKENS", False):
                    self.link = f"mtp:{self.link}"
                await self.is_token_exists(self.link, "dl")
    elif self.link == "rcl":
        if not self.is_ytdlp and not self.is_jd:
            self.link = await RcloneList(self).get_rclone_path("rcd")
            if not is_rclone_path(self.link):
                # The picker returns an error message string on failure.
                raise ValueError(self.link)
    elif self.link == "gdl":
        if not self.is_ytdlp and not self.is_jd:
            self.link = await GoogleDriveList(self).get_target_id("gdd")
            if not is_gdrive_id(self.link):
                raise ValueError(self.link)

    # User-session transmission requires a premium user session.
    self.user_transmission = TgClient.IS_PREMIUM_USER and (
        self.user_dict.get("USER_TRANSMISSION")
        or Config.USER_TRANSMISSION
        and "USER_TRANSMISSION" not in self.user_dict
    )

    # --- expand a named upload-path alias into its real destination.
    if self.user_dict.get("UPLOAD_PATHS", False):
        if self.up_dest in self.user_dict["UPLOAD_PATHS"]:
            self.up_dest = self.user_dict["UPLOAD_PATHS"][self.up_dest]
    elif (
        "UPLOAD_PATHS" not in self.user_dict or not self.user_dict["UPLOAD_PATHS"]
    ) and Config.UPLOAD_PATHS:
        if self.up_dest in Config.UPLOAD_PATHS:
            self.up_dest = Config.UPLOAD_PATHS[self.up_dest]

    # --- resolve named ffmpeg command keys into the actual command strings,
    # substituting user-supplied "{var}" values only when the variable sets
    # match exactly.
    if self.ffmpeg_cmds:
        if self.user_dict.get("FFMPEG_CMDS", None):
            ffmpeg_dict = deepcopy(self.user_dict["FFMPEG_CMDS"])
        elif (
            "FFMPEG_CMDS" not in self.user_dict or not self.user_dict["FFMPEG_CMDS"]
        ) and Config.FFMPEG_CMDS:
            ffmpeg_dict = deepcopy(Config.FFMPEG_CMDS)
        else:
            ffmpeg_dict = None
        cmds = []
        for key in list(self.ffmpeg_cmds):
            if isinstance(key, tuple):
                # A tuple is already a literal command, not a lookup key.
                cmds.extend(list(key))
            elif ffmpeg_dict is not None:
                if key in ffmpeg_dict.keys():
                    for ind, vl in enumerate(ffmpeg_dict[key]):
                        if variables := set(findall(r"\{(.*?)\}", vl)):
                            ff_values = (
                                self.user_dict.get("FFMPEG_VARIABLES", {})
                                .get(key, {})
                                .get(str(ind), {})
                            )
                            # Only run the command if every placeholder has a
                            # value (and no extras) — otherwise drop it.
                            if Counter(list(variables)) == Counter(
                                list(ff_values.keys())
                            ):
                                cmds.append(vl.format(**ff_values))
                        else:
                            cmds.append(vl)
        self.ffmpeg_cmds = cmds

    if not self.is_leech:
        # --- mirror/clone branch: resolve a cloud upload destination.
        self.stop_duplicate = (
            self.user_dict.get("STOP_DUPLICATE")
            or "STOP_DUPLICATE" not in self.user_dict
            and Config.STOP_DUPLICATE
        )
        default_upload = (
            self.user_dict.get("DEFAULT_UPLOAD", "") or Config.DEFAULT_UPLOAD
        )
        if (not self.up_dest and default_upload == "rc") or self.up_dest == "rc":
            self.up_dest = self.user_dict.get("RCLONE_PATH") or Config.RCLONE_PATH
        elif (not self.up_dest and default_upload == "gd") or self.up_dest == "gd":
            self.up_dest = self.user_dict.get("GDRIVE_ID") or Config.GDRIVE_ID
        if not self.up_dest:
            raise ValueError("No Upload Destination!")
        if self.up_dest not in ["rcl", "gdl"]:
            if is_gdrive_id(self.up_dest):
                if not self.up_dest.startswith(
                    ("mtp:", "tp:", "sa:")
                ) and self.user_dict.get("USER_TOKENS", False):
                    self.up_dest = f"mtp:{self.up_dest}"
            elif is_rclone_path(self.up_dest):
                if not self.up_dest.startswith("mrcc:") and self.user_dict.get(
                    "USER_TOKENS", False
                ):
                    self.up_dest = f"mrcc:{self.up_dest}"
                self.up_dest = self.up_dest.strip("/")
            else:
                raise ValueError("Wrong Upload Destination!")
            await self.is_token_exists(self.up_dest, "up")

        if self.up_dest == "rcl":
            # Interactive rclone destination picker.
            if self.is_clone:
                if not is_rclone_path(self.link):
                    raise ValueError(
                        "You can't clone from different types of tools"
                    )
                config_path = self.get_config_path(self.link)
            else:
                config_path = None
            self.up_dest = await RcloneList(self).get_rclone_path(
                "rcu", config_path
            )
            if not is_rclone_path(self.up_dest):
                raise ValueError(self.up_dest)
        elif self.up_dest == "gdl":
            # Interactive Google Drive destination picker.
            if self.is_clone:
                if not is_gdrive_link(self.link):
                    raise ValueError(
                        "You can't clone from different types of tools"
                    )
                token_path = self.get_token_path(self.link)
            else:
                token_path = None
            self.up_dest = await GoogleDriveList(self).get_target_id(
                "gdu", token_path
            )
            if not is_gdrive_id(self.up_dest):
                raise ValueError(self.up_dest)
        elif self.is_clone:
            # Cloning must stay within one credential set on both sides.
            if is_gdrive_link(self.link) and self.get_token_path(
                self.link
            ) != self.get_token_path(self.up_dest):
                raise ValueError("You must use the same token to clone!")
            elif is_rclone_path(self.link) and self.get_config_path(
                self.link
            ) != self.get_config_path(self.up_dest):
                raise ValueError("You must use the same config to clone!")
    else:
        # --- leech branch: resolve a Telegram destination chat.
        self.up_dest = (
            self.up_dest
            or self.user_dict.get("LEECH_DUMP_CHAT")
            or (
                Config.LEECH_DUMP_CHAT
                if "LEECH_DUMP_CHAT" not in self.user_dict
                else None
            )
        )
        self.hybrid_leech = TgClient.IS_PREMIUM_USER and (
            self.user_dict.get("HYBRID_LEECH")
            or Config.HYBRID_LEECH
            and "HYBRID_LEECH" not in self.user_dict
        )
        if self.bot_trans:
            self.user_transmission = False
            self.hybrid_leech = False
        if self.user_trans:
            self.user_transmission = TgClient.IS_PREMIUM_USER
        if self.up_dest:
            if not isinstance(self.up_dest, int):
                # "b:"/"u:"/"h:" prefixes force bot/user/hybrid transmission.
                if self.up_dest.startswith("b:"):
                    self.up_dest = self.up_dest.replace("b:", "", 1)
                    self.user_transmission = False
                    self.hybrid_leech = False
                elif self.up_dest.startswith("u:"):
                    self.up_dest = self.up_dest.replace("u:", "", 1)
                    self.user_transmission = TgClient.IS_PREMIUM_USER
                elif self.up_dest.startswith("h:"):
                    self.up_dest = self.up_dest.replace("h:", "", 1)
                    self.user_transmission = TgClient.IS_PREMIUM_USER
                    self.hybrid_leech = self.user_transmission
                # "chat_id|thread_id" targets a forum topic.
                if "|" in self.up_dest:
                    self.up_dest, self.chat_thread_id = list(
                        map(
                            lambda x: int(x) if x.lstrip("-").isdigit() else x,
                            self.up_dest.split("|", 1),
                        )
                    )
                elif self.up_dest.lstrip("-").isdigit():
                    self.up_dest = int(self.up_dest)
                elif self.up_dest.lower() == "pm":
                    self.up_dest = self.user_id

            if self.user_transmission:
                # Verify the user session can actually post to the chat;
                # fall back to bot transmission otherwise.
                try:
                    chat = await TgClient.user.get_chat(self.up_dest)
                except:
                    chat = None
                if chat is None:
                    LOGGER.warning(
                        "Account of user session can't find the the destination chat!"
                    )
                    self.user_transmission = False
                    self.hybrid_leech = False
                else:
                    if chat.type.name not in [
                        "SUPERGROUP",
                        "CHANNEL",
                        "GROUP",
                        "FORUM",
                    ]:
                        self.user_transmission = False
                        self.hybrid_leech = False
                    elif chat.is_admin:
                        member = await chat.get_member(TgClient.user.me.id)
                        if (
                            not member.privileges.can_manage_chat
                            or not member.privileges.can_delete_messages
                        ):
                            self.user_transmission = False
                            self.hybrid_leech = False
                            LOGGER.warning(
                                "Enable manage chat and delete messages to account of the user session from administration settings!"
                            )
                    else:
                        LOGGER.warning(
                            "Promote the account of the user session to admin in the chat to get the benefit of user transmission!"
                        )
                        self.user_transmission = False
                        self.hybrid_leech = False

            if not self.user_transmission or self.hybrid_leech:
                # Same check for the bot session (required unless pure
                # user transmission is in effect).
                try:
                    chat = await self.client.get_chat(self.up_dest)
                except:
                    chat = None
                if chat is None:
                    if self.user_transmission:
                        self.hybrid_leech = False
                    else:
                        raise ValueError("Chat not found!")
                else:
                    if chat.type.name in [
                        "SUPERGROUP",
                        "CHANNEL",
                        "GROUP",
                        "FORUM",
                    ]:
                        if not chat.is_admin:
                            raise ValueError(
                                "Bot is not admin in the destination chat!"
                            )
                        else:
                            member = await chat.get_member(self.client.me.id)
                            if (
                                not member.privileges.can_manage_chat
                                or not member.privileges.can_delete_messages
                            ):
                                if not self.user_transmission:
                                    raise ValueError(
                                        "You don't have enough privileges in this chat! Enable manage chat and delete messages for this bot!"
                                    )
                                else:
                                    self.hybrid_leech = False
                    else:
                        # Private chat: the user must have started the bot.
                        try:
                            await self.client.send_chat_action(
                                self.up_dest, ChatAction.TYPING
                            )
                        except:
                            raise ValueError("Start the bot and try again!")
        elif (
            self.user_transmission or self.hybrid_leech
        ) and not self.is_super_chat:
            self.user_transmission = False
            self.hybrid_leech = False
        # --- split size: user string may be raw bytes or "2GB"-style.
        if self.split_size:
            if self.split_size.isdigit():
                self.split_size = int(self.split_size)
            else:
                self.split_size = get_size_bytes(self.split_size)
        self.split_size = (
            self.split_size
            or self.user_dict.get("LEECH_SPLIT_SIZE")
            or Config.LEECH_SPLIT_SIZE
        )
        self.equal_splits = (
            self.user_dict.get("EQUAL_SPLITS")
            or Config.EQUAL_SPLITS
            and "EQUAL_SPLITS" not in self.user_dict
        )
        # 2097152000 bytes is the bot-session upload cap; the premium user
        # session allows more (TgClient.MAX_SPLIT_SIZE).
        self.max_split_size = (
            TgClient.MAX_SPLIT_SIZE if self.user_transmission else 2097152000
        )
        self.split_size = min(self.split_size, self.max_split_size)

        if not self.as_doc:
            self.as_doc = (
                not self.as_med
                if self.as_med
                else (
                    self.user_dict.get("AS_DOCUMENT", False)
                    or Config.AS_DOCUMENT
                    and "AS_DOCUMENT" not in self.user_dict
                )
            )

        self.thumbnail_layout = (
            self.thumbnail_layout
            or self.user_dict.get("THUMBNAIL_LAYOUT", False)
            or (
                Config.THUMBNAIL_LAYOUT
                if "THUMBNAIL_LAYOUT" not in self.user_dict
                else ""
            )
        )

    # --- normalize clone dump chats into {chat_id: {thread_id, last_sent_msg}}.
    self.clone_dump_chats = self.user_dict.get("CLONE_DUMP_CHATS", {}) or (
        Config.CLONE_DUMP_CHATS
        if "CLONE_DUMP_CHATS" not in self.user_dict and Config.CLONE_DUMP_CHATS
        else {}
    )
    if self.clone_dump_chats:
        if isinstance(self.clone_dump_chats, int):
            self.clone_dump_chats = [self.clone_dump_chats]
        elif isinstance(self.clone_dump_chats, str):
            if self.clone_dump_chats.startswith(
                "["
            ) and self.clone_dump_chats.endswith("]"):
                # SECURITY: eval() on a stored settings string — it executes
                # arbitrary Python. Consider ast.literal_eval instead.
                self.clone_dump_chats = eval(self.clone_dump_chats)
            else:
                self.clone_dump_chats = [self.clone_dump_chats]
        temp_dict = {}
        for ch in self.clone_dump_chats:
            if isinstance(ch, str) and "|" in ch:
                # "chat|thread" pair → forum topic target.
                ci, ti = map(
                    lambda x: int(x) if x.lstrip("-").isdigit() else x,
                    ch.split("|", 1),
                )
                temp_dict[ci] = {"thread_id": ti, "last_sent_msg": None}
            elif isinstance(ch, str):
                if ch.lower() == "pm":
                    ci = self.user_id
                else:
                    ci = int(ch) if ch.lstrip("-").isdigit() else ch
                temp_dict[ci] = {"thread_id": None, "last_sent_msg": None}
            else:
                temp_dict[ch] = {"thread_id": None, "last_sent_msg": None}
        self.clone_dump_chats = temp_dict
    # NOTE(review): assumes is_telegram_link tolerates a None thumb —
    # self.thumb defaults to None in __init__; confirm in links_utils.
    if self.thumb != "none" and is_telegram_link(self.thumb):
        msg = (await get_tg_link_message(self.thumb))[0]
        self.thumb = (
            await create_thumb(msg) if msg.photo or msg.document else ""
        )
539
540
async def get_tag(self, text: list):
    """Derive the owner ("tag") of this task from the message text lines.

    For RSS-forwarded messages the second line carries "Tag: <name> <id>";
    in that case the real user is looked up by id and ``self.user`` /
    ``self.user_dict`` are rebound to that user.
    """
    if len(text) > 1 and text[1].startswith("Tag: "):
        self.is_rss = True
        user_info = text[1].split("Tag: ")
        if len(user_info) >= 3:
            # The tag itself contained "Tag: " — last piece is the id.
            id_ = user_info[-1]
            self.tag = " ".join(user_info[:-1])
        else:
            # NOTE(review): assumes exactly two whitespace-separated tokens
            # after "Tag: " (name and id); more tokens would raise here.
            self.tag, id_ = text[1].split("Tag: ")[1].split()
        self.user = self.message.from_user = await self.client.get_users(int(id_))
        self.user_id = self.user.id
        self.user_dict = user_data.get(self.user_id, {})
        try:
            # RSS messages are pinned by the feed handler; best-effort unpin.
            await self.message.unpin()
        except:
            pass
    # Prefer @username, then a mention, falling back to a chat title
    # (anonymous/channel senders have no username or mention).
    if self.user:
        if username := self.user.username:
            self.tag = f"@{username}"
        elif hasattr(self.user, "mention"):
            self.tag = self.user.mention
        else:
            self.tag = self.user.title
563
564
@new_task
async def run_multi(self, input_list, obj):
    """Spawn the next task of a multi/bulk batch after this one starts.

    NOTE(review): relies on ``self.multi_tag``, ``self.bulk``,
    ``self.options`` and ``self.same_dir`` being set by the command
    parser before this runs — confirm against the listener classes.
    """
    # Small delay so the current task is registered before chaining.
    await sleep(7)
    if not self.multi_tag and self.multi > 1:
        # First task of the batch: mint a shared cancel tag.
        self.multi_tag = token_urlsafe(3)
        multi_tags.add(self.multi_tag)
    elif self.multi <= 1:
        # Batch finished: drop the tag and stop chaining.
        if self.multi_tag in multi_tags:
            multi_tags.discard(self.multi_tag)
        return
    if self.multi_tag and self.multi_tag not in multi_tags:
        # Tag was removed externally → the batch was cancelled.
        await send_message(
            self.message, f"{self.tag} Multi Task has been cancelled!"
        )
        await send_status_message(self.message)
        async with task_dict_lock:
            for fd_name in self.same_dir:
                self.same_dir[fd_name]["total"] -= self.multi
        return
    if len(self.bulk) != 0:
        # Bulk mode: next link comes from the bulk list.
        msg = input_list[:1]
        msg.append(f"{self.bulk[0]} -i {self.multi - 1} {self.options}")
        msgts = " ".join(msg)
        if self.multi > 2:
            msgts += f"\nCancel Multi: <code>/{BotCommands.CancelTaskCommand[1]} {self.multi_tag}</code>"
        nextmsg = await send_message(self.message, msgts)
    else:
        # Multi mode: re-issue the command against the next replied message.
        msg = [s.strip() for s in input_list]
        index = msg.index("-i")
        msg[index + 1] = f"{self.multi - 1}"
        nextmsg = await self.client.get_messages(
            chat_id=self.message.chat.id,
            message_ids=self.message.reply_to_message_id + 1,
        )
        msgts = " ".join(msg)
        if self.multi > 2:
            msgts += f"\nCancel Multi: <code>/{BotCommands.CancelTaskCommand[1]} {self.multi_tag}</code>"
        nextmsg = await send_message(nextmsg, msgts)
    # Re-fetch so the chained handler gets a fresh message object.
    nextmsg = await self.client.get_messages(
        chat_id=self.message.chat.id, message_ids=nextmsg.id
    )
    if self.message.from_user:
        nextmsg.from_user = self.user
    else:
        nextmsg.sender_chat = self.user
    if intervals["stopAll"]:
        return
    # Hand off to the same listener class for the next task in the batch.
    await obj(
        self.client,
        nextmsg,
        self.is_qbit,
        self.is_leech,
        self.is_jd,
        self.is_nzb,
        self.same_dir,
        self.bulk,
        self.multi_tag,
        self.options,
    ).new_event()
623
624
async def init_bulk(self, input_list, bulk_start, bulk_end, obj):
    """Start a bulk batch from links found in the replied message/file.

    Builds a synthetic command for the first link (with ``-i`` set to the
    batch length) and dispatches it through *obj*; on any failure the user
    gets a usage hint with the error appended.
    """
    try:
        self.bulk = await extract_bulk_links(self.message, bulk_start, bulk_end)
        if len(self.bulk) == 0:
            raise ValueError("Bulk Empty!")
        b_msg = input_list[:1]
        self.options = input_list[1:]
        # Strip the "-b" flag (and its range argument, when given).
        index = self.options.index("-b")
        del self.options[index]
        if bulk_start or bulk_end:
            del self.options[index]
        self.options = " ".join(self.options)
        b_msg.append(f"{self.bulk[0]} -i {len(self.bulk)} {self.options}")
        msg = " ".join(b_msg)
        if len(self.bulk) > 2:
            # Batches of 3+ get a shared cancel tag.
            self.multi_tag = token_urlsafe(3)
            multi_tags.add(self.multi_tag)
            msg += f"\nCancel Multi: <code>/{BotCommands.CancelTaskCommand[1]} {self.multi_tag}</code>"
        nextmsg = await send_message(self.message, msg)
        nextmsg = await self.client.get_messages(
            chat_id=self.message.chat.id, message_ids=nextmsg.id
        )
        if self.message.from_user:
            nextmsg.from_user = self.user
        else:
            nextmsg.sender_chat = self.user
        await obj(
            self.client,
            nextmsg,
            self.is_qbit,
            self.is_leech,
            self.is_jd,
            self.is_nzb,
            self.same_dir,
            self.bulk,
            self.multi_tag,
            self.options,
        ).new_event()
    except Exception as e:
        await send_message(
            self.message,
            f"Reply to text file or to telegram message that have links separated by new line! {e}",
        )
667
668
async def proceed_extract(self, dl_path, gid):
    """Extract archive(s) found under *dl_path* with 7z.

    Archives (and split parts) are deleted after a successful extraction.
    Returns the extracted path for a single-file task, otherwise *dl_path*;
    returns False / the 7z exit code when the task is cancelled mid-way.
    """
    # self.extract may carry the archive password when given as a string.
    pswd = self.extract if isinstance(self.extract, str) else ""
    self.files_to_proceed = []
    # Collect extractable files: first parts of split archives, or plain
    # archives (split .rar parts other than the first are skipped).
    if self.is_file and is_archive(dl_path):
        self.files_to_proceed.append(dl_path)
    else:
        for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
            for file_ in files:
                if (
                    is_first_archive_split(file_)
                    or is_archive(file_)
                    and not file_.strip().lower().endswith(".rar")
                ):
                    f_path = ospath.join(dirpath, file_)
                    self.files_to_proceed.append(f_path)

    if not self.files_to_proceed:
        return dl_path
    t_path = dl_path
    sevenz = SevenZ(self)
    LOGGER.info(f"Extracting: {self.name}")
    async with task_dict_lock:
        task_dict[self.mid] = SevenZStatus(self, sevenz, gid, "Extract")
    # FIX: initialize before the loop — if walk() yields nothing (e.g. the
    # directory vanished), the final `code == 0` check previously raised
    # UnboundLocalError.
    code = 0
    for dirpath, _, files in await sync_to_async(
        walk, self.up_dir or self.dir, topdown=False
    ):
        code = 0
        for file_ in files:
            if self.is_cancelled:
                return False
            if (
                is_first_archive_split(file_)
                or is_archive(file_)
                and not file_.strip().lower().endswith(".rar")
            ):
                self.proceed_count += 1
                f_path = ospath.join(dirpath, file_)
                # Single file extracts next to itself (minus the archive
                # extension); folders extract in place.
                t_path = get_base_name(f_path) if self.is_file else dirpath
                if not self.is_file:
                    self.subname = file_
                code = await sevenz.extract(f_path, t_path, pswd)
                if self.is_cancelled:
                    return code
        if code == 0:
            # Successful extraction in this directory → delete the archives.
            for file_ in files:
                if is_archive_split(file_) or is_archive(file_):
                    del_path = ospath.join(dirpath, file_)
                    try:
                        await remove(del_path)
                    except:
                        self.is_cancelled = True
    if self.proceed_count == 0:
        LOGGER.info("No files able to extract!")
    return t_path if self.is_file and code == 0 else dl_path
723
724
async def proceed_ffmpeg(self, dl_path, gid):
    """Run the user-configured ffmpeg command(s) over the download.

    Each command must reference its main input as an "mltb*" placeholder
    after "-i"; extra "-i" Telegram links are downloaded to temp files.
    A "-del" token in the command deletes the source file on success.
    Returns the (possibly relocated) download path, or False on cancel.
    """
    checked = False
    # Maps cmd index -> temp-downloaded extra-input path, for cleanup.
    inputs = {}
    cmds = [
        [part.strip() for part in split(item) if part.strip()]
        for item in self.ffmpeg_cmds
    ]
    try:
        ffmpeg = FFMpeg(self)
        for ffmpeg_cmd in cmds:
            self.proceed_count = 0
            # Pin ffmpeg to the configured CPU cores.
            cmd = [
                "taskset",
                "-c",
                f"{cores}",
                "ffmpeg",
                "-hide_banner",
                "-loglevel",
                "error",
                "-progress",
                "pipe:1",
            ] + ffmpeg_cmd
            if "-del" in cmd:
                cmd.remove("-del")
                delete_files = True
            else:
                delete_files = False
            input_indexes = [
                index for index, value in enumerate(cmd) if value == "-i"
            ]
            # The "mltb" pseudo-input marks where the task file goes.
            input_file = next(
                (
                    cmd[index + 1]
                    for index in input_indexes
                    if cmd[index + 1].startswith("mltb")
                ),
                "",
            )
            if not input_file:
                LOGGER.error("Wrong FFmpeg cmd!")
                return dl_path
            # Placeholder suffix selects which media types this cmd applies to.
            if input_file.strip().endswith(".video"):
                ext = "video"
            elif input_file.strip().endswith(".audio"):
                ext = "audio"
            elif "." not in input_file:
                ext = "all"
            else:
                ext = ospath.splitext(input_file)[-1].lower()
            if await aiopath.isfile(dl_path):
                is_video, is_audio, _ = await get_document_type(dl_path)
                if not is_video and not is_audio:
                    break
                elif is_video and ext == "audio":
                    break
                elif is_audio and not is_video and ext == "video":
                    break
                elif ext not in [
                    "all",
                    "audio",
                    "video",
                ] and not dl_path.strip().lower().endswith(ext):
                    break
                # Work inside a scratch folder named after the file.
                new_folder = ospath.splitext(dl_path)[0]
                name = ospath.basename(dl_path)
                await makedirs(new_folder, exist_ok=True)
                file_path = f"{new_folder}/{name}"
                await move(dl_path, file_path)
                if not checked:
                    checked = True
                    async with task_dict_lock:
                        task_dict[self.mid] = FFmpegStatus(
                            self, ffmpeg, gid, "FFmpeg"
                        )
                    self.progress = False
                    await cpu_eater_lock.acquire()
                    self.progress = True
                LOGGER.info(f"Running ffmpeg cmd for: {file_path}")
                var_cmd = cmd.copy()
                for index in input_indexes:
                    if cmd[index + 1].startswith("mltb"):
                        var_cmd[index + 1] = file_path
                    elif is_telegram_link(cmd[index + 1]):
                        # Extra input given as a Telegram link: fetch it.
                        msg = (await get_tg_link_message(cmd[index + 1]))[0]
                        file_dir = await temp_download(msg)
                        inputs[index + 1] = file_dir
                        var_cmd[index + 1] = file_dir
                self.subsize = self.size
                res = await ffmpeg.ffmpeg_cmds(var_cmd, file_path)
                if res:
                    if delete_files:
                        await remove(file_path)
                        if len(await listdir(new_folder)) == 1:
                            # Single output: move it back up and drop the
                            # scratch folder.
                            folder = new_folder.rsplit("/", 1)[0]
                            self.name = ospath.basename(res[0])
                            if self.name.startswith("ffmpeg"):
                                self.name = self.name.split(".", 1)[-1]
                            dl_path = ospath.join(folder, self.name)
                            await move(res[0], dl_path)
                            await rmtree(new_folder)
                        else:
                            dl_path = new_folder
                            self.name = new_folder.rsplit("/", 1)[-1]
                    else:
                        dl_path = new_folder
                        self.name = new_folder.rsplit("/", 1)[-1]
                else:
                    # Command failed: restore the original layout.
                    await move(file_path, dl_path)
                    await rmtree(new_folder)
            else:
                for dirpath, _, files in await sync_to_async(
                    walk, dl_path, topdown=False
                ):
                    for file_ in files:
                        var_cmd = cmd.copy()
                        if self.is_cancelled:
                            return False
                        f_path = ospath.join(dirpath, file_)
                        is_video, is_audio, _ = await get_document_type(f_path)
                        if not is_video and not is_audio:
                            continue
                        elif is_video and ext == "audio":
                            continue
                        elif is_audio and not is_video and ext == "video":
                            continue
                        elif ext not in [
                            "all",
                            "audio",
                            "video",
                        ] and not f_path.strip().lower().endswith(ext):
                            continue
                        self.proceed_count += 1
                        for index in input_indexes:
                            if cmd[index + 1].startswith("mltb"):
                                var_cmd[index + 1] = f_path
                            elif is_telegram_link(cmd[index + 1]):
                                msg = (await get_tg_link_message(cmd[index + 1]))[0]
                                file_dir = await temp_download(msg)
                                inputs[index + 1] = file_dir
                                var_cmd[index + 1] = file_dir
                        if not checked:
                            checked = True
                            async with task_dict_lock:
                                task_dict[self.mid] = FFmpegStatus(
                                    self, ffmpeg, gid, "FFmpeg"
                                )
                            self.progress = False
                            await cpu_eater_lock.acquire()
                            self.progress = True
                        LOGGER.info(f"Running ffmpeg cmd for: {f_path}")
                        self.subsize = await get_path_size(f_path)
                        self.subname = file_
                        res = await ffmpeg.ffmpeg_cmds(var_cmd, f_path)
                        if res and delete_files:
                            await remove(f_path)
                            if len(res) == 1:
                                file_name = ospath.basename(res[0])
                                if file_name.startswith("ffmpeg"):
                                    newname = file_name.split(".", 1)[-1]
                                    newres = ospath.join(dirpath, newname)
                                    await move(res[0], newres)
        # FIX: aiopath.exists is a coroutine function — without `await` it
        # returned a (truthy) coroutine, so the existence check was a no-op
        # and the coroutine was never awaited.
        for inp in inputs.values():
            if "/temp/" in inp and await aiopath.exists(inp):
                await remove(inp)
    finally:
        if checked:
            cpu_eater_lock.release()
    return dl_path
892
893
async def substitute(self, dl_path):
    """Rename file(s) under *dl_path* using the ``self.name_sub`` rules.

    Each rule is ``[pattern]``, ``[pattern, replacement]`` or
    ``[pattern, replacement, "s"]`` (the "s" makes the regex
    case-insensitive). Returns the (possibly new) path.
    """

    def _apply_rules(name, rules):
        # Returns the rewritten name, or False when this name must be skipped
        # (bad regex or result longer than 255 bytes).
        for rule in rules:
            case_insensitive = False
            pattern = rule[0]
            if pattern.startswith('"') and pattern.endswith('"'):
                pattern = pattern.strip('"')
            if len(rule) == 1:
                replacement = ""
            elif len(rule) > 2:
                case_insensitive = rule[2] == "s"
                replacement = rule[1]
            else:
                # An explicitly empty replacement means "replace with space".
                replacement = rule[1] if rule[1] else " "
            try:
                name = sub(
                    pattern, replacement, name, flags=I if case_insensitive else 0
                )
            except Exception as e:
                LOGGER.error(
                    f"Substitute Error: pattern: {pattern} res: {replacement}. Error: {e}"
                )
                return False
            if len(name.encode()) > 255:
                LOGGER.error(f"Substitute: {name} is too long")
                return False
        return name

    if self.is_file:
        parent, old_name = dl_path.rsplit("/", 1)
        renamed = _apply_rules(old_name, self.name_sub)
        if not renamed:
            return dl_path
        target = ospath.join(parent, renamed)
        await move(dl_path, target)
        return target
    for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
        for file_ in files:
            renamed = _apply_rules(file_, self.name_sub)
            if not renamed:
                continue
            await move(ospath.join(dirpath, file_), ospath.join(dirpath, renamed))
    return dl_path
939
940
async def generate_screenshots(self, dl_path):
    """Create screenshots for the video(s) at *dl_path*.

    ``self.screen_shots`` may be a string count (defaults to 10). For a
    single file the file and its shots are moved into a new folder which
    is returned; for a folder, shots are created in place.
    """
    if isinstance(self.screen_shots, str):
        shot_count = int(self.screen_shots)
    else:
        shot_count = 10
    if not self.is_file:
        LOGGER.info(f"Creating Screenshot for: {dl_path}")
        for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
            for file_ in files:
                full_path = ospath.join(dirpath, file_)
                # Only the first element (is_video) matters here.
                if (await get_document_type(full_path))[0]:
                    await take_ss(full_path, shot_count)
        return dl_path
    if (await get_document_type(dl_path))[0]:
        LOGGER.info(f"Creating Screenshot for: {dl_path}")
        shots = await take_ss(dl_path, shot_count)
        if shots:
            # Bundle the video and its screenshots into one folder.
            dest_folder = ospath.splitext(dl_path)[0]
            base = ospath.basename(dl_path)
            await makedirs(dest_folder, exist_ok=True)
            await gather(
                move(dl_path, f"{dest_folder}/{base}"),
                move(shots, dest_folder),
            )
            return dest_folder
    return dl_path
963
964
async def convert_media(self, dl_path, gid):
    """Convert video/audio files to the user-requested container/extension.

    ``self.convert_video`` / ``self.convert_audio`` look like
    "ext [+|-] [ext1 ext2 ...]": "+" limits conversion to the listed
    extensions, "-" excludes them, no sign converts everything that is
    not already in the target extension. Returns the converted path for
    a single-file task, *dl_path* otherwise, or False on cancel.
    """
    # Extension filter list for the video rule (lowercased, dot-prefixed).
    fvext = []
    if self.convert_video:
        vdata = self.convert_video.split()
        vext = vdata[0].lower()
        if len(vdata) > 2:
            if "+" in vdata[1].split():
                vstatus = "+"
            elif "-" in vdata[1].split():
                vstatus = "-"
            else:
                vstatus = ""
            fvext.extend(f".{ext.lower()}" for ext in vdata[2:])
        else:
            vstatus = ""
    else:
        vext = ""
        vstatus = ""

    # Same shape for the audio rule.
    faext = []
    if self.convert_audio:
        adata = self.convert_audio.split()
        aext = adata[0].lower()
        if len(adata) > 2:
            if "+" in adata[1].split():
                astatus = "+"
            elif "-" in adata[1].split():
                astatus = "-"
            else:
                astatus = ""
            faext.extend(f".{ext.lower()}" for ext in adata[2:])
        else:
            astatus = ""
    else:
        aext = ""
        astatus = ""

    self.files_to_proceed = {}
    all_files = []
    if self.is_file:
        all_files.append(dl_path)
    else:
        for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
            for file_ in files:
                f_path = ospath.join(dirpath, file_)
                all_files.append(f_path)

    # Classify each candidate as a video or audio conversion target.
    for f_path in all_files:
        is_video, is_audio, _ = await get_document_type(f_path)
        if (
            is_video
            and vext
            and not f_path.strip().lower().endswith(f".{vext}")
            and (
                vstatus == "+"
                and f_path.strip().lower().endswith(tuple(fvext))
                or vstatus == "-"
                and not f_path.strip().lower().endswith(tuple(fvext))
                or not vstatus
            )
        ):
            self.files_to_proceed[f_path] = "video"
        elif (
            is_audio
            and aext
            and not is_video
            and not f_path.strip().lower().endswith(f".{aext}")
            and (
                astatus == "+"
                and f_path.strip().lower().endswith(tuple(faext))
                or astatus == "-"
                and not f_path.strip().lower().endswith(tuple(faext))
                or not astatus
            )
        ):
            self.files_to_proceed[f_path] = "audio"
    del all_files

    if self.files_to_proceed:
        ffmpeg = FFMpeg(self)
        async with task_dict_lock:
            task_dict[self.mid] = FFmpegStatus(self, ffmpeg, gid, "Convert")
        self.progress = False
        # Conversion is CPU-heavy; serialize behind the shared lock.
        async with cpu_eater_lock:
            self.progress = True
            for f_path, f_type in self.files_to_proceed.items():
                self.proceed_count += 1
                LOGGER.info(f"Converting: {f_path}")
                if self.is_file:
                    self.subsize = self.size
                else:
                    self.subsize = await get_path_size(f_path)
                    self.subname = ospath.basename(f_path)
                if f_type == "video":
                    res = await ffmpeg.convert_video(f_path, vext)
                else:
                    res = await ffmpeg.convert_audio(f_path, aext)
                if res:
                    try:
                        # Drop the source after a successful conversion.
                        await remove(f_path)
                    except:
                        self.is_cancelled = True
                        return False
                    if self.is_file:
                        return res
    return dl_path
1070
1071
async def generate_sample_video(self, dl_path, gid):
    """Produce a short SAMPLE clip next to every eligible video file.

    *dl_path* may point at a single file or a directory tree; *gid* tags
    the status entry shown to the user.  Returns the path the upload
    step should continue from: a new wrapper folder for a single-file
    task whose sample was created, otherwise *dl_path* unchanged.
    """
    # Defaults: a 60s sample assembled from 4s parts, unless the user
    # supplied "duration:part_duration" via the sample_video option.
    clip_len, part_len = 60, 4
    if isinstance(self.sample_video, str):
        opts = self.sample_video.split(":")
        if opts[0]:
            clip_len = int(opts[0])
        if len(opts) > 1:
            part_len = int(opts[1])

    self.files_to_proceed = {}
    if self.is_file and (await get_document_type(dl_path))[0]:
        self.files_to_proceed[dl_path] = ospath.basename(dl_path)
    else:
        # Collect every video file found in the tree (bottom-up walk).
        for root, _, names in await sync_to_async(walk, dl_path, topdown=False):
            for fname in names:
                candidate = ospath.join(root, fname)
                if (await get_document_type(candidate))[0]:
                    self.files_to_proceed[candidate] = fname

    if not self.files_to_proceed:
        return dl_path

    runner = FFMpeg(self)
    async with task_dict_lock:
        task_dict[self.mid] = FFmpegStatus(self, runner, gid, "Sample Video")
    self.progress = False
    # Heavy ffmpeg work is serialized behind the shared CPU lock.
    async with cpu_eater_lock:
        self.progress = True
        LOGGER.info(f"Creating Sample video: {self.name}")
        for candidate, fname in self.files_to_proceed.items():
            self.proceed_count += 1
            self.subsize = (
                self.size if self.is_file else await get_path_size(candidate)
            )
            self.subname = fname
            sample = await runner.sample_video(candidate, clip_len, part_len)
            if sample and self.is_file:
                # Single-file task: park the original and its sample
                # together in a folder named after the file.
                target = ospath.splitext(candidate)[0]
                await makedirs(target, exist_ok=True)
                await gather(
                    move(candidate, f"{target}/{fname}"),
                    move(sample, f"{target}/SAMPLE.{fname}"),
                )
                return target
    return dl_path
1119
1120
async def proceed_compress(self, dl_path, gid):
    """Archive *dl_path* into a .zip via 7z, tracked under *gid*.

    For a leech task on a single file the file is first relocated into
    a wrapper folder named after it (so the archive sits beside the
    original) and the task is reclassified as a folder task.  Returns
    whatever SevenZ.zip() returns.
    """
    # A string compress value doubles as the archive password.
    password = self.compress if isinstance(self.compress, str) else ""
    if self.is_leech and self.is_file:
        wrapper = ospath.splitext(dl_path)[0]
        await makedirs(wrapper, exist_ok=True)
        relocated = f"{wrapper}/{ospath.basename(dl_path)}"
        await move(dl_path, relocated)
        dl_path = relocated
        up_path = f"{relocated}.zip"
        self.is_file = False
    else:
        up_path = f"{dl_path}.zip"
    archiver = SevenZ(self)
    async with task_dict_lock:
        task_dict[self.mid] = SevenZStatus(self, archiver, gid, "Zip")
    return await archiver.zip(dl_path, up_path, password)
1137
1138
async def proceed_split(self, dl_path, gid):
1139
self.files_to_proceed = {}
1140
if self.is_file:
1141
f_size = await get_path_size(dl_path)
1142
if f_size > self.split_size:
1143
self.files_to_proceed[dl_path] = [f_size, ospath.basename(dl_path)]
1144
else:
1145
for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
1146
for file_ in files:
1147
f_path = ospath.join(dirpath, file_)
1148
f_size = await get_path_size(f_path)
1149
if f_size > self.split_size:
1150
self.files_to_proceed[f_path] = [f_size, file_]
1151
if self.files_to_proceed:
1152
ffmpeg = FFMpeg(self)
1153
async with task_dict_lock:
1154
task_dict[self.mid] = FFmpegStatus(self, ffmpeg, gid, "Split")
1155
LOGGER.info(f"Splitting: {self.name}")
1156
for f_path, (f_size, file_) in self.files_to_proceed.items():
1157
self.proceed_count += 1
1158
if self.is_file:
1159
self.subsize = self.size
1160
else:
1161
self.subsize = f_size
1162
self.subname = file_
1163
parts = -(-f_size // self.split_size)
1164
if self.equal_splits:
1165
split_size = (f_size // parts) + (f_size % parts)
1166
else:
1167
split_size = self.split_size
1168
if not self.as_doc and (await get_document_type(f_path))[0]:
1169
self.progress = True
1170
res = await ffmpeg.split(f_path, file_, parts, split_size)
1171
else:
1172
self.progress = False
1173
res = await split_file(f_path, split_size, self)
1174
if self.is_cancelled:
1175
return False
1176
if res or f_size >= self.max_split_size:
1177
try:
1178
await remove(f_path)
1179
except:
1180
self.is_cancelled = True
1181
1182