Path: blob/master/bot/core/torrent_manager.py
1630 views
"""Shared aria2 / qBittorrent client handles and helpers for download entries."""

from asyncio import gather, TimeoutError
from inspect import iscoroutinefunction
from pathlib import Path

from aiohttp import ClientError
from aioaria2 import Aria2WebsocketClient
from aioqbt.client import create_client
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

from .. import LOGGER, aria2_options


def wrap_with_retry(obj, max_retries=3):
    """Wrap every public coroutine method of ``obj`` with a tenacity retry.

    Each attribute returned by ``dir(obj)`` whose name does not start with an
    underscore and which is a coroutine function is replaced, on the instance
    itself, by a retrying version.

    Args:
        obj: Object whose coroutine methods should be wrapped. It is mutated
            in place via ``setattr``.
        max_retries: Value passed to ``stop_after_attempt``.

    Returns:
        The same ``obj`` that was passed in.
    """
    # The policy does not depend on the attribute, so build it once.
    retry_policy = retry(
        stop=stop_after_attempt(max_retries),
        wait=wait_exponential(multiplier=1, min=1, max=5),
        retry=retry_if_exception_type(
            (ClientError, TimeoutError, RuntimeError)
        ),
    )
    for attr_name in dir(obj):
        if attr_name.startswith("_"):
            continue

        attr = getattr(obj, attr_name)
        if iscoroutinefunction(attr):
            setattr(obj, attr_name, retry_policy(attr))
    return obj


class TorrentManager:
    """Class-level holder for the aria2 and qBittorrent clients.

    All methods are classmethods operating on the shared ``aria2`` and
    ``qbittorrent`` attributes, which stay ``None`` until ``initiate`` runs.
    """

    # Set by initiate(): result of Aria2WebsocketClient.new(...).
    aria2 = None
    # Set by initiate(): result of create_client(...), passed through
    # wrap_with_retry.
    qbittorrent = None

    @classmethod
    async def initiate(cls):
        """Create both clients concurrently and store them on the class."""
        cls.aria2, cls.qbittorrent = await gather(
            Aria2WebsocketClient.new("http://localhost:6800/jsonrpc"),
            create_client("http://localhost:8090/api/v2/"),
        )
        cls.qbittorrent = wrap_with_retry(cls.qbittorrent)

    @classmethod
    async def close_all(cls):
        """Close both clients concurrently."""
        await gather(cls.aria2.close(), cls.qbittorrent.close())

    @classmethod
    async def _aria2_unfinished_downloads(cls):
        """Return the combined ``tellActive`` and ``tellWaiting(0, 1000)`` results."""
        active, waiting = await gather(
            cls.aria2.tellActive(), cls.aria2.tellWaiting(0, 1000)
        )
        return [*active, *waiting]

    @classmethod
    async def aria2_remove(cls, download):
        """Remove one aria2 download entry.

        Args:
            download: Mapping with optional ``"status"`` and ``"gid"`` keys.
                Entries whose status is ``active``, ``paused`` or ``waiting``
                are force-removed; for any other status the download result
                is removed on a best-effort basis.
        """
        gid = download.get("gid", "")
        if download.get("status", "") in ["active", "paused", "waiting"]:
            await cls.aria2.forceRemove(gid)
            return
        try:
            await cls.aria2.removeDownloadResult(gid)
        except Exception as e:
            # Best-effort cleanup: failures are not fatal, but cancellation
            # (a BaseException) must still propagate to the caller.
            LOGGER.debug("aria2 removeDownloadResult failed: %s", e)

    @classmethod
    async def remove_all(cls):
        """Pause everything, then delete all entries from both clients.

        Failures of individual aria2 ``forceRemove`` calls are logged and
        otherwise ignored.
        """
        await cls.pause_all()
        await gather(
            cls.qbittorrent.torrents.delete("all", False),
            cls.aria2.purgeDownloadResult(),
        )
        downloads = await cls._aria2_unfinished_downloads()
        if not downloads:
            return
        # return_exceptions=True lets every removal run to completion and
        # reports each failure, instead of surfacing only the first one.
        results = await gather(
            *(cls.aria2.forceRemove(download.get("gid")) for download in downloads),
            return_exceptions=True,
        )
        for result in results:
            if isinstance(result, Exception):
                LOGGER.debug("aria2 forceRemove failed: %s", result)

    @classmethod
    async def overall_speed(cls):
        """Return the summed ``(download_speed, upload_speed)`` of both clients.

        qBittorrent values come from ``transfer.info()`` (``dl_info_speed`` /
        ``up_info_speed``); aria2 values come from ``getGlobalStat()`` and are
        converted with ``int`` before being added.
        """
        s1, s2 = await gather(
            cls.qbittorrent.transfer.info(), cls.aria2.getGlobalStat()
        )
        download_speed = s1.dl_info_speed + int(s2.get("downloadSpeed", "0"))
        upload_speed = s1.up_info_speed + int(s2.get("uploadSpeed", "0"))
        return download_speed, upload_speed

    @classmethod
    async def pause_all(cls):
        """Pause all aria2 downloads and stop all qBittorrent torrents."""
        await gather(
            cls.aria2.forcePauseAll(), cls.qbittorrent.torrents.stop("all")
        )

    @classmethod
    async def change_aria2_option(cls, key, value):
        """Apply an aria2 option to unfinished downloads and, usually, globally.

        Args:
            key: Option name.
            value: Option value.

        The option is set on every active/waiting download whose status is
        not ``complete``; per-download failures are logged. Unless ``key`` is
        one of ``checksum``, ``index-out``, ``out``, ``pause`` or
        ``select-file``, it is also set as a global option and recorded in
        ``aria2_options``.
        """
        downloads = await cls._aria2_unfinished_downloads()
        tasks = [
            cls.aria2.changeOption(download.get("gid"), {key: value})
            for download in downloads
            if download.get("status", "") != "complete"
        ]
        if tasks:
            results = await gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    LOGGER.error(result)
        if key not in ["checksum", "index-out", "out", "pause", "select-file"]:
            await cls.aria2.changeGlobalOption({key: value})
            # NOTE(review): nesting reconstructed from flattened source;
            # confirm this assignment belongs inside the `if`.
            aria2_options[key] = value


def aria2_name(download_info):
    """Derive a display name from an aria2 download-info mapping.

    Resolution order:
      1. ``download_info["bittorrent"]["info"]["name"]`` when ``info`` is set.
      2. The first file's path verbatim when it starts with ``[METADATA]``.
      3. The first path component of the first file's path relative to
         ``download_info["dir"]``.

    Returns:
        The name, or ``""`` when none of the above applies.
    """
    if "bittorrent" in download_info and download_info["bittorrent"].get("info"):
        return download_info["bittorrent"]["info"]["name"]

    files = download_info.get("files")
    if not files:
        return ""

    file_path = files[0]["path"]
    if file_path.startswith("[METADATA]"):
        return file_path

    dir_path = download_info.get("dir")
    if dir_path is None or not file_path.startswith(dir_path):
        return ""

    # Skip the directory prefix plus one separator character.
    parts = Path(file_path[len(dir_path) + 1:]).parts
    # parts is empty when nothing follows the directory prefix.
    return parts[0] if parts else ""


def is_metadata(download_info):
    """Return True if any file path in ``download_info`` starts with ``[METADATA]``."""
    return any(
        f["path"].startswith("[METADATA]") for f in download_info.get("files", [])
    )