Path: blob/main/singlestoredb/management/manager.py
800 views
#!/usr/bin/env python1"""SingleStoreDB Base Manager."""2import os3import sys4import time5from typing import Any6from typing import Dict7from typing import List8from typing import Optional9from typing import Union10from urllib.parse import urljoin1112import jwt13import requests1415from .. import config16from ..exceptions import ManagementError17from ..exceptions import OperationalError18from .utils import get_token192021def set_organization(kwargs: Dict[str, Any]) -> None:22"""Set the organization ID in the dictionary."""23if kwargs.get('params', {}).get('organizationID', None):24return2526org = os.environ.get('SINGLESTOREDB_ORGANIZATION')27if org:28if 'params' not in kwargs:29kwargs['params'] = {}30kwargs['params']['organizationID'] = org313233def is_jwt(token: str) -> bool:34"""Is the given token a JWT?"""35try:36jwt.decode(token, options={'verify_signature': False})37return True38except jwt.DecodeError:39return False404142class Manager(object):43"""SingleStoreDB manager base class."""4445#: Management API version if none is specified.46default_version = config.get_option('management.version') or 'v1'4748#: Base URL if none is specified.49default_base_url = config.get_option('management.base_url') \50or 'https://api.singlestore.com'5152#: Object type53obj_type = ''5455def __init__(56self, access_token: Optional[str] = None, version: Optional[str] = None,57base_url: Optional[str] = None, *, organization_id: Optional[str] = None,58):59from .. import __version__ as client_version60new_access_token = (61access_token or get_token()62)63if not new_access_token:64raise ManagementError(msg='No management token was configured.')6566self._is_jwt = not access_token and new_access_token and is_jwt(new_access_token)67self._sess = requests.Session()68self._sess.headers.update({69'Authorization': f'Bearer {new_access_token}',70'Content-Type': 'application/json',71'Accept': 'application/json',72'User-Agent': f'SingleStoreDB-Python/{client_version}',73})7475self._base_url = urljoin(76base_url77or config.get_option('management.base_url')78or type(self).default_base_url,79version or type(self).default_version,80) + '/'8182self._params: Dict[str, str] = {}83if organization_id:84self._params['organizationID'] = organization_id8586def _check(87self, res: requests.Response, url: str, params: Dict[str, Any],88) -> requests.Response:89"""90Check the HTTP response status code and raise an exception as needed.9192Parameters93----------94res : requests.Response95HTTP response to check9697Returns98-------99requests.Response100101"""102if config.get_option('debug.queries'):103print(os.path.join(self._base_url, url), params, file=sys.stderr)104if res.status_code >= 400:105txt = res.text.strip()106msg = f'{txt}: /{url}'107if params:108new_params = params.copy()109if 'json' in new_params:110for k, v in new_params['json'].items():111if 'password' in k.lower() and v:112new_params['json'][k] = '*' * len(v)113msg += ': {}'.format(str(new_params))114raise ManagementError(errno=res.status_code, msg=msg, response=txt)115return res116117def _doit(118self,119method: str,120path: str,121*args: Any,122**kwargs: Any,123) -> requests.Response:124"""Perform HTTP request."""125# Refresh the JWT as needed126if self._is_jwt:127self._sess.headers.update({'Authorization': f'Bearer {get_token()}'})128return getattr(self._sess, method.lower())(129urljoin(self._base_url, path), *args, **kwargs,130)131132def _get(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:133"""134Invoke a GET request.135136Parameters137----------138path : str139Path of the resource140*args : positional arguments, optional141Arguments to add to the GET request142**kwargs : keyword arguments, optional143Keyword arguments to add to the GET request144145Returns146-------147requests.Response148149"""150if self._params:151params = dict(self._params)152params.update(kwargs.get('params', {}))153kwargs['params'] = params154set_organization(kwargs)155return self._check(self._doit('get', path, *args, **kwargs), path, kwargs)156157def _post(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:158"""159Invoke a POST request.160161Parameters162----------163path : str164Path of the resource165*args : positional arguments, optional166Arguments to add to the POST request167**kwargs : keyword arguments, optional168Keyword arguments to add to the POST request169170Returns171-------172requests.Response173174"""175if self._params:176params = dict(self._params)177params.update(kwargs.get('params', {}))178kwargs['params'] = params179set_organization(kwargs)180return self._check(self._doit('post', path, *args, **kwargs), path, kwargs)181182def _put(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:183"""184Invoke a PUT request.185186Parameters187----------188path : str189Path of the resource190*args : positional arguments, optional191Arguments to add to the POST request192**kwargs : keyword arguments, optional193Keyword arguments to add to the POST request194195Returns196-------197requests.Response198199"""200if self._params:201params = dict(self._params)202params.update(kwargs.get('params', {}))203kwargs['params'] = params204set_organization(kwargs)205return self._check(self._doit('put', path, *args, **kwargs), path, kwargs)206207def _delete(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:208"""209Invoke a DELETE request.210211Parameters212----------213path : str214Path of the resource215*args : positional arguments, optional216Arguments to add to the DELETE request217**kwargs : keyword arguments, optional218Keyword arguments to add to the DELETE request219220Returns221-------222requests.Response223224"""225if self._params:226params = dict(self._params)227params.update(kwargs.get('params', {}))228kwargs['params'] = params229set_organization(kwargs)230return self._check(self._doit('delete', path, *args, **kwargs), path, kwargs)231232def _patch(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:233"""234Invoke a PATCH request.235236Parameters237----------238path : str239Path of the resource240*args : positional arguments, optional241Arguments to add to the PATCH request242**kwargs : keyword arguments, optional243Keyword arguments to add to the PATCH request244245Returns246-------247requests.Response248249"""250if self._params:251params = dict(self._params)252params.update(kwargs.get('params', {}))253kwargs['params'] = params254set_organization(kwargs)255return self._check(self._doit('patch', path, *args, **kwargs), path, kwargs)256257def _wait_on_state(258self,259out: Any,260state: Union[str, List[str]],261interval: int = 20,262timeout: int = 600,263) -> Any:264"""265Wait on server state before continuing.266267Parameters268----------269out : Any270Current object271state : str or List[str]272State(s) to wait for273interval : int, optional274Interval between each server poll275timeout : int, optional276Maximum time to wait before raising an exception277278Raises279------280ManagementError281If timeout is reached282283Returns284-------285Same object type as `out`286287"""288states = [289x.lower().strip()290for x in (isinstance(state, str) and [state] or state)291]292293if getattr(out, 'state', None) is None:294raise ManagementError(295msg='{} object does not have a `state` attribute'.format(296type(out).__name__,297),298)299300while True:301if getattr(out, 'state').lower() in states:302break303if timeout <= 0:304raise ManagementError(305msg=f'Exceeded waiting time for {self.obj_type} to become '306'{}.'.format(', '.join(states)),307)308time.sleep(interval)309timeout -= interval310out = getattr(self, f'get_{self.obj_type}')(out.id)311312return out313314def _wait_on_endpoint(315self,316out: Any,317interval: int = 10,318timeout: int = 300,319) -> Any:320"""321Wait for the endpoint to be ready by attempting to connect.322323Parameters324----------325out : Any326Workspace object with a connect method327interval : int, optional328Interval between each connection attempt (default: 10 seconds)329timeout : int, optional330Maximum time to wait before raising an exception (default: 300 seconds)331332Raises333------334ManagementError335If timeout is reached or endpoint is not available336337Returns338-------339Same object type as `out`340341"""342# Only wait if workload type is set which means we are in the343# notebook environment. Outside of the environment, the endpoint344# may not be reachable directly.345if not os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE', ''):346return out347348if not hasattr(out, 'connect') or not out.connect:349raise ManagementError(350msg=f'{type(out).__name__} object does not have a valid endpoint',351)352353while True:354try:355# Try to establish a connection to the endpoint using context manager356with out.connect(connect_timeout=5):357pass358except Exception as exc:359# If we get an 'access denied' error, that means that the server is360# up and we just aren't authenticating.361if isinstance(exc, OperationalError) and exc.errno == 1045:362break363# If connection fails, check timeout and retry364if timeout <= 0:365raise ManagementError(366msg=f'Exceeded waiting time for {self.obj_type} endpoint '367'to become ready',368)369time.sleep(interval)370timeout -= interval371372return out373374375