Path: blob/master/bot/helper/ext_utils/files_utils.py
1630 views
"""Filesystem helpers: cleanup, size/count, archive detection, join/split and 7z wrappers."""

from aioshutil import rmtree as aiormtree, move
from asyncio import create_subprocess_exec, wait_for
from asyncio.subprocess import PIPE
from magic import Magic
from os import walk, path as ospath, readlink
from re import split as re_split, I, search as re_search, escape
from shlex import quote
from aiofiles.os import (
    remove,
    path as aiopath,
    listdir,
    rmdir,
    readlink as aioreadlink,
    symlink,
    makedirs as aiomakedirs,
)

from ... import LOGGER, DOWNLOAD_DIR
from ...core.torrent_manager import TorrentManager
from .bot_utils import sync_to_async, cmd_exec
from .exceptions import NotSupportedExtractionArchive

# Extensions treated as archives. Order matters for get_base_name(): the first
# matching entry wins, so compound suffixes (".tar.gz") precede short ones (".gz").
ARCH_EXT = [
    ".tar.bz2",
    ".tar.gz",
    ".bz2",
    ".gz",
    ".tar.xz",
    ".tar",
    ".tbz2",
    ".tgz",
    ".lzma2",
    ".zip",
    ".7z",
    ".z",
    ".rar",
    ".iso",
    ".wim",
    ".cab",
    ".apm",
    ".arj",
    ".chm",
    ".cpio",
    ".cramfs",
    ".deb",
    ".dmg",
    ".fat",
    ".hfs",
    ".lzh",
    ".lzma",
    ".mbr",
    ".msi",
    ".mslz",
    ".nsis",
    ".ntfs",
    ".rpm",
    ".squashfs",
    ".udf",
    ".vhd",
    ".xar",
    ".zst",
    ".zstd",
    ".cbz",
    ".apfs",
    ".ar",
    ".qcow",
    ".macho",
    ".exe",
    ".dll",
    ".sys",
    ".pmd",
    ".swf",
    ".swfc",
    ".simg",
    ".vdi",
    ".vhdx",
    ".vmdk",
    ".gzip",
    ".lzma86",
    ".sha256",
    ".sha512",
    ".sha224",
    ".sha384",
    ".sha1",
    ".md5",
    ".crc32",
    ".crc64",
]


# Matches the first volume of a multi-part archive, or a plain ".rar" that is
# not a ".partN.rar" volume.
FIRST_SPLIT_REGEX = (
    r"\.part0*1\.rar$|\.7z\.0*1$|\.zip\.0*1$|^(?!.*\.part\d+\.rar$).*\.rar$"
)

# Matches any volume of a multi-part archive.
SPLIT_REGEX = r"\.r\d+$|\.7z\.\d+$|\.z\d+$|\.zip\.\d+$|\.part\d+\.rar$"


def is_first_archive_split(file):
    """Return True if the name matches FIRST_SPLIT_REGEX (case-insensitive)."""
    return bool(re_search(FIRST_SPLIT_REGEX, file.lower(), I))


def is_archive(file):
    """Return True if the stripped, lower-cased name ends with an ARCH_EXT entry."""
    return file.strip().lower().endswith(tuple(ARCH_EXT))


def is_archive_split(file):
    """Return True if the name matches SPLIT_REGEX (case-insensitive)."""
    return bool(re_search(SPLIT_REGEX, file.lower(), I))


async def clean_target(opath):
    """Delete the file or directory tree at opath if it exists.

    Errors are logged, not raised.
    """
    if await aiopath.exists(opath):
        LOGGER.info(f"Cleaning Target: {opath}")
        try:
            if await aiopath.isdir(opath):
                await aiormtree(opath, ignore_errors=True)
            else:
                await remove(opath)
        except Exception as e:
            LOGGER.error(str(e))


async def clean_download(opath):
    """Delete the directory tree at opath if it exists.

    Errors are logged, not raised.
    """
    if await aiopath.exists(opath):
        LOGGER.info(f"Cleaning Download: {opath}")
        try:
            await aiormtree(opath, ignore_errors=True)
        except Exception as e:
            LOGGER.error(str(e))


async def clean_all():
    """Remove all torrents via TorrentManager, then wipe and recreate DOWNLOAD_DIR."""
    await TorrentManager.remove_all()
    LOGGER.info("Cleaning Download Directory")
    await (await create_subprocess_exec("rm", "-rf", DOWNLOAD_DIR)).wait()
    await aiomakedirs(DOWNLOAD_DIR, exist_ok=True)


async def clean_unwanted(opath):
    """Remove leftover partial files, ".unwanted" directories and empty directories.

    First pass (bottom-up): delete files whose name starts with "." and ends
    with ".parts", and delete directories whose path ends with ".unwanted".
    Second pass (bottom-up): delete every directory that is now empty.
    """
    LOGGER.info(f"Cleaning unwanted files/folders: {opath}")
    for dirpath, _, files in await sync_to_async(walk, opath, topdown=False):
        for filee in files:
            f_path = ospath.join(dirpath, filee)
            if filee.strip().endswith(".parts") and filee.startswith("."):
                await remove(f_path)
        if dirpath.strip().endswith(".unwanted"):
            await aiormtree(dirpath, ignore_errors=True)
    for dirpath, _, files in await sync_to_async(walk, opath, topdown=False):
        if not await listdir(dirpath):
            await rmdir(dirpath)


async def get_path_size(opath):
    """Return the size in bytes of a file, or the summed size of all files under a directory.

    Symbolic links are replaced by their readlink() target before sizing.
    """
    total_size = 0
    if await aiopath.isfile(opath):
        # NOTE(review): readlink() may return a target relative to the link's
        # directory; getsize() would then resolve it against the cwd - confirm
        # callers only create absolute links.
        if await aiopath.islink(opath):
            opath = await aioreadlink(opath)
        return await aiopath.getsize(opath)
    for root, _, files in await sync_to_async(walk, opath):
        for f in files:
            abs_path = ospath.join(root, f)
            if await aiopath.islink(abs_path):
                abs_path = await aioreadlink(abs_path)
            total_size += await aiopath.getsize(abs_path)
    return total_size


async def count_files_and_folders(opath):
    """Walk opath and return a ``(total_folders, total_files)`` tuple."""
    total_files = 0
    total_folders = 0
    for _, dirs, files in await sync_to_async(walk, opath):
        total_files += len(files)
        total_folders += len(dirs)
    return total_folders, total_files


def get_base_name(orig_path):
    """Return orig_path with its archive extension removed.

    The extension is the first ARCH_EXT entry the stripped, lower-cased path
    ends with.

    Raises:
        NotSupportedExtractionArchive: if no ARCH_EXT entry matches.
    """
    extension = next(
        (ext for ext in ARCH_EXT if orig_path.strip().lower().endswith(ext)), ""
    )
    if extension != "":
        # Escape the extension so "." is a literal dot, not a regex wildcard.
        return re_split(f"{escape(extension)}$", orig_path, maxsplit=1, flags=I)[0]
    else:
        raise NotSupportedExtractionArchive("File format not supported for extraction")


async def create_recursive_symlink(source, destination):
    """Mirror source at destination: real directories, symlinks for every file.

    Symlink failures are logged, not raised.
    """
    # Use the async path helpers so the event loop is not blocked on stat calls.
    if await aiopath.isdir(source):
        await aiomakedirs(destination, exist_ok=True)
        for item in await listdir(source):
            item_source = ospath.join(source, item)
            item_dest = ospath.join(destination, item)
            await create_recursive_symlink(item_source, item_dest)
    elif await aiopath.isfile(source):
        try:
            await symlink(source, destination)
        except FileExistsError:
            LOGGER.error(f"Shortcut already exists: {destination}")
        except Exception as e:
            LOGGER.error(f"Error creating shortcut for {source}: {e}")


def get_mime_type(file_path):
    """Return the MIME type libmagic reports for file_path.

    A symbolic link is replaced by its readlink() target first. Falls back to
    "text/plain" when libmagic returns an empty result. This function is
    synchronous; coroutines in this module call it through sync_to_async.
    """
    if ospath.islink(file_path):
        file_path = readlink(file_path)
    mime = Magic(mime=True)
    mime_type = mime.from_file(file_path)
    mime_type = mime_type or "text/plain"
    return mime_type


async def remove_excluded_files(fpath, ee):
    """Delete every file under fpath whose lower-cased name ends with an entry of ee.

    Directories whose path ends with "/yt-dlp-thumb" are skipped.
    """
    for root, _, files in await sync_to_async(walk, fpath):
        if root.strip().endswith("/yt-dlp-thumb"):
            continue
        for f in files:
            if f.strip().lower().endswith(tuple(ee)):
                await remove(ospath.join(root, f))


async def remove_non_included_files(fpath, ie):
    """Delete every file under fpath whose lower-cased name does NOT end with an entry of ie.

    Directories whose path ends with "/yt-dlp-thumb" are skipped.
    """
    for root, _, files in await sync_to_async(walk, fpath):
        if root.strip().endswith("/yt-dlp-thumb"):
            continue
        for f in files:
            if f.strip().lower().endswith(tuple(ie)):
                continue
            await remove(ospath.join(root, f))


async def move_and_merge(source, destination, mid):
    """Move the contents of source into destination, merging existing directories.

    Files ending in ".aria2" or ".!qB" are skipped. A file whose name already
    exists in destination is moved as "<mid>-<name>" instead.
    """
    if not await aiopath.exists(destination):
        await aiomakedirs(destination, exist_ok=True)
    for item in await listdir(source):
        item = item.strip()
        src_path = f"{source}/{item}"
        dest_path = f"{destination}/{item}"
        if await aiopath.isdir(src_path):
            if await aiopath.exists(dest_path):
                await move_and_merge(src_path, dest_path, mid)
            else:
                await move(src_path, dest_path)
        else:
            if item.endswith((".aria2", ".!qB")):
                continue
            if await aiopath.exists(dest_path):
                dest_path = f"{destination}/{mid}-{item}"
            await move(src_path, dest_path)


async def join_files(opath):
    """Concatenate numbered split parts found in opath back into single files.

    A set is detected by a part whose name ends in ".0…2" and whose MIME type
    is neither 7z nor zip. All "<name>.*" files are concatenated into "<name>"
    with ``cat``. On success the "<name>.0NN" parts are removed; on failure
    the partial output is removed and the error is logged.
    """
    files = await listdir(opath)
    results = []
    exists = False
    for file_ in files:
        if re_search(r"\.0+2$", file_) and await sync_to_async(
            get_mime_type, f"{opath}/{file_}"
        ) not in ["application/x-7z-compressed", "application/zip"]:
            exists = True
            final_name = file_.rsplit(".", 1)[0]
            fpath = f"{opath}/{final_name}"
            # File names come from downloads, so quote them for the shell; the
            # trailing unquoted "*" is still expanded as a glob.
            parts_prefix = quote(f"{fpath}.")
            cmd = f"cat {parts_prefix}* > {quote(fpath)}"
            _, stderr, code = await cmd_exec(cmd, True)
            if code != 0:
                LOGGER.error(f"Failed to join {final_name}, stderr: {stderr}")
                if await aiopath.isfile(fpath):
                    await remove(fpath)
            else:
                results.append(final_name)

    if not exists:
        LOGGER.warning("No files to join!")
    elif results:
        LOGGER.info("Join Completed!")
        for res in results:
            for file_ in files:
                if re_search(rf"{escape(res)}\.0[0-9]+$", file_):
                    await remove(f"{opath}/{file_}")


async def split_file(f_path, split_size, listener):
    """Split f_path into "<f_path>.NNN" parts of split_size bytes using ``split``.

    The subprocess is stored on ``listener.subproc``.

    Returns:
        False if the listener is cancelled or the process exits with -9
        (``listener.is_cancelled`` is then set); True otherwise. Any other
        non-zero exit is logged and still returns True.
    """
    out_path = f"{f_path}."
    if listener.is_cancelled:
        return False
    listener.subproc = await create_subprocess_exec(
        "split",
        "--numeric-suffixes=1",
        "--suffix-length=3",
        f"--bytes={split_size}",
        f_path,
        out_path,
        stderr=PIPE,
    )
    _, stderr = await listener.subproc.communicate()
    code = listener.subproc.returncode
    if listener.is_cancelled:
        return False
    if code == -9:
        listener.is_cancelled = True
        return False
    elif code != 0:
        try:
            stderr = stderr.decode().strip()
        except Exception:
            stderr = "Unable to decode the error!"
        LOGGER.error(f"{stderr}. Split Document: {f_path}")
    return True


class SevenZ:
    """Run ``7z`` extract/compress subprocesses for a listener and track progress."""

    def __init__(self, listener):
        # The listener supplies is_cancelled, subproc, subsize, split_size,
        # equal_splits and is_leech, which the methods below read or write.
        self._listener = listener
        self._processed_bytes = 0
        self._percentage = "0%"

    @property
    def processed_bytes(self):
        """Bytes processed so far, derived from the last percentage and subsize."""
        return self._processed_bytes

    @property
    def progress(self):
        """Last percentage parsed from 7z output, as a string such as "42%"."""
        return self._percentage

    async def _sevenz_progress(self):
        """Parse the running subprocess's stdout to update progress state.

        Phase 1 reads whole lines (2 s timeout each), picking up percentages
        and the total size reported by 7z. Phase 2 reads byte by byte (60 s
        timeout each) and updates progress at every "%". Both counters are
        reset when reading stops.
        """
        pattern = (
            r"(\d+)\s+bytes|Total Physical Size\s*=\s*(\d+)|Physical Size\s*=\s*(\d+)"
        )
        while not (
            self._listener.subproc.returncode is not None
            or self._listener.is_cancelled
            or self._listener.subproc.stdout.at_eof()
        ):
            try:
                line = await wait_for(self._listener.subproc.stdout.readline(), 2)
            except Exception:
                # A timeout or read error ends line mode; cancellation propagates.
                break
            # File names echoed by 7z may not be valid UTF-8; never fail on them.
            line = line.decode(errors="replace").strip()
            if "%" in line:
                perc = line.split("%", 1)[0]
                if perc.isdigit():
                    self._percentage = f"{perc}%"
                    self._processed_bytes = (int(perc) / 100) * self._listener.subsize
                else:
                    self._percentage = "0%"
                continue
            if match := re_search(pattern, line):
                self._listener.subsize = int(match[1] or match[2] or match[3])
        s = b""
        while not (
            self._listener.is_cancelled
            or self._listener.subproc.returncode is not None
            or self._listener.subproc.stdout.at_eof()
        ):
            try:
                char = await wait_for(self._listener.subproc.stdout.read(1), 60)
            except Exception:
                break
            if not char:
                break
            s += char
            if char == b"%":
                try:
                    self._percentage = s.decode().rsplit(" ", 1)[-1].strip()
                    self._processed_bytes = (
                        int(self._percentage.strip("%")) / 100
                    ) * self._listener.subsize
                except Exception:
                    self._processed_bytes = 0
                    self._percentage = "0%"
                s = b""

        self._processed_bytes = 0
        self._percentage = "0%"

    async def extract(self, f_path, t_path, pswd):
        """Extract archive f_path into t_path with ``7z x``.

        Args:
            f_path: archive to extract.
            t_path: output directory.
            pswd: archive password; the ``-p`` switch is omitted when falsy.

        Returns:
            False if cancelled or the process exits with -9; otherwise the
            process exit code (0 on success, non-zero after logging the error).
        """
        cmd = ["7z", "x"]
        if pswd:
            cmd.append(f"-p{pswd}")
        cmd += [
            f_path,
            f"-o{t_path}",
            "-aot",
            "-xr!@PaxHeader",
            "-bsp1",
            "-bse1",
            "-bb3",
        ]
        if self._listener.is_cancelled:
            return False
        self._listener.subproc = await create_subprocess_exec(
            *cmd,
            stdout=PIPE,
            stderr=PIPE,
        )
        await self._sevenz_progress()
        _, stderr = await self._listener.subproc.communicate()
        code = self._listener.subproc.returncode
        if self._listener.is_cancelled:
            return False
        if code == -9:
            self._listener.is_cancelled = True
            return False
        elif code != 0:
            try:
                stderr = stderr.decode().strip()
            except Exception:
                stderr = "Unable to decode the error!"
            LOGGER.error(f"{stderr}. Unable to extract archive!. Path: {f_path}")
        return code

    async def zip(self, dl_path, up_path, pswd):
        """Store dl_path into archive up_path with ``7z a -mx=0``.

        The archive is split into volumes only when the listener is leeching
        and the source is larger than ``listener.split_size``. With
        ``listener.equal_splits`` the volume size is recomputed from the
        number of parts needed.

        Args:
            dl_path: file or directory to archive; removed on success.
            up_path: archive path to create.
            pswd: archive password; the ``-p`` switch is omitted when falsy.

        Returns:
            False if cancelled or the process exits with -9; up_path on
            success; dl_path on failure (after removing up_path and logging).
        """
        size = await get_path_size(dl_path)
        # Ceiling division; stays 0 for an empty source so we never divide by it.
        parts = (
            -(-size // self._listener.split_size)
            if self._listener.equal_splits
            else 0
        )
        if parts:
            split_size = (size // parts) + (size % parts)
        else:
            split_size = self._listener.split_size
        make_volumes = (
            self._listener.is_leech and int(size) > self._listener.split_size
        )
        cmd = ["7z"]
        if make_volumes:
            cmd.append(f"-v{split_size}b")
        cmd += ["a", "-mx=0"]
        if pswd:
            cmd.append(f"-p{pswd}")
        cmd += [up_path, dl_path, "-bsp1", "-bse1", "-bb3"]
        if make_volumes:
            LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}.0*")
        else:
            LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}")
        if self._listener.is_cancelled:
            return False
        self._listener.subproc = await create_subprocess_exec(
            *cmd, stdout=PIPE, stderr=PIPE
        )
        await self._sevenz_progress()
        _, stderr = await self._listener.subproc.communicate()
        code = self._listener.subproc.returncode
        if self._listener.is_cancelled:
            return False
        if code == -9:
            self._listener.is_cancelled = True
            return False
        elif code == 0:
            await clean_target(dl_path)
            return up_path
        else:
            if await aiopath.exists(up_path):
                await remove(up_path)
            try:
                stderr = stderr.decode().strip()
            except Exception:
                stderr = "Unable to decode the error!"
            LOGGER.error(f"{stderr}. Unable to zip this path: {dl_path}")
            return dl_path