Path: blob/main/singlestoredb/management/export.py
469 views
#!/usr/bin/env python1"""SingleStoreDB export service."""2from __future__ import annotations34import copy5import json6from typing import Any7from typing import Dict8from typing import List9from typing import Optional10from typing import Union1112from .. import ManagementError13from .utils import vars_to_str14from .workspace import WorkspaceGroup15from .workspace import WorkspaceManager161718class ExportService(object):19"""Export service."""2021database: str22table: str23catalog_info: Dict[str, Any]24storage_info: Dict[str, Any]25columns: Optional[List[str]]26partition_by: Optional[List[Dict[str, str]]]27order_by: Optional[List[Dict[str, Dict[str, str]]]]28properties: Optional[Dict[str, Any]]29incremental: bool30refresh_interval: Optional[int]31export_id: Optional[str]3233def __init__(34self,35workspace_group: WorkspaceGroup,36database: str,37table: str,38catalog_info: Union[str, Dict[str, Any]],39storage_info: Union[str, Dict[str, Any]],40columns: Optional[List[str]] = None,41partition_by: Optional[List[Dict[str, str]]] = None,42order_by: Optional[List[Dict[str, Dict[str, str]]]] = None,43incremental: bool = False,44refresh_interval: Optional[int] = None,45properties: Optional[Dict[str, Any]] = None,46):47#: Workspace group48self.workspace_group = workspace_group4950#: Name of SingleStoreDB database51self.database = database5253#: Name of SingleStoreDB table54self.table = table5556#: List of columns to export57self.columns = columns5859#: Catalog60if isinstance(catalog_info, str):61self.catalog_info = json.loads(catalog_info)62else:63self.catalog_info = copy.copy(catalog_info)6465#: Storage66if isinstance(storage_info, str):67self.storage_info = json.loads(storage_info)68else:69self.storage_info = copy.copy(storage_info)7071self.partition_by = partition_by or None72self.order_by = order_by or None73self.properties = properties or None7475self.incremental = incremental76self.refresh_interval = refresh_interval7778self.export_id = None7980self._manager: Optional[WorkspaceManager] = workspace_group._manager8182@classmethod83def from_export_id(84self,85workspace_group: WorkspaceGroup,86export_id: str,87) -> ExportService:88"""Create export service from export ID."""89out = ExportService(90workspace_group=workspace_group,91database='',92table='',93catalog_info={},94storage_info={},95)96out.export_id = export_id97return out9899def __str__(self) -> str:100"""Return string representation."""101return vars_to_str(self)102103def __repr__(self) -> str:104"""Return string representation."""105return str(self)106107def create_cluster_identity(self) -> Dict[str, Any]:108"""Create a cluster identity."""109if self._manager is None:110raise ManagementError(111msg='No workspace manager is associated with this object.',112)113114out = self._manager._post(115f'workspaceGroups/{self.workspace_group.id}/'116'egress/createEgressClusterIdentity',117json=dict(118catalogInfo=self.catalog_info,119storageInfo=self.storage_info,120),121)122123return out.json()124125def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus':126"""Start the export process."""127if not self.table or not self.database:128raise ManagementError(129msg='Database and table must be set before starting the export.',130)131132if self._manager is None:133raise ManagementError(134msg='No workspace manager is associated with this object.',135)136137partition_spec = None138if self.partition_by:139partition_spec = dict(partitions=self.partition_by)140141sort_order_spec = None142if self.order_by:143sort_order_spec = dict(keys=self.order_by)144145out = self._manager._post(146f'workspaceGroups/{self.workspace_group.id}/egress/startTableEgress',147json={148k: v for k, v in dict(149databaseName=self.database,150tableName=self.table,151storageInfo=self.storage_info,152catalogInfo=self.catalog_info,153partitionSpec=partition_spec,154sortOrderSpec=sort_order_spec,155properties=self.properties,156incremental=self.incremental or None,157refreshInterval=self.refresh_interval158if self.refresh_interval is not None else None,159).items() if v is not None160},161)162163self.export_id = str(out.json()['egressID'])164165return ExportStatus(self.export_id, self.workspace_group)166167def suspend(self) -> 'ExportStatus':168"""Suspend the export process."""169if self._manager is None:170raise ManagementError(171msg='No workspace manager is associated with this object.',172)173174if self.export_id is None:175raise ManagementError(176msg='Export ID is not set. You must start the export first.',177)178179self._manager._post(180f'workspaceGroups/{self.workspace_group.id}/egress/suspendTableEgress',181json=dict(egressID=self.export_id),182)183184return ExportStatus(self.export_id, self.workspace_group)185186def resume(self) -> 'ExportStatus':187"""Resume the export process."""188if self._manager is None:189raise ManagementError(190msg='No workspace manager is associated with this object.',191)192193if self.export_id is None:194raise ManagementError(195msg='Export ID is not set. You must start the export first.',196)197198self._manager._post(199f'workspaceGroups/{self.workspace_group.id}/egress/resumeTableEgress',200json=dict(egressID=self.export_id),201)202203return ExportStatus(self.export_id, self.workspace_group)204205def drop(self) -> None:206"""Drop the export process."""207if self._manager is None:208raise ManagementError(209msg='No workspace manager is associated with this object.',210)211212if self.export_id is None:213raise ManagementError(214msg='Export ID is not set. You must start the export first.',215)216217self._manager._delete(218f'workspaceGroups/{self.workspace_group.id}/egress/dropTableEgress',219json=dict(egressID=self.export_id),220)221222return None223224def status(self) -> ExportStatus:225"""Get the status of the export process."""226if self._manager is None:227raise ManagementError(228msg='No workspace manager is associated with this object.',229)230231if self.export_id is None:232raise ManagementError(233msg='Export ID is not set. You must start the export first.',234)235236return ExportStatus(self.export_id, self.workspace_group)237238239class ExportStatus(object):240241export_id: str242243def __init__(self, export_id: str, workspace_group: WorkspaceGroup):244self.export_id = export_id245self.workspace_group = workspace_group246self._manager: Optional[WorkspaceManager] = workspace_group._manager247248def _info(self) -> Dict[str, Any]:249"""Return export status."""250if self._manager is None:251raise ManagementError(252msg='No workspace manager is associated with this object.',253)254255out = self._manager._get(256f'workspaceGroups/{self.workspace_group.id}/egress/tableEgressStatus',257json=dict(egressID=self.export_id),258)259260return out.json()261262@property263def status(self) -> str:264"""Return export status."""265return self._info().get('status', 'Unknown')266267@property268def message(self) -> str:269"""Return export status message."""270return self._info().get('statusMsg', '')271272def __str__(self) -> str:273return self.status274275def __repr__(self) -> str:276return self.status277278279def _get_exports(280workspace_group: WorkspaceGroup,281scope: str = 'all',282) -> List[ExportStatus]:283"""Get all exports in the workspace group."""284if workspace_group._manager is None:285raise ManagementError(286msg='No workspace manager is associated with this object.',287)288289out = workspace_group._manager._get(290f'workspaceGroups/{workspace_group.id}/egress/tableEgressStatus',291json=dict(scope=scope),292)293294return out.json()295296297