Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/modules/clone.py
1619 views
1
from asyncio import gather
2
from json import loads
3
from secrets import token_urlsafe
4
from aiofiles.os import remove
5
6
from .. import LOGGER, task_dict, task_dict_lock, bot_loop
7
from ..helper.ext_utils.bot_utils import (
8
sync_to_async,
9
cmd_exec,
10
arg_parser,
11
COMMAND_USAGE,
12
)
13
from ..helper.ext_utils.exceptions import DirectDownloadLinkException
14
from ..helper.ext_utils.links_utils import (
15
is_gdrive_link,
16
is_share_link,
17
is_rclone_path,
18
is_gdrive_id,
19
)
20
from ..helper.ext_utils.task_manager import stop_duplicate_check
21
from ..helper.listeners.task_listener import TaskListener
22
from ..helper.mirror_leech_utils.download_utils.direct_link_generator import (
23
direct_link_generator,
24
)
25
from ..helper.mirror_leech_utils.gdrive_utils.clone import GoogleDriveClone
26
from ..helper.mirror_leech_utils.gdrive_utils.count import GoogleDriveCount
27
from ..helper.mirror_leech_utils.rclone_utils.transfer import RcloneTransferHelper
28
from ..helper.mirror_leech_utils.status_utils.gdrive_status import GoogleDriveStatus
29
from ..helper.mirror_leech_utils.status_utils.rclone_status import RcloneStatus
30
from ..helper.telegram_helper.message_utils import (
31
send_message,
32
delete_message,
33
send_status_message,
34
)
35
36
37
class Clone(TaskListener):
38
def __init__(
39
self,
40
client,
41
message,
42
_=None,
43
__=None,
44
___=None,
45
____=None,
46
_____=None,
47
bulk=None,
48
multi_tag=None,
49
options="",
50
):
51
if bulk is None:
52
bulk = []
53
self.message = message
54
self.client = client
55
self.multi_tag = multi_tag
56
self.options = options
57
self.same_dir = {}
58
self.bulk = bulk
59
super().__init__()
60
self.is_clone = True
61
62
async def new_event(self):
63
text = self.message.text.split("\n")
64
input_list = text[0].split(" ")
65
66
args = {
67
"link": "",
68
"-i": 0,
69
"-b": False,
70
"-n": "",
71
"-up": "",
72
"-rcf": "",
73
"-sync": False,
74
}
75
76
arg_parser(input_list[1:], args)
77
78
try:
79
self.multi = int(args["-i"])
80
except:
81
self.multi = 0
82
83
self.up_dest = args["-up"]
84
self.rc_flags = args["-rcf"]
85
self.link = args["link"]
86
self.name = args["-n"]
87
88
is_bulk = args["-b"]
89
sync = args["-sync"]
90
bulk_start = 0
91
bulk_end = 0
92
93
if not isinstance(is_bulk, bool):
94
dargs = is_bulk.split(":")
95
bulk_start = dargs[0] or 0
96
if len(dargs) == 2:
97
bulk_end = dargs[1] or 0
98
is_bulk = True
99
100
if is_bulk:
101
await self.init_bulk(input_list, bulk_start, bulk_end, Clone)
102
return
103
104
await self.get_tag(text)
105
106
if not self.link and (reply_to := self.message.reply_to_message):
107
self.link = reply_to.text.split("\n", 1)[0].strip()
108
109
await self.run_multi(input_list, Clone)
110
111
if len(self.link) == 0:
112
await send_message(
113
self.message, COMMAND_USAGE["clone"][0], COMMAND_USAGE["clone"][1]
114
)
115
return
116
LOGGER.info(self.link)
117
try:
118
await self.before_start()
119
except Exception as e:
120
await send_message(self.message, e)
121
return
122
await self._proceed_to_clone(sync)
123
124
async def _proceed_to_clone(self, sync):
125
if is_share_link(self.link):
126
try:
127
self.link = await sync_to_async(direct_link_generator, self.link)
128
LOGGER.info(f"Generated link: {self.link}")
129
except DirectDownloadLinkException as e:
130
LOGGER.error(str(e))
131
if str(e).startswith("ERROR:"):
132
await send_message(self.message, str(e))
133
return
134
if is_gdrive_link(self.link) or is_gdrive_id(self.link):
135
self.name, mime_type, self.size, files, _ = await sync_to_async(
136
GoogleDriveCount().count, self.link, self.user_id
137
)
138
if mime_type is None:
139
await send_message(self.message, self.name)
140
return
141
msg, button = await stop_duplicate_check(self)
142
if msg:
143
await send_message(self.message, msg, button)
144
return
145
await self.on_download_start()
146
LOGGER.info(f"Clone Started: Name: {self.name} - Source: {self.link}")
147
drive = GoogleDriveClone(self)
148
if files <= 10:
149
msg = await send_message(
150
self.message, f"Cloning: <code>{self.link}</code>"
151
)
152
else:
153
msg = ""
154
gid = token_urlsafe(12)
155
async with task_dict_lock:
156
task_dict[self.mid] = GoogleDriveStatus(self, drive, gid, "cl")
157
if self.multi <= 1:
158
await send_status_message(self.message)
159
flink, mime_type, files, folders, dir_id = await sync_to_async(drive.clone)
160
if msg:
161
await delete_message(msg)
162
if not flink:
163
return
164
await self.on_upload_complete(
165
flink, files, folders, mime_type, dir_id=dir_id
166
)
167
LOGGER.info(f"Cloning Done: {self.name}")
168
elif is_rclone_path(self.link):
169
if self.link.startswith("mrcc:"):
170
self.link = self.link.replace("mrcc:", "", 1)
171
self.up_dest = self.up_dest.replace("mrcc:", "", 1)
172
config_path = f"rclone/{self.user_id}.conf"
173
else:
174
config_path = "rclone.conf"
175
176
remote, src_path = self.link.split(":", 1)
177
self.link = src_path.strip("/")
178
if self.link.startswith("rclone_select"):
179
mime_type = "Folder"
180
src_path = ""
181
if not self.name:
182
self.name = self.link
183
else:
184
src_path = self.link
185
cmd = [
186
"rclone",
187
"lsjson",
188
"--fast-list",
189
"--stat",
190
"--no-modtime",
191
"--config",
192
config_path,
193
f"{remote}:{src_path}",
194
]
195
res = await cmd_exec(cmd)
196
if res[2] != 0:
197
if res[2] != -9:
198
msg = f"Error: While getting rclone stat. Path: {remote}:{src_path}. Stderr: {res[1][:4000]}"
199
await send_message(self.message, msg)
200
return
201
rstat = loads(res[0])
202
if rstat["IsDir"]:
203
if not self.name:
204
self.name = src_path.rsplit("/", 1)[-1] if src_path else remote
205
self.up_dest += (
206
self.name if self.up_dest.endswith(":") else f"/{self.name}"
207
)
208
mime_type = "Folder"
209
else:
210
if not self.name:
211
self.name = src_path.rsplit("/", 1)[-1]
212
mime_type = rstat["MimeType"]
213
214
await self.on_download_start()
215
216
RCTransfer = RcloneTransferHelper(self)
217
LOGGER.info(
218
f"Clone Started: Name: {self.name} - Source: {self.link} - Destination: {self.up_dest}"
219
)
220
gid = token_urlsafe(12)
221
async with task_dict_lock:
222
task_dict[self.mid] = RcloneStatus(self, RCTransfer, gid, "cl")
223
if self.multi <= 1:
224
await send_status_message(self.message)
225
method = "sync" if sync else "copy"
226
flink, destination = await RCTransfer.clone(
227
config_path,
228
remote,
229
src_path,
230
mime_type,
231
method,
232
)
233
if self.link.startswith("rclone_select"):
234
await remove(self.link)
235
if not destination:
236
return
237
LOGGER.info(f"Cloning Done: {self.name}")
238
cmd1 = [
239
"rclone",
240
"lsf",
241
"--fast-list",
242
"-R",
243
"--files-only",
244
"--config",
245
config_path,
246
destination,
247
]
248
cmd2 = [
249
"rclone",
250
"lsf",
251
"--fast-list",
252
"-R",
253
"--dirs-only",
254
"--config",
255
config_path,
256
destination,
257
]
258
cmd3 = [
259
"rclone",
260
"size",
261
"--fast-list",
262
"--json",
263
"--config",
264
config_path,
265
destination,
266
]
267
res1, res2, res3 = await gather(
268
cmd_exec(cmd1),
269
cmd_exec(cmd2),
270
cmd_exec(cmd3),
271
)
272
if res1[2] != 0 or res2[2] != 0 or res3[2] != 0:
273
if res1[2] == -9:
274
return
275
files = None
276
folders = None
277
self.size = 0
278
error = res1[1] or res2[1] or res3[1]
279
msg = f"Error: While getting rclone stat. Path: {destination}. Stderr: {error[:4000]}"
280
await self.on_upload_error(msg)
281
else:
282
files = len(res1[0].split("\n"))
283
folders = len(res2[0].strip().split("\n")) if res2[0] else 0
284
rsize = loads(res3[0])
285
self.size = rsize["bytes"]
286
await self.on_upload_complete(
287
flink, files, folders, mime_type, destination
288
)
289
else:
290
await send_message(
291
self.message, COMMAND_USAGE["clone"][0], COMMAND_USAGE["clone"][1]
292
)
293
294
295
async def clone_node(client, message):
296
bot_loop.create_task(Clone(client, message).new_event())
297
298