Path: blob/main/singlestoredb/fusion/handlers/job.py
469 views
#!/usr/bin/env python3
"""Fusion SQL handlers for scheduling, running, inspecting and dropping notebook jobs."""
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from .. import result
from ...management.utils import to_datetime
from ..handler import SQLHandler
from ..result import FusionSQLResult
from .utils import dt_isoformat
from .utils import get_workspace_manager
from singlestoredb.management.job import Mode

# NOTE(review): the class docstrings below look like they double as the command
# grammar and the user-facing help text consumed by ``SQLHandler`` -- keep the
# grammar sections (everything above "Description") word-for-word; confirm
# against ``..handler`` before editing them.

# Multipliers used to normalise ``EXECUTE EVERY <n> <unit>`` to minutes.
_MINUTES_PER_UNIT: Dict[str, int] = {
    'MINUTES': 1,
    'HOURS': 60,
    'DAYS': 60 * 24,
}

# Multipliers used to normalise ``WITH TIMEOUT <n> <unit>`` to seconds.
_SECONDS_PER_UNIT: Dict[str, int] = {
    'SECONDS': 1,
    'MINUTES': 60,
    'HOURS': 60 * 60,
}


def _scale_duration(amount: Any, time_unit: str, factors: Dict[str, int]) -> Any:
    """
    Convert ``amount`` expressed in ``time_unit`` to the base unit of ``factors``.

    Parameters
    ----------
    amount : Any
        The numeric quantity taken from the parsed statement
    time_unit : str
        The unit keyword from the parsed statement; matched case-insensitively
    factors : Dict[str, int]
        Mapping of upper-case unit keyword to its multiplier

    Returns
    -------
    Any
        ``amount`` multiplied by the factor registered for ``time_unit``

    Raises
    ------
    ValueError
        If ``time_unit`` is not a key of ``factors``

    """
    unit = time_unit.upper()
    factor = factors.get(unit)
    if factor is None:
        raise ValueError(f'Invalid time unit: {unit}')
    return amount * factor


def _job_parameters(params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return a copy of the ``with_parameters`` mapping, or None if absent/empty."""
    supplied = params.get('with_parameters')
    return dict(supplied) if supplied else None


class ScheduleJobHandler(SQLHandler):
    """
    SCHEDULE JOB USING NOTEBOOK notebook_path
        with_mode
        [ create_snapshot ]
        [ with_runtime ]
        [ with_name ]
        [ with_description ]
        [ execute_every ]
        [ start_at ]
        [ resume_target ]
        [ with_parameters ]
    ;

    # Path to notebook file
    notebook_path = '<notebook-path>'

    # Mode to use (either Once or Recurring)
    with_mode = WITH MODE '<mode>'

    # Create snapshot
    create_snapshot = CREATE SNAPSHOT

    # Runtime to use
    with_runtime = WITH RUNTIME '<runtime-name>'

    # Name of the job
    with_name = WITH NAME '<job-name>'

    # Description of the job
    with_description = WITH DESCRIPTION '<job-description>'

    # Execution interval
    execute_every = EXECUTE EVERY interval time_unit
    interval = <integer>
    time_unit = { MINUTES | HOURS | DAYS }

    # Start time
    start_at = START AT '<year>-<month>-<day> <hour>:<min>:<sec>'

    # Resume target if suspended
    resume_target = RESUME TARGET

    # Parameters to pass to the job
    with_parameters = WITH PARAMETERS <json>

    Description
    -----------
    Creates a scheduled notebook job.

    Arguments
    ---------
    * ``<notebook-path>``: The path in the Stage where the notebook file is
      stored.
    * ``<mode>``: The mode of the job. Either **Once** or **Recurring**.
    * ``<runtime-name>``: The name of the runtime the job will be run with.
    * ``<job-name>``: The name of the job.
    * ``<job-description>``: The description of the job.
    * ``<integer>``: The interval at which the job will be executed.
    * ``<year>-<month>-<day> <hour>:<min>:<sec>``: The start date and time of the
      job in UTC. The format is **yyyy-MM-dd HH:mm:ss**. The hour is in 24-hour format.
    * ``<json>``: The parameters to pass to the job. A JSON object with
      the following format: ``{"<paramName>": "<paramValue>", ...}``.

    Remarks
    -------
    * The ``WITH MODE`` clause specifies the mode of the job and is either
      **Once** or **Recurring**.
    * The ``EXECUTE EVERY`` clause specifies the interval at which the job will be
      executed. The interval can be in minutes, hours, or days. It is mandatory to
      specify the interval if the mode is **Recurring**.
    * The ``CREATE SNAPSHOT`` clause creates a snapshot of the notebook executed by
      the job.
    * The ``WITH RUNTIME`` clause specifies the name of the runtime that
      the job will be run with.
    * The ``RESUME TARGET`` clause resumes the job's target if it is suspended.
    * The ``WITH PARAMETERS`` clause specifies the parameters to pass to the job. The
      only supported parameter value types are strings, integers, floats, and booleans.

    Example
    -------
    The following command creates a job that will run the content of notebook
    **example_notebook.ipynb** every 5 minutes starting at **2024-06-25 21:35:06**
    using the runtime **notebooks-cpu-small**. The job's target will be resumed if it
    is suspended, a snapshot of the notebook will be created and the job is named
    **example_job** with the description **This is an example job**. The job will
    have the following parameters: **strParam** with value **"string"**, **intParam**
    with value **1**, **floatParam** with value **1.0**, and **boolParam** with value
    **true**::

        SCHEDULE JOB USING NOTEBOOK 'example_notebook.ipynb'
            WITH MODE 'Recurring'
            CREATE SNAPSHOT
            WITH RUNTIME 'notebooks-cpu-small'
            WITH NAME 'example_job'
            WITH DESCRIPTION 'This is an example job'
            EXECUTE EVERY 5 MINUTES
            START AT '2024-06-25 21:35:06'
            RESUME TARGET
            WITH PARAMETERS {
                "strParam": "string",
                "intParam": 1,
                "floatParam": 1.0,
                "boolParam": true
            }
        ;
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Schedule a notebook job from the parsed statement.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by the rule names used in the grammar

        Returns
        -------
        FusionSQLResult
            One row with a single ``JobID`` column holding the new job's ID

        Raises
        ------
        ValueError
            If the ``EXECUTE EVERY`` time unit is not MINUTES, HOURS or DAYS

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # The schedule call takes the interval in minutes, so normalise the
        # user-supplied unit here; leave it as None when the clause is absent.
        execution_interval_in_mins = None
        execute_every = params.get('execute_every')
        if execute_every:
            execution_interval_in_mins = _scale_duration(
                execute_every['interval'],
                execute_every['time_unit'],
                _MINUTES_PER_UNIT,
            )

        job = jobs_manager.schedule(
            notebook_path=params['notebook_path'],
            mode=Mode.from_str(params['with_mode']),
            runtime_name=params['with_runtime'],
            create_snapshot=params['create_snapshot'],
            name=params['with_name'],
            description=params['with_description'],
            execution_interval_in_minutes=execution_interval_in_mins,
            start_at=to_datetime(params.get('start_at')),
            resume_target=params['resume_target'],
            parameters=_job_parameters(params),
        )
        res.set_rows([(job.job_id,)])

        return res


ScheduleJobHandler.register(overwrite=True)


class RunJobHandler(SQLHandler):
    """
    RUN JOB USING NOTEBOOK notebook_path
        [ with_runtime ]
        [ with_parameters ]
    ;

    # Path to notebook file
    notebook_path = '<notebook-path>'

    # Runtime to use
    with_runtime = WITH RUNTIME '<runtime-name>'

    # Parameters to pass to the job
    with_parameters = WITH PARAMETERS <json>

    Description
    -----------
    Creates a scheduled notebook job that runs once immediately on the specified runtime.

    Arguments
    ---------
    * ``<notebook-path>``: The path in the Stage where the notebook file is stored.
    * ``<runtime-name>``: The name of the runtime the job will be run with.
    * ``<json>``: The parameters to pass to the job. A JSON object with
      the following format: ``{"<paramName>": "<paramValue>", ...}``.

    Remarks
    -------
    * The job is run immediately after the command is executed.
    * The ``WITH RUNTIME`` clause specifies the name of the runtime that
      the job will be run with.
    * The ``WITH PARAMETERS`` clause specifies the parameters to pass to the job. The
      only supported parameter value types are strings, integers, floats, and booleans.

    Example
    -------
    The following command creates a job that will run the content of notebook
    **example_notebook.ipynb** using the runtime **notebooks-cpu-small** immediately.
    The job will have the following parameters: **strParam** with value **"string"**,
    **intParam** with value **1**, **floatParam** with value **1.0**, and **boolParam**
    with value **true**::

        RUN JOB USING NOTEBOOK 'example_notebook.ipynb'
            WITH RUNTIME 'notebooks-cpu-small'
            WITH PARAMETERS {
                "strParam": "string",
                "intParam": 1,
                "floatParam": 1.0,
                "boolParam": true
            }
        ;

    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Run a notebook job from the parsed statement.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by the rule names used in the grammar

        Returns
        -------
        FusionSQLResult
            One row with a single ``JobID`` column holding the job's ID

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        job = jobs_manager.run(
            params['notebook_path'],
            runtime_name=params['with_runtime'],
            parameters=_job_parameters(params),
        )
        res.set_rows([(job.job_id,)])

        return res


RunJobHandler.register(overwrite=True)


class WaitOnJobsHandler(SQLHandler):
    """
    WAIT ON JOBS job_ids
        [ with_timeout ]
    ;

    # Job IDs to wait on
    job_ids = '<job-id>',...

    # Timeout
    with_timeout = WITH TIMEOUT time time_unit
    time = <integer>
    time_unit = { SECONDS | MINUTES | HOURS }

    Description
    -----------
    Waits for the jobs with the specified IDs to complete.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to wait on.
    * ``<integer>``: The amount of time to wait for the jobs to complete,
      expressed in the specified time unit.

    Remarks
    -------
    * The ``WITH TIMEOUT`` clause specifies the time to wait for the jobs to complete.
      The time can be in seconds, minutes, or hours.

    Example
    -------
    The following command waits for the jobs with IDs **job1** and **job2** to complete
    with a timeout of 60 seconds::

        WAIT ON JOBS 'job1', 'job2' WITH TIMEOUT 60 SECONDS;

    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Wait on the listed jobs, optionally bounded by a timeout.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by the rule names used in the grammar

        Returns
        -------
        FusionSQLResult
            One row with a single ``Success`` column holding the value
            returned by the jobs manager's ``wait`` call

        Raises
        ------
        ValueError
            If the ``WITH TIMEOUT`` time unit is not SECONDS, MINUTES or HOURS

        """
        res = FusionSQLResult()
        res.add_field('Success', result.BOOL)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # ``wait`` receives the timeout in seconds; None is passed through
        # when no WITH TIMEOUT clause was given.
        timeout_in_secs = None
        with_timeout = params.get('with_timeout')
        if with_timeout:
            timeout_in_secs = _scale_duration(
                with_timeout['time'],
                with_timeout['time_unit'],
                _SECONDS_PER_UNIT,
            )

        success = jobs_manager.wait(
            params['job_ids'],
            timeout=timeout_in_secs,
        )
        res.set_rows([(success,)])

        return res


WaitOnJobsHandler.register(overwrite=True)


class ShowJobsHandler(SQLHandler):
    """
    SHOW JOBS job_ids
        [ <extended> ]
        [ <like> ]
    ;

    # Job IDs to show
    job_ids = '<job-id>',...

    Description
    -----------
    Shows the jobs with the specified IDs.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to show.
    * ``<pattern>``: A pattern similar to SQL LIKE clause.
      Uses ``%`` as the wildcard character.

    Remarks
    -------
    * Use the ``LIKE`` clause to specify a pattern and return only
      the jobs that match the specified pattern.
    * To return more information about the jobs, use the
      ``EXTENDED`` clause.

    Example
    -------
    The following command shows extended information on the jobs with IDs
    **job1** and **job2** and that match the pattern **example_job_name**::

        SHOW JOBS 'job1', 'job2'
            EXTENDED
            LIKE 'example_job_name';

    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Fetch the listed jobs and return one row per job.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by the rule names used in the grammar

        Returns
        -------
        FusionSQLResult
            One row per job; extra columns are appended when ``extended`` is
            set, and rows are filtered on ``Name`` when ``like`` is set

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)
        res.add_field('Name', result.STRING)
        res.add_field('CreatedAt', result.DATETIME)
        res.add_field('EnqueuedBy', result.STRING)
        res.add_field('CompletedExecutions', result.INTEGER)
        res.add_field('NotebookPath', result.STRING)
        res.add_field('DatabaseName', result.STRING)
        res.add_field('TargetID', result.STRING)
        res.add_field('TargetType', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        jobs = [jobs_manager.get(job_id) for job_id in params['job_ids']]

        extended = params['extended']
        if extended:
            res.add_field('Description', result.STRING)
            res.add_field('TerminatedAt', result.DATETIME)
            res.add_field('CreateSnapshot', result.BOOL)
            res.add_field('MaxDurationInMins', result.INTEGER)
            res.add_field('ExecutionIntervalInMins', result.INTEGER)
            res.add_field('Mode', result.STRING)
            res.add_field('StartAt', result.DATETIME)
            res.add_field('ResumeTarget', result.BOOL)

        def fields(job: Any) -> Tuple[Any, ...]:
            # Target details are reported as None when the job has no target
            # configuration.
            target = job.target_config
            database_name = target_id = target_type = None
            if target is not None:
                database_name = target.database_name
                target_id = target.target_id
                target_type = target.target_type.value

            row: Tuple[Any, ...] = (
                job.job_id,
                job.name,
                dt_isoformat(job.created_at),
                job.enqueued_by,
                job.completed_executions_count,
                job.execution_config.notebook_path,
                database_name,
                target_id,
                target_type,
            )
            if not extended:
                return row

            # Column order must match the extended add_field calls above.
            return row + (
                job.description,
                dt_isoformat(job.terminated_at),
                job.execution_config.create_snapshot,
                job.execution_config.max_duration_in_mins,
                job.schedule.execution_interval_in_minutes,
                job.schedule.mode.value,
                dt_isoformat(job.schedule.start_at),
                target.resume_target if target is not None else None,
            )

        res.set_rows([fields(job) for job in jobs])

        if params['like']:
            res = res.like(Name=params['like'])

        return res


ShowJobsHandler.register(overwrite=True)


class ShowJobExecutionsHandler(SQLHandler):
    """
    SHOW JOB EXECUTIONS FOR job_id
        from_start
        to_end
        [ <extended> ];

    # ID of the job to show executions for
    job_id = '<job-id>'

    # From start execution number
    from_start = FROM <integer>

    # To end execution number
    to_end = TO <integer>

    Description
    -----------
    Shows the executions for the job with the specified ID within the specified range.

    Arguments
    ---------
    * ``<job-id>``: The ID of the job to show executions for.
    * ``<integer>``: The execution number to start from or end at.

    Remarks
    -------
    * Use the ``FROM`` clause to specify the execution number to start from.
    * Use the ``TO`` clause to specify the execution number to end at.
    * To return more information about the executions, use the
      ``EXTENDED`` clause.

    Example
    -------
    The following command shows extended information on the executions for the job
    with ID **job1**, from execution number **1** to **10**::

        SHOW JOB EXECUTIONS FOR 'job1'
            FROM 1 TO 10
            EXTENDED;
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Fetch a range of executions for one job and return one row per execution.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by the rule names used in the grammar

        Returns
        -------
        FusionSQLResult
            One row per execution; a ``SnapshotNotebookPath`` column is
            appended when ``extended`` is set

        """
        res = FusionSQLResult()
        res.add_field('ExecutionID', result.STRING)
        res.add_field('ExecutionNumber', result.INTEGER)
        res.add_field('JobID', result.STRING)
        res.add_field('Status', result.STRING)
        res.add_field('ScheduledStartTime', result.DATETIME)
        res.add_field('StartedAt', result.DATETIME)
        res.add_field('FinishedAt', result.DATETIME)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        executions_data = jobs_manager.get_executions(
            params['job_id'],
            params['from_start'],
            params['to_end'],
        )

        extended = params['extended']
        if extended:
            res.add_field('SnapshotNotebookPath', result.STRING)

        def fields(execution: Any) -> Tuple[Any, ...]:
            row: Tuple[Any, ...] = (
                execution.execution_id,
                execution.execution_number,
                execution.job_id,
                execution.status.value,
                dt_isoformat(execution.scheduled_start_time),
                dt_isoformat(execution.started_at),
                dt_isoformat(execution.finished_at),
            )
            if not extended:
                return row
            return row + (execution.snapshot_notebook_path,)

        res.set_rows(
            [fields(execution) for execution in executions_data.executions],
        )

        return res


ShowJobExecutionsHandler.register(overwrite=True)


class ShowJobParametersHandler(SQLHandler):
    """
    SHOW JOB PARAMETERS FOR job_id;

    # ID of the job to show parameters for
    job_id = '<job-id>'

    Description
    -----------
    Shows the parameters for the job with the specified ID.

    Example
    -------
    The following command shows the parameters for the job with ID **job1**::

        SHOW JOB PARAMETERS FOR 'job1';
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Fetch the parameters of one job and return one row per parameter.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by the rule names used in the grammar

        Returns
        -------
        FusionSQLResult
            Rows with ``Name``, ``Value`` and ``Type`` columns

        """
        res = FusionSQLResult()
        res.add_field('Name', result.STRING)
        res.add_field('Value', result.STRING)
        res.add_field('Type', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        parameters = jobs_manager.get_parameters(params['job_id'])

        res.set_rows([
            (parameter.name, parameter.value, parameter.type)
            for parameter in parameters
        ])

        return res


ShowJobParametersHandler.register(overwrite=True)


class ShowJobRuntimesHandler(SQLHandler):
    """
    SHOW JOB RUNTIMES;

    Description
    -----------
    Shows the available runtimes for jobs.

    Example
    -------
    The following command shows the available runtimes for jobs::

        SHOW JOB RUNTIMES;
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        List the job runtimes reported by the jobs manager.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values; not read by this handler

        Returns
        -------
        FusionSQLResult
            Rows with ``Name`` and ``Description`` columns

        """
        res = FusionSQLResult()
        res.add_field('Name', result.STRING)
        res.add_field('Description', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        runtimes = jobs_manager.runtimes()

        res.set_rows([
            (runtime.name, runtime.description)
            for runtime in runtimes
        ])

        return res


ShowJobRuntimesHandler.register(overwrite=True)


class DropJobHandler(SQLHandler):
    """
    DROP JOBS job_ids;

    # Job IDs to drop
    job_ids = '<job-id>',...

    Description
    -----------
    Drops the jobs with the specified IDs.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to drop.

    Example
    -------
    The following command drops the jobs with ID **job1** and **job2**::

        DROP JOBS 'job1', 'job2';
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Delete each listed job and report the per-job outcome.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by the rule names used in the grammar

        Returns
        -------
        FusionSQLResult
            One row per job ID with ``JobID`` and ``Success`` columns, where
            ``Success`` is the value returned by the jobs manager's ``delete``

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)
        res.add_field('Success', result.BOOL)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # Jobs are deleted one at a time, in the order given.
        results: List[Tuple[Any, ...]] = [
            (job_id, jobs_manager.delete(job_id))
            for job_id in params['job_ids']
        ]
        res.set_rows(results)

        return res


DropJobHandler.register(overwrite=True)