Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/notebook/_portal.py
801 views
1
#!/usr/bin/env python
2
import json
3
import os
4
import re
5
import time
6
import urllib.parse
7
from typing import Any
8
from typing import Callable
9
from typing import Dict
10
from typing import List
11
from typing import Optional
12
from typing import Tuple
13
from typing import Union
14
15
from . import _objects as obj
16
from ..management import workspace as mgr
17
from ..utils import events
18
19
# IPython is optional: record whether its display module could be imported
# so that code needing a notebook front end can skip itself otherwise.
try:
    from IPython import display
except ImportError:
    has_ipython = False
else:
    has_ipython = True
24
25
26
class Portal(object):
    """SingleStore Portal information.

    State is held in three dictionaries (connection, authentication and
    theme info). Each one is replaced wholesale by the matching
    ``_handle_*_updated`` method when a message is dispatched through
    ``_request``. Most read-only properties look a key up in one of those
    dictionaries and fall back to a ``SINGLESTOREDB_*`` environment
    variable when the key is missing.
    """

    # Shape used to tell a workspace ID apart from a workspace name.
    _UUID_RE = re.compile(
        r'[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}',
        flags=re.I,
    )

    def __init__(self) -> None:
        self._connection_info: Dict[str, Any] = {}
        self._authentication_info: Dict[str, Any] = {}
        self._theme_info: Dict[str, Any] = {}
        # Register the dispatcher so incoming messages reach ``_request``.
        events.subscribe(self._request)

    def __str__(self) -> str:
        attrs = []
        for name in [
            'organization_id', 'workspace_group_id', 'workspace_id',
            'host', 'port', 'user', 'password', 'default_database',
        ]:
            if name == 'password':
                # Never print the password itself, only whether one is set.
                masked = "'***'" if self.password is not None else 'None'
                attrs.append(f'password={masked}')
            else:
                attrs.append(f'{name}={getattr(self, name)!r}')
        return f'{type(self).__name__}({", ".join(attrs)})'

    def __repr__(self) -> str:
        return str(self)

    def _call_javascript(
        self,
        func: str,
        args: Optional[List[Any]] = None,
        wait_on_condition: Optional[Callable[[], bool]] = None,
        timeout_message: str = 'timed out waiting on condition',
        wait_interval: float = 2.0,
        timeout: float = 60.0,
    ) -> None:
        """
        Call a function on ``window.singlestore.portal`` in the front end.

        Does nothing when IPython could not be imported or ``func`` is empty.

        Parameters
        ----------
        func : str
            Dotted name of the function below ``window.singlestore.portal``
        args : List[Any], optional
            JSON-serializable positional arguments for the function
        wait_on_condition : Callable[[], bool], optional
            If given, it is polled after the call until it returns a true
            value
        timeout_message : str, optional
            Message of the ``RuntimeError`` raised on timeout
        wait_interval : float, optional
            Seconds to sleep between polls
        timeout : float, optional
            Maximum number of seconds to wait for the condition

        Raises
        ------
        ValueError
            If ``func`` is not a plain dotted identifier
        RuntimeError
            If the condition is still false once ``timeout`` has elapsed

        """
        if not has_ipython or not func:
            return

        # ``func`` is interpolated into Javascript source, so only a plain
        # dotted identifier is accepted. ``fullmatch`` also rejects a
        # trailing newline, which ``$`` would let through.
        if not re.fullmatch(r'[A-Z_][\w\._]*', func, flags=re.I):
            raise ValueError(f'function name is not valid: {func}')

        args = args if args else []

        code = f'''
            if (window.singlestore && window.singlestore.portal) {{
                window.singlestore.portal.{func}.apply(
                    window,
                    JSON.parse({repr(json.dumps(args))})
                )
            }}
        '''

        display.display(display.Javascript(code))

        if wait_on_condition is None:
            return

        # A monotonic deadline also accounts for time spent evaluating the
        # condition and never sleeps past the timeout.
        deadline = time.monotonic() + timeout
        while not wait_on_condition():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise RuntimeError(timeout_message)
            time.sleep(min(wait_interval, remaining))

    def _request(self, msg: Dict[str, Any]) -> None:
        """
        Handle request on the control stream.

        The last dot-separated component of ``msg['name']`` selects the
        ``_handle_<name>`` method. Messages without a usable name, or with
        a name that has no handler, go to ``_handle_unknown`` instead of
        raising ``AttributeError``.

        """
        name = str(msg.get('name') or 'unknown').split('.')[-1]
        handler = getattr(self, '_handle_' + name, None)
        if handler is None:
            handler = self._handle_unknown
        # Handlers copy the payload with ``dict()``, so never pass ``None``.
        handler(msg.get('data') or {})

    def _handle_connection_updated(self, data: Dict[str, Any]) -> None:
        """Handle connection_updated event."""
        self._connection_info = dict(data)

    def _handle_authentication_updated(self, data: Dict[str, Any]) -> None:
        """Handle authentication_updated event."""
        self._authentication_info = dict(data)

    def _handle_theme_updated(self, data: Dict[str, Any]) -> None:
        """Handle theme_updated event."""
        self._theme_info = dict(data)

    def _handle_unknown(self, data: Dict[str, Any]) -> None:
        """Handle unknown events."""
        pass

    @staticmethod
    def _is_workspace_id(name_or_id: str) -> bool:
        """Return True if the whole string has the shape of a workspace ID."""
        return Portal._UUID_RE.fullmatch(name_or_id) is not None

    def _find_active_workspace(
        self,
        workspace_group_id: Optional[str],
        name_or_id: str,
    ) -> Any:
        """
        Look up a workspace by ID or by name and verify that it is usable.

        Parameters
        ----------
        workspace_group_id : str or None
            Workspace group to search; only used for lookups by name
        name_or_id : str
            Workspace ID (UUID-shaped) or workspace name

        Raises
        ------
        RuntimeError
            If the workspace reports a state other than active / resumed

        Returns
        -------
        The workspace object returned by the management API

        """
        if self._is_workspace_id(name_or_id):
            w = mgr.get_workspace(name_or_id)
        else:
            w = mgr.get_workspace_group(workspace_group_id).workspaces[
                name_or_id
            ]

        # A workspace that reports no state at all is let through.
        if w.state and w.state.lower() not in ['active', 'resumed']:
            raise RuntimeError('workspace is not active')

        return w

    @property
    def organization_id(self) -> Optional[str]:
        """Organization ID."""
        try:
            return self._connection_info['organization']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_ORGANIZATION')

    @property
    def organization(self) -> obj.Organization:
        """Organization."""
        return obj.organization

    @property
    def stage(self) -> obj.Stage:
        """Stage."""
        return obj.stage

    @property
    def secrets(self) -> obj.Secrets:
        """Secrets."""
        return obj.secrets

    @property
    def workspace_group_id(self) -> Optional[str]:
        """Workspace Group ID."""
        try:
            return self._connection_info['workspace_group']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP')

    @property
    def workspace_group(self) -> obj.WorkspaceGroup:
        """Workspace group."""
        return obj.workspace_group

    @workspace_group.setter
    def workspace_group(self, value: Any) -> None:
        """Set workspace group (always rejected)."""
        # A property setter receives the assigned value; without that
        # parameter the assignment failed with TypeError before this
        # explanatory error could be raised.
        raise AttributeError(
            'workspace group can not be set explicitly; ' +
            'you can only set a workspace',
        )

    @property
    def workspace_id(self) -> Optional[str]:
        """Workspace ID."""
        try:
            return self._connection_info['workspace']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_WORKSPACE')

    @property
    def workspace(self) -> obj.Workspace:
        """Workspace."""
        return obj.workspace

    @workspace.setter
    def workspace(self, workspace_spec: Union[str, Tuple[str, str]]) -> None:
        """
        Set workspace.

        ``workspace_spec`` is either a workspace name / ID, or a tuple of
        ``(workspace_group_id, workspace_name_or_id)``. A bare name is
        looked up in the current workspace group.

        """
        workspace_group_id: Optional[str]
        if isinstance(workspace_spec, tuple):
            workspace_group_id, name_or_id = workspace_spec
        else:
            workspace_group_id = self.workspace_group_id
            name_or_id = workspace_spec

        target_id = self._find_active_workspace(
            workspace_group_id, name_or_id,
        ).id

        self._call_javascript(
            'changeDeployment', [target_id],
            wait_on_condition=lambda: self.workspace_id == target_id,  # type: ignore
            timeout_message='timeout waiting for workspace update',
        )

    # ``deployment`` is an alias for the ``workspace`` property.
    deployment = workspace

    @property
    def connection(self) -> Tuple[obj.Workspace, Optional[str]]:
        """Workspace and default database name."""
        return self.workspace, self.default_database

    @connection.setter
    def connection(
        self,
        connection_spec: Union[Tuple[str, str], Tuple[str, str, str]],
    ) -> None:
        """
        Set workspace and default database name.

        ``connection_spec`` is either
        ``(workspace_name_or_id, default_database)`` or
        ``(workspace_group_id, workspace_name_or_id, default_database)``.

        """
        workspace_group_id: Optional[str]
        if len(connection_spec) == 3:
            workspace_group_id, name_or_id, default_database = connection_spec
        else:
            name_or_id, default_database = connection_spec
            workspace_group_id = self.workspace_group_id

        target_id = self._find_active_workspace(
            workspace_group_id, name_or_id,
        ).id

        self._call_javascript(
            'changeConnection', [target_id, default_database],
            wait_on_condition=lambda: self.workspace_id == target_id and
            self.default_database == default_database,  # type: ignore
            timeout_message='timeout waiting for workspace update',
        )

    @property
    def cluster_id(self) -> Optional[str]:
        """Cluster ID."""
        try:
            return self._connection_info['cluster']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_CLUSTER')

    def _parse_url(self) -> Dict[str, Any]:
        """Split the ``SINGLESTOREDB_URL`` environment variable into parts.

        Components that are absent or empty are reported as ``None``.
        """
        url = urllib.parse.urlparse(
            os.environ.get('SINGLESTOREDB_URL', ''),
        )
        return dict(
            host=url.hostname or None,
            port=url.port or None,
            user=url.username or None,
            password=url.password or None,
            # The last path segment is taken as the database name.
            default_database=url.path.split('/')[-1] or None,
        )

    @property
    def connection_url(self) -> Optional[str]:
        """Connection URL."""
        try:
            return self._connection_info['connection_url']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_URL')

    @property
    def connection_url_kai(self) -> Optional[str]:
        """Kai connection URL."""
        try:
            # Subscript, not ``.get()``: the KeyError is what triggers the
            # environment variable fallback.
            return self._connection_info['connection_url_kai']
        except KeyError:
            return os.environ.get('SINGLESTOREDB_URL_KAI')

    @property
    def host(self) -> Optional[str]:
        """Hostname."""
        try:
            return self._connection_info['host']
        except KeyError:
            return self._parse_url()['host']

    @property
    def port(self) -> Optional[int]:
        """Database server port."""
        try:
            return self._connection_info['port']
        except KeyError:
            return self._parse_url()['port']

    @property
    def user(self) -> Optional[str]:
        """Username."""
        try:
            return self._authentication_info['user']
        except KeyError:
            return self._parse_url()['user']

    @property
    def password(self) -> Optional[str]:
        """Password."""
        try:
            return self._authentication_info['password']
        except KeyError:
            return self._parse_url()['password']

    @property
    def default_database(self) -> Optional[str]:
        """Default database."""
        try:
            return self._connection_info['default_database']
        except KeyError:
            return self._parse_url()['default_database']

    @default_database.setter
    def default_database(self, name: str) -> None:
        """Set default database."""
        self._call_javascript(
            'changeDefaultDatabase', [name],
            wait_on_condition=lambda: self.default_database == name,  # type: ignore
            timeout_message='timeout waiting for database update',
        )

    @property
    def version(self) -> Optional[str]:
        """Version."""
        return self._connection_info.get('version')
351
352
# Module-level Portal instance, created at import time; its constructor
# registers the instance's ``_request`` method via ``events.subscribe``.
portal = Portal()
353
354