Path: blob/develop/awscli/customizations/eks/update_kubeconfig.py
1567 views
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.1#2# Licensed under the Apache License, Version 2.0 (the "License"). You3# may not use this file except in compliance with the License. A copy of4# the License is located at5#6# http://aws.amazon.com/apache2.0/7#8# or in the "license" file accompanying this file. This file is9# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF10# ANY KIND, either express or implied. See the License for the specific11# language governing permissions and limitations under the License.1213import os14import logging1516from botocore.compat import OrderedDict1718from awscli.customizations.commands import BasicCommand19from awscli.customizations.utils import uni_print20from awscli.customizations.eks.exceptions import EKSClusterError21from awscli.customizations.eks.kubeconfig import (Kubeconfig,22KubeconfigError,23KubeconfigLoader,24KubeconfigWriter,25KubeconfigValidator,26KubeconfigAppender)27from awscli.customizations.eks.ordered_yaml import ordered_yaml_dump28from awscli.utils import create_nested_client2930LOG = logging.getLogger(__name__)3132DEFAULT_PATH = os.path.expanduser("~/.kube/config")3334# At the time EKS no longer supports Kubernetes v1.21 (probably ~Dec 2023),35# this can be safely changed to default to writing "v1"36API_VERSION = "client.authentication.k8s.io/v1beta1"3738class UpdateKubeconfigCommand(BasicCommand):39NAME = 'update-kubeconfig'4041DESCRIPTION = BasicCommand.FROM_FILE(42'eks',43'update-kubeconfig',44'_description.rst'45)4647ARG_TABLE = [48{49'name': 'name',50'dest': 'cluster_name',51'help_text': ("The name of the cluster for which "52"to create a kubeconfig entry. "53"This cluster must exist in your account and in the "54"specified or configured default Region "55"for your AWS CLI installation."),56'required': True57},58{59'name': 'kubeconfig',60'help_text': ("Optionally specify a kubeconfig file to append "61"with your configuration. "62"By default, the configuration is written to the "63"first file path in the KUBECONFIG "64"environment variable (if it is set) "65"or the default kubeconfig path (.kube/config) "66"in your home directory."),67'required': False68},69{70'name': 'role-arn',71'help_text': ("To assume a role for cluster authentication, "72"specify an IAM role ARN with this option. "73"For example, if you created a cluster "74"while assuming an IAM role, "75"then you must also assume that role to "76"connect to the cluster the first time."),77'required': False78},79{80'name': 'dry-run',81'action': 'store_true',82'default': False,83'help_text': ("Print the merged kubeconfig to stdout instead of "84"writing it to the specified file."),85'required': False86},87{88'name': 'verbose',89'action': 'store_true',90'default': False,91'help_text': ("Print more detailed output "92"when writing to the kubeconfig file, "93"including the appended entries.")94},95{96'name': 'alias',97'help_text': ("Alias for the cluster context name. "98"Defaults to match cluster ARN."),99'required': False100},101{102'name': 'user-alias',103'help_text': ("Alias for the generated user name. "104"Defaults to match cluster ARN."),105'required': False106},107{108'name': 'assume-role-arn',109'help_text': ('To assume a role for retrieving cluster information, '110'specify an IAM role ARN with this option. '111'Use this for cross-account access to get cluster details '112'from the account where the cluster resides.'),113'required': False114}115]116117def _display_entries(self, entries):118"""119Display entries in yaml format120121:param entries: a list of OrderedDicts to be printed122:type entries: list123"""124uni_print("Entries:\n\n")125for entry in entries:126uni_print(ordered_yaml_dump(entry))127uni_print("\n")128129def _run_main(self, parsed_args, parsed_globals):130client = EKSClient(self._session,131parsed_args=parsed_args,132parsed_globals=parsed_globals)133new_cluster_dict = client.get_cluster_entry()134new_user_dict = client.get_user_entry(user_alias=parsed_args.user_alias)135136config_selector = KubeconfigSelector(137os.environ.get("KUBECONFIG", ""),138parsed_args.kubeconfig139)140config = config_selector.choose_kubeconfig(141new_cluster_dict["name"]142)143updating_existing = config.has_cluster(new_cluster_dict["name"])144appender = KubeconfigAppender()145new_context_dict = appender.insert_cluster_user_pair(config,146new_cluster_dict,147new_user_dict,148parsed_args.alias)149150if parsed_args.dry_run:151uni_print(config.dump_content())152else:153writer = KubeconfigWriter()154writer.write_kubeconfig(config)155156if updating_existing:157uni_print("Updated context {0} in {1}\n".format(158new_context_dict["name"], config.path159))160else:161uni_print("Added new context {0} to {1}\n".format(162new_context_dict["name"], config.path163))164165if parsed_args.verbose:166self._display_entries([167new_context_dict,168new_user_dict,169new_cluster_dict170])171172173174class KubeconfigSelector(object):175176def __init__(self, env_variable, path_in, validator=None,177loader=None):178"""179Parse KUBECONFIG into a list of absolute paths.180Also replace the empty list with DEFAULT_PATH181182:param env_variable: KUBECONFIG as a long string183:type env_variable: string184185:param path_in: The path passed in through the CLI186:type path_in: string or None187"""188if validator is None:189validator = KubeconfigValidator()190self._validator = validator191192if loader is None:193loader = KubeconfigLoader(validator)194self._loader = loader195196if path_in is not None:197# Override environment variable198self._paths = [self._expand_path(path_in)]199else:200# Get the list of paths from the environment variable201if env_variable == "":202env_variable = DEFAULT_PATH203self._paths = [self._expand_path(element)204for element in env_variable.split(os.pathsep)205if len(element.strip()) > 0]206if len(self._paths) == 0:207self._paths = [DEFAULT_PATH]208209def choose_kubeconfig(self, cluster_name):210"""211Choose which kubeconfig file to read from.212If name is already an entry in one of the $KUBECONFIG files,213choose that one.214Otherwise choose the first file.215216:param cluster_name: The name of the cluster which is going to be added217:type cluster_name: String218219:return: a chosen Kubeconfig based on above rules220:rtype: Kubeconfig221"""222# Search for an existing entry to update223for candidate_path in self._paths:224try:225loaded_config = self._loader.load_kubeconfig(candidate_path)226227if loaded_config.has_cluster(cluster_name):228LOG.debug("Found entry to update at {0}".format(229candidate_path230))231return loaded_config232except KubeconfigError as e:233LOG.warning("Passing {0}:{1}".format(candidate_path, e))234235# No entry was found, use the first file in KUBECONFIG236#237# Note: This could raise KubeconfigErrors if paths[0] is corrupted238return self._loader.load_kubeconfig(self._paths[0])239240def _expand_path(self, path):241""" A helper to expand a path to a full absolute path. """242return os.path.abspath(os.path.expanduser(path))243244245class EKSClient(object):246def __init__(self, session, parsed_args, parsed_globals=None):247self._session = session248self._cluster_name = parsed_args.cluster_name249self._cluster_description = None250self._parsed_globals = parsed_globals251self._parsed_args = parsed_args252253@property254def cluster_description(self):255"""256Use an eks describe-cluster call to get the cluster description257Cache the response in self._cluster_description.258describe-cluster will only be called once.259"""260if self._cluster_description is not None:261return self._cluster_description262263client_kwargs = {}264if self._parsed_globals:265client_kwargs.update({266"region_name": self._parsed_globals.region,267"endpoint_url": self._parsed_globals.endpoint_url,268"verify": self._parsed_globals.verify_ssl,269})270271# Handle role assumption if needed272if getattr(self._parsed_args, 'assume_role_arn', None):273sts_client = create_nested_client(self._session, 'sts')274credentials = sts_client.assume_role(275RoleArn=self._parsed_args.assume_role_arn,276RoleSessionName='EKSDescribeClusterSession'277)["Credentials"]278279client_kwargs.update({280"aws_access_key_id": credentials["AccessKeyId"],281"aws_secret_access_key": credentials["SecretAccessKey"],282"aws_session_token": credentials["SessionToken"],283})284285client = create_nested_client(self._session, "eks", **client_kwargs)286full_description = client.describe_cluster(name=self._cluster_name)287cluster = full_description.get("cluster")288289if not cluster or "status" not in cluster:290raise EKSClusterError("Cluster not found")291if cluster["status"] not in ["ACTIVE", "UPDATING"]:292raise EKSClusterError(f"Cluster status is {cluster['status']}")293294self._cluster_description = cluster295return cluster296297def get_cluster_entry(self):298"""299Return a cluster entry generated using300the previously obtained description.301"""302303cert_data = self.cluster_description.get("certificateAuthority", {}).get("data", "")304endpoint = self.cluster_description.get("endpoint")305arn = self.cluster_description.get("arn")306307return OrderedDict([308("cluster", OrderedDict([309("certificate-authority-data", cert_data),310("server", endpoint)311])),312("name", arn)313])314315def get_user_entry(self, user_alias=None):316"""317Return a user entry generated using318the previously obtained description.319"""320region = self.cluster_description.get("arn").split(":")[3]321outpost_config = self.cluster_description.get("outpostConfig")322323if outpost_config is None:324cluster_identification_parameter = "--cluster-name"325cluster_identification_value = self._cluster_name326else:327# If cluster contains outpostConfig, use id for identification328cluster_identification_parameter = "--cluster-id"329cluster_identification_value = self.cluster_description.get("id")330331generated_user = OrderedDict([332("name", user_alias or self.cluster_description.get("arn", "")),333("user", OrderedDict([334("exec", OrderedDict([335("apiVersion", API_VERSION),336("args",337[338"--region",339region,340"eks",341"get-token",342cluster_identification_parameter,343cluster_identification_value,344"--output",345"json",346]),347("command", "aws"),348]))349]))350])351352if self._parsed_args.role_arn is not None:353generated_user["user"]["exec"]["args"].extend([354"--role",355self._parsed_args.role_arn356])357358if self._session.profile:359generated_user["user"]["exec"]["env"] = [OrderedDict([360("name", "AWS_PROFILE"),361("value", self._session.profile)362])]363364return generated_user365366367