Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/sabnzbdapi/requests.py
1630 views
1
from httpx import AsyncClient, AsyncHTTPTransport, Timeout
2
from urllib3 import disable_warnings
3
from urllib3.exceptions import InsecureRequestWarning
4
5
from .job_functions import JobFunctions
6
from .exception import APIConnectionError
7
8
9
class SabnzbdClient(JobFunctions):
    """Asynchronous client that sends GET requests to a SABnzbd JSON API.

    A single ``httpx.AsyncClient`` is created lazily on first use and reused
    for every subsequent request until :meth:`close` is awaited.
    """

    # Not read or written anywhere in this class; presumably consumed by
    # JobFunctions or by external callers -- confirm before removing.
    LOGGED_IN = False

    def __init__(
        self,
        host: str,
        api_key: str,
        port: str = "8070",
        VERIFY_CERTIFICATE: bool = False,
        RETRIES: int = 10,
        HTTPX_REQUETS_ARGS: dict = None,
    ):
        """Store connection settings; no network activity happens here.

        Args:
            host: Scheme and host of the server; trailing slashes are
                stripped before ``:{port}`` is appended.
            api_key: Sent as the ``apikey`` query parameter on every call.
            port: Port appended to ``host`` to form the base URL.
            VERIFY_CERTIFICATE: Passed as ``verify`` to the httpx transport
                and client. When false, urllib3's ``InsecureRequestWarning``
                is also silenced.
            RETRIES: Passed as ``retries`` to ``AsyncHTTPTransport``.
            HTTPX_REQUETS_ARGS: Extra keyword arguments forwarded to the
                ``AsyncClient`` constructor. The misspelled name is part of
                the public signature and is kept for compatibility.
        """
        if HTTPX_REQUETS_ARGS is None:
            HTTPX_REQUETS_ARGS = {}
        self._base_url = f"{host.rstrip('/')}:{port}"
        # Merged into the query string of every request made by call().
        self._default_params = {"apikey": api_key, "output": "json"}
        self._VERIFY_CERTIFICATE = VERIFY_CERTIFICATE
        self._RETRIES = RETRIES
        self._HTTPX_REQUETS_ARGS = HTTPX_REQUETS_ARGS
        # Created on demand by _session().
        self._http_session = None
        if not self._VERIFY_CERTIFICATE:
            disable_warnings(InsecureRequestWarning)
        super().__init__()

    def _session(self):
        """Return the cached ``AsyncClient``, creating it on first use."""
        if self._http_session is not None:
            return self._http_session

        transport = AsyncHTTPTransport(
            retries=self._RETRIES, verify=self._VERIFY_CERTIFICATE
        )

        self._http_session = AsyncClient(
            base_url=self._base_url,
            transport=transport,
            timeout=Timeout(connect=60, read=60, write=60, pool=None),
            follow_redirects=True,
            verify=self._VERIFY_CERTIFICATE,
            **self._HTTPX_REQUETS_ARGS,
        )

        return self._http_session

    async def call(
        self,
        params: dict = None,
        requests_args: dict = None,
        **kwargs,
    ):
        """Send a GET request to ``/sabnzbd/api`` and return the decoded JSON.

        Args:
            params: Query parameters for this request. May be omitted; the
                mapping passed in is never modified.
            requests_args: Extra keyword arguments forwarded to
                ``AsyncClient.get``.
            **kwargs: Additional query parameters. On a key clash these
                override ``params``, which in turn override the default
                ``apikey``/``output`` parameters.

        Returns:
            The value produced by ``Response.json()``.

        Raises:
            APIConnectionError: If the decoded JSON body is ``None``.
        """
        if requests_args is None:
            requests_args = {}
        # Build a fresh dict instead of ``params |= kwargs``: that raised
        # TypeError when ``params`` was left at its None default and mutated
        # the caller's dict otherwise. Precedence is unchanged:
        # defaults < params < kwargs.
        query = {**self._default_params, **(params or {}), **kwargs}
        session = self._session()
        res = await session.get(
            url="/sabnzbd/api",
            params=query,
            **requests_args,
        )
        response = res.json()
        if response is None:
            raise APIConnectionError("Failed to connect to API!")
        return response

    async def close(self):
        """Close the cached HTTP client, if any, and forget it.

        A later :meth:`call` will transparently create a new client.
        """
        if self._http_session is not None:
            await self._http_session.aclose()
            self._http_session = None
77
78