Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/emrcontainers/test_update_assume_role_policy.py
1569 views
1
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import copy
15
import json
16
17
from awscli.testutils import BaseAWSCommandParamsTest, mock, unittest
18
from awscli.customizations.emrcontainers.base36 import Base36
19
from awscli.customizations.emrcontainers.constants \
20
import TRUST_POLICY_STATEMENT_FORMAT, \
21
TRUST_POLICY_STATEMENT_ALREADY_EXISTS, \
22
TRUST_POLICY_UPDATE_SUCCESSFUL
23
24
25
def json_matches(first, second):
    """Return True when both values serialize to identical JSON text.

    Each argument is dumped with ``sort_keys=True`` so that dictionaries
    holding the same entries compare equal regardless of key order.
    """
    canonical_first = json.dumps(first, sort_keys=True)
    canonical_second = json.dumps(second, sort_keys=True)
    return canonical_first == canonical_second
28
29
30
class TestUpdateAssumeRolePolicy(BaseAWSCommandParamsTest):
    """Tests for the ``emr-containers update-role-trust-policy`` command.

    Every test patches the IAM/EKS lookup helpers, runs the command via
    ``run_cmd`` and then inspects the command output together with the
    operations recorded in ``self.operations_called``.
    """

    cluster_name = 'test-cluster'
    namespace = 'test'
    role_name = 'myrole'
    account_id = '123456789012'
    oidc_provider = 'oidc-provider/id/test'
    aws_partition = 'aws'
    aws_cn_partition = 'aws-cn'

    base36_encoded_role_name = Base36().encode(role_name)

    # Trust policy statement (JSON text) expected for the 'aws' partition.
    expected_statement = TRUST_POLICY_STATEMENT_FORMAT % {
        "AWS_ACCOUNT_ID": account_id,
        "OIDC_PROVIDER": oidc_provider,
        "NAMESPACE": namespace,
        "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name,
        "AWS_PARTITION": aws_partition
    }

    # Same statement, rendered for the 'aws-cn' partition.
    expected_statement_cn = TRUST_POLICY_STATEMENT_FORMAT % {
        "AWS_ACCOUNT_ID": account_id,
        "OIDC_PROVIDER": oidc_provider,
        "NAMESPACE": namespace,
        "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name,
        "AWS_PARTITION": aws_cn_partition
    }

    def setUp(self):
        """Build the command line and the initial/expected policy documents.

        ``policy_document`` is the document the patched IAM lookup returns;
        the ``expected_*`` documents are deep copies of it with the expected
        statement appended, so tests may mutate either one independently.
        """
        super(TestUpdateAssumeRolePolicy, self).setUp()

        self.command = (
            'emr-containers update-role-trust-policy --cluster-name=%s '
            '--namespace=%s --role-name=%s' % (
                self.cluster_name, self.namespace, self.role_name)
        )
        self.policy_document = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "AWS": "arn:aws:iam::123456789012:root"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }

        self.expected_policy_document = copy.deepcopy(self.policy_document)
        self.expected_policy_document.get("Statement").append(
            json.loads(self.expected_statement))

        self.expected_policy_document_cn = copy.deepcopy(self.policy_document)
        self.expected_policy_document_cn.get("Statement").append(
            json.loads(self.expected_statement_cn))

    # Assert the call to update trust policy of the role
    def assert_trust_policy_updated(self, cmd_output,
                                    expected_policy_document=None):
        """Check that the role's trust policy was updated exactly once.

        :param cmd_output: Text printed by the command; must contain the
            success message for ``self.role_name``.
        :param expected_policy_document: Policy document the recorded
            ``UpdateAssumeRolePolicy`` call must carry. Defaults to
            ``self.expected_policy_document``.
        """
        if expected_policy_document is None:
            expected_policy_document = self.expected_policy_document

        # assertIn reports both operands on failure, unlike assertTrue.
        self.assertIn(TRUST_POLICY_UPDATE_SUCCESSFUL % self.role_name,
                      cmd_output)

        # Check if UpdateAssumeRolePolicy was invoked
        self.assertEqual(len(self.operations_called), 1)
        self.assertEqual(self.operations_called[0][0].name,
                         'UpdateAssumeRolePolicy')
        self.assertEqual(self.operations_called[0][1]['RoleName'],
                         self.role_name)

        self.assertTrue(json_matches(json.loads(
            self.operations_called[0][1]['PolicyDocument']),
            expected_policy_document))

    def _stub_lookups(self, get_account_id_patch,
                      get_oidc_issuer_id_patch,
                      get_assume_role_policy_patch):
        """Point the three patched lookups at this test's fixture values.

        Must be called after a test has finished rebinding
        ``self.policy_document``, because the current object is what the
        patched ``get_assume_role_policy`` will return.
        """
        get_assume_role_policy_patch.return_value = self.policy_document
        get_oidc_issuer_id_patch.return_value = self.oidc_provider
        get_account_id_patch.return_value = self.account_id

    # Use case: Expected trust policy does not exist
    # Expected results: Operation is performed by client
    # to update the trust policy in expected format
    @mock.patch('awscli.customizations.emrcontainers.'
                'iam.IAM.get_assume_role_policy')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_oidc_issuer_id')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_account_id')
    def test_trust_policy_does_not_exist(self, get_account_id_patch,
                                         get_oidc_issuer_id_patch,
                                         get_assume_role_policy_patch):
        self._stub_lookups(get_account_id_patch,
                           get_oidc_issuer_id_patch,
                           get_assume_role_policy_patch)

        output = self.run_cmd(self.command, expected_rc=0)
        self.assert_trust_policy_updated(output[0])

    # Use case: Expected trust policy exists but the condition section
    # has an additional condition
    # Expected results: Operation is performed by client to update
    # the trust policy in expected format
    @mock.patch('awscli.customizations.emrcontainers.'
                'iam.IAM.get_assume_role_policy')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_oidc_issuer_id')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_account_id')
    def test_trust_policy_exists_with_more_keys(self, get_account_id_patch,
                                                get_oidc_issuer_id_patch,
                                                get_assume_role_policy_patch):
        statement_with_additional_condition_key = json.loads(
            self.expected_statement)
        statement_with_additional_condition_key.get("Condition").get(
            "StringLike")["test:key"] = "value"
        self.policy_document.get("Statement").append(
            statement_with_additional_condition_key)

        # The near-match statement stays in place and the exact expected
        # statement is appended after it.
        self.expected_policy_document = copy.deepcopy(self.policy_document)
        self.expected_policy_document.get("Statement").append(json.loads(
            self.expected_statement))

        self._stub_lookups(get_account_id_patch,
                           get_oidc_issuer_id_patch,
                           get_assume_role_policy_patch)

        output = self.run_cmd(self.command, expected_rc=0)
        self.assert_trust_policy_updated(output[0])

    # Use case: Initial trust policy document does not have Statements section
    # Expected results: Operation is performed by client to update
    # the trust policy in expected format
    @mock.patch('awscli.customizations.emrcontainers.'
                'iam.IAM.get_assume_role_policy')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_oidc_issuer_id')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_account_id')
    def test_policy_document_has_missing_key(self,
                                             get_account_id_patch,
                                             get_oidc_issuer_id_patch,
                                             get_assume_role_policy_patch):
        del self.policy_document["Statement"]

        self.expected_policy_document = copy.deepcopy(self.policy_document)
        self.expected_policy_document["Statement"] = [json.loads(
            self.expected_statement)]

        self._stub_lookups(get_account_id_patch,
                           get_oidc_issuer_id_patch,
                           get_assume_role_policy_patch)

        output = self.run_cmd(self.command, expected_rc=0)
        self.assert_trust_policy_updated(output[0])

    # Use case: Initial trust policy document has empty Statements section
    # Expected results: Operation is performed by client to update
    # the trust policy in expected format
    @mock.patch('awscli.customizations.emrcontainers.'
                'iam.IAM.get_assume_role_policy')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_oidc_issuer_id')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_account_id')
    def test_policy_document_has_empty_statements(self,
                                                  get_account_id_patch,
                                                  get_oidc_issuer_id_patch,
                                                  get_assume_role_policy_patch):
        # Empty the list in place so the "Statement" key itself is kept.
        del self.policy_document.get("Statement")[:]

        self.expected_policy_document = copy.deepcopy(self.policy_document)
        self.expected_policy_document.get("Statement").append(json.loads(
            self.expected_statement))

        self._stub_lookups(get_account_id_patch,
                           get_oidc_issuer_id_patch,
                           get_assume_role_policy_patch)

        output = self.run_cmd(self.command, expected_rc=0)
        self.assert_trust_policy_updated(output[0])

    # Use case: Expected trust policy does not exist and user performs a dry run
    # Expected results: No operation is performed by client
    # The command should print the expected policy document to stdout
    @mock.patch('awscli.customizations.emrcontainers.'
                'iam.IAM.get_assume_role_policy')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_oidc_issuer_id')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_account_id')
    def test_trust_policy_does_not_exist_dry_run(self, get_account_id_patch,
                                                 get_oidc_issuer_id_patch,
                                                 get_assume_role_policy_patch):
        self._stub_lookups(get_account_id_patch,
                           get_oidc_issuer_id_patch,
                           get_assume_role_policy_patch)

        output = self.run_cmd(self.command + " --dry-run", expected_rc=0)
        self.assertEqual(len(self.operations_called), 0)
        self.assertTrue(json_matches(json.loads(output[0]),
                                     self.expected_policy_document))

    # Use case: Expected trust policy already exists
    # Expected results: No operation is performed by client
    # The command should print that the trust policy statement already exists
    @mock.patch('awscli.customizations.emrcontainers.'
                'iam.IAM.get_assume_role_policy')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_oidc_issuer_id')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_account_id')
    def test_trust_policy_exists(self, get_account_id_patch,
                                 get_oidc_issuer_id_patch,
                                 get_assume_role_policy_patch):
        # Start from a document that already contains the expected statement.
        self.policy_document = self.expected_policy_document

        self._stub_lookups(get_account_id_patch,
                           get_oidc_issuer_id_patch,
                           get_assume_role_policy_patch)

        output = self.run_cmd(self.command, expected_rc=0)
        self.assertEqual(len(self.operations_called), 0)
        self.assertIn(TRUST_POLICY_STATEMENT_ALREADY_EXISTS % self.role_name,
                      output[0])

    # Use case: Expected trust policy does not exist in cn-north-1
    # Expected results: Operation is performed by client in cn-north-1
    # to update the trust policy in expected format
    @mock.patch('awscli.customizations.emrcontainers.'
                'iam.IAM.get_assume_role_policy')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_oidc_issuer_id')
    @mock.patch('awscli.customizations.emrcontainers.'
                'eks.EKS.get_account_id')
    def test_trust_policy_does_not_exist_in_cn(self,
                                               get_account_id_patch,
                                               get_oidc_issuer_id_patch,
                                               get_assume_role_policy_patch):
        self._stub_lookups(get_account_id_patch,
                           get_oidc_issuer_id_patch,
                           get_assume_role_policy_patch)
        self.command += ' --region cn-north-1'

        output = self.run_cmd(self.command, expected_rc=0)
        self.assert_trust_policy_updated(output[0],
                                         self.expected_policy_document_cn)
272
273
274
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
276
277