Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/emr/test_modify_cluster_attributes.py
1569 views
1
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
from tests.unit.customizations.emr import EMRBaseAWSCommandParamsTest as \
15
BaseAWSCommandParamsTest
16
17
18
class TestModifyClusterAttributes(BaseAWSCommandParamsTest):
19
prefix = 'emr modify-cluster-attributes'
20
21
def test_visible_to_all(self):
22
args = ' --cluster-id j-ABC123456 --visible-to-all-users'
23
cmdline = self.prefix + args
24
result = {'JobFlowIds': ['j-ABC123456'], 'VisibleToAllUsers': True}
25
self.assert_params_for_cmd(cmdline, result)
26
27
def test_no_visible_to_all(self):
28
args = ' --cluster-id j-ABC123456 --no-visible-to-all-users'
29
cmdline = self.prefix + args
30
result = {'JobFlowIds': ['j-ABC123456'], 'VisibleToAllUsers': False}
31
self.assert_params_for_cmd(cmdline, result)
32
33
def test_termination_protected(self):
34
args = ' --cluster-id j-ABC123456 --termination-protected'
35
cmdline = self.prefix + args
36
result = {'JobFlowIds': ['j-ABC123456'], 'TerminationProtected': True}
37
self.assert_params_for_cmd(cmdline, result)
38
39
def test_no_termination_protected(self):
40
args = ' --cluster-id j-ABC123456 --no-termination-protected'
41
cmdline = self.prefix + args
42
result = {'JobFlowIds': ['j-ABC123456'], 'TerminationProtected': False}
43
self.assert_params_for_cmd(cmdline, result)
44
45
def test_no_auto_terminate(self):
46
args = ' --cluster-id j-ABC123456 --no-auto-terminate'
47
cmdline = self.prefix + args
48
result = {'JobFlowIds': ['j-ABC123456'], 'KeepJobFlowAliveWhenNoSteps': True}
49
self.assert_params_for_cmd(cmdline, result)
50
51
def test_auto_terminate(self):
52
args = ' --cluster-id j-ABC123456 --auto-terminate'
53
cmdline = self.prefix + args
54
result = {'JobFlowIds': ['j-ABC123456'], 'KeepJobFlowAliveWhenNoSteps': False}
55
56
def test_unhealthy_node_replacement(self):
57
args = ' --cluster-id j-ABC123456 --unhealthy-node-replacement'
58
cmdline = self.prefix + args
59
result = {'JobFlowIds': ['j-ABC123456'], 'UnhealthyNodeReplacement': True}
60
self.assert_params_for_cmd(cmdline, result)
61
62
def test_no_unhealthy_node_replacement(self):
63
args = ' --cluster-id j-ABC123456 --no-unhealthy-node-replacement'
64
cmdline = self.prefix + args
65
result = {'JobFlowIds': ['j-ABC123456'], 'UnhealthyNodeReplacement': False}
66
self.assert_params_for_cmd(cmdline, result)
67
68
def test_visible_to_all_and_no_visible_to_all(self):
69
args = ' --cluster-id j-ABC123456 --no-visible-to-all-users'\
70
' --visible-to-all-users'
71
cmdline = self.prefix + args
72
expected_error_msg = (
73
'\naws: error: You cannot specify both --visible-to-all-users '
74
'and --no-visible-to-all-users options together.\n')
75
result = self.run_cmd(cmdline, 255)
76
self.assertEqual(expected_error_msg, result[1])
77
78
def test_temination_protected_and_no_termination_protected(self):
79
args = ' --cluster-id j-ABC123456 --no-termination-protected'\
80
' --termination-protected'
81
cmdline = self.prefix + args
82
expected_error_msg = (
83
'\naws: error: You cannot specify both --termination-protected '
84
'and --no-termination-protected options together.\n')
85
result = self.run_cmd(cmdline, 255)
86
self.assertEqual(expected_error_msg, result[1])
87
88
def test_auto_terminate_and_no_auto_terminate(self):
89
args = ' --cluster-id j-ABC123456 --auto-terminate'\
90
' --no-auto-terminate'
91
cmdline = self.prefix + args
92
expected_error_msg = (
93
'\naws: error: You cannot specify both --auto-terminate '
94
'and --no-auto-terminate options together.\n')
95
result = self.run_cmd(cmdline, 255)
96
self.assertEqual(expected_error_msg, result[1])
97
98
def test_can_set_multiple_attributes(self):
99
args = ' --cluster-id j-ABC123456 --termination-protected'\
100
' --visible-to-all-users --unhealthy-node-replacement'
101
cmdline = self.prefix + args
102
result_set_termination_protection = {
103
'JobFlowIds': ['j-ABC123456'], 'TerminationProtected': True}
104
result_set_visible_to_all_users = {
105
'JobFlowIds': ['j-ABC123456'], 'VisibleToAllUsers': True}
106
result_set_unhealty_node_replacement = {
107
'JobFlowIds': ['j-ABC123456'], 'UnhealthyNodeReplacement': True}
108
self.run_cmd(cmdline)
109
self.assertDictEqual(
110
self.operations_called[0][1], result_set_visible_to_all_users)
111
self.assertDictEqual(
112
self.operations_called[1][1], result_set_termination_protection)
113
self.assertDictEqual(
114
self.operations_called[2][1], result_set_unhealty_node_replacement)
115
116
def test_can_set_multiple_attributes_with_no(self):
117
args = ' --cluster-id j-ABC123456 --termination-protected'\
118
' --no-visible-to-all-users --unhealthy-node-replacement'
119
cmdline = self.prefix + args
120
result_set_termination_protection = {
121
'JobFlowIds': ['j-ABC123456'], 'TerminationProtected': True}
122
result_set_visible_to_all_users = {
123
'JobFlowIds': ['j-ABC123456'], 'VisibleToAllUsers': False}
124
result_set_unhealty_node_replacement = {
125
'JobFlowIds': ['j-ABC123456'], 'UnhealthyNodeReplacement': True}
126
self.run_cmd(cmdline)
127
self.assertDictEqual(
128
self.operations_called[0][1], result_set_visible_to_all_users)
129
self.assertDictEqual(
130
self.operations_called[1][1], result_set_termination_protection)
131
self.assertDictEqual(
132
self.operations_called[2][1], result_set_unhealty_node_replacement)
133
134
def test_at_least_one_option(self):
135
args = ' --cluster-id j-ABC123456'
136
cmdline = self.prefix + args
137
expected_error_msg = (
138
'\naws: error: Must specify one of the following boolean options: '
139
'--visible-to-all-users|--no-visible-to-all-users, '
140
'--termination-protected|--no-termination-protected, '
141
'--auto-terminate|--no-auto-terminate, '
142
'--unhealthy-node-replacement|--no-unhealthy-node-replacement.\n')
143
result = self.run_cmd(cmdline, 255)
144
self.assertEqual(expected_error_msg, result[1])
145
146
if __name__ == "__main__":
147
unittest.main()
148
149