import os
import shutil
import tempfile
import json
import time
from botocore.session import Session
from botocore.exceptions import ClientError
from awscli.testutils import unittest, aws, random_chars
# ARN of the policy attached to the roles created by the tests in this module.
# assert_s3_read_only_profile() expects credentials derived from it to be able
# to list S3 buckets but be denied IAM calls.
S3_READ_POLICY_ARN = 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
class TestAssumeRoleCredentials(unittest.TestCase):
    """Integration tests for assume-role profiles driven through the CLI.

    Each test creates throwaway IAM users, roles and policies with the
    ambient credentials, writes a CLI config file that references them and
    then runs real ``aws`` commands against that config. Every resource is
    registered with ``addCleanup`` as soon as it has been created.
    """

    def setUp(self):
        """Create the AWS clients, scratch directory and base trust policy."""
        super(TestAssumeRoleCredentials, self).setUp()
        self.environ = os.environ.copy()
        self.parent_session = Session()
        self.iam = self.parent_session.create_client('iam')
        self.sts = self.parent_session.create_client('sts')

        # Register removal immediately: tearDown() is skipped when setUp()
        # raises, but cleanups still run, so a failure in the STS call below
        # cannot leak the temporary directory. Being the first cleanup
        # registered, it also runs last (cleanups are LIFO).
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)
        self.config_file = os.path.join(self.tempdir, 'config')

        # Trust policy that lets any principal in the current account
        # assume the roles created by these tests.
        account_id = self.sts.get_caller_identity()['Account']
        self.role_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "AWS": "arn:aws:iam::%s:root" % account_id
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }

    def tearDown(self):
        """Run the base teardown.

        The temporary directory is removed by the cleanup registered in
        setUp(), so nothing else has to happen here.
        """
        super(TestAssumeRoleCredentials, self).tearDown()

    def random_name(self):
        """Return a fresh ``clitest-`` prefixed name for an IAM resource."""
        return 'clitest-' + random_chars(10)

    def create_role(self, policy_document, policy_arn=None):
        """Create an IAM role and schedule its deletion.

        :param policy_document: Trust policy (a dict) serialized to JSON and
            used as the role's AssumeRolePolicyDocument.
        :param policy_arn: Optional ARN of a policy to attach to the role.
        :return: The ``Role`` entry of the ``create_role`` response.
        """
        name = self.random_name()
        response = self.iam.create_role(
            RoleName=name,
            AssumeRolePolicyDocument=json.dumps(policy_document)
        )
        self.addCleanup(self.iam.delete_role, RoleName=name)
        if policy_arn:
            self.iam.attach_role_policy(RoleName=name, PolicyArn=policy_arn)
            # Registered after delete_role, so the detach runs first.
            self.addCleanup(
                self.iam.detach_role_policy, RoleName=name,
                PolicyArn=policy_arn
            )
        return response['Role']

    def create_user(self, policy_arns):
        """Create an IAM user with the given policies attached.

        :param policy_arns: Iterable of policy ARNs to attach to the user.
        :return: The ``User`` entry of the ``create_user`` response.
        """
        name = self.random_name()
        user = self.iam.create_user(UserName=name)['User']
        self.addCleanup(self.iam.delete_user, UserName=name)
        for arn in policy_arns:
            self.iam.attach_user_policy(
                UserName=name,
                PolicyArn=arn
            )
            # Registered after delete_user, so each detach runs first.
            self.addCleanup(
                self.iam.detach_user_policy,
                UserName=name, PolicyArn=arn
            )
        return user

    def create_creds(self, user_name):
        """Create an access key for ``user_name`` and schedule its deletion.

        :return: The ``AccessKey`` entry of the ``create_access_key``
            response.
        """
        creds = self.iam.create_access_key(UserName=user_name)['AccessKey']
        self.addCleanup(
            self.iam.delete_access_key,
            UserName=user_name, AccessKeyId=creds['AccessKeyId']
        )
        return creds

    def wait_for_assume_role(self, role_arn, access_key, secret_key,
                             token=None, attempts=30, delay=10,
                             num_success_needed=3):
        """Poll ``sts:AssumeRole`` until it has succeeded often enough.

        :param role_arn: ARN of the role to assume.
        :param access_key: Access key id used to make the call.
        :param secret_key: Secret access key used to make the call.
        :param token: Optional session token that goes with the keys.
        :param attempts: Maximum number of AssumeRole calls to make.
        :param delay: Seconds to sleep between calls.
        :param num_success_needed: Number of successful calls required
            before returning. Successes are counted cumulatively; a failure
            in between does not reset the count.
        :return: The ``Credentials`` entry of the last successful response.
        :raises ClientError: For any error code other than
            ``InvalidClientTokenId`` or ``AccessDenied``.
        :raises Exception: If the attempts run out first.
        """
        client = self.parent_session.create_client(
            'sts', aws_access_key_id=access_key,
            aws_secret_access_key=secret_key, aws_session_token=token
        )
        role_session_name = random_chars(10)
        num_success = 0
        for attempt in range(attempts):
            try:
                result = client.assume_role(
                    RoleArn=role_arn, RoleSessionName=role_session_name)
            except ClientError as e:
                # These two codes are treated as "not usable yet" and
                # retried -- presumably freshly created IAM resources need
                # time to propagate (TODO confirm). Anything else is a real
                # failure.
                code = e.response.get('Error', {}).get('Code')
                if code not in ["InvalidClientTokenId", "AccessDenied"]:
                    raise
            else:
                num_success += 1
                # >= rather than == so a threshold of zero or less is
                # satisfied by the first success instead of never.
                if num_success >= num_success_needed:
                    return result['Credentials']
            # Sleeping after the final attempt would only delay the error.
            if attempt < attempts - 1:
                time.sleep(delay)

        raise Exception("Unable to assume role %s" % role_arn)

    def create_assume_policy(self, role_arn):
        """Create a managed policy allowing ``sts:AssumeRole`` on a role.

        :param role_arn: ARN of the role the policy grants access to.
        :return: ARN of the newly created policy.
        """
        policy_document = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Resource": role_arn,
                    "Action": "sts:AssumeRole"
                }
            ]
        }
        name = self.random_name()
        response = self.iam.create_policy(
            PolicyName=name,
            PolicyDocument=json.dumps(policy_document)
        )
        policy_arn = response['Policy']['Arn']
        self.addCleanup(self.iam.delete_policy, PolicyArn=policy_arn)
        return policy_arn

    def assert_s3_read_only_profile(self, profile_name):
        """Assert the profile can list S3 buckets but is denied IAM access."""
        # The S3 call must succeed ...
        command = 's3api list-buckets --profile %s' % profile_name
        result = aws(command, env_vars=self.environ)
        self.assertEqual(result.rc, 0, result.stderr)

        # ... while an IAM call must be rejected with AccessDenied.
        command = 'iam list-groups --profile %s' % profile_name
        result = aws(command, env_vars=self.environ)
        self.assertNotEqual(result.rc, 0, result.stdout)
        self.assertIn('AccessDenied', result.stderr)

    def test_recursive_assume_role(self):
        """A profile can chain through another assume-role profile."""
        # Chain: user -> middle role -> final role (S3 read-only).
        final_role = self.create_role(self.role_policy, S3_READ_POLICY_ARN)

        middle_policy_arn = self.create_assume_policy(final_role['Arn'])
        middle_role = self.create_role(self.role_policy, middle_policy_arn)

        user_policy_arn = self.create_assume_policy(middle_role['Arn'])
        user = self.create_user([user_policy_arn])
        user_creds = self.create_creds(user['UserName'])

        config = (
            '[default]\n'
            'aws_access_key_id = %s\n'
            'aws_secret_access_key = %s\n'
            '[profile middle]\n'
            'source_profile = default\n'
            'role_arn = %s\n'
            '[profile final]\n'
            'source_profile = middle\n'
            'role_arn = %s\n'
        )
        config = config % (
            user_creds['AccessKeyId'], user_creds['SecretAccessKey'],
            middle_role['Arn'], final_role['Arn']
        )
        with open(self.config_file, 'w') as f:
            f.write(config)

        # Make sure both hops of the chain work through the API before
        # exercising them through the CLI.
        middle_creds = self.wait_for_assume_role(
            role_arn=middle_role['Arn'],
            access_key=user_creds['AccessKeyId'],
            secret_key=user_creds['SecretAccessKey'],
        )
        self.wait_for_assume_role(
            role_arn=final_role['Arn'],
            access_key=middle_creds['AccessKeyId'],
            secret_key=middle_creds['SecretAccessKey'],
            token=middle_creds['SessionToken'],
        )

        self.environ['AWS_CONFIG_FILE'] = self.config_file
        self.assert_s3_read_only_profile(profile_name='final')

    def test_assume_role_with_credential_source(self):
        """A profile can source its credentials from the environment."""
        role = self.create_role(self.role_policy, S3_READ_POLICY_ARN)
        user_policy_arn = self.create_assume_policy(role['Arn'])
        user = self.create_user([user_policy_arn])
        user_creds = self.create_creds(user['UserName'])

        config = (
            '[profile assume]\n'
            'role_arn = %s\n'
            'credential_source = Environment\n'
        )
        config = config % role['Arn']
        with open(self.config_file, 'w') as f:
            f.write(config)

        self.wait_for_assume_role(
            role_arn=role['Arn'],
            access_key=user_creds['AccessKeyId'],
            secret_key=user_creds['SecretAccessKey'],
        )

        self.environ['AWS_CONFIG_FILE'] = self.config_file
        self.environ['AWS_SECRET_ACCESS_KEY'] = user_creds['SecretAccessKey']
        self.environ['AWS_ACCESS_KEY_ID'] = user_creds['AccessKeyId']
        # The IAM user's keys are long-term credentials. A session token
        # inherited from the parent environment (e.g. when the suite itself
        # runs under temporary credentials) would be sent along with them
        # and cause the request to be rejected.
        for stale_token_var in ('AWS_SESSION_TOKEN', 'AWS_SECURITY_TOKEN'):
            self.environ.pop(stale_token_var, None)

        self.assert_s3_read_only_profile(profile_name='assume')