Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/fusion/handlers/job.py
469 views
1
#!/usr/bin/env python3
2
from typing import Any
3
from typing import Dict
4
from typing import List
5
from typing import Optional
6
from typing import Tuple
7
8
from .. import result
9
from ...management.utils import to_datetime
10
from ..handler import SQLHandler
11
from ..result import FusionSQLResult
12
from .utils import dt_isoformat
13
from .utils import get_workspace_manager
14
from singlestoredb.management.job import Mode
15
16
17
class ScheduleJobHandler(SQLHandler):
    """
    SCHEDULE JOB USING NOTEBOOK notebook_path
        with_mode
        [ create_snapshot ]
        [ with_runtime ]
        [ with_name ]
        [ with_description ]
        [ execute_every ]
        [ start_at ]
        [ resume_target ]
        [ with_parameters ]
    ;

    # Path to notebook file
    notebook_path = '<notebook-path>'

    # Mode to use (either Once or Recurring)
    with_mode = WITH MODE '<mode>'

    # Create snapshot
    create_snapshot = CREATE SNAPSHOT

    # Runtime to use
    with_runtime = WITH RUNTIME '<runtime-name>'

    # Name of the job
    with_name = WITH NAME '<job-name>'

    # Description of the job
    with_description = WITH DESCRIPTION '<job-description>'

    # Execution interval
    execute_every = EXECUTE EVERY interval time_unit
    interval = <integer>
    time_unit = { MINUTES | HOURS | DAYS }

    # Start time
    start_at = START AT '<year>-<month>-<day> <hour>:<min>:<sec>'

    # Resume target if suspended
    resume_target = RESUME TARGET

    # Parameters to pass to the job
    with_parameters = WITH PARAMETERS <json>

    Description
    -----------
    Creates a scheduled notebook job.

    Arguments
    ---------
    * ``<notebook-path>``: The path in the Stage where the notebook file is
      stored.
    * ``<mode>``: The mode of the job. Either **Once** or **Recurring**.
    * ``<runtime-name>``: The name of the runtime the job will be run with.
    * ``<job-name>``: The name of the job.
    * ``<job-description>``: The description of the job.
    * ``<integer>``: The interval at which the job will be executed.
    * ``<year>-<month>-<day> <hour>:<min>:<sec>``: The start date and time of the
      job in UTC. The format is **yyyy-MM-dd HH:mm:ss**. The hour is in 24-hour format.
    * ``<json>``: The parameters to pass to the job. A JSON object with
      the following format: ``{"<paramName>": "<paramValue>", ...}``.

    Remarks
    -------
    * The ``WITH MODE`` clause specifies the mode of the job and is either
      **Once** or **Recurring**.
    * The ``EXECUTE EVERY`` clause specifies the interval at which the job will be
      executed. The interval can be in minutes, hours, or days. It is mandatory to
      specify the interval if the mode is **Recurring**.
    * The ``CREATE SNAPSHOT`` clause creates a snapshot of the notebook executed by
      the job.
    * The ``WITH RUNTIME`` clause specifies the name of the runtime that
      the job will be run with.
    * The ``RESUME TARGET`` clause resumes the job's target if it is suspended.
    * The ``WITH PARAMETERS`` clause specifies the parameters to pass to the job. The
      only supported parameter value types are strings, integers, floats, and booleans.

    Example
    -------
    The following command creates a job that will run the content of notebook
    **example_notebook.ipynb** every 5 minutes starting at **2024-06-25 21:35:06**
    using the runtime **notebooks-cpu-small**. The job's target will be resumed if it
    is suspended, a snapshot of the notebook will be created and the job is named
    **example_job** with the description **This is an example job**. The job will
    have the following parameters: **strParam** with value **"string"**, **intParam**
    with value **1**, **floatParam** with value **1.0**, and **boolParam** with value
    **true**::

        SCHEDULE JOB USING NOTEBOOK 'example_notebook.ipynb'
            WITH MODE 'Recurring'
            CREATE SNAPSHOT
            WITH RUNTIME 'notebooks-cpu-small'
            WITH NAME 'example_job'
            WITH DESCRIPTION 'This is an example job'
            EXECUTE EVERY 5 MINUTES
            START AT '2024-06-25 21:35:06'
            RESUME TARGET
            WITH PARAMETERS {
                "strParam": "string",
                "intParam": 1,
                "floatParam": 1.0,
                "boolParam": true
            }
        ;
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Schedule a notebook job from the parsed statement clauses.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by grammar rule name

        Raises
        ------
        ValueError
            If the ``EXECUTE EVERY`` time unit is not MINUTES, HOURS, or DAYS

        Returns
        -------
        FusionSQLResult
            A single row holding the ``JobID`` of the scheduled job

        """
        out = FusionSQLResult()
        out.add_field('JobID', result.STRING)

        manager = get_workspace_manager().organizations.current.jobs

        # A missing or empty WITH PARAMETERS clause is forwarded as ``None``;
        # otherwise the parsed JSON object is shallow-copied.
        raw_parameters = params.get('with_parameters')
        job_parameters = dict(raw_parameters.items()) if raw_parameters else None

        # Normalize the EXECUTE EVERY interval to minutes.
        interval_in_mins = None
        every = params.get('execute_every')
        if every:
            interval_in_mins = every['interval']
            unit = every['time_unit'].upper()
            minutes_per_unit = {'MINUTES': 1, 'HOURS': 60, 'DAYS': 60 * 24}
            if unit not in minutes_per_unit:
                raise ValueError(f'Invalid time unit: {unit}')
            if unit != 'MINUTES':
                interval_in_mins *= minutes_per_unit[unit]

        schedule_kwargs = dict(
            notebook_path=params['notebook_path'],
            mode=Mode.from_str(params['with_mode']),
            runtime_name=params['with_runtime'],
            create_snapshot=params['create_snapshot'],
            name=params['with_name'],
            description=params['with_description'],
            execution_interval_in_minutes=interval_in_mins,
            start_at=to_datetime(params.get('start_at')),
            resume_target=params['resume_target'],
            parameters=job_parameters,
        )
        job = manager.schedule(**schedule_kwargs)

        out.set_rows([(job.job_id,)])
        return out


ScheduleJobHandler.register(overwrite=True)
168
169
170
class RunJobHandler(SQLHandler):
    """
    RUN JOB USING NOTEBOOK notebook_path
        [ with_runtime ]
        [ with_parameters ]
    ;

    # Path to notebook file
    notebook_path = '<notebook-path>'

    # Runtime to use
    with_runtime = WITH RUNTIME '<runtime-name>'

    # Parameters to pass to the job
    with_parameters = WITH PARAMETERS <json>

    Description
    -----------
    Creates a scheduled notebook job that runs once immediately on the specified runtime.

    Arguments
    ---------
    * ``<notebook-path>``: The path in the Stage where the notebook file is stored.
    * ``<runtime-name>``: The name of the runtime the job will be run with.
    * ``<json>``: The parameters to pass to the job. A JSON object with
      the following format: ``{"<paramName>": "<paramValue>", ...}``.

    Remarks
    -------
    * The job is run immediately after the command is executed.
    * The ``WITH RUNTIME`` clause specifies the name of the runtime that
      the job will be run with.
    * The ``WITH PARAMETERS`` clause specifies the parameters to pass to the job. The
      only supported parameter value types are strings, integers, floats, and booleans.

    Example
    -------
    The following command creates a job that will run the content of notebook
    **example_notebook.ipynb** using the runtime **notebooks-cpu-small** immediately.
    The job will have the following parameters: **strParam** with value **"string"**,
    **intParam** with value **1**, **floatParam** with value **1.0**, and **boolParam**
    with value **true**::

        RUN JOB USING NOTEBOOK 'example_notebook.ipynb'
            WITH RUNTIME 'notebooks-cpu-small'
            WITH PARAMETERS {
                "strParam": "string",
                "intParam": 1,
                "floatParam": 1.0,
                "boolParam": true
            }
        ;

    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Run a notebook job through the jobs manager and report its ID.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            A single row holding the ``JobID`` of the created job

        """
        out = FusionSQLResult()
        out.add_field('JobID', result.STRING)

        manager = get_workspace_manager().organizations.current.jobs

        # A missing or empty WITH PARAMETERS clause is forwarded as ``None``;
        # otherwise the parsed JSON object is shallow-copied.
        raw_parameters = params.get('with_parameters')
        job_parameters = dict(raw_parameters.items()) if raw_parameters else None

        job = manager.run(
            params['notebook_path'],
            runtime_name=params['with_runtime'],
            parameters=job_parameters,
        )

        out.set_rows([(job.job_id,)])
        return out


RunJobHandler.register(overwrite=True)
248
249
250
class WaitOnJobsHandler(SQLHandler):
    """
    WAIT ON JOBS job_ids
        [ with_timeout ]
    ;

    # Job IDs to wait on
    job_ids = '<job-id>',...

    # Timeout
    with_timeout = WITH TIMEOUT time time_unit
    time = <integer>
    time_unit = { SECONDS | MINUTES | HOURS }

    Description
    -----------
    Waits for the jobs with the specified IDs to complete.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to wait on.
    * ``<integer>``: The amount of time to wait for the jobs to complete,
      expressed in the specified time unit.

    Remarks
    -------
    * The ``WITH TIMEOUT`` clause specifies the time to wait for the jobs to complete.
      The time can be in seconds, minutes, or hours.

    Example
    -------
    The following command waits for the jobs with IDs **job1** and **job2** to complete
    with a timeout of 60 seconds::

        WAIT ON JOBS 'job1', 'job2' WITH TIMEOUT 60 SECONDS;

    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Wait on the given jobs, optionally bounded by a timeout.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by grammar rule name

        Raises
        ------
        ValueError
            If the ``WITH TIMEOUT`` time unit is not SECONDS, MINUTES, or HOURS

        Returns
        -------
        FusionSQLResult
            A single row whose ``Success`` column is the value returned by
            the jobs manager's ``wait`` call

        """
        res = FusionSQLResult()
        res.add_field('Success', result.BOOL)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # Normalize the WITH TIMEOUT value to seconds; ``None`` is passed
        # through when the clause is absent.
        timeout_in_secs = None
        if params.get('with_timeout'):
            timeout_in_secs = params['with_timeout']['time']
            time_unit = params['with_timeout']['time_unit'].upper()
            if time_unit == 'SECONDS':
                pass
            elif time_unit == 'MINUTES':
                timeout_in_secs *= 60
            elif time_unit == 'HOURS':
                timeout_in_secs *= 60 * 60
            else:
                raise ValueError(f'Invalid time unit: {time_unit}')

        success = jobs_manager.wait(
            params['job_ids'],
            timeout=timeout_in_secs,
        )
        res.set_rows([(success,)])

        return res


WaitOnJobsHandler.register(overwrite=True)
316
317
318
class ShowJobsHandler(SQLHandler):
    """
    SHOW JOBS job_ids
        [ <extended> ]
        [ <like> ]
    ;

    # Job IDs to show
    job_ids = '<job-id>',...

    Description
    -----------
    Shows the jobs with the specified IDs.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to show.
    * ``<pattern>``: A pattern similar to SQL LIKE clause.
      Uses ``%`` as the wildcard character.

    Remarks
    -------
    * Use the ``LIKE`` clause to specify a pattern and return only
      the jobs that match the specified pattern.
    * To return more information about the jobs, use the
      ``EXTENDED`` clause.

    Example
    -------
    The following command shows extended information on the jobs with IDs
    **job1** and **job2** and that match the pattern **example_job_name**::

        SHOW JOBS 'job1', 'job2'
            EXTENDED
            LIKE 'example_job_name';

    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Look up each requested job and return one row per job.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            One row per job; extra columns are appended when ``extended``
            is set, and rows are filtered on ``Name`` when ``like`` is set

        """
        res = FusionSQLResult()
        base_columns = (
            ('JobID', result.STRING),
            ('Name', result.STRING),
            ('CreatedAt', result.DATETIME),
            ('EnqueuedBy', result.STRING),
            ('CompletedExecutions', result.INTEGER),
            ('NotebookPath', result.STRING),
            ('DatabaseName', result.STRING),
            ('TargetID', result.STRING),
            ('TargetType', result.STRING),
        )
        for column_name, column_type in base_columns:
            res.add_field(column_name, column_type)

        manager = get_workspace_manager().organizations.current.jobs

        jobs = [manager.get(job_id) for job_id in params['job_ids']]

        show_extended = params['extended']
        if show_extended:
            extended_columns = (
                ('Description', result.STRING),
                ('TerminatedAt', result.DATETIME),
                ('CreateSnapshot', result.BOOL),
                ('MaxDurationInMins', result.INTEGER),
                ('ExecutionIntervalInMins', result.INTEGER),
                ('Mode', result.STRING),
                ('StartAt', result.DATETIME),
                ('ResumeTarget', result.BOOL),
            )
            for column_name, column_type in extended_columns:
                res.add_field(column_name, column_type)

        def to_row(job: Any) -> Any:
            # Target details stay ``None`` when the job has no target config.
            target = job.target_config
            database_name = target_id = target_type = None
            if target is not None:
                database_name = target.database_name
                target_id = target.target_id
                target_type = target.target_type.value

            row = (
                job.job_id,
                job.name,
                dt_isoformat(job.created_at),
                job.enqueued_by,
                job.completed_executions_count,
                job.execution_config.notebook_path,
                database_name,
                target_id,
                target_type,
            )
            if not show_extended:
                return row

            # ``resume_target`` is only read for the extended column set.
            resume_target = None if target is None else target.resume_target
            return row + (
                job.description,
                dt_isoformat(job.terminated_at),
                job.execution_config.create_snapshot,
                job.execution_config.max_duration_in_mins,
                job.schedule.execution_interval_in_minutes,
                job.schedule.mode.value,
                dt_isoformat(job.schedule.start_at),
                resume_target,
            )

        res.set_rows([to_row(job) for job in jobs])

        if params['like']:
            res = res.like(Name=params['like'])

        return res


ShowJobsHandler.register(overwrite=True)
443
444
445
class ShowJobExecutionsHandler(SQLHandler):
    """
    SHOW JOB EXECUTIONS FOR job_id
        from_start
        to_end
        [ <extended> ];

    # ID of the job to show executions for
    job_id = '<job-id>'

    # From start execution number
    from_start = FROM <integer>

    # To end execution number
    to_end = TO <integer>

    Description
    -----------
    Shows the executions for the job with the specified ID within the specified range.

    Arguments
    ---------
    * ``<job-id>``: The ID of the job to show executions for.
    * ``<integer>``: The execution number to start from or end at.

    Remarks
    -------
    * Use the ``FROM`` clause to specify the execution number to start from.
    * Use the ``TO`` clause to specify the execution number to end at.
    * To return more information about the executions, use the
      ``EXTENDED`` clause.

    Example
    -------
    The following command shows extended information on the executions for the job
    with ID **job1**, from execution number **1** to **10**::

        SHOW JOB EXECUTIONS FOR 'job1'
            FROM 1 TO 10
            EXTENDED;
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Fetch a range of executions for one job and return one row each.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            One row per execution; ``SnapshotNotebookPath`` is appended
            when ``extended`` is set

        """
        res = FusionSQLResult()
        columns = (
            ('ExecutionID', result.STRING),
            ('ExecutionNumber', result.INTEGER),
            ('JobID', result.STRING),
            ('Status', result.STRING),
            ('ScheduledStartTime', result.DATETIME),
            ('StartedAt', result.DATETIME),
            ('FinishedAt', result.DATETIME),
        )
        for column_name, column_type in columns:
            res.add_field(column_name, column_type)

        manager = get_workspace_manager().organizations.current.jobs

        executions_data = manager.get_executions(
            params['job_id'],
            params['from_start'],
            params['to_end'],
        )

        show_extended = params['extended']
        if show_extended:
            res.add_field('SnapshotNotebookPath', result.STRING)

        def to_row(execution: Any) -> Any:
            row = (
                execution.execution_id,
                execution.execution_number,
                execution.job_id,
                execution.status.value,
                dt_isoformat(execution.scheduled_start_time),
                dt_isoformat(execution.started_at),
                dt_isoformat(execution.finished_at),
            )
            if show_extended:
                row += (execution.snapshot_notebook_path,)
            return row

        res.set_rows([to_row(item) for item in executions_data.executions])

        return res


ShowJobExecutionsHandler.register(overwrite=True)
537
538
539
class ShowJobParametersHandler(SQLHandler):
    """
    SHOW JOB PARAMETERS FOR job_id;

    # ID of the job to show parameters for
    job_id = '<job-id>'

    Description
    -----------
    Shows the parameters for the job with the specified ID.

    Example
    -------
    The following command shows the parameters for the job with ID **job1**::

        SHOW JOB PARAMETERS FOR 'job1';
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Return the name, value, and type of each parameter of one job.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            One row per parameter with ``Name``, ``Value``, and ``Type``

        """
        out = FusionSQLResult()
        for column_name in ('Name', 'Value', 'Type'):
            out.add_field(column_name, result.STRING)

        manager = get_workspace_manager().organizations.current.jobs

        job_parameters = manager.get_parameters(params['job_id'])

        out.set_rows([
            (item.name, item.value, item.type)
            for item in job_parameters
        ])

        return out


ShowJobParametersHandler.register(overwrite=True)
580
581
582
class ShowJobRuntimesHandler(SQLHandler):
    """
    SHOW JOB RUNTIMES;

    Description
    -----------
    Shows the available runtimes for jobs.

    Example
    -------
    The following command shows the available runtimes for jobs::

        SHOW JOB RUNTIMES;
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Return the name and description of each runtime the manager reports.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by grammar rule name (not read here)

        Returns
        -------
        FusionSQLResult
            One row per runtime with ``Name`` and ``Description``

        """
        out = FusionSQLResult()
        for column_name in ('Name', 'Description'):
            out.add_field(column_name, result.STRING)

        manager = get_workspace_manager().organizations.current.jobs

        available = manager.runtimes()

        out.set_rows([(item.name, item.description) for item in available])

        return out


ShowJobRuntimesHandler.register(overwrite=True)
618
619
620
class DropJobHandler(SQLHandler):
    """
    DROP JOBS job_ids;

    # Job IDs to drop
    job_ids = '<job-id>',...

    Description
    -----------
    Drops the jobs with the specified IDs.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to drop.

    Example
    -------
    The following command drops the jobs with ID **job1** and **job2**::

        DROP JOBS 'job1', 'job2';
    """

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Delete each listed job and report the per-job outcome.

        Parameters
        ----------
        params : Dict[str, Any]
            Parsed clause values keyed by grammar rule name

        Returns
        -------
        FusionSQLResult
            One row per job ID, pairing it with the value returned by the
            jobs manager's ``delete`` call

        """
        out = FusionSQLResult()
        out.add_field('JobID', result.STRING)
        out.add_field('Success', result.BOOL)

        manager = get_workspace_manager().organizations.current.jobs

        # Jobs are deleted one at a time, in the order they were listed.
        outcomes: List[Tuple[Any, ...]] = [
            (job_id, manager.delete(job_id))
            for job_id in params['job_ids']
        ]
        out.set_rows(outcomes)

        return out


DropJobHandler.register(overwrite=True)
659
660