Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/management/manager.py
800 views
1
#!/usr/bin/env python
2
"""SingleStoreDB Base Manager."""
3
import os
4
import sys
5
import time
6
from typing import Any
7
from typing import Dict
8
from typing import List
9
from typing import Optional
10
from typing import Union
11
from urllib.parse import urljoin
12
13
import jwt
14
import requests
15
16
from .. import config
17
from ..exceptions import ManagementError
18
from ..exceptions import OperationalError
19
from .utils import get_token
20
21
22
def set_organization(kwargs: Dict[str, Any]) -> None:
    """
    Set the organization ID in the request keyword arguments.

    If ``kwargs['params']`` already carries a non-empty ``organizationID``,
    nothing is changed. Otherwise the value of the
    ``SINGLESTOREDB_ORGANIZATION`` environment variable, when set and
    non-empty, is stored under ``kwargs['params']['organizationID']``.

    Parameters
    ----------
    kwargs : Dict[str, Any]
        Keyword arguments of an HTTP request; modified in place

    """
    # An explicit ``params=None`` is treated the same as a missing entry
    # instead of failing on ``None.get``.
    params = kwargs.get('params')
    if params and params.get('organizationID'):
        return

    org = os.environ.get('SINGLESTOREDB_ORGANIZATION')
    if not org:
        return

    if params is None:
        params = kwargs['params'] = {}
    # An existing params dictionary (even an empty one) is updated in place.
    params['organizationID'] = org
32
33
34
def is_jwt(token: str) -> bool:
    """
    Report whether the given token can be decoded as a JWT.

    The token is decoded with signature verification disabled.

    Parameters
    ----------
    token : str
        Token to inspect

    Returns
    -------
    bool
        False if decoding raises ``jwt.DecodeError``, True otherwise

    """
    try:
        jwt.decode(token, options={'verify_signature': False})
    except jwt.DecodeError:
        return False
    else:
        return True
41
42
43
class Manager(object):
    """
    SingleStoreDB manager base class.

    Holds a ``requests.Session`` that carries the management API bearer
    token and offers helpers for the HTTP verbs as well as polling
    utilities.

    Parameters
    ----------
    access_token : str, optional
        Management API token. If not given, ``get_token()`` is used.
    version : str, optional
        API version joined onto the base URL. Defaults to ``default_version``.
    base_url : str, optional
        Base URL of the management API. Defaults to the
        ``management.base_url`` option, then ``default_base_url``.
    organization_id : str, optional
        Value sent as the ``organizationID`` query parameter with every
        request

    Raises
    ------
    ManagementError
        If no token was given and ``get_token()`` returned none

    """

    #: Management API version if none is specified.
    default_version = config.get_option('management.version') or 'v1'

    #: Base URL if none is specified.
    default_base_url = config.get_option('management.base_url') \
        or 'https://api.singlestore.com'

    #: Object type
    obj_type = ''

    def __init__(
        self, access_token: Optional[str] = None, version: Optional[str] = None,
        base_url: Optional[str] = None, *, organization_id: Optional[str] = None,
    ) -> None:
        from .. import __version__ as client_version
        new_access_token = access_token or get_token()
        if not new_access_token:
            raise ManagementError(msg='No management token was configured.')

        # Only a token that came from ``get_token()`` and is a JWT is
        # refreshed before each request; an explicitly passed token is kept.
        self._is_jwt = bool(not access_token and is_jwt(new_access_token))
        self._sess = requests.Session()
        self._sess.headers.update({
            'Authorization': f'Bearer {new_access_token}',
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'User-Agent': f'SingleStoreDB-Python/{client_version}',
        })

        self._base_url = urljoin(
            base_url
            or config.get_option('management.base_url')
            or type(self).default_base_url,
            version or type(self).default_version,
        ) + '/'

        # Query parameters added to every request.
        self._params: Dict[str, str] = {}
        if organization_id:
            self._params['organizationID'] = organization_id

    @staticmethod
    def _mask_passwords(params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Return a copy of the request arguments with passwords masked.

        Only top-level keys of a dictionary ``json`` payload whose name
        contains ``password`` are masked. The input is left untouched.

        Parameters
        ----------
        params : Dict[str, Any]
            Keyword arguments that were passed to the request

        Returns
        -------
        Dict[str, Any]

        """
        masked = dict(params)
        body = masked.get('json')
        if isinstance(body, dict):
            # Build a new payload dictionary so that the caller's ``json``
            # object is never modified. ``str(v)`` keeps non-string values
            # from raising while formatting an error message.
            masked['json'] = {
                k: (
                    '*' * len(str(v))
                    if isinstance(k, str) and 'password' in k.lower() and v
                    else v
                )
                for k, v in body.items()
            }
        return masked

    def _check(
        self, res: requests.Response, url: str, params: Dict[str, Any],
    ) -> requests.Response:
        """
        Check the HTTP response status code and raise an exception as needed.

        Parameters
        ----------
        res : requests.Response
            HTTP response to check
        url : str
            Path of the requested resource; used in debug and error output
        params : Dict[str, Any]
            Keyword arguments that were passed to the request; used in
            debug and error output

        Raises
        ------
        ManagementError
            If the status code is 400 or greater

        Returns
        -------
        requests.Response

        """
        if config.get_option('debug.queries'):
            # Use the same URL join as ``_doit`` so the printed URL is the
            # one that was actually requested.
            print(urljoin(self._base_url, url), params, file=sys.stderr)
        if res.status_code >= 400:
            txt = res.text.strip()
            msg = f'{txt}: /{url}'
            if params:
                msg += ': {}'.format(str(self._mask_passwords(params)))
            raise ManagementError(errno=res.status_code, msg=msg, response=txt)
        return res

    def _doit(
        self,
        method: str,
        path: str,
        *args: Any,
        **kwargs: Any,
    ) -> requests.Response:
        """
        Perform HTTP request.

        Parameters
        ----------
        method : str
            Name of the HTTP method; matched case-insensitively against the
            session's method of the same name
        path : str
            Path of the resource, joined onto the base URL
        *args : positional arguments, optional
            Arguments passed to the session method
        **kwargs : keyword arguments, optional
            Keyword arguments passed to the session method

        Returns
        -------
        requests.Response

        """
        # Refresh the JWT as needed
        if self._is_jwt:
            token = get_token()
            # Keep the previous header if no token is available rather than
            # sending ``Bearer None``.
            if token:
                self._sess.headers.update({'Authorization': f'Bearer {token}'})
        return getattr(self._sess, method.lower())(
            urljoin(self._base_url, path), *args, **kwargs,
        )

    def _request(
        self, method: str, path: str, *args: Any, **kwargs: Any,
    ) -> requests.Response:
        """
        Add the default query parameters, send a request and check it.

        Parameters
        ----------
        method : str
            Name of the HTTP method
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the request

        Returns
        -------
        requests.Response

        """
        # An explicit ``params=None`` carries no parameters; drop it so the
        # merging below only deals with real containers.
        if kwargs.get('params') is None:
            kwargs.pop('params', None)
        if self._params:
            # Per-request parameters take precedence over the defaults.
            params = dict(self._params)
            params.update(kwargs.get('params', {}))
            kwargs['params'] = params
        set_organization(kwargs)
        return self._check(self._doit(method, path, *args, **kwargs), path, kwargs)

    def _get(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a GET request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the GET request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the GET request

        Returns
        -------
        requests.Response

        """
        return self._request('get', path, *args, **kwargs)

    def _post(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a POST request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the POST request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the POST request

        Returns
        -------
        requests.Response

        """
        return self._request('post', path, *args, **kwargs)

    def _put(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a PUT request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the PUT request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the PUT request

        Returns
        -------
        requests.Response

        """
        return self._request('put', path, *args, **kwargs)

    def _delete(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a DELETE request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the DELETE request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the DELETE request

        Returns
        -------
        requests.Response

        """
        return self._request('delete', path, *args, **kwargs)

    def _patch(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
        """
        Invoke a PATCH request.

        Parameters
        ----------
        path : str
            Path of the resource
        *args : positional arguments, optional
            Arguments to add to the PATCH request
        **kwargs : keyword arguments, optional
            Keyword arguments to add to the PATCH request

        Returns
        -------
        requests.Response

        """
        return self._request('patch', path, *args, **kwargs)

    def _wait_on_state(
        self,
        out: Any,
        state: Union[str, List[str]],
        interval: int = 20,
        timeout: int = 600,
    ) -> Any:
        """
        Wait on server state before continuing.

        The object is re-fetched through ``self.get_<obj_type>(out.id)``
        after every sleep.

        Parameters
        ----------
        out : Any
            Current object
        state : str or List[str]
            State(s) to wait for
        interval : int, optional
            Interval between each server poll
        timeout : int, optional
            Maximum time to wait before raising an exception

        Raises
        ------
        ManagementError
            If `out` has no `state` attribute or the timeout is reached

        Returns
        -------
        Same object type as `out`

        """
        wanted = [state] if isinstance(state, str) else state
        states = [x.lower().strip() for x in wanted]

        if getattr(out, 'state', None) is None:
            raise ManagementError(
                msg='{} object does not have a `state` attribute'.format(
                    type(out).__name__,
                ),
            )

        while True:
            if out.state.lower() in states:
                break
            if timeout <= 0:
                state_list = ', '.join(states)
                raise ManagementError(
                    msg=f'Exceeded waiting time for {self.obj_type} to become '
                        f'{state_list}.',
                )
            time.sleep(interval)
            timeout -= interval
            out = getattr(self, f'get_{self.obj_type}')(out.id)

        return out

    def _wait_on_endpoint(
        self,
        out: Any,
        interval: int = 10,
        timeout: int = 300,
    ) -> Any:
        """
        Wait for the endpoint to be ready by attempting to connect.

        Parameters
        ----------
        out : Any
            Workspace object with a connect method
        interval : int, optional
            Interval between each connection attempt (default: 10 seconds)
        timeout : int, optional
            Maximum time to wait before raising an exception (default: 300 seconds)

        Raises
        ------
        ManagementError
            If timeout is reached or endpoint is not available

        Returns
        -------
        Same object type as `out`

        """
        # Only wait if workload type is set which means we are in the
        # notebook environment. Outside of the environment, the endpoint
        # may not be reachable directly.
        if not os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE', ''):
            return out

        if not hasattr(out, 'connect') or not out.connect:
            raise ManagementError(
                msg=f'{type(out).__name__} object does not have a valid endpoint',
            )

        while True:
            try:
                # Try to establish a connection to the endpoint using context manager
                with out.connect(connect_timeout=5):
                    pass
            except Exception as exc:
                # If we get an 'access denied' error, that means that the server is
                # up and we just aren't authenticating.
                if isinstance(exc, OperationalError) and exc.errno == 1045:
                    break
                # If connection fails, check timeout and retry
                if timeout <= 0:
                    raise ManagementError(
                        msg=f'Exceeded waiting time for {self.obj_type} endpoint '
                            'to become ready',
                    ) from exc
            else:
                # The connection succeeded, so the endpoint is ready. Without
                # this exit the loop would keep reconnecting forever.
                break
            time.sleep(interval)
            timeout -= interval

        return out
374
375