Path: blob/main/singlestoredb/notebook/_portal.py
801 views
#!/usr/bin/env python1import json2import os3import re4import time5import urllib.parse6from typing import Any7from typing import Callable8from typing import Dict9from typing import List10from typing import Optional11from typing import Tuple12from typing import Union1314from . import _objects as obj15from ..management import workspace as mgr16from ..utils import events1718try:19from IPython import display20has_ipython = True21except ImportError:22has_ipython = False232425class Portal(object):26"""SingleStore Portal information."""2728def __init__(self) -> None:29self._connection_info: Dict[str, Any] = {}30self._authentication_info: Dict[str, Any] = {}31self._theme_info: Dict[str, Any] = {}32events.subscribe(self._request)3334def __str__(self) -> str:35attrs = []36for name in [37'organization_id', 'workspace_group_id', 'workspace_id',38'host', 'port', 'user', 'password', 'default_database',39]:40if name == 'password':41if self.password is not None:42attrs.append("password='***'")43else:44attrs.append('password=None')45else:46attrs.append(f'{name}={getattr(self, name)!r}')47return f'{type(self).__name__}({", ".join(attrs)})'4849def __repr__(self) -> str:50return str(self)5152def _call_javascript(53self,54func: str,55args: Optional[List[Any]] = None,56wait_on_condition: Optional[Callable[[], bool]] = None,57timeout_message: str = 'timed out waiting on condition',58wait_interval: float = 2.0,59timeout: float = 60.0,60) -> None:61if not has_ipython or not func:62return6364if not re.match(r'^[A-Z_][\w\._]*$', func, flags=re.I):65raise ValueError(f'function name is not valid: {func}')6667args = args if args else []6869code = f'''70if (window.singlestore && window.singlestore.portal) {{71window.singlestore.portal.{func}.apply(72window,73JSON.parse({repr(json.dumps(args))})74)75}}76'''7778display.display(display.Javascript(code))7980if wait_on_condition is not None:81elapsed = 0.082while True:83if wait_on_condition():84break85if elapsed > timeout:86raise RuntimeError(timeout_message)87time.sleep(wait_interval)88elapsed += wait_interval8990def _request(self, msg: Dict[str, Any]) -> None:91"""Handle request on the control stream."""92func = getattr(self, '_handle_' + msg.get('name', 'unknown').split('.')[-1])93if func is not None:94func(msg.get('data', {}))9596def _handle_connection_updated(self, data: Dict[str, Any]) -> None:97"""Handle connection_updated event."""98self._connection_info = dict(data)99100def _handle_authentication_updated(self, data: Dict[str, Any]) -> None:101"""Handle authentication_updated event."""102self._authentication_info = dict(data)103104def _handle_theme_updated(self, data: Dict[str, Any]) -> None:105"""Handle theme_updated event."""106self._theme_info = dict(data)107108def _handle_unknown(self, data: Dict[str, Any]) -> None:109"""Handle unknown events."""110pass111112@property113def organization_id(self) -> Optional[str]:114"""Organization ID."""115try:116return self._connection_info['organization']117except KeyError:118return os.environ.get('SINGLESTOREDB_ORGANIZATION')119120@property121def organization(self) -> obj.Organization:122"""Organization."""123return obj.organization124125@property126def stage(self) -> obj.Stage:127"""Stage."""128return obj.stage129130@property131def secrets(self) -> obj.Secrets:132"""Secrets."""133return obj.secrets134135@property136def workspace_group_id(self) -> Optional[str]:137"""Workspace Group ID."""138try:139return self._connection_info['workspace_group']140except KeyError:141return os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP')142143@property144def workspace_group(self) -> obj.WorkspaceGroup:145"""Workspace group."""146return obj.workspace_group147148@workspace_group.setter149def workspace_group(self) -> None:150"""Set workspace group."""151raise AttributeError(152'workspace group can not be set explictly; ' +153'you can only set a workspace',154)155156@property157def workspace_id(self) -> Optional[str]:158"""Workspace ID."""159try:160return self._connection_info['workspace']161except KeyError:162return os.environ.get('SINGLESTOREDB_WORKSPACE')163164@property165def workspace(self) -> obj.Workspace:166"""Workspace."""167return obj.workspace168169@workspace.setter170def workspace(self, workspace_spec: Union[str, Tuple[str, str]]) -> None:171"""Set workspace."""172if isinstance(workspace_spec, tuple):173# 2-element tuple: (workspace_group_id, workspace_name_or_id)174workspace_group_id, name_or_id = workspace_spec175uuid_pattern = (176r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}'177)178if re.match(uuid_pattern, name_or_id, flags=re.I):179w = mgr.get_workspace(name_or_id)180else:181w = mgr.get_workspace_group(workspace_group_id).workspaces[182name_or_id183]184else:185# String: workspace_name_or_id (existing behavior)186name_or_id = workspace_spec187uuid_pattern = (188r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}'189)190if re.match(uuid_pattern, name_or_id, flags=re.I):191w = mgr.get_workspace(name_or_id)192else:193w = mgr.get_workspace_group(194self.workspace_group_id,195).workspaces[name_or_id]196197if w.state and w.state.lower() not in ['active', 'resumed']:198raise RuntimeError('workspace is not active')199200id = w.id201202self._call_javascript(203'changeDeployment', [id],204wait_on_condition=lambda: self.workspace_id == id, # type: ignore205timeout_message='timeout waiting for workspace update',206)207208deployment = workspace209210@property211def connection(self) -> Tuple[obj.Workspace, Optional[str]]:212"""Workspace and default database name."""213return self.workspace, self.default_database214215@connection.setter216def connection(217self,218connection_spec: Union[Tuple[str, str], Tuple[str, str, str]],219) -> None:220"""Set workspace and default database name."""221if len(connection_spec) == 3:222# 3-element tuple: (workspace_group_id, workspace_name_or_id,223# default_database)224workspace_group_id, name_or_id, default_database = connection_spec225uuid_pattern = (226r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}'227)228if re.match(uuid_pattern, name_or_id, flags=re.I):229w = mgr.get_workspace(name_or_id)230else:231w = mgr.get_workspace_group(workspace_group_id).workspaces[232name_or_id233]234else:235# 2-element tuple: (workspace_name_or_id, default_database)236# existing behavior237name_or_id, default_database = connection_spec238uuid_pattern = (239r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}'240)241if re.match(uuid_pattern, name_or_id, flags=re.I):242w = mgr.get_workspace(name_or_id)243else:244w = mgr.get_workspace_group(245self.workspace_group_id,246).workspaces[name_or_id]247248if w.state and w.state.lower() not in ['active', 'resumed']:249raise RuntimeError('workspace is not active')250251id = w.id252253self._call_javascript(254'changeConnection', [id, default_database],255wait_on_condition=lambda: self.workspace_id == id and256self.default_database == default_database, # type: ignore257timeout_message='timeout waiting for workspace update',258)259260@property261def cluster_id(self) -> Optional[str]:262"""Cluster ID."""263try:264return self._connection_info['cluster']265except KeyError:266return os.environ.get('SINGLESTOREDB_CLUSTER')267268def _parse_url(self) -> Dict[str, Any]:269url = urllib.parse.urlparse(270os.environ.get('SINGLESTOREDB_URL', ''),271)272return dict(273host=url.hostname or None,274port=url.port or None,275user=url.username or None,276password=url.password or None,277default_database=url.path.split('/')[-1] or None,278)279280@property281def connection_url(self) -> Optional[str]:282"""Connection URL."""283try:284return self._connection_info['connection_url']285except KeyError:286return os.environ.get('SINGLESTOREDB_URL')287288@property289def connection_url_kai(self) -> Optional[str]:290"""Kai connectionURL."""291try:292return self._connection_info.get('connection_url_kai')293except KeyError:294return os.environ.get('SINGLESTOREDB_URL_KAI')295296@property297def host(self) -> Optional[str]:298"""Hostname."""299try:300return self._connection_info['host']301except KeyError:302return self._parse_url()['host']303304@property305def port(self) -> Optional[int]:306"""Database server port."""307try:308return self._connection_info['port']309except KeyError:310return self._parse_url()['port']311312@property313def user(self) -> Optional[str]:314"""Username."""315try:316return self._authentication_info['user']317except KeyError:318return self._parse_url()['user']319320@property321def password(self) -> Optional[str]:322"""Password."""323try:324return self._authentication_info['password']325except KeyError:326return self._parse_url()['password']327328@property329def default_database(self) -> Optional[str]:330"""Default database."""331try:332return self._connection_info['default_database']333except KeyError:334return self._parse_url()['default_database']335336@default_database.setter337def default_database(self, name: str) -> None:338"""Set default database."""339self._call_javascript(340'changeDefaultDatabase', [name],341wait_on_condition=lambda: self.default_database == name, # type: ignore342timeout_message='timeout waiting for database update',343)344345@property346def version(self) -> Optional[str]:347"""Version."""348return self._connection_info.get('version')349350351portal = Portal()352353354