Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/functional/eks/test_get_token.py
1567 views
1
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import base64
14
from datetime import datetime
15
import json
16
import os
17
18
import awscli.compat
19
from awscli.testutils import mock
20
from awscli.testutils import BaseAWSCommandParamsTest
21
from awscli.compat import urlparse
22
23
24
class TestGetTokenCommand(BaseAWSCommandParamsTest):
    """Functional tests for the ``eks get-token`` command.

    Every test runs the command through ``run_cmd`` with static
    credentials placed in ``self.environ`` and with ``datetime.now()``
    pinned, then inspects the emitted ``ExecCredential`` document and/or
    the presigned URL embedded in its token.
    """

    def setUp(self):
        """Install static credentials and pin the clock used for tokens."""
        super().setUp()
        self.cluster_name = 'MyCluster'
        self.role_arn = 'arn:aws:iam::012345678910:role/RoleArn'
        self.access_key = 'ABCDEFGHIJKLMNOPQRST'
        self.secret_key = 'TSRQPONMLKJUHGFEDCBA'
        # Not exported to the environment; only returned by the stubbed
        # AssumeRole response in test_url_with_arn.
        self.session_token = 'TOKENTOKENTOKENTOKEN'
        self.environ['AWS_ACCESS_KEY_ID'] = self.access_key
        self.environ['AWS_SECRET_ACCESS_KEY'] = self.secret_key
        self.expected_token_prefix = 'k8s-aws-v1.'
        # Replace ``datetime`` as seen through ``awscli.compat.datetime``
        # with a mock that wraps the real class: ``now()`` is pinned so the
        # expiration timestamp asserted by the tests is stable, while every
        # other attribute passes through to the real implementation.
        self.datetime_patcher = mock.patch.object(
            awscli.compat.datetime, 'datetime', mock.Mock(wraps=datetime)
        )
        mocked_datetime = self.datetime_patcher.start()
        mocked_datetime.now.return_value = datetime(2019, 10, 23, 23, 0, 0, 0)

    def tearDown(self):
        """Undo the datetime patch, even if the parent ``tearDown`` raises."""
        try:
            super().tearDown()
        finally:
            # Must always run; a leaked patch would freeze the clock for
            # every test executed afterwards in the same process.
            self.datetime_patcher.stop()

    def _get_token_cmd(self, *extra_args):
        """Build the ``eks get-token`` command line for the test cluster.

        :param extra_args: Additional command-line fragments (for example
            ``'--region us-west-2'``) appended after the cluster name,
            separated by single spaces.
        :return: The command string to hand to ``run_cmd``.
        """
        base_cmd = f'eks get-token --cluster-name {self.cluster_name}'
        return ' '.join((base_cmd,) + extra_args)

    def run_get_token(self, cmd):
        """Run ``cmd`` and return its stdout parsed as JSON.

        :param cmd: Full command string passed to ``run_cmd``.
        :return: The decoded JSON value printed by the command.
        """
        response, _, _ = self.run_cmd(cmd)
        return json.loads(response)

    def assert_url_correct(
        self,
        response,
        expected_endpoint='sts.amazonaws.com',
        expected_signing_region='us-east-1',
        has_session_token=False,
    ):
        """Assert that the URL embedded in the token is as expected.

        :param response: Parsed ``ExecCredential`` document containing
            ``status.token``.
        :param expected_endpoint: Host the decoded URL must point at.
        :param expected_signing_region: Region that must appear in the
            ``X-Amz-Credential`` query parameter.
        :param has_session_token: When true, ``X-Amz-Security-Token`` must
            equal ``self.session_token``; otherwise it must be absent.
        """
        url = self._get_url(response)
        url_components = urlparse.urlparse(url)
        self.assertEqual(url_components.netloc, expected_endpoint)
        parsed_qs = urlparse.parse_qs(url_components.query)
        credential = parsed_qs['X-Amz-Credential'][0]
        self.assertIn(expected_signing_region, credential)
        if has_session_token:
            self.assertEqual(
                [self.session_token], parsed_qs['X-Amz-Security-Token']
            )
        else:
            self.assertNotIn('X-Amz-Security-Token', parsed_qs)

        self.assertIn(self.access_key, credential)
        self.assertIn('x-k8s-aws-id', parsed_qs['X-Amz-SignedHeaders'][0])

    def _get_url(self, response):
        """Decode and return the URL carried inside ``status.token``.

        The token is the expected prefix followed by URL-safe base64 text
        without ``=`` padding, so the padding is restored before decoding.

        :param response: Parsed ``ExecCredential`` document.
        :return: The decoded URL as ``str``.
        """
        token = response['status']['token']
        # Verify the prefix rather than blindly slicing by its length;
        # otherwise a wrong prefix would surface as an opaque decode error
        # or a garbage URL.
        self.assertTrue(
            token.startswith(self.expected_token_prefix),
            f'token does not start with {self.expected_token_prefix!r}',
        )
        b64_token = token[len(self.expected_token_prefix):].encode()
        # Pad up to the next multiple of four (adds nothing when aligned).
        b64_token += b'=' * (-len(b64_token) % 4)
        return base64.urlsafe_b64decode(b64_token).decode()

    def set_kubernetes_exec_info(self, api_version):
        """Set ``KUBERNETES_EXEC_INFO`` to a default payload.

        :param api_version: Version suffix (for example ``'v1beta1'``)
            appended to ``client.authentication.k8s.io/`` in the payload.
        """
        self.environ['KUBERNETES_EXEC_INFO'] = (
            '{"kind":"ExecCredential",'
            f'"apiVersion":"client.authentication.k8s.io/{api_version}",'
            '"spec":{"interactive":true}}'
        )

    def _assert_api_version_discovery(
        self, expected_api_version, expected_stderr
    ):
        """Run the command and check the reported apiVersion and stderr.

        :param expected_api_version: Exact ``apiVersion`` expected in the
            JSON document written to stdout.
        :param expected_stderr: Exact text expected on stderr.
        """
        stdout, stderr, _ = self.run_cmd(self._get_token_cmd())
        response = json.loads(stdout)

        self.assertEqual(response["apiVersion"], expected_api_version)
        self.assertEqual(stderr, expected_stderr)

    def test_get_token(self):
        # The default invocation emits a complete ExecCredential document.
        response = self.run_get_token(self._get_token_cmd())
        self.assertEqual(
            response,
            {
                "kind": "ExecCredential",
                "apiVersion": "client.authentication.k8s.io/v1beta1",
                "spec": {},
                "status": {
                    "expirationTimestamp": "2019-10-23T23:14:00Z",
                    "token": mock.ANY,  # This is asserted in later cases
                },
            },
        )

    def test_query_nested_object(self):
        # --query can select a nested object from the document.
        response = self.run_get_token(self._get_token_cmd('--query status'))
        self.assertEqual(
            response,
            {
                "expirationTimestamp": "2019-10-23T23:14:00Z",
                "token": mock.ANY,  # This is asserted in later cases
            },
        )

    def test_query_value(self):
        # --query can select a single scalar value from the document.
        response = self.run_get_token(
            self._get_token_cmd('--query apiVersion')
        )
        self.assertEqual(response, "client.authentication.k8s.io/v1beta1")

    def test_output_text(self):
        # Text output is not JSON, so only check that the key values appear.
        stdout, _, _ = self.run_cmd(self._get_token_cmd('--output text'))
        self.assertIn("ExecCredential", stdout)
        self.assertIn("client.authentication.k8s.io/v1beta1", stdout)
        self.assertIn("2019-10-23T23:14:00Z", stdout)

    def test_output_table(self):
        # Table output is not JSON, so only check that the key values appear.
        stdout, _, _ = self.run_cmd(self._get_token_cmd('--output table'))
        self.assertIn("ExecCredential", stdout)
        self.assertIn("client.authentication.k8s.io/v1beta1", stdout)
        self.assertIn("2019-10-23T23:14:00Z", stdout)

    def test_url(self):
        # Without --region the URL targets the us-east-1 regional endpoint.
        response = self.run_get_token(self._get_token_cmd())
        self.assert_url_correct(
            response,
            expected_endpoint='sts.us-east-1.amazonaws.com',
        )

    def test_url_with_region(self):
        response = self.run_get_token(
            self._get_token_cmd('--region us-west-2')
        )
        # Previously, even though us-west-2 is specified, we still
        # signed for the global endpoint. Now, the STS client defaults
        # to using the regional endpoint, so the expected signing region
        # should be the same as the endpoint region.
        self.assert_url_correct(
            response,
            expected_endpoint='sts.us-west-2.amazonaws.com',
            expected_signing_region='us-west-2',
        )

    def test_url_with_arn(self):
        # With --role-arn the command must call AssumeRole first and sign
        # the URL with the returned temporary credentials.
        cmd = self._get_token_cmd(f'--role-arn {self.role_arn}')
        # NOTE(review): presumably consumed by the base class as the
        # stubbed service response for the AssumeRole call -- confirm.
        self.parsed_responses = [
            {
                "Credentials": {
                    "AccessKeyId": self.access_key,
                    "SecretAccessKey": self.secret_key,
                    "SessionToken": self.session_token,
                },
            }
        ]
        response = self.run_get_token(cmd)
        # Each recorded call is indexed as (operation, parameters).
        assume_role_call = self.operations_called[0]
        self.assertEqual(assume_role_call[0].name, 'AssumeRole')
        self.assertEqual(
            assume_role_call[1],
            {'RoleArn': self.role_arn, 'RoleSessionName': 'EKSGetTokenAuth'},
        )
        self.assert_url_correct(
            response,
            expected_endpoint='sts.us-east-1.amazonaws.com',
            has_session_token=True,
        )

    def test_token_has_no_padding(self):
        cmd = self._get_token_cmd()
        num_rounds = 100
        # It is difficult to patch everything out to get an exact
        # reproduceable token. So to make sure there is no padding, we
        # run the command N times and make sure there is no padding in the
        # generated token.
        for _ in range(num_rounds):
            response = self.run_get_token(cmd)
            self.assertNotIn('=', response['status']['token'])

    def test_url_different_partition(self):
        # A region in another partition uses that partition's DNS suffix.
        response = self.run_get_token(
            self._get_token_cmd('--region cn-north-1')
        )
        self.assert_url_correct(
            response,
            expected_endpoint='sts.cn-north-1.amazonaws.com.cn',
            expected_signing_region='cn-north-1',
        )

    def test_api_version_discovery_deprecated(self):
        # v1alpha1 is echoed back, accompanied by a deprecation warning.
        self.set_kubernetes_exec_info('v1alpha1')
        self._assert_api_version_discovery(
            "client.authentication.k8s.io/v1alpha1",
            (
                "Kubeconfig user entry is using deprecated API "
                "version client.authentication.k8s.io/v1alpha1. Run 'aws eks update-kubeconfig' to update.\n"
            ),
        )

    def test_api_version_discovery_malformed(self):
        # The doubled opening brace makes the payload invalid JSON on
        # purpose; the command must warn and fall back to v1beta1.
        self.environ['KUBERNETES_EXEC_INFO'] = (
            '{{"kind":"ExecCredential",'
            '"apiVersion":"client.authentication.k8s.io/v1alpha1",'
            '"spec":{"interactive":true}}'
        )
        self._assert_api_version_discovery(
            "client.authentication.k8s.io/v1beta1",
            (
                "Error parsing KUBERNETES_EXEC_INFO, defaulting to client.authentication.k8s.io/v1beta1. "
                "This is likely a bug in your Kubernetes client. Please update your Kubernetes client.\n"
            ),
        )

    def test_api_version_discovery_empty(self):
        # KUBERNETES_EXEC_INFO is not set by this test: expect the
        # v1beta1 default and no warning.
        self._assert_api_version_discovery(
            "client.authentication.k8s.io/v1beta1", ""
        )

    def test_api_version_discovery_v1(self):
        # v1 is echoed back without any warning.
        self.set_kubernetes_exec_info('v1')
        self._assert_api_version_discovery(
            "client.authentication.k8s.io/v1", ""
        )

    def test_api_version_discovery_v1beta1(self):
        # v1beta1 is echoed back without any warning.
        self.set_kubernetes_exec_info('v1beta1')
        self._assert_api_version_discovery(
            "client.authentication.k8s.io/v1beta1", ""
        )

    def test_api_version_discovery_unknown(self):
        # An unrecognized version falls back to v1beta1 with a warning.
        self.set_kubernetes_exec_info('v2')
        self._assert_api_version_discovery(
            "client.authentication.k8s.io/v1beta1",
            (
                "Unrecognized API version in KUBERNETES_EXEC_INFO, defaulting to client.authentication.k8s.io/v1beta1. "
                "This is likely due to an outdated AWS CLI. Please update your AWS CLI.\n"
            ),
        )
308
309