Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/management/export.py
469 views
1
#!/usr/bin/env python
2
"""SingleStoreDB export service."""
3
from __future__ import annotations
4
5
import copy
6
import json
7
from typing import Any
8
from typing import Dict
9
from typing import List
10
from typing import Optional
11
from typing import Union
12
13
from .. import ManagementError
14
from .utils import vars_to_str
15
from .workspace import WorkspaceGroup
16
from .workspace import WorkspaceManager
17
18
19
class ExportService(object):
    """Export service.

    Stores the settings of a table export and sends the corresponding
    ``egress`` requests for a workspace group through the workspace
    manager taken from that group.

    Parameters
    ----------
    workspace_group : WorkspaceGroup
        Workspace group the export belongs to. Its ``_manager`` is used to
        send requests and its ``id`` is used to build the request paths.
    database : str
        Name of SingleStoreDB database
    table : str
        Name of SingleStoreDB table
    catalog_info : str or Dict[str, Any]
        Catalog settings. A string is decoded as JSON; a dictionary is
        deep-copied.
    storage_info : str or Dict[str, Any]
        Storage settings, handled the same way as ``catalog_info``.
    columns : List[str], optional
        List of columns to export
    partition_by : List[Dict[str, str]], optional
        Partition definitions; sent as ``partitionSpec.partitions``.
        An empty value is stored as None.
    order_by : List[Dict[str, Dict[str, str]]], optional
        Sort keys; sent as ``sortOrderSpec.keys``. An empty value is
        stored as None.
    incremental : bool, optional
        Only included in the start request when true.
    refresh_interval : int, optional
        Only included in the start request when it is not None.
    properties : Dict[str, Any], optional
        Extra properties for the start request. An empty value is stored
        as None and therefore left out of the request.

    """

    database: str
    table: str
    catalog_info: Dict[str, Any]
    storage_info: Dict[str, Any]
    columns: Optional[List[str]]
    partition_by: Optional[List[Dict[str, str]]]
    order_by: Optional[List[Dict[str, Dict[str, str]]]]
    properties: Optional[Dict[str, Any]]
    incremental: bool
    refresh_interval: Optional[int]
    export_id: Optional[str]

    def __init__(
        self,
        workspace_group: WorkspaceGroup,
        database: str,
        table: str,
        catalog_info: Union[str, Dict[str, Any]],
        storage_info: Union[str, Dict[str, Any]],
        columns: Optional[List[str]] = None,
        partition_by: Optional[List[Dict[str, str]]] = None,
        order_by: Optional[List[Dict[str, Dict[str, str]]]] = None,
        incremental: bool = False,
        refresh_interval: Optional[int] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        #: Workspace group
        self.workspace_group = workspace_group

        #: Name of SingleStoreDB database
        self.database = database

        #: Name of SingleStoreDB table
        self.table = table

        #: List of columns to export
        self.columns = columns

        # Dictionaries are deep-copied so that later changes the caller
        # makes to nested values cannot alter this export's settings; this
        # matches the JSON-string branch, which always builds new objects.

        #: Catalog
        if isinstance(catalog_info, str):
            self.catalog_info = json.loads(catalog_info)
        else:
            self.catalog_info = copy.deepcopy(catalog_info)

        #: Storage
        if isinstance(storage_info, str):
            self.storage_info = json.loads(storage_info)
        else:
            self.storage_info = copy.deepcopy(storage_info)

        # Empty containers are normalized to None so they are dropped from
        # the start request instead of being sent as empty values.
        self.partition_by = partition_by or None
        self.order_by = order_by or None
        self.properties = properties or None

        self.incremental = incremental
        self.refresh_interval = refresh_interval

        #: Export ID; None until :meth:`start` or :meth:`from_export_id`
        #: sets it
        self.export_id = None

        self._manager: Optional[WorkspaceManager] = workspace_group._manager

    @classmethod
    def from_export_id(
        cls,
        workspace_group: WorkspaceGroup,
        export_id: str,
    ) -> ExportService:
        """
        Create export service from export ID.

        The returned object has empty database, table, catalog and storage
        settings, so it can be suspended, resumed, dropped and queried but
        not started.

        Parameters
        ----------
        workspace_group : WorkspaceGroup
            Workspace group the export belongs to
        export_id : str
            ID of an existing export

        Returns
        -------
        ExportService
            An instance of the class this method was called on

        """
        # Build through ``cls`` so subclasses get an instance of themselves.
        out = cls(
            workspace_group=workspace_group,
            database='',
            table='',
            catalog_info={},
            storage_info={},
        )
        out.export_id = export_id
        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)

    def _require_manager(self) -> WorkspaceManager:
        """Return the workspace manager, raising if none is attached."""
        if self._manager is None:
            raise ManagementError(
                msg='No workspace manager is associated with this object.',
            )
        return self._manager

    def _require_export_id(self) -> str:
        """Return the export ID, raising if the export has none yet."""
        if self.export_id is None:
            raise ManagementError(
                msg='Export ID is not set. You must start the export first.',
            )
        return self.export_id

    def create_cluster_identity(self) -> Dict[str, Any]:
        """
        Create a cluster identity.

        Posts the catalog and storage settings to the
        ``egress/createEgressClusterIdentity`` endpoint of the workspace
        group.

        Returns
        -------
        Dict[str, Any]
            The decoded response body

        Raises
        ------
        ManagementError
            If no workspace manager is associated with this object

        """
        manager = self._require_manager()

        out = manager._post(
            f'workspaceGroups/{self.workspace_group.id}/'
            'egress/createEgressClusterIdentity',
            json=dict(
                catalogInfo=self.catalog_info,
                storageInfo=self.storage_info,
            ),
        )

        return out.json()

    def start(self, tags: Optional[List[str]] = None) -> ExportStatus:
        """
        Start the export process.

        On success the ``egressID`` from the response is stored in
        ``export_id`` as a string.

        Parameters
        ----------
        tags : List[str], optional
            Accepted but not used by this method

        Returns
        -------
        ExportStatus
            Status object for the newly started export

        Raises
        ------
        ManagementError
            If the database or table is empty, or if no workspace manager
            is associated with this object

        """
        if not self.table or not self.database:
            raise ManagementError(
                msg='Database and table must be set before starting the export.',
            )

        manager = self._require_manager()

        partition_spec = None
        if self.partition_by:
            partition_spec = dict(partitions=self.partition_by)

        sort_order_spec = None
        if self.order_by:
            sort_order_spec = dict(keys=self.order_by)

        # NOTE(review): ``self.columns`` and ``tags`` are never added to the
        # request below -- confirm whether the endpoint expects them.
        payload = dict(
            databaseName=self.database,
            tableName=self.table,
            storageInfo=self.storage_info,
            catalogInfo=self.catalog_info,
            partitionSpec=partition_spec,
            sortOrderSpec=sort_order_spec,
            properties=self.properties,
            # False becomes None so the key is omitted unless enabled
            incremental=self.incremental or None,
            refreshInterval=self.refresh_interval,
        )

        out = manager._post(
            f'workspaceGroups/{self.workspace_group.id}/egress/startTableEgress',
            # Unset (None) options are left out of the request entirely
            json={k: v for k, v in payload.items() if v is not None},
        )

        self.export_id = str(out.json()['egressID'])

        return ExportStatus(self.export_id, self.workspace_group)

    def suspend(self) -> ExportStatus:
        """
        Suspend the export process.

        Returns
        -------
        ExportStatus
            Status object for this export

        Raises
        ------
        ManagementError
            If no workspace manager is associated with this object or the
            export ID is not set

        """
        manager = self._require_manager()
        export_id = self._require_export_id()

        manager._post(
            f'workspaceGroups/{self.workspace_group.id}/egress/suspendTableEgress',
            json=dict(egressID=export_id),
        )

        return ExportStatus(export_id, self.workspace_group)

    def resume(self) -> ExportStatus:
        """
        Resume the export process.

        Returns
        -------
        ExportStatus
            Status object for this export

        Raises
        ------
        ManagementError
            If no workspace manager is associated with this object or the
            export ID is not set

        """
        manager = self._require_manager()
        export_id = self._require_export_id()

        manager._post(
            f'workspaceGroups/{self.workspace_group.id}/egress/resumeTableEgress',
            json=dict(egressID=export_id),
        )

        return ExportStatus(export_id, self.workspace_group)

    def drop(self) -> None:
        """
        Drop the export process.

        ``export_id`` is left unchanged on this object after the request.

        Raises
        ------
        ManagementError
            If no workspace manager is associated with this object or the
            export ID is not set

        """
        manager = self._require_manager()
        export_id = self._require_export_id()

        manager._delete(
            f'workspaceGroups/{self.workspace_group.id}/egress/dropTableEgress',
            json=dict(egressID=export_id),
        )

    def status(self) -> ExportStatus:
        """
        Get the status of the export process.

        No request is sent here; the returned object queries the status
        when its properties are read.

        Returns
        -------
        ExportStatus
            Status object for this export

        Raises
        ------
        ManagementError
            If no workspace manager is associated with this object or the
            export ID is not set

        """
        # The manager is checked for parity with the other operations even
        # though it is not used directly in this method.
        self._require_manager()
        export_id = self._require_export_id()

        return ExportStatus(export_id, self.workspace_group)
238
239
240
class ExportStatus(object):
    """
    Status accessor for a single export.

    Nothing is cached: ``status``, ``message``, ``str()`` and ``repr()``
    each call :meth:`_info`, which asks the workspace manager again.

    Parameters
    ----------
    export_id : str
        ID of the export to query
    workspace_group : WorkspaceGroup
        Workspace group the export belongs to; its ``_manager`` is used to
        send requests and its ``id`` is used to build the request path.

    """

    export_id: str

    def __init__(self, export_id: str, workspace_group: WorkspaceGroup):
        self.export_id = export_id
        self.workspace_group = workspace_group
        # The manager is captured from the workspace group at construction
        self._manager: Optional[WorkspaceManager] = workspace_group._manager

    def _info(self) -> Dict[str, Any]:
        """
        Return export status.

        Returns
        -------
        Dict[str, Any]
            The decoded response of the ``egress/tableEgressStatus``
            request for this export ID

        Raises
        ------
        ManagementError
            If no workspace manager is associated with this object

        """
        manager = self._manager
        if manager is None:
            raise ManagementError(
                msg='No workspace manager is associated with this object.',
            )

        response = manager._get(
            f'workspaceGroups/{self.workspace_group.id}/egress/tableEgressStatus',
            json={'egressID': self.export_id},
        )
        return response.json()

    @property
    def status(self) -> str:
        """Return export status, or ``'Unknown'`` if the response has none."""
        info = self._info()
        return info.get('status', 'Unknown')

    @property
    def message(self) -> str:
        """Return export status message, or ``''`` if the response has none."""
        info = self._info()
        return info.get('statusMsg', '')

    def __str__(self) -> str:
        """Return the current export status."""
        return self.status

    def __repr__(self) -> str:
        """Return the current export status."""
        return self.status
278
279
280
def _get_exports(
    workspace_group: WorkspaceGroup,
    scope: str = 'all',
) -> List[ExportStatus]:
    """
    Get all exports in the workspace group.

    Parameters
    ----------
    workspace_group : WorkspaceGroup
        Workspace group to query; its ``_manager`` sends the request
    scope : str, optional
        Value passed as ``scope`` in the request body

    Returns
    -------
    List[ExportStatus]
        NOTE(review): the value returned is whatever ``.json()`` yields for
        the response; no ``ExportStatus`` objects are built here. Confirm
        whether the annotation or the implementation reflects the intent.

    Raises
    ------
    ManagementError
        If the workspace group has no workspace manager

    """
    manager = workspace_group._manager
    if manager is None:
        raise ManagementError(
            msg='No workspace manager is associated with this object.',
        )

    response = manager._get(
        f'workspaceGroups/{workspace_group.id}/egress/tableEgressStatus',
        json={'scope': scope},
    )
    return response.json()
296
297