Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/eks/test_kubeconfig.py
1569 views
1
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
14
import glob
15
import os
16
import tempfile
17
import shutil
18
from botocore.compat import OrderedDict
19
20
from awscli.testutils import mock, unittest, skip_if_windows
21
from awscli.customizations.utils import uni_print
22
from awscli.customizations.eks.kubeconfig import (KubeconfigError,
23
KubeconfigInaccessableError,
24
KubeconfigCorruptedError,
25
Kubeconfig,
26
KubeconfigValidator,
27
KubeconfigLoader,
28
KubeconfigAppender,
29
KubeconfigWriter,
30
_get_new_kubeconfig_content,
31
)
32
from awscli.customizations.eks.exceptions import EKSError
33
from awscli.customizations.eks.ordered_yaml import ordered_yaml_load
34
from tests.functional.eks.test_util import get_testdata
35
36
class TestKubeconfig(unittest.TestCase):
37
def setUp(self):
38
self._content = OrderedDict([
39
("apiVersion", "v1")
40
])
41
self._path = "/some_path"
42
43
def test_no_content(self):
44
config = Kubeconfig(self._path, None)
45
self.assertEqual(config.content,
46
_get_new_kubeconfig_content())
47
48
def test_has_cluster(self):
49
self._content["clusters"] = [
50
OrderedDict([
51
("cluster", None),
52
("name", "clustername")
53
]),
54
OrderedDict([
55
("cluster", None),
56
("name", "anotherclustername")
57
])
58
]
59
60
config = Kubeconfig(self._path, self._content)
61
self.assertTrue(config.has_cluster("clustername"))
62
self.assertFalse(config.has_cluster("othercluster"))
63
64
def test_has_cluster_with_no_clusters(self):
65
config = Kubeconfig(self._path, self._content)
66
self.assertFalse(config.has_cluster("clustername"))
67
68
def test_has_cluster_with_none_clusters(self):
69
self._content["clusters"] = None
70
71
config = Kubeconfig(self._path, self._content)
72
self.assertFalse(config.has_cluster("anyclustername"))
73
74
def test_has_cluster_with_cluster_no_name(self):
75
self._content["clusters"] = [
76
OrderedDict([
77
("cluster", None),
78
("name", "clustername")
79
]),
80
OrderedDict([
81
("cluster", None),
82
("name", None),
83
]),
84
OrderedDict([
85
("cluster", None),
86
]),
87
]
88
89
config = Kubeconfig(self._path, self._content)
90
self.assertTrue(config.has_cluster("clustername"))
91
self.assertFalse(config.has_cluster("othercluster"))
92
93
94
class TestKubeconfigWriter(unittest.TestCase):
95
96
@skip_if_windows('Windows does not support such permissions set up')
97
def test_not_world_readable(self):
98
tmpdir = tempfile.mkdtemp()
99
self.addCleanup(shutil.rmtree, tmpdir)
100
config_path = os.path.join(tmpdir, "config")
101
config = Kubeconfig(config_path, None)
102
KubeconfigWriter().write_kubeconfig(config)
103
stat = os.stat(config_path)
104
self.assertEqual(stat.st_mode & 0o777, 0o600)
105
106
def test_truncates(self):
107
tmpdir = tempfile.mkdtemp()
108
self.addCleanup(shutil.rmtree, tmpdir)
109
config_path = os.path.join(tmpdir, "config")
110
with open(config_path, "w+") as f:
111
f.write("#" * 100)
112
KubeconfigWriter().write_kubeconfig(Kubeconfig(config_path, {}))
113
empty_stat = os.stat(config_path)
114
self.assertLessEqual(empty_stat.st_size, 4, "file should be '{}[newline]', 3/4 bytes long ")
115
116
117
class TestKubeconfigValidator(unittest.TestCase):
118
def setUp(self):
119
self._validator = KubeconfigValidator()
120
121
def test_valid(self):
122
valid_cases = glob.glob(get_testdata( "valid_*" ))
123
for case in valid_cases:
124
with open(case, 'r') as stream:
125
content_dict = ordered_yaml_load(stream)
126
if content_dict is not None:
127
config = Kubeconfig(None, content_dict)
128
try:
129
self._validator.validate_config(config)
130
except KubeconfigError as e:
131
self.fail("Valid file {0} raised {1}.".format(case, e))
132
133
def test_invalid(self):
134
invalid_cases = glob.glob(get_testdata("invalid_*"))
135
for case in invalid_cases:
136
with open(case, 'r') as stream:
137
content_dict = ordered_yaml_load(stream)
138
config = Kubeconfig(None, content_dict)
139
self.assertRaises(KubeconfigCorruptedError,
140
self._validator.validate_config,
141
config)
142
143
class TestKubeconfigAppender(unittest.TestCase):
144
def setUp(self):
145
self._appender = KubeconfigAppender()
146
147
def test_basic_insert(self):
148
initial = OrderedDict([
149
("apiVersion", "v1"),
150
("clusters", [
151
OrderedDict([
152
("cluster", OrderedDict([
153
("certificate-authority-data", "data1"),
154
("server", "endpoint1")
155
])),
156
("name", "oldclustername")
157
])
158
]),
159
("contexts", []),
160
("current-context", "simple"),
161
("kind", "Config"),
162
("preferences", OrderedDict()),
163
("users", [])
164
])
165
cluster = OrderedDict([
166
("cluster", OrderedDict([
167
("certificate-authority-data", "data2"),
168
("server", "endpoint2")
169
])),
170
("name", "clustername")
171
])
172
cluster_added_correct = OrderedDict([
173
("apiVersion", "v1"),
174
("clusters", [
175
OrderedDict([
176
("cluster", OrderedDict([
177
("certificate-authority-data", "data1"),
178
("server", "endpoint1")
179
])),
180
("name", "oldclustername")
181
]),
182
OrderedDict([
183
("cluster", OrderedDict([
184
("certificate-authority-data", "data2"),
185
("server", "endpoint2")
186
])),
187
("name", "clustername")
188
])
189
]),
190
("contexts", []),
191
("current-context", "simple"),
192
("kind", "Config"),
193
("preferences", OrderedDict()),
194
("users", [])
195
])
196
cluster_added = self._appender.insert_entry(Kubeconfig(None, initial),
197
"clusters",
198
cluster)
199
self.assertDictEqual(cluster_added.content, cluster_added_correct)
200
201
def test_update_existing(self):
202
initial = OrderedDict([
203
("apiVersion", "v1"),
204
("clusters", [
205
OrderedDict([
206
("cluster", OrderedDict([
207
("server", "endpoint")
208
])),
209
("name", "clustername")
210
])
211
]),
212
("contexts", []),
213
("current-context", None),
214
("kind", "Config"),
215
("preferences", OrderedDict()),
216
("users", [])
217
])
218
cluster = OrderedDict([
219
("cluster", OrderedDict([
220
("certificate-authority-data", "data"),
221
("server", "endpoint")
222
])),
223
("name", "clustername")
224
])
225
correct = OrderedDict([
226
("apiVersion", "v1"),
227
("clusters", [
228
OrderedDict([
229
("cluster", OrderedDict([
230
("certificate-authority-data", "data"),
231
("server", "endpoint")
232
])),
233
("name", "clustername")
234
])
235
]),
236
("contexts", []),
237
("current-context", None),
238
("kind", "Config"),
239
("preferences", OrderedDict()),
240
("users", [])
241
])
242
updated = self._appender.insert_entry(Kubeconfig(None, initial),
243
"clusters",
244
cluster)
245
self.assertDictEqual(updated.content, correct)
246
247
def test_key_not_exist(self):
248
initial = OrderedDict([
249
("apiVersion", "v1"),
250
("contexts", []),
251
("current-context", None),
252
("kind", "Config"),
253
("preferences", OrderedDict()),
254
("users", [])
255
])
256
cluster = OrderedDict([
257
("cluster", OrderedDict([
258
("certificate-authority-data", "data"),
259
("server", "endpoint")
260
])),
261
("name", "clustername")
262
])
263
correct = OrderedDict([
264
("apiVersion", "v1"),
265
("contexts", []),
266
("current-context", None),
267
("kind", "Config"),
268
("preferences", OrderedDict()),
269
("users", []),
270
("clusters", [
271
OrderedDict([
272
("cluster", OrderedDict([
273
("certificate-authority-data", "data"),
274
("server", "endpoint")
275
])),
276
("name", "clustername")
277
])
278
])
279
])
280
updated = self._appender.insert_entry(Kubeconfig(None, initial),
281
"clusters",
282
cluster)
283
self.assertDictEqual(updated.content, correct)
284
285
def test_key_none(self):
286
initial = OrderedDict([
287
("apiVersion", "v1"),
288
("clusters", None),
289
("contexts", []),
290
("current-context", None),
291
("kind", "Config"),
292
("preferences", OrderedDict()),
293
("users", [])
294
])
295
cluster = OrderedDict([
296
("cluster", OrderedDict([
297
("certificate-authority-data", "data"),
298
("server", "endpoint")
299
])),
300
("name", "clustername")
301
])
302
correct = OrderedDict([
303
("apiVersion", "v1"),
304
("clusters", [
305
OrderedDict([
306
("cluster", OrderedDict([
307
("certificate-authority-data", "data"),
308
("server", "endpoint")
309
])),
310
("name", "clustername")
311
])
312
]),
313
("contexts", []),
314
("current-context", None),
315
("kind", "Config"),
316
("preferences", OrderedDict()),
317
("users", []),
318
])
319
updated = self._appender.insert_entry(Kubeconfig(None, initial),
320
"clusters",
321
cluster)
322
self.assertDictEqual(updated.content, correct)
323
324
def test_key_not_array(self):
325
initial = OrderedDict([
326
("apiVersion", "v1"),
327
("contexts", []),
328
("current-context", None),
329
("kind", "Config"),
330
("preferences", OrderedDict()),
331
("users", [])
332
])
333
cluster = OrderedDict([
334
("cluster", OrderedDict([
335
("certificate-authority-data", "data"),
336
("server", "endpoint")
337
])),
338
("name", "clustername")
339
])
340
self.assertRaises(KubeconfigError,
341
self._appender.insert_entry,
342
Kubeconfig(None, initial),
343
"kind",
344
cluster)
345
346
def test_make_context(self):
347
cluster = OrderedDict([
348
("name", "clustername"),
349
("cluster", OrderedDict())
350
])
351
user = OrderedDict([
352
("name", "username"),
353
("user", OrderedDict())
354
])
355
context_correct = OrderedDict([
356
("context", OrderedDict([
357
("cluster", "clustername"),
358
("user", "username")
359
])),
360
("name", "username")
361
])
362
context = self._appender._make_context(cluster, user)
363
self.assertDictEqual(context, context_correct)
364
365
def test_make_context_alias(self):
366
cluster = OrderedDict([
367
("name", "clustername"),
368
("cluster", OrderedDict())
369
])
370
user = OrderedDict([
371
("name", "username"),
372
("user", OrderedDict())
373
])
374
context_correct = OrderedDict([
375
("context", OrderedDict([
376
("cluster", "clustername"),
377
("user", "username")
378
])),
379
("name", "alias")
380
])
381
alias = "alias"
382
context = self._appender._make_context(cluster, user, alias=alias)
383
self.assertDictEqual(context, context_correct)
384
385