Path: blob/main/singlestoredb/fusion/handlers/export.py
#!/usr/bin/env python3
import datetime
import json
from typing import Any
from typing import Dict
from typing import Optional

from .. import result
from ...management.export import _get_exports
from ...management.export import ExportService
from ...management.export import ExportStatus
from ..handler import SQLHandler
from ..result import FusionSQLResult
from .utils import get_workspace_group


class CreateClusterIdentity(SQLHandler):
    """
    CREATE CLUSTER IDENTITY
        catalog
        storage
    ;

    # Catalog
    catalog = CATALOG { _catalog_config | _catalog_creds }
    _catalog_config = CONFIG '<catalog-config>'
    _catalog_creds = CREDENTIALS '<catalog-creds>'

    # Storage
    storage = LINK { _link_config | _link_creds }
    _link_config = S3 CONFIG '<link-config>'
    _link_creds = CREDENTIALS '<link-creds>'

    Description
    -----------
    Create a cluster identity for allowing the export service to access
    external cloud resources.

    Arguments
    ---------
    * ``<catalog-config>`` and ``<catalog-creds>``: Catalog configuration
      and credentials in JSON format.
    * ``<link-config>`` and ``<link-creds>``: Storage link configuration
      and credentials in JSON format.

    Remarks
    -------
    * ``CATALOG`` specifies the details of the catalog to connect to.
    * ``LINK`` specifies the details of the data storage to connect to.

    Example
    -------
    The following statement creates a cluster identity for the catalog
    and link::

        CREATE CLUSTER IDENTITY
            CATALOG CONFIG '{
                "catalog_type": "GLUE",
                "table_format": "ICEBERG",
                "catalog_id": "13983498723498",
                "catalog_region": "us-east-1"
            }'
            LINK S3 CONFIG '{
                "region": "us-east-1",
                "endpoint_url": "s3://bucket-name"
            }'
        ;

    """

    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        # Catalog
        catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}')
        catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}')

        # Storage
        storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}')
        storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}')

        storage_config['provider'] = 'S3'

        wsg = get_workspace_group({})

        if wsg._manager is None:
            raise TypeError('no workspace manager is associated with workspace group')

        out = ExportService(
            wsg,
            'none',
            'none',
            dict(**catalog_config, **catalog_creds),
            dict(**storage_config, **storage_creds),
            columns=None,
        ).create_cluster_identity()

        res = FusionSQLResult()
        res.add_field('Identity', result.STRING)
        res.set_rows([(out['identity'],)])

        return res


CreateClusterIdentity.register(overwrite=True)
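

# A hedged sketch (not part of the original module) of the parsed ``params``
# structure that ``CreateClusterIdentity.run`` consumes. The top-level
# 'catalog'/'storage' keys and their 'catalog_config'/'link_config' entries are
# taken from the ``.get()`` calls in ``run`` above; the exact layout produced
# by the SQLHandler parser beyond that is an assumption, and the JSON values
# are copied from the docstring example:
#
#     params = {
#         'catalog': {
#             'catalog_config': '{"catalog_type": "GLUE", "table_format": "ICEBERG", '
#                               '"catalog_id": "13983498723498", "catalog_region": "us-east-1"}',
#         },
#         'storage': {
#             'link_config': '{"region": "us-east-1", "endpoint_url": "s3://bucket-name"}',
#         },
#     }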


def _start_export(params: Dict[str, Any]) -> Optional[FusionSQLResult]:
    # From table
    if isinstance(params['from_table'], str):
        from_database = None
        from_table = params['from_table']
    else:
        from_database, from_table = params['from_table']

    # Catalog
    catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}')
    catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}')

    # Storage
    storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}')
    storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}')

    storage_config['provider'] = 'S3'

    wsg = get_workspace_group({})

    if from_database is None:
        raise ValueError('database name must be specified for source table')

    if wsg._manager is None:
        raise TypeError('no workspace manager is associated with workspace group')

    partition_by = []
    if params['partition_by']:
        for key in params['partition_by']:
            transform = key['partition_key']['transform']['col_transform']
            part = {}
            part['transform'] = transform[0].lower()
            part['name'] = transform[-1]['transform_col']
            partition_by.append(part)

    order_by = []
    if params['order_by'] and params['order_by']['by']:
        for key in params['order_by']['by']:
            transform = key['transform']['col_transform']
            order = {}
            order['transform'] = transform[0].lower()
            order['name'] = transform[-1]['transform_col']
            order['direction'] = 'ascending'
            order['null_order'] = 'nulls_first'
            if key.get('direction'):
                if 'desc' in key['direction'].lower():
                    order['direction'] = 'descending'
            if key.get('null_order'):
                if 'last' in key['null_order'].lower():
                    order['null_order'] = 'nulls_last'
            order_by.append(order)

    # Refresh interval
    refresh_interval_delta = None
    refresh_interval = params.get('refresh_interval', None)
    if refresh_interval is not None:
        value = int(refresh_interval['refresh_interval_value'])
        time_unit = refresh_interval['refresh_interval_time_unit'].upper()
        if value < 0:
            raise ValueError('refresh interval must be greater than 0')
        if time_unit == 'SECONDS':
            refresh_interval_delta = datetime.timedelta(seconds=int(value))
        elif time_unit == 'MINUTES':
            refresh_interval_delta = datetime.timedelta(minutes=int(value))
        elif time_unit == 'HOURS':
            refresh_interval_delta = datetime.timedelta(hours=int(value))
        elif time_unit == 'DAYS':
            refresh_interval_delta = datetime.timedelta(days=int(value))
        else:
            raise ValueError('invalid refresh interval time unit')

    out = ExportService(
        wsg,
        from_database,
        from_table,
        dict(**catalog_config, **catalog_creds),
        dict(**storage_config, **storage_creds),
        columns=None,
        partition_by=partition_by or None,
        order_by=order_by or None,
        properties=json.loads(params['properties']) if params['properties'] else None,
        incremental=params.get('incremental', False),
        refresh_interval=int(refresh_interval_delta.total_seconds())
        if refresh_interval_delta is not None else None,
    ).start()

    res = FusionSQLResult()
    res.add_field('ExportID', result.STRING)
    res.set_rows([(out.export_id,)])

    return res
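

# A hedged illustration of how ``_start_export`` above flattens one parsed
# ``PARTITION BY YEAR(created_at)`` key into the structure handed to
# ``ExportService``. The parse-tree shape is an assumption inferred from the
# subscripting in the loop, and ``created_at`` is a hypothetical column name:
#
#     key = {
#         'partition_key': {
#             'transform': {'col_transform': ['YEAR', {'transform_col': 'created_at'}]},
#         },
#     }
#     # ... becomes ...
#     {'transform': 'year', 'name': 'created_at'}
#
# ORDER BY keys are reduced the same way, with 'direction' and 'null_order'
# defaulting to 'ascending' and 'nulls_first'.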


class StartExport(SQLHandler):
    """
    START EXPORT
        from_table
        catalog
        storage
        [ partition_by ]
        [ order_by ]
        [ properties ]
    ;

    # From table
    from_table = FROM <table>

    # Transforms
    _col_transform = { VOID | IDENTITY | YEAR | MONTH | DAY | HOUR } ( _transform_col )
    _transform_col = <column>
    _arg_transform = { BUCKET | TRUNCATE } ( _transform_col <comma> _transform_arg )
    _transform_arg = <integer>
    transform = { _col_transform | _arg_transform }

    # Partitions
    partition_by = PARTITION BY partition_key,...
    partition_key = transform

    # Sort order
    order_by = ORDER BY sort_key,...
    sort_key = transform [ direction ] [ null_order ]
    direction = { ASC | DESC | ASCENDING | DESCENDING }
    null_order = { NULLS_FIRST | NULLS_LAST }

    # Properties
    properties = PROPERTIES '<json>'

    # Catalog
    catalog = CATALOG [ _catalog_config ] [ _catalog_creds ]
    _catalog_config = CONFIG '<catalog-config>'
    _catalog_creds = CREDENTIALS '<catalog-creds>'

    # Storage
    storage = LINK [ _link_config ] [ _link_creds ]
    _link_config = S3 CONFIG '<link-config>'
    _link_creds = CREDENTIALS '<link-creds>'

    Description
    -----------
    Start an export.

    Arguments
    ---------
    * ``<catalog-config>`` and ``<catalog-creds>``: The catalog configuration.
    * ``<link-config>`` and ``<link-creds>``: The storage link configuration.

    Remarks
    -------
    * ``FROM <table>`` specifies the SingleStore table to export. The same name will
      be used for the exported table.
    * ``CATALOG`` specifies the details of the catalog to connect to.
    * ``LINK`` specifies the details of the data storage to connect to.

    Examples
    --------
    The following statement starts an export operation with the given
    catalog and link configurations. The source table to export is
    named "customer_data"::

        START EXPORT FROM my_db.customer_data
            CATALOG CONFIG '{
                "catalog_type": "GLUE",
                "table_format": "ICEBERG",
                "catalog_id": "13983498723498",
                "catalog_region": "us-east-1"
            }'
            LINK S3 CONFIG '{
                "region": "us-east-1",
                "endpoint_url": "s3://bucket-name"
            }'
        ;

    """  # noqa

    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        return _start_export(params)


StartExport.register(overwrite=True)
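

# For orientation: a clause such as ``REFRESH INTERVAL 24 HOURS`` (used in the
# docstring example of ``StartIncrementalExport`` below) is converted by
# ``_start_export`` into a plain number of seconds before it reaches
# ``ExportService``:
#
#     int(datetime.timedelta(hours=24).total_seconds())  # == 86400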


class StartIncrementalExport(SQLHandler):
    """
    START INCREMENTAL EXPORT
        from_table
        catalog
        storage
        [ partition_by ]
        [ order_by ]
        [ properties ]
        [ refresh_interval ]
    ;

    # From table
    from_table = FROM <table>

    # Transforms
    _col_transform = { VOID | IDENTITY | YEAR | MONTH | DAY | HOUR } ( _transform_col )
    _transform_col = <column>
    _arg_transform = { BUCKET | TRUNCATE } ( _transform_col <comma> _transform_arg )
    _transform_arg = <integer>
    transform = { _col_transform | _arg_transform }

    # Partitions
    partition_by = PARTITION BY partition_key,...
    partition_key = transform

    # Sort order
    order_by = ORDER BY sort_key,...
    sort_key = transform [ direction ] [ null_order ]
    direction = { ASC | DESC | ASCENDING | DESCENDING }
    null_order = { NULLS_FIRST | NULLS_LAST }

    # Properties
    properties = PROPERTIES '<json>'

    # Catalog
    catalog = CATALOG [ _catalog_config ] [ _catalog_creds ]
    _catalog_config = CONFIG '<catalog-config>'
    _catalog_creds = CREDENTIALS '<catalog-creds>'

    # Storage
    storage = LINK [ _link_config ] [ _link_creds ]
    _link_config = S3 CONFIG '<link-config>'
    _link_creds = CREDENTIALS '<link-creds>'

    # Refresh interval
    refresh_interval = REFRESH INTERVAL _refresh_interval_value _refresh_interval_time_unit
    _refresh_interval_value = <integer>
    _refresh_interval_time_unit = { SECONDS | MINUTES | HOURS | DAYS }

    Description
    -----------
    Start an incremental export.

    Arguments
    ---------
    * ``<catalog-config>`` and ``<catalog-creds>``: The catalog configuration.
    * ``<link-config>`` and ``<link-creds>``: The storage link configuration.

    Remarks
    -------
    * ``FROM <table>`` specifies the SingleStore table to export. The same name will
      be used for the exported table.
    * ``CATALOG`` specifies the details of the catalog to connect to.
    * ``LINK`` specifies the details of the data storage to connect to.
    * ``REFRESH INTERVAL`` specifies the interval for refreshing the
      incremental export. The default is 1 day.

    Examples
    --------
    The following statement starts an export operation with the given
    catalog and link configurations. The source table to export is
    named "customer_data"::

        START INCREMENTAL EXPORT FROM my_db.customer_data
            CATALOG CONFIG '{
                "catalog_type": "GLUE",
                "table_format": "ICEBERG",
                "catalog_id": "13983498723498",
                "catalog_region": "us-east-1"
            }'
            LINK S3 CONFIG '{
                "region": "us-east-1",
                "endpoint_url": "s3://bucket-name"
            }'
            REFRESH INTERVAL 24 HOURS
        ;

    """  # noqa

    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        params['incremental'] = True
        return _start_export(params)


StartIncrementalExport.register(overwrite=True)


def _format_status(export_id: str, status: ExportStatus) -> Optional[FusionSQLResult]:
    """Return the status of an export operation."""
    info = status._info()

    res = FusionSQLResult()
    res.add_field('ExportID', result.STRING)
    res.add_field('Status', result.STRING)
    res.add_field('Message', result.STRING)
    res.set_rows([
        (
            export_id,
            info.get('status', 'Unknown'),
            info.get('statusMsg', ''),
        ),
    ])

    return res


class ShowExport(SQLHandler):
    """
    SHOW EXPORT export_id;

    # ID of export
    export_id = '<export-id>'

    """

    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        wsg = get_workspace_group({})
        return _format_status(
            params['export_id'], ExportStatus(params['export_id'], wsg),
        )


ShowExport.register(overwrite=True)


class ShowExports(SQLHandler):
    """
    SHOW EXPORTS [ scope ];

    # Location of the export
    scope = FOR '<scope>'

    """

    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        wsg = get_workspace_group({})

        exports = _get_exports(wsg, params.get('scope', 'all'))

        res = FusionSQLResult()
        res.add_field('ExportID', result.STRING)
        res.add_field('Status', result.STRING)
        res.add_field('Message', result.STRING)
        res.set_rows([
            (
                info['egressID'],
                info.get('status', 'Unknown'),
                info.get('statusMsg', ''),
            )
            for info in [x._info() for x in exports]
        ])

        return res


ShowExports.register(overwrite=True)


class SuspendExport(SQLHandler):
    """
    SUSPEND EXPORT export_id;

    # ID of export
    export_id = '<export-id>'

    """

    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        wsg = get_workspace_group({})
        service = ExportService.from_export_id(wsg, params['export_id'])
        return _format_status(params['export_id'], service.suspend())


SuspendExport.register(overwrite=True)


class ResumeExport(SQLHandler):
    """
    RESUME EXPORT export_id;

    # ID of export
    export_id = '<export-id>'

    """

    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        wsg = get_workspace_group({})
        service = ExportService.from_export_id(wsg, params['export_id'])
        return _format_status(params['export_id'], service.resume())


ResumeExport.register(overwrite=True)


class DropExport(SQLHandler):
    """
    DROP EXPORT export_id;

    # ID of export
    export_id = '<export-id>'

    """

    _enabled = False

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        wsg = get_workspace_group({})
        service = ExportService.from_export_id(wsg, params['export_id'])
        service.drop()
        return None


DropExport.register(overwrite=True)
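

# A hedged usage sketch, not part of this module: with these Fusion handlers
# enabled (every handler above ships with ``_enabled = False``), exports could
# be driven through ordinary SQL statements on a client connection, e.g.:
#
#     import singlestoredb as s2
#
#     with s2.connect() as conn:          # connection parameters assumed
#         with conn.cursor() as cur:
#             cur.execute('SHOW EXPORTS')
#             for row in cur.fetchall():
#                 print(row)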