Path: blob/master/bot/modules/chat_permission.py
1630 views
from .. import user_data1from ..helper.ext_utils.bot_utils import update_user_ldata, new_task2from ..helper.ext_utils.db_handler import database3from ..helper.telegram_helper.message_utils import send_message456@new_task7async def authorize(_, message):8msg = message.text.split()9thread_id = None10try:11if len(msg) > 1:12if "|" in msg:13chat_id, thread_id = list(map(int, msg[1].split("|")))14else:15chat_id = int(msg[1].strip())16elif (17reply_to := message.reply_to_message18) and reply_to.id != message.message_thread_id:19chat_id = (20reply_to.from_user.id if reply_to.from_user else reply_to.sender_chat.id21)22else:23if message.topic_message:24thread_id = message.message_thread_id25chat_id = message.chat.id26if chat_id in user_data and user_data[chat_id].get("AUTH"):27if (28thread_id is not None29and thread_id in user_data[chat_id].get("thread_ids", [])30or thread_id is None31):32msg = "Already Authorized!"33else:34if "thread_ids" in user_data[chat_id]:35user_data[chat_id]["thread_ids"].append(thread_id)36else:37user_data[chat_id]["thread_ids"] = [thread_id]38msg = "Authorized"39else:40update_user_ldata(chat_id, "AUTH", True)41if thread_id is not None:42update_user_ldata(chat_id, "thread_ids", [thread_id])43await database.update_user_data(chat_id)44msg = "Authorized"45except Exception as e:46msg = f"Error: {e}"47await send_message(message, msg)484950@new_task51async def unauthorize(_, message):52msg = message.text.split()53thread_id = None54try:55if len(msg) > 1:56if "|" in msg:57chat_id, thread_id = list(map(int, msg[1].split("|")))58else:59chat_id = int(msg[1].strip())60elif (61reply_to := message.reply_to_message62) and reply_to.id != message.message_thread_id:63chat_id = (64reply_to.from_user.id if reply_to.from_user else reply_to.sender_chat.id65)66else:67if message.topic_message:68thread_id = message.message_thread_id69chat_id = message.chat.id70if chat_id in user_data and user_data[chat_id].get("AUTH"):71if thread_id is not None and thread_id in user_data[chat_id].get(72"thread_ids", []73):74user_data[chat_id]["thread_ids"].remove(thread_id)75else:76update_user_ldata(chat_id, "AUTH", False)77await database.update_user_data(chat_id)78msg = "Unauthorized"79else:80msg = "Already Unauthorized! Authorized Chats added from config must be removed from config."81except Exception as e:82msg = f"Error: {e}"83await send_message(message, msg)848586@new_task87async def add_sudo(_, message):88id_ = ""89msg = message.text.split()90try:91if len(msg) > 1:92id_ = int(msg[1].strip())93elif reply_to := message.reply_to_message:94id_ = reply_to.from_user.id if reply_to.from_user else reply_to.sender_chat.id95if id_:96if id_ in user_data and user_data[id_].get("SUDO"):97msg = "Already Sudo!"98else:99update_user_ldata(id_, "SUDO", True)100await database.update_user_data(id_)101msg = "Promoted as Sudo"102else:103msg = "Give ID or Reply To message of whom you want to Promote."104except Exception as e:105msg = f"Error: {e}"106await send_message(message, msg)107108109@new_task110async def remove_sudo(_, message):111id_ = ""112msg = message.text.split()113try:114if len(msg) > 1:115id_ = int(msg[1].strip())116elif reply_to := message.reply_to_message:117id_ = reply_to.from_user.id if reply_to.from_user else reply_to.sender_chat.id118if id_:119if id_ in user_data and user_data[id_].get("SUDO"):120update_user_ldata(id_, "SUDO", False)121await database.update_user_data(id_)122msg = "Demoted"123else:124msg = "Already Not Sudo! Sudo users added from config must be removed from config."125else:126msg = "Give ID or Reply To message of whom you want to remove from Sudo"127except Exception as e:128msg = f"Error: {e}"129await send_message(message, msg)130131132