Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/eks/update_kubeconfig.py
1567 views
1
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import os
15
import logging
16
17
from botocore.compat import OrderedDict
18
19
from awscli.customizations.commands import BasicCommand
20
from awscli.customizations.utils import uni_print
21
from awscli.customizations.eks.exceptions import EKSClusterError
22
from awscli.customizations.eks.kubeconfig import (Kubeconfig,
23
KubeconfigError,
24
KubeconfigLoader,
25
KubeconfigWriter,
26
KubeconfigValidator,
27
KubeconfigAppender)
28
from awscli.customizations.eks.ordered_yaml import ordered_yaml_dump
29
from awscli.utils import create_nested_client
30
31
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Kubeconfig path used when neither an explicit path nor KUBECONFIG
# supplies one.
DEFAULT_PATH = os.path.expanduser("~/.kube/config")

# Value written as "apiVersion" in the generated exec user entry.
# Once EKS no longer supports Kubernetes v1.21 (probably ~Dec 2023),
# this can safely be changed to default to writing "v1".
API_VERSION = "client.authentication.k8s.io/v1beta1"
38
39
class UpdateKubeconfigCommand(BasicCommand):
    """
    Implementation of the ``update-kubeconfig`` command.

    Builds cluster and user entries from the cluster description,
    merges them into the selected kubeconfig and either prints the
    merged result (dry run) or writes it back to the file.
    """

    NAME = 'update-kubeconfig'

    DESCRIPTION = BasicCommand.FROM_FILE(
        'eks',
        'update-kubeconfig',
        '_description.rst'
    )

    ARG_TABLE = [
        {
            'name': 'name',
            'dest': 'cluster_name',
            'help_text': ("The name of the cluster for which "
                          "to create a kubeconfig entry. "
                          "This cluster must exist in your account and in the "
                          "specified or configured default Region "
                          "for your AWS CLI installation."),
            'required': True
        },
        {
            'name': 'kubeconfig',
            'help_text': ("Optionally specify a kubeconfig file to append "
                          "with your configuration. "
                          "By default, the configuration is written to the "
                          "first file path in the KUBECONFIG "
                          "environment variable (if it is set) "
                          "or the default kubeconfig path (.kube/config) "
                          "in your home directory."),
            'required': False
        },
        {
            'name': 'role-arn',
            'help_text': ("To assume a role for cluster authentication, "
                          "specify an IAM role ARN with this option. "
                          "For example, if you created a cluster "
                          "while assuming an IAM role, "
                          "then you must also assume that role to "
                          "connect to the cluster the first time."),
            'required': False
        },
        {
            'name': 'dry-run',
            'action': 'store_true',
            'default': False,
            'help_text': ("Print the merged kubeconfig to stdout instead of "
                          "writing it to the specified file."),
            'required': False
        },
        {
            'name': 'verbose',
            'action': 'store_true',
            'default': False,
            'help_text': ("Print more detailed output "
                          "when writing to the kubeconfig file, "
                          "including the appended entries.")
        },
        {
            'name': 'alias',
            'help_text': ("Alias for the cluster context name. "
                          "Defaults to match cluster ARN."),
            'required': False
        },
        {
            'name': 'user-alias',
            'help_text': ("Alias for the generated user name. "
                          "Defaults to match cluster ARN."),
            'required': False
        },
        {
            'name': 'assume-role-arn',
            'help_text': ('To assume a role for retrieving cluster information, '
                          'specify an IAM role ARN with this option. '
                          'Use this for cross-account access to get cluster details '
                          'from the account where the cluster resides.'),
            'required': False
        }
    ]

    def _display_entries(self, entries):
        """
        Print each entry as yaml, preceded by an "Entries:" heading.

        :param entries: a list of OrderedDicts to be printed
        :type entries: list
        """
        uni_print("Entries:\n\n")
        for item in entries:
            uni_print(ordered_yaml_dump(item))
            uni_print("\n")

    def _run_main(self, parsed_args, parsed_globals):
        """
        Merge the cluster/user/context entries for the requested cluster
        into a kubeconfig, then print it (dry run) or write it to disk.

        :param parsed_args: The parsed arguments of this command
        :param parsed_globals: The parsed global CLI arguments
        """
        eks = EKSClient(self._session,
                        parsed_args=parsed_args,
                        parsed_globals=parsed_globals)
        cluster_entry = eks.get_cluster_entry()
        user_entry = eks.get_user_entry(user_alias=parsed_args.user_alias)

        selector = KubeconfigSelector(
            os.environ.get("KUBECONFIG", ""),
            parsed_args.kubeconfig
        )
        kubeconfig = selector.choose_kubeconfig(cluster_entry["name"])

        # Must be checked before the insert below so that the message
        # can tell an update apart from a fresh addition.
        already_present = kubeconfig.has_cluster(cluster_entry["name"])
        context_entry = KubeconfigAppender().insert_cluster_user_pair(
            kubeconfig,
            cluster_entry,
            user_entry,
            parsed_args.alias
        )

        if parsed_args.dry_run:
            # Dry run: emit only the merged document, touch nothing on disk.
            uni_print(kubeconfig.dump_content())
            return

        KubeconfigWriter().write_kubeconfig(kubeconfig)

        if already_present:
            template = "Updated context {0} in {1}\n"
        else:
            template = "Added new context {0} to {1}\n"
        uni_print(template.format(context_entry["name"], kubeconfig.path))

        if parsed_args.verbose:
            self._display_entries([context_entry, user_entry, cluster_entry])
172
173
174
175
class KubeconfigSelector(object):
    """
    Work out which kubeconfig files are candidates and pick the one
    that should receive the new cluster entry.
    """

    def __init__(self, env_variable, path_in, validator=None,
                 loader=None):
        """
        Parse KUBECONFIG into a list of absolute paths.
        Also replace the empty list with DEFAULT_PATH

        :param env_variable: KUBECONFIG as a long string
        :type env_variable: string

        :param path_in: The path passed in through the CLI
        :type path_in: string or None

        :param validator: Validator handed to the default loader;
            a KubeconfigValidator is created when omitted
        :param loader: Object providing load_kubeconfig(path);
            a KubeconfigLoader is created when omitted
        """
        if validator is None:
            validator = KubeconfigValidator()
        self._validator = validator

        if loader is None:
            loader = KubeconfigLoader(validator)
        self._loader = loader

        if path_in is not None:
            # Override environment variable
            self._paths = [self._expand_path(path_in)]
            return

        # Get the list of paths from the environment variable,
        # ignoring empty / whitespace-only elements.
        self._paths = [
            self._expand_path(element)
            for element in (env_variable or "").split(os.pathsep)
            if element.strip()
        ]
        if not self._paths:
            # DEFAULT_PATH is a single path, not a path list: it must not
            # be split on os.pathsep (the home directory may contain the
            # separator) and is normalised like every other entry.
            self._paths = [self._expand_path(DEFAULT_PATH)]

    def choose_kubeconfig(self, cluster_name):
        """
        Choose which kubeconfig file to read from.
        If name is already an entry in one of the $KUBECONFIG files,
        choose that one.
        Otherwise choose the first file.

        :param cluster_name: The name of the cluster which is going to be added
        :type cluster_name: String

        :return: a chosen Kubeconfig based on above rules
        :rtype: Kubeconfig

        :raises KubeconfigError: possibly, if no file holds the cluster
            and the first candidate cannot be loaded
        """
        # Search for an existing entry to update
        for candidate_path in self._paths:
            try:
                loaded_config = self._loader.load_kubeconfig(candidate_path)

                if loaded_config.has_cluster(cluster_name):
                    LOG.debug("Found entry to update at %s", candidate_path)
                    return loaded_config
            except KubeconfigError as e:
                # A broken candidate is skipped, not fatal, while searching.
                LOG.warning("Passing %s:%s", candidate_path, e)

        # No entry was found, use the first file in KUBECONFIG
        #
        # Note: This could raise KubeconfigErrors if paths[0] is corrupted
        return self._loader.load_kubeconfig(self._paths[0])

    def _expand_path(self, path):
        """ A helper to expand a path to a full absolute path. """
        return os.path.abspath(os.path.expanduser(path))
244
245
246
class EKSClient(object):
    """
    Fetch a cluster description through ``describe_cluster`` and turn it
    into the kubeconfig cluster and user entries.
    """

    def __init__(self, session, parsed_args, parsed_globals=None):
        """
        :param session: Session used to create the service clients
        :param parsed_args: Parsed command arguments; ``cluster_name``
            is read here, ``role_arn`` and ``assume_role_arn`` later
        :param parsed_globals: Parsed global arguments, or None
        """
        self._session = session
        self._parsed_args = parsed_args
        self._parsed_globals = parsed_globals
        self._cluster_name = parsed_args.cluster_name
        self._cluster_description = None

    @property
    def cluster_description(self):
        """
        Use an eks describe-cluster call to get the cluster description
        Cache the response in self._cluster_description.
        describe-cluster will only be called once.

        :raises EKSClusterError: if the response has no usable cluster
            or its status is neither ACTIVE nor UPDATING
        """
        cached = self._cluster_description
        if cached is not None:
            return cached

        eks_kwargs = {}
        global_options = self._parsed_globals
        if global_options:
            eks_kwargs["region_name"] = global_options.region
            eks_kwargs["endpoint_url"] = global_options.endpoint_url
            eks_kwargs["verify"] = global_options.verify_ssl

        # Handle role assumption if needed
        role_to_assume = getattr(self._parsed_args, 'assume_role_arn', None)
        if role_to_assume:
            sts = create_nested_client(self._session, 'sts')
            assumed = sts.assume_role(
                RoleArn=role_to_assume,
                RoleSessionName='EKSDescribeClusterSession'
            )
            temporary = assumed["Credentials"]
            # describe_cluster is issued with the assumed role's credentials.
            eks_kwargs["aws_access_key_id"] = temporary["AccessKeyId"]
            eks_kwargs["aws_secret_access_key"] = temporary["SecretAccessKey"]
            eks_kwargs["aws_session_token"] = temporary["SessionToken"]

        eks = create_nested_client(self._session, "eks", **eks_kwargs)
        response = eks.describe_cluster(name=self._cluster_name)
        cluster = response.get("cluster")

        if not (cluster and "status" in cluster):
            raise EKSClusterError("Cluster not found")
        if cluster["status"] not in ("ACTIVE", "UPDATING"):
            raise EKSClusterError(f"Cluster status is {cluster['status']}")

        self._cluster_description = cluster
        return cluster

    def get_cluster_entry(self):
        """
        Return a cluster entry generated using
        the cluster description.

        :return: OrderedDict with a "cluster" section (certificate data
            and server endpoint) and a "name" set to the cluster ARN
        """
        description = self.cluster_description
        authority = description.get("certificateAuthority", {})

        cluster_section = OrderedDict()
        cluster_section["certificate-authority-data"] = authority.get("data", "")
        cluster_section["server"] = description.get("endpoint")

        entry = OrderedDict()
        entry["cluster"] = cluster_section
        entry["name"] = description.get("arn")
        return entry

    def get_user_entry(self, user_alias=None):
        """
        Return a user entry generated using
        the cluster description.

        :param user_alias: Name for the user entry; the cluster ARN is
            used when this is empty or None
        :return: OrderedDict whose exec section invokes
            ``aws ... eks get-token`` for this cluster
        """
        description = self.cluster_description
        # The region is the fourth colon-separated field of the ARN.
        arn_region = description.get("arn").split(":")[3]

        if description.get("outpostConfig") is None:
            identifier_flag = "--cluster-name"
            identifier_value = self._cluster_name
        else:
            # If cluster contains outpostConfig, use id for identification
            identifier_flag = "--cluster-id"
            identifier_value = description.get("id")

        token_args = [
            "--region",
            arn_region,
            "eks",
            "get-token",
            identifier_flag,
            identifier_value,
            "--output",
            "json",
        ]

        exec_section = OrderedDict()
        exec_section["apiVersion"] = API_VERSION
        exec_section["args"] = token_args
        exec_section["command"] = "aws"

        user_entry = OrderedDict()
        user_entry["name"] = user_alias or description.get("arn", "")
        user_entry["user"] = OrderedDict([("exec", exec_section)])

        role_arn = self._parsed_args.role_arn
        if role_arn is not None:
            token_args.extend(["--role", role_arn])

        profile = self._session.profile
        if profile:
            # Run the token command under the same profile as this session.
            exec_section["env"] = [OrderedDict([
                ("name", "AWS_PROFILE"),
                ("value", profile)
            ])]

        return user_entry
366
367