Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
singlestore-labs
GitHub Repository: singlestore-labs/singlestoredb-python
Path: blob/main/singlestoredb/management/job.py
469 views
1
#!/usr/bin/env python
2
"""SingleStoreDB Cloud Scheduled Notebook Job."""
3
import datetime
4
import time
5
from enum import Enum
6
from typing import Any
7
from typing import Dict
8
from typing import List
9
from typing import Optional
10
from typing import Type
11
from typing import Union
12
13
from ..exceptions import ManagementError
14
from .manager import Manager
15
from .utils import camel_to_snake
16
from .utils import from_datetime
17
from .utils import get_cluster_id
18
from .utils import get_database_name
19
from .utils import get_virtual_workspace_id
20
from .utils import get_workspace_id
21
from .utils import to_datetime
22
from .utils import to_datetime_strict
23
from .utils import vars_to_str
24
25
26
# Python value type -> parameter type name sent in the ``parameters`` list
# of a job-creation request (see ``JobsManager.schedule``). Lookups there
# use the exact ``type()`` of each value, so ``bool`` values map to
# 'boolean' rather than being treated as ``int``.
type_to_parameter_conversion_map: Dict[Type[Any], str] = dict([
    (str, 'string'),
    (int, 'integer'),
    (float, 'float'),
    (bool, 'boolean'),
])
32
33
34
class Mode(Enum):
    """Scheduling mode of a notebook job."""

    ONCE = 'Once'
    RECURRING = 'Recurring'

    @classmethod
    def from_str(cls, s: str) -> 'Mode':
        """
        Look up a :class:`Mode` from its API string.

        ``s`` is passed through ``camel_to_snake`` and upper-cased, and the
        result is used as the member name (e.g. ``'Once'`` -> ``ONCE``).

        Parameters
        ----------
        s : str
            Mode string, e.g. as found in a job's ``schedule.mode`` field

        Returns
        -------
        :class:`Mode`

        Raises
        ------
        ValueError
            If ``s`` does not name a known mode

        """
        try:
            return cls[str(camel_to_snake(s)).upper()]
        except KeyError:
            # Suppress the internal KeyError so callers only see the
            # ValueError instead of a chained "during handling" traceback.
            raise ValueError(f'Unknown Mode: {s}') from None

    def __str__(self) -> str:
        """Return the API string value of the mode."""
        return self.value

    def __repr__(self) -> str:
        """Return the API string value of the mode."""
        return str(self)
52
53
54
class TargetType(Enum):
    """Kind of compute target a job runs against."""

    WORKSPACE = 'Workspace'
    CLUSTER = 'Cluster'
    VIRTUAL_WORKSPACE = 'VirtualWorkspace'

    @classmethod
    def from_str(cls, s: str) -> 'TargetType':
        """
        Look up a :class:`TargetType` from its API string.

        ``s`` is passed through ``camel_to_snake`` and upper-cased, and the
        result is used as the member name (e.g. ``'VirtualWorkspace'`` ->
        ``VIRTUAL_WORKSPACE``).

        Parameters
        ----------
        s : str
            Target type string, e.g. a target config's ``targetType`` field

        Returns
        -------
        :class:`TargetType`

        Raises
        ------
        ValueError
            If ``s`` does not name a known target type

        """
        try:
            return cls[str(camel_to_snake(s)).upper()]
        except KeyError:
            # Suppress the internal KeyError so callers only see the
            # ValueError instead of a chained "during handling" traceback.
            raise ValueError(f'Unknown TargetType: {s}') from None

    def __str__(self) -> str:
        """Return the API string value of the target type."""
        return self.value

    def __repr__(self) -> str:
        """Return the API string value of the target type."""
        return str(self)
73
74
75
class Status(Enum):
    """Status of a job execution; ``UNKNOWN`` absorbs unrecognized values."""

    UNKNOWN = 'Unknown'
    SCHEDULED = 'Scheduled'
    RUNNING = 'Running'
    COMPLETED = 'Completed'
    FAILED = 'Failed'
    ERROR = 'Error'
    CANCELED = 'Canceled'

    @classmethod
    def from_str(cls, s: str) -> 'Status':
        """
        Map an API status string to a :class:`Status`.

        The string is converted with ``camel_to_snake`` and upper-cased to
        form a member name. Unrecognized names never raise; they yield
        :attr:`Status.UNKNOWN`.
        """
        member_name = str(camel_to_snake(s)).upper()
        return cls.__members__.get(member_name, cls.UNKNOWN)

    def __str__(self) -> str:
        """Return the API string value of the status."""
        text: str = self.value
        return text

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
98
99
100
class Parameter(object):
    """A named, typed parameter attached to a scheduled notebook job."""

    name: str
    value: str
    type: str

    def __init__(
        self,
        name: str,
        value: str,
        type: str,
    ):
        # Assigned left to right, so attribute order is name, value, type.
        self.name, self.value, self.type = name, value, type

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'Parameter':
        """
        Build a :class:`Parameter` from an API response dictionary.

        Parameters
        ----------
        obj : dict
            Mapping that must contain the keys ``name``, ``value`` and
            ``type``; any other keys are ignored

        Returns
        -------
        :class:`Parameter`

        """
        fields = {key: obj[key] for key in ('name', 'value', 'type')}
        return cls(**fields)

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
146
147
148
class Runtime(object):
    """A job runtime, as listed by ``JobsManager.runtimes``."""

    name: str
    description: str

    def __init__(
        self,
        name: str,
        description: str,
    ):
        self.name = name
        self.description = description

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'Runtime':
        """
        Build a :class:`Runtime` from an API response dictionary.

        Parameters
        ----------
        obj : dict
            Mapping that must contain ``name`` and ``description``

        Returns
        -------
        :class:`Runtime`

        """
        runtime_name = obj['name']
        runtime_description = obj['description']
        return cls(name=runtime_name, description=runtime_description)

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
190
191
192
class JobMetadata(object):
    """Execution count and duration statistics reported for one :class:`Status`."""

    avg_duration_in_seconds: Optional[float]
    count: int
    max_duration_in_seconds: Optional[float]
    status: Status

    def __init__(
        self,
        avg_duration_in_seconds: Optional[float],
        count: int,
        max_duration_in_seconds: Optional[float],
        status: Status,
    ):
        self.avg_duration_in_seconds = avg_duration_in_seconds
        self.count = count
        self.max_duration_in_seconds = max_duration_in_seconds
        self.status = status

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'JobMetadata':
        """
        Build a :class:`JobMetadata` from an API response dictionary.

        Parameters
        ----------
        obj : dict
            Mapping with required ``count`` and ``status`` keys and optional
            ``avgDurationInSeconds`` / ``maxDurationInSeconds`` keys

        Returns
        -------
        :class:`JobMetadata`

        """
        # Keys are read in the same order as the constructor arguments.
        average = obj.get('avgDurationInSeconds')
        total = obj['count']
        longest = obj.get('maxDurationInSeconds')
        status = Status.from_str(obj['status'])
        return cls(
            avg_duration_in_seconds=average,
            count=total,
            max_duration_in_seconds=longest,
            status=status,
        )

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
242
243
244
class ExecutionMetadata(object):
    """Execution-number bounds returned alongside a list of executions."""

    start_execution_number: int
    end_execution_number: int

    def __init__(
        self,
        start_execution_number: int,
        end_execution_number: int,
    ):
        self.start_execution_number = start_execution_number
        self.end_execution_number = end_execution_number

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'ExecutionMetadata':
        """
        Build an :class:`ExecutionMetadata` from an API response dictionary.

        Parameters
        ----------
        obj : dict
            Mapping with ``startExecutionNumber`` and ``endExecutionNumber``

        Returns
        -------
        :class:`ExecutionMetadata`

        """
        first, last = obj['startExecutionNumber'], obj['endExecutionNumber']
        return cls(start_execution_number=first, end_execution_number=last)

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
286
287
288
class Execution(object):
    """A single run of a scheduled notebook job."""

    execution_id: str
    job_id: str
    status: Status
    snapshot_notebook_path: Optional[str]
    scheduled_start_time: datetime.datetime
    started_at: Optional[datetime.datetime]
    finished_at: Optional[datetime.datetime]
    execution_number: int

    def __init__(
        self,
        execution_id: str,
        job_id: str,
        status: Status,
        scheduled_start_time: datetime.datetime,
        started_at: Optional[datetime.datetime],
        finished_at: Optional[datetime.datetime],
        execution_number: int,
        snapshot_notebook_path: Optional[str],
    ):
        self.execution_id = execution_id
        self.job_id = job_id
        self.status = status
        self.scheduled_start_time = scheduled_start_time
        self.started_at = started_at
        self.finished_at = finished_at
        self.execution_number = execution_number
        self.snapshot_notebook_path = snapshot_notebook_path

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'Execution':
        """
        Build an :class:`Execution` from an API response dictionary.

        ``scheduledStartTime`` is required and parsed with
        ``to_datetime_strict``; ``startedAt`` / ``finishedAt`` are optional
        and parsed with ``to_datetime``.

        Parameters
        ----------
        obj : dict
            Execution record from the jobs API

        Returns
        -------
        :class:`Execution`

        """
        exec_id = obj['executionID']
        owning_job = obj['jobID']
        state = Status.from_str(obj['status'])
        snapshot_path = obj.get('snapshotNotebookPath')
        scheduled = to_datetime_strict(obj['scheduledStartTime'])
        started = to_datetime(obj.get('startedAt'))
        finished = to_datetime(obj.get('finishedAt'))
        number = obj['executionNumber']
        return cls(
            execution_id=exec_id,
            job_id=owning_job,
            status=state,
            scheduled_start_time=scheduled,
            started_at=started,
            finished_at=finished,
            execution_number=number,
            snapshot_notebook_path=snapshot_path,
        )

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
354
355
356
class ExecutionsData(object):
    """A list of job executions plus the execution-number bounds returned with it."""

    executions: List[Execution]
    metadata: ExecutionMetadata

    def __init__(
        self,
        executions: List[Execution],
        metadata: ExecutionMetadata,
    ):
        self.executions = executions
        self.metadata = metadata

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'ExecutionsData':
        """
        Build an :class:`ExecutionsData` from an API response dictionary.

        Parameters
        ----------
        obj : dict
            Mapping with an ``executions`` list and an
            ``executionsMetadata`` mapping

        Returns
        -------
        :class:`ExecutionsData`

        """
        parsed = list(map(Execution.from_dict, obj['executions']))
        bounds = ExecutionMetadata.from_dict(obj['executionsMetadata'])
        return cls(executions=parsed, metadata=bounds)

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
398
399
400
class ExecutionConfig(object):
    """How a job's notebook is executed: path, snapshot flag and time limit."""

    create_snapshot: bool
    max_duration_in_mins: int
    notebook_path: str

    def __init__(
        self,
        create_snapshot: bool,
        max_duration_in_mins: int,
        notebook_path: str,
    ):
        self.create_snapshot = create_snapshot
        self.max_duration_in_mins = max_duration_in_mins
        self.notebook_path = notebook_path

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'ExecutionConfig':
        """
        Build an :class:`ExecutionConfig` from an API response dictionary.

        Parameters
        ----------
        obj : dict
            Mapping with ``createSnapshot``,
            ``maxAllowedExecutionDurationInMinutes`` and ``notebookPath``

        Returns
        -------
        :class:`ExecutionConfig`

        """
        # (attribute name, API key) pairs, read in this order.
        key_map = (
            ('create_snapshot', 'createSnapshot'),
            ('max_duration_in_mins', 'maxAllowedExecutionDurationInMinutes'),
            ('notebook_path', 'notebookPath'),
        )
        return cls(**{attr: obj[api_key] for attr, api_key in key_map})

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
446
447
448
class Schedule(object):
    """When a job runs: its :class:`Mode`, optional start time and interval."""

    execution_interval_in_minutes: Optional[int]
    mode: Mode
    start_at: Optional[datetime.datetime]

    def __init__(
        self,
        execution_interval_in_minutes: Optional[int],
        mode: Mode,
        start_at: Optional[datetime.datetime],
    ):
        self.execution_interval_in_minutes = execution_interval_in_minutes
        self.mode = mode
        self.start_at = start_at

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'Schedule':
        """
        Build a :class:`Schedule` from an API response dictionary.

        Parameters
        ----------
        obj : dict
            Mapping with a required ``mode`` and optional
            ``executionIntervalInMinutes`` / ``startAt`` keys

        Returns
        -------
        :class:`Schedule`

        """
        interval = obj.get('executionIntervalInMinutes')
        schedule_mode = Mode.from_str(obj['mode'])
        first_run = to_datetime(obj.get('startAt'))
        return cls(
            execution_interval_in_minutes=interval,
            mode=schedule_mode,
            start_at=first_run,
        )

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
494
495
496
class TargetConfig(object):
    """Target a job runs against: id, :class:`TargetType`, database and resume flag."""

    database_name: Optional[str]
    resume_target: bool
    target_id: str
    target_type: TargetType

    def __init__(
        self,
        database_name: Optional[str],
        resume_target: bool,
        target_id: str,
        target_type: TargetType,
    ):
        self.database_name = database_name
        self.resume_target = resume_target
        self.target_id = target_id
        self.target_type = target_type

    @classmethod
    def from_dict(cls, obj: Dict[str, Any]) -> 'TargetConfig':
        """
        Build a :class:`TargetConfig` from an API response dictionary.

        Parameters
        ----------
        obj : dict
            Mapping with required ``resumeTarget``, ``targetID`` and
            ``targetType`` keys and an optional ``databaseName`` key

        Returns
        -------
        :class:`TargetConfig`

        """
        database = obj.get('databaseName')
        resume = obj['resumeTarget']
        identifier = obj['targetID']
        kind = TargetType.from_str(obj['targetType'])
        return cls(
            database_name=database,
            resume_target=resume,
            target_id=identifier,
            target_type=kind,
        )

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
546
547
548
class Job(object):
    """
    Scheduled Notebook Job definition.

    Jobs are not created directly; they are returned by
    :class:`JobsManager` calls such as :meth:`JobsManager.run`. A job built
    by :meth:`from_dict` keeps a reference to its manager, which the
    ``wait`` / ``get_executions`` / ``get_parameters`` / ``delete`` methods
    below delegate to.
    """

    completed_executions_count: int
    created_at: datetime.datetime
    description: Optional[str]
    enqueued_by: str
    execution_config: ExecutionConfig
    job_id: str
    job_metadata: List[JobMetadata]
    name: Optional[str]
    schedule: Schedule
    target_config: Optional[TargetConfig]
    terminated_at: Optional[datetime.datetime]

    def __init__(
        self,
        completed_executions_count: int,
        created_at: datetime.datetime,
        description: Optional[str],
        enqueued_by: str,
        execution_config: ExecutionConfig,
        job_id: str,
        job_metadata: List[JobMetadata],
        name: Optional[str],
        schedule: Schedule,
        target_config: Optional[TargetConfig],
        terminated_at: Optional[datetime.datetime],
    ):
        self.completed_executions_count = completed_executions_count
        self.created_at = created_at
        self.description = description
        self.enqueued_by = enqueued_by
        self.execution_config = execution_config
        self.job_id = job_id
        self.job_metadata = job_metadata
        self.name = name
        self.schedule = schedule
        self.target_config = target_config
        self.terminated_at = terminated_at
        # Attached by ``from_dict``; required by the API-calling methods.
        self._manager: Optional[JobsManager] = None

    @classmethod
    def from_dict(cls, obj: Dict[str, Any], manager: 'JobsManager') -> 'Job':
        """
        Build a :class:`Job` from an API response dictionary.

        Parameters
        ----------
        obj : dict
            Job record from the jobs API
        manager : JobsManager
            Manager stored on the job and used by its API-calling methods

        Returns
        -------
        :class:`Job`

        """
        raw_target = obj.get('targetConfig')
        target = None if raw_target is None else TargetConfig.from_dict(raw_target)

        completed = obj['completedExecutionsCount']
        created = to_datetime_strict(obj['createdAt'])
        summary = obj.get('description')
        submitter = obj['enqueuedBy']
        config = ExecutionConfig.from_dict(obj['executionConfig'])
        identifier = obj['jobID']
        metadata = [JobMetadata.from_dict(entry) for entry in obj['jobMetadata']]
        job_name = obj.get('name')
        timing = Schedule.from_dict(obj['schedule'])
        terminated = to_datetime(obj.get('terminatedAt'))

        job = cls(
            completed_executions_count=completed,
            created_at=created,
            description=summary,
            enqueued_by=submitter,
            execution_config=config,
            job_id=identifier,
            job_metadata=metadata,
            name=job_name,
            schedule=timing,
            target_config=target,
            terminated_at=terminated,
        )
        job._manager = manager
        return job

    def _require_manager(self) -> 'JobsManager':
        """Return the attached manager, raising if the job has none."""
        owner = self._manager
        if owner is None:
            raise ManagementError(msg='Job not initialized with JobsManager')
        return owner

    def wait(self, timeout: Optional[int] = None) -> bool:
        """Wait for the job to complete; see ``JobsManager.wait``."""
        return self._require_manager()._wait_for_job(self, timeout)

    def get_executions(
        self,
        start_execution_number: int,
        end_execution_number: int,
    ) -> ExecutionsData:
        """Get executions of this job in the given execution-number range."""
        owner = self._require_manager()
        return owner.get_executions(
            self.job_id,
            start_execution_number,
            end_execution_number,
        )

    def get_parameters(self) -> List[Parameter]:
        """Get the parameters of this job."""
        return self._require_manager().get_parameters(self.job_id)

    def delete(self) -> bool:
        """Delete this job."""
        return self._require_manager().delete(self.job_id)

    def __str__(self) -> str:
        """Return a readable dump of the instance attributes."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str()``."""
        return str(self)
669
670
671
class JobsManager(object):
    """
    SingleStoreDB scheduled notebook jobs manager.

    This class should be instantiated using :attr:`Organization.jobs`.

    Parameters
    ----------
    manager : WorkspaceManager, optional
        The WorkspaceManager the JobsManager belongs to

    See Also
    --------
    :attr:`Organization.jobs`
    """

    # Upper bound, in seconds, on the pause between job-status polls while
    # waiting for a job to finish.
    _POLL_INTERVAL = 5

    def __init__(self, manager: Optional[Manager]):
        self._manager = manager

    def schedule(
        self,
        notebook_path: str,
        mode: Mode,
        create_snapshot: bool,
        name: Optional[str] = None,
        description: Optional[str] = None,
        execution_interval_in_minutes: Optional[int] = None,
        start_at: Optional[datetime.datetime] = None,
        runtime_name: Optional[str] = None,
        resume_target: Optional[bool] = None,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> Job:
        """
        Creates and returns a scheduled notebook job.

        Parameters
        ----------
        notebook_path : str
            Path of the notebook to execute
        mode : Mode
            Scheduling mode (:attr:`Mode.ONCE` or :attr:`Mode.RECURRING`)
        create_snapshot : bool
            Sent as the execution config's ``createSnapshot`` flag
        name : str, optional
            Job name
        description : str, optional
            Job description
        execution_interval_in_minutes : int, optional
            Interval between runs; only sent when given
        start_at : datetime.datetime, optional
            Start time; serialized with ``from_datetime`` when given
        runtime_name : str, optional
            Runtime to execute on (see :meth:`runtimes`)
        resume_target : bool, optional
            Sent as ``resumeTarget``; only included when a target
            configuration is built, i.e. when ``get_database_name()``
            returns a value
        parameters : dict, optional
            Notebook parameters. Each value is sent as ``str(value)`` and its
            exact type must be a key of ``type_to_parameter_conversion_map``

        Returns
        -------
        :class:`Job`

        Raises
        ------
        ManagementError
            If the JobsManager has no underlying manager
        KeyError
            If a parameter value has a type with no entry in
            ``type_to_parameter_conversion_map``

        """
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        schedule = dict(
            mode=mode.value,
        )  # type: Dict[str, Any]

        if start_at is not None:
            schedule['startAt'] = from_datetime(start_at)

        if execution_interval_in_minutes is not None:
            schedule['executionIntervalInMinutes'] = execution_interval_in_minutes

        execution_config = dict(
            createSnapshot=create_snapshot,
            notebookPath=notebook_path,
        )  # type: Dict[str, Any]

        if runtime_name is not None:
            execution_config['runtimeName'] = runtime_name

        # A target configuration is only sent when a database name is
        # available. Its target id/type come from the first of the virtual
        # workspace, workspace or cluster ids that is set.
        target_config = None  # type: Optional[Dict[str, Any]]
        database_name = get_database_name()
        if database_name is not None:
            target_config = dict(
                databaseName=database_name,
            )

            if resume_target is not None:
                target_config['resumeTarget'] = resume_target

            workspace_id = get_workspace_id()
            virtual_workspace_id = get_virtual_workspace_id()
            cluster_id = get_cluster_id()
            if virtual_workspace_id is not None:
                target_config['targetID'] = virtual_workspace_id
                target_config['targetType'] = TargetType.VIRTUAL_WORKSPACE.value

            elif workspace_id is not None:
                target_config['targetID'] = workspace_id
                target_config['targetType'] = TargetType.WORKSPACE.value

            elif cluster_id is not None:
                target_config['targetID'] = cluster_id
                target_config['targetType'] = TargetType.CLUSTER.value

        job_run_json = dict(
            schedule=schedule,
            executionConfig=execution_config,
        )  # type: Dict[str, Any]

        if target_config is not None:
            job_run_json['targetConfig'] = target_config

        if name is not None:
            job_run_json['name'] = name

        if description is not None:
            job_run_json['description'] = description

        if parameters is not None:
            job_run_json['parameters'] = [
                dict(
                    name=key,
                    value=str(value),
                    type=type_to_parameter_conversion_map[type(value)],
                )
                for key, value in parameters.items()
            ]

        res = self._manager._post('jobs', json=job_run_json).json()
        return Job.from_dict(res, self)

    def run(
        self,
        notebook_path: str,
        runtime_name: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> Job:
        """
        Creates and returns a scheduled notebook job that runs once immediately.

        The job uses :attr:`Mode.ONCE`, no snapshot, and a start time of "now".
        See :meth:`schedule` for ``runtime_name`` and ``parameters``.

        """
        # NOTE(review): datetime.datetime.now() is a naive local time. How
        # ``from_datetime`` serializes it, and whether the API interprets it
        # as UTC, is not visible here; confirm on non-UTC hosts.
        return self.schedule(
            notebook_path,
            Mode.ONCE,
            False,
            start_at=datetime.datetime.now(),
            runtime_name=runtime_name,
            parameters=parameters,
        )

    def wait(self, jobs: List[Union[str, Job]], timeout: Optional[int] = None) -> bool:
        """
        Wait for jobs to finish executing.

        Parameters
        ----------
        jobs : list of str or Job
            Job IDs or :class:`Job` objects to wait for, one after another
        timeout : int, optional
            Total number of seconds to wait for all jobs; ``None`` waits
            indefinitely

        Returns
        -------
        bool
            True if every job completed within the timeout, else False

        Raises
        ------
        ManagementError
            If the JobsManager has no underlying manager
        ValueError
            If one of the jobs is a recurring job

        """
        if timeout is not None and timeout <= 0:
            return False

        # One shared deadline for all jobs, on a clock that cannot jump.
        deadline = None if timeout is None else time.monotonic() + timeout

        for job in jobs:
            if not self._poll_job_until(job, deadline):
                return False

        return True

    def _wait_for_job(self, job: Union[str, Job], timeout: Optional[int] = None) -> bool:
        """
        Wait for a single run-once job to complete.

        Returns True once the job reports a completed execution, or False
        if ``timeout`` seconds elapse first (``None`` waits indefinitely).

        """
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        if timeout is not None and timeout <= 0:
            return False

        deadline = None if timeout is None else time.monotonic() + timeout
        return self._poll_job_until(job, deadline)

    def _poll_job_until(
        self,
        job: Union[str, Job],
        deadline: Optional[float],
    ) -> bool:
        """
        Poll a job until it completes or ``time.monotonic()`` passes ``deadline``.

        The job is always polled at least once, and once more after the last
        sleep, so a job finishing right at the deadline is still reported as
        complete. Sleeps never extend past the deadline.

        """
        manager = self._manager
        if manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        job_id = job if isinstance(job, str) else job.job_id

        while True:
            res = manager._get(f'jobs/{job_id}').json()
            current = Job.from_dict(res, self)
            if current.schedule.mode == Mode.ONCE and current.completed_executions_count > 0:
                return True
            if current.schedule.mode == Mode.RECURRING:
                raise ValueError(f'Cannot wait for recurring job {job_id}')

            delay = float(self._POLL_INTERVAL)
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                delay = min(delay, remaining)
            time.sleep(delay)

    def get(self, job_id: str) -> Job:
        """Get a job by its ID."""
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        res = self._manager._get(f'jobs/{job_id}').json()
        return Job.from_dict(res, self)

    def get_executions(
        self,
        job_id: str,
        start_execution_number: int,
        end_execution_number: int,
    ) -> ExecutionsData:
        """Get executions for a job by its ID, within an execution-number range."""
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')
        path = (
            f'jobs/{job_id}/executions'
            f'?start={start_execution_number}'
            f'&end={end_execution_number}'
        )
        res = self._manager._get(path).json()
        return ExecutionsData.from_dict(res)

    def get_parameters(self, job_id: str) -> List[Parameter]:
        """Get parameters for a job by its ID."""
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        res = self._manager._get(f'jobs/{job_id}/parameters').json()
        return [Parameter.from_dict(p) for p in res]

    def delete(self, job_id: str) -> bool:
        """Delete a job by its ID; returns the decoded JSON response body."""
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        return self._manager._delete(f'jobs/{job_id}').json()

    def modes(self) -> Type[Mode]:
        """Get all possible job scheduling modes."""
        return Mode

    def runtimes(self) -> List[Runtime]:
        """Get all available job runtimes."""
        if self._manager is None:
            raise ManagementError(msg='JobsManager not initialized')

        res = self._manager._get('jobs/runtimes').json()
        return [Runtime.from_dict(r) for r in res]
888
889