Path: blob/master/bot/helper/ext_utils/db_handler.py
1626 views
from aiofiles import open as aiopen1from aiofiles.os import path as aiopath2from importlib import import_module3from pymongo import AsyncMongoClient4from pymongo.server_api import ServerApi5from pymongo.errors import PyMongoError67from ... import LOGGER, user_data, rss_dict, qbit_options8from ...core.telegram_manager import TgClient9from ...core.config_manager import Config101112class DbManager:13def __init__(self):14self._return = True15self._conn = None16self.db = None1718async def connect(self):19try:20if self._conn is not None:21await self._conn.close()22self._conn = AsyncMongoClient(23Config.DATABASE_URL,24server_api=ServerApi("1"),25connectTimeoutMS=60000,26serverSelectionTimeoutMS=60000,27)28self.db = self._conn[Config.DATABASE_NAME]29self._return = False30except PyMongoError as e:31LOGGER.error(f"Error in DB connection: {e}")32self.db = None33self._return = True34self._conn = None3536async def disconnect(self):37self._return = True38if self._conn is not None:39await self._conn.close()40self._conn = None4142async def update_deploy_config(self):43if self._return:44return45settings = import_module("config")46config_file = {47key: value.strip() if isinstance(value, str) else value48for key, value in vars(settings).items()49if not key.startswith("__")50}51await self.db.settings.deployConfig.replace_one(52{"_id": TgClient.ID}, config_file, upsert=True53)5455async def update_config(self, dict_):56if self._return:57return58await self.db.settings.config.update_one(59{"_id": TgClient.ID}, {"$set": dict_}, upsert=True60)6162async def update_aria2(self, key, value):63if self._return:64return65await self.db.settings.aria2c.update_one(66{"_id": TgClient.ID}, {"$set": {key: value}}, upsert=True67)6869async def update_qbittorrent(self, key, value):70if self._return:71return72await self.db.settings.qbittorrent.update_one(73{"_id": TgClient.ID}, {"$set": {key: value}}, upsert=True74)7576async def save_qbit_settings(self):77if self._return:78return79await self.db.settings.qbittorrent.update_one(80{"_id": TgClient.ID}, {"$set": qbit_options}, upsert=True81)8283async def update_private_file(self, path):84if self._return:85return86db_path = path.replace(".", "__")87if await aiopath.exists(path):88async with aiopen(path, "rb+") as pf:89pf_bin = await pf.read()90await self.db.settings.files.update_one(91{"_id": TgClient.ID}, {"$set": {db_path: pf_bin}}, upsert=True92)93if path == "config.py":94await self.update_deploy_config()95else:96await self.db.settings.files.update_one(97{"_id": TgClient.ID}, {"$unset": {db_path: ""}}, upsert=True98)99100async def update_nzb_config(self):101if self._return:102return103async with aiopen("sabnzbd/SABnzbd.ini", "rb+") as pf:104nzb_conf = await pf.read()105await self.db.settings.nzb.replace_one(106{"_id": TgClient.ID}, {"SABnzbd__ini": nzb_conf}, upsert=True107)108109async def update_user_data(self, user_id):110if self._return:111return112data = user_data.get(user_id, {})113data = data.copy()114for key in ("THUMBNAIL", "RCLONE_CONFIG", "TOKEN_PICKLE"):115data.pop(key, None)116pipeline = [117{118"$replaceRoot": {119"newRoot": {120"$mergeObjects": [121data,122{123"$arrayToObject": {124"$filter": {125"input": {"$objectToArray": "$$ROOT"},126"as": "field",127"cond": {128"$in": [129"$$field.k",130[131"THUMBNAIL",132"RCLONE_CONFIG",133"TOKEN_PICKLE",134],135]136},137}138}139},140]141}142}143}144]145await self.db.users.update_one({"_id": user_id}, pipeline, upsert=True)146147async def update_user_doc(self, user_id, key, path=""):148if self._return:149return150if path:151async with aiopen(path, "rb+") as doc:152doc_bin = await doc.read()153await self.db.users.update_one(154{"_id": user_id}, {"$set": {key: doc_bin}}, upsert=True155)156else:157await self.db.users.update_one(158{"_id": user_id}, {"$unset": {key: ""}}, upsert=True159)160161async def rss_update_all(self):162if self._return:163return164for user_id in list(rss_dict.keys()):165await self.db.rss[TgClient.ID].replace_one(166{"_id": user_id}, rss_dict[user_id], upsert=True167)168169async def rss_update(self, user_id):170if self._return:171return172await self.db.rss[TgClient.ID].replace_one(173{"_id": user_id}, rss_dict[user_id], upsert=True174)175176async def rss_delete(self, user_id):177if self._return:178return179await self.db.rss[TgClient.ID].delete_one({"_id": user_id})180181async def add_incomplete_task(self, cid, link, tag):182if self._return:183return184await self.db.tasks[TgClient.ID].insert_one(185{"_id": link, "cid": cid, "tag": tag}186)187188async def rm_complete_task(self, link):189if self._return:190return191await self.db.tasks[TgClient.ID].delete_one({"_id": link})192193async def get_incomplete_tasks(self):194notifier_dict = {}195if self._return:196return notifier_dict197if await self.db.tasks[TgClient.ID].find_one():198rows = self.db.tasks[TgClient.ID].find({})199async for row in rows:200if row["cid"] in list(notifier_dict.keys()):201if row["tag"] in list(notifier_dict[row["cid"]]):202notifier_dict[row["cid"]][row["tag"]].append(row["_id"])203else:204notifier_dict[row["cid"]][row["tag"]] = [row["_id"]]205else:206notifier_dict[row["cid"]] = {row["tag"]: [row["_id"]]}207await self.db.tasks[TgClient.ID].drop()208return notifier_dict209210async def trunc_table(self, name):211if self._return:212return213await self.db[name][TgClient.ID].drop()214215216database = DbManager()217218219