Path: blob/master/sabnzbdapi/requests.py
1630 views
from typing import Optional

from httpx import AsyncClient, AsyncHTTPTransport, Timeout
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning

from .job_functions import JobFunctions
from .exception import APIConnectionError


class SabnzbdClient(JobFunctions):
    """Asynchronous client that issues GET requests to ``/sabnzbd/api``.

    The underlying ``httpx.AsyncClient`` is created lazily on first use and
    reused until :meth:`close` is awaited.
    """

    LOGGED_IN = False

    def __init__(
        self,
        host: str,
        api_key: str,
        port: str = "8070",
        VERIFY_CERTIFICATE: bool = False,
        RETRIES: int = 10,
        HTTPX_REQUETS_ARGS: Optional[dict] = None,
    ):
        """Store connection settings; no network activity happens here.

        Args:
            host: Base address of the server. Trailing slashes are stripped
                and ``:port`` is appended to form the base URL.
                NOTE(review): presumably includes the scheme
                (``http://...``) -- confirm against callers.
            api_key: Value sent as the ``apikey`` query parameter on every
                request.
            port: Port appended to ``host``.
            VERIFY_CERTIFICATE: Passed as ``verify`` to both the transport
                and the client.
            RETRIES: Passed as ``retries`` to ``AsyncHTTPTransport``.
            HTTPX_REQUETS_ARGS: Extra keyword arguments forwarded to the
                ``AsyncClient`` constructor. The (misspelled) name is kept
                because callers may pass it by keyword.
        """
        if HTTPX_REQUETS_ARGS is None:
            HTTPX_REQUETS_ARGS = {}
        self._base_url = f"{host.rstrip('/')}:{port}"
        # Sent with every request; per-call parameters may override them.
        self._default_params = {"apikey": api_key, "output": "json"}
        self._VERIFY_CERTIFICATE = VERIFY_CERTIFICATE
        self._RETRIES = RETRIES
        self._HTTPX_REQUETS_ARGS = HTTPX_REQUETS_ARGS
        self._http_session = None
        if not self._VERIFY_CERTIFICATE:
            disable_warnings(InsecureRequestWarning)
        super().__init__()

    def _session(self):
        """Return the shared ``AsyncClient``, creating it on first call."""
        if self._http_session is not None:
            return self._http_session

        transport = AsyncHTTPTransport(
            retries=self._RETRIES, verify=self._VERIFY_CERTIFICATE
        )

        self._http_session = AsyncClient(
            base_url=self._base_url,
            transport=transport,
            timeout=Timeout(connect=60, read=60, write=60, pool=None),
            follow_redirects=True,
            verify=self._VERIFY_CERTIFICATE,
            **self._HTTPX_REQUETS_ARGS,
        )

        return self._http_session

    async def call(
        self,
        params: Optional[dict] = None,
        requests_args: Optional[dict] = None,
        **kwargs,
    ):
        """Send one GET request to ``/sabnzbd/api`` and return the decoded JSON.

        Args:
            params: Query parameters for this call. May be omitted; the
                caller's dict is never modified.
            requests_args: Extra keyword arguments forwarded to
                ``AsyncClient.get``.
            **kwargs: Additional query parameters; these take precedence over
                ``params``, which in turn take precedence over the default
                ``apikey``/``output`` parameters.

        Returns:
            The value produced by ``response.json()``.

        Raises:
            APIConnectionError: If the decoded response body is ``None``.
        """
        if requests_args is None:
            requests_args = {}
        session = self._session()
        # Build a fresh dict instead of ``params |= kwargs``: that form raised
        # TypeError when ``params`` was left as None and mutated the caller's
        # dict otherwise. Precedence is unchanged: defaults < params < kwargs.
        query = {**self._default_params, **(params or {}), **kwargs}
        res = await session.get(
            url="/sabnzbd/api",
            params=query,
            **requests_args,
        )
        response = res.json()
        if response is None:
            raise APIConnectionError("Failed to connect to API!")
        return response

    async def close(self):
        """Close the shared ``AsyncClient`` if one exists and forget it."""
        if self._http_session is not None:
            await self._http_session.aclose()
            self._http_session = None