# Path: blob/develop/tests/unit/customizations/eks/test_kubeconfig.py
# 1569 views
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import glob
import os
import tempfile
import shutil
from botocore.compat import OrderedDict

from awscli.testutils import mock, unittest, skip_if_windows
from awscli.customizations.utils import uni_print
from awscli.customizations.eks.kubeconfig import (KubeconfigError,
                                                  KubeconfigInaccessableError,
                                                  KubeconfigCorruptedError,
                                                  Kubeconfig,
                                                  KubeconfigValidator,
                                                  KubeconfigLoader,
                                                  KubeconfigAppender,
                                                  KubeconfigWriter,
                                                  _get_new_kubeconfig_content,
                                                  )
from awscli.customizations.eks.exceptions import EKSError
from awscli.customizations.eks.ordered_yaml import ordered_yaml_load
from tests.functional.eks.test_util import get_testdata


class TestKubeconfig(unittest.TestCase):
    """Tests for ``Kubeconfig`` construction and ``has_cluster``."""

    def setUp(self):
        self._content = OrderedDict([
            ("apiVersion", "v1")
        ])
        self._path = "/some_path"

    def test_no_content(self):
        """A ``None`` content is replaced by the default new content."""
        config = Kubeconfig(self._path, None)
        self.assertEqual(config.content,
                         _get_new_kubeconfig_content())

    def test_has_cluster(self):
        """``has_cluster`` matches only names present in ``clusters``."""
        self._content["clusters"] = [
            OrderedDict([
                ("cluster", None),
                ("name", "clustername")
            ]),
            OrderedDict([
                ("cluster", None),
                ("name", "anotherclustername")
            ])
        ]

        config = Kubeconfig(self._path, self._content)
        self.assertTrue(config.has_cluster("clustername"))
        self.assertFalse(config.has_cluster("othercluster"))

    def test_has_cluster_with_no_clusters(self):
        """``has_cluster`` is false when the ``clusters`` key is absent."""
        config = Kubeconfig(self._path, self._content)
        self.assertFalse(config.has_cluster("clustername"))

    def test_has_cluster_with_none_clusters(self):
        """``has_cluster`` is false when ``clusters`` is ``None``."""
        self._content["clusters"] = None

        config = Kubeconfig(self._path, self._content)
        self.assertFalse(config.has_cluster("anyclustername"))

    def test_has_cluster_with_cluster_no_name(self):
        """Entries with a ``None`` or missing name do not break lookup."""
        self._content["clusters"] = [
            OrderedDict([
                ("cluster", None),
                ("name", "clustername")
            ]),
            OrderedDict([
                ("cluster", None),
                ("name", None),
            ]),
            OrderedDict([
                ("cluster", None),
            ]),
        ]

        config = Kubeconfig(self._path, self._content)
        self.assertTrue(config.has_cluster("clustername"))
        self.assertFalse(config.has_cluster("othercluster"))


class TestKubeconfigWriter(unittest.TestCase):
    """Tests for ``KubeconfigWriter.write_kubeconfig`` against real files."""

    @skip_if_windows('Windows does not support such permissions set up')
    def test_not_world_readable(self):
        """A newly written kubeconfig has permission bits ``0o600``."""
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir)
        config_path = os.path.join(tmpdir, "config")
        config = Kubeconfig(config_path, None)
        KubeconfigWriter().write_kubeconfig(config)
        stat = os.stat(config_path)
        self.assertEqual(stat.st_mode & 0o777, 0o600)

    def test_truncates(self):
        """Writing over a longer existing file leaves no stale bytes."""
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir)
        config_path = os.path.join(tmpdir, "config")
        # Pre-fill the file with 100 bytes so leftover content is detectable.
        with open(config_path, "w+") as f:
            f.write("#" * 100)
        KubeconfigWriter().write_kubeconfig(Kubeconfig(config_path, {}))
        empty_stat = os.stat(config_path)
        self.assertLessEqual(empty_stat.st_size, 4, "file should be '{}[newline]', 3/4 bytes long ")


class TestKubeconfigValidator(unittest.TestCase):
    """Runs ``KubeconfigValidator`` over the ``valid_*``/``invalid_*`` test data."""

    def setUp(self):
        self._validator = KubeconfigValidator()

    def test_valid(self):
        """Every ``valid_*`` fixture validates without a ``KubeconfigError``."""
        valid_cases = glob.glob(get_testdata("valid_*"))
        for case in valid_cases:
            with open(case, 'r') as stream:
                content_dict = ordered_yaml_load(stream)
            # Fixtures that load as None are skipped rather than validated.
            if content_dict is not None:
                config = Kubeconfig(None, content_dict)
                try:
                    self._validator.validate_config(config)
                except KubeconfigError as e:
                    self.fail("Valid file {0} raised {1}.".format(case, e))

    def test_invalid(self):
        """Every ``invalid_*`` fixture raises ``KubeconfigCorruptedError``."""
        invalid_cases = glob.glob(get_testdata("invalid_*"))
        for case in invalid_cases:
            with open(case, 'r') as stream:
                content_dict = ordered_yaml_load(stream)
            config = Kubeconfig(None, content_dict)
            self.assertRaises(KubeconfigCorruptedError,
                              self._validator.validate_config,
                              config)


class TestKubeconfigAppender(unittest.TestCase):
    """Tests for ``KubeconfigAppender.insert_entry`` and ``_make_context``."""

    def setUp(self):
        self._appender = KubeconfigAppender()

    def test_basic_insert(self):
        """A cluster with a new name is appended after the existing one."""
        initial = OrderedDict([
            ("apiVersion", "v1"),
            ("clusters", [
                OrderedDict([
                    ("cluster", OrderedDict([
                        ("certificate-authority-data", "data1"),
                        ("server", "endpoint1")
                    ])),
                    ("name", "oldclustername")
                ])
            ]),
            ("contexts", []),
            ("current-context", "simple"),
            ("kind", "Config"),
            ("preferences", OrderedDict()),
            ("users", [])
        ])
        cluster = OrderedDict([
            ("cluster", OrderedDict([
                ("certificate-authority-data", "data2"),
                ("server", "endpoint2")
            ])),
            ("name", "clustername")
        ])
        cluster_added_correct = OrderedDict([
            ("apiVersion", "v1"),
            ("clusters", [
                OrderedDict([
                    ("cluster", OrderedDict([
                        ("certificate-authority-data", "data1"),
                        ("server", "endpoint1")
                    ])),
                    ("name", "oldclustername")
                ]),
                OrderedDict([
                    ("cluster", OrderedDict([
                        ("certificate-authority-data", "data2"),
                        ("server", "endpoint2")
                    ])),
                    ("name", "clustername")
                ])
            ]),
            ("contexts", []),
            ("current-context", "simple"),
            ("kind", "Config"),
            ("preferences", OrderedDict()),
            ("users", [])
        ])
        cluster_added = self._appender.insert_entry(Kubeconfig(None, initial),
                                                    "clusters",
                                                    cluster)
        self.assertDictEqual(cluster_added.content, cluster_added_correct)

    def test_update_existing(self):
        """An entry whose name already exists replaces the old entry."""
        initial = OrderedDict([
            ("apiVersion", "v1"),
            ("clusters", [
                OrderedDict([
                    ("cluster", OrderedDict([
                        ("server", "endpoint")
                    ])),
                    ("name", "clustername")
                ])
            ]),
            ("contexts", []),
            ("current-context", None),
            ("kind", "Config"),
            ("preferences", OrderedDict()),
            ("users", [])
        ])
        cluster = OrderedDict([
            ("cluster", OrderedDict([
                ("certificate-authority-data", "data"),
                ("server", "endpoint")
            ])),
            ("name", "clustername")
        ])
        correct = OrderedDict([
            ("apiVersion", "v1"),
            ("clusters", [
                OrderedDict([
                    ("cluster", OrderedDict([
                        ("certificate-authority-data", "data"),
                        ("server", "endpoint")
                    ])),
                    ("name", "clustername")
                ])
            ]),
            ("contexts", []),
            ("current-context", None),
            ("kind", "Config"),
            ("preferences", OrderedDict()),
            ("users", [])
        ])
        updated = self._appender.insert_entry(Kubeconfig(None, initial),
                                              "clusters",
                                              cluster)
        self.assertDictEqual(updated.content, correct)

    def test_key_not_exist(self):
        """A missing ``clusters`` key is created at the end of the content."""
        initial = OrderedDict([
            ("apiVersion", "v1"),
            ("contexts", []),
            ("current-context", None),
            ("kind", "Config"),
            ("preferences", OrderedDict()),
            ("users", [])
        ])
        cluster = OrderedDict([
            ("cluster", OrderedDict([
                ("certificate-authority-data", "data"),
                ("server", "endpoint")
            ])),
            ("name", "clustername")
        ])
        correct = OrderedDict([
            ("apiVersion", "v1"),
            ("contexts", []),
            ("current-context", None),
            ("kind", "Config"),
            ("preferences", OrderedDict()),
            ("users", []),
            ("clusters", [
                OrderedDict([
                    ("cluster", OrderedDict([
                        ("certificate-authority-data", "data"),
                        ("server", "endpoint")
                    ])),
                    ("name", "clustername")
                ])
            ])
        ])
        updated = self._appender.insert_entry(Kubeconfig(None, initial),
                                              "clusters",
                                              cluster)
        self.assertDictEqual(updated.content, correct)

    def test_key_none(self):
        """A ``clusters`` key set to ``None`` becomes a one-entry list."""
        initial = OrderedDict([
            ("apiVersion", "v1"),
            ("clusters", None),
            ("contexts", []),
            ("current-context", None),
            ("kind", "Config"),
            ("preferences", OrderedDict()),
            ("users", [])
        ])
        cluster = OrderedDict([
            ("cluster", OrderedDict([
                ("certificate-authority-data", "data"),
                ("server", "endpoint")
            ])),
            ("name", "clustername")
        ])
        correct = OrderedDict([
            ("apiVersion", "v1"),
            ("clusters", [
                OrderedDict([
                    ("cluster", OrderedDict([
                        ("certificate-authority-data", "data"),
                        ("server", "endpoint")
                    ])),
                    ("name", "clustername")
                ])
            ]),
            ("contexts", []),
            ("current-context", None),
            ("kind", "Config"),
            ("preferences", OrderedDict()),
            ("users", []),
        ])
        updated = self._appender.insert_entry(Kubeconfig(None, initial),
                                              "clusters",
                                              cluster)
        self.assertDictEqual(updated.content, correct)

    def test_key_not_array(self):
        """Inserting under a key holding a non-list raises ``KubeconfigError``."""
        initial = OrderedDict([
            ("apiVersion", "v1"),
            ("contexts", []),
            ("current-context", None),
            ("kind", "Config"),
            ("preferences", OrderedDict()),
            ("users", [])
        ])
        cluster = OrderedDict([
            ("cluster", OrderedDict([
                ("certificate-authority-data", "data"),
                ("server", "endpoint")
            ])),
            ("name", "clustername")
        ])
        # "kind" holds the string "Config", so it cannot take a list entry.
        self.assertRaises(KubeconfigError,
                          self._appender.insert_entry,
                          Kubeconfig(None, initial),
                          "kind",
                          cluster)

    def test_make_context(self):
        """Without an alias the context is named after the user entry."""
        cluster = OrderedDict([
            ("name", "clustername"),
            ("cluster", OrderedDict())
        ])
        user = OrderedDict([
            ("name", "username"),
            ("user", OrderedDict())
        ])
        context_correct = OrderedDict([
            ("context", OrderedDict([
                ("cluster", "clustername"),
                ("user", "username")
            ])),
            ("name", "username")
        ])
        context = self._appender._make_context(cluster, user)
        self.assertDictEqual(context, context_correct)

    def test_make_context_alias(self):
        """A supplied ``alias`` is used as the context name."""
        cluster = OrderedDict([
            ("name", "clustername"),
            ("cluster", OrderedDict())
        ])
        user = OrderedDict([
            ("name", "username"),
            ("user", OrderedDict())
        ])
        context_correct = OrderedDict([
            ("context", OrderedDict([
                ("cluster", "clustername"),
                ("user", "username")
            ])),
            ("name", "alias")
        ])
        alias = "alias"
        context = self._appender._make_context(cluster, user, alias=alias)
        self.assertDictEqual(context, context_correct)