Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/notebook/_portal.py
469 views
1
#!/usr/bin/env python
2
import json
3
import os
4
import re
5
import time
6
import urllib.parse
7
from typing import Any
8
from typing import Callable
9
from typing import Dict
10
from typing import List
11
from typing import Optional
12
from typing import Tuple
13
14
from . import _objects as obj
15
from ..management import workspace as mgr
16
from ..utils import events
17
18
# IPython is optional: record whether it could be imported so callers can
# skip notebook-only behavior when it is missing.
try:
    from IPython import display
except ImportError:
    has_ipython = False
else:
    has_ipython = True
23
24
25
class Portal(object):
    """
    SingleStore Portal information.

    Holds the connection, authentication and theme dictionaries delivered
    through the events stream (see ``_request``). Most properties read from
    those dictionaries first and fall back to ``SINGLESTOREDB_*`` environment
    variables (or pieces of ``SINGLESTOREDB_URL``) when a key is absent.

    """

    # Shape of a workspace ID (UUID-like); used to tell IDs from names.
    _WORKSPACE_ID_RE = re.compile(
        r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}',
        flags=re.I,
    )

    def __init__(self) -> None:
        """Create empty info dictionaries and subscribe to events."""
        self._connection_info: Dict[str, Any] = {}
        self._authentication_info: Dict[str, Any] = {}
        self._theme_info: Dict[str, Any] = {}
        events.subscribe(self._request)

    def __str__(self) -> str:
        """Return a summary of the portal settings with the password masked."""
        attrs = []
        for name in [
            'organization_id', 'workspace_group_id', 'workspace_id',
            'host', 'port', 'user', 'password', 'default_database',
        ]:
            if name == 'password':
                # Never print the actual password
                if self.password is not None:
                    attrs.append("password='***'")
                else:
                    attrs.append('password=None')
            else:
                attrs.append(f'{name}={getattr(self, name)!r}')
        return f'{type(self).__name__}({", ".join(attrs)})'

    def __repr__(self) -> str:
        """Return the same text as ``str(self)``."""
        return str(self)

    def _call_javascript(
        self,
        func: str,
        args: Optional[List[Any]] = None,
        wait_on_condition: Optional[Callable[[], bool]] = None,
        timeout_message: str = 'timed out waiting on condition',
        wait_interval: float = 0.2,
        timeout: float = 5.0,
    ) -> None:
        """
        Call a function on ``window.singlestore.portal`` in the frontend.

        Does nothing if IPython is not available or ``func`` is empty.

        Parameters
        ----------
        func : str
            Name of the function (dotted names allowed)
        args : List[Any], optional
            JSON-serializable positional arguments for the function
        wait_on_condition : Callable[[], bool], optional
            If given, poll this function until it returns a true value
        timeout_message : str, optional
            Message of the error raised when the wait times out
        wait_interval : float, optional
            Seconds to sleep between polls
        timeout : float, optional
            Maximum number of seconds to wait on the condition

        Raises
        ------
        ValueError
            If ``func`` is not a valid function name
        RuntimeError
            If the condition is not met within ``timeout`` seconds

        """
        if not has_ipython or not func:
            return

        # ``fullmatch`` (rather than ``match`` with ``$``) also rejects a
        # trailing newline, since the name is pasted into generated code.
        if not re.fullmatch(r'[A-Z_][\w.]*', func, flags=re.I):
            raise ValueError(f'function name is not valid: {func}')

        code = f'''
            if (window.singlestore && window.singlestore.portal) {{
                window.singlestore.portal.{func}.apply(
                    window,
                    JSON.parse({repr(json.dumps(args or []))})
                )
            }}
        '''

        display.display(display.Javascript(code))

        if wait_on_condition is None:
            return

        # Measure real elapsed time so a slow condition check still
        # counts against the timeout.
        deadline = time.monotonic() + timeout
        while not wait_on_condition():
            if time.monotonic() > deadline:
                raise RuntimeError(timeout_message)
            time.sleep(wait_interval)

    def _request(self, msg: Dict[str, Any]) -> None:
        """
        Handle request on the control stream.

        The last dotted component of ``msg['name']`` selects the
        ``_handle_<name>`` method; names without a matching handler go to
        ``_handle_unknown`` instead of raising ``AttributeError``.

        """
        name = (msg.get('name') or 'unknown').split('.')[-1]
        handler = getattr(self, '_handle_' + name, self._handle_unknown)
        handler(msg.get('data', {}))

    def _handle_connection_updated(self, data: Dict[str, Any]) -> None:
        """Handle connection_updated event."""
        self._connection_info = dict(data)

    def _handle_authentication_updated(self, data: Dict[str, Any]) -> None:
        """Handle authentication_updated event."""
        self._authentication_info = dict(data)

    def _handle_theme_updated(self, data: Dict[str, Any]) -> None:
        """Handle theme_updated event."""
        self._theme_info = dict(data)

    def _handle_unknown(self, data: Dict[str, Any]) -> None:
        """Handle unknown events (ignored)."""

    def _resolve_active_workspace(self, name_or_id: str) -> Any:
        """
        Look up a workspace by ID or name and check that it is usable.

        A value shaped like a workspace ID is looked up directly; anything
        else is treated as a name within the current workspace group.

        Raises
        ------
        RuntimeError
            If the workspace reports a state other than active / resumed

        """
        if self._WORKSPACE_ID_RE.fullmatch(name_or_id):
            w = mgr.get_workspace(name_or_id)
        else:
            group = mgr.get_workspace_group(self.workspace_group_id)
            w = group.workspaces[name_or_id]

        if w.state and w.state.lower() not in ['active', 'resumed']:
            raise RuntimeError('workspace is not active')

        return w

    @property
    def organization_id(self) -> Optional[str]:
        """Organization ID."""
        try:
            return self._connection_info['organization']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_ORGANIZATION')

    @property
    def organization(self) -> 'obj.Organization':
        """Organization."""
        return obj.organization

    @property
    def stage(self) -> 'obj.Stage':
        """Stage."""
        return obj.stage

    @property
    def secrets(self) -> 'obj.Secrets':
        """Secrets."""
        return obj.secrets

    @property
    def workspace_group_id(self) -> Optional[str]:
        """Workspace Group ID."""
        try:
            return self._connection_info['workspace_group']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP')

    @property
    def workspace_group(self) -> 'obj.WorkspaceGroup':
        """Workspace group."""
        return obj.workspace_group

    @workspace_group.setter
    def workspace_group(self, value: Any) -> None:
        """Reject assignment; always raises ``AttributeError``."""
        # The setter must accept the assigned value, otherwise Python raises
        # TypeError before this explanatory AttributeError can be raised.
        raise AttributeError(
            'workspace group can not be set explictly; ' +
            'you can only set a workspace',
        )

    @property
    def workspace_id(self) -> Optional[str]:
        """Workspace ID."""
        try:
            return self._connection_info['workspace']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_WORKSPACE')

    @property
    def workspace(self) -> 'obj.Workspace':
        """Workspace."""
        return obj.workspace

    @workspace.setter
    def workspace(self, name_or_id: str) -> None:
        """
        Set workspace.

        Requests a deployment change from the frontend, then waits until
        ``workspace_id`` reports the new workspace's ID.

        """
        workspace_id = self._resolve_active_workspace(name_or_id).id

        self._call_javascript(
            'changeDeployment', [workspace_id],
            wait_on_condition=lambda: self.workspace_id == workspace_id,  # type: ignore
            timeout_message='timeout waiting for workspace update',
        )

    # Alias: ``deployment`` behaves exactly like ``workspace``
    deployment = workspace

    @property
    def connection(self) -> 'Tuple[obj.Workspace, Optional[str]]':
        """Workspace and default database name."""
        return self.workspace, self.default_database

    @connection.setter
    def connection(self, workspace_and_default_database: Tuple[str, str]) -> None:
        """
        Set workspace and default database name.

        Requests a connection change from the frontend, then waits until
        both ``workspace_id`` and ``default_database`` report the new values.

        """
        name_or_id, default_database = workspace_and_default_database
        workspace_id = self._resolve_active_workspace(name_or_id).id

        self._call_javascript(
            'changeConnection', [workspace_id, default_database],
            wait_on_condition=lambda: (  # type: ignore
                self.workspace_id == workspace_id and
                self.default_database == default_database
            ),
            timeout_message='timeout waiting for workspace update',
        )

    @property
    def cluster_id(self) -> Optional[str]:
        """Cluster ID."""
        try:
            return self._connection_info['cluster']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_CLUSTER')

    def _parse_url(self) -> Dict[str, Any]:
        """
        Split ``SINGLESTOREDB_URL`` into its connection components.

        Returns
        -------
        Dict[str, Any]
            ``host``, ``port``, ``user``, ``password`` and
            ``default_database``; missing or empty parts are ``None``

        """
        url = urllib.parse.urlparse(
            os.environ.get('SINGLESTOREDB_URL', ''),
        )
        return dict(
            host=url.hostname or None,
            port=url.port or None,
            user=url.username or None,
            password=url.password or None,
            # Last path segment is taken as the database name
            default_database=url.path.split('/')[-1] or None,
        )

    @property
    def connection_url(self) -> Optional[str]:
        """Connection URL."""
        try:
            return self._connection_info['connection_url']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_URL')

    @property
    def connection_url_kai(self) -> Optional[str]:
        """Kai connection URL."""
        # Subscript (not ``.get``) so a missing key actually reaches the
        # environment variable fallback, like the other properties.
        try:
            return self._connection_info['connection_url_kai']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_URL_KAI')

    @property
    def host(self) -> Optional[str]:
        """Hostname."""
        try:
            return self._connection_info['host']
        except KeyError:
            return self._parse_url()['host']

    @property
    def port(self) -> Optional[int]:
        """Database server port."""
        try:
            return self._connection_info['port']
        except KeyError:
            return self._parse_url()['port']

    @property
    def user(self) -> Optional[str]:
        """Username."""
        try:
            return self._authentication_info['user']
        except KeyError:
            return self._parse_url()['user']

    @property
    def password(self) -> Optional[str]:
        """Password."""
        try:
            return self._authentication_info['password']
        except KeyError:
            return self._parse_url()['password']

    @property
    def default_database(self) -> Optional[str]:
        """Default database."""
        try:
            return self._connection_info['default_database']
        except KeyError:
            return self._parse_url()['default_database']

    @default_database.setter
    def default_database(self, name: str) -> None:
        """
        Set default database.

        Requests the change from the frontend, then waits until
        ``default_database`` reports the new name.

        """
        self._call_javascript(
            'changeDefaultDatabase', [name],
            wait_on_condition=lambda: self.default_database == name,  # type: ignore
            timeout_message='timeout waiting for database update',
        )

    @property
    def version(self) -> Optional[str]:
        """Version."""
        return self._connection_info.get('version')
311
312
313
# Module-level Portal instance, created at import time. Constructing it
# registers the instance's ``_request`` method with ``events.subscribe``.
portal = Portal()
314
315