Path: blob/main/singlestoredb/utils/config.py
469 views
#!/usr/bin/env python
#
# Copyright SAS Institute
#
# Licensed under the Apache License, Version 2.0 (the License);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file was originally copied from https://github.com/sassoftware/python-swat.
#
"""
Generalized interface for configuring, setting, and getting options.

Options can be set and retrieved using set_option(...), get_option(...), and
reset_option(...). The describe_option(...) function can be used to display
a description of one or more options.

"""
import contextlib
import math
import os
import re
import warnings
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterator
from typing import List
from typing import Mapping
from typing import Optional
from typing import Tuple
from typing import Union
from urllib.parse import urlparse

from .xdict import xdict


# Container for options
_config = xdict()

items_types = (list, tuple, set)


def _as_name_list(names: Any) -> List[Any]:
    """Return `names` as a list, wrapping a single name in a one-item list."""
    if isinstance(names, items_types):
        return list(names)
    return [names]


def _getenv(names: Union[str, List[str]], *args: Any) -> Any:
    """
    Check for multiple environment variable values.

    Two forms of the environment variable name will be checked,
    both with and without underscores. This allows for aliases
    such as CAS_HOST and CASHOST.

    Parameters
    ----------
    names : str or list of str
        Names of environment variables to look for
    *args : any, optional
        The default return value if no matching environment
        variables exist

    Returns
    -------
    string or default value

    Raises
    ------
    KeyError
        If no matching environment variable exists and no default
        value was given

    """
    # Normalizing to a list keeps `names[0]` below valid for sets as well.
    name_list = _as_name_list(names)
    for name in name_list:
        if name in os.environ:
            return os.environ[name]
        name = name.replace('_', '')
        if name in os.environ:
            return os.environ[name]
    if args:
        return args[0]
    # Always raise KeyError (never IndexError) so callers such as
    # Option.get can rely on a single exception type.
    raise KeyError(name_list[0] if name_list else names)


def _setenv(names: Union[str, List[str]], value: Any) -> None:
    """
    Set environment variable.

    Every variable in `names` that already exists in the environment
    is updated to `value`. Variables that do not exist are left
    untouched; in particular, nothing is created if none of the
    names is currently set.

    Just as with _getenv, the variable name is checked both
    with and without underscores to allow aliases.

    Parameters
    ----------
    names : str or list of str
        Names of environment variable to look for
    value : Any
        The value to set

    """
    for name in _as_name_list(names):
        if name in os.environ:
            os.environ[name] = value
        name = name.replace('_', '')
        if name in os.environ:
            os.environ[name] = value


def _delenv(names: Union[str, List[str]]) -> None:
    """
    Delete given environment variables.

    Both the given form of each name and the form with underscores
    removed are deleted. Missing variables are ignored.

    Parameters
    ----------
    names : str or list of str
        Names of environment variables to delete

    """
    for name in _as_name_list(names):
        os.environ.pop(name, None)
        os.environ.pop(name.replace('_', ''), None)


def iteroptions(*args: Any, **kwargs: Any) -> Iterator[Tuple[str, Any]]:
    """
    Iterate through name / value pairs of options

    Options can come in several forms. They can be consecutive arguments
    where the first argument is the name and the following argument is
    the value. They can be two-element tuples (or lists) where the first
    element is the name and the second element is the value. You can
    also pass in a dictionary of key / value pairs. And finally, you can
    use keyword arguments.

    Parameters
    ----------
    *args : any, optional
        See description above.
    **kwargs : key / value pairs, optional
        Arbitrary keyword arguments.

    Returns
    -------
    generator
        Each iteration returns a name / value pair in a tuple

    Raises
    ------
    IndexError
        If a name is given as a positional argument without a value
        following it

    """
    items = iter(args)
    for item in items:
        if isinstance(item, (list, tuple)):
            yield item[0], item[1]
        elif isinstance(item, dict):
            yield from item.items()
        else:
            # A bare name consumes the next positional argument as its value.
            # StopIteration must not escape a generator (PEP 479), so it is
            # converted to the IndexError that callers have always seen here.
            try:
                value = next(items)
            except StopIteration:
                raise IndexError(
                    'option %r was given without a value' % (item,),
                ) from None
            yield item, value
    yield from kwargs.items()


@contextlib.contextmanager
def option_context(*args: Any, **kwargs: Any) -> Iterator[None]:
    """
    Create a context for setting option temporarily.

    The previous values are restored when the context exits, including
    when the body of the ``with`` block (or setting one of the options)
    raises an exception.

    Parameters
    ----------
    *args : str / any pairs
        Name / value pairs in consecutive arguments (not tuples)
    **kwargs : dict
        Key / value pairs of options

    """
    oldstate = {}
    try:
        # Save old state and set new option values
        for key, value in iteroptions(*args, **kwargs):
            key = _get_option_leaf_node(key)
            # Only the first value seen is the true original; a repeated
            # key must not overwrite it with an intermediate value.
            if key not in oldstate:
                oldstate[key] = get_option(key)
            set_option(key, value)

        # Yield control
        yield
    finally:
        # Set old state back
        for key, value in oldstate.items():
            set_option(key, value)


def _get_option_leaf_node(key: str) -> str:
    """
    Find full option name of given key.

    Parameters
    ----------
    key : str
        Either a partial key or full key name of an option

    Returns
    -------
    str
        The full key name of the option

    Raises
    ------
    KeyError
        If more than one option matches, or if a dotted key
        matches no option
    TypeError
        If a key without any dots matches no option

    """
    flatkeys = list(_config.flatkeys())
    key = key.lower()
    if key in flatkeys:
        return key
    suffix = '.' + key
    keys = [k for k in flatkeys if k.endswith(suffix)]
    if len(keys) > 1:
        raise KeyError('There is more than one option with the name %s.' % key)
    if not keys:
        if '.' in key:
            raise KeyError('%s is not a valid option name.' % key)
        raise TypeError('%s is not a valid option name.' % key)
    return keys[0]


def set_option(*args: Any, **kwargs: Any) -> None:
    """
    Set the value of an option.

    Parameters
    ----------
    *args : str or Any
        The name and value of an option in consecutive arguments (not tuples)
    **kwargs : dict
        Arbitrary keyword / value pairs

    Raises
    ------
    TypeError
        If a name refers to something other than an option

    """
    for key, value in iteroptions(*args, **kwargs):
        key = _get_option_leaf_node(key)
        opt = _config[key]
        if not isinstance(opt, Option):
            raise TypeError('%s is not a valid option name' % key)
        opt.set(value)


set_options = set_option


def get_option(key: str) -> Any:
    """
    Get the value of an option.

    Parameters
    ----------
    key : str
        The name of the option

    Returns
    -------
    Any
        The value of the option

    Raises
    ------
    TypeError
        If the name refers to something other than an option

    """
    key = _get_option_leaf_node(key)
    opt = _config[key]
    if not isinstance(opt, Option):
        raise TypeError('%s is not a valid option name' % key)
    return opt.get()


def get_suboptions(key: str) -> Dict[str, Any]:
    """
    Get the dictionary of options at the level `key`.

    Parameters
    ----------
    key : str
        The name of the option collection

    Returns
    -------
    dict
        The dictionary of options at level `key`

    Raises
    ------
    KeyError
        If `key` does not exist
    TypeError
        If `key` is a single option rather than a collection

    """
    if key not in _config:
        raise KeyError('%s is not a valid option name' % key)
    opt = _config[key]
    if isinstance(opt, Option):
        raise TypeError('%s does not have sub-options' % key)
    return opt


def get_default(key: str) -> Any:
    """
    Get the default value of an option.

    Parameters
    ----------
    key : str
        The name of the option

    Returns
    -------
    Any
        The default value of the option

    Raises
    ------
    TypeError
        If the name refers to something other than an option

    """
    key = _get_option_leaf_node(key)
    opt = _config[key]
    if not isinstance(opt, Option):
        raise TypeError('%s is not a valid option name' % key)
    return opt.get_default()


get_default_val = get_default


def describe_option(*keys: str, **kwargs: Any) -> Optional[str]:
    """
    Print the description of one or more options.

    To print the descriptions of all options, execute this function
    with no parameters.

    Parameters
    ----------
    *keys : one or more strings
        Names of the options. Full names, names of option collections,
        and unambiguous partial names are accepted.
    _print_desc : bool, optional
        Keyword-only. If True (the default) the descriptions are printed
        and None is returned. If False, nothing is printed and the
        descriptions are returned as a single string.

    Returns
    -------
    str or None

    Raises
    ------
    KeyError
        If a name does not match any option or option collection

    """
    _print_desc = kwargs.get('_print_desc', True)

    out = []

    if not keys:
        keys = tuple(sorted(_config.flatkeys()))
    else:
        resolved = []
        for k in keys:
            # Exact keys (options or collections) win over partial matches
            # so that existing calls keep their meaning.
            if k in _config:
                resolved.append(k)
                continue
            try:
                resolved.append(_get_option_leaf_node(k))
            except (KeyError, TypeError):
                # Keep the unresolved name; it is reported as invalid below.
                resolved.append(k)
        keys = tuple(resolved)

    for key in keys:

        if key not in _config:
            raise KeyError('%s is not a valid option name' % key)

        opt = _config[key]
        if isinstance(opt, xdict):
            # Collections are described by recursing over their members.
            desc = describe_option(
                *[
                    '%s.%s' % (key, x)
                    for x in opt.flatkeys()
                ], _print_desc=_print_desc,
            )
            if desc is not None:
                out.append(desc)
            continue

        if _print_desc:
            print(opt.__doc__)
            print('')
        else:
            out.append(opt.__doc__)

    if not _print_desc:
        return '\n'.join(out)

    return None


def reset_option(*keys: str) -> None:
    """
    Reset one or more options back to their default value.

    With no arguments, every registered option is reset.

    Parameters
    ----------
    *keys : one or more strings
        Names of options to reset

    Raises
    ------
    KeyError
        If a name does not exist
    TypeError
        If a name refers to something other than an option

    """
    if not keys:
        keys = tuple(sorted(_config.flatkeys()))
    else:
        keys = tuple(_get_option_leaf_node(k) for k in keys)

    for key in keys:

        if key not in _config:
            raise KeyError('%s is not a valid option name' % key)

        opt = _config[key]
        if not isinstance(opt, Option):
            raise TypeError('%s is not a valid option name' % key)

        # Reset options
        set_option(key, get_default(key))


def _check_range(
    out: Union[int, float],
    minimum: Optional[Union[int, float]],
    maximum: Optional[Union[int, float]],
    exclusive_minimum: bool,
    exclusive_maximum: bool,
) -> None:
    """Raise ValueError if `out` lies outside the given bounds."""
    if minimum is not None:
        if out < minimum:
            raise ValueError(
                '%s is smaller than the minimum value of %s' %
                (out, minimum),
            )
        if exclusive_minimum and out == minimum:
            raise ValueError(
                '%s is equal to the exclusive minimum value of %s' %
                (out, minimum),
            )

    if maximum is not None:
        if out > maximum:
            raise ValueError(
                '%s is larger than the maximum value of %s' %
                (out, maximum),
            )
        if exclusive_maximum and out == maximum:
            raise ValueError(
                '%s is equal to the exclusive maximum value of %s' %
                (out, maximum),
            )


def check_int(
    value: Union[int, float, str],
    minimum: Optional[int] = None,
    maximum: Optional[int] = None,
    exclusive_minimum: bool = False,
    exclusive_maximum: bool = False,
    multiple_of: Optional[int] = None,
) -> int:
    """
    Validate an integer value.

    Parameters
    ----------
    value : int or float or str
        Value to validate. It is converted with ``int(value)``.
    minimum : int, optional
        The minimum value allowed
    maximum : int, optional
        The maximum value allowed
    exclusive_minimum : bool, optional
        Should the minimum value be excluded as an endpoint?
    exclusive_maximum : bool, optional
        Should the maximum value be excluded as an endpoint?
    multiple_of : int, optional
        If specified, the value must be a multiple of it in order for
        the value to be considered valid.

    Returns
    -------
    int
        The validated integer value

    Raises
    ------
    ValueError
        If the value cannot be converted or violates a constraint

    """
    out = int(value)

    _check_range(out, minimum, maximum, exclusive_minimum, exclusive_maximum)

    if multiple_of is not None and (out % int(multiple_of)) != 0:
        raise ValueError('%s is not a multiple of %s' % (out, multiple_of))

    return out


def check_float(
    value: Union[float, int, str],
    minimum: Optional[Union[float, int]] = None,
    maximum: Optional[Union[float, int]] = None,
    exclusive_minimum: bool = False,
    exclusive_maximum: bool = False,
    multiple_of: Optional[Union[float, int]] = None,
) -> float:
    """
    Validate a floating point value.

    Parameters
    ----------
    value : int or float or str
        Value to validate. It is converted with ``float(value)``.
    minimum : int or float, optional
        The minimum value allowed
    maximum : int or float, optional
        The maximum value allowed
    exclusive_minimum : bool, optional
        Should the minimum value be excluded as an endpoint?
    exclusive_maximum : bool, optional
        Should the maximum value be excluded as an endpoint?
    multiple_of : int or float, optional
        If specified, the value must be a multiple of it in order for
        the value to be considered valid. The comparison allows for
        floating point rounding (for example, 0.3 is accepted as a
        multiple of 0.1).

    Returns
    -------
    float
        The validated floating point value

    Raises
    ------
    ValueError
        If the value cannot be converted or violates a constraint

    """
    out = float(value)

    _check_range(out, minimum, maximum, exclusive_minimum, exclusive_maximum)

    if multiple_of is not None:
        # Use the real divisor: truncating it with int() turns 0.5 into a
        # division by zero and 2.5 into a check against 2.
        remainder = out % multiple_of
        # Float modulo of an exact multiple can land just above zero or
        # just short of the divisor, so both ends count as "no remainder".
        tolerance = abs(multiple_of) * 1e-9
        is_multiple = (
            math.isclose(remainder, 0.0, abs_tol=tolerance) or
            math.isclose(remainder, multiple_of, abs_tol=tolerance)
        )
        if not is_multiple:
            raise ValueError('%s is not a multiple of %s' % (out, multiple_of))

    return out


def check_bool(value: Union[bool, int, str, bytes]) -> bool:
    """
    Validate a bool value.

    Parameters
    ----------
    value : bool or int or str or bytes
        The value to validate. If specified as an integer, it must
        be either 0 for False or 1 for True. Strings (and UTF-8 bytes)
        are matched case-insensitively against y / yes / on / t / true /
        enable / enabled / 1 for True and n / no / off / f / false /
        disable / disabled / 0 for False.

    Returns
    -------
    bool
        The validated bool

    Raises
    ------
    ValueError
        If the value is not a recognized bool value

    """
    if value is False or value is True:
        return value

    if isinstance(value, int):
        if value == 1:
            return True
        if value == 0:
            return False

    if isinstance(value, (str, bytes)):
        if isinstance(value, bytes):
            # str(b'true') would give "b'true'", which can never match.
            text = value.decode('utf-8', errors='replace')
        else:
            text = value
        text = text.lower()
        if text in ['y', 'yes', 'on', 't', 'true', 'enable', 'enabled', '1']:
            return True
        if text in ['n', 'no', 'off', 'f', 'false', 'disable', 'disabled', '0']:
            return False

    raise ValueError('%s is not a recognized bool value' % (value,))


def check_optional_bool(
    value: Optional[Union[bool, int, str, bytes]],
) -> Optional[bool]:
    """
    Validate an optional bool value.

    Parameters
    ----------
    value : bool or int or str or bytes or None
        The value to validate. None is passed through unchanged; any
        other value is validated by check_bool.

    Returns
    -------
    bool or None
        The validated bool

    """
    if value is None:
        return None

    return check_bool(value)


def check_str(
    value: Any,
    pattern: Optional[str] = None,
    max_length: Optional[int] = None,
    min_length: Optional[int] = None,
    valid_values: Optional[List[str]] = None,
) -> Optional[str]:
    """
    Validate a string value.

    Parameters
    ----------
    value : string or bytes or None
        The value to validate. Non-string values are decoded as UTF-8.
        None is passed through unchanged.
    pattern : regular expression string, optional
        A regular expression used to validate string values
    max_length : int, optional
        The maximum length of the string
    min_length : int, optional
        The minimum length of the string
    valid_values : list of strings, optional
        List of the only possible values

    Returns
    -------
    string
        The validated string value

    Raises
    ------
    ValueError
        If the value violates a constraint

    """
    if value is None:
        return None

    if isinstance(value, str):
        out = value
    else:
        out = str(value, 'utf-8')

    if max_length is not None and len(out) > max_length:
        raise ValueError(
            '%s is longer than the maximum length of %s' %
            (out, max_length),
        )

    if min_length is not None and len(out) < min_length:
        raise ValueError(
            '%s is shorter than the minimum length of %s' %
            (out, min_length),
        )

    if pattern is not None and not re.search(pattern, out):
        raise ValueError('%s does not match pattern %s' % (out, pattern))

    if valid_values is not None and out not in valid_values:
        raise ValueError(
            '%s is not one of the possible values: %s' %
            (out, ', '.join(valid_values)),
        )

    return out


def check_dict_str_str(
    value: Any,
) -> Optional[Dict[str, str]]:
    """
    Validate a dictionary with string keys and string values.

    Parameters
    ----------
    value : dict or None
        The value to validate. Keys and values must be strings.
        None is passed through unchanged.

    Returns
    -------
    dict
        A new dict holding the validated items

    Raises
    ------
    ValueError
        If the value is not a mapping, or a key or value is not a string

    """
    if value is None:
        return None

    if not isinstance(value, Mapping):
        raise ValueError(
            'value {} must be of type dict'.format(value),
        )

    out = {}
    for k, v in value.items():
        if not isinstance(k, str) or not isinstance(v, str):
            raise ValueError(
                'keys and values in {} must be strings'.format(value),
            )
        out[k] = v

    return out


def check_url(
    value: str,
    pattern: Optional[str] = None,
    max_length: Optional[int] = None,
    min_length: Optional[int] = None,
    valid_values: Optional[List[str]] = None,
) -> Optional[str]:
    """
    Validate a URL value.

    Parameters
    ----------
    value : string or bytes or None
        The value to validate. It is first validated by check_str
        with the same constraints. None is passed through unchanged.
    pattern : regular expression string, optional
        A regular expression used to validate string values
    max_length : int, optional
        The maximum length of the string
    min_length : int, optional
        The minimum length of the string
    valid_values : list of strings, optional
        List of the only possible values

    Returns
    -------
    string
        The validated URL value

    Raises
    ------
    ValueError
        If the value violates a string constraint
    TypeError
        If the value cannot be parsed as a URL

    """
    if value is None:
        return None

    out = check_str(
        value, pattern=pattern, max_length=max_length,
        min_length=min_length, valid_values=valid_values,
    )
    try:
        urlparse(out)
    except Exception as exc:
        # Keep the historical TypeError but preserve the parser's reason.
        raise TypeError('%s is not a valid URL' % value) from exc
    return out


class Option(object):
    """
    Configuration option.

    Parameters
    ----------
    name : str
        The name of the option
    typedesc : str
        Description of the option data type (e.g., int, float, string)
    validator : callable
        A callable object that validates the option value and returns
        the validated value.
    default : any
        The default value of the option
    doc : str
        The documentation string for the option
    environ : str or list of strs, optional
        If specified, the value should be specified in an environment
        variable of that name.

    """

    def __init__(
        self,
        name: str,
        typedesc: str,
        validator: Callable[[Any], Any],
        default: Any,
        doc: str,
        environ: Optional[Union[str, List[str]]] = None,
    ):
        self._name = name
        self._typedesc = typedesc
        self._validator = validator
        # An environment variable that is set at registration time
        # takes the place of the given default.
        if environ is not None:
            self._default = validator(_getenv(environ, default))
        else:
            self._default = validator(default)
        self._environ = environ
        self._value = self._default
        self._doc = doc

    @property
    def __doc__(self) -> str:  # type: ignore
        """Generate documentation string."""
        separator = ' '
        # Long string values are moved onto their own line.
        if isinstance(self._value, (str, bytes)) and len(self._value) > 40:
            separator = '\n '
        return '''%s : %s\n %s\n [default: %s]%s[currently: %s]\n''' % \
            (
                self._name, self._typedesc, self._doc.rstrip().replace('\n', '\n '),
                self._default, separator, str(self._value),
            )

    def set(self, value: Any) -> None:
        """
        Set the value of the option.

        The value is validated first. If the option is tied to
        environment variables, existing variables are updated to the
        string form of the value, or deleted when the value is None.

        Parameters
        ----------
        value : any
            The value to set

        """
        value = self._validator(value)
        _config[self._name]._value = value

        if self._environ is not None:
            if value is None:
                _delenv(self._environ)
            else:
                _setenv(self._environ, str(value))

    def get(self) -> Any:
        """
        Get the value of the option.

        If the option is tied to environment variables and one of them
        is set, its validated value replaces the stored value first.

        Returns
        -------
        Any
            The value of the option

        """
        if self._environ is not None:
            try:
                _config[self._name]._value = self._validator(_getenv(self._environ))
            except KeyError:
                # No environment variable is set; keep the stored value.
                pass
        return _config[self._name]._value

    def get_default(self) -> Any:
        """
        Get the default value of the option.

        Returns
        -------
        Any
            The default value of the option

        """
        return _config[self._name]._default


def register_option(
    key: str,
    typedesc: str,
    validator: Callable[[Any], Any],
    default: Any,
    doc: str,
    environ: Optional[Union[str, List[str]]] = None,
) -> None:
    """
    Register a new option.

    Parameters
    ----------
    key : str
        The name of the option
    typedesc : str
        Description of option data type (e.g., int, float, string)
    validator : callable
        A callable object that validates the value and returns
        a validated value.
    default : any
        The default value of the option
    doc : str
        The documentation string for the option
    environ : str or list of strs, optional
        If specified, the value should be specified in an environment
        variable of that name.

    """
    # Warnings raised while building and storing the option are suppressed.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        _config[key] = Option(key, typedesc, validator, default, doc, environ=environ)


def _attr_fullname(prefix: str, name: str) -> str:
    """Return the option key for attribute `name` under `prefix`."""
    name = name.lower()
    fullname = prefix + '.' + name if prefix else name
    if fullname not in _config:
        fullname = _get_option_leaf_node(fullname)
    return fullname


class AttrOption(object):
    """
    Attribute-style access of options.

    Parameters
    ----------
    name : str
        Name of the option

    """

    def __init__(self, name: str):
        # Bypass the overridden __setattr__, which only handles options.
        object.__setattr__(self, '_name', name)

    def __dir__(self) -> List[str]:
        """Return list of flattened keys."""
        if self._name in _config:
            return list(_config[self._name].flatkeys())
        return list(_config.flatkeys())

    @property
    def __doc__(self) -> Optional[str]:  # type: ignore
        """Generate documentation for the options at this level."""
        if self._name:
            return describe_option(self._name, _print_desc=False)
        return describe_option(_print_desc=False)

    def __getattr__(self, name: str) -> Any:
        """
        Retrieve option as an attribute.

        Parameters
        ----------
        name : str
            Name of the option

        Returns
        -------
        Any
            The option value, or a new AttrOption if `name` refers
            to a collection of options

        """
        fullname = _attr_fullname(self._name, name)
        out = _config[fullname]
        if not isinstance(out, Option):
            return type(self)(fullname)
        return out.get()

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Set an attribute value.

        Parameters
        ----------
        name : str
            Name of the option
        value : Any
            Value of the option

        Raises
        ------
        TypeError
            If `name` refers to a collection of options rather than
            a single option

        """
        fullname = _attr_fullname(self._name, name)
        out = _config[fullname]
        if not isinstance(out, Option):
            # Assigning to a collection used to be silently ignored.
            raise TypeError('%s is not a valid option name' % fullname)
        out.set(value)

    def __call__(self, *args: Any, **kwargs: Any) -> Iterator[None]:
        """Shortcut for option context."""
        return option_context(*args, **kwargs)  # type: ignore


# Object for setting and getting options using attribute syntax
options = AttrOption('')