Path: blob/main/singlestoredb/management/manager.py
469 views
#!/usr/bin/env python
"""SingleStoreDB Base Manager."""
import os
import sys
import time
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from urllib.parse import urljoin

import jwt
import requests

from .. import config
from ..exceptions import ManagementError
from .utils import get_token


def set_organization(kwargs: Dict[str, Any]) -> None:
    """
    Set the organization ID in the request keyword arguments.

    If ``kwargs['params']`` already carries a truthy ``organizationID``,
    nothing is changed. Otherwise the value of the
    ``SINGLESTOREDB_ORGANIZATION`` environment variable (when set and
    non-empty) is stored under ``kwargs['params']['organizationID']``.

    Parameters
    ----------
    kwargs : Dict[str, Any]
        Keyword arguments of an HTTP request; modified in place

    """
    # ``params`` may be absent or explicitly None; both mean "no parameters".
    params = kwargs.get('params')
    if params and params.get('organizationID', None):
        return

    org = os.environ.get('SINGLESTOREDB_ORGANIZATION')
    if org:
        if params is None:
            params = kwargs['params'] = {}
        params['organizationID'] = org


def is_jwt(token: str) -> bool:
    """
    Is the given token a JWT?

    The token is only decoded, its signature is not verified.

    Parameters
    ----------
    token : str
        Token to inspect

    Returns
    -------
    bool
        True if the token can be decoded as a JWT, False otherwise

    """
    try:
        jwt.decode(token, options={'verify_signature': False})
        return True
    except jwt.DecodeError:
        return False


def _mask_passwords(params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a copy of request parameters with password values masked.

    Only the top-level entries of a dictionary ``json`` payload are
    inspected. The input dictionary and its ``json`` payload are left
    untouched so that the caller's data is never modified.

    Parameters
    ----------
    params : Dict[str, Any]
        Keyword arguments of an HTTP request

    Returns
    -------
    Dict[str, Any]
        Shallow copy of ``params`` whose ``json`` entry (if it is a
        dictionary) has every truthy value with ``password`` in its key
        replaced by asterisks

    """
    masked = dict(params)
    payload = masked.get('json')
    if isinstance(payload, dict):
        masked['json'] = {
            k: '*' * len(str(v)) if v and 'password' in str(k).lower() else v
            for k, v in payload.items()
        }
    return masked


class Manager(object):
    """
    SingleStoreDB manager base class.

    Holds an authenticated ``requests.Session`` and the base URL of the
    management API, and provides checked HTTP helper methods.

    Parameters
    ----------
    access_token : str, optional
        Management API token; if not given, ``get_token()`` is used
    version : str, optional
        Management API version; defaults to ``default_version``
    base_url : str, optional
        Base URL of the management API; falls back to the
        ``management.base_url`` option and then ``default_base_url``
    organization_id : str, optional
        Organization ID sent as the ``organizationID`` query parameter
        with every request

    Raises
    ------
    ManagementError
        If no access token is available

    """

    #: Management API version if none is specified.
    default_version = config.get_option('management.version') or 'v1'

    #: Base URL if none is specified.
    default_base_url = config.get_option('management.base_url') \
        or 'https://api.singlestore.com'

    #: Object type
    obj_type = ''

    def __init__(
        self, access_token: Optional[str] = None, version: Optional[str] = None,
        base_url: Optional[str] = None, *, organization_id: Optional[str] = None,
    ):
        from .. import __version__ as client_version
        new_access_token = access_token or get_token()
        if not new_access_token:
            raise ManagementError(msg='No management token was configured.')

        # Only a token that was *not* passed explicitly is refreshed before
        # each request (see `_doit`), and only if it is a JWT.
        self._is_jwt = not access_token and is_jwt(new_access_token)
        self._sess = requests.Session()
        self._sess.headers.update({
            'Authorization': f'Bearer {new_access_token}',
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'User-Agent': f'SingleStoreDB-Python/{client_version}',
        })

        self._base_url = urljoin(
            base_url
            or config.get_option('management.base_url')
            or type(self).default_base_url,
            version or type(self).default_version,
        ) + '/'

        # Query parameters added to every request
        self._params: Dict[str, str] = {}
        if organization_id:
            self._params['organizationID'] = organization_id

    def _check(
        self, res: requests.Response, url: str, params: Dict[str, Any],
    ) -> requests.Response:
        """
        Check the HTTP response status code and raise an exception as needed.

        Parameters
        ----------
        res : requests.Response
            HTTP response to check
        url : str
            Path of the requested resource; used in messages only
        params : Dict[str, Any]
            Keyword arguments the request was made with; used in
            messages only and never modified

        Raises
        ------
        ManagementError
            If the status code is 400 or greater. Password values in a
            ``json`` payload are masked in the error message.

        Returns
        -------
        requests.Response

        """
        if config.get_option('debug.queries'):
            # Build the URL the same way `_doit` does
            print(urljoin(self._base_url, url), params, file=sys.stderr)
        if res.status_code >= 400:
            txt = res.text.strip()
            msg = f'{txt}: /{url}'
            if params:
                msg += f': {_mask_passwords(params)}'
            raise ManagementError(errno=res.status_code, msg=msg, response=txt)
        return res

    def _doit(
        self,
        method: str,
        path: str,
        *args: Any,
        **kwargs: Any,
    ) -> requests.Response:
        """
        Perform HTTP request.

        Parameters
        ----------
        method : str
            Name of the HTTP method; the lower-cased name is looked up
            as a method of the session
        path : str
            Path of the resource, joined onto the base URL
        *args : positional arguments, optional
            Arguments passed to the session method
        **kwargs : keyword arguments, optional
            Keyword arguments passed to the session method

        Returns
        -------
        requests.Response

        """
        # Refresh the JWT as needed
        if self._is_jwt:
            self._sess.headers.update({'Authorization': f'Bearer {get_token()}'})
        return getattr(self._sess, method.lower())(
            urljoin(self._base_url, path), *args, **kwargs,
        )

    def _call_api(
        self,
        method: str,
        path: str,
        *args: Any,
        **kwargs: Any,
    ) -> requests.Response:
        """
        Invoke a request with default parameters applied and check the result.

        The manager's own query parameters are merged underneath any
        ``params`` given by the caller (the caller's values win), the
        organization ID is filled in by `set_organization`, and the
        response is validated by `_check`.

        Parameters
        ----------
        method : str
            Name of the HTTP method
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the request

        Returns
        -------
        requests.Response

        """
        if self._params:
            params = dict(self._params)
            # An explicit ``params=None`` means "no extra parameters"
            params.update(kwargs.get('params') or {})
            kwargs['params'] = params
        set_organization(kwargs)
        return self._check(self._doit(method, path, *args, **kwargs), path, kwargs)

    def _get(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a GET request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the GET request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the GET request

        Returns
        -------
        requests.Response

        """
        return self._call_api('get', path, *args, **kwargs)

    def _post(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a POST request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the POST request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the POST request

        Returns
        -------
        requests.Response

        """
        return self._call_api('post', path, *args, **kwargs)

    def _put(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a PUT request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the PUT request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the PUT request

        Returns
        -------
        requests.Response

        """
        return self._call_api('put', path, *args, **kwargs)

    def _delete(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a DELETE request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the DELETE request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the DELETE request

        Returns
        -------
        requests.Response

        """
        return self._call_api('delete', path, *args, **kwargs)

    def _patch(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a PATCH request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the PATCH request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the PATCH request

        Returns
        -------
        requests.Response

        """
        return self._call_api('patch', path, *args, **kwargs)

    def _wait_on_state(
        self,
        out: Any,
        state: Union[str, List[str]],
        interval: int = 20,
        timeout: int = 600,
    ) -> Any:
        """
        Wait on server state before continuing.

        The object is re-fetched through the manager's
        ``get_<obj_type>`` method using ``out.id`` until its ``state``
        matches. Only the time spent sleeping is counted against
        `timeout`.

        Parameters
        ----------
        out : Any
            Current object
        state : str or List[str]
            State(s) to wait for; compared case-insensitively
        interval : int, optional
            Interval between each server poll
        timeout : int, optional
            Maximum time to wait before raising an exception

        Raises
        ------
        ManagementError
            If `out` has no ``state`` attribute or the timeout is reached

        Returns
        -------
        Same object type as `out`

        """
        states = [
            x.lower().strip()
            for x in ([state] if isinstance(state, str) else state)
        ]

        if getattr(out, 'state', None) is None:
            raise ManagementError(
                msg='{} object does not have a `state` attribute'.format(
                    type(out).__name__,
                ),
            )

        while True:
            if getattr(out, 'state').lower() in states:
                break
            if timeout <= 0:
                wanted = ', '.join(states)
                raise ManagementError(
                    msg=f'Exceeded waiting time for {self.obj_type} to become {wanted}.',
                )
            time.sleep(interval)
            timeout -= interval
            out = getattr(self, f'get_{self.obj_type}')(out.id)

        return out