Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/eks/get_token.py
1567 views
1
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import base64
14
import botocore
15
import json
16
import os
17
import sys
18
19
from datetime import timedelta
20
from botocore.signers import RequestSigner
21
from botocore.model import ServiceId
22
23
from awscli.formatter import get_formatter
24
from awscli.utils import create_nested_client
25
from awscli.compat import get_current_datetime
26
from awscli.customizations.commands import BasicCommand
27
from awscli.customizations.utils import uni_print
28
from awscli.customizations.utils import validate_mutually_exclusive
29
30
AUTH_SERVICE = "sts"
31
AUTH_COMMAND = "GetCallerIdentity"
32
AUTH_API_VERSION = "2011-06-15"
33
AUTH_SIGNING_VERSION = "v4"
34
35
ALPHA_API = "client.authentication.k8s.io/v1alpha1"
36
BETA_API = "client.authentication.k8s.io/v1beta1"
37
V1_API = "client.authentication.k8s.io/v1"
38
39
FULLY_SUPPORTED_API_VERSIONS = [
40
V1_API,
41
BETA_API,
42
]
43
DEPRECATED_API_VERSIONS = [
44
ALPHA_API,
45
]
46
47
ERROR_MSG_TPL = (
48
"{0} KUBERNETES_EXEC_INFO, defaulting to {1}. This is likely a "
49
"bug in your Kubernetes client. Please update your Kubernetes "
50
"client."
51
)
52
UNRECOGNIZED_MSG_TPL = (
53
"Unrecognized API version in KUBERNETES_EXEC_INFO, defaulting to "
54
"{0}. This is likely due to an outdated AWS "
55
"CLI. Please update your AWS CLI."
56
)
57
DEPRECATION_MSG_TPL = (
58
"Kubeconfig user entry is using deprecated API version {0}. Run "
59
"'aws eks update-kubeconfig' to update."
60
)
61
62
# Presigned url timeout in seconds
63
URL_TIMEOUT = 60
64
65
TOKEN_EXPIRATION_MINS = 14
66
67
TOKEN_PREFIX = 'k8s-aws-v1.'
68
69
K8S_AWS_ID_HEADER = 'x-k8s-aws-id'
70
71
72
class GetTokenCommand(BasicCommand):
73
NAME = 'get-token'
74
75
DESCRIPTION = (
76
"Get a token for authentication with an Amazon EKS cluster. "
77
"This can be used as an alternative to the "
78
"aws-iam-authenticator."
79
)
80
81
ARG_TABLE = [
82
{
83
'name': 'cluster-name',
84
'help_text': (
85
"Specify the name of the Amazon EKS cluster to create a token for. (Note: for local clusters on AWS Outposts, please use --cluster-id parameter)"
86
),
87
'required': False,
88
},
89
{
90
'name': 'role-arn',
91
'help_text': (
92
"Assume this role for credentials when signing the token. "
93
"Use this optional parameter when the credentials for signing "
94
"the token differ from that of the current role session. "
95
"Using this parameter results in new role session credentials "
96
"that are used to sign the token."
97
),
98
'required': False,
99
},
100
{
101
'name': 'cluster-id',
102
# When EKS in-region cluster supports cluster-id, we will need to update this help text
103
'help_text': (
104
"Specify the id of the Amazon EKS cluster to create a token for. (Note: for local clusters on AWS Outposts only)"
105
),
106
'required': False,
107
},
108
]
109
110
def get_expiration_time(self):
111
token_expiration = get_current_datetime() + timedelta(
112
minutes=TOKEN_EXPIRATION_MINS
113
)
114
return token_expiration.strftime('%Y-%m-%dT%H:%M:%SZ')
115
116
def _run_main(self, parsed_args, parsed_globals):
117
client_factory = STSClientFactory(self._session)
118
sts_client = client_factory.get_sts_client(
119
region_name=parsed_globals.region, role_arn=parsed_args.role_arn
120
)
121
122
validate_mutually_exclusive(parsed_args, ['cluster_name'], ['cluster_id'])
123
124
if parsed_args.cluster_id:
125
identifier = parsed_args.cluster_id
126
elif parsed_args.cluster_name:
127
identifier = parsed_args.cluster_name
128
else:
129
return ValueError("Either parameter --cluster-name or --cluster-id must be specified.")
130
131
token = TokenGenerator(sts_client).get_token(identifier)
132
133
# By default STS signs the url for 15 minutes so we are creating a
134
# rfc3339 timestamp with expiration in 14 minutes as part of the token, which
135
# is used by some clients (client-go) who will refresh the token after 14 mins
136
token_expiration = self.get_expiration_time()
137
138
full_object = {
139
"kind": "ExecCredential",
140
"apiVersion": self.discover_api_version(),
141
"spec": {},
142
"status": {
143
"expirationTimestamp": token_expiration,
144
"token": token,
145
},
146
}
147
148
output = parsed_globals.output
149
if output is None:
150
output = self._session.get_config_variable('output')
151
formatter = get_formatter(output, parsed_globals)
152
formatter.query = parsed_globals.query
153
154
formatter(self.NAME, full_object)
155
uni_print('\n')
156
return 0
157
158
def discover_api_version(self):
159
"""
160
Parses the KUBERNETES_EXEC_INFO environment variable and returns the
161
API version. If the environment variable is malformed or invalid,
162
return the v1beta1 response and print a message to stderr.
163
164
If the v1alpha1 API is specified explicitly, a message is printed to
165
stderr with instructions to update.
166
167
:return: The client authentication API version
168
:rtype: string
169
"""
170
# At the time Kubernetes v1.29 is released upstream (approx Dec 2023),
171
# "v1beta1" will be removed. At or around that time, EKS will likely
172
# support v1.22 through v1.28, in which client API version "v1beta1"
173
# will be supported by all EKS versions.
174
fallback_api_version = BETA_API
175
176
error_prefixes = {
177
"error": "Error parsing",
178
"empty": "Empty",
179
}
180
181
exec_info_raw = os.environ.get("KUBERNETES_EXEC_INFO", "")
182
if not exec_info_raw:
183
# All kube clients should be setting this, but client-go clients
184
# (kubectl, kubelet, etc) < 1.20 were not setting this if the API
185
# version defined in the kubeconfig was not v1alpha1.
186
#
187
# This was changed in kubernetes/kubernetes#95489 so that
188
# KUBERNETES_EXEC_INFO is always provided
189
return fallback_api_version
190
try:
191
exec_info = json.loads(exec_info_raw)
192
except json.JSONDecodeError:
193
# The environment variable was malformed
194
uni_print(
195
ERROR_MSG_TPL.format(
196
error_prefixes["error"],
197
fallback_api_version,
198
),
199
sys.stderr,
200
)
201
uni_print("\n", sys.stderr)
202
return fallback_api_version
203
204
api_version_raw = exec_info.get("apiVersion")
205
if api_version_raw in FULLY_SUPPORTED_API_VERSIONS:
206
return api_version_raw
207
elif api_version_raw in DEPRECATED_API_VERSIONS:
208
uni_print(DEPRECATION_MSG_TPL.format(api_version_raw), sys.stderr)
209
uni_print("\n", sys.stderr)
210
return api_version_raw
211
else:
212
uni_print(
213
UNRECOGNIZED_MSG_TPL.format(fallback_api_version),
214
sys.stderr,
215
)
216
uni_print("\n", sys.stderr)
217
return fallback_api_version
218
219
220
class TokenGenerator(object):
221
def __init__(self, sts_client):
222
self._sts_client = sts_client
223
224
def get_token(self, k8s_aws_id):
225
"""Generate a presigned url token to pass to kubectl."""
226
url = self._get_presigned_url(k8s_aws_id)
227
token = TOKEN_PREFIX + base64.urlsafe_b64encode(
228
url.encode('utf-8')
229
).decode('utf-8').rstrip('=')
230
return token
231
232
def _get_presigned_url(self, k8s_aws_id):
233
return self._sts_client.generate_presigned_url(
234
'get_caller_identity',
235
Params={K8S_AWS_ID_HEADER: k8s_aws_id},
236
ExpiresIn=URL_TIMEOUT,
237
HttpMethod='GET',
238
)
239
240
241
class STSClientFactory(object):
242
def __init__(self, session):
243
self._session = session
244
245
def get_sts_client(self, region_name=None, role_arn=None):
246
client_kwargs = {'region_name': region_name}
247
if role_arn is not None:
248
creds = self._get_role_credentials(region_name, role_arn)
249
client_kwargs['aws_access_key_id'] = creds['AccessKeyId']
250
client_kwargs['aws_secret_access_key'] = creds['SecretAccessKey']
251
client_kwargs['aws_session_token'] = creds['SessionToken']
252
sts = create_nested_client(self._session, 'sts', **client_kwargs)
253
self._register_k8s_aws_id_handlers(sts)
254
return sts
255
256
def _get_role_credentials(self, region_name, role_arn):
257
sts = create_nested_client(self._session, 'sts', region_name=region_name)
258
return sts.assume_role(
259
RoleArn=role_arn, RoleSessionName='EKSGetTokenAuth'
260
)['Credentials']
261
262
def _register_k8s_aws_id_handlers(self, sts_client):
263
sts_client.meta.events.register(
264
'provide-client-params.sts.GetCallerIdentity',
265
self._retrieve_k8s_aws_id,
266
)
267
sts_client.meta.events.register(
268
'before-sign.sts.GetCallerIdentity',
269
self._inject_k8s_aws_id_header,
270
)
271
272
def _retrieve_k8s_aws_id(self, params, context, **kwargs):
273
if K8S_AWS_ID_HEADER in params:
274
context[K8S_AWS_ID_HEADER] = params.pop(K8S_AWS_ID_HEADER)
275
276
def _inject_k8s_aws_id_header(self, request, **kwargs):
277
if K8S_AWS_ID_HEADER in request.context:
278
request.headers[K8S_AWS_ID_HEADER] = request.context[K8S_AWS_ID_HEADER]
279
280