Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/modules/cancel_task.py
1620 views
1
from asyncio import sleep
2
3
from .. import task_dict, task_dict_lock, user_data, multi_tags
4
from ..core.config_manager import Config
5
from ..helper.ext_utils.bot_utils import new_task
6
from ..helper.ext_utils.status_utils import (
7
get_task_by_gid,
8
get_all_tasks,
9
MirrorStatus,
10
)
11
from ..helper.telegram_helper import button_build
12
from ..helper.telegram_helper.bot_commands import BotCommands
13
from ..helper.telegram_helper.filters import CustomFilters
14
from ..helper.telegram_helper.message_utils import (
15
send_message,
16
auto_delete_message,
17
delete_message,
18
edit_message,
19
)
20
21
22
@new_task
async def cancel(_, message):
    """Cancel a single task chosen by GID argument or by replying to it.

    The sender is identified by ``message.from_user.id`` or, when the message
    has no ``from_user``, by ``message.sender_chat.id``.

    Resolution order:

    1. If the command carries an argument, it is used as the GID. A
       four-character argument is instead discarded from ``multi_tags`` and
       the handler returns without replying.
    2. Otherwise, if the command is a reply, the replied-to message id is
       looked up in ``task_dict``.
    3. Otherwise a usage hint is sent.

    The task is cancelled only when the sender is the configured owner, the
    user recorded on the task's listener, or flagged ``SUDO`` in ``user_data``.
    """
    user_id = message.from_user.id if message.from_user else message.sender_chat.id
    args = message.text.split()
    if len(args) > 1:
        gid = args[1]
        if len(gid) == 4:
            # Four-character arguments are handled as multi tags, not GIDs.
            multi_tags.discard(gid)
            return
        task = await get_task_by_gid(gid)
        if task is None:
            await send_message(message, f"GID: <code>{gid}</code> Not Found.")
            return
    elif reply_to_id := message.reply_to_message_id:
        # Hold the lock only for the dictionary lookup itself.
        async with task_dict_lock:
            task = task_dict.get(reply_to_id)
        if task is None:
            await send_message(message, "This is not an active task!")
            return
    else:
        # Unconditional fallback: every path that reaches the permission check
        # below is guaranteed to have bound ``task``.
        usage = (
            "Reply to an active Command message which was used to start the download"
            f" or send <code>/{BotCommands.CancelTaskCommand[0]} GID</code> to cancel it!"
        )
        await send_message(message, usage)
        return
    if (
        Config.OWNER_ID != user_id
        and task.listener.user_id != user_id
        and (user_id not in user_data or not user_data[user_id].get("SUDO"))
    ):
        await send_message(message, "This task is not for you!")
        return
    obj = task.task()
    await obj.cancel_task()
58
59
60
@new_task
async def cancel_multi(_, query):
    """Handle the callback that stops a running multi sequence.

    The callback data is split on whitespace; field 1 is the id of the user
    allowed to press the button and field 2 is the integer tag to remove from
    ``multi_tags``. Anyone else is rejected unless ``CustomFilters.sudo``
    accepts the query. After answering, the message carrying the button is
    deleted.
    """
    fields = query.data.split()
    presser_id = query.from_user.id
    allowed_id = int(fields[1])
    if presser_id != allowed_id and not await CustomFilters.sudo("", query):
        await query.answer("Not Yours!", show_alert=True)
        return
    tag = int(fields[2])
    # Record membership before discarding so the reply reflects the prior state.
    was_active = tag in multi_tags
    multi_tags.discard(tag)
    reply = "Stopped!" if was_active else "Already Stopped/Finished!"
    await query.answer(reply, show_alert=True)
    await delete_message(query.message)
75
76
77
async def cancel_all(status, user_id):
    """Cancel every task returned by ``get_all_tasks`` for the given filter.

    ``status`` is stripped of surrounding whitespace before the lookup and
    ``user_id`` is passed through unchanged. Each matching task is cancelled
    in turn, with a two-second pause after every cancellation.

    Returns ``True`` when at least one task matched, ``False`` otherwise.
    """
    found = await get_all_tasks(status.strip(), user_id)
    for entry in found or ():
        await entry.task().cancel_task()
        await sleep(2)
    return bool(found)
86
87
88
def create_cancel_buttons(is_sudo, user_id=""):
    """Build the two-column inline keyboard for the cancel-all menu.

    One button is produced per status below, followed by "All", an optional
    scope-switch button for sudo users, and "Close". Every callback string
    ends with ``user_id`` (empty by default).

    Args:
        is_sudo: When true, add a button that switches scope: "All Added
            Tasks" if ``user_id`` is set, otherwise "My Tasks".
        user_id: Id appended to each button's callback data.

    Returns:
        The value of ``ButtonMaker.build_menu(2)``.
    """
    # (label, status) pairs in display order; the callback format is shared.
    status_buttons = (
        ("Downloading", MirrorStatus.STATUS_DOWNLOAD),
        ("Uploading", MirrorStatus.STATUS_UPLOAD),
        ("Seeding", MirrorStatus.STATUS_SEED),
        ("Splitting", MirrorStatus.STATUS_SPLIT),
        ("Cloning", MirrorStatus.STATUS_CLONE),
        ("Extracting", MirrorStatus.STATUS_EXTRACT),
        ("Archiving", MirrorStatus.STATUS_ARCHIVE),
        ("QueuedDl", MirrorStatus.STATUS_QUEUEDL),
        ("QueuedUp", MirrorStatus.STATUS_QUEUEUP),
        ("SampleVideo", MirrorStatus.STATUS_SAMVID),
        ("ConvertMedia", MirrorStatus.STATUS_CONVERT),
        ("FFmpeg", MirrorStatus.STATUS_FFMPEG),
        ("Paused", MirrorStatus.STATUS_PAUSED),
    )
    buttons = button_build.ButtonMaker()
    for label, status in status_buttons:
        buttons.data_button(label, f"canall ms {status} {user_id}")
    buttons.data_button("All", f"canall ms All {user_id}")
    if is_sudo:
        if user_id:
            buttons.data_button("All Added Tasks", f"canall bot ms {user_id}")
        else:
            buttons.data_button("My Tasks", f"canall user ms {user_id}")
    buttons.data_button("Close", f"canall close ms {user_id}")
    return buttons.build_menu(2)
127
128
129
@new_task
async def cancel_all_buttons(_, message):
    """Reply to the cancel-all command with the status selection menu.

    Sends "No active tasks!" when ``task_dict`` is empty. Otherwise it builds
    the menu for the sender and schedules both the command and the menu
    message for automatic deletion via ``auto_delete_message``.
    """
    async with task_dict_lock:
        count = len(task_dict)
    if count == 0:
        await send_message(message, "No active tasks!")
        return
    is_sudo = await CustomFilters.sudo("", message)
    # A message may arrive without ``from_user``; fall back to the sender
    # chat id, the same way ``cancel`` identifies the sender.
    user_id = message.from_user.id if message.from_user else message.sender_chat.id
    button = create_cancel_buttons(is_sudo, user_id)
    can_msg = await send_message(message, "Choose tasks to cancel!", button)
    await auto_delete_message(message, can_msg)
140
141
142
@new_task
async def cancel_all_update(_, query):
    """Handle every ``canall ...`` callback of the cancel-all menu.

    The callback data is split on whitespace. Field 1 selects the action and
    the optional field 3 is the id of the user the menu belongs to:

    * ``close``   - delete the menu and the message it replied to.
    * ``back``    - redraw the status menu for the same owner.
    * ``bot``     - redraw the menu with no owner id.
    * ``user``    - redraw the menu owned by the clicking user.
    * ``ms``      - ask for confirmation before cancelling status field 2.
    * otherwise   - field 1 is a confirmed status: redraw the menu, then
      cancel the matching tasks through ``cancel_all``.

    Only sudo users or the menu owner may act. A menu without an owner id is
    only produced by the sudo-only ``bot`` action, so non-sudo clicks on it
    are rejected as well.
    """
    data = query.data.split()
    message = query.message
    reply_to = message.reply_to_message
    user_id = int(data[3]) if len(data) > 3 else ""
    is_sudo = await CustomFilters.sudo("", query)
    if not is_sudo and user_id != query.from_user.id:
        # Reject and stop: nothing below may run for an unauthorised click.
        await query.answer("Not Yours!", show_alert=True)
        return
    await query.answer()
    action = data[1]
    if action == "close":
        await delete_message(reply_to)
        await delete_message(message)
    elif action == "back":
        button = create_cancel_buttons(is_sudo, user_id)
        await edit_message(message, "Choose tasks to cancel!", button)
    elif action == "bot":
        button = create_cancel_buttons(is_sudo, "")
        await edit_message(message, "Choose tasks to cancel!", button)
    elif action == "user":
        button = create_cancel_buttons(is_sudo, query.from_user.id)
        await edit_message(message, "Choose tasks to cancel!", button)
    elif action == "ms":
        # The chosen status moves into the action slot of the "Yes!" button,
        # so a confirmed click lands in the final branch below.
        buttons = button_build.ButtonMaker()
        buttons.data_button("Yes!", f"canall {data[2]} confirm {user_id}")
        buttons.data_button("Back", f"canall back confirm {user_id}")
        buttons.data_button("Close", f"canall close confirm {user_id}")
        button = buttons.build_menu(2)
        await edit_message(
            message, f"Are you sure you want to cancel all {data[2]} tasks", button
        )
    else:
        # Restore the menu first; cancel_all pauses after each task.
        button = create_cancel_buttons(is_sudo, user_id)
        await edit_message(message, "Choose tasks to cancel.", button)
        res = await cancel_all(action, user_id)
        if not res:
            await send_message(reply_to, f"No matching tasks for {action}!")
180
181