Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/helper/ext_utils/db_handler.py
1626 views
1
from aiofiles import open as aiopen
2
from aiofiles.os import path as aiopath
3
from importlib import import_module
4
from pymongo import AsyncMongoClient
5
from pymongo.server_api import ServerApi
6
from pymongo.errors import PyMongoError
7
8
from ... import LOGGER, user_data, rss_dict, qbit_options
9
from ...core.telegram_manager import TgClient
10
from ...core.config_manager import Config
11
12
13
class DbManager:
14
def __init__(self):
15
self._return = True
16
self._conn = None
17
self.db = None
18
19
async def connect(self):
20
try:
21
if self._conn is not None:
22
await self._conn.close()
23
self._conn = AsyncMongoClient(
24
Config.DATABASE_URL,
25
server_api=ServerApi("1"),
26
connectTimeoutMS=60000,
27
serverSelectionTimeoutMS=60000,
28
)
29
self.db = self._conn[Config.DATABASE_NAME]
30
self._return = False
31
except PyMongoError as e:
32
LOGGER.error(f"Error in DB connection: {e}")
33
self.db = None
34
self._return = True
35
self._conn = None
36
37
async def disconnect(self):
38
self._return = True
39
if self._conn is not None:
40
await self._conn.close()
41
self._conn = None
42
43
async def update_deploy_config(self):
44
if self._return:
45
return
46
settings = import_module("config")
47
config_file = {
48
key: value.strip() if isinstance(value, str) else value
49
for key, value in vars(settings).items()
50
if not key.startswith("__")
51
}
52
await self.db.settings.deployConfig.replace_one(
53
{"_id": TgClient.ID}, config_file, upsert=True
54
)
55
56
async def update_config(self, dict_):
57
if self._return:
58
return
59
await self.db.settings.config.update_one(
60
{"_id": TgClient.ID}, {"$set": dict_}, upsert=True
61
)
62
63
async def update_aria2(self, key, value):
64
if self._return:
65
return
66
await self.db.settings.aria2c.update_one(
67
{"_id": TgClient.ID}, {"$set": {key: value}}, upsert=True
68
)
69
70
async def update_qbittorrent(self, key, value):
71
if self._return:
72
return
73
await self.db.settings.qbittorrent.update_one(
74
{"_id": TgClient.ID}, {"$set": {key: value}}, upsert=True
75
)
76
77
async def save_qbit_settings(self):
78
if self._return:
79
return
80
await self.db.settings.qbittorrent.update_one(
81
{"_id": TgClient.ID}, {"$set": qbit_options}, upsert=True
82
)
83
84
async def update_private_file(self, path):
85
if self._return:
86
return
87
db_path = path.replace(".", "__")
88
if await aiopath.exists(path):
89
async with aiopen(path, "rb+") as pf:
90
pf_bin = await pf.read()
91
await self.db.settings.files.update_one(
92
{"_id": TgClient.ID}, {"$set": {db_path: pf_bin}}, upsert=True
93
)
94
if path == "config.py":
95
await self.update_deploy_config()
96
else:
97
await self.db.settings.files.update_one(
98
{"_id": TgClient.ID}, {"$unset": {db_path: ""}}, upsert=True
99
)
100
101
async def update_nzb_config(self):
102
if self._return:
103
return
104
async with aiopen("sabnzbd/SABnzbd.ini", "rb+") as pf:
105
nzb_conf = await pf.read()
106
await self.db.settings.nzb.replace_one(
107
{"_id": TgClient.ID}, {"SABnzbd__ini": nzb_conf}, upsert=True
108
)
109
110
async def update_user_data(self, user_id):
111
if self._return:
112
return
113
data = user_data.get(user_id, {})
114
data = data.copy()
115
for key in ("THUMBNAIL", "RCLONE_CONFIG", "TOKEN_PICKLE"):
116
data.pop(key, None)
117
pipeline = [
118
{
119
"$replaceRoot": {
120
"newRoot": {
121
"$mergeObjects": [
122
data,
123
{
124
"$arrayToObject": {
125
"$filter": {
126
"input": {"$objectToArray": "$$ROOT"},
127
"as": "field",
128
"cond": {
129
"$in": [
130
"$$field.k",
131
[
132
"THUMBNAIL",
133
"RCLONE_CONFIG",
134
"TOKEN_PICKLE",
135
],
136
]
137
},
138
}
139
}
140
},
141
]
142
}
143
}
144
}
145
]
146
await self.db.users.update_one({"_id": user_id}, pipeline, upsert=True)
147
148
async def update_user_doc(self, user_id, key, path=""):
149
if self._return:
150
return
151
if path:
152
async with aiopen(path, "rb+") as doc:
153
doc_bin = await doc.read()
154
await self.db.users.update_one(
155
{"_id": user_id}, {"$set": {key: doc_bin}}, upsert=True
156
)
157
else:
158
await self.db.users.update_one(
159
{"_id": user_id}, {"$unset": {key: ""}}, upsert=True
160
)
161
162
async def rss_update_all(self):
163
if self._return:
164
return
165
for user_id in list(rss_dict.keys()):
166
await self.db.rss[TgClient.ID].replace_one(
167
{"_id": user_id}, rss_dict[user_id], upsert=True
168
)
169
170
async def rss_update(self, user_id):
171
if self._return:
172
return
173
await self.db.rss[TgClient.ID].replace_one(
174
{"_id": user_id}, rss_dict[user_id], upsert=True
175
)
176
177
async def rss_delete(self, user_id):
178
if self._return:
179
return
180
await self.db.rss[TgClient.ID].delete_one({"_id": user_id})
181
182
async def add_incomplete_task(self, cid, link, tag):
183
if self._return:
184
return
185
await self.db.tasks[TgClient.ID].insert_one(
186
{"_id": link, "cid": cid, "tag": tag}
187
)
188
189
async def rm_complete_task(self, link):
190
if self._return:
191
return
192
await self.db.tasks[TgClient.ID].delete_one({"_id": link})
193
194
async def get_incomplete_tasks(self):
195
notifier_dict = {}
196
if self._return:
197
return notifier_dict
198
if await self.db.tasks[TgClient.ID].find_one():
199
rows = self.db.tasks[TgClient.ID].find({})
200
async for row in rows:
201
if row["cid"] in list(notifier_dict.keys()):
202
if row["tag"] in list(notifier_dict[row["cid"]]):
203
notifier_dict[row["cid"]][row["tag"]].append(row["_id"])
204
else:
205
notifier_dict[row["cid"]][row["tag"]] = [row["_id"]]
206
else:
207
notifier_dict[row["cid"]] = {row["tag"]: [row["_id"]]}
208
await self.db.tasks[TgClient.ID].drop()
209
return notifier_dict
210
211
async def trunc_table(self, name):
212
if self._return:
213
return
214
await self.db[name][TgClient.ID].drop()
215
216
217
database = DbManager()
218
219