Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/helper/ext_utils/files_utils.py
1630 views
1
from aioshutil import rmtree as aiormtree, move
2
from asyncio import create_subprocess_exec, wait_for
3
from asyncio.subprocess import PIPE
4
from magic import Magic
5
from os import walk, path as ospath, readlink
6
from re import split as re_split, I, search as re_search, escape
7
from aiofiles.os import (
8
remove,
9
path as aiopath,
10
listdir,
11
rmdir,
12
readlink as aioreadlink,
13
symlink,
14
makedirs as aiomakedirs,
15
)
16
17
from ... import LOGGER, DOWNLOAD_DIR
18
from ...core.torrent_manager import TorrentManager
19
from .bot_utils import sync_to_async, cmd_exec
20
from .exceptions import NotSupportedExtractionArchive
21
22
# Suffixes recognised as extractable archives. Order matters for
# get_base_name(): compound suffixes such as ".tar.bz2" appear before
# their shorter tails (".bz2") so the longest match is stripped first.
ARCH_EXT = [
    ".tar.bz2", ".tar.gz", ".bz2", ".gz", ".tar.xz", ".tar",
    ".tbz2", ".tgz", ".lzma2", ".zip", ".7z", ".z", ".rar",
    ".iso", ".wim", ".cab", ".apm", ".arj", ".chm", ".cpio",
    ".cramfs", ".deb", ".dmg", ".fat", ".hfs", ".lzh", ".lzma",
    ".mbr", ".msi", ".mslz", ".nsis", ".ntfs", ".rpm", ".squashfs",
    ".udf", ".vhd", ".xar", ".zst", ".zstd", ".cbz", ".apfs",
    ".ar", ".qcow", ".macho", ".exe", ".dll", ".sys", ".pmd",
    ".swf", ".swfc", ".simg", ".vdi", ".vhdx", ".vmdk", ".gzip",
    ".lzma86", ".sha256", ".sha512", ".sha224", ".sha384", ".sha1",
    ".md5", ".crc32", ".crc64",
]


# First volume of a multi-part archive: ".part1.rar" / ".7z.001" /
# ".zip.001" (any number of leading zeros), or a plain ".rar" that is
# not itself a ".partN.rar" volume.
FIRST_SPLIT_REGEX = (
    r"\.part0*1\.rar$|\.7z\.0*1$|\.zip\.0*1$|^(?!.*\.part\d+\.rar$).*\.rar$"
)

# Any volume of a multi-part archive (.rNN, .7z.NNN, .zNN, .zip.NNN, .partN.rar).
SPLIT_REGEX = r"\.r\d+$|\.7z\.\d+$|\.z\d+$|\.zip\.\d+$|\.part\d+\.rar$"


def is_first_archive_split(file):
    """Return True when *file* looks like the first volume of a split archive."""
    return re_search(FIRST_SPLIT_REGEX, file.lower(), I) is not None


def is_archive(file):
    """Return True when *file* carries one of the known archive suffixes."""
    name = file.strip().lower()
    return name.endswith(tuple(ARCH_EXT))


def is_archive_split(file):
    """Return True when *file* looks like any volume of a split archive."""
    return re_search(SPLIT_REGEX, file.lower(), I) is not None
107
108
109
async def clean_target(opath):
    """Delete a single path — a directory tree or a file — if it exists.

    Failures are logged, never raised.
    """
    if not await aiopath.exists(opath):
        return
    LOGGER.info(f"Cleaning Target: {opath}")
    try:
        if await aiopath.isdir(opath):
            await aiormtree(opath, ignore_errors=True)
        else:
            await remove(opath)
    except Exception as e:
        LOGGER.error(str(e))
119
120
121
async def clean_download(opath):
    """Remove a download directory tree if present; errors are only logged."""
    if not await aiopath.exists(opath):
        return
    LOGGER.info(f"Cleaning Download: {opath}")
    try:
        await aiormtree(opath, ignore_errors=True)
    except Exception as e:
        LOGGER.error(str(e))
128
129
130
async def clean_all():
    """Drop every managed torrent, then wipe and recreate DOWNLOAD_DIR."""
    await TorrentManager.remove_all()
    LOGGER.info("Cleaning Download Directory")
    proc = await create_subprocess_exec("rm", "-rf", DOWNLOAD_DIR)
    await proc.wait()
    await aiomakedirs(DOWNLOAD_DIR, exist_ok=True)
135
136
137
async def clean_unwanted(opath):
    """Purge leftovers under *opath*: hidden '*.parts' files, '.unwanted'
    directories, and finally any directories that ended up empty."""
    LOGGER.info(f"Cleaning unwanted files/folders: {opath}")
    # bottom-up so children are handled before their parents
    for dirpath, _, files in await sync_to_async(walk, opath, topdown=False):
        for name in files:
            if name.strip().endswith(".parts") and name.startswith("."):
                await remove(ospath.join(dirpath, name))
        if dirpath.strip().endswith(".unwanted"):
            await aiormtree(dirpath, ignore_errors=True)
    # second bottom-up pass: drop directories the first pass left empty
    for dirpath, _, _ in await sync_to_async(walk, opath, topdown=False):
        if not await listdir(dirpath):
            await rmdir(dirpath)
149
150
151
async def get_path_size(opath):
    """Return the total size in bytes of a file or directory tree.

    Symlinks are resolved and the size of their targets is counted.
    """
    if await aiopath.isfile(opath):
        if await aiopath.islink(opath):
            opath = await aioreadlink(opath)
        return await aiopath.getsize(opath)
    total = 0
    for root, _, files in await sync_to_async(walk, opath):
        for name in files:
            target = ospath.join(root, name)
            if await aiopath.islink(target):
                target = await aioreadlink(target)
            total += await aiopath.getsize(target)
    return total
164
165
166
async def count_files_and_folders(opath):
    """Return (folder_count, file_count) for the tree rooted at *opath*."""
    folders = 0
    file_count = 0
    for _, dirs, files in await sync_to_async(walk, opath):
        folders += len(dirs)
        file_count += len(files)
    return folders, file_count
173
174
175
def get_base_name(orig_path):
    """Strip a known archive extension (ARCH_EXT) from *orig_path*.

    Returns the path without its archive suffix.

    Raises:
        NotSupportedExtractionArchive: when no suffix in ARCH_EXT matches.
    """
    lowered = orig_path.strip().lower()
    extension = next((ext for ext in ARCH_EXT if lowered.endswith(ext)), "")
    if not extension:
        raise NotSupportedExtractionArchive("File format not supported for extraction")
    # escape the extension before building the pattern: it contains ".",
    # which is a regex metacharacter and must match literally.
    return re_split(f"{escape(extension)}$", orig_path, maxsplit=1, flags=I)[0]
183
184
185
async def create_recursive_symlink(source, destination):
    """Mirror *source* under *destination*: directories are recreated,
    regular files are symlinked back to the originals.

    Symlink errors are logged and skipped, never raised.
    """
    if ospath.isdir(source):
        await aiomakedirs(destination, exist_ok=True)
        for entry in await listdir(source):
            await create_recursive_symlink(
                ospath.join(source, entry), ospath.join(destination, entry)
            )
    elif ospath.isfile(source):
        try:
            await symlink(source, destination)
        except FileExistsError:
            LOGGER.error(f"Shortcut already exists: {destination}")
        except Exception as e:
            LOGGER.error(f"Error creating shortcut for {source}: {e}")
199
200
201
def get_mime_type(file_path):
    """Detect a file's MIME type via libmagic, resolving a symlink first.

    Falls back to "text/plain" when detection yields nothing truthy.
    """
    if ospath.islink(file_path):
        file_path = readlink(file_path)
    return Magic(mime=True).from_file(file_path) or "text/plain"
208
209
210
async def remove_excluded_files(fpath, ee):
    """Delete every file under *fpath* whose name ends with a suffix in *ee*.

    The "yt-dlp-thumb" directory is left untouched.
    """
    excluded = tuple(ee)
    for root, _, files in await sync_to_async(walk, fpath):
        if root.strip().endswith("/yt-dlp-thumb"):
            continue
        for name in files:
            if name.strip().lower().endswith(excluded):
                await remove(ospath.join(root, name))
217
218
219
async def remove_non_included_files(fpath, ie):
    """Delete every file under *fpath* whose name does NOT end with a
    suffix in *ie*.

    The "yt-dlp-thumb" directory is left untouched.
    """
    included = tuple(ie)
    for root, _, files in await sync_to_async(walk, fpath):
        if root.strip().endswith("/yt-dlp-thumb"):
            continue
        for name in files:
            if not name.strip().lower().endswith(included):
                await remove(ospath.join(root, name))
227
228
229
async def move_and_merge(source, destination, mid):
    """Move the contents of *source* into *destination*, merging directories.

    File-name clashes are resolved by prefixing the file with *mid*;
    aria2 / qBittorrent control files (".aria2", ".!qB") are skipped.
    """
    if not await aiopath.exists(destination):
        await aiomakedirs(destination, exist_ok=True)
    for entry in await listdir(source):
        entry = entry.strip()
        src_path = f"{source}/{entry}"
        dest_path = f"{destination}/{entry}"
        if await aiopath.isdir(src_path):
            if await aiopath.exists(dest_path):
                # destination directory exists: merge recursively
                await move_and_merge(src_path, dest_path, mid)
            else:
                await move(src_path, dest_path)
        else:
            if entry.endswith((".aria2", ".!qB")):
                continue
            if await aiopath.exists(dest_path):
                # avoid clobbering: disambiguate with the task id
                dest_path = f"{destination}/{mid}-{entry}"
            await move(src_path, dest_path)
247
248
249
async def join_files(opath):
    """Re-join files split by split_file() inside directory *opath*.

    A split set is detected by a ".002"-style member whose MIME type is not
    a 7z/zip volume (those formats handle their own multi-part sets and must
    not be concatenated here). On success the numbered parts are removed;
    on failure the partial joined output is removed instead.
    """
    files = await listdir(opath)
    results = []
    exists = False
    for file_ in files:
        # ".0+2$": look for the *second* part — a lone ".001" is ambiguous
        # with 7z/zip volumes, hence the extra MIME-type check.
        if re_search(r"\.0+2$", file_) and await sync_to_async(
            get_mime_type, f"{opath}/{file_}"
        ) not in ["application/x-7z-compressed", "application/zip"]:
            exists = True
            final_name = file_.rsplit(".", 1)[0]
            fpath = f"{opath}/{final_name}"
            # shell glob expands to all "<name>.*" parts; run through cmd_exec
            cmd = f'cat "{fpath}."* > "{fpath}"'
            _, stderr, code = await cmd_exec(cmd, True)
            if code != 0:
                LOGGER.error(f"Failed to join {final_name}, stderr: {stderr}")
                # drop the partial output so a broken joined file is not kept
                if await aiopath.isfile(fpath):
                    await remove(fpath)
            else:
                results.append(final_name)

    if not exists:
        LOGGER.warning("No files to join!")
    elif results:
        LOGGER.info("Join Completed!")
        # delete the now-merged numbered parts of every successful join
        for res in results:
            for file_ in files:
                if re_search(rf"{escape(res)}\.0[0-9]+$", file_):
                    await remove(f"{opath}/{file_}")
277
278
279
async def split_file(f_path, split_size, listener):
    """Split *f_path* into numbered parts ("<name>.001", ...) of
    *split_size* bytes using GNU coreutils `split`.

    Returns:
        False when the task was cancelled (including the subprocess being
        SIGKILLed, exit code -9); True otherwise — a non-zero `split` exit
        code is logged but still reported as True, matching callers that
        treat only cancellation as failure.
    """
    out_path = f"{f_path}."
    if listener.is_cancelled:
        return False
    listener.subproc = await create_subprocess_exec(
        "split",
        "--numeric-suffixes=1",
        "--suffix-length=3",
        f"--bytes={split_size}",
        f_path,
        out_path,
        stderr=PIPE,
    )
    _, stderr = await listener.subproc.communicate()
    code = listener.subproc.returncode
    if listener.is_cancelled:
        return False
    if code == -9:
        # killed by SIGKILL — treat as an external cancellation
        listener.is_cancelled = True
        return False
    if code != 0:
        # `except Exception` instead of a bare `except`: a bare clause would
        # also swallow CancelledError/KeyboardInterrupt in async code.
        try:
            stderr = stderr.decode().strip()
        except Exception:
            stderr = "Unable to decode the error!"
        LOGGER.error(f"{stderr}. Split Document: {f_path}")
    return True
306
307
308
class SevenZ:
    """Wrapper around the 7z CLI providing extract and zip with live progress.

    Progress is scraped from 7z's stdout (enabled by "-bsp1") while the
    subprocess runs and exposed via the `processed_bytes` and `progress`
    properties. The listener object supplies cancellation state and stores
    the running subprocess and the archive size (`subsize`).
    """

    def __init__(self, listener):
        # listener: task listener — provides is_cancelled and holds
        # subproc/subsize state shared with the progress reader.
        self._listener = listener
        self._processed_bytes = 0
        self._percentage = "0%"

    @property
    def processed_bytes(self):
        # Estimated bytes handled so far, derived from percentage * subsize.
        return self._processed_bytes

    @property
    def progress(self):
        # Last percentage string reported by 7z, e.g. "42%".
        return self._percentage

    async def _sevenz_progress(self):
        """Follow the subprocess stdout, updating percentage/processed bytes.

        Phase 1 reads line-wise to capture the total size and early "NN%"
        lines; once line reads time out (NOTE(review): 7z appears to emit
        percentage updates without newlines — confirm), phase 2 falls back
        to byte-wise reads and parses up to each "%" byte.
        """
        # group 1: "<n> bytes" (extract listing); groups 2/3: physical size
        # lines (zip output) — whichever matched becomes the total size.
        pattern = (
            r"(\d+)\s+bytes|Total Physical Size\s*=\s*(\d+)|Physical Size\s*=\s*(\d+)"
        )
        while not (
            self._listener.subproc.returncode is not None
            or self._listener.is_cancelled
            or self._listener.subproc.stdout.at_eof()
        ):
            try:
                # 2s timeout: bail out to the byte-wise loop when no full
                # line arrives (timeout or read error both break).
                line = await wait_for(self._listener.subproc.stdout.readline(), 2)
            except:
                break
            line = line.decode().strip()
            if "%" in line:
                perc = line.split("%", 1)[0]
                if perc.isdigit():
                    self._percentage = f"{perc}%"
                    self._processed_bytes = (int(perc) / 100) * self._listener.subsize
                else:
                    self._percentage = "0%"
                continue
            if match := re_search(pattern, line):
                self._listener.subsize = int(match[1] or match[2] or match[3])
        s = b""
        while not (
            self._listener.is_cancelled
            or self._listener.subproc.returncode is not None
            or self._listener.subproc.stdout.at_eof()
        ):
            try:
                char = await wait_for(self._listener.subproc.stdout.read(1), 60)
            except:
                break
            if not char:
                break
            s += char
            if char == b"%":
                # buffer now ends with "... NN%": take the last space-separated
                # token as the percentage string.
                try:
                    self._percentage = s.decode().rsplit(" ", 1)[-1].strip()
                    self._processed_bytes = (
                        int(self._percentage.strip("%")) / 100
                    ) * self._listener.subsize
                except:
                    self._processed_bytes = 0
                    self._percentage = "0%"
                s = b""

        # subprocess finished or cancelled: reset the published progress
        self._processed_bytes = 0
        self._percentage = "0%"

    async def extract(self, f_path, t_path, pswd):
        """Extract archive *f_path* into directory *t_path* with 7z.

        Returns False on cancellation (or SIGKILL, exit -9), otherwise the
        7z exit code (0 means success); non-zero codes are also logged.
        """
        cmd = [
            "7z",
            "x",
            f"-p{pswd}",
            f_path,
            f"-o{t_path}",
            "-aot",
            "-xr!@PaxHeader",
            "-bsp1",  # progress on stdout — consumed by _sevenz_progress()
            "-bse1",
            "-bb3",
        ]
        if not pswd:
            # no password: drop the "-p" switch (index 2)
            del cmd[2]
        if self._listener.is_cancelled:
            return False
        self._listener.subproc = await create_subprocess_exec(
            *cmd,
            stdout=PIPE,
            stderr=PIPE,
        )
        await self._sevenz_progress()
        _, stderr = await self._listener.subproc.communicate()
        code = self._listener.subproc.returncode
        if self._listener.is_cancelled:
            return False
        if code == -9:
            # killed by SIGKILL — treat as cancellation
            self._listener.is_cancelled = True
            return False
        elif code != 0:
            try:
                stderr = stderr.decode().strip()
            except:
                stderr = "Unable to decode the error!"
            LOGGER.error(f"{stderr}. Unable to extract archive!. Path: {f_path}")
        return code

    async def zip(self, dl_path, up_path, pswd):
        """Archive *dl_path* into *up_path* with 7z in store mode (-mx=0).

        For leech tasks whose content exceeds split_size the archive is
        created as split volumes ("-v<size>b"). Returns the archive path on
        success (after deleting the source), the original path on failure,
        or False on cancellation.
        """
        size = await get_path_size(dl_path)
        if self._listener.equal_splits:
            # ceil-divide into parts, then pad so all volumes are near-equal
            parts = -(-size // self._listener.split_size)
            split_size = (size // parts) + (size % parts)
        else:
            split_size = self._listener.split_size
        cmd = [
            "7z",
            f"-v{split_size}b",
            "a",
            "-mx=0",  # store only — no compression
            f"-p{pswd}",
            up_path,
            dl_path,
            "-bsp1",  # progress on stdout — consumed by _sevenz_progress()
            "-bse1",
            "-bb3",
        ]
        if self._listener.is_leech and int(size) > self._listener.split_size:
            if not pswd:
                # drop "-p" (index 4); "-v" stays for split volumes
                del cmd[4]
            LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}.0*")
        else:
            # single archive: drop the "-v" switch (index 1) first,
            # shifting "-p" to index 3
            del cmd[1]
            if not pswd:
                del cmd[3]
            LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}")
        if self._listener.is_cancelled:
            return False
        self._listener.subproc = await create_subprocess_exec(
            *cmd, stdout=PIPE, stderr=PIPE
        )
        await self._sevenz_progress()
        _, stderr = await self._listener.subproc.communicate()
        code = self._listener.subproc.returncode
        if self._listener.is_cancelled:
            return False
        if code == -9:
            # killed by SIGKILL — treat as cancellation
            self._listener.is_cancelled = True
            return False
        elif code == 0:
            # archive created: remove the original source path
            await clean_target(dl_path)
            return up_path
        else:
            # failed: drop any partial archive and fall back to the source
            if await aiopath.exists(up_path):
                await remove(up_path)
            try:
                stderr = stderr.decode().strip()
            except:
                stderr = "Unable to decode the error!"
            LOGGER.error(f"{stderr}. Unable to zip this path: {dl_path}")
        return dl_path
464
465