Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/fusion/handlers/export.py
469 views
1
#!/usr/bin/env python3
2
import datetime
3
import json
4
from typing import Any
5
from typing import Dict
6
from typing import Optional
7
8
from .. import result
9
from ...management.export import _get_exports
10
from ...management.export import ExportService
11
from ...management.export import ExportStatus
12
from ..handler import SQLHandler
13
from ..result import FusionSQLResult
14
from .utils import get_workspace_group
15
16
17
class CreateClusterIdentity(SQLHandler):
    """
    CREATE CLUSTER IDENTITY
        catalog
        storage
    ;

    # Catalog
    catalog = CATALOG { _catalog_config | _catalog_creds }
    _catalog_config = CONFIG '<catalog-config>'
    _catalog_creds = CREDENTIALS '<catalog-creds>'

    # Storage
    storage = LINK { _link_config | _link_creds }
    _link_config = S3 CONFIG '<link-config>'
    _link_creds = CREDENTIALS '<link-creds>'

    Description
    -----------
    Create a cluster identity for allowing the export service to access
    external cloud resources.

    Arguments
    ---------
    * ``<catalog-config>`` and ``<catalog-creds>``: Catalog configuration
      and credentials in JSON format.
    * ``<link-config>`` and ``<link-creds>``: Storage link configuration
      and credentials in JSON format.

    Remarks
    -------
    * ``CATALOG`` specifies the details of the catalog to connect to.
    * ``LINK`` specifies the details of the data storage to connect to.

    Example
    -------
    The following statement creates a cluster identity for the catalog
    and link::

        CREATE CLUSTER IDENTITY
        CATALOG CONFIG '{
            "catalog_type": "GLUE",
            "table_format": "ICEBERG",
            "catalog_id": "13983498723498",
            "catalog_region": "us-east-1"
        }'
        LINK S3 CONFIG '{
            "region": "us-east-1",
            "endpoint_url": "s3://bucket-name"

        }'
        ;

    """

    # Handler is disabled by default; it is activated by the explicit
    # register() call below.
    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        # Catalog configuration / credentials arrive as JSON strings;
        # missing or empty values fall back to an empty JSON object.
        catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}')
        catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}')

        # Storage link configuration / credentials, same JSON handling.
        storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}')
        storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}')

        # Only S3 links are expressible in this statement's grammar
        # (LINK S3 CONFIG ...), so the provider is fixed here.
        storage_config['provider'] = 'S3'

        wsg = get_workspace_group({})

        if wsg._manager is None:
            raise TypeError('no workspace manager is associated with workspace group')

        # No source database/table applies when creating a cluster
        # identity, so placeholder names are passed to the service.
        out = ExportService(
            wsg,
            'none',
            'none',
            dict(**catalog_config, **catalog_creds),
            dict(**storage_config, **storage_creds),
            columns=None,
        ).create_cluster_identity()

        # Return the new identity as a one-row, one-column result.
        res = FusionSQLResult()
        res.add_field('Identity', result.STRING)
        res.set_rows([(out['identity'],)])

        return res


CreateClusterIdentity.register(overwrite=True)
def _parse_refresh_interval(
    refresh_interval: Optional[Dict[str, Any]],
) -> Optional[datetime.timedelta]:
    """Convert a REFRESH INTERVAL clause to a timedelta, or None if absent.

    Raises ValueError for a non-positive value or an unknown time unit.
    """
    if refresh_interval is None:
        return None

    value = int(refresh_interval['refresh_interval_value'])
    time_unit = refresh_interval['refresh_interval_time_unit'].upper()

    # NOTE(fix): the original checked ``value < 0``, which allowed a zero
    # interval through despite the error message; zero is now rejected too.
    if value <= 0:
        raise ValueError('refresh interval must be greater than 0')

    # Map grammar time units to datetime.timedelta keyword arguments.
    units = {
        'SECONDS': 'seconds',
        'MINUTES': 'minutes',
        'HOURS': 'hours',
        'DAYS': 'days',
    }
    if time_unit not in units:
        raise ValueError('invalid refresh interval time unit')

    return datetime.timedelta(**{units[time_unit]: value})


def _start_export(params: Dict[str, Any]) -> Optional[FusionSQLResult]:
    """Start an export operation described by the handler parameters.

    Shared implementation for the ``START EXPORT`` and
    ``START INCREMENTAL EXPORT`` handlers; the latter sets
    ``params['incremental'] = True`` before calling.

    Returns a one-row result containing the new export ID.
    """
    # From table; a bare string means no database qualifier was given.
    if isinstance(params['from_table'], str):
        from_database = None
        from_table = params['from_table']
    else:
        from_database, from_table = params['from_table']

    # Catalog configuration / credentials are JSON strings; missing or
    # empty values fall back to an empty JSON object.
    catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}')
    catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}')

    # Storage link configuration / credentials, same JSON handling.
    storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}')
    storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}')

    # Only S3 links are expressible in the statement grammar.
    storage_config['provider'] = 'S3'

    wsg = get_workspace_group({})

    if from_database is None:
        raise ValueError('database name must be specified for source table')

    if wsg._manager is None:
        raise TypeError('no workspace manager is associated with workspace group')

    # Convert parsed PARTITION BY transforms into the service's
    # {'transform': ..., 'name': ...} dict form.
    partition_by = []
    if params['partition_by']:
        for key in params['partition_by']:
            transform = key['partition_key']['transform']['col_transform']
            partition_by.append({
                'transform': transform[0].lower(),
                'name': transform[-1]['transform_col'],
            })

    # Convert parsed ORDER BY sort keys; direction defaults to ascending
    # and null ordering to nulls-first unless the clause says otherwise.
    order_by = []
    if params['order_by'] and params['order_by']['by']:
        for key in params['order_by']['by']:
            transform = key['transform']['col_transform']
            order = {
                'transform': transform[0].lower(),
                'name': transform[-1]['transform_col'],
                'direction': 'ascending',
                'null_order': 'nulls_first',
            }
            if key.get('direction') and 'desc' in key['direction'].lower():
                order['direction'] = 'descending'
            if key.get('null_order') and 'last' in key['null_order'].lower():
                order['null_order'] = 'nulls_last'
            order_by.append(order)

    # Refresh interval (only supplied for incremental exports).
    refresh_interval_delta = _parse_refresh_interval(
        params.get('refresh_interval', None),
    )

    out = ExportService(
        wsg,
        from_database,
        from_table,
        dict(**catalog_config, **catalog_creds),
        dict(**storage_config, **storage_creds),
        columns=None,
        partition_by=partition_by or None,
        order_by=order_by or None,
        properties=json.loads(params['properties']) if params['properties'] else None,
        incremental=params.get('incremental', False),
        refresh_interval=int(refresh_interval_delta.total_seconds())
        if refresh_interval_delta is not None else None,
    ).start()

    # Return the new export ID as a one-row result.
    res = FusionSQLResult()
    res.add_field('ExportID', result.STRING)
    res.set_rows([(out.export_id,)])

    return res
class StartExport(SQLHandler):
    """
    START EXPORT
        from_table
        catalog
        storage
        [ partition_by ]
        [ order_by ]
        [ properties ]
    ;

    # From table
    from_table = FROM <table>

    # Transforms
    _col_transform = { VOID | IDENTITY | YEAR | MONTH | DAY | HOUR } ( _transform_col )
    _transform_col = <column>
    _arg_transform = { BUCKET | TRUNCATE } ( _transform_col <comma> _transform_arg )
    _transform_arg = <integer>
    transform = { _col_transform | _arg_transform }

    # Partitions
    partition_by = PARTITION BY partition_key,...
    partition_key = transform

    # Sort order
    order_by = ORDER BY sort_key,...
    sort_key = transform [ direction ] [ null_order ]
    direction = { ASC | DESC | ASCENDING | DESCENDING }
    null_order = { NULLS_FIRST | NULLS_LAST }

    # Properties
    properties = PROPERTIES '<json>'

    # Catalog
    catalog = CATALOG [ _catalog_config ] [ _catalog_creds ]
    _catalog_config = CONFIG '<catalog-config>'
    _catalog_creds = CREDENTIALS '<catalog-creds>'

    # Storage
    storage = LINK [ _link_config ] [ _link_creds ]
    _link_config = S3 CONFIG '<link-config>'
    _link_creds = CREDENTIALS '<link-creds>'

    Description
    -----------
    Start an export.

    Arguments
    ---------
    * ``<catalog-config>`` and ``<catalog-creds>``: The catalog configuration.
    * ``<link-config>`` and ``<link-creds>``: The storage link configuration.

    Remarks
    -------
    * ``FROM <table>`` specifies the SingleStore table to export. The same name will
      be used for the exported table.
    * ``CATALOG`` specifies the details of the catalog to connect to.
    * ``LINK`` specifies the details of the data storage to connect to.

    Examples
    --------
    The following statement starts an export operation with the given
    catalog and link configurations. The source table to export is
    named "customer_data"::

        START EXPORT FROM my_db.customer_data
        CATALOG CONFIG '{
            "catalog_type": "GLUE",
            "table_format": "ICEBERG",
            "catalog_id": "13983498723498",
            "catalog_region": "us-east-1"
        }'
        LINK S3 CONFIG '{
            "region": "us-east-1",
            "endpoint_url": "s3://bucket-name"
        }'
        ;

    """ # noqa

    # Handler is disabled by default; it is activated by the explicit
    # register() call below.
    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        # Implementation is shared with StartIncrementalExport.
        return _start_export(params)


StartExport.register(overwrite=True)
class StartIncrementalExport(SQLHandler):
    """
    START INCREMENTAL EXPORT
        from_table
        catalog
        storage
        [ partition_by ]
        [ order_by ]
        [ properties ]
        [ refresh_interval ]
    ;

    # From table
    from_table = FROM <table>

    # Transforms
    _col_transform = { VOID | IDENTITY | YEAR | MONTH | DAY | HOUR } ( _transform_col )
    _transform_col = <column>
    _arg_transform = { BUCKET | TRUNCATE } ( _transform_col <comma> _transform_arg )
    _transform_arg = <integer>
    transform = { _col_transform | _arg_transform }

    # Partitions
    partition_by = PARTITION BY partition_key,...
    partition_key = transform

    # Sort order
    order_by = ORDER BY sort_key,...
    sort_key = transform [ direction ] [ null_order ]
    direction = { ASC | DESC | ASCENDING | DESCENDING }
    null_order = { NULLS_FIRST | NULLS_LAST }

    # Properties
    properties = PROPERTIES '<json>'

    # Catalog
    catalog = CATALOG [ _catalog_config ] [ _catalog_creds ]
    _catalog_config = CONFIG '<catalog-config>'
    _catalog_creds = CREDENTIALS '<catalog-creds>'

    # Storage
    storage = LINK [ _link_config ] [ _link_creds ]
    _link_config = S3 CONFIG '<link-config>'
    _link_creds = CREDENTIALS '<link-creds>'

    # Refresh interval
    refresh_interval = REFRESH INTERVAL _refresh_interval_value _refresh_interval_time_unit
    _refresh_interval_value = <integer>
    _refresh_interval_time_unit = { SECONDS | MINUTES | HOURS | DAYS }

    Description
    -----------
    Start an incremental export.

    Arguments
    ---------
    * ``<catalog-config>`` and ``<catalog-creds>``: The catalog configuration.
    * ``<link-config>`` and ``<link-creds>``: The storage link configuration.

    Remarks
    -------
    * ``FROM <table>`` specifies the SingleStore table to export. The same name will
      be used for the exported table.
    * ``CATALOG`` specifies the details of the catalog to connect to.
    * ``LINK`` specifies the details of the data storage to connect to.
    * ``REFRESH INTERVAL`` specifies the interval for refreshing the
      incremental export. The default is 1 day.

    Examples
    --------
    The following statement starts an export operation with the given
    catalog and link configurations. The source table to export is
    named "customer_data"::

        START INCREMENTAL EXPORT FROM my_db.customer_data
        CATALOG CONFIG '{
            "catalog_type": "GLUE",
            "table_format": "ICEBERG",
            "catalog_id": "13983498723498",
            "catalog_region": "us-east-1"
        }'
        LINK S3 CONFIG '{
            "region": "us-east-1",
            "endpoint_url": "s3://bucket-name"
        }'
        REFRESH INTERVAL 24 HOURS
        ;

    """ # noqa

    # Handler is disabled by default; it is activated by the explicit
    # register() call below.
    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        # Same implementation as StartExport, with the incremental flag set
        # so the export service tracks ongoing changes.
        params['incremental'] = True
        return _start_export(params)


StartIncrementalExport.register(overwrite=True)
def _format_status(export_id: str, status: ExportStatus) -> Optional[FusionSQLResult]:
    """Build a one-row result describing the state of an export operation."""
    details = status._info()

    out = FusionSQLResult()
    for field_name in ('ExportID', 'Status', 'Message'):
        out.add_field(field_name, result.STRING)

    # Fall back to 'Unknown' / empty message when the service omits them.
    row = (
        export_id,
        details.get('status', 'Unknown'),
        details.get('statusMsg', ''),
    )
    out.set_rows([row])

    return out
class ShowExport(SQLHandler):
    """
    SHOW EXPORT export_id;

    # ID of export
    export_id = '<export-id>'

    """

    # Handler is disabled by default; it is activated by the explicit
    # register() call below.
    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        # Look up the export in the current workspace group and render
        # its status as a one-row result.
        wsg = get_workspace_group({})
        return _format_status(
            params['export_id'], ExportStatus(params['export_id'], wsg),
        )


ShowExport.register(overwrite=True)
class ShowExports(SQLHandler):
    """
    SHOW EXPORTS [ scope ];

    # Location of the export
    scope = FOR '<scope>'

    """

    # Handler is disabled by default; it is activated by the explicit
    # register() call below.
    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        wsg = get_workspace_group({})

        # Default to all exports when no FOR '<scope>' clause is given.
        exports = _get_exports(wsg, params.get('scope', 'all'))

        # One row per export: ID plus status information, with fallbacks
        # for fields the service may omit.
        res = FusionSQLResult()
        res.add_field('ExportID', result.STRING)
        res.add_field('Status', result.STRING)
        res.add_field('Message', result.STRING)
        res.set_rows([
            (
                info['egressID'],
                info.get('status', 'Unknown'),
                info.get('statusMsg', ''),
            )
            for info in [x._info() for x in exports]
        ])

        return res


ShowExports.register(overwrite=True)
class SuspendExport(SQLHandler):
    """
    SUSPEND EXPORT export_id;

    # ID of export
    export_id = '<export-id>'

    """

    # Handler is disabled by default; it is activated by the explicit
    # register() call below.
    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        # Suspend the export and report its resulting status.
        wsg = get_workspace_group({})
        service = ExportService.from_export_id(wsg, params['export_id'])
        return _format_status(params['export_id'], service.suspend())


SuspendExport.register(overwrite=True)
class ResumeExport(SQLHandler):
    """
    RESUME EXPORT export_id;

    # ID of export
    export_id = '<export-id>'

    """

    # Handler is disabled by default; it is activated by the explicit
    # register() call below.
    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        # Resume the export and report its resulting status.
        wsg = get_workspace_group({})
        service = ExportService.from_export_id(wsg, params['export_id'])
        return _format_status(params['export_id'], service.resume())


ResumeExport.register(overwrite=True)
class DropExport(SQLHandler):
    """
    DROP EXPORT export_id;

    # ID of export
    export_id = '<export-id>'

    """

    # Handler is disabled by default; it is activated by the explicit
    # register() call below.
    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        # Drop the export; this statement produces no result rows.
        wsg = get_workspace_group({})
        service = ExportService.from_export_id(wsg, params['export_id'])
        service.drop()
        return None


DropExport.register(overwrite=True)