Path: blob/main/singlestoredb/fusion/handlers/job.py
802 views
#!/usr/bin/env python3
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from .. import result
from ...management.utils import to_datetime
from ..handler import SQLHandler
from ..result import FusionSQLResult
from .utils import dt_isoformat
from .utils import get_workspace_manager
from singlestoredb.management.job import Mode

# NOTE: The class docstrings in this module are not ordinary documentation.
# They hold the Fusion SQL grammar followed by the user-facing help text, so
# they are runtime data. Developer notes therefore live in ``#`` comments and
# in the ``run`` method docstrings instead.

# Multipliers that normalize an ``EXECUTE EVERY`` interval to minutes.
_MINUTES_PER_UNIT: Dict[str, int] = {
    'MINUTES': 1,
    'HOURS': 60,
    'DAYS': 60 * 24,
}

# Multipliers that normalize a ``WITH TIMEOUT`` value to seconds.
_SECONDS_PER_UNIT: Dict[str, int] = {
    'SECONDS': 1,
    'MINUTES': 60,
    'HOURS': 60 * 60,
}


def _scale_time(value: Any, time_unit: str, multipliers: Dict[str, int]) -> Any:
    """
    Convert ``value`` expressed in ``time_unit`` to the table's base unit.

    Parameters
    ----------
    value : Any
        The amount parsed from the statement (an ``<integer>`` in the grammar)
    time_unit : str
        The unit keyword as written in the statement; matched case-insensitively
    multipliers : Dict[str, int]
        Mapping of upper-case unit keyword to its multiplier

    Returns
    -------
    Any
        ``value`` multiplied by the multiplier of ``time_unit``

    Raises
    ------
    ValueError
        If ``time_unit`` is not a key of ``multipliers``

    """
    unit = time_unit.upper()
    if unit not in multipliers:
        raise ValueError(f'Invalid time unit: {unit}')
    return value * multipliers[unit]


def _get_job_parameters(params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Return a copy of the ``WITH PARAMETERS`` mapping, or None if absent / empty.

    Parameters
    ----------
    params : Dict[str, Any]
        The parsed statement parameters passed to a handler's ``run`` method

    Returns
    -------
    Optional[Dict[str, Any]]
        A new dict of parameter name to value, or None when the clause
        was not given (or is empty)

    """
    with_parameters = params.get('with_parameters')
    if not with_parameters:
        return None
    return dict(with_parameters)


class ScheduleJobHandler(SQLHandler):
    """
    SCHEDULE JOB USING NOTEBOOK notebook_path
        with_mode
        [ create_snapshot ]
        [ with_runtime ]
        [ with_name ]
        [ with_description ]
        [ execute_every ]
        [ start_at ]
        [ resume_target ]
        [ with_parameters ]
    ;

    # Path to notebook file
    notebook_path = '<notebook-path>'

    # Mode to use (either Once or Recurring)
    with_mode = WITH MODE '<mode>'

    # Create snapshot
    create_snapshot = CREATE SNAPSHOT

    # Runtime to use
    with_runtime = WITH RUNTIME '<runtime-name>'

    # Name of the job
    with_name = WITH NAME '<job-name>'

    # Description of the job
    with_description = WITH DESCRIPTION '<job-description>'

    # Execution interval
    execute_every = EXECUTE EVERY interval time_unit
    interval = <integer>
    time_unit = { MINUTES | HOURS | DAYS }

    # Start time
    start_at = START AT '<year>-<month>-<day> <hour>:<min>:<sec>'

    # Resume target if suspended
    resume_target = RESUME TARGET

    # Parameters to pass to the job
    with_parameters = WITH PARAMETERS <json>

    Description
    -----------
    Creates a scheduled notebook job.

    Arguments
    ---------
    * ``<notebook-path>``: The path in the Stage where the notebook file is
      stored.
    * ``<mode>``: The mode of the job. Either **Once** or **Recurring**.
    * ``<runtime-name>``: The name of the runtime the job will be run with.
    * ``<job-name>``: The name of the job.
    * ``<job-description>``: The description of the job.
    * ``<integer>``: The interval at which the job will be executed.
    * ``<year>-<month>-<day> <hour>:<min>:<sec>``: The start date and time of the
      job in UTC. The format is **yyyy-MM-dd HH:mm:ss**. The hour is in 24-hour format.
    * ``<json>``: The parameters to pass to the job. A JSON object with
      the following format: ``{"<paramName>": "<paramValue>", ...}``.

    Remarks
    -------
    * The ``WITH MODE`` clause specifies the mode of the job and is either
      **Once** or **Recurring**.
    * The ``EXECUTE EVERY`` clause specifies the interval at which the job will be
      executed. The interval can be in minutes, hours, or days. It is mandatory to
      specify the interval if the mode is **Recurring**.
    * The ``CREATE SNAPSHOT`` clause creates a snapshot of the notebook executed by
      the job.
    * The ``WITH RUNTIME`` clause specifies the name of the runtime that
      the job will be run with.
    * The ``RESUME TARGET`` clause resumes the job's target if it is suspended.
    * The ``WITH PARAMETERS`` clause specifies the parameters to pass to the job. The
      only supported parameter value types are strings, integers, floats, and booleans.

    Example
    -------
    The following command creates a job that will run the content of notebook
    **example_notebook.ipynb** every 5 minutes starting at **2024-06-25 21:35:06**
    using the runtime **notebooks-cpu-small**. The job's target will be resumed if it
    is suspended, a snapshot of the notebook will be created and the job is named
    **example_job** with the description **This is an example job**. The job will
    have the following parameters: **strParam** with value **"string"**, **intParam**
    with value **1**, **floatParam** with value **1.0**, and **boolParam** with value
    **true**::

        SCHEDULE JOB USING NOTEBOOK 'example_notebook.ipynb'
            WITH MODE 'Recurring'
            CREATE SNAPSHOT
            WITH RUNTIME 'notebooks-cpu-small'
            WITH NAME 'example_job'
            WITH DESCRIPTION 'This is an example job'
            EXECUTE EVERY 5 MINUTES
            START AT '2024-06-25 21:35:06'
            RESUME TARGET
            WITH PARAMETERS {
                "strParam": "string",
                "intParam": 1,
                "floatParam": 1.0,
                "boolParam": true
            }
        ;
    """

    _preview = True

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Schedule a notebook job and return its ID.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed statement parameters, keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            A single-row result with one ``JobID`` column

        Raises
        ------
        ValueError
            If the ``EXECUTE EVERY`` time unit is not MINUTES, HOURS or DAYS

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        parameters = _get_job_parameters(params)

        # The jobs manager takes the interval in minutes, so normalize
        # whichever unit the statement used.
        execution_interval_in_mins = None
        execute_every = params.get('execute_every')
        if execute_every:
            execution_interval_in_mins = _scale_time(
                execute_every['interval'],
                execute_every['time_unit'],
                _MINUTES_PER_UNIT,
            )

        job = jobs_manager.schedule(
            notebook_path=params['notebook_path'],
            mode=Mode.from_str(params['with_mode']),
            runtime_name=params['with_runtime'],
            create_snapshot=params['create_snapshot'],
            name=params['with_name'],
            description=params['with_description'],
            execution_interval_in_minutes=execution_interval_in_mins,
            start_at=to_datetime(params.get('start_at')),
            resume_target=params['resume_target'],
            parameters=parameters,
        )
        res.set_rows([(job.job_id,)])

        return res


ScheduleJobHandler.register(overwrite=True)


class RunJobHandler(SQLHandler):
    """
    RUN JOB USING NOTEBOOK notebook_path
        [ with_runtime ]
        [ with_parameters ]
    ;

    # Path to notebook file
    notebook_path = '<notebook-path>'

    # Runtime to use
    with_runtime = WITH RUNTIME '<runtime-name>'

    # Parameters to pass to the job
    with_parameters = WITH PARAMETERS <json>

    Description
    -----------
    Creates a scheduled notebook job that runs once immediately on the specified runtime.

    Arguments
    ---------
    * ``<notebook-path>``: The path in the Stage where the notebook file is stored.
    * ``<runtime-name>``: The name of the runtime the job will be run with.
    * ``<json>``: The parameters to pass to the job. A JSON object with
      the following format: ``{"<paramName>": "<paramValue>", ...}``.

    Remarks
    -------
    * The job is run immediately after the command is executed.
    * The ``WITH RUNTIME`` clause specifies the name of the runtime that
      the job will be run with.
    * The ``WITH PARAMETERS`` clause specifies the parameters to pass to the job. The
      only supported parameter value types are strings, integers, floats, and booleans.

    Example
    -------
    The following command creates a job that will run the content of notebook
    **example_notebook.ipynb** using the runtime **notebooks-cpu-small** immediately.
    The job will have the following parameters: **strParam** with value **"string"**,
    **intParam** with value **1**, **floatParam** with value **1.0**, and **boolParam**
    with value **true**::

        RUN JOB USING NOTEBOOK 'example_notebook.ipynb'
            WITH RUNTIME 'notebooks-cpu-small'
            WITH PARAMETERS {
                "strParam": "string",
                "intParam": 1,
                "floatParam": 1.0,
                "boolParam": true
            }
        ;

    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Run a notebook job and return its ID.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed statement parameters, keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            A single-row result with one ``JobID`` column

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        job = jobs_manager.run(
            params['notebook_path'],
            runtime_name=params['with_runtime'],
            parameters=_get_job_parameters(params),
        )
        res.set_rows([(job.job_id,)])

        return res


RunJobHandler.register(overwrite=True)


class WaitOnJobsHandler(SQLHandler):
    """
    WAIT ON JOBS job_ids
        [ with_timeout ]
    ;

    # Job IDs to wait on
    job_ids = '<job-id>',...

    # Timeout
    with_timeout = WITH TIMEOUT time time_unit
    time = <integer>
    time_unit = { SECONDS | MINUTES | HOURS }

    Description
    -----------
    Waits for the jobs with the specified IDs to complete.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to wait on.
    * ``<integer>``: The amount of time to wait for the jobs to complete,
      expressed in the specified time unit.

    Remarks
    -------
    * The ``WITH TIMEOUT`` clause specifies the time to wait for the jobs to complete.
      The time can be in seconds, minutes, or hours.

    Example
    -------
    The following command waits for the jobs with IDs **job1** and **job2** to complete
    with a timeout of 60 seconds::

        WAIT ON JOBS 'job1', 'job2' WITH TIMEOUT 60 SECONDS;

    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Wait on the given jobs and report the outcome.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed statement parameters, keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            A single-row result with one ``Success`` column holding the
            value returned by the jobs manager's ``wait`` call

        Raises
        ------
        ValueError
            If the ``WITH TIMEOUT`` time unit is not SECONDS, MINUTES or HOURS

        """
        res = FusionSQLResult()
        res.add_field('Success', result.BOOL)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # The timeout is handed to the jobs manager in seconds; None is
        # passed through when no WITH TIMEOUT clause was given.
        timeout_in_secs = None
        with_timeout = params.get('with_timeout')
        if with_timeout:
            timeout_in_secs = _scale_time(
                with_timeout['time'],
                with_timeout['time_unit'],
                _SECONDS_PER_UNIT,
            )

        success = jobs_manager.wait(
            params['job_ids'],
            timeout=timeout_in_secs,
        )
        res.set_rows([(success,)])

        return res


WaitOnJobsHandler.register(overwrite=True)


class ShowJobsHandler(SQLHandler):
    """
    SHOW JOBS job_ids
        [ <extended> ]
        [ <like> ]
    ;

    # Job IDs to show
    job_ids = '<job-id>',...

    Description
    -----------
    Shows the jobs with the specified IDs.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to show.
    * ``<pattern>``: A pattern similar to SQL LIKE clause.
      Uses ``%`` as the wildcard character.

    Remarks
    -------
    * Use the ``LIKE`` clause to specify a pattern and return only
      the jobs that match the specified pattern.
    * To return more information about the jobs, use the
      ``EXTENDED`` clause.

    Example
    -------
    The following command shows extended information on the jobs with IDs
    **job1** and **job2** and that match the pattern **example_job_name**::

        SHOW JOBS 'job1', 'job2'
            EXTENDED
            LIKE 'example_job_name';

    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Look up the requested jobs and return one row per job.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed statement parameters, keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            One row per job ID. With ``EXTENDED``, eight additional columns
            are appended. With ``LIKE``, rows are filtered on ``Name``.

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)
        res.add_field('Name', result.STRING)
        res.add_field('CreatedAt', result.DATETIME)
        res.add_field('EnqueuedBy', result.STRING)
        res.add_field('CompletedExecutions', result.INTEGER)
        res.add_field('NotebookPath', result.STRING)
        res.add_field('DatabaseName', result.STRING)
        res.add_field('TargetID', result.STRING)
        res.add_field('TargetType', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        jobs = [jobs_manager.get(job_id) for job_id in params['job_ids']]

        extended = bool(params['extended'])
        if extended:
            res.add_field('Description', result.STRING)
            res.add_field('TerminatedAt', result.DATETIME)
            res.add_field('CreateSnapshot', result.BOOL)
            res.add_field('MaxDurationInMins', result.INTEGER)
            res.add_field('ExecutionIntervalInMins', result.INTEGER)
            res.add_field('Mode', result.STRING)
            res.add_field('StartAt', result.DATETIME)
            res.add_field('ResumeTarget', result.BOOL)

        def fields(job: Any) -> Tuple[Any, ...]:
            # Target-related columns stay None for jobs without a target config.
            database_name = None
            target_id = None
            target_type = None
            target_config = job.target_config
            if target_config is not None:
                database_name = target_config.database_name
                target_id = target_config.target_id
                target_type = target_config.target_type.value

            row: Tuple[Any, ...] = (
                job.job_id,
                job.name,
                dt_isoformat(job.created_at),
                job.enqueued_by,
                job.completed_executions_count,
                job.execution_config.notebook_path,
                database_name,
                target_id,
                target_type,
            )
            if not extended:
                return row

            # Only read in extended mode, as it is only reported there.
            resume_target = None
            if target_config is not None:
                resume_target = target_config.resume_target

            # Column order must match the add_field calls above.
            return row + (
                job.description,
                dt_isoformat(job.terminated_at),
                job.execution_config.create_snapshot,
                job.execution_config.max_duration_in_mins,
                job.schedule.execution_interval_in_minutes,
                job.schedule.mode.value,
                dt_isoformat(job.schedule.start_at),
                resume_target,
            )

        res.set_rows([fields(job) for job in jobs])

        if params['like']:
            res = res.like(Name=params['like'])

        return res


ShowJobsHandler.register(overwrite=True)


class ShowJobExecutionsHandler(SQLHandler):
    """
    SHOW JOB EXECUTIONS FOR job_id
        from_start
        to_end
        [ <extended> ];

    # ID of the job to show executions for
    job_id = '<job-id>'

    # From start execution number
    from_start = FROM <integer>

    # To end execution number
    to_end = TO <integer>

    Description
    -----------
    Shows the executions for the job with the specified ID within the specified range.

    Arguments
    ---------
    * ``<job-id>``: The ID of the job to show executions for.
    * ``<integer>``: The execution number to start from or end at.

    Remarks
    -------
    * Use the ``FROM`` clause to specify the execution number to start from.
    * Use the ``TO`` clause to specify the execution number to end at.
    * To return more information about the executions, use the
      ``EXTENDED`` clause.

    Example
    -------
    The following command shows extended information on the executions for the job
    with ID **job1**, from execution number **1** to **10**::

        SHOW JOB EXECUTIONS FOR 'job1'
            FROM 1 TO 10
            EXTENDED;
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Fetch a range of executions of a job and return one row per execution.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed statement parameters, keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            One row per execution. With ``EXTENDED``, a
            ``SnapshotNotebookPath`` column is appended.

        """
        res = FusionSQLResult()
        res.add_field('ExecutionID', result.STRING)
        res.add_field('ExecutionNumber', result.INTEGER)
        res.add_field('JobID', result.STRING)
        res.add_field('Status', result.STRING)
        res.add_field('ScheduledStartTime', result.DATETIME)
        res.add_field('StartedAt', result.DATETIME)
        res.add_field('FinishedAt', result.DATETIME)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        executions_data = jobs_manager.get_executions(
            params['job_id'],
            params['from_start'],
            params['to_end'],
        )

        extended = bool(params['extended'])
        if extended:
            res.add_field('SnapshotNotebookPath', result.STRING)

        def fields(execution: Any) -> Tuple[Any, ...]:
            row: Tuple[Any, ...] = (
                execution.execution_id,
                execution.execution_number,
                execution.job_id,
                execution.status.value,
                dt_isoformat(execution.scheduled_start_time),
                dt_isoformat(execution.started_at),
                dt_isoformat(execution.finished_at),
            )
            if extended:
                row += (execution.snapshot_notebook_path,)
            return row

        res.set_rows(
            [fields(execution) for execution in executions_data.executions],
        )

        return res


ShowJobExecutionsHandler.register(overwrite=True)


class ShowJobParametersHandler(SQLHandler):
    """
    SHOW JOB PARAMETERS FOR job_id;

    # ID of the job to show parameters for
    job_id = '<job-id>'

    Description
    -----------
    Shows the parameters for the job with the specified ID.

    Example
    -------
    The following command shows the parameters for the job with ID **job1**::

        SHOW JOB PARAMETERS FOR 'job1';
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Return the parameters of a job, one row per parameter.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed statement parameters, keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            Rows with ``Name``, ``Value`` and ``Type`` columns

        """
        res = FusionSQLResult()
        res.add_field('Name', result.STRING)
        res.add_field('Value', result.STRING)
        res.add_field('Type', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        parameters = jobs_manager.get_parameters(params['job_id'])

        res.set_rows([
            (parameter.name, parameter.value, parameter.type)
            for parameter in parameters
        ])

        return res


ShowJobParametersHandler.register(overwrite=True)


class ShowJobRuntimesHandler(SQLHandler):
    """
    SHOW JOB RUNTIMES;

    Description
    -----------
    Shows the available runtimes for jobs.

    Example
    -------
    The following command shows the available runtimes for jobs::

        SHOW JOB RUNTIMES;
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Return the runtimes reported by the jobs manager, one row per runtime.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed statement parameters (not used by this handler)

        Returns
        -------
        FusionSQLResult
            Rows with ``Name`` and ``Description`` columns

        """
        res = FusionSQLResult()
        res.add_field('Name', result.STRING)
        res.add_field('Description', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        runtimes = jobs_manager.runtimes()

        res.set_rows([
            (runtime.name, runtime.description)
            for runtime in runtimes
        ])

        return res


ShowJobRuntimesHandler.register(overwrite=True)


class DropJobHandler(SQLHandler):
    """
    DROP JOBS job_ids;

    # Job IDs to drop
    job_ids = '<job-id>',...

    Description
    -----------
    Drops the jobs with the specified IDs.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to drop.

    Example
    -------
    The following command drops the jobs with ID **job1** and **job2**::

        DROP JOBS 'job1', 'job2';
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Delete each listed job and report the per-job outcome.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed statement parameters, keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            One row per job ID with ``JobID`` and ``Success`` columns;
            ``Success`` is the value returned by the jobs manager's
            ``delete`` call

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)
        res.add_field('Success', result.BOOL)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # Jobs are deleted one at a time, in the order they were listed.
        results: List[Tuple[Any, ...]] = [
            (job_id, jobs_manager.delete(job_id))
            for job_id in params['job_ids']
        ]
        res.set_rows(results)

        return res


DropJobHandler.register(overwrite=True)