Path: blob/main/singlestoredb/notebook/_portal.py
469 views
#!/usr/bin/env python
"""Access to SingleStore Portal state from within a notebook."""
import json
import os
import re
import time
import urllib.parse
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from . import _objects as obj
from ..management import workspace as mgr
from ..utils import events

try:
    from IPython import display
    has_ipython = True
except ImportError:
    has_ipython = False


class Portal(object):
    """SingleStore Portal information."""

    def __init__(self) -> None:
        self._connection_info: Dict[str, Any] = {}
        self._authentication_info: Dict[str, Any] = {}
        self._theme_info: Dict[str, Any] = {}
        # Register for events so the ``_handle_*`` methods keep the
        # dictionaries above up to date.
        events.subscribe(self._request)

    def __str__(self) -> str:
        """Return a readable summary; the password value is always masked."""
        attrs = []
        for name in [
            'organization_id', 'workspace_group_id', 'workspace_id',
            'host', 'port', 'user', 'password', 'default_database',
        ]:
            if name == 'password':
                # Never expose the actual password in string output.
                if self.password is not None:
                    attrs.append("password='***'")
                else:
                    attrs.append('password=None')
            else:
                attrs.append(f'{name}={getattr(self, name)!r}')
        return f'{type(self).__name__}({", ".join(attrs)})'

    def __repr__(self) -> str:
        return str(self)

    def _call_javascript(
        self,
        func: str,
        args: Optional[List[Any]] = None,
        wait_on_condition: Optional[Callable[[], bool]] = None,
        timeout_message: str = 'timed out waiting on condition',
        wait_interval: float = 0.2,
        timeout: float = 5.0,
    ) -> None:
        """
        Call a function on ``window.singlestore.portal`` in the front end.

        Does nothing if IPython is not available or ``func`` is empty.

        Parameters
        ----------
        func : str
            Name of the function to call (may be a dotted path)
        args : List[Any], optional
            JSON-serializable arguments passed to the function
        wait_on_condition : Callable[[], bool], optional
            If given, poll this callable until it returns a true value
        timeout_message : str, optional
            Message of the RuntimeError raised if the wait times out
        wait_interval : float, optional
            Seconds to sleep between polls of ``wait_on_condition``
        timeout : float, optional
            Approximate number of seconds to wait before giving up

        Raises
        ------
        ValueError
            If ``func`` is not a valid function name
        RuntimeError
            If ``wait_on_condition`` does not become true in time

        """
        if not has_ipython or not func:
            return

        # ``func`` is interpolated into the generated JavaScript, so only
        # allow identifier / dotted-path characters.
        if not re.match(r'^[A-Z_][\w\._]*$', func, flags=re.I):
            raise ValueError(f'function name is not valid: {func}')

        args = args if args else []

        code = f'''
            if (window.singlestore && window.singlestore.portal) {{
                window.singlestore.portal.{func}.apply(
                    window,
                    JSON.parse({repr(json.dumps(args))})
                )
            }}
        '''

        display.display(display.Javascript(code))

        if wait_on_condition is not None:
            # Elapsed time is approximated by the accumulated sleep intervals.
            elapsed = 0.0
            while True:
                if wait_on_condition():
                    break
                if elapsed > timeout:
                    raise RuntimeError(timeout_message)
                time.sleep(wait_interval)
                elapsed += wait_interval

    def _request(self, msg: Dict[str, Any]) -> None:
        """
        Handle request on the control stream.

        The last dotted component of ``msg['name']`` selects the
        ``_handle_<name>`` method; events without a matching handler are
        routed to ``_handle_unknown`` rather than raising AttributeError.

        """
        name = (msg.get('name') or 'unknown').split('.')[-1]
        func = getattr(self, '_handle_' + name, self._handle_unknown)
        func(msg.get('data', {}))

    def _handle_connection_updated(self, data: Dict[str, Any]) -> None:
        """Handle connection_updated event."""
        self._connection_info = dict(data)

    def _handle_authentication_updated(self, data: Dict[str, Any]) -> None:
        """Handle authentication_updated event."""
        self._authentication_info = dict(data)

    def _handle_theme_updated(self, data: Dict[str, Any]) -> None:
        """Handle theme_updated event."""
        self._theme_info = dict(data)

    def _handle_unknown(self, data: Dict[str, Any]) -> None:
        """Handle unknown events."""
        pass

    def _get_active_workspace(self, name_or_id: str) -> Any:
        """
        Look up a workspace by ID or name and verify that it is usable.

        A value that starts with a UUID-shaped string is looked up as a
        workspace ID; anything else is looked up by name in the current
        workspace group.

        Raises
        ------
        RuntimeError
            If the workspace reports a state other than active / resumed

        """
        if re.match(
            r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}',
            name_or_id, flags=re.I,
        ):
            w = mgr.get_workspace(name_or_id)
        else:
            w = mgr.get_workspace_group(
                self.workspace_group_id,
            ).workspaces[name_or_id]

        if w.state and w.state.lower() not in ['active', 'resumed']:
            raise RuntimeError('workspace is not active')

        return w

    @property
    def organization_id(self) -> Optional[str]:
        """Organization ID."""
        try:
            return self._connection_info['organization']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_ORGANIZATION')

    @property
    def organization(self) -> obj.Organization:
        """Organization."""
        return obj.organization

    @property
    def stage(self) -> obj.Stage:
        """Stage."""
        return obj.stage

    @property
    def secrets(self) -> obj.Secrets:
        """Secrets."""
        return obj.secrets

    @property
    def workspace_group_id(self) -> Optional[str]:
        """Workspace Group ID."""
        try:
            return self._connection_info['workspace_group']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP')

    @property
    def workspace_group(self) -> obj.WorkspaceGroup:
        """Workspace group."""
        return obj.workspace_group

    @workspace_group.setter
    def workspace_group(self, value: Any) -> None:
        """Set workspace group (always raises AttributeError)."""
        # The setter must accept the assigned value; without the parameter
        # an assignment fails with TypeError instead of the error below.
        raise AttributeError(
            'workspace group can not be set explictly; ' +
            'you can only set a workspace',
        )

    @property
    def workspace_id(self) -> Optional[str]:
        """Workspace ID."""
        try:
            return self._connection_info['workspace']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_WORKSPACE')

    @property
    def workspace(self) -> obj.Workspace:
        """Workspace."""
        return obj.workspace

    @workspace.setter
    def workspace(self, name_or_id: str) -> None:
        """Set workspace."""
        target_id = self._get_active_workspace(name_or_id).id

        self._call_javascript(
            'changeDeployment', [target_id],
            wait_on_condition=lambda: self.workspace_id == target_id,  # type: ignore
            timeout_message='timeout waiting for workspace update',
        )

    # Alias: ``deployment`` reads and assigns exactly like ``workspace``.
    deployment = workspace

    @property
    def connection(self) -> Tuple[obj.Workspace, Optional[str]]:
        """Workspace and default database name."""
        return self.workspace, self.default_database

    @connection.setter
    def connection(self, workspace_and_default_database: Tuple[str, str]) -> None:
        """Set workspace and default database name."""
        name_or_id, default_database = workspace_and_default_database
        target_id = self._get_active_workspace(name_or_id).id

        self._call_javascript(
            'changeConnection', [target_id, default_database],
            wait_on_condition=lambda: (  # type: ignore
                self.workspace_id == target_id and
                self.default_database == default_database
            ),
            timeout_message='timeout waiting for workspace update',
        )

    @property
    def cluster_id(self) -> Optional[str]:
        """Cluster ID."""
        try:
            return self._connection_info['cluster']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_CLUSTER')

    def _parse_url(self) -> Dict[str, Any]:
        """Split the SINGLESTOREDB_URL environment variable into its parts."""
        url = urllib.parse.urlparse(
            os.environ.get('SINGLESTOREDB_URL', ''),
        )
        # Empty components are normalized to None.
        return dict(
            host=url.hostname or None,
            port=url.port or None,
            user=url.username or None,
            password=url.password or None,
            default_database=url.path.split('/')[-1] or None,
        )

    @property
    def connection_url(self) -> Optional[str]:
        """Connection URL."""
        try:
            return self._connection_info['connection_url']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_URL')

    @property
    def connection_url_kai(self) -> Optional[str]:
        """Kai connection URL."""
        try:
            # Index (rather than .get) so a missing key reaches the
            # environment variable fallback below.
            return self._connection_info['connection_url_kai']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_URL_KAI')

    @property
    def host(self) -> Optional[str]:
        """Hostname."""
        try:
            return self._connection_info['host']
        except KeyError:
            return self._parse_url()['host']

    @property
    def port(self) -> Optional[int]:
        """Database server port."""
        try:
            return self._connection_info['port']
        except KeyError:
            return self._parse_url()['port']

    @property
    def user(self) -> Optional[str]:
        """Username."""
        try:
            return self._authentication_info['user']
        except KeyError:
            return self._parse_url()['user']

    @property
    def password(self) -> Optional[str]:
        """Password."""
        try:
            return self._authentication_info['password']
        except KeyError:
            return self._parse_url()['password']

    @property
    def default_database(self) -> Optional[str]:
        """Default database."""
        try:
            return self._connection_info['default_database']
        except KeyError:
            return self._parse_url()['default_database']

    @default_database.setter
    def default_database(self, name: str) -> None:
        """Set default database."""
        self._call_javascript(
            'changeDefaultDatabase', [name],
            wait_on_condition=lambda: self.default_database == name,  # type: ignore
            timeout_message='timeout waiting for database update',
        )

    @property
    def version(self) -> Optional[str]:
        """Version."""
        return self._connection_info.get('version')


portal = Portal()