Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/helper/telegram_helper/message_utils.py
1635 views
1
from asyncio import sleep
2
from pyrogram.errors import FloodWait, FloodPremiumWait
3
from re import match as re_match
4
from time import time
5
6
from ... import LOGGER, status_dict, task_dict_lock, intervals, DOWNLOAD_DIR
7
from ...core.config_manager import Config
8
from ...core.telegram_manager import TgClient
9
from ..ext_utils.bot_utils import SetInterval
10
from ..ext_utils.exceptions import TgLinkException
11
from ..ext_utils.status_utils import get_readable_message
12
13
14
async def send_message(message, text, buttons=None, block=True):
15
try:
16
return await message.reply(
17
text=text,
18
disable_notification=True,
19
reply_markup=buttons,
20
)
21
except FloodWait as f:
22
LOGGER.warning(str(f))
23
if not block:
24
return str(f)
25
await sleep(f.value * 1.2)
26
return await send_message(message, text, buttons)
27
except Exception as e:
28
LOGGER.error(str(e))
29
return str(e)
30
31
32
async def edit_message(message, text, buttons=None, block=True):
33
try:
34
return await message.edit(
35
text=text,
36
reply_markup=buttons,
37
)
38
except FloodWait as f:
39
LOGGER.warning(str(f))
40
if not block:
41
return str(f)
42
await sleep(f.value * 1.2)
43
return await edit_message(message, text, buttons)
44
except Exception as e:
45
LOGGER.error(str(e))
46
return str(e)
47
48
49
async def send_file(message, file, caption=""):
50
try:
51
return await message.reply_document(
52
document=file, caption=caption, disable_notification=True
53
)
54
except FloodWait as f:
55
LOGGER.warning(str(f))
56
await sleep(f.value * 1.2)
57
return await send_file(message, file, caption)
58
except Exception as e:
59
LOGGER.error(str(e))
60
return str(e)
61
62
63
async def send_rss(text, chat_id, thread_id):
64
try:
65
app = TgClient.user or TgClient.bot
66
return await app.send_message(
67
chat_id=chat_id,
68
text=text,
69
message_thread_id=thread_id,
70
disable_notification=True,
71
)
72
except (FloodWait, FloodPremiumWait) as f:
73
LOGGER.warning(str(f))
74
await sleep(f.value * 1.2)
75
return await send_rss(text, chat_id, thread_id)
76
except Exception as e:
77
LOGGER.error(str(e))
78
return str(e)
79
80
81
async def delete_message(message):
82
try:
83
await message.delete()
84
except Exception as e:
85
LOGGER.error(str(e))
86
87
88
async def auto_delete_message(cmd_message=None, bot_message=None):
89
await sleep(60)
90
if cmd_message is not None:
91
await delete_message(cmd_message)
92
if bot_message is not None:
93
await delete_message(bot_message)
94
95
96
async def delete_status():
97
async with task_dict_lock:
98
for key, data in list(status_dict.items()):
99
try:
100
await delete_message(data["message"])
101
del status_dict[key]
102
except Exception as e:
103
LOGGER.error(str(e))
104
105
106
async def get_tg_link_message(link):
107
message = None
108
links = []
109
if link.startswith("https://t.me/"):
110
private = False
111
msg = re_match(
112
r"https:\/\/t\.me\/(?:c\/)?([^\/]+)(?:\/[^\/]+)?\/([0-9-]+)", link
113
)
114
else:
115
private = True
116
msg = re_match(
117
r"tg:\/\/openmessage\?user_id=([0-9]+)&message_id=([0-9-]+)", link
118
)
119
if not TgClient.user:
120
raise TgLinkException("USER_SESSION_STRING required for this private link!")
121
122
chat = msg[1]
123
msg_id = msg[2]
124
if "-" in msg_id:
125
start_id, end_id = msg_id.split("-")
126
msg_id = start_id = int(start_id)
127
end_id = int(end_id)
128
btw = end_id - start_id
129
if private:
130
link = link.split("&message_id=")[0]
131
links.append(f"{link}&message_id={start_id}")
132
for _ in range(btw):
133
start_id += 1
134
links.append(f"{link}&message_id={start_id}")
135
else:
136
link = link.rsplit("/", 1)[0]
137
links.append(f"{link}/{start_id}")
138
for _ in range(btw):
139
start_id += 1
140
links.append(f"{link}/{start_id}")
141
else:
142
msg_id = int(msg_id)
143
144
if chat.isdigit():
145
chat = int(chat) if private else int(f"-100{chat}")
146
147
if not private:
148
try:
149
message = await TgClient.bot.get_messages(chat_id=chat, message_ids=msg_id)
150
if message.empty:
151
private = True
152
except Exception as e:
153
private = True
154
if not TgClient.user:
155
raise e
156
157
if not private:
158
return (links, "bot") if links else (message, "bot")
159
elif TgClient.user:
160
try:
161
user_message = await TgClient.user.get_messages(
162
chat_id=chat, message_ids=msg_id
163
)
164
except Exception as e:
165
raise TgLinkException(
166
f"You don't have access to this chat!. ERROR: {e}"
167
) from e
168
if not user_message.empty:
169
return (links, "user") if links else (user_message, "user")
170
else:
171
raise TgLinkException("Private: Please report!")
172
173
174
async def temp_download(msg):
175
path = f"{DOWNLOAD_DIR}temp"
176
return await msg.download(file_name=f"{path}/")
177
178
179
async def update_status_message(sid, force=False):
180
if intervals["stopAll"]:
181
return
182
async with task_dict_lock:
183
if not status_dict.get(sid):
184
if obj := intervals["status"].get(sid):
185
obj.cancel()
186
del intervals["status"][sid]
187
return
188
if not force and time() - status_dict[sid]["time"] < 3:
189
return
190
status_dict[sid]["time"] = time()
191
page_no = status_dict[sid]["page_no"]
192
status = status_dict[sid]["status"]
193
is_user = status_dict[sid]["is_user"]
194
page_step = status_dict[sid]["page_step"]
195
text, buttons = await get_readable_message(
196
sid, is_user, page_no, status, page_step
197
)
198
if text is None:
199
del status_dict[sid]
200
if obj := intervals["status"].get(sid):
201
obj.cancel()
202
del intervals["status"][sid]
203
return
204
if text != status_dict[sid]["message"].text:
205
message = await edit_message(
206
status_dict[sid]["message"], text, buttons, block=False
207
)
208
if isinstance(message, str):
209
if message.startswith("Telegram says: [40"):
210
del status_dict[sid]
211
if obj := intervals["status"].get(sid):
212
obj.cancel()
213
del intervals["status"][sid]
214
else:
215
LOGGER.error(
216
f"Status with id: {sid} haven't been updated. Error: {message}"
217
)
218
return
219
status_dict[sid]["message"].text = text
220
status_dict[sid]["time"] = time()
221
222
223
async def send_status_message(msg, user_id=0):
224
if intervals["stopAll"]:
225
return
226
sid = user_id or msg.chat.id
227
is_user = bool(user_id)
228
async with task_dict_lock:
229
if sid in status_dict:
230
page_no = status_dict[sid]["page_no"]
231
status = status_dict[sid]["status"]
232
page_step = status_dict[sid]["page_step"]
233
text, buttons = await get_readable_message(
234
sid, is_user, page_no, status, page_step
235
)
236
if text is None:
237
del status_dict[sid]
238
if obj := intervals["status"].get(sid):
239
obj.cancel()
240
del intervals["status"][sid]
241
return
242
old_message = status_dict[sid]["message"]
243
message = await send_message(msg, text, buttons, block=False)
244
if isinstance(message, str):
245
LOGGER.error(
246
f"Status with id: {sid} haven't been sent. Error: {message}"
247
)
248
return
249
await delete_message(old_message)
250
message.text = text
251
status_dict[sid].update({"message": message, "time": time()})
252
else:
253
text, buttons = await get_readable_message(sid, is_user)
254
if text is None:
255
return
256
message = await send_message(msg, text, buttons, block=False)
257
if isinstance(message, str):
258
LOGGER.error(
259
f"Status with id: {sid} haven't been sent. Error: {message}"
260
)
261
return
262
message.text = text
263
status_dict[sid] = {
264
"message": message,
265
"time": time(),
266
"page_no": 1,
267
"page_step": 1,
268
"status": "All",
269
"is_user": is_user,
270
}
271
if not intervals["status"].get(sid) and not is_user:
272
intervals["status"][sid] = SetInterval(
273
Config.STATUS_UPDATE_INTERVAL, update_status_message, sid
274
)
275
276