Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/eks/kubeconfig.py
1567 views
1
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import os
15
import yaml
16
import logging
17
import errno
18
from botocore.compat import OrderedDict
19
20
from awscli.customizations.eks.exceptions import EKSError
21
from awscli.customizations.eks.ordered_yaml import (ordered_yaml_load,
22
ordered_yaml_dump)
23
24
25
class KubeconfigError(EKSError):
26
""" Base class for all kubeconfig errors."""
27
28
29
class KubeconfigCorruptedError(KubeconfigError):
30
""" Raised when a kubeconfig cannot be parsed."""
31
32
33
class KubeconfigInaccessableError(KubeconfigError):
34
""" Raised when a kubeconfig cannot be opened for read/writing."""
35
36
37
def _get_new_kubeconfig_content():
38
return OrderedDict([
39
("apiVersion", "v1"),
40
("clusters", []),
41
("contexts", []),
42
("current-context", ""),
43
("kind", "Config"),
44
("preferences", OrderedDict()),
45
("users", [])
46
])
47
48
49
class Kubeconfig(object):
50
def __init__(self, path, content=None):
51
self.path = path
52
if content is None:
53
content = _get_new_kubeconfig_content()
54
self.content = content
55
56
def dump_content(self):
57
""" Return the stored content in yaml format. """
58
return ordered_yaml_dump(self.content)
59
60
def has_cluster(self, name):
61
"""
62
Return true if this kubeconfig contains an entry
63
For the passed cluster name.
64
"""
65
if self.content.get('clusters') is None:
66
return False
67
return name in [cluster['name']
68
for cluster in self.content['clusters'] if 'name' in cluster]
69
70
def __eq__(self, other):
71
return (
72
isinstance(other, Kubeconfig)
73
and self.path == other.path
74
and self.content == other.content
75
)
76
77
78
class KubeconfigValidator(object):
79
def __init__(self):
80
# Validation_content is an empty Kubeconfig
81
# It is used as a way to know what types different entries should be
82
self._validation_content = Kubeconfig(None, None).content
83
84
def validate_config(self, config):
85
"""
86
Raises KubeconfigCorruptedError if the passed content is invalid
87
88
:param config: The config to validate
89
:type config: Kubeconfig
90
"""
91
if not isinstance(config, Kubeconfig):
92
raise KubeconfigCorruptedError("Internal error: "
93
f"Not a {Kubeconfig}.")
94
self._validate_config_types(config)
95
self._validate_list_entry_types(config)
96
97
def _validate_config_types(self, config):
98
"""
99
Raises KubeconfigCorruptedError if any of the entries in config
100
are the wrong type
101
102
:param config: The config to validate
103
:type config: Kubeconfig
104
"""
105
if not isinstance(config.content, dict):
106
raise KubeconfigCorruptedError(f"Content not a {dict}.")
107
for key, value in self._validation_content.items():
108
if (key in config.content and
109
config.content[key] is not None and
110
not isinstance(config.content[key], type(value))):
111
raise KubeconfigCorruptedError(
112
f"{key} is wrong type: {type(config.content[key])} "
113
f"(Should be {type(value)})"
114
)
115
116
def _validate_list_entry_types(self, config):
117
"""
118
Raises KubeconfigCorruptedError if any lists in config contain objects
119
which are not dictionaries
120
121
:param config: The config to validate
122
:type config: Kubeconfig
123
"""
124
for key, value in self._validation_content.items():
125
if (key in config.content and
126
type(config.content[key]) == list):
127
for element in config.content[key]:
128
if not isinstance(element, OrderedDict):
129
raise KubeconfigCorruptedError(
130
f"Entry in {key} not a {dict}. ")
131
132
133
class KubeconfigLoader(object):
134
def __init__(self, validator = None):
135
if validator is None:
136
validator=KubeconfigValidator()
137
self._validator=validator
138
139
def load_kubeconfig(self, path):
140
"""
141
Loads the kubeconfig found at the given path.
142
If no file is found at the given path,
143
Generate a new kubeconfig to write back.
144
If the kubeconfig is valid, loads the content from it.
145
If the kubeconfig is invalid, throw the relevant exception.
146
147
:param path: The path to load a kubeconfig from
148
:type path: string
149
150
:raises KubeconfigInaccessableError: if the kubeconfig can't be opened
151
:raises KubeconfigCorruptedError: if the kubeconfig is invalid
152
153
:return: The loaded kubeconfig
154
:rtype: Kubeconfig
155
"""
156
try:
157
with open(path, "r") as stream:
158
loaded_content=ordered_yaml_load(stream)
159
except IOError as e:
160
if e.errno == errno.ENOENT:
161
loaded_content=None
162
else:
163
raise KubeconfigInaccessableError(
164
f"Can't open kubeconfig for reading: {e}")
165
except yaml.YAMLError as e:
166
raise KubeconfigCorruptedError(
167
f"YamlError while loading kubeconfig: {e}")
168
169
loaded_config=Kubeconfig(path, loaded_content)
170
self._validator.validate_config(loaded_config)
171
172
return loaded_config
173
174
175
class KubeconfigWriter(object):
176
def write_kubeconfig(self, config):
177
"""
178
Write config to disk.
179
OK if the file doesn't exist.
180
181
:param config: The kubeconfig to write
182
:type config: Kubeconfig
183
184
:raises KubeconfigInaccessableError: if the kubeconfig
185
can't be opened for writing
186
"""
187
directory=os.path.dirname(config.path)
188
189
try:
190
os.makedirs(directory)
191
except OSError as e:
192
if e.errno != errno.EEXIST:
193
raise KubeconfigInaccessableError(
194
f"Can't create directory for writing: {e}")
195
try:
196
with os.fdopen(
197
os.open(
198
config.path,
199
os.O_CREAT | os.O_RDWR | os.O_TRUNC,
200
0o600),
201
"w+") as stream:
202
ordered_yaml_dump(config.content, stream)
203
except (IOError, OSError) as e:
204
raise KubeconfigInaccessableError(
205
f"Can't open kubeconfig for writing: {e}")
206
207
208
class KubeconfigAppender(object):
209
def insert_entry(self, config, key, new_entry):
210
"""
211
Insert entry into the entries list at content[key]
212
Overwrite an existing entry if they share the same name
213
214
:param config: The kubeconfig to insert an entry into
215
:type config: Kubeconfig
216
"""
217
entries=self._setdefault_existing_entries(config, key)
218
same_name_index=self._index_same_name(entries, new_entry)
219
if same_name_index is None:
220
entries.append(new_entry)
221
else:
222
entries[same_name_index]=new_entry
223
return config
224
225
def _setdefault_existing_entries(self, config, key):
226
config.content[key]=config.content.get(key) or []
227
entries=config.content[key]
228
if not isinstance(entries, list):
229
raise KubeconfigError(f"Tried to insert into {key}, "
230
f"which is a {type(entries)} "
231
f"not a {list}")
232
return entries
233
234
def _index_same_name(self, entries, new_entry):
235
if "name" in new_entry:
236
name_to_search=new_entry["name"]
237
for i, entry in enumerate(entries):
238
if "name" in entry and entry["name"] == name_to_search:
239
return i
240
return None
241
242
def _make_context(self, cluster, user, alias = None):
243
""" Generate a context to associate cluster and user with a given alias."""
244
return OrderedDict([
245
("context", OrderedDict([
246
("cluster", cluster["name"]),
247
("user", user["name"])
248
])),
249
("name", alias or user["name"])
250
])
251
252
def insert_cluster_user_pair(self, config, cluster, user, alias = None):
253
"""
254
Insert the passed cluster entry and user entry,
255
then make a context to associate them
256
and set current-context to be the new context.
257
Returns the new context
258
259
:param config: the Kubeconfig to insert the pair into
260
:type config: Kubeconfig
261
262
:param cluster: the cluster entry
263
:type cluster: OrderedDict
264
265
:param user: the user entry
266
:type user: OrderedDict
267
268
:param alias: the alias for the context; defaults top user entry name
269
:type context: str
270
271
:return: The generated context
272
:rtype: OrderedDict
273
"""
274
context=self._make_context(cluster, user, alias = alias)
275
self.insert_entry(config, "clusters", cluster)
276
self.insert_entry(config, "users", user)
277
self.insert_entry(config, "contexts", context)
278
279
config.content["current-context"]=context["name"]
280
281
return context
282
283