Path: blob/main/singlestoredb/management/utils.py
469 views
#!/usr/bin/env python
"""SingleStoreDB Cluster Management."""
import datetime
import functools
import itertools
import os
import re
import sys
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Mapping
from typing import Optional
from typing import SupportsIndex
from typing import Tuple
from typing import TypeVar
from typing import Union
from urllib.parse import urlparse

import jwt

from .. import converters
from ..config import get_option
from ..utils import events

JSON = Union[str, List[str], Dict[str, 'JSON']]
JSONObj = Dict[str, JSON]
JSONList = List[JSON]
T = TypeVar('T')

if sys.version_info < (3, 10):
    PathLike = Union[str, os.PathLike]  # type: ignore
    PathLikeABC = os.PathLike
else:
    PathLike = Union[str, os.PathLike[str]]
    PathLikeABC = os.PathLike[str]

# Timestamp string that the datetime converters below treat as "no value".
_ZERO_TIMESTAMP = '0001-01-01T00:00:00Z'


class TTLProperty(object):
    """
    Property with time limit.

    The getter's result is cached and re-used until ``ttl`` has elapsed
    since the getter last ran.

    NOTE(review): the cached value and its timestamp are stored on the
    descriptor itself, so they are shared by every instance of the class
    the property is defined on. Also, a getter result of ``None`` is never
    treated as cached; the getter is re-run on each access in that case.

    Parameters
    ----------
    fget : Callable
        Getter function; called with the owning instance
    ttl : datetime.timedelta
        How long a cached result stays valid

    """

    def __init__(self, fget: Callable[[Any], Any], ttl: datetime.timedelta):
        self.fget = fget
        self.ttl = ttl
        # A date far in the past forces the first access to call the getter
        self._last_executed = datetime.datetime(2000, 1, 1)
        self._last_result = None
        self.__doc__ = fget.__doc__
        self._name = ''

    def reset(self) -> None:
        """Discard the cached result so the next access calls the getter."""
        self._last_executed = datetime.datetime(2000, 1, 1)
        self._last_result = None

    def __set_name__(self, owner: Any, name: str) -> None:
        """Record the attribute name the descriptor was assigned to."""
        self._name = name

    def __get__(self, obj: Any, objtype: Any = None) -> Any:
        """Return the cached value, refreshing it if the TTL has expired."""
        # Class-level access returns the descriptor (e.g., to call ``reset``)
        if obj is None:
            return self

        if self._last_result is not None \
                and (datetime.datetime.now() - self._last_executed) < self.ttl:
            return self._last_result

        self._last_result = self.fget(obj)
        self._last_executed = datetime.datetime.now()

        return self._last_result


def ttl_property(ttl: datetime.timedelta) -> Callable[[Any], Any]:
    """
    Property with a time-to-live.

    Decorator factory: the decorated getter is wrapped in a
    :class:`TTLProperty` that uses ``ttl`` as its cache lifetime.

    """
    def wrapper(func: Callable[[Any], Any]) -> Any:
        out = TTLProperty(func, ttl=ttl)
        return functools.wraps(func)(out)  # type: ignore
    return wrapper


class NamedList(List[T]):
    """List class which also allows selection by ``name`` and ``id`` attribute."""

    def _find_item(self, key: str) -> T:
        """
        Return the first item whose ``name`` or ``id`` attribute equals ``key``.

        Raises
        ------
        KeyError
            If no item matches

        """
        for item in self:
            if getattr(item, 'name', '') == key:
                return item
            if getattr(item, 'id', '') == key:
                return item
        raise KeyError(key)

    def __getitem__(self, key: Union[SupportsIndex, slice, str]) -> Any:
        """Look up by name / ID for ``str`` keys; otherwise index as a list."""
        if isinstance(key, str):
            return self._find_item(key)
        return super().__getitem__(key)

    def __contains__(self, key: Any) -> bool:
        """Test membership by name / ID for ``str`` keys, else by value."""
        if isinstance(key, str):
            try:
                self._find_item(key)
                return True
            except KeyError:
                return False
        return super().__contains__(key)

    def names(self) -> List[str]:
        """Return ``name`` attribute of each item."""
        found = (getattr(x, 'name', None) for x in self)
        return [y for y in found if y is not None]

    def ids(self) -> List[str]:
        """Return ``id`` attribute of each item."""
        found = (getattr(x, 'id', None) for x in self)
        return [y for y in found if y is not None]

    def get(self, name_or_id: str, *default: Any) -> Any:
        """
        Return object with name / ID if it exists, or return default value.

        Raises
        ------
        KeyError
            If nothing matches and no default value was given

        """
        try:
            return self._find_item(name_or_id)
        except KeyError:
            if default:
                return default[0]
            raise


def _setup_authentication_info_handler() -> Callable[..., Dict[str, Any]]:
    """
    Setup authentication info event handler.

    Two handlers are registered with ``events.subscribe``; both write to
    a shared ``authentication_info`` dictionary held in this closure.

    Returns
    -------
    Callable
        Function returning the current authentication information

    """

    authentication_info: Dict[str, Any] = {}

    def handle_authentication_info(msg: Dict[str, Any]) -> None:
        """Handle authentication info events."""
        nonlocal authentication_info
        if msg.get('name', '') != 'singlestore.portal.authentication_updated':
            return
        authentication_info = dict(msg.get('data', {}))

    events.subscribe(handle_authentication_info)

    def handle_connection_info(msg: Dict[str, Any]) -> None:
        """Handle connection info events."""
        nonlocal authentication_info
        if msg.get('name', '') != 'singlestore.portal.connection_updated':
            return
        data = msg.get('data', {})
        out = {}
        if 'user' in data:
            out['user'] = data['user']
        if 'password' in data:
            out['password'] = data['password']
        authentication_info = out

    # The original subscribed ``handle_authentication_info`` a second time
    # here, which left the connection handler unused.
    events.subscribe(handle_connection_info)

    def retrieve_current_authentication_info() -> List[Tuple[str, Any]]:
        """Retrieve JWT if not expired."""
        nonlocal authentication_info
        password = authentication_info.get('password')
        if password:
            try:
                claims = jwt.decode(
                    password,
                    options={'verify_signature': False},
                )
            except jwt.DecodeError:
                # Not decodable as a JWT, so no expiration can be read
                # from it; keep the stored information as-is.
                claims = {}
            exp = claims.get('exp')
            if exp is not None:
                expires = datetime.datetime.fromtimestamp(exp)
                if datetime.datetime.now() > expires:
                    authentication_info = {}
        return list(authentication_info.items())

    def get_env() -> List[Tuple[str, Any]]:
        """Retrieve JWT from environment."""
        conn: Dict[str, Optional[str]] = {}
        url = os.environ.get('SINGLESTOREDB_URL') or get_option('host')
        if url:
            urlp = urlparse(url, scheme='singlestoredb', allow_fragments=True)
            conn = dict(
                user=urlp.username or None,
                password=urlp.password or None,
            )
        # Only report the credentials that are actually present in the URL
        return [x for x in conn.items() if x[1] is not None]

    def get_authentication_info(include_env: bool = True) -> Dict[str, Any]:
        """
        Return authentication info from event.

        Values from events override values from the environment because
        they come later in the chain.

        """
        return dict(
            itertools.chain(
                (get_env() if include_env else []),
                retrieve_current_authentication_info(),
            ),
        )

    return get_authentication_info


get_authentication_info = _setup_authentication_info_handler()


def get_token() -> Optional[str]:
    """
    Return the token for the Management API.

    The ``management.token`` option is used if set. Otherwise, the password
    from the authentication information is used if it decodes as a JWT.

    """
    # See if an API key is configured
    tok = get_option('management.token')
    if tok:
        return tok

    tok = get_authentication_info(include_env=True).get('password')
    if tok:
        try:
            jwt.decode(tok, options={'verify_signature': False})
            return tok
        except jwt.DecodeError:
            pass

    # Didn't find a key anywhere
    return None


def get_cluster_id() -> Optional[str]:
    """Return the cluster id for the current token or environment."""
    return os.environ.get('SINGLESTOREDB_CLUSTER') or None


def get_workspace_id() -> Optional[str]:
    """Return the workspace id for the current token or environment."""
    return os.environ.get('SINGLESTOREDB_WORKSPACE') or None


def get_virtual_workspace_id() -> Optional[str]:
    """Return the virtual workspace id for the current token or environment."""
    return os.environ.get('SINGLESTOREDB_VIRTUAL_WORKSPACE') or None


def get_database_name() -> Optional[str]:
    """Return the default database name for the current token or environment."""
    return os.environ.get('SINGLESTOREDB_DEFAULT_DATABASE') or None


def enable_http_tracing() -> None:
    """Enable tracing of HTTP requests."""
    import logging
    import http.client as http_client
    http_client.HTTPConnection.debuglevel = 1
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger('requests.packages.urllib3')
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True


def _parse_iso_datetime(value: str) -> Any:
    """Normalize a timestamp string and hand it to the ISO converter."""
    value = value.replace('Z', '')
    # Fix datetimes with truncated zeros
    if '.' in value:
        value, micros = value.split('.', 1)
        value = value + '.' + micros.ljust(6, '0')
    return converters.datetime_fromisoformat(value)


def _date_to_datetime(value: Any) -> Any:
    """Promote a plain ``date`` to a midnight ``datetime``; pass others through."""
    if isinstance(value, datetime.date) \
            and not isinstance(value, datetime.datetime):
        return datetime.datetime(value.year, value.month, value.day)
    return value


def to_datetime(
    obj: Optional[Union[str, datetime.datetime]],
) -> Optional[datetime.datetime]:
    """
    Convert string to datetime.

    Returns ``None`` for empty input, for the zero timestamp
    ``0001-01-01T00:00:00Z``, and when the converter returns a string.
    ``datetime`` input is returned unchanged.

    """
    if not obj:
        return None
    if isinstance(obj, datetime.datetime):
        return obj
    if obj == _ZERO_TIMESTAMP:
        return None
    out = _parse_iso_datetime(obj)
    if isinstance(out, str):
        return None
    return _date_to_datetime(out)


def to_datetime_strict(
    obj: Optional[Union[str, datetime.datetime]],
) -> datetime.datetime:
    """
    Convert string to datetime.

    Same conversion as :func:`to_datetime`, but every case that would
    yield ``None`` raises instead.

    Raises
    ------
    TypeError
        If the input or the converted value is empty
    ValueError
        If the input is the zero timestamp ``0001-01-01T00:00:00Z``
        or the converter returns a string

    """
    if not obj:
        raise TypeError('not possible to convert None to datetime')
    if isinstance(obj, datetime.datetime):
        return obj
    if obj == _ZERO_TIMESTAMP:
        raise ValueError('not possible to convert 0001-01-01T00:00:00Z to datetime')
    out = _parse_iso_datetime(obj)
    if not out:
        raise TypeError('not possible to convert None to datetime')
    if isinstance(out, str):
        raise ValueError('value cannot be str')
    return _date_to_datetime(out)


def from_datetime(
    obj: Union[str, datetime.datetime],
) -> Optional[str]:
    """
    Convert datetime to string.

    Strings are returned unchanged and empty values give ``None``.
    Datetimes are rendered in ISO format with a trailing ``Z``. Timezone-aware
    datetimes are converted to UTC first, so the result never carries
    both a UTC offset and a ``Z`` suffix.

    """
    if not obj:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, datetime.datetime) and obj.utcoffset() is not None:
        obj = obj.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    out = obj.isoformat()
    if not re.search(r'[A-Za-z]$', out):
        out = f'{out}Z'
    return out


def vars_to_str(obj: Any) -> str:
    """
    Render a string representation of vars(obj).

    ``name`` and ``id`` are listed first (when present); the remaining
    attributes follow in sorted order. Attributes that are falsy or
    start with an underscore are left out.

    """
    attrs = []
    obj_vars = vars(obj)
    if 'name' in obj_vars:
        attrs.append('name={}'.format(repr(obj_vars['name'])))
    if 'id' in obj_vars:
        attrs.append('id={}'.format(repr(obj_vars['id'])))
    for name, value in sorted(obj_vars.items()):
        if name in ('name', 'id'):
            continue
        if not value or name.startswith('_'):
            continue
        attrs.append('{}={}'.format(name, repr(value)))
    return '{}({})'.format(type(obj).__name__, ', '.join(attrs))


def single_item(s: Any) -> Any:
    """
    Return only item if ``s`` is a list, otherwise return ``s``.

    Raises
    ------
    ValueError
        If ``s`` is a list that does not hold exactly one item

    """
    if isinstance(s, list):
        if len(s) != 1:
            raise ValueError('list must only contain a single item')
        return s[0]
    return s


def stringify(s: JSON) -> str:
    """
    Convert list of strings to single string.

    Raises
    ------
    ValueError
        If a list / tuple is empty or holds more than one item
    TypeError
        If ``s`` is a dictionary

    """
    if isinstance(s, (tuple, list)):
        if len(s) > 1:
            raise ValueError('list contains more than one item')
        if not s:
            # Previously surfaced as a bare IndexError from ``s[0]``
            raise ValueError('list contains no items')
        return s[0]
    if isinstance(s, dict):
        raise TypeError('only strings and lists are valid arguments')
    return s


def listify(s: JSON) -> List[str]:
    """
    Convert string to list of strings.

    Raises
    ------
    TypeError
        If ``s`` is a dictionary

    """
    if isinstance(s, (tuple, list)):
        return list(s)
    if isinstance(s, dict):
        raise TypeError('only strings and lists are valid arguments')
    return [s]


def listify_obj(s: JSON) -> List[JSONObj]:
    """
    Convert object to list of objects.

    Raises
    ------
    TypeError
        If ``s`` is neither a dictionary nor a list / tuple of dictionaries

    """
    if isinstance(s, (tuple, list)):
        for item in s:
            if not isinstance(item, dict):
                raise TypeError('only dicts and lists of dicts are valid parameters')
        return list(s)  # type: ignore
    if not isinstance(s, dict):
        raise TypeError('only dicts and lists of dicts are valid parameters')
    return [s]


def _upper_match(m: Any) -> str:
    """Upper-case the first match group."""
    return m.group(1).upper()


def snake_to_camel(s: Optional[str], cap_first: bool = False) -> Optional[str]:
    """
    Convert snake-case to camel-case.

    The input is lower-cased first. ``cap_first=True`` also capitalizes
    the first character of the result.

    """
    if s is None:
        return None
    out = re.sub(r'_([A-Za-z])', _upper_match, s.lower())
    if cap_first and out:
        return out[0].upper() + out[1:]
    return out


def camel_to_snake(s: Optional[str]) -> Optional[str]:
    """
    Convert camel-case to snake-case.

    Each run of capital letters is prefixed with an underscore and the
    result is lower-cased; a single leading underscore is then removed.

    """
    if s is None:
        return None
    out = re.sub(r'([A-Z]+)', r'_\1', s).lower()
    if out and out[0] == '_':
        return out[1:]
    return out


def snake_to_camel_dict(
    s: Optional[Mapping[str, Any]],
    cap_first: bool = False,
) -> Optional[Dict[str, Any]]:
    """
    Convert snake-case keys to camel-case keys.

    Nested mappings are converted recursively. ``cap_first`` is applied
    to the keys at every level.

    """
    if s is None:
        return None
    out = {}
    for k, v in s.items():
        # ``cap_first`` used to be accepted but never reached the keys
        key = str(snake_to_camel(k, cap_first=cap_first))
        if isinstance(v, Mapping):
            out[key] = snake_to_camel_dict(v, cap_first=cap_first)
        else:
            out[key] = v
    return out


def camel_to_snake_dict(s: Optional[Mapping[str, Any]]) -> Optional[Dict[str, Any]]:
    """
    Convert camel-case keys to snake-case keys.

    Nested mappings are converted recursively.

    """
    if s is None:
        return None
    out = {}
    for k, v in s.items():
        key = str(camel_to_snake(k))
        if isinstance(v, Mapping):
            out[key] = camel_to_snake_dict(v)
        else:
            out[key] = v
    return out