Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/management/utils.py
469 views
1
#!/usr/bin/env python
2
"""SingleStoreDB Cluster Management."""
3
import datetime
4
import functools
5
import itertools
6
import os
7
import re
8
import sys
9
from typing import Any
10
from typing import Callable
11
from typing import Dict
12
from typing import List
13
from typing import Mapping
14
from typing import Optional
15
from typing import SupportsIndex
16
from typing import Tuple
17
from typing import TypeVar
18
from typing import Union
19
from urllib.parse import urlparse
20
21
import jwt
22
23
from .. import converters
24
from ..config import get_option
25
from ..utils import events
26
27
# Loose alias for the JSON-like values handled in this module: a string, a
# list of strings, or a string-keyed dictionary of further JSON values.
JSON = Union[str, List[str], Dict[str, 'JSON']]
# A JSON object: string keys mapped to JSON values.
JSONObj = Dict[str, JSON]
# A list of JSON values.
JSONList = List[JSON]
# Generic element type (used by the NamedList container below).
T = TypeVar('T')

# The subscripted form ``os.PathLike[str]`` is only used on Python 3.10 and
# later; older interpreters get the unparameterized class instead.
if sys.version_info < (3, 10):
    PathLike = Union[str, os.PathLike]  # type: ignore
    PathLikeABC = os.PathLike
else:
    PathLike = Union[str, os.PathLike[str]]
    PathLikeABC = os.PathLike[str]
38
39
40
class TTLProperty(object):
    """
    Read-only property whose computed value is cached for a limited time.

    The descriptor calls ``fget(instance)`` on attribute access and reuses
    the result for subsequent accesses on the *same instance* until ``ttl``
    has elapsed. A result of ``None`` is never cached, so it is recomputed
    on every access.

    Values are cached per instance (keyed by object identity and guarded by
    a weak reference), so two instances of the owning class never see each
    other's values. Instances that cannot be weakly referenced (for example
    classes using ``__slots__`` without ``__weakref__``) fall back to a
    single cache slot shared by all such instances.

    Parameters
    ----------
    fget : Callable
        Function that computes the value from the owning instance
    ttl : datetime.timedelta
        How long a computed value stays valid

    """

    def __init__(self, fget: Callable[[Any], Any], ttl: datetime.timedelta):
        self.fget = fget
        self.ttl = ttl
        # Shared fallback slot, only used for instances that do not
        # support weak references.
        self._last_executed = datetime.datetime(2000, 1, 1)
        self._last_result = None
        # id(instance) -> (weak reference, time computed, value)
        self._cache: Dict[int, Tuple[Any, datetime.datetime, Any]] = {}
        self.__doc__ = fget.__doc__
        self._name = ''

    def reset(self) -> None:
        """Discard all cached values so the next access recomputes them."""
        self._last_executed = datetime.datetime(2000, 1, 1)
        self._last_result = None
        self._cache.clear()

    def __set_name__(self, owner: Any, name: str) -> None:
        """Record the attribute name the descriptor was assigned to."""
        self._name = name

    def _get_shared(self, obj: Any) -> Any:
        """Serve ``obj`` from the single shared fallback slot."""
        if self._last_result is not None \
                and (datetime.datetime.now() - self._last_executed) < self.ttl:
            return self._last_result

        self._last_result = self.fget(obj)
        self._last_executed = datetime.datetime.now()

        return self._last_result

    def __get__(self, obj: Any, objtype: Any = None) -> Any:
        """
        Return the cached value for ``obj``, recomputing it if it is stale.

        Accessing the attribute on the class itself (``obj`` is ``None``)
        returns the descriptor, which gives access to :meth:`reset`.

        """
        if obj is None:
            return self

        import weakref

        key = id(obj)
        entry = self._cache.get(key)

        # The identity check protects against an ``id`` value being reused
        # by a new object after the original one was garbage collected.
        if entry is not None and entry[0]() is obj:
            if entry[2] is not None \
                    and (datetime.datetime.now() - entry[1]) < self.ttl:
                return entry[2]

        def discard(ref: Any, key: int = key) -> None:
            # Drop the entry when its instance dies, but only if the slot
            # still belongs to that instance.
            current = self._cache.get(key)
            if current is not None and current[0] is ref:
                del self._cache[key]

        try:
            ref = weakref.ref(obj, discard)
        except TypeError:
            # Instance can not be weakly referenced
            return self._get_shared(obj)

        result = self.fget(obj)
        self._cache[key] = (ref, datetime.datetime.now(), result)

        return result
70
71
72
def ttl_property(ttl: datetime.timedelta) -> Callable[[Any], Any]:
    """
    Create a decorator that turns a method into a :class:`TTLProperty`.

    Parameters
    ----------
    ttl : datetime.timedelta
        How long a computed value stays valid

    Returns
    -------
    Callable
        Decorator that accepts the getter function and returns the
        resulting ``TTLProperty``

    """
    def decorate(func: Callable[[Any], Any]) -> Any:
        prop = TTLProperty(func, ttl=ttl)
        # Carry the getter's metadata (name, docstring, ...) over to
        # the descriptor object.
        functools.update_wrapper(prop, func)  # type: ignore
        return prop

    return decorate
78
79
80
class NamedList(List[T]):
    """
    List which also allows selection by ``name`` and ``id`` attribute.

    String keys are looked up against the ``name`` and ``id`` attributes of
    the contained items; any other key (integer, slice, ...) behaves as it
    does for a regular list.

    """

    def _find_item(self, key: str) -> T:
        """
        Return the first item whose ``name`` or ``id`` equals ``key``.

        Raises
        ------
        KeyError
            If no item matches

        """
        # A unique sentinel keeps items that lack the attribute from
        # matching an empty-string key.
        missing = object()
        for item in self:
            if getattr(item, 'name', missing) == key:
                return item
            if getattr(item, 'id', missing) == key:
                return item
        raise KeyError(key)

    def __getitem__(self, key: Union[SupportsIndex, slice, str]) -> Any:
        """Return an item by name / ID (``str``) or by list position."""
        if isinstance(key, str):
            return self._find_item(key)
        return super().__getitem__(key)

    def __contains__(self, key: Any) -> bool:
        """Return True if ``key`` is an item, or the name / ID of an item."""
        if isinstance(key, str):
            try:
                self._find_item(key)
                return True
            except KeyError:
                return False
        return super().__contains__(key)

    def names(self) -> List[str]:
        """Return ``name`` attribute of each item that has one that is not None."""
        return [y for y in [getattr(x, 'name', None) for x in self] if y is not None]

    def ids(self) -> List[str]:
        """Return ``id`` attribute of each item that has one that is not None."""
        return [y for y in [getattr(x, 'id', None) for x in self] if y is not None]

    def get(self, name_or_id: str, *default: Any) -> Any:
        """
        Return object with name / ID if it exists, or return default value.

        Parameters
        ----------
        name_or_id : str
            Name or ID of the item to look up
        *default : Any, optional
            Value returned if there is no match; only the first value is used

        Raises
        ------
        KeyError
            If there is no match and no default was given

        """
        try:
            return self._find_item(name_or_id)
        except KeyError:
            if default:
                return default[0]
            raise
121
122
123
def _setup_authentication_info_handler() -> Callable[..., Dict[str, Any]]:
    """
    Setup authentication info event handlers.

    Two handlers are registered with ``events.subscribe``: one for
    ``singlestore.portal.authentication_updated`` messages and one for
    ``singlestore.portal.connection_updated`` messages. Both replace the
    authentication info kept in this closure.

    Returns
    -------
    Callable
        Function that returns the current authentication info as a
        dictionary (see ``get_authentication_info`` below)

    """

    authentication_info: Dict[str, Any] = {}

    def handle_authentication_info(msg: Dict[str, Any]) -> None:
        """Handle authentication info events."""
        nonlocal authentication_info
        if msg.get('name', '') != 'singlestore.portal.authentication_updated':
            return
        authentication_info = dict(msg.get('data', {}))

    events.subscribe(handle_authentication_info)

    def handle_connection_info(msg: Dict[str, Any]) -> None:
        """Handle connection info events; only user and password are kept."""
        nonlocal authentication_info
        if msg.get('name', '') != 'singlestore.portal.connection_updated':
            return
        data = msg.get('data', {})
        authentication_info = {
            k: data[k] for k in ('user', 'password') if k in data
        }

    # This handler must be registered as well; otherwise connection
    # update events are never seen.
    events.subscribe(handle_connection_info)

    def retrieve_current_authentication_info() -> List[Tuple[str, Any]]:
        """Return the stored info, dropping it first if its JWT has expired."""
        nonlocal authentication_info
        password = authentication_info.get('password')
        if password:
            try:
                claims = jwt.decode(
                    password,
                    options={'verify_signature': False},
                )
            except jwt.DecodeError:
                # Not a JWT (e.g., a plain password); there is no
                # expiration to check.
                claims = {}
            exp = claims.get('exp')
            if exp is not None:
                expires = datetime.datetime.fromtimestamp(exp)
                if datetime.datetime.now() > expires:
                    authentication_info = {}
        return list(authentication_info.items())

    def get_env() -> List[Tuple[str, Any]]:
        """Return user / password found in the configured connection URL."""
        url = os.environ.get('SINGLESTOREDB_URL') or get_option('host')
        if not url:
            return []
        urlp = urlparse(url, scheme='singlestoredb', allow_fragments=True)
        conn = dict(
            user=urlp.username or None,
            password=urlp.password or None,
        )
        return [x for x in conn.items() if x[1] is not None]

    def get_authentication_info(include_env: bool = True) -> Dict[str, Any]:
        """
        Return the current authentication info.

        Parameters
        ----------
        include_env : bool, optional
            Should values from the ``SINGLESTOREDB_URL`` environment
            variable / ``host`` option be included? Values received
            through events take precedence over them.

        """
        return dict(
            itertools.chain(
                (get_env() if include_env else []),
                retrieve_current_authentication_info(),
            ),
        )

    return get_authentication_info
194
195
196
# Module-level accessor for the current authentication info. Building it
# also registers the event handlers defined inside the setup function.
get_authentication_info = _setup_authentication_info_handler()
197
198
199
def get_token() -> Optional[str]:
    """
    Return the token for the Management API.

    The ``management.token`` option is used if it is set. Otherwise, the
    password from the current authentication info is returned, but only
    if it can be decoded as a JWT.

    Returns
    -------
    str or None
        The token, or None if no usable token was found

    """
    # An explicitly configured API key always wins
    configured = get_option('management.token')
    if configured:
        return configured

    candidate = get_authentication_info(include_env=True).get('password')
    if not candidate:
        # Didn't find a key anywhere
        return None

    # Only passwords that are JWTs can serve as a token
    try:
        jwt.decode(candidate, options={'verify_signature': False})
    except jwt.DecodeError:
        return None
    return candidate
216
217
218
def get_cluster_id() -> Optional[str]:
    """Return the cluster id from ``SINGLESTOREDB_CLUSTER`` (None if unset or empty)."""
    cluster_id = os.environ.get('SINGLESTOREDB_CLUSTER')
    return cluster_id if cluster_id else None
221
222
223
def get_workspace_id() -> Optional[str]:
    """Return the workspace id from ``SINGLESTOREDB_WORKSPACE`` (None if unset or empty)."""
    workspace_id = os.environ.get('SINGLESTOREDB_WORKSPACE')
    if not workspace_id:
        return None
    return workspace_id
226
227
228
def get_virtual_workspace_id() -> Optional[str]:
    """Return the id from ``SINGLESTOREDB_VIRTUAL_WORKSPACE`` (None if unset or empty)."""
    virtual_workspace_id = os.environ.get('SINGLESTOREDB_VIRTUAL_WORKSPACE')
    return virtual_workspace_id if virtual_workspace_id else None
231
232
233
def get_database_name() -> Optional[str]:
    """Return the name from ``SINGLESTOREDB_DEFAULT_DATABASE`` (None if unset or empty)."""
    database = os.environ.get('SINGLESTOREDB_DEFAULT_DATABASE')
    if not database:
        return None
    return database
236
237
238
def enable_http_tracing() -> None:
    """
    Enable tracing of HTTP requests.

    Sets the ``http.client`` connection debug level to 1 and switches the
    root logger and the ``requests.packages.urllib3`` logger to DEBUG.

    """
    import http.client
    import logging

    http.client.HTTPConnection.debuglevel = 1

    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)

    urllib3_logger = logging.getLogger('requests.packages.urllib3')
    urllib3_logger.setLevel(logging.DEBUG)
    urllib3_logger.propagate = True
248
249
250
def to_datetime(
    obj: Optional[Union[str, datetime.datetime]],
) -> Optional[datetime.datetime]:
    """
    Convert string to datetime.

    Parameters
    ----------
    obj : str or datetime.datetime or None
        Value to convert. Datetimes are returned as-is. Empty values and
        the all-zero timestamp ``0001-01-01T00:00:00Z`` result in None.

    Returns
    -------
    datetime.datetime or None
        None is also returned if the converter hands back a string
        (presumably meaning the value could not be parsed -- TODO confirm
        against ``converters.datetime_fromisoformat``)

    """
    if not obj:
        return None
    if isinstance(obj, datetime.datetime):
        return obj
    if obj == '0001-01-01T00:00:00Z':
        return None
    obj = obj.replace('Z', '')
    # Normalize fractional seconds to exactly six digits: pad truncated
    # zeros, cut sub-microsecond digits, and keep anything that follows
    # the digits (such as a UTC offset) in place.
    if '.' in obj:
        obj, micros = obj.split('.', 1)
        digits, rest = re.match(r'(\d*)(.*)$', micros, flags=re.S).groups()  # type: ignore
        obj = obj + '.' + digits[:6].ljust(6, '0') + rest
    out = converters.datetime_fromisoformat(obj)
    if isinstance(out, str):
        return None
    # Promote plain dates to datetimes at midnight
    if isinstance(out, datetime.date) and not isinstance(out, datetime.datetime):
        return datetime.datetime(out.year, out.month, out.day)
    return out
272
273
274
def to_datetime_strict(
    obj: Optional[Union[str, datetime.datetime]],
) -> datetime.datetime:
    """
    Convert string to datetime, raising an exception instead of returning None.

    Parameters
    ----------
    obj : str or datetime.datetime or None
        Value to convert. Datetimes are returned as-is.

    Raises
    ------
    TypeError
        If ``obj`` is empty, or the converter returns an empty result
    ValueError
        If ``obj`` is the all-zero timestamp ``0001-01-01T00:00:00Z``, or
        the converter hands back a string (presumably meaning the value
        could not be parsed -- TODO confirm against
        ``converters.datetime_fromisoformat``)

    Returns
    -------
    datetime.datetime

    """
    if not obj:
        raise TypeError('not possible to convert None to datetime')
    if isinstance(obj, datetime.datetime):
        return obj
    if obj == '0001-01-01T00:00:00Z':
        raise ValueError('not possible to convert 0001-01-01T00:00:00Z to datetime')
    obj = obj.replace('Z', '')
    # Normalize fractional seconds to exactly six digits: pad truncated
    # zeros, cut sub-microsecond digits, and keep anything that follows
    # the digits (such as a UTC offset) in place.
    if '.' in obj:
        obj, micros = obj.split('.', 1)
        digits, rest = re.match(r'(\d*)(.*)$', micros, flags=re.S).groups()  # type: ignore
        obj = obj + '.' + digits[:6].ljust(6, '0') + rest
    out = converters.datetime_fromisoformat(obj)
    if not out:
        raise TypeError('not possible to convert None to datetime')
    if isinstance(out, str):
        raise ValueError('value cannot be str')
    # Promote plain dates to datetimes at midnight
    if isinstance(out, datetime.date) and not isinstance(out, datetime.datetime):
        return datetime.datetime(out.year, out.month, out.day)
    return out
298
299
300
def from_datetime(
    obj: Union[str, datetime.datetime],
) -> Optional[str]:
    """
    Convert datetime to string.

    Parameters
    ----------
    obj : str or datetime.datetime
        Value to convert. Strings are returned unchanged.

    Returns
    -------
    str or None
        ISO 8601 formatted string. A trailing ``Z`` is added to values
        without timezone information; timezone-aware values keep the UTC
        offset that ``isoformat`` produces. None is returned for empty input.

    """
    if not obj:
        return None
    if isinstance(obj, str):
        return obj
    out = obj.isoformat()
    # Aware values already end in a UTC offset; appending ``Z`` to them
    # would produce an invalid timestamp such as ``...+00:00Z``.
    is_aware = (
        getattr(obj, 'tzinfo', None) is not None
        and obj.utcoffset() is not None
    )
    if not is_aware and not re.search(r'[A-Za-z]$', out):
        out = f'{out}Z'
    return out
312
313
314
def vars_to_str(obj: Any) -> str:
    """
    Render a string representation of vars(obj).

    ``name`` and ``id`` are listed first (when present); the remaining
    attributes follow in sorted order, leaving out those that are falsy
    or whose name starts with an underscore.

    """
    fields = vars(obj)

    # Identifying attributes always lead, even when they are empty
    parts = [
        '{}={}'.format(key, repr(fields[key]))
        for key in ('name', 'id') if key in fields
    ]

    parts.extend(
        '{}={}'.format(key, repr(val))
        for key, val in sorted(fields.items())
        if key not in ('name', 'id') and val and not key.startswith('_')
    )

    return '{}({})'.format(type(obj).__name__, ', '.join(parts))
329
330
331
def single_item(s: Any) -> Any:
    """
    Return only item if ``s`` is a list, otherwise return ``s``.

    Raises
    ------
    ValueError
        If ``s`` is a list that does not contain exactly one item

    """
    if isinstance(s, list):
        if len(s) != 1:
            raise ValueError('list must only contain a single item')
        return s[0]
    return s
338
339
340
def stringify(s: JSON) -> str:
    """
    Convert list of strings to single string.

    Parameters
    ----------
    s : str or list or tuple
        A string, or a list / tuple holding exactly one string

    Raises
    ------
    ValueError
        If the list is empty or contains more than one item
    TypeError
        If ``s`` is a dictionary

    """
    if isinstance(s, (tuple, list)):
        if len(s) > 1:
            raise ValueError('list contains more than one item')
        # An empty list has nothing to return
        if not s:
            raise ValueError('list is empty')
        return s[0]
    if isinstance(s, dict):
        raise TypeError('only strings and lists are valid arguments')
    return s
349
350
351
def listify(s: JSON) -> List[str]:
    """
    Convert string to list of strings.

    Lists and tuples are copied into a new list; any other value except a
    dictionary is wrapped in a one-item list.

    Raises
    ------
    TypeError
        If ``s`` is a dictionary

    """
    if isinstance(s, dict):
        raise TypeError('only strings and lists are valid arguments')
    return list(s) if isinstance(s, (tuple, list)) else [s]
358
359
360
def listify_obj(s: JSON) -> List[JSONObj]:
    """
    Convert object to list of objects.

    A single dictionary is wrapped in a one-item list; a list / tuple of
    dictionaries is copied into a new list.

    Raises
    ------
    TypeError
        If ``s`` is neither a dictionary nor a list / tuple containing
        only dictionaries

    """
    candidates = list(s) if isinstance(s, (tuple, list)) else [s]
    if not all(isinstance(entry, dict) for entry in candidates):
        raise TypeError('only dicts and lists of dicts are valid parameters')
    return candidates  # type: ignore
370
371
372
def _upper_match(m: Any) -> str:
    """Return the text of the first group of match ``m`` in upper case."""
    captured = m.group(1)
    return captured.upper()
375
376
377
def snake_to_camel(s: Optional[str], cap_first: bool = False) -> Optional[str]:
    """
    Convert snake-case to camel-case.

    The input is lower-cased first; each letter following an underscore is
    then upper-cased and the underscore removed.

    Parameters
    ----------
    s : str or None
        Snake-case string
    cap_first : bool, optional
        Should the first character be capitalized as well?

    Returns
    -------
    str or None
        None if ``s`` is None

    """
    if s is None:
        return None
    camel = re.sub(r'_([A-Za-z])', lambda m: m.group(1).upper(), s.lower())
    if not (cap_first and camel):
        return camel
    return camel[0].upper() + camel[1:]
385
386
387
def camel_to_snake(s: Optional[str]) -> Optional[str]:
    """
    Convert camel-case to snake-case.

    Each run of upper-case letters is prefixed with an underscore, the
    result is lower-cased, and one leading underscore is removed.

    Returns
    -------
    str or None
        None if ``s`` is None

    """
    if s is None:
        return None
    snake = re.sub(r'([A-Z]+)', r'_\1', s).lower()
    return snake[1:] if snake.startswith('_') else snake
395
396
397
def snake_to_camel_dict(
    s: Optional[Mapping[str, Any]],
    cap_first: bool = False,
) -> Optional[Dict[str, Any]]:
    """
    Convert snake-case keys to camel-case keys.

    Values that are mappings are converted recursively; all other values
    (including lists) are copied over unchanged.

    Parameters
    ----------
    s : Mapping or None
        Mapping with snake-case keys
    cap_first : bool, optional
        Should the first character of each key be capitalized as well?

    Returns
    -------
    dict or None
        None if ``s`` is None

    """
    if s is None:
        return None
    out: Dict[str, Any] = {}
    for k, v in s.items():
        # ``cap_first`` has to be applied to the keys at every level
        key = str(snake_to_camel(k, cap_first=cap_first))
        if isinstance(v, Mapping):
            out[key] = snake_to_camel_dict(v, cap_first=cap_first)
        else:
            out[key] = v
    return out
411
412
413
def camel_to_snake_dict(s: Optional[Mapping[str, Any]]) -> Optional[Dict[str, Any]]:
    """
    Convert camel-case keys to snake-case keys.

    Values that are mappings are converted recursively; all other values
    are copied over unchanged.

    Returns
    -------
    dict or None
        None if ``s`` is None

    """
    if s is None:
        return None
    return {
        str(camel_to_snake(key)): (
            camel_to_snake_dict(value) if isinstance(value, Mapping) else value
        )
        for key, value in s.items()
    }
424
425