# Path: blob/master/bot/helper/ext_utils/task_manager.py
# 1628 views
from asyncio import Event

from ... import (
    queued_dl,
    queued_up,
    non_queued_up,
    non_queued_dl,
    queue_dict_lock,
    LOGGER,
)
from ...core.config_manager import Config
from ..mirror_leech_utils.gdrive_utils.search import GoogleDriveSearch
from .bot_utils import sync_to_async, get_telegraph_list
from .files_utils import get_base_name
from .links_utils import is_gdrive_id


async def stop_duplicate_check(listener):
    """Search the upload destination for an item named like this task.

    The check is skipped, returning ``(False, None)``, when the task is a
    leech, has ``stop_duplicate`` disabled, uses ``same_dir`` or ``select``,
    or when ``listener.up_dest`` is rejected by ``is_gdrive_id``.

    Args:
        listener: Task object. Reads ``is_leech``, ``stop_duplicate``,
            ``same_dir``, ``select``, ``up_dest``, ``name``, ``compress``,
            ``extract``, ``is_clone`` and ``user_id``.

    Returns:
        ``(msg, button)`` when the search yields content, where ``msg`` is the
        user-facing text and ``button`` is the value returned by
        ``get_telegraph_list``; ``(False, None)`` otherwise.
    """
    if (
        listener.is_leech
        or not listener.stop_duplicate
        or listener.same_dir
        or listener.select
        or not is_gdrive_id(listener.up_dest)
    ):
        return False, None

    name = listener.name
    LOGGER.info(f"Checking File/Folder if already in Drive: {name}")

    # Search for the name the final upload will carry, not the raw task name.
    if listener.compress:
        name = f"{name}.zip"
    elif listener.extract:
        try:
            name = get_base_name(name)
        except Exception:
            # Best effort: without a base name there is nothing to search for.
            # `Exception` (not a bare except) so task cancellation and
            # interpreter-exit signals are never swallowed here.
            name = None

    if name is None:
        return False, None

    # `drive_list` is synchronous, so it is dispatched through sync_to_async.
    telegraph_content, contents_no = await sync_to_async(
        GoogleDriveSearch(stop_dup=True, no_multi=listener.is_clone).drive_list,
        name,
        listener.up_dest,
        listener.user_id,
    )
    if not telegraph_content:
        return False, None

    msg = f"File/Folder is already available in Drive.\nHere are {contents_no} list results:"
    button = await get_telegraph_list(telegraph_content)
    return msg, button


async def check_running_tasks(listener, state="dl"):
    """Register a task as running, or queue it when the limits are reached.

    Under ``queue_dict_lock`` this either adds ``listener.mid`` to the running
    set for ``state`` or stores a fresh ``asyncio.Event`` for it in the
    matching queue dict. A task entering the ``"up"`` state is first removed
    from the running-download set.

    Args:
        listener: Task object. Reads ``mid``, ``force_run``, ``force_upload``
            and ``force_download``.
        state: ``"dl"`` for the download stage or ``"up"`` for the upload
            stage.

    Returns:
        ``(is_over_limit, event)``. ``is_over_limit`` is ``True`` when the
        task was queued; ``event`` is then the ``Event`` stored for it and is
        ``None`` when the task was registered as running.
    """
    all_limit = Config.QUEUE_ALL
    state_limit = Config.QUEUE_DOWNLOAD if state == "dl" else Config.QUEUE_UPLOAD
    event = None
    is_over_limit = False

    async with queue_dict_lock:
        if state == "up":
            # The task moves from the download stage to the upload stage.
            non_queued_dl.discard(listener.mid)

        forced = (
            listener.force_run
            or (listener.force_upload and state == "up")
            or (listener.force_download and state == "dl")
        )
        if (all_limit or state_limit) and not forced:
            dl_count = len(non_queued_dl)
            up_count = len(non_queued_up)
            t_count = dl_count if state == "dl" else up_count
            if state_limit:
                # NOTE(review): with a per-state limit configured, only that
                # limit decides; the global limit is not consulted here. This
                # mirrors the original combined boolean expression exactly —
                # confirm it is intended.
                is_over_limit = t_count >= state_limit
            else:
                # Only the global limit is configured (guaranteed truthy by
                # the enclosing condition).
                is_over_limit = dl_count + up_count >= all_limit

            if is_over_limit:
                event = Event()
                if state == "dl":
                    queued_dl[listener.mid] = event
                else:
                    queued_up[listener.mid] = event

        if not is_over_limit:
            if state == "up":
                non_queued_up.add(listener.mid)
            else:
                non_queued_dl.add(listener.mid)

    return is_over_limit, event


async def start_dl_from_queued(mid: int):
    """Set the queued download's event and move ``mid`` to the running set.

    Does not take ``queue_dict_lock`` itself. Raises ``KeyError`` when ``mid``
    is not in ``queued_dl``.
    """
    queued_dl.pop(mid).set()
    non_queued_dl.add(mid)


async def start_up_from_queued(mid: int):
    """Set the queued upload's event and move ``mid`` to the running set.

    Does not take ``queue_dict_lock`` itself. Raises ``KeyError`` when ``mid``
    is not in ``queued_up``.
    """
    queued_up.pop(mid).set()
    non_queued_up.add(mid)


async def _start_queued(queue, starter, quota=None):
    """Start up to ``quota`` tasks from ``queue`` in insertion order.

    ``quota=None`` starts every queued task. Returns the number started.
    """
    started = 0
    # Iterate over a snapshot: ``starter`` removes entries from ``queue``.
    for mid in list(queue):
        if quota is not None and started >= quota:
            break
        await starter(mid)
        started += 1
    return started


async def start_from_queued():
    """Start queued tasks for which the configured limits leave room.

    With ``Config.QUEUE_ALL`` set, free global slots are handed to queued
    uploads first and then to queued downloads, each additionally capped by
    its own limit when one is configured. Without a global limit, uploads and
    downloads are handled independently: each queue is released up to its own
    limit, or completely when that limit is unset.
    """
    if all_limit := Config.QUEUE_ALL:
        dl_limit = Config.QUEUE_DOWNLOAD
        up_limit = Config.QUEUE_UPLOAD
        async with queue_dict_lock:
            dl = len(non_queued_dl)
            up = len(non_queued_up)
            free = all_limit - (dl + up)
            if free > 0:
                if not up_limit or up < up_limit:
                    quota = min(free, up_limit - up) if up_limit else free
                    free -= await _start_queued(
                        queued_up, start_up_from_queued, quota
                    )
                if free > 0 and (not dl_limit or dl < dl_limit):
                    quota = min(free, dl_limit - dl) if dl_limit else free
                    await _start_queued(queued_dl, start_dl_from_queued, quota)
        return

    up_limit = Config.QUEUE_UPLOAD
    async with queue_dict_lock:
        # None means "no upload limit": release everything that is queued.
        quota = up_limit - len(non_queued_up) if up_limit else None
        if quota is None or quota > 0:
            await _start_queued(queued_up, start_up_from_queued, quota)

    dl_limit = Config.QUEUE_DOWNLOAD
    async with queue_dict_lock:
        # None means "no download limit": release everything that is queued.
        quota = dl_limit - len(non_queued_dl) if dl_limit else None
        if quota is None or quota > 0:
            await _start_queued(queued_dl, start_dl_from_queued, quota)