import base64
from datetime import datetime
import json
import os
import awscli.compat
from awscli.testutils import mock
from awscli.testutils import BaseAWSCommandParamsTest
from awscli.compat import urlparse
class TestGetTokenCommand(BaseAWSCommandParamsTest):
def setUp(self):
super(TestGetTokenCommand, self).setUp()
self.cluster_name = 'MyCluster'
self.role_arn = 'arn:aws:iam::012345678910:role/RoleArn'
self.access_key = 'ABCDEFGHIJKLMNOPQRST'
self.secret_key = 'TSRQPONMLKJUHGFEDCBA'
self.session_token = 'TOKENTOKENTOKENTOKEN'
self.environ['AWS_ACCESS_KEY_ID'] = self.access_key
self.environ['AWS_SECRET_ACCESS_KEY'] = self.secret_key
self.expected_token_prefix = 'k8s-aws-v1.'
self.datetime_patcher = mock.patch.object(
awscli.compat.datetime, 'datetime', mock.Mock(wraps=datetime)
)
mocked_datetime = self.datetime_patcher.start()
mocked_datetime.now.return_value = datetime(2019, 10, 23, 23, 0, 0, 0)
def tearDown(self):
super().tearDown()
self.datetime_patcher.stop()
def run_get_token(self, cmd):
response, _, _ = self.run_cmd(cmd)
return json.loads(response)
def assert_url_correct(
self,
response,
expected_endpoint='sts.amazonaws.com',
expected_signing_region='us-east-1',
has_session_token=False,
):
url = self._get_url(response)
url_components = urlparse.urlparse(url)
self.assertEqual(url_components.netloc, expected_endpoint)
parsed_qs = urlparse.parse_qs(url_components.query)
self.assertIn(
expected_signing_region, parsed_qs['X-Amz-Credential'][0]
)
if has_session_token:
self.assertEqual(
[self.session_token], parsed_qs['X-Amz-Security-Token']
)
else:
self.assertNotIn('X-Amz-Security-Token', parsed_qs)
self.assertIn(self.access_key, parsed_qs['X-Amz-Credential'][0])
self.assertIn('x-k8s-aws-id', parsed_qs['X-Amz-SignedHeaders'][0])
def _get_url(self, response):
token = response['status']['token']
b64_token = token[len(self.expected_token_prefix) :].encode()
missing_padding = len(b64_token) % 4
if missing_padding:
b64_token += b'=' * (4 - missing_padding)
return base64.urlsafe_b64decode(b64_token).decode()
def set_kubernetes_exec_info(self, api_version):
self.environ['KUBERNETES_EXEC_INFO'] = (
'{"kind":"ExecCredential",'
f'"apiVersion":"client.authentication.k8s.io/{api_version}",'
'"spec":{"interactive":true}}'
)
def test_get_token(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
response = self.run_get_token(cmd)
self.assertEqual(
response,
{
"kind": "ExecCredential",
"apiVersion": "client.authentication.k8s.io/v1beta1",
"spec": {},
"status": {
"expirationTimestamp": "2019-10-23T23:14:00Z",
"token": mock.ANY,
},
},
)
def test_query_nested_object(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
cmd += ' --query status'
response = self.run_get_token(cmd)
self.assertEqual(
response,
{
"expirationTimestamp": "2019-10-23T23:14:00Z",
"token": mock.ANY,
},
)
def test_query_value(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
cmd += ' --query apiVersion'
response = self.run_get_token(cmd)
self.assertEqual(
response, "client.authentication.k8s.io/v1beta1",
)
def test_output_text(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
cmd += ' --output text'
stdout, _, _ = self.run_cmd(cmd)
self.assertIn("ExecCredential", stdout)
self.assertIn("client.authentication.k8s.io/v1beta1", stdout)
self.assertIn("2019-10-23T23:14:00Z", stdout)
def test_output_table(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
cmd += ' --output table'
stdout, _, _ = self.run_cmd(cmd)
self.assertIn("ExecCredential", stdout)
self.assertIn("client.authentication.k8s.io/v1beta1", stdout)
self.assertIn("2019-10-23T23:14:00Z", stdout)
def test_url(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
response = self.run_get_token(cmd)
self.assert_url_correct(
response,
expected_endpoint='sts.us-east-1.amazonaws.com',
)
def test_url_with_region(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
cmd += ' --region us-west-2'
response = self.run_get_token(cmd)
self.assert_url_correct(
response,
expected_endpoint='sts.us-west-2.amazonaws.com',
expected_signing_region='us-west-2',
)
def test_url_with_arn(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
cmd += ' --role-arn %s' % self.role_arn
self.parsed_responses = [
{
"Credentials": {
"AccessKeyId": self.access_key,
"SecretAccessKey": self.secret_key,
"SessionToken": self.session_token,
},
}
]
response = self.run_get_token(cmd)
assume_role_call = self.operations_called[0]
self.assertEqual(assume_role_call[0].name, 'AssumeRole')
self.assertEqual(
assume_role_call[1],
{'RoleArn': self.role_arn, 'RoleSessionName': 'EKSGetTokenAuth'},
)
self.assert_url_correct(
response,
expected_endpoint='sts.us-east-1.amazonaws.com',
has_session_token=True,
)
def test_token_has_no_padding(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
num_rounds = 100
for _ in range(num_rounds):
response = self.run_get_token(cmd)
self.assertNotIn('=', response['status']['token'])
def test_url_different_partition(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
cmd += ' --region cn-north-1'
response = self.run_get_token(cmd)
self.assert_url_correct(
response,
expected_endpoint='sts.cn-north-1.amazonaws.com.cn',
expected_signing_region='cn-north-1',
)
def test_api_version_discovery_deprecated(self):
self.set_kubernetes_exec_info('v1alpha1')
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
stdout, stderr, _ = self.run_cmd(cmd)
response = json.loads(stdout)
self.assertEqual(
response["apiVersion"],
"client.authentication.k8s.io/v1alpha1",
)
self.assertEqual(
stderr,
(
"Kubeconfig user entry is using deprecated API "
"version client.authentication.k8s.io/v1alpha1. Run 'aws eks update-kubeconfig' to update.\n"
),
)
def test_api_version_discovery_malformed(self):
self.environ['KUBERNETES_EXEC_INFO'] = (
'{{"kind":"ExecCredential",'
'"apiVersion":"client.authentication.k8s.io/v1alpha1",'
'"spec":{"interactive":true}}'
)
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
stdout, stderr, _ = self.run_cmd(cmd)
response = json.loads(stdout)
self.assertEqual(
response["apiVersion"],
"client.authentication.k8s.io/v1beta1",
)
self.assertEqual(
stderr,
(
"Error parsing KUBERNETES_EXEC_INFO, defaulting to client.authentication.k8s.io/v1beta1. "
"This is likely a bug in your Kubernetes client. Please update your Kubernetes client.\n"
),
)
def test_api_version_discovery_empty(self):
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
stdout, stderr, _ = self.run_cmd(cmd)
response = json.loads(stdout)
self.assertEqual(
response["apiVersion"],
"client.authentication.k8s.io/v1beta1",
)
self.assertEqual(stderr, "",)
def test_api_version_discovery_v1(self):
self.set_kubernetes_exec_info('v1')
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
stdout, stderr, _ = self.run_cmd(cmd)
response = json.loads(stdout)
self.assertEqual(
response["apiVersion"],
"client.authentication.k8s.io/v1",
)
self.assertEqual(stderr, "")
def test_api_version_discovery_v1beta1(self):
self.set_kubernetes_exec_info('v1beta1')
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
stdout, stderr, _ = self.run_cmd(cmd)
response = json.loads(stdout)
self.assertEqual(
response["apiVersion"],
"client.authentication.k8s.io/v1beta1",
)
self.assertEqual(stderr, "")
def test_api_version_discovery_unknown(self):
self.set_kubernetes_exec_info('v2')
cmd = 'eks get-token --cluster-name %s' % self.cluster_name
stdout, stderr, _ = self.run_cmd(cmd)
response = json.loads(stdout)
self.assertEqual(
response["apiVersion"],
"client.authentication.k8s.io/v1beta1",
)
self.assertEqual(
stderr,
(
"Unrecognized API version in KUBERNETES_EXEC_INFO, defaulting to client.authentication.k8s.io/v1beta1. "
"This is likely due to an outdated AWS CLI. Please update your AWS CLI.\n"
),
)