Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/eks/update_kubeconfig.py
2624 views
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13
14
import os
15
import logging
16
17
from botocore.compat import OrderedDict
18
19
from awscli.customizations.commands import BasicCommand
20
from awscli.customizations.utils import uni_print
21
from awscli.customizations.eks.exceptions import EKSClusterError
22
from awscli.customizations.eks.kubeconfig import (Kubeconfig,
23
KubeconfigError,
24
KubeconfigLoader,
25
KubeconfigWriter,
26
KubeconfigValidator,
27
KubeconfigAppender)
28
from awscli.customizations.eks.ordered_yaml import ordered_yaml_dump
29
from awscli.utils import create_nested_client
30
31
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Kubeconfig path used when neither --kubeconfig nor $KUBECONFIG supplies
# one: ``.kube/config`` under the current user's home directory.
DEFAULT_PATH = os.path.expanduser("~/.kube/config")

# Value written as ``apiVersion`` of the ``exec`` section of generated
# kubeconfig user entries.
# At the time EKS no longer supports Kubernetes v1.21 (probably ~Dec 2023),
# this can be safely changed to default to writing "v1"
API_VERSION = "client.authentication.k8s.io/v1beta1"
38
39
class UpdateKubeconfigCommand(BasicCommand):
    """
    The ``update-kubeconfig`` command.

    Builds cluster and user entries for the named EKS cluster, merges them
    (plus a context tying the two together) into a kubeconfig, and either
    writes the result to disk or, with ``--dry-run``, prints it to stdout.
    """

    NAME = 'update-kubeconfig'

    DESCRIPTION = BasicCommand.FROM_FILE(
        'eks',
        'update-kubeconfig',
        '_description.rst'
    )

    ARG_TABLE = [
        {
            'name': 'name',
            'dest': 'cluster_name',
            'help_text': ("The name of the cluster for which "
                          "to create a kubeconfig entry. "
                          "This cluster must exist in your account and in the "
                          "specified or configured default Region "
                          "for your AWS CLI installation."),
            'required': True
        },
        {
            'name': 'kubeconfig',
            'help_text': ("Optionally specify a kubeconfig file to append "
                          "with your configuration. "
                          "By default, the configuration is written to the "
                          "first file path in the KUBECONFIG "
                          "environment variable (if it is set) "
                          "or the default kubeconfig path (.kube/config) "
                          "in your home directory."),
            'required': False
        },
        {
            'name': 'role-arn',
            'help_text': ("To assume a role for cluster authentication, "
                          "specify an IAM role ARN with this option. "
                          "For example, if you created a cluster "
                          "while assuming an IAM role, "
                          "then you must also assume that role to "
                          "connect to the cluster the first time."),
            'required': False
        },
        {
            'name': 'proxy-url',
            'help_text': ("Optionally specify a proxy url to route "
                          "traffic via when connecting to a cluster."),
            'required': False
        },
        {
            'name': 'dry-run',
            'action': 'store_true',
            'default': False,
            'help_text': ("Print the merged kubeconfig to stdout instead of "
                          "writing it to the specified file."),
            'required': False
        },
        {
            'name': 'verbose',
            'action': 'store_true',
            'default': False,
            'help_text': ("Print more detailed output "
                          "when writing to the kubeconfig file, "
                          "including the appended entries.")
        },
        {
            'name': 'alias',
            'help_text': ("Alias for the cluster context name. "
                          "Defaults to match cluster ARN."),
            'required': False
        },
        {
            'name': 'user-alias',
            'help_text': ("Alias for the generated user name. "
                          "Defaults to match cluster ARN."),
            'required': False
        },
        {
            'name': 'assume-role-arn',
            'help_text': ('To assume a role for retrieving cluster information, '
                          'specify an IAM role ARN with this option. '
                          'Use this for cross-account access to get cluster details '
                          'from the account where the cluster resides.'),
            'required': False
        }
    ]

    def _display_entries(self, entries):
        """
        Display entries in yaml format

        :param entries: a list of OrderedDicts to be printed
        :type entries: list
        """
        uni_print("Entries:\n\n")
        for entry in entries:
            uni_print(ordered_yaml_dump(entry))
            uni_print("\n")

    def _run_main(self, parsed_args, parsed_globals):
        """
        Generate the kubeconfig entries for the cluster and merge them in.

        The cluster and user entries come from ``EKSClient``; the target
        kubeconfig is picked by ``KubeconfigSelector`` from ``--kubeconfig``
        or the ``KUBECONFIG`` environment variable.

        :param parsed_args: Parsed command arguments (see ``ARG_TABLE``)
        :param parsed_globals: Parsed global CLI arguments, forwarded
            to ``EKSClient``
        """
        client = EKSClient(self._session,
                           parsed_args=parsed_args,
                           parsed_globals=parsed_globals)
        new_cluster_dict = client.get_cluster_entry()
        new_user_dict = client.get_user_entry(
            user_alias=parsed_args.user_alias
        )
        cluster_name = new_cluster_dict["name"]

        config_selector = KubeconfigSelector(
            os.environ.get("KUBECONFIG", ""),
            parsed_args.kubeconfig
        )
        config = config_selector.choose_kubeconfig(cluster_name)
        # Checked before the insert below so the status message can tell
        # "updated" apart from "added".
        updating_existing = config.has_cluster(cluster_name)
        appender = KubeconfigAppender()
        new_context_dict = appender.insert_cluster_user_pair(
            config,
            new_cluster_dict,
            new_user_dict,
            parsed_args.alias
        )

        if parsed_args.dry_run:
            # Only the merged kubeconfig goes to stdout; nothing is written.
            uni_print(config.dump_content())
        else:
            writer = KubeconfigWriter()
            writer.write_kubeconfig(config)

            if updating_existing:
                uni_print("Updated context {0} in {1}\n".format(
                    new_context_dict["name"], config.path
                ))
            else:
                uni_print("Added new context {0} to {1}\n".format(
                    new_context_dict["name"], config.path
                ))

            if parsed_args.verbose:
                self._display_entries([
                    new_context_dict,
                    new_user_dict,
                    new_cluster_dict
                ])
178
179
180
181
class KubeconfigSelector(object):
    """
    Work out which kubeconfig files are candidates and choose the one
    that should receive a cluster entry.
    """

    def __init__(self, env_variable, path_in, validator=None,
                 loader=None):
        """
        Parse KUBECONFIG into a list of absolute paths.
        Also replace the empty list with DEFAULT_PATH

        :param env_variable: KUBECONFIG as a long string; an empty
            string or None selects DEFAULT_PATH
        :type env_variable: string or None

        :param path_in: The path passed in through the CLI
        :type path_in: string or None

        :param validator: Validator handed to the default loader;
            a KubeconfigValidator is created when omitted

        :param loader: Object providing ``load_kubeconfig(path)``;
            a KubeconfigLoader is created when omitted
        """
        if validator is None:
            validator = KubeconfigValidator()
        self._validator = validator

        if loader is None:
            loader = KubeconfigLoader(validator)
        self._loader = loader

        if path_in is not None:
            # Override environment variable
            self._paths = [self._expand_path(path_in)]
        else:
            # Get the list of paths from the environment variable
            if not env_variable:
                env_variable = DEFAULT_PATH
            # Blank or whitespace-only elements (e.g. from "a::b") are
            # dropped rather than expanded to the current directory.
            self._paths = [self._expand_path(element)
                           for element in env_variable.split(os.pathsep)
                           if element.strip()]
            if not self._paths:
                self._paths = [DEFAULT_PATH]

    def choose_kubeconfig(self, cluster_name):
        """
        Choose which kubeconfig file to read from.
        If name is already an entry in one of the $KUBECONFIG files,
        choose that one.
        Otherwise choose the first file.

        :param cluster_name: The name of the cluster which is going to be added
        :type cluster_name: String

        :return: a chosen Kubeconfig based on above rules
        :rtype: Kubeconfig

        :raises KubeconfigError: possibly, if no file contains the cluster
            and the first candidate file cannot be loaded
        """
        # Search for an existing entry to update
        for candidate_path in self._paths:
            try:
                loaded_config = self._loader.load_kubeconfig(candidate_path)

                if loaded_config.has_cluster(cluster_name):
                    LOG.debug("Found entry to update at %s", candidate_path)
                    return loaded_config
            except KubeconfigError as e:
                # A broken candidate must not stop the search; skip it.
                LOG.warning("Passing %s:%s", candidate_path, e)

        # No entry was found, use the first file in KUBECONFIG
        #
        # Note: This could raise KubeconfigErrors if paths[0] is corrupted
        return self._loader.load_kubeconfig(self._paths[0])

    def _expand_path(self, path):
        """ A helper to expand a path to a full absolute path. """
        return os.path.abspath(os.path.expanduser(path))
250
251
252
class EKSClient(object):
    """
    Fetch an EKS cluster description and turn it into the kubeconfig
    ``cluster`` and ``user`` entries for that cluster.
    """

    def __init__(self, session, parsed_args, parsed_globals=None):
        """
        :param session: Session used to create the service clients; its
            ``profile`` is also recorded in the generated user entry

        :param parsed_args: Parsed command arguments; ``cluster_name``,
            ``proxy_url``, ``role_arn`` and (optionally)
            ``assume_role_arn`` are read from it

        :param parsed_globals: Parsed global arguments supplying
            ``region``, ``endpoint_url`` and ``verify_ssl``, or None
        """
        self._session = session
        self._cluster_name = parsed_args.cluster_name
        self._cluster_description = None
        self._parsed_globals = parsed_globals
        self._parsed_args = parsed_args

    @property
    def cluster_description(self):
        """
        Use an eks describe-cluster call to get the cluster description
        Cache the response in self._cluster_description.
        describe-cluster will only be called once.

        :raises EKSClusterError: if the response has no cluster (or the
            cluster has no status), or the status is neither ACTIVE
            nor UPDATING
        """
        if self._cluster_description is not None:
            return self._cluster_description

        client_kwargs = {}
        if self._parsed_globals:
            client_kwargs.update({
                "region_name": self._parsed_globals.region,
                "endpoint_url": self._parsed_globals.endpoint_url,
                "verify": self._parsed_globals.verify_ssl,
            })

        # Handle role assumption if needed: the temporary credentials
        # returned by STS are used for the describe-cluster call only.
        if getattr(self._parsed_args, 'assume_role_arn', None):
            sts_client = create_nested_client(self._session, 'sts')
            credentials = sts_client.assume_role(
                RoleArn=self._parsed_args.assume_role_arn,
                RoleSessionName='EKSDescribeClusterSession'
            )["Credentials"]

            client_kwargs.update({
                "aws_access_key_id": credentials["AccessKeyId"],
                "aws_secret_access_key": credentials["SecretAccessKey"],
                "aws_session_token": credentials["SessionToken"],
            })

        client = create_nested_client(self._session, "eks", **client_kwargs)
        full_description = client.describe_cluster(name=self._cluster_name)
        cluster = full_description.get("cluster")

        if not cluster or "status" not in cluster:
            raise EKSClusterError("Cluster not found")
        if cluster["status"] not in ["ACTIVE", "UPDATING"]:
            raise EKSClusterError(f"Cluster status is {cluster['status']}")

        self._cluster_description = cluster
        return cluster

    def get_cluster_entry(self):
        """
        Return a cluster entry generated using
        the previously obtained description.

        The entry is named after the cluster ARN and carries the
        certificate authority data and endpoint; ``proxy-url`` is added
        only when a proxy URL was given on the command line.

        :return: the kubeconfig cluster entry
        :rtype: OrderedDict
        """
        description = self.cluster_description

        cert_data = description.get("certificateAuthority", {}).get("data", "")
        endpoint = description.get("endpoint")
        arn = description.get("arn")

        generated_cluster = OrderedDict([
            ("cluster", OrderedDict([
                ("certificate-authority-data", cert_data),
                ("server", endpoint)
            ])),
            ("name", arn)
        ])

        if self._parsed_args.proxy_url is not None:
            generated_cluster["cluster"]["proxy-url"] = self._parsed_args.proxy_url

        return generated_cluster

    def get_user_entry(self, user_alias=None):
        """
        Return a user entry generated using
        the previously obtained description.

        The entry's ``exec`` section runs ``aws ... eks get-token`` for
        this cluster. ``--role`` is appended when a role ARN was given,
        and ``AWS_PROFILE`` is set when the session has a profile.

        :param user_alias: Name for the user entry; the cluster ARN is
            used when omitted or empty
        :type user_alias: string or None

        :return: the kubeconfig user entry
        :rtype: OrderedDict
        """
        description = self.cluster_description

        # The Region is the fourth colon-separated field of the ARN.
        region = description.get("arn").split(":")[3]
        outpost_config = description.get("outpostConfig")

        if outpost_config is None:
            cluster_identification_parameter = "--cluster-name"
            cluster_identification_value = self._cluster_name
        else:
            # If cluster contains outpostConfig, use id for identification
            cluster_identification_parameter = "--cluster-id"
            cluster_identification_value = description.get("id")

        generated_user = OrderedDict([
            ("name", user_alias or description.get("arn", "")),
            ("user", OrderedDict([
                ("exec", OrderedDict([
                    ("apiVersion", API_VERSION),
                    ("args",
                        [
                            "--region",
                            region,
                            "eks",
                            "get-token",
                            cluster_identification_parameter,
                            cluster_identification_value,
                            "--output",
                            "json",
                        ]),
                    ("command", "aws"),
                ]))
            ]))
        ])

        if self._parsed_args.role_arn is not None:
            generated_user["user"]["exec"]["args"].extend([
                "--role",
                self._parsed_args.role_arn
            ])

        if self._session.profile:
            generated_user["user"]["exec"]["env"] = [OrderedDict([
                ("name", "AWS_PROFILE"),
                ("value", self._session.profile)
            ])]

        return generated_user
377
378