Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/management/manager.py
469 views
1
#!/usr/bin/env python
2
"""SingleStoreDB Base Manager."""
3
import os
4
import sys
5
import time
6
from typing import Any
7
from typing import Dict
8
from typing import List
9
from typing import Optional
10
from typing import Union
11
from urllib.parse import urljoin
12
13
import jwt
14
import requests
15
16
from .. import config
17
from ..exceptions import ManagementError
18
from .utils import get_token
19
20
21
def set_organization(kwargs: Dict[str, Any]) -> None:
22
"""Set the organization ID in the dictionary."""
23
if kwargs.get('params', {}).get('organizationID', None):
24
return
25
26
org = os.environ.get('SINGLESTOREDB_ORGANIZATION')
27
if org:
28
if 'params' not in kwargs:
29
kwargs['params'] = {}
30
kwargs['params']['organizationID'] = org
31
32
33
def is_jwt(token: str) -> bool:
34
"""Is the given token a JWT?"""
35
try:
36
jwt.decode(token, options={'verify_signature': False})
37
return True
38
except jwt.DecodeError:
39
return False
40
41
42
class Manager(object):
43
"""SingleStoreDB manager base class."""
44
45
#: Management API version if none is specified.
46
default_version = config.get_option('management.version') or 'v1'
47
48
#: Base URL if none is specified.
49
default_base_url = config.get_option('management.base_url') \
50
or 'https://api.singlestore.com'
51
52
#: Object type
53
obj_type = ''
54
55
def __init__(
56
self, access_token: Optional[str] = None, version: Optional[str] = None,
57
base_url: Optional[str] = None, *, organization_id: Optional[str] = None,
58
):
59
from .. import __version__ as client_version
60
new_access_token = (
61
access_token or get_token()
62
)
63
if not new_access_token:
64
raise ManagementError(msg='No management token was configured.')
65
66
self._is_jwt = not access_token and new_access_token and is_jwt(new_access_token)
67
self._sess = requests.Session()
68
self._sess.headers.update({
69
'Authorization': f'Bearer {new_access_token}',
70
'Content-Type': 'application/json',
71
'Accept': 'application/json',
72
'User-Agent': f'SingleStoreDB-Python/{client_version}',
73
})
74
75
self._base_url = urljoin(
76
base_url
77
or config.get_option('management.base_url')
78
or type(self).default_base_url,
79
version or type(self).default_version,
80
) + '/'
81
82
self._params: Dict[str, str] = {}
83
if organization_id:
84
self._params['organizationID'] = organization_id
85
86
def _check(
87
self, res: requests.Response, url: str, params: Dict[str, Any],
88
) -> requests.Response:
89
"""
90
Check the HTTP response status code and raise an exception as needed.
91
92
Parameters
93
----------
94
res : requests.Response
95
HTTP response to check
96
97
Returns
98
-------
99
requests.Response
100
101
"""
102
if config.get_option('debug.queries'):
103
print(os.path.join(self._base_url, url), params, file=sys.stderr)
104
if res.status_code >= 400:
105
txt = res.text.strip()
106
msg = f'{txt}: /{url}'
107
if params:
108
new_params = params.copy()
109
if 'json' in new_params:
110
for k, v in new_params['json'].items():
111
if 'password' in k.lower() and v:
112
new_params['json'][k] = '*' * len(v)
113
msg += ': {}'.format(str(new_params))
114
raise ManagementError(errno=res.status_code, msg=msg, response=txt)
115
return res
116
117
def _doit(
118
self,
119
method: str,
120
path: str,
121
*args: Any,
122
**kwargs: Any,
123
) -> requests.Response:
124
"""Perform HTTP request."""
125
# Refresh the JWT as needed
126
if self._is_jwt:
127
self._sess.headers.update({'Authorization': f'Bearer {get_token()}'})
128
return getattr(self._sess, method.lower())(
129
urljoin(self._base_url, path), *args, **kwargs,
130
)
131
132
def _get(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
133
"""
134
Invoke a GET request.
135
136
Parameters
137
----------
138
path : str
139
Path of the resource
140
*args : positional arguments, optional
141
Arguments to add to the GET request
142
**kwargs : keyword arguments, optional
143
Keyword arguments to add to the GET request
144
145
Returns
146
-------
147
requests.Response
148
149
"""
150
if self._params:
151
params = dict(self._params)
152
params.update(kwargs.get('params', {}))
153
kwargs['params'] = params
154
set_organization(kwargs)
155
return self._check(self._doit('get', path, *args, **kwargs), path, kwargs)
156
157
def _post(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
158
"""
159
Invoke a POST request.
160
161
Parameters
162
----------
163
path : str
164
Path of the resource
165
*args : positional arguments, optional
166
Arguments to add to the POST request
167
**kwargs : keyword arguments, optional
168
Keyword arguments to add to the POST request
169
170
Returns
171
-------
172
requests.Response
173
174
"""
175
if self._params:
176
params = dict(self._params)
177
params.update(kwargs.get('params', {}))
178
kwargs['params'] = params
179
set_organization(kwargs)
180
return self._check(self._doit('post', path, *args, **kwargs), path, kwargs)
181
182
def _put(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
183
"""
184
Invoke a PUT request.
185
186
Parameters
187
----------
188
path : str
189
Path of the resource
190
*args : positional arguments, optional
191
Arguments to add to the POST request
192
**kwargs : keyword arguments, optional
193
Keyword arguments to add to the POST request
194
195
Returns
196
-------
197
requests.Response
198
199
"""
200
if self._params:
201
params = dict(self._params)
202
params.update(kwargs.get('params', {}))
203
kwargs['params'] = params
204
set_organization(kwargs)
205
return self._check(self._doit('put', path, *args, **kwargs), path, kwargs)
206
207
def _delete(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
208
"""
209
Invoke a DELETE request.
210
211
Parameters
212
----------
213
path : str
214
Path of the resource
215
*args : positional arguments, optional
216
Arguments to add to the DELETE request
217
**kwargs : keyword arguments, optional
218
Keyword arguments to add to the DELETE request
219
220
Returns
221
-------
222
requests.Response
223
224
"""
225
if self._params:
226
params = dict(self._params)
227
params.update(kwargs.get('params', {}))
228
kwargs['params'] = params
229
set_organization(kwargs)
230
return self._check(self._doit('delete', path, *args, **kwargs), path, kwargs)
231
232
def _patch(self, path: str, *args: Any, **kwargs: Any) -> requests.Response:
233
"""
234
Invoke a PATCH request.
235
236
Parameters
237
----------
238
path : str
239
Path of the resource
240
*args : positional arguments, optional
241
Arguments to add to the PATCH request
242
**kwargs : keyword arguments, optional
243
Keyword arguments to add to the PATCH request
244
245
Returns
246
-------
247
requests.Response
248
249
"""
250
if self._params:
251
params = dict(self._params)
252
params.update(kwargs.get('params', {}))
253
kwargs['params'] = params
254
set_organization(kwargs)
255
return self._check(self._doit('patch', path, *args, **kwargs), path, kwargs)
256
257
def _wait_on_state(
258
self,
259
out: Any,
260
state: Union[str, List[str]],
261
interval: int = 20,
262
timeout: int = 600,
263
) -> Any:
264
"""
265
Wait on server state before continuing.
266
267
Parameters
268
----------
269
out : Any
270
Current object
271
state : str or List[str]
272
State(s) to wait for
273
interval : int, optional
274
Interval between each server poll
275
timeout : int, optional
276
Maximum time to wait before raising an exception
277
278
Raises
279
------
280
ManagementError
281
If timeout is reached
282
283
Returns
284
-------
285
Same object type as `out`
286
287
"""
288
states = [
289
x.lower().strip()
290
for x in (isinstance(state, str) and [state] or state)
291
]
292
293
if getattr(out, 'state', None) is None:
294
raise ManagementError(
295
msg='{} object does not have a `state` attribute'.format(
296
type(out).__name__,
297
),
298
)
299
300
while True:
301
if getattr(out, 'state').lower() in states:
302
break
303
if timeout <= 0:
304
raise ManagementError(
305
msg=f'Exceeded waiting time for {self.obj_type} to become '
306
'{}.'.format(', '.join(states)),
307
)
308
time.sleep(interval)
309
timeout -= interval
310
out = getattr(self, f'get_{self.obj_type}')(out.id)
311
312
return out
313
314