Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/integration/test_assume_role.py
1566 views
1
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import os
14
import shutil
15
import tempfile
16
import json
17
import time
18
19
from botocore.session import Session
20
from botocore.exceptions import ClientError
21
22
from awscli.testutils import unittest, aws, random_chars
23
24
S3_READ_POLICY_ARN = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
25
26
27
class TestAssumeRoleCredentials(unittest.TestCase):
28
def setUp(self):
29
super(TestAssumeRoleCredentials, self).setUp()
30
self.environ = os.environ.copy()
31
self.parent_session = Session()
32
self.iam = self.parent_session.create_client('iam')
33
self.sts = self.parent_session.create_client('sts')
34
self.tempdir = tempfile.mkdtemp()
35
self.config_file = os.path.join(self.tempdir, 'config')
36
37
# A role trust policy that allows the current account to call assume
38
# role on itself.
39
account_id = self.sts.get_caller_identity()['Account']
40
self.role_policy = {
41
"Version": "2012-10-17",
42
"Statement": [
43
{
44
"Effect": "Allow",
45
"Principal": {
46
"AWS": "arn:aws:iam::%s:root" % account_id
47
},
48
"Action": "sts:AssumeRole"
49
}
50
]
51
}
52
53
def tearDown(self):
54
super(TestAssumeRoleCredentials, self).tearDown()
55
shutil.rmtree(self.tempdir)
56
57
def random_name(self):
58
return 'clitest-' + random_chars(10)
59
60
def create_role(self, policy_document, policy_arn=None):
61
name = self.random_name()
62
response = self.iam.create_role(
63
RoleName=name,
64
AssumeRolePolicyDocument=json.dumps(policy_document)
65
)
66
self.addCleanup(self.iam.delete_role, RoleName=name)
67
if policy_arn:
68
self.iam.attach_role_policy(RoleName=name, PolicyArn=policy_arn)
69
self.addCleanup(
70
self.iam.detach_role_policy, RoleName=name,
71
PolicyArn=policy_arn
72
)
73
return response['Role']
74
75
def create_user(self, policy_arns):
76
name = self.random_name()
77
user = self.iam.create_user(UserName=name)['User']
78
self.addCleanup(self.iam.delete_user, UserName=name)
79
80
for arn in policy_arns:
81
self.iam.attach_user_policy(
82
UserName=name,
83
PolicyArn=arn
84
)
85
self.addCleanup(
86
self.iam.detach_user_policy,
87
UserName=name, PolicyArn=arn
88
)
89
90
return user
91
92
def create_creds(self, user_name):
93
creds = self.iam.create_access_key(UserName=user_name)['AccessKey']
94
self.addCleanup(
95
self.iam.delete_access_key,
96
UserName=user_name, AccessKeyId=creds['AccessKeyId']
97
)
98
return creds
99
100
def wait_for_assume_role(self, role_arn, access_key, secret_key,
101
token=None, attempts=30, delay=10,
102
num_success_needed=3):
103
# "Why not use the policy simulator?" you might ask. The answer is
104
# that the policy simulator will return success far before you can
105
# actually make the calls.
106
client = self.parent_session.create_client(
107
'sts', aws_access_key_id=access_key,
108
aws_secret_access_key=secret_key, aws_session_token=token
109
)
110
attempts_remaining = attempts
111
role_session_name = random_chars(10)
112
num_success = 0
113
while attempts_remaining > 0:
114
try:
115
result = client.assume_role(
116
RoleArn=role_arn, RoleSessionName=role_session_name)
117
num_success += 1
118
if num_success == num_success_needed:
119
return result['Credentials']
120
except ClientError as e:
121
code = e.response.get('Error', {}).get('Code')
122
if code not in ["InvalidClientTokenId", "AccessDenied"]:
123
raise
124
attempts_remaining -= 1
125
time.sleep(delay)
126
127
raise Exception("Unable to assume role %s" % role_arn)
128
129
def create_assume_policy(self, role_arn):
130
policy_document = {
131
"Version": "2012-10-17",
132
"Statement": [
133
{
134
"Effect": "Allow",
135
"Resource": role_arn,
136
"Action": "sts:AssumeRole"
137
}
138
]
139
}
140
name = self.random_name()
141
response = self.iam.create_policy(
142
PolicyName=name,
143
PolicyDocument=json.dumps(policy_document)
144
)
145
self.addCleanup(
146
self.iam.delete_policy, PolicyArn=response['Policy']['Arn']
147
)
148
return response['Policy']['Arn']
149
150
def assert_s3_read_only_profile(self, profile_name):
151
# Calls to S3 should succeed
152
command = 's3api list-buckets --profile %s' % profile_name
153
result = aws(command, env_vars=self.environ)
154
self.assertEqual(result.rc, 0, result.stderr)
155
156
# Calls to other services should not
157
command = 'iam list-groups --profile %s' % profile_name
158
result = aws(command, env_vars=self.environ)
159
self.assertNotEqual(result.rc, 0, result.stdout)
160
self.assertIn('AccessDenied', result.stderr)
161
162
def test_recursive_assume_role(self):
163
# Create the final role, the one that will actually have access to s3
164
final_role = self.create_role(self.role_policy, S3_READ_POLICY_ARN)
165
166
# Create the role that can assume the final role
167
middle_policy_arn = self.create_assume_policy(final_role['Arn'])
168
middle_role = self.create_role(self.role_policy, middle_policy_arn)
169
170
# Create a user that can only assume the middle-man role, and then get
171
# static credentials for it.
172
user_policy_arn = self.create_assume_policy(middle_role['Arn'])
173
user = self.create_user([user_policy_arn])
174
user_creds = self.create_creds(user['UserName'])
175
176
# Setup the config file with the profiles we'll be using. For
177
# convenience static credentials are placed here instead of putting
178
# them in the credentials file.
179
config = (
180
'[default]\n'
181
'aws_access_key_id = %s\n'
182
'aws_secret_access_key = %s\n'
183
'[profile middle]\n'
184
'source_profile = default\n'
185
'role_arn = %s\n'
186
'[profile final]\n'
187
'source_profile = middle\n'
188
'role_arn = %s\n'
189
)
190
config = config % (
191
user_creds['AccessKeyId'], user_creds['SecretAccessKey'],
192
middle_role['Arn'], final_role['Arn']
193
)
194
with open(self.config_file, 'w') as f:
195
f.write(config)
196
197
# Wait for IAM permissions to propagate
198
middle_creds = self.wait_for_assume_role(
199
role_arn=middle_role['Arn'],
200
access_key=user_creds['AccessKeyId'],
201
secret_key=user_creds['SecretAccessKey'],
202
)
203
self.wait_for_assume_role(
204
role_arn=final_role['Arn'],
205
access_key=middle_creds['AccessKeyId'],
206
secret_key=middle_creds['SecretAccessKey'],
207
token=middle_creds['SessionToken'],
208
)
209
210
# Configure our credentials file to be THE credentials file
211
self.environ['AWS_CONFIG_FILE'] = self.config_file
212
213
self.assert_s3_read_only_profile(profile_name='final')
214
215
def test_assume_role_with_credential_source(self):
216
# Create a role with read access to S3
217
role = self.create_role(self.role_policy, S3_READ_POLICY_ARN)
218
219
# Create a user that can assume the role and get static credentials
220
# for it.
221
user_policy_arn = self.create_assume_policy(role['Arn'])
222
user = self.create_user([user_policy_arn])
223
user_creds = self.create_creds(user['UserName'])
224
225
# Setup the config file with the profile we'll be using.
226
config = (
227
'[profile assume]\n'
228
'role_arn = %s\n'
229
'credential_source = Environment\n'
230
)
231
config = config % role['Arn']
232
with open(self.config_file, 'w') as f:
233
f.write(config)
234
235
# Wait for IAM permissions to propagate
236
self.wait_for_assume_role(
237
role_arn=role['Arn'],
238
access_key=user_creds['AccessKeyId'],
239
secret_key=user_creds['SecretAccessKey'],
240
)
241
242
# Setup the environment so that our new config file is THE config
243
# file and add the expected credentials since we're using the
244
# environment as our credential source.
245
self.environ['AWS_CONFIG_FILE'] = self.config_file
246
self.environ['AWS_SECRET_ACCESS_KEY'] = user_creds['SecretAccessKey']
247
self.environ['AWS_ACCESS_KEY_ID'] = user_creds['AccessKeyId']
248
249
self.assert_s3_read_only_profile(profile_name='assume')
250
251