Path: blob/main/singlestoredb/management/job.py
469 views
#!/usr/bin/env python
"""SingleStoreDB Cloud Scheduled Notebook Job."""
import datetime
import time
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Type
from typing import Union

from ..exceptions import ManagementError
from .manager import Manager
from .utils import camel_to_snake
from .utils import from_datetime
from .utils import get_cluster_id
from .utils import get_database_name
from .utils import get_virtual_workspace_id
from .utils import get_workspace_id
from .utils import to_datetime
from .utils import to_datetime_strict
from .utils import vars_to_str


# Maps the Python type of a job parameter value to the type name sent to
# the API. Lookup is by exact ``type(value)``, so subclasses (other than the
# listed ones) are not accepted.
type_to_parameter_conversion_map = {
    str: 'string',
    int: 'integer',
    float: 'float',
    bool: 'boolean',
}

# Seconds between status polls while waiting for a job to complete.
_JOB_POLL_INTERVAL_SECONDS = 5


class Mode(Enum):
    """Scheduling mode of a job: run once, or run on a recurring interval."""

    ONCE = 'Once'
    RECURRING = 'Recurring'

    @classmethod
    def from_str(cls, s: str) -> 'Mode':
        """
        Look up a Mode from its API string (e.g. ``'Once'``).

        The string is normalized with ``camel_to_snake`` and upper-cased
        before being matched against the member names.

        Raises
        ------
        ValueError
            If no member matches

        """
        try:
            return cls[str(camel_to_snake(s)).upper()]
        except KeyError:
            raise ValueError(f'Unknown Mode: {s}')

    def __str__(self) -> str:
        """Return string representation."""
        return self.value

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class TargetType(Enum):
    """Kind of compute target a job's notebook executes against."""

    WORKSPACE = 'Workspace'
    CLUSTER = 'Cluster'
    VIRTUAL_WORKSPACE = 'VirtualWorkspace'

    @classmethod
    def from_str(cls, s: str) -> 'TargetType':
        """
        Look up a TargetType from its API string (e.g. ``'VirtualWorkspace'``).

        Raises
        ------
        ValueError
            If no member matches

        """
        try:
            return cls[str(camel_to_snake(s)).upper()]
        except KeyError:
            raise ValueError(f'Unknown TargetType: {s}')

    def __str__(self) -> str:
        """Return string representation."""
        return self.value

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class Status(Enum):
    """Status of a job or of one of its executions."""

    UNKNOWN = 'Unknown'
    SCHEDULED = 'Scheduled'
    RUNNING = 'Running'
    COMPLETED = 'Completed'
    FAILED = 'Failed'
    ERROR = 'Error'
    CANCELED = 'Canceled'

    @classmethod
    def from_str(cls, s: str) -> 'Status':
        """
        Look up a Status from its API string.

        Unlike the other enums in this module, an unrecognized string does
        not raise; it maps to :attr:`Status.UNKNOWN`.

        """
        try:
            return cls[str(camel_to_snake(s)).upper()]
        except KeyError:
            return cls.UNKNOWN

    def __str__(self) -> str:
        """Return string representation."""
        return self.value

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class Parameter(object):
    """A named job parameter; the value is carried as a string plus a type name."""

    name: str
    value: str
    type: str

    def __init__(
        self,
        name: str,
        value: str,
        type: str,
    ):
        self.name = name
        self.value = value
        self.type = type

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'Parameter':
        """
        Construct a Parameter from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values; must contain ``name``, ``value`` and ``type``

        Returns
        -------
        :class:`Parameter`

        """
        out = cls(
            name=obj['name'],
            value=obj['value'],
            type=obj['type'],
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class Runtime(object):
    """A job runtime as listed by :meth:`JobsManager.runtimes`."""

    name: str
    description: str

    def __init__(
        self,
        name: str,
        description: str,
    ):
        self.name = name
        self.description = description

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'Runtime':
        """
        Construct a Runtime from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values; must contain ``name`` and ``description``

        Returns
        -------
        :class:`Runtime`

        """
        out = cls(
            name=obj['name'],
            description=obj['description'],
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class JobMetadata(object):
    """
    Execution statistics attached to a job.

    A job carries a list of these; presumably one entry per status
    (NOTE(review): confirm against the API schema).

    """

    avg_duration_in_seconds: Optional[float]
    count: int
    max_duration_in_seconds: Optional[float]
    status: Status

    def __init__(
        self,
        avg_duration_in_seconds: Optional[float],
        count: int,
        max_duration_in_seconds: Optional[float],
        status: Status,
    ):
        self.avg_duration_in_seconds = avg_duration_in_seconds
        self.count = count
        self.max_duration_in_seconds = max_duration_in_seconds
        self.status = status

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'JobMetadata':
        """
        Construct a JobMetadata from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values; ``count`` and ``status`` are required,
            the duration fields are optional

        Returns
        -------
        :class:`JobMetadata`

        """
        out = cls(
            avg_duration_in_seconds=obj.get('avgDurationInSeconds'),
            count=obj['count'],
            max_duration_in_seconds=obj.get('maxDurationInSeconds'),
            status=Status.from_str(obj['status']),
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class ExecutionMetadata(object):
    """Range of execution numbers covered by an :class:`ExecutionsData` result."""

    start_execution_number: int
    end_execution_number: int

    def __init__(
        self,
        start_execution_number: int,
        end_execution_number: int,
    ):
        self.start_execution_number = start_execution_number
        self.end_execution_number = end_execution_number

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'ExecutionMetadata':
        """
        Construct an ExecutionMetadata from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values; must contain ``startExecutionNumber``
            and ``endExecutionNumber``

        Returns
        -------
        :class:`ExecutionMetadata`

        """
        out = cls(
            start_execution_number=obj['startExecutionNumber'],
            end_execution_number=obj['endExecutionNumber'],
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class Execution(object):
    """A single execution (run) of a scheduled notebook job."""

    execution_id: str
    job_id: str
    status: Status
    snapshot_notebook_path: Optional[str]
    scheduled_start_time: datetime.datetime
    started_at: Optional[datetime.datetime]
    finished_at: Optional[datetime.datetime]
    execution_number: int

    def __init__(
        self,
        execution_id: str,
        job_id: str,
        status: Status,
        scheduled_start_time: datetime.datetime,
        started_at: Optional[datetime.datetime],
        finished_at: Optional[datetime.datetime],
        execution_number: int,
        snapshot_notebook_path: Optional[str],
    ):
        self.execution_id = execution_id
        self.job_id = job_id
        self.status = status
        self.scheduled_start_time = scheduled_start_time
        self.started_at = started_at
        self.finished_at = finished_at
        self.execution_number = execution_number
        self.snapshot_notebook_path = snapshot_notebook_path

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'Execution':
        """
        Construct an Execution from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values; ``startedAt``, ``finishedAt`` and
            ``snapshotNotebookPath`` are optional, the rest are required

        Returns
        -------
        :class:`Execution`

        """
        out = cls(
            execution_id=obj['executionID'],
            job_id=obj['jobID'],
            status=Status.from_str(obj['status']),
            snapshot_notebook_path=obj.get('snapshotNotebookPath'),
            scheduled_start_time=to_datetime_strict(obj['scheduledStartTime']),
            started_at=to_datetime(obj.get('startedAt')),
            finished_at=to_datetime(obj.get('finishedAt')),
            execution_number=obj['executionNumber'],
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class ExecutionsData(object):
    """A page of job executions together with the range it covers."""

    executions: List[Execution]
    metadata: ExecutionMetadata

    def __init__(
        self,
        executions: List[Execution],
        metadata: ExecutionMetadata,
    ):
        self.executions = executions
        self.metadata = metadata

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'ExecutionsData':
        """
        Construct an ExecutionsData from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values; must contain ``executions`` (a list of
            execution dicts) and ``executionsMetadata``

        Returns
        -------
        :class:`ExecutionsData`

        """
        out = cls(
            executions=[Execution.from_dict(x) for x in obj['executions']],
            metadata=ExecutionMetadata.from_dict(obj['executionsMetadata']),
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class ExecutionConfig(object):
    """How a job executes its notebook (path, snapshotting, duration limit)."""

    create_snapshot: bool
    max_duration_in_mins: int
    notebook_path: str

    def __init__(
        self,
        create_snapshot: bool,
        max_duration_in_mins: int,
        notebook_path: str,
    ):
        self.create_snapshot = create_snapshot
        self.max_duration_in_mins = max_duration_in_mins
        self.notebook_path = notebook_path

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'ExecutionConfig':
        """
        Construct an ExecutionConfig from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values; must contain ``createSnapshot``,
            ``maxAllowedExecutionDurationInMinutes`` and ``notebookPath``

        Returns
        -------
        :class:`ExecutionConfig`

        """
        out = cls(
            create_snapshot=obj['createSnapshot'],
            max_duration_in_mins=obj['maxAllowedExecutionDurationInMinutes'],
            notebook_path=obj['notebookPath'],
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class Schedule(object):
    """When a job runs: its mode, optional start time and repeat interval."""

    execution_interval_in_minutes: Optional[int]
    mode: Mode
    start_at: Optional[datetime.datetime]

    def __init__(
        self,
        execution_interval_in_minutes: Optional[int],
        mode: Mode,
        start_at: Optional[datetime.datetime],
    ):
        self.execution_interval_in_minutes = execution_interval_in_minutes
        self.mode = mode
        self.start_at = start_at

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'Schedule':
        """
        Construct a Schedule from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values; ``mode`` is required, ``startAt`` and
            ``executionIntervalInMinutes`` are optional

        Returns
        -------
        :class:`Schedule`

        """
        out = cls(
            execution_interval_in_minutes=obj.get('executionIntervalInMinutes'),
            mode=Mode.from_str(obj['mode']),
            start_at=to_datetime(obj.get('startAt')),
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class TargetConfig(object):
    """Compute target (workspace / cluster / virtual workspace) for a job."""

    database_name: Optional[str]
    resume_target: bool
    target_id: str
    target_type: TargetType

    def __init__(
        self,
        database_name: Optional[str],
        resume_target: bool,
        target_id: str,
        target_type: TargetType,
    ):
        self.database_name = database_name
        self.resume_target = resume_target
        self.target_id = target_id
        self.target_type = target_type

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'TargetConfig':
        """
        Construct a TargetConfig from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values; ``databaseName`` is optional,
            ``resumeTarget``, ``targetID`` and ``targetType`` are required

        Returns
        -------
        :class:`TargetConfig`

        """
        out = cls(
            database_name=obj.get('databaseName'),
            resume_target=obj['resumeTarget'],
            target_id=obj['targetID'],
            target_type=TargetType.from_str(obj['targetType']),
        )

        return out

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class Job(object):
    """
    Scheduled Notebook Job definition.

    This object is not directly instantiated. It is used in results
    of API calls on the :class:`JobsManager`. See :meth:`JobsManager.run`.
    """

    completed_executions_count: int
    created_at: datetime.datetime
    description: Optional[str]
    enqueued_by: str
    execution_config: ExecutionConfig
    job_id: str
    job_metadata: List[JobMetadata]
    name: Optional[str]
    schedule: Schedule
    target_config: Optional[TargetConfig]
    terminated_at: Optional[datetime.datetime]

    def __init__(
        self,
        completed_executions_count: int,
        created_at: datetime.datetime,
        description: Optional[str],
        enqueued_by: str,
        execution_config: ExecutionConfig,
        job_id: str,
        job_metadata: List[JobMetadata],
        name: Optional[str],
        schedule: Schedule,
        target_config: Optional[TargetConfig],
        terminated_at: Optional[datetime.datetime],
    ):
        self.completed_executions_count = completed_executions_count
        self.created_at = created_at
        self.description = description
        self.enqueued_by = enqueued_by
        self.execution_config = execution_config
        self.job_id = job_id
        self.job_metadata = job_metadata
        self.name = name
        self.schedule = schedule
        self.target_config = target_config
        self.terminated_at = terminated_at
        # Set by from_dict; the convenience methods below delegate to it.
        self._manager: Optional[JobsManager] = None

    @classmethod
    def from_dict(cls, obj: Dict[str, Any], manager: 'JobsManager') -> 'Job':
        """
        Construct a Job from a dictionary of values.

        Parameters
        ----------
        obj : dict
            Dictionary of values as returned by the ``jobs`` API
        manager : JobsManager
            Manager the job's convenience methods (``wait``, ``delete``, ...)
            delegate to

        Returns
        -------
        :class:`Job`

        """
        target_config = obj.get('targetConfig')
        if target_config is not None:
            target_config = TargetConfig.from_dict(target_config)

        out = cls(
            completed_executions_count=obj['completedExecutionsCount'],
            created_at=to_datetime_strict(obj['createdAt']),
            description=obj.get('description'),
            enqueued_by=obj['enqueuedBy'],
            execution_config=ExecutionConfig.from_dict(obj['executionConfig']),
            job_id=obj['jobID'],
            job_metadata=[JobMetadata.from_dict(x) for x in obj['jobMetadata']],
            name=obj.get('name'),
            schedule=Schedule.from_dict(obj['schedule']),
            target_config=target_config,
            terminated_at=to_datetime(obj.get('terminatedAt')),
        )
        out._manager = manager
        return out

    def wait(self, timeout: Optional[float] = None) -> bool:
        """
        Wait for the job to complete.

        Parameters
        ----------
        timeout : float, optional
            Maximum number of seconds to wait; ``None`` waits indefinitely

        Returns
        -------
        bool
            ``True`` if the job completed, ``False`` if the timeout elapsed

        Raises
        ------
        ManagementError
            If the job was not created through a :class:`JobsManager`
        ValueError
            If the job is recurring

        """
        if self._manager is None:
            raise ManagementError(msg='Job not initialized with JobsManager')
        return self._manager._wait_for_job(self, timeout)

    def get_executions(
        self,
        start_execution_number: int,
        end_execution_number: int,
    ) -> ExecutionsData:
        """
        Get executions for the job.

        Parameters
        ----------
        start_execution_number : int
            First execution number to fetch
        end_execution_number : int
            Last execution number to fetch

        Returns
        -------
        :class:`ExecutionsData`

        """
        if self._manager is None:
            raise ManagementError(msg='Job not initialized with JobsManager')
        return self._manager.get_executions(
            self.job_id,
            start_execution_number,
            end_execution_number,
        )

    def get_parameters(self) -> List[Parameter]:
        """Get parameters for the job."""
        if self._manager is None:
            raise ManagementError(msg='Job not initialized with JobsManager')
        return self._manager.get_parameters(self.job_id)

    def delete(self) -> bool:
        """Delete the job."""
        if self._manager is None:
            raise ManagementError(msg='Job not initialized with JobsManager')
        return self._manager.delete(self.job_id)

    def __str__(self) -> str:
        """Return string representation."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return string representation."""
        return str(self)


class JobsManager(object):
    """
    SingleStoreDB scheduled notebook jobs manager.

    This class should be instantiated using :attr:`Organization.jobs`.

    Parameters
    ----------
    manager : Manager, optional
        The Manager used to issue the underlying API requests

    See Also
    --------
    :attr:`Organization.jobs`
    """

    def __init__(self, manager: Optional[Manager]):
        self._manager = manager

    def schedule(
        self,
        notebook_path: str,
        mode: Mode,
        create_snapshot: bool,
        name: Optional[str] = None,
        description: Optional[str] = None,
        execution_interval_in_minutes: Optional[int] = None,
        start_at: Optional[datetime.datetime] = None,
        runtime_name: Optional[str] = None,
        resume_target: Optional[bool] = None,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> Job:
        """
        Creates and returns a scheduled notebook job.

        A ``targetConfig`` is only sent when ``get_database_name()`` returns
        a value. In that case its target is taken from the virtual
        workspace, workspace or cluster ID, in that order of preference.

        Parameters
        ----------
        notebook_path : str
            Path of the notebook to execute
        mode : Mode
            Run once or on a recurring schedule
        create_snapshot : bool
            Whether to create a snapshot of the notebook for each execution
        name : str, optional
            Name of the job
        description : str, optional
            Description of the job
        execution_interval_in_minutes : int, optional
            Interval between executions of a recurring job
        start_at : datetime.datetime, optional
            When the schedule starts
        runtime_name : str, optional
            Name of the runtime to execute with (see :meth:`runtimes`)
        resume_target : bool, optional
            Sent as ``resumeTarget``; only used when a target config is sent
        parameters : dict, optional
            Job parameters; values must be exactly of type ``str``, ``int``,
            ``float`` or ``bool`` (any other type raises ``KeyError``)

        Returns
        -------
        :class:`Job`

        """
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        schedule = dict(
            mode=mode.value,
        )  # type: Dict[str, Any]

        if start_at is not None:
            schedule['startAt'] = from_datetime(start_at)

        if execution_interval_in_minutes is not None:
            schedule['executionIntervalInMinutes'] = execution_interval_in_minutes

        execution_config = dict(
            createSnapshot=create_snapshot,
            notebookPath=notebook_path,
        )  # type: Dict[str, Any]

        if runtime_name is not None:
            execution_config['runtimeName'] = runtime_name

        target_config = None  # type: Optional[Dict[str, Any]]
        database_name = get_database_name()
        if database_name is not None:
            target_config = dict(
                databaseName=database_name,
            )

            if resume_target is not None:
                target_config['resumeTarget'] = resume_target

            workspace_id = get_workspace_id()
            virtual_workspace_id = get_virtual_workspace_id()
            cluster_id = get_cluster_id()
            if virtual_workspace_id is not None:
                target_config['targetID'] = virtual_workspace_id
                target_config['targetType'] = TargetType.VIRTUAL_WORKSPACE.value

            elif workspace_id is not None:
                target_config['targetID'] = workspace_id
                target_config['targetType'] = TargetType.WORKSPACE.value

            elif cluster_id is not None:
                target_config['targetID'] = cluster_id
                target_config['targetType'] = TargetType.CLUSTER.value

        job_run_json = dict(
            schedule=schedule,
            executionConfig=execution_config,
        )  # type: Dict[str, Any]

        if target_config is not None:
            job_run_json['targetConfig'] = target_config

        if name is not None:
            job_run_json['name'] = name

        if description is not None:
            job_run_json['description'] = description

        if parameters is not None:
            # Values are sent as strings alongside their API type name.
            job_run_json['parameters'] = [
                dict(
                    name=key,
                    value=str(value),
                    type=type_to_parameter_conversion_map[type(value)],
                ) for key, value in parameters.items()
            ]

        res = self._manager._post('jobs', json=job_run_json).json()
        return Job.from_dict(res, self)

    def run(
        self,
        notebook_path: str,
        runtime_name: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> Job:
        """Creates and returns a scheduled notebook job that runs once immediately."""
        return self.schedule(
            notebook_path,
            Mode.ONCE,
            False,
            start_at=datetime.datetime.now(),
            runtime_name=runtime_name,
            parameters=parameters,
        )

    def wait(
        self,
        jobs: List[Union[str, Job]],
        timeout: Optional[float] = None,
    ) -> bool:
        """
        Wait for jobs to finish executing.

        The jobs are waited on one after another, and all of them share a
        single overall timeout.

        Parameters
        ----------
        jobs : list of str or Job
            Job IDs or Job objects to wait for
        timeout : float, optional
            Maximum total number of seconds to wait; ``None`` waits
            indefinitely. A non-positive value returns ``False`` immediately.

        Returns
        -------
        bool
            ``True`` if every job completed, ``False`` on timeout

        """
        deadline = None  # type: Optional[float]
        if timeout is not None:
            if timeout <= 0:
                return False
            # Monotonic clock: immune to wall-clock adjustments.
            deadline = time.monotonic() + timeout

        for job in jobs:
            job_timeout = None  # type: Optional[float]
            if deadline is not None:
                # Pass the exact remainder. Truncating to int would turn a
                # sub-second remainder into 0 and skip polling entirely.
                job_timeout = deadline - time.monotonic()

            if not self._wait_for_job(job, job_timeout):
                return False

        return True

    def _wait_for_job(
        self,
        job: Union[str, Job],
        timeout: Optional[float] = None,
    ) -> bool:
        """
        Poll a run-once job until it has completed at least one execution.

        Parameters
        ----------
        job : str or Job
            Job ID or Job object
        timeout : float, optional
            Maximum number of seconds to wait; ``None`` waits indefinitely

        Returns
        -------
        bool
            ``True`` once the job reports a completed execution, ``False``
            if the timeout elapses first

        Raises
        ------
        ManagementError
            If the manager is not initialized
        ValueError
            If the job is recurring (it never finishes)

        """
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        deadline = None  # type: Optional[float]
        if timeout is not None:
            if timeout <= 0:
                return False
            deadline = time.monotonic() + timeout

        job_id = job if isinstance(job, str) else job.job_id

        while True:
            if deadline is not None and time.monotonic() >= deadline:
                return False

            res = self._manager._get(f'jobs/{job_id}').json()
            current = Job.from_dict(res, self)
            if (
                current.schedule.mode == Mode.ONCE
                and current.completed_executions_count > 0
            ):
                return True
            if current.schedule.mode == Mode.RECURRING:
                raise ValueError(f'Cannot wait for recurring job {job_id}')

            delay = float(_JOB_POLL_INTERVAL_SECONDS)
            if deadline is not None:
                # Never sleep past the deadline; the loop head then reports
                # the timeout instead of overshooting by up to one interval.
                delay = min(delay, max(deadline - time.monotonic(), 0.0))
            time.sleep(delay)

    def get(self, job_id: str) -> Job:
        """Get a job by its ID."""
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        res = self._manager._get(f'jobs/{job_id}').json()
        return Job.from_dict(res, self)

    def get_executions(
        self,
        job_id: str,
        start_execution_number: int,
        end_execution_number: int,
    ) -> ExecutionsData:
        """
        Get executions for a job by its ID.

        Parameters
        ----------
        job_id : str
            ID of the job
        start_execution_number : int
            Sent as the ``start`` query parameter
        end_execution_number : int
            Sent as the ``end`` query parameter

        Returns
        -------
        :class:`ExecutionsData`

        """
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')
        path = (
            f'jobs/{job_id}/executions'
            f'?start={start_execution_number}'
            f'&end={end_execution_number}'
        )
        res = self._manager._get(path).json()
        return ExecutionsData.from_dict(res)

    def get_parameters(self, job_id: str) -> List[Parameter]:
        """Get parameters for a job by its ID."""
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        res = self._manager._get(f'jobs/{job_id}/parameters').json()
        return [Parameter.from_dict(p) for p in res]

    def delete(self, job_id: str) -> bool:
        """Delete a job by its ID; returns the decoded JSON response body."""
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        return self._manager._delete(f'jobs/{job_id}').json()

    def modes(self) -> Type[Mode]:
        """Get all possible job scheduling modes."""
        return Mode

    def runtimes(self) -> List[Runtime]:
        """Get all available job runtimes."""
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        res = self._manager._get('jobs/runtimes').json()
        return [Runtime.from_dict(r) for r in res]