Path: blob/develop/awscli/customizations/eks/update_kubeconfig.py
2624 views
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import os
import logging

from botocore.compat import OrderedDict

from awscli.customizations.commands import BasicCommand
from awscli.customizations.utils import uni_print
from awscli.customizations.eks.exceptions import EKSClusterError
from awscli.customizations.eks.kubeconfig import (Kubeconfig,
                                                  KubeconfigError,
                                                  KubeconfigLoader,
                                                  KubeconfigWriter,
                                                  KubeconfigValidator,
                                                  KubeconfigAppender)
from awscli.customizations.eks.ordered_yaml import ordered_yaml_dump
from awscli.utils import create_nested_client

LOG = logging.getLogger(__name__)

DEFAULT_PATH = os.path.expanduser("~/.kube/config")

# At the time EKS no longer supports Kubernetes v1.21 (probably ~Dec 2023),
# this can be safely changed to default to writing "v1"
API_VERSION = "client.authentication.k8s.io/v1beta1"


class UpdateKubeconfigCommand(BasicCommand):
    """
    Implementation of the ``update-kubeconfig`` command.

    Builds a cluster entry and a user entry from the described EKS
    cluster, inserts them into the selected kubeconfig and then either
    writes the file or, with ``--dry-run``, prints the merged content.
    """
    NAME = 'update-kubeconfig'

    DESCRIPTION = BasicCommand.FROM_FILE(
        'eks',
        'update-kubeconfig',
        '_description.rst'
    )

    ARG_TABLE = [
        {
            'name': 'name',
            'dest': 'cluster_name',
            'help_text': ("The name of the cluster for which "
                          "to create a kubeconfig entry. "
                          "This cluster must exist in your account and in the "
                          "specified or configured default Region "
                          "for your AWS CLI installation."),
            'required': True
        },
        {
            'name': 'kubeconfig',
            'help_text': ("Optionally specify a kubeconfig file to append "
                          "with your configuration. "
                          "By default, the configuration is written to the "
                          "first file path in the KUBECONFIG "
                          "environment variable (if it is set) "
                          "or the default kubeconfig path (.kube/config) "
                          "in your home directory."),
            'required': False
        },
        {
            'name': 'role-arn',
            'help_text': ("To assume a role for cluster authentication, "
                          "specify an IAM role ARN with this option. "
                          "For example, if you created a cluster "
                          "while assuming an IAM role, "
                          "then you must also assume that role to "
                          "connect to the cluster the first time."),
            'required': False
        },
        {
            'name': 'proxy-url',
            'help_text': ("Optionally specify a proxy url to route "
                          "traffic via when connecting to a cluster."),
            'required': False
        },
        {
            'name': 'dry-run',
            'action': 'store_true',
            'default': False,
            'help_text': ("Print the merged kubeconfig to stdout instead of "
                          "writing it to the specified file."),
            'required': False
        },
        {
            'name': 'verbose',
            'action': 'store_true',
            'default': False,
            'help_text': ("Print more detailed output "
                          "when writing to the kubeconfig file, "
                          "including the appended entries.")
        },
        {
            'name': 'alias',
            'help_text': ("Alias for the cluster context name. "
                          "Defaults to match cluster ARN."),
            'required': False
        },
        {
            'name': 'user-alias',
            'help_text': ("Alias for the generated user name. "
                          "Defaults to match cluster ARN."),
            'required': False
        },
        {
            'name': 'assume-role-arn',
            'help_text': ('To assume a role for retrieving cluster information, '
                          'specify an IAM role ARN with this option. '
                          'Use this for cross-account access to get cluster details '
                          'from the account where the cluster resides.'),
            'required': False
        }
    ]

    def _display_entries(self, entries):
        """
        Display entries in yaml format

        :param entries: a list of OrderedDicts to be printed
        :type entries: list
        """
        uni_print("Entries:\n\n")
        for entry in entries:
            uni_print(ordered_yaml_dump(entry))
            uni_print("\n")

    def _run_main(self, parsed_args, parsed_globals):
        """
        Generate the cluster/user entries and merge them into a kubeconfig.

        :param parsed_args: The parsed command arguments (see ARG_TABLE)
        :param parsed_globals: The parsed global CLI arguments; forwarded
            to EKSClient for client construction
        """
        client = EKSClient(self._session,
                           parsed_args=parsed_args,
                           parsed_globals=parsed_globals)
        new_cluster_dict = client.get_cluster_entry()
        new_user_dict = client.get_user_entry(
            user_alias=parsed_args.user_alias
        )

        config_selector = KubeconfigSelector(
            os.environ.get("KUBECONFIG", ""),
            parsed_args.kubeconfig
        )
        config = config_selector.choose_kubeconfig(
            new_cluster_dict["name"]
        )
        # Must be captured before the insert below, otherwise the cluster
        # would always appear to exist already.
        updating_existing = config.has_cluster(new_cluster_dict["name"])
        appender = KubeconfigAppender()
        new_context_dict = appender.insert_cluster_user_pair(config,
                                                             new_cluster_dict,
                                                             new_user_dict,
                                                             parsed_args.alias)

        if parsed_args.dry_run:
            # Dry run: show the merged result, touch nothing on disk.
            uni_print(config.dump_content())
        else:
            writer = KubeconfigWriter()
            writer.write_kubeconfig(config)

            if updating_existing:
                uni_print("Updated context {0} in {1}\n".format(
                    new_context_dict["name"], config.path
                ))
            else:
                uni_print("Added new context {0} to {1}\n".format(
                    new_context_dict["name"], config.path
                ))

            if parsed_args.verbose:
                self._display_entries([
                    new_context_dict,
                    new_user_dict,
                    new_cluster_dict
                ])


class KubeconfigSelector(object):
    """
    Resolve the candidate kubeconfig paths and pick the file to update.
    """

    def __init__(self, env_variable, path_in, validator=None,
                 loader=None):
        """
        Parse KUBECONFIG into a list of absolute paths.
        Also replace the empty list with DEFAULT_PATH

        :param env_variable: KUBECONFIG as a long string
        :type env_variable: string

        :param path_in: The path passed in through the CLI
        :type path_in: string or None

        :param validator: Validator handed to the default loader;
            a KubeconfigValidator is created when omitted
        :param loader: Loader used to read kubeconfig files;
            a KubeconfigLoader is created when omitted
        """
        if validator is None:
            validator = KubeconfigValidator()
        self._validator = validator

        if loader is None:
            loader = KubeconfigLoader(validator)
        self._loader = loader

        if path_in is not None:
            # Override environment variable
            self._paths = [self._expand_path(path_in)]
        else:
            # Get the list of paths from the environment variable
            if env_variable == "":
                env_variable = DEFAULT_PATH
            # Skip blank elements (e.g. produced by a stray separator).
            self._paths = [self._expand_path(element)
                           for element in env_variable.split(os.pathsep)
                           if element.strip()]
            if not self._paths:
                self._paths = [DEFAULT_PATH]

    def choose_kubeconfig(self, cluster_name):
        """
        Choose which kubeconfig file to read from.
        If name is already an entry in one of the $KUBECONFIG files,
        choose that one.
        Otherwise choose the first file.

        :param cluster_name: The name of the cluster which is going to be added
        :type cluster_name: String

        :return: a chosen Kubeconfig based on above rules
        :rtype: Kubeconfig
        """
        # Search for an existing entry to update
        for candidate_path in self._paths:
            try:
                loaded_config = self._loader.load_kubeconfig(candidate_path)

                if loaded_config.has_cluster(cluster_name):
                    LOG.debug("Found entry to update at {0}".format(
                        candidate_path
                    ))
                    return loaded_config
            except KubeconfigError as e:
                # A file that fails to load is skipped during the search.
                LOG.warning("Passing {0}:{1}".format(candidate_path, e))

        # No entry was found, use the first file in KUBECONFIG
        #
        # Note: This could raise KubeconfigErrors if paths[0] is corrupted
        return self._loader.load_kubeconfig(self._paths[0])

    def _expand_path(self, path):
        """ A helper to expand a path to a full absolute path. """
        return os.path.abspath(os.path.expanduser(path))


class EKSClient(object):
    """
    Fetch an EKS cluster description and derive kubeconfig entries from it.
    """

    def __init__(self, session, parsed_args, parsed_globals=None):
        """
        :param session: Session used to create the nested service clients
        :param parsed_args: The parsed command arguments; ``cluster_name``
            is read here, other options are read lazily by the methods
        :param parsed_globals: The parsed global CLI arguments, or None
        """
        self._session = session
        self._cluster_name = parsed_args.cluster_name
        self._cluster_description = None
        self._parsed_globals = parsed_globals
        self._parsed_args = parsed_args

    @property
    def cluster_description(self):
        """
        Use an eks describe-cluster call to get the cluster description
        Cache the response in self._cluster_description.
        describe-cluster will only be called once.

        :raises EKSClusterError: if the response has no usable cluster
            or the cluster status is neither ACTIVE nor UPDATING
        """
        if self._cluster_description is not None:
            return self._cluster_description

        client_kwargs = {}
        if self._parsed_globals:
            client_kwargs.update({
                "region_name": self._parsed_globals.region,
                "endpoint_url": self._parsed_globals.endpoint_url,
                "verify": self._parsed_globals.verify_ssl,
            })

        # Handle role assumption if needed
        if getattr(self._parsed_args, 'assume_role_arn', None):
            sts_client = create_nested_client(self._session, 'sts')
            credentials = sts_client.assume_role(
                RoleArn=self._parsed_args.assume_role_arn,
                RoleSessionName='EKSDescribeClusterSession'
            )["Credentials"]

            # The temporary credentials are used for the EKS client only.
            client_kwargs.update({
                "aws_access_key_id": credentials["AccessKeyId"],
                "aws_secret_access_key": credentials["SecretAccessKey"],
                "aws_session_token": credentials["SessionToken"],
            })

        client = create_nested_client(self._session, "eks", **client_kwargs)
        full_description = client.describe_cluster(name=self._cluster_name)
        cluster = full_description.get("cluster")

        if not cluster or "status" not in cluster:
            raise EKSClusterError("Cluster not found")
        if cluster["status"] not in ["ACTIVE", "UPDATING"]:
            raise EKSClusterError(f"Cluster status is {cluster['status']}")

        self._cluster_description = cluster
        return cluster

    def get_cluster_entry(self):
        """
        Return a cluster entry generated using
        the previously obtained description.

        :return: an OrderedDict with a "cluster" mapping and the cluster
            ARN as "name"
        :rtype: OrderedDict
        """

        cert_data = self.cluster_description.get(
            "certificateAuthority", {}
        ).get("data", "")
        endpoint = self.cluster_description.get("endpoint")
        arn = self.cluster_description.get("arn")

        generated_cluster = OrderedDict([
            ("cluster", OrderedDict([
                ("certificate-authority-data", cert_data),
                ("server", endpoint)
            ])),
            ("name", arn)
        ])

        if self._parsed_args.proxy_url is not None:
            generated_cluster["cluster"]["proxy-url"] = \
                self._parsed_args.proxy_url

        return generated_cluster

    def get_user_entry(self, user_alias=None):
        """
        Return a user entry generated using
        the previously obtained description.

        :param user_alias: Name to give the user entry; the cluster ARN
            is used when this is not provided
        :type user_alias: string or None

        :return: an OrderedDict with the user "name" and an "exec" section
        :rtype: OrderedDict
        """
        # The region is taken from the fourth ':'-separated ARN field.
        region = self.cluster_description.get("arn").split(":")[3]
        outpost_config = self.cluster_description.get("outpostConfig")

        if outpost_config is None:
            cluster_identification_parameter = "--cluster-name"
            cluster_identification_value = self._cluster_name
        else:
            # If cluster contains outpostConfig, use id for identification
            cluster_identification_parameter = "--cluster-id"
            cluster_identification_value = self.cluster_description.get("id")

        generated_user = OrderedDict([
            ("name", user_alias or self.cluster_description.get("arn", "")),
            ("user", OrderedDict([
                ("exec", OrderedDict([
                    ("apiVersion", API_VERSION),
                    ("args",
                        [
                            "--region",
                            region,
                            "eks",
                            "get-token",
                            cluster_identification_parameter,
                            cluster_identification_value,
                            "--output",
                            "json",
                        ]),
                    ("command", "aws"),
                ]))
            ]))
        ])

        if self._parsed_args.role_arn is not None:
            generated_user["user"]["exec"]["args"].extend([
                "--role",
                self._parsed_args.role_arn
            ])

        if self._session.profile:
            generated_user["user"]["exec"]["env"] = [OrderedDict([
                ("name", "AWS_PROFILE"),
                ("value", self._session.profile)
            ])]

        return generated_user