Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/helper/ext_utils/task_manager.py
1628 views
1
from asyncio import Event
2
3
from ... import (
4
queued_dl,
5
queued_up,
6
non_queued_up,
7
non_queued_dl,
8
queue_dict_lock,
9
LOGGER,
10
)
11
from ...core.config_manager import Config
12
from ..mirror_leech_utils.gdrive_utils.search import GoogleDriveSearch
13
from .bot_utils import sync_to_async, get_telegraph_list
14
from .files_utils import get_base_name
15
from .links_utils import is_gdrive_id
16
17
18
async def stop_duplicate_check(listener):
    """Look for an already-uploaded copy of the task's output in Google Drive.

    The check is skipped (and ``(False, None)`` returned) when the task is a
    leech, when ``listener.stop_duplicate`` is falsy, when ``same_dir`` or
    ``select`` is set, or when ``listener.up_dest`` is not a Google Drive id.

    Args:
        listener: Task object. The attributes read here are ``is_leech``,
            ``stop_duplicate``, ``same_dir``, ``select``, ``up_dest``,
            ``name``, ``compress``, ``extract``, ``is_clone`` and ``user_id``.

    Returns:
        ``(msg, button)`` when the Drive search returned content, where
        ``msg`` is the user-facing text and ``button`` is whatever
        ``get_telegraph_list`` produced; ``(False, None)`` otherwise.
    """
    if (
        listener.is_leech
        or not listener.stop_duplicate
        or listener.same_dir
        or listener.select
        or not is_gdrive_id(listener.up_dest)
    ):
        return False, None

    name = listener.name
    LOGGER.info(f"Checking File/Folder if already in Drive: {name}")

    # Search for the name the result is expected to have after processing.
    if listener.compress:
        name = f"{name}.zip"
    elif listener.extract:
        try:
            name = get_base_name(name)
        except Exception:
            # Best effort: if no base name can be derived, skip the search.
            # Only ordinary errors are trapped so that KeyboardInterrupt,
            # SystemExit and task cancellation still propagate.
            name = None

    if name is not None:
        # drive_list is called through sync_to_async so it does not run
        # directly inside this coroutine.
        telegraph_content, contents_no = await sync_to_async(
            GoogleDriveSearch(stop_dup=True, no_multi=listener.is_clone).drive_list,
            name,
            listener.up_dest,
            listener.user_id,
        )
        if telegraph_content:
            msg = f"File/Folder is already available in Drive.\nHere are {contents_no} list results:"
            button = await get_telegraph_list(telegraph_content)
            return msg, button

    return False, None
52
53
54
async def check_running_tasks(listener, state="dl"):
    """Decide whether a task may run now or has to wait in the queue.

    Args:
        listener: Task object; ``mid``, ``force_run``, ``force_upload`` and
            ``force_download`` are read from it.
        state: ``"dl"`` for the download phase, ``"up"`` for the upload phase.

    Returns:
        ``(is_over_limit, event)``. When the first item is truthy, ``event``
        is a fresh ``Event`` that was stored under ``listener.mid`` in
        ``queued_dl`` or ``queued_up``. Otherwise ``event`` is ``None`` and
        ``listener.mid`` was added to ``non_queued_dl`` or ``non_queued_up``.
    """
    downloading = state == "dl"
    total_cap = Config.QUEUE_ALL
    phase_cap = Config.QUEUE_DOWNLOAD if downloading else Config.QUEUE_UPLOAD
    gate = None
    must_wait = False

    async with queue_dict_lock:
        # A task entering its upload phase gives up its download slot.
        if state == "up" and listener.mid in non_queued_dl:
            non_queued_dl.remove(listener.mid)

        # Limits only matter when one is configured and the task is not
        # forced for this phase.
        if (total_cap or phase_cap) and not (
            listener.force_run
            or (listener.force_upload and state == "up")
            or (listener.force_download and state == "dl")
        ):
            active_dl = len(non_queued_dl)
            active_up = len(non_queued_up)
            active_phase = active_dl if downloading else active_up

            total_full = total_cap and active_dl + active_up >= total_cap
            phase_full = phase_cap and active_phase >= phase_cap
            must_wait = (total_full and (not phase_cap or phase_full)) or phase_full

            if must_wait:
                gate = Event()
                waiting = queued_dl if downloading else queued_up
                waiting[listener.mid] = gate

        if not must_wait:
            running = non_queued_up if state == "up" else non_queued_dl
            running.add(listener.mid)

    return must_wait, gate
89
90
91
async def start_dl_from_queued(mid: int):
    """Release the queued download ``mid`` and record it as running.

    The event stored in ``queued_dl`` under ``mid`` is set, the entry is
    removed, and ``mid`` is added to ``non_queued_dl``. Raises ``KeyError``
    when ``mid`` is not in ``queued_dl``.
    """
    waiter = queued_dl.pop(mid)
    waiter.set()
    non_queued_dl.add(mid)
95
96
97
async def start_up_from_queued(mid: int):
    """Release the queued upload ``mid`` and record it as running.

    The event stored in ``queued_up`` under ``mid`` is set, the entry is
    removed, and ``mid`` is added to ``non_queued_up``. Raises ``KeyError``
    when ``mid`` is not in ``queued_up``.
    """
    waiter = queued_up.pop(mid)
    waiter.set()
    non_queued_up.add(mid)
101
102
103
async def start_from_queued():
    """Start queued tasks for which the configured limits leave room.

    With ``Config.QUEUE_ALL`` set, the free overall capacity is handed to
    queued uploads first and then to queued downloads, each additionally
    bounded by its own limit when one is set. Without it, uploads and
    downloads are handled independently: up to the free capacity of their
    own limit, or every queued task when that limit is unset.

    NOTE(review): the quotas are used as slice bounds, so the configured
    limits are assumed to be integers - confirm against Config.
    """
    if total_cap := Config.QUEUE_ALL:
        dl_cap = Config.QUEUE_DOWNLOAD
        up_cap = Config.QUEUE_UPLOAD
        async with queue_dict_lock:
            running_dl = len(non_queued_dl)
            running_up = len(non_queued_up)
            free_slots = total_cap - (running_dl + running_up)
            if free_slots > 0:
                if queued_up and (not up_cap or running_up < up_cap):
                    quota = min(free_slots, up_cap - running_up) if up_cap else free_slots
                    # Slice a snapshot: starting a task mutates queued_up.
                    for mid in list(queued_up)[:quota]:
                        await start_up_from_queued(mid)
                        free_slots -= 1
                if (
                    queued_dl
                    and (not dl_cap or running_dl < dl_cap)
                    and free_slots != 0
                ):
                    quota = min(free_slots, dl_cap - running_dl) if dl_cap else free_slots
                    for mid in list(queued_dl)[:quota]:
                        await start_dl_from_queued(mid)
        return

    # No overall limit: uploads and downloads are released independently.
    up_cap = Config.QUEUE_UPLOAD
    async with queue_dict_lock:
        if up_cap:
            room = up_cap - len(non_queued_up)
            ready = list(queued_up)[:room] if room > 0 else []
        else:
            ready = list(queued_up)
        for mid in ready:
            await start_up_from_queued(mid)

    dl_cap = Config.QUEUE_DOWNLOAD
    async with queue_dict_lock:
        if dl_cap:
            room = dl_cap - len(non_queued_dl)
            ready = list(queued_dl)[:room] if room > 0 else []
        else:
            ready = list(queued_dl)
        for mid in ready:
            await start_dl_from_queued(mid)
155
156