Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/fusion/handlers/job.py
802 views
1
#!/usr/bin/env python3
2
from typing import Any
3
from typing import Dict
4
from typing import List
5
from typing import Optional
6
from typing import Tuple
7
8
from .. import result
9
from ...management.utils import to_datetime
10
from ..handler import SQLHandler
11
from ..result import FusionSQLResult
12
from .utils import dt_isoformat
13
from .utils import get_workspace_manager
14
from singlestoredb.management.job import Mode
15
16
17
class ScheduleJobHandler(SQLHandler):
    """
    SCHEDULE JOB USING NOTEBOOK notebook_path
        with_mode
        [ create_snapshot ]
        [ with_runtime ]
        [ with_name ]
        [ with_description ]
        [ execute_every ]
        [ start_at ]
        [ resume_target ]
        [ with_parameters ]
    ;

    # Path to notebook file
    notebook_path = '<notebook-path>'

    # Mode to use (either Once or Recurring)
    with_mode = WITH MODE '<mode>'

    # Create snapshot
    create_snapshot = CREATE SNAPSHOT

    # Runtime to use
    with_runtime = WITH RUNTIME '<runtime-name>'

    # Name of the job
    with_name = WITH NAME '<job-name>'

    # Description of the job
    with_description = WITH DESCRIPTION '<job-description>'

    # Execution interval
    execute_every = EXECUTE EVERY interval time_unit
    interval = <integer>
    time_unit = { MINUTES | HOURS | DAYS }

    # Start time
    start_at = START AT '<year>-<month>-<day> <hour>:<min>:<sec>'

    # Resume target if suspended
    resume_target = RESUME TARGET

    # Parameters to pass to the job
    with_parameters = WITH PARAMETERS <json>

    Description
    -----------
    Creates a scheduled notebook job.

    Arguments
    ---------
    * ``<notebook-path>``: The path in the Stage where the notebook file is
      stored.
    * ``<mode>``: The mode of the job. Either **Once** or **Recurring**.
    * ``<runtime-name>``: The name of the runtime the job will be run with.
    * ``<job-name>``: The name of the job.
    * ``<job-description>``: The description of the job.
    * ``<integer>``: The interval at which the job will be executed.
    * ``<year>-<month>-<day> <hour>:<min>:<sec>``: The start date and time of the
      job in UTC. The format is **yyyy-MM-dd HH:mm:ss**. The hour is in 24-hour format.
    * ``<json>``: The parameters to pass to the job. A JSON object with
      the following format: ``{"<paramName>": "<paramValue>", ...}``.

    Remarks
    -------
    * The ``WITH MODE`` clause specifies the mode of the job and is either
      **Once** or **Recurring**.
    * The ``EXECUTE EVERY`` clause specifies the interval at which the job will be
      executed. The interval can be in minutes, hours, or days. It is mandatory to
      specify the interval if the mode is **Recurring**.
    * The ``CREATE SNAPSHOT`` clause creates a snapshot of the notebook executed by
      the job.
    * The ``WITH RUNTIME`` clause specifies the name of the runtime that
      the job will be run with.
    * The ``RESUME TARGET`` clause resumes the job's target if it is suspended.
    * The ``WITH PARAMETERS`` clause specifies the parameters to pass to the job. The
      only supported parameter value types are strings, integers, floats, and booleans.

    Example
    -------
    The following command creates a job that will run the content of notebook
    **example_notebook.ipynb** every 5 minutes starting at **2024-06-25 21:35:06**
    using the runtime **notebooks-cpu-small**. The job's target will be resumed if it
    is suspended, a snapshot of the notebook will be created and the job is named
    **example_job** with the description **This is an example job**. The job will
    have the following parameters: **strParam** with value **"string"**, **intParam**
    with value **1**, **floatParam** with value **1.0**, and **boolParam** with value
    **true**::

        SCHEDULE JOB USING NOTEBOOK 'example_notebook.ipynb'
            WITH MODE 'Recurring'
            CREATE SNAPSHOT
            WITH RUNTIME 'notebooks-cpu-small'
            WITH NAME 'example_job'
            WITH DESCRIPTION 'This is an example job'
            EXECUTE EVERY 5 MINUTES
            START AT '2024-06-25 21:35:06'
            RESUME TARGET
            WITH PARAMETERS {
                "strParam": "string",
                "intParam": 1,
                "floatParam": 1.0,
                "boolParam": true
            }
        ;
    """

    # NOTE(review): the docstring above appears to be consumed by SQLHandler as
    # the command grammar / help text, so its wording is kept verbatim.

    _preview = True

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Schedule a notebook job and report the ID of the created job.

        Parameters
        ----------
        params : Dict[str, Any]
            Clause values of the statement, keyed by the clause names used
            in the class docstring

        Raises
        ------
        ValueError
            If the ``EXECUTE EVERY`` time unit is not MINUTES, HOURS or DAYS

        Returns
        -------
        FusionSQLResult
            A single row holding the ``JobID`` of the scheduled job

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # A missing or empty WITH PARAMETERS clause is forwarded as None;
        # otherwise the mapping is shallow-copied.
        raw_parameters = params.get('with_parameters')
        parameters = dict(raw_parameters.items()) if raw_parameters else None

        # The job API takes the interval in minutes, so scale coarser units.
        interval_in_mins = None
        execute_every = params.get('execute_every')
        if execute_every:
            interval = execute_every['interval']
            time_unit = execute_every['time_unit'].upper()
            minutes_per_unit = {'MINUTES': 1, 'HOURS': 60, 'DAYS': 60 * 24}
            if time_unit not in minutes_per_unit:
                raise ValueError(f'Invalid time unit: {time_unit}')
            interval_in_mins = interval * minutes_per_unit[time_unit]

        job = jobs_manager.schedule(
            notebook_path=params['notebook_path'],
            mode=Mode.from_str(params['with_mode']),
            runtime_name=params['with_runtime'],
            create_snapshot=params['create_snapshot'],
            name=params['with_name'],
            description=params['with_description'],
            execution_interval_in_minutes=interval_in_mins,
            start_at=to_datetime(params.get('start_at')),
            resume_target=params['resume_target'],
            parameters=parameters,
        )

        res.set_rows([(job.job_id,)])
        return res


ScheduleJobHandler.register(overwrite=True)
170
171
172
class RunJobHandler(SQLHandler):
    """
    RUN JOB USING NOTEBOOK notebook_path
        [ with_runtime ]
        [ with_parameters ]
    ;

    # Path to notebook file
    notebook_path = '<notebook-path>'

    # Runtime to use
    with_runtime = WITH RUNTIME '<runtime-name>'

    # Parameters to pass to the job
    with_parameters = WITH PARAMETERS <json>

    Description
    -----------
    Creates a scheduled notebook job that runs once immediately on the specified runtime.

    Arguments
    ---------
    * ``<notebook-path>``: The path in the Stage where the notebook file is stored.
    * ``<runtime-name>``: The name of the runtime the job will be run with.
    * ``<json>``: The parameters to pass to the job. A JSON object with
      the following format: ``{"<paramName>": "<paramValue>", ...}``.

    Remarks
    -------
    * The job is run immediately after the command is executed.
    * The ``WITH RUNTIME`` clause specifies the name of the runtime that
      the job will be run with.
    * The ``WITH PARAMETERS`` clause specifies the parameters to pass to the job. The
      only supported parameter value types are strings, integers, floats, and booleans.

    Example
    -------
    The following command creates a job that will run the content of notebook
    **example_notebook.ipynb** using the runtime **notebooks-cpu-small** immediately.
    The job will have the following parameters: **strParam** with value **"string"**,
    **intParam** with value **1**, **floatParam** with value **1.0**, and **boolParam**
    with value **true**::

        RUN JOB USING NOTEBOOK 'example_notebook.ipynb'
            WITH RUNTIME 'notebooks-cpu-small'
            WITH PARAMETERS {
                "strParam": "string",
                "intParam": 1,
                "floatParam": 1.0,
                "boolParam": true
            }
        ;

    """

    # NOTE(review): the docstring above appears to be consumed by SQLHandler as
    # the command grammar / help text, so its wording is kept verbatim.

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Run a notebook job and report the ID of the created job.

        Parameters
        ----------
        params : Dict[str, Any]
            Clause values of the statement, keyed by the clause names used
            in the class docstring

        Returns
        -------
        FusionSQLResult
            A single row holding the ``JobID`` of the job

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # A missing or empty WITH PARAMETERS clause is forwarded as None;
        # otherwise the mapping is shallow-copied.
        raw_parameters = params.get('with_parameters')
        parameters = dict(raw_parameters.items()) if raw_parameters else None

        job = jobs_manager.run(
            params['notebook_path'],
            runtime_name=params['with_runtime'],
            parameters=parameters,
        )

        res.set_rows([(job.job_id,)])
        return res


RunJobHandler.register(overwrite=True)
250
251
252
class WaitOnJobsHandler(SQLHandler):
    """
    WAIT ON JOBS job_ids
        [ with_timeout ]
    ;

    # Job IDs to wait on
    job_ids = '<job-id>',...

    # Timeout
    with_timeout = WITH TIMEOUT time time_unit
    time = <integer>
    time_unit = { SECONDS | MINUTES | HOURS }

    Description
    -----------
    Waits for the jobs with the specified IDs to complete.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to wait on.
    * ``<integer>``: The amount of time to wait for the jobs to complete,
      expressed in the time unit that follows it.

    Remarks
    -------
    * The ``WITH TIMEOUT`` clause specifies the time to wait for the jobs to complete.
      The time can be in seconds, minutes, or hours.

    Example
    -------
    The following command waits for the jobs with IDs **job1** and **job2** to complete
    with a timeout of 60 seconds::

        WAIT ON JOBS 'job1', 'job2' WITH TIMEOUT 60 SECONDS;

    """

    # NOTE(review): the docstring above appears to be consumed by SQLHandler as
    # the command grammar / help text. The grammar section is unchanged; the
    # Arguments and Example sections now agree with it (the time unit is a
    # required part of WITH TIMEOUT, and ``run`` relies on it being present).

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Wait for the given jobs and report whether the wait succeeded.

        Parameters
        ----------
        params : Dict[str, Any]
            Clause values of the statement, keyed by the clause names used
            in the class docstring

        Raises
        ------
        ValueError
            If the ``WITH TIMEOUT`` time unit is not SECONDS, MINUTES or HOURS

        Returns
        -------
        FusionSQLResult
            A single row holding the ``Success`` flag returned by the
            jobs manager

        """
        res = FusionSQLResult()
        res.add_field('Success', result.BOOL)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # The jobs manager takes the timeout in seconds, so scale coarser
        # units. Without a WITH TIMEOUT clause, None is forwarded.
        timeout_in_secs = None
        with_timeout = params.get('with_timeout')
        if with_timeout:
            timeout_in_secs = with_timeout['time']
            time_unit = with_timeout['time_unit'].upper()
            if time_unit == 'SECONDS':
                pass
            elif time_unit == 'MINUTES':
                timeout_in_secs *= 60
            elif time_unit == 'HOURS':
                timeout_in_secs *= 60 * 60
            else:
                raise ValueError(f'Invalid time unit: {time_unit}')

        success = jobs_manager.wait(
            params['job_ids'],
            timeout=timeout_in_secs,
        )
        res.set_rows([(success,)])

        return res


WaitOnJobsHandler.register(overwrite=True)
318
319
320
class ShowJobsHandler(SQLHandler):
    """
    SHOW JOBS job_ids
        [ <extended> ]
        [ <like> ]
    ;

    # Job IDs to show
    job_ids = '<job-id>',...

    Description
    -----------
    Shows the jobs with the specified IDs.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to show.
    * ``<pattern>``: A pattern similar to SQL LIKE clause.
      Uses ``%`` as the wildcard character.

    Remarks
    -------
    * Use the ``LIKE`` clause to specify a pattern and return only
      the jobs that match the specified pattern.
    * To return more information about the jobs, use the
      ``EXTENDED`` clause.

    Example
    -------
    The following command shows extended information on the jobs with IDs
    **job1** and **job2** and that match the pattern **example_job_name**::

        SHOW JOBS 'job1', 'job2'
            EXTENDED
            LIKE 'example_job_name';

    """

    # NOTE(review): the docstring above appears to be consumed by SQLHandler as
    # the command grammar / help text, so its wording is kept verbatim.

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Look up the requested jobs and return one row per job.

        Parameters
        ----------
        params : Dict[str, Any]
            Clause values of the statement, keyed by the clause names used
            in the class docstring

        Returns
        -------
        FusionSQLResult
            One row per job; extra columns are appended when ``EXTENDED``
            is given, and rows are filtered on ``Name`` when ``LIKE`` is given

        """
        res = FusionSQLResult()
        for column, column_type in (
            ('JobID', result.STRING),
            ('Name', result.STRING),
            ('CreatedAt', result.DATETIME),
            ('EnqueuedBy', result.STRING),
            ('CompletedExecutions', result.INTEGER),
            ('NotebookPath', result.STRING),
            ('DatabaseName', result.STRING),
            ('TargetID', result.STRING),
            ('TargetType', result.STRING),
        ):
            res.add_field(column, column_type)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        jobs = [jobs_manager.get(job_id) for job_id in params['job_ids']]

        extended = params['extended']
        if extended:
            for column, column_type in (
                ('Description', result.STRING),
                ('TerminatedAt', result.DATETIME),
                ('CreateSnapshot', result.BOOL),
                ('MaxDurationInMins', result.INTEGER),
                ('ExecutionIntervalInMins', result.INTEGER),
                ('Mode', result.STRING),
                ('StartAt', result.DATETIME),
                ('ResumeTarget', result.BOOL),
            ):
                res.add_field(column, column_type)

        def fields(job: Any) -> Any:
            # Target details stay None for jobs without a target config.
            database_name = target_id = target_type = resume_target = None
            target = job.target_config
            if target is not None:
                database_name = target.database_name
                if extended:
                    resume_target = target.resume_target
                target_id = target.target_id
                target_type = target.target_type.value

            row = (
                job.job_id,
                job.name,
                dt_isoformat(job.created_at),
                job.enqueued_by,
                job.completed_executions_count,
                job.execution_config.notebook_path,
                database_name,
                target_id,
                target_type,
            )
            if not extended:
                return row

            # Same order as the extended columns registered above.
            return row + (
                job.description,
                dt_isoformat(job.terminated_at),
                job.execution_config.create_snapshot,
                job.execution_config.max_duration_in_mins,
                job.schedule.execution_interval_in_minutes,
                job.schedule.mode.value,
                dt_isoformat(job.schedule.start_at),
                resume_target,
            )

        res.set_rows([fields(job) for job in jobs])

        like = params['like']
        if like:
            res = res.like(Name=like)

        return res


ShowJobsHandler.register(overwrite=True)
445
446
447
class ShowJobExecutionsHandler(SQLHandler):
    """
    SHOW JOB EXECUTIONS FOR job_id
        from_start
        to_end
        [ <extended> ];

    # ID of the job to show executions for
    job_id = '<job-id>'

    # From start execution number
    from_start = FROM <integer>

    # To end execution number
    to_end = TO <integer>

    Description
    -----------
    Shows the executions for the job with the specified ID within the specified range.

    Arguments
    ---------
    * ``<job-id>``: The ID of the job to show executions for.
    * ``<integer>``: The execution number to start from or end at.

    Remarks
    -------
    * Use the ``FROM`` clause to specify the execution number to start from.
    * Use the ``TO`` clause to specify the execution number to end at.
    * To return more information about the executions, use the
      ``EXTENDED`` clause.

    Example
    -------
    The following command shows extended information on the executions for the job
    with ID **job1**, from execution number **1** to **10**::

        SHOW JOB EXECUTIONS FOR 'job1'
            FROM 1 TO 10
            EXTENDED;
    """

    # NOTE(review): the docstring above appears to be consumed by SQLHandler as
    # the command grammar / help text, so its wording is kept verbatim.

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Fetch a range of executions of a job and return one row per execution.

        Parameters
        ----------
        params : Dict[str, Any]
            Clause values of the statement, keyed by the clause names used
            in the class docstring

        Returns
        -------
        FusionSQLResult
            One row per execution; ``SnapshotNotebookPath`` is appended
            when ``EXTENDED`` is given

        """
        res = FusionSQLResult()
        for column, column_type in (
            ('ExecutionID', result.STRING),
            ('ExecutionNumber', result.INTEGER),
            ('JobID', result.STRING),
            ('Status', result.STRING),
            ('ScheduledStartTime', result.DATETIME),
            ('StartedAt', result.DATETIME),
            ('FinishedAt', result.DATETIME),
        ):
            res.add_field(column, column_type)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        executions_data = jobs_manager.get_executions(
            params['job_id'],
            params['from_start'],
            params['to_end'],
        )

        extended = params['extended']
        if extended:
            res.add_field('SnapshotNotebookPath', result.STRING)

        def fields(execution: Any) -> Any:
            row = (
                execution.execution_id,
                execution.execution_number,
                execution.job_id,
                execution.status.value,
                dt_isoformat(execution.scheduled_start_time),
                dt_isoformat(execution.started_at),
                dt_isoformat(execution.finished_at),
            )
            if extended:
                return row + (execution.snapshot_notebook_path,)
            return row

        res.set_rows([fields(item) for item in executions_data.executions])

        return res


ShowJobExecutionsHandler.register(overwrite=True)
539
540
541
class ShowJobParametersHandler(SQLHandler):
    """
    SHOW JOB PARAMETERS FOR job_id;

    # ID of the job to show parameters for
    job_id = '<job-id>'

    Description
    -----------
    Shows the parameters for the job with the specified ID.

    Example
    -------
    The following command shows the parameters for the job with ID **job1**::

        SHOW JOB PARAMETERS FOR 'job1';
    """

    # NOTE(review): the docstring above appears to be consumed by SQLHandler as
    # the command grammar / help text, so its wording is kept verbatim.

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Fetch the parameters of a job and return one row per parameter.

        Parameters
        ----------
        params : Dict[str, Any]
            Clause values of the statement; ``job_id`` is read from it

        Returns
        -------
        FusionSQLResult
            Rows of ``Name``, ``Value`` and ``Type``

        """
        res = FusionSQLResult()
        for column in ('Name', 'Value', 'Type'):
            res.add_field(column, result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        job_parameters = jobs_manager.get_parameters(params['job_id'])

        res.set_rows([
            (item.name, item.value, item.type)
            for item in job_parameters
        ])

        return res


ShowJobParametersHandler.register(overwrite=True)
582
583
584
class ShowJobRuntimesHandler(SQLHandler):
    """
    SHOW JOB RUNTIMES;

    Description
    -----------
    Shows the available runtimes for jobs.

    Example
    -------
    The following command shows the available runtimes for jobs::

        SHOW JOB RUNTIMES;
    """

    # NOTE(review): the docstring above appears to be consumed by SQLHandler as
    # the command grammar / help text, so its wording is kept verbatim.

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        List the runtimes reported by the jobs manager, one row per runtime.

        Parameters
        ----------
        params : Dict[str, Any]
            Clause values of the statement (not read by this handler)

        Returns
        -------
        FusionSQLResult
            Rows of ``Name`` and ``Description``

        """
        res = FusionSQLResult()
        for column in ('Name', 'Description'):
            res.add_field(column, result.STRING)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        available_runtimes = jobs_manager.runtimes()

        res.set_rows([
            (item.name, item.description)
            for item in available_runtimes
        ])

        return res


ShowJobRuntimesHandler.register(overwrite=True)
620
621
622
class DropJobHandler(SQLHandler):
    """
    DROP JOBS job_ids;

    # Job IDs to drop
    job_ids = '<job-id>',...

    Description
    -----------
    Drops the jobs with the specified IDs.

    Arguments
    ---------
    * ``<job-id>``: A list of the IDs of the jobs to drop.

    Example
    -------
    The following command drops the jobs with ID **job1** and **job2**::

        DROP JOBS 'job1', 'job2';
    """

    # NOTE(review): the docstring above appears to be consumed by SQLHandler as
    # the command grammar / help text, so its wording is kept verbatim.

    def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
        """
        Delete each requested job and report the outcome per job.

        Parameters
        ----------
        params : Dict[str, Any]
            Clause values of the statement; ``job_ids`` is read from it

        Returns
        -------
        FusionSQLResult
            One ``(JobID, Success)`` row per requested job, in request order

        """
        res = FusionSQLResult()
        res.add_field('JobID', result.STRING)
        res.add_field('Success', result.BOOL)

        jobs_manager = get_workspace_manager().organizations.current.jobs

        # Jobs are deleted one at a time, in the order they were listed.
        outcomes: List[Tuple[Any, ...]] = [
            (job_id, jobs_manager.delete(job_id))
            for job_id in params['job_ids']
        ]
        res.set_rows(outcomes)

        return res


DropJobHandler.register(overwrite=True)
661
662