Path: blob/main/singlestoredb/management/organization.py
469 views
#!/usr/bin/env python
"""SingleStoreDB Cloud Organization."""
import datetime
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from ..exceptions import ManagementError
from .inference_api import InferenceAPIManager
from .job import JobsManager
from .manager import Manager
from .utils import vars_to_str


def listify(x: Union[str, List[str]]) -> List[str]:
    """
    Return the input as a list.

    Parameters
    ----------
    x : str or List[str]
        Value to normalize

    Returns
    -------
    List[str]
        ``x`` itself (not a copy) if it is already a list, otherwise
        a new single-element list containing ``x``

    """
    if isinstance(x, list):
        return x
    return [x]


def stringify(x: Union[str, List[str]]) -> str:
    """
    Return the input as a single string.

    Parameters
    ----------
    x : str or List[str]
        Value to normalize

    Returns
    -------
    str
        The first element if ``x`` is a list, otherwise ``x`` itself

    Raises
    ------
    IndexError
        If ``x`` is an empty list

    """
    if isinstance(x, list):
        return x[0]
    return x


class Secret(object):
    """
    SingleStoreDB secrets definition.

    This object is not directly instantiated. It is used in results
    of API calls on the :class:`Organization`. See :meth:`Organization.get_secret`.
    """

    def __init__(
        self,
        id: str,
        name: str,
        created_by: str,
        created_at: Union[str, datetime.datetime],
        last_updated_by: str,
        last_updated_at: Union[str, datetime.datetime],
        value: Optional[str] = None,
        deleted_by: Optional[str] = None,
        deleted_at: Optional[Union[str, datetime.datetime]] = None,
    ):
        """Use :meth:`Organization.get_secret` instead."""
        #: UUID of the secret
        self.id = id

        #: Name of the secret
        self.name = name

        #: Value of the secret
        self.value = value

        #: User who created the secret
        self.created_by = created_by

        #: Time when the secret was created
        self.created_at = created_at

        #: UUID of the user who last updated the secret
        self.last_updated_by = last_updated_by

        #: Time when the secret was last updated
        self.last_updated_at = last_updated_at

        #: UUID of the user who deleted the secret
        self.deleted_by = deleted_by

        #: Time when the secret was deleted
        self.deleted_at = deleted_at

    @classmethod
    def from_dict(cls, obj: Dict[str, str]) -> 'Secret':
        """
        Construct a Secret from a dictionary of values.

        The keys ``secretID``, ``name``, ``createdBy``, ``createdAt``,
        ``lastUpdatedBy`` and ``lastUpdatedAt`` are required; ``value``,
        ``deletedBy`` and ``deletedAt`` are optional and default to None.

        Parameters
        ----------
        obj : dict
            Dictionary of values

        Returns
        -------
        :class:`Secret`

        Raises
        ------
        KeyError
            If any of the required keys is missing from ``obj``

        """
        out = cls(
            id=obj['secretID'],
            name=obj['name'],
            created_by=obj['createdBy'],
            created_at=obj['createdAt'],
            last_updated_by=obj['lastUpdatedBy'],
            last_updated_at=obj['lastUpdatedAt'],
            value=obj.get('value'),
            deleted_by=obj.get('deletedBy'),
            deleted_at=obj.get('deletedAt'),
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class Organization(object):
    """
    Organization in SingleStoreDB Cloud portal.

    This object is not directly instantiated. It is used in results
    of ``WorkspaceManager`` API calls.

    See Also
    --------
    :attr:`WorkspaceManager.organization`

    """

    id: str
    name: str
    firewall_ranges: List[str]

    def __init__(self, id: str, name: str, firewall_ranges: List[str]):
        """Use :attr:`WorkspaceManager.organization` instead."""
        #: Unique ID of the organization
        self.id = id

        #: Name of the organization
        self.name = name

        #: Firewall ranges of the organization
        # Copied so later changes to the caller's list do not leak in.
        self.firewall_ranges = list(firewall_ranges)

        # Attached afterwards by :meth:`from_dict`; None until then.
        self._manager: Optional[Manager] = None

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)

    def get_secret(self, name: str) -> Secret:
        """
        Retrieve the secret with the given name.

        Parameters
        ----------
        name : str
            Name of the secret to look up

        Returns
        -------
        :class:`Secret`

        Raises
        ------
        ManagementError
            If no manager is attached to this organization, if no secret
            matches ``name``, or if more than one secret matches ``name``

        """
        if self._manager is None:
            raise ManagementError(msg='Organization not initialized')

        res = self._manager._get('secrets', params=dict(name=name))

        # Treat a JSON ``null`` under "secrets" as an empty result so the
        # caller gets the "not found" ManagementError rather than a
        # TypeError from iterating None.
        secrets = [
            Secret.from_dict(item)
            for item in (res.json()['secrets'] or [])
        ]

        if not secrets:
            raise ManagementError(msg=f'Secret {name} not found')

        if len(secrets) > 1:
            raise ManagementError(msg=f'Multiple secrets found for {name}')

        return secrets[0]

    @classmethod
    def from_dict(
        cls,
        obj: Dict[str, Union[str, List[str]]],
        manager: Manager,
    ) -> 'Organization':
        """
        Convert dictionary to an ``Organization`` object.

        ``orgID`` is required. ``name`` defaults to ``'<unknown>'`` and
        ``firewallRanges`` defaults to an empty list when absent.

        Parameters
        ----------
        obj : dict
            Key-value pairs to retrieve organization information from
        manager : Manager
            The manager the Organization belongs to

        Returns
        -------
        :class:`Organization`

        """
        out = cls(
            id=stringify(obj['orgID']),
            name=stringify(obj.get('name', '<unknown>')),
            firewall_ranges=listify(obj.get('firewallRanges', [])),
        )
        out._manager = manager
        return out

    @property
    def jobs(self) -> JobsManager:
        """
        Retrieve a SingleStoreDB scheduled job manager.

        A new :class:`JobsManager` is constructed on every access, using
        the manager attached to this organization (which may be None).

        Returns
        -------
        :class:`JobsManager`
        """
        return JobsManager(self._manager)

    @property
    def inference_apis(self) -> InferenceAPIManager:
        """
        Retrieve a SingleStoreDB inference api manager.

        A new :class:`InferenceAPIManager` is constructed on every access,
        using the manager attached to this organization (which may be None).

        Returns
        -------
        :class:`InferenceAPIManager`
        """
        return InferenceAPIManager(self._manager)