Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/management/cluster.py
469 views
1
#!/usr/bin/env python
2
"""SingleStoreDB Cluster Management."""
3
import datetime
4
import warnings
5
from typing import Any
6
from typing import Dict
7
from typing import List
8
from typing import Optional
9
from typing import Union
10
11
from .. import config
12
from .. import connection
13
from ..exceptions import ManagementError
14
from .manager import Manager
15
from .region import Region
16
from .utils import NamedList
17
from .utils import to_datetime
18
from .utils import vars_to_str
19
20
21
class Cluster(object):
    """
    SingleStoreDB cluster definition.

    This object is not instantiated directly. It is used in the results
    of API calls on the :class:`ClusterManager`. Clusters are created using
    :meth:`ClusterManager.create_cluster`, or existing clusters are accessed by either
    :attr:`ClusterManager.clusters` or by calling :meth:`ClusterManager.get_cluster`.

    See Also
    --------
    :meth:`ClusterManager.create_cluster`
    :meth:`ClusterManager.get_cluster`
    :attr:`ClusterManager.clusters`

    """

    def __init__(
        self, name: str, id: str, region: Region, size: str,
        units: float, state: str, version: str,
        created_at: Union[str, datetime.datetime],
        expires_at: Optional[Union[str, datetime.datetime]] = None,
        firewall_ranges: Optional[List[str]] = None,
        terminated_at: Optional[Union[str, datetime.datetime]] = None,
        endpoint: Optional[str] = None,
    ):
        """Use :attr:`ClusterManager.clusters` or :meth:`ClusterManager.get_cluster`."""
        #: Name of the cluster (surrounding whitespace removed)
        self.name = name.strip()

        #: Unique ID of the cluster
        self.id = id

        #: Region of the cluster (see :class:`Region`)
        self.region = region

        #: Size of the cluster in cluster size notation (S-00, S-1, etc.)
        self.size = size

        #: Size of the cluster in units such as 0.25, 1.0, etc.
        self.units = units

        #: State of the cluster: PendingCreation, Transitioning, Active,
        #: Terminated, Suspended, Resuming, Failed
        self.state = state.strip()

        #: Version of the SingleStoreDB server
        self.version = version.strip()

        #: Timestamp of when the cluster was created
        self.created_at = to_datetime(created_at)

        #: Timestamp of when the cluster expires
        self.expires_at = to_datetime(expires_at)

        #: List of allowed incoming IP addresses / ranges
        self.firewall_ranges = firewall_ranges

        #: Timestamp of when the cluster was terminated
        self.terminated_at = to_datetime(terminated_at)

        #: Hostname (or IP address) of the cluster database server
        self.endpoint = endpoint

        # Set by :meth:`from_dict`; stays None for objects built by hand,
        # in which case the management methods below raise ManagementError.
        self._manager: Optional[ClusterManager] = None

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)

    def _require_manager(self) -> 'ClusterManager':
        """
        Return the manager associated with this cluster.

        Single home for the "is a manager attached?" check that every
        management method needs before it can talk to the API.

        Returns
        -------
        :class:`ClusterManager`

        Raises
        ------
        ManagementError
            If no manager is associated with this object

        """
        if self._manager is None:
            raise ManagementError(
                msg='No cluster manager is associated with this object.',
            )
        return self._manager

    @classmethod
    def from_dict(cls, obj: Dict[str, Any], manager: 'ClusterManager') -> 'Cluster':
        """
        Construct a Cluster from a dictionary of values.

        The keys ``name``, ``clusterID``, ``region``, ``state``, ``version``
        and ``createdAt`` must be present. ``size`` and ``units`` fall back
        to ``'Unknown'`` and ``nan``; ``expiresAt``, ``firewallRanges``,
        ``terminatedAt`` and ``endpoint`` fall back to ``None``.

        Parameters
        ----------
        obj : dict
            Dictionary of values
        manager : ClusterManager
            The ClusterManager the Cluster belongs to

        Returns
        -------
        :class:`Cluster`

        """
        out = cls(
            name=obj['name'], id=obj['clusterID'],
            region=Region.from_dict(obj['region'], manager),
            size=obj.get('size', 'Unknown'), units=obj.get('units', float('nan')),
            state=obj['state'], version=obj['version'],
            created_at=obj['createdAt'], expires_at=obj.get('expiresAt'),
            firewall_ranges=obj.get('firewallRanges'),
            terminated_at=obj.get('terminatedAt'),
            endpoint=obj.get('endpoint'),
        )
        out._manager = manager
        return out

    def refresh(self) -> 'Cluster':
        """
        Update the object to the current state.

        Returns
        -------
        :class:`Cluster`
            This same object, after its attributes have been overwritten

        Raises
        ------
        ManagementError
            If no manager is associated with this object

        """
        new_obj = self._require_manager().get_cluster(self.id)
        # Copy every instance attribute of the freshly fetched object
        # onto this one so existing references see the new values.
        for name, value in vars(new_obj).items():
            setattr(self, name, value)
        return self

    def update(
        self, name: Optional[str] = None,
        admin_password: Optional[str] = None,
        expires_at: Optional[str] = None,
        size: Optional[str] = None, firewall_ranges: Optional[List[str]] = None,
    ) -> None:
        """
        Update the cluster definition.

        Only the parameters that are not ``None`` are sent in the request.
        The object is refreshed after the request completes.

        Parameters
        ----------
        name : str, optional
            Cluster name
        admin_password : str, optional
            Admin password for the cluster
        expires_at : str, optional
            Timestamp when the cluster expires
        size : str, optional
            Cluster size in cluster size notation (S-00, S-1, etc.)
        firewall_ranges : List[str], optional
            List of allowed incoming IP addresses

        Raises
        ------
        ManagementError
            If no manager is associated with this object

        """
        manager = self._require_manager()
        data = {
            k: v for k, v in dict(
                name=name, adminPassword=admin_password,
                expiresAt=expires_at, size=size,
                firewallRanges=firewall_ranges,
            ).items() if v is not None
        }
        manager._patch(f'clusters/{self.id}', json=data)
        self.refresh()

    def suspend(
        self,
        wait_on_suspended: bool = False,
        wait_interval: int = 20,
        wait_timeout: int = 600,
    ) -> None:
        """
        Suspend the cluster.

        Parameters
        ----------
        wait_on_suspended : bool, optional
            Wait for the cluster to go into 'Suspended' mode before returning
        wait_interval : int, optional
            Number of seconds between each server check
        wait_timeout : int, optional
            Total number of seconds to check server before giving up

        Raises
        ------
        ManagementError
            If no manager is associated with this object, or if
            the timeout is reached

        """
        manager = self._require_manager()
        manager._post(
            f'clusters/{self.id}/suspend',
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
        )
        if wait_on_suspended:
            manager._wait_on_state(
                manager.get_cluster(self.id),
                'Suspended', interval=wait_interval, timeout=wait_timeout,
            )
        self.refresh()

    def resume(
        self,
        wait_on_resumed: bool = False,
        wait_interval: int = 20,
        wait_timeout: int = 600,
    ) -> None:
        """
        Resume the cluster.

        Parameters
        ----------
        wait_on_resumed : bool, optional
            Wait for the cluster to go into 'Resumed' or 'Active' mode before returning
        wait_interval : int, optional
            Number of seconds between each server check
        wait_timeout : int, optional
            Total number of seconds to check server before giving up

        Raises
        ------
        ManagementError
            If no manager is associated with this object, or if
            the timeout is reached

        """
        manager = self._require_manager()
        manager._post(
            f'clusters/{self.id}/resume',
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
        )
        if wait_on_resumed:
            manager._wait_on_state(
                manager.get_cluster(self.id),
                ['Resumed', 'Active'], interval=wait_interval, timeout=wait_timeout,
            )
        self.refresh()

    def terminate(
        self,
        wait_on_terminated: bool = False,
        wait_interval: int = 10,
        wait_timeout: int = 600,
    ) -> None:
        """
        Terminate the cluster.

        Parameters
        ----------
        wait_on_terminated : bool, optional
            Wait for the cluster to go into 'Terminated' mode before returning
        wait_interval : int, optional
            Number of seconds between each server check
        wait_timeout : int, optional
            Total number of seconds to check server before giving up

        Raises
        ------
        ManagementError
            If no manager is associated with this object, or if
            the timeout is reached

        """
        manager = self._require_manager()
        manager._delete(f'clusters/{self.id}')
        if wait_on_terminated:
            manager._wait_on_state(
                manager.get_cluster(self.id),
                'Terminated', interval=wait_interval, timeout=wait_timeout,
            )
        self.refresh()

    def connect(self, **kwargs: Any) -> connection.Connection:
        """
        Create a connection to the database server for this cluster.

        Parameters
        ----------
        **kwargs : keyword-arguments, optional
            Parameters to the SingleStoreDB `connect` function. Any
            ``host`` value given here is replaced by the endpoint of
            this cluster.

        Returns
        -------
        :class:`Connection`

        Raises
        ------
        ManagementError
            If the cluster has no endpoint set

        """
        if not self.endpoint:
            raise ManagementError(
                msg='An endpoint has not been set in '
                    'this cluster configuration',
            )
        kwargs['host'] = self.endpoint
        return connection.connect(**kwargs)
309
310
311
class ClusterManager(Manager):
    """
    Manager for SingleStoreDB clusters.

    Obtain an instance through :func:`singlestoredb.manage_cluster`
    rather than constructing this class directly.

    Parameters
    ----------
    access_token : str, optional
        The API key or other access token for the cluster management API
    version : str, optional
        Version of the API to use
    base_url : str, optional
        Base URL of the cluster management API

    See Also
    --------
    :func:`singlestoredb.manage_cluster`

    """

    #: API version used when the caller does not specify one.
    default_version = 'v0beta'

    #: Base URL used when the caller does not specify one; the
    #: ``management.base_url`` option wins if it is set to a truthy value.
    default_base_url = (
        config.get_option('management.base_url')
        or 'https://api.singlestore.com'
    )

    #: Object type
    obj_type = 'cluster'

    @property
    def clusters(self) -> NamedList[Cluster]:
        """Return the clusters listed by the ``clusters`` endpoint."""
        entries = self._get('clusters').json()
        found = [Cluster.from_dict(entry, self) for entry in entries]
        return NamedList(found)

    @property
    def regions(self) -> NamedList[Region]:
        """Return the regions listed by the ``regions`` endpoint."""
        entries = self._get('regions').json()
        found = [Region.from_dict(entry, self) for entry in entries]
        return NamedList(found)

    def create_cluster(
        self, name: str, region: Union[str, Region], admin_password: str,
        firewall_ranges: List[str], expires_at: Optional[str] = None,
        size: Optional[str] = None, plan: Optional[str] = None,
        wait_on_active: bool = False, wait_timeout: int = 600,
        wait_interval: int = 20,
    ) -> Cluster:
        """
        Create a new cluster.

        Parameters
        ----------
        name : str
            Name of the cluster
        region : str or Region
            The region ID of the cluster, or a :class:`Region` whose
            ``id`` is used
        admin_password : str
            Admin password for the cluster
        firewall_ranges : List[str]
            List of allowed incoming IP addresses
        expires_at : str, optional
            Timestamp of when the cluster expires
        size : str, optional
            Cluster size in cluster size notation (S-00, S-1, etc.)
        plan : str, optional
            Internal use only
        wait_on_active : bool, optional
            Wait for the cluster to be active before returning
        wait_timeout : int, optional
            Maximum number of seconds to wait when ``wait_on_active=True``
        wait_interval : int, optional
            Number of seconds between each polling interval

        Returns
        -------
        :class:`Cluster`

        """
        # A Region object is reduced to its ID when it has one; any other
        # value is forwarded to the API untouched.
        region_id: Any = region
        if isinstance(region, Region) and region.id:
            region_id = region.id

        payload = {
            'name': name,
            'regionID': region_id,
            'adminPassword': admin_password,
            'expiresAt': expires_at,
            'size': size,
            'firewallRanges': firewall_ranges,
            'plan': plan,
        }
        response = self._post('clusters', json=payload)

        # Re-fetch the full definition using the ID the API handed back.
        cluster = self.get_cluster(response.json()['clusterID'])
        if not wait_on_active:
            return cluster
        return self._wait_on_state(
            cluster, 'Active',
            interval=wait_interval, timeout=wait_timeout,
        )

    def get_cluster(self, id: str) -> Cluster:
        """
        Retrieve a cluster definition.

        Parameters
        ----------
        id : str
            ID of the cluster

        Returns
        -------
        :class:`Cluster`

        """
        definition = self._get(f'clusters/{id}').json()
        return Cluster.from_dict(definition, manager=self)
426
427
428
def manage_cluster(
    access_token: Optional[str] = None,
    version: Optional[str] = None,
    base_url: Optional[str] = None,
    *,
    organization_id: Optional[str] = None,
) -> ClusterManager:
    """
    Retrieve a SingleStoreDB cluster manager.

    Calling this function emits a :class:`DeprecationWarning`.

    Parameters
    ----------
    access_token : str, optional
        The API key or other access token for the cluster management API
    version : str, optional
        Version of the API to use
    base_url : str, optional
        Base URL of the cluster management API
    organization_id: str, optional
        ID of organization, if using a JWT for authentication

    Returns
    -------
    :class:`ClusterManager`

    """
    # stacklevel=2 attributes the warning to the code that called
    # manage_cluster rather than to this module. Python's default filters
    # only display a DeprecationWarning attributed to __main__, so
    # without this the notice is silently dropped for nearly every user.
    warnings.warn(
        'The cluster management API is deprecated; '
        'use manage_workspaces instead.',
        category=DeprecationWarning,
        stacklevel=2,
    )
    return ClusterManager(
        access_token=access_token, base_url=base_url,
        version=version, organization_id=organization_id,
    )
463
464