Path: blob/main/pkg/operator/clientutil/clientutil.go
4095 views
package clientutil12import (3"context"4"fmt"5"regexp"6"strings"78"github.com/go-kit/log"9"github.com/go-kit/log/level"10apps_v1 "k8s.io/api/apps/v1"11v1 "k8s.io/api/core/v1"12k8s_errors "k8s.io/apimachinery/pkg/api/errors"13meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"14"k8s.io/apimachinery/pkg/types"15"k8s.io/apimachinery/pkg/util/validation"16"sigs.k8s.io/controller-runtime/pkg/client"17)1819var invalidDNS1123Characters = regexp.MustCompile("[^-a-z0-9]+")2021// SanitizeVolumeName ensures that the given volume name is a valid DNS-1123 label22// accepted by Kubernetes.23//24// Copied from github.com/prometheus-operator/prometheus-operator/pkg/k8sutil.25func SanitizeVolumeName(name string) string {26name = strings.ToLower(name)27name = invalidDNS1123Characters.ReplaceAllString(name, "-")28if len(name) > validation.DNS1123LabelMaxLength {29name = name[0:validation.DNS1123LabelMaxLength]30}31return strings.Trim(name, "-")32}3334// CreateOrUpdateSecret applies the given secret against the client.35func CreateOrUpdateSecret(ctx context.Context, c client.Client, s *v1.Secret) error {36var exist v1.Secret37err := c.Get(ctx, client.ObjectKeyFromObject(s), &exist)38if err != nil && !k8s_errors.IsNotFound(err) {39return fmt.Errorf("failed to retrieve existing service: %w", err)40}4142if k8s_errors.IsNotFound(err) {43err := c.Create(ctx, s)44if err != nil {45return fmt.Errorf("failed to create service: %w", err)46}47} else {48s.ResourceVersion = exist.ResourceVersion49s.SetOwnerReferences(mergeOwnerReferences(s.GetOwnerReferences(), exist.GetOwnerReferences()))50s.SetLabels(mergeMaps(s.Labels, exist.Labels))51s.SetAnnotations(mergeMaps(s.Annotations, exist.Annotations))5253err := c.Update(ctx, s)54if err != nil && !k8s_errors.IsNotFound(err) {55return fmt.Errorf("failed to update service: %w", err)56}57}5859return nil60}6162// CreateOrUpdateService applies the given svc against the client.63func CreateOrUpdateService(ctx context.Context, c client.Client, svc *v1.Service) error {64var exist v1.Service65err := c.Get(ctx, client.ObjectKeyFromObject(svc), &exist)66if err != nil && !k8s_errors.IsNotFound(err) {67return fmt.Errorf("failed to retrieve existing service: %w", err)68}6970if k8s_errors.IsNotFound(err) {71err := c.Create(ctx, svc)72if err != nil {73return fmt.Errorf("failed to create service: %w", err)74}75} else {76svc.ResourceVersion = exist.ResourceVersion77svc.Spec.IPFamilies = exist.Spec.IPFamilies78svc.SetOwnerReferences(mergeOwnerReferences(svc.GetOwnerReferences(), exist.GetOwnerReferences()))79svc.SetLabels(mergeMaps(svc.Labels, exist.Labels))80svc.SetAnnotations(mergeMaps(svc.Annotations, exist.Annotations))8182err := c.Update(ctx, svc)83if err != nil && !k8s_errors.IsNotFound(err) {84return fmt.Errorf("failed to update service: %w", err)85}86}8788return nil89}9091// CreateOrUpdateEndpoints applies the given eps against the client.92func CreateOrUpdateEndpoints(ctx context.Context, c client.Client, eps *v1.Endpoints) error {93var exist v1.Endpoints94err := c.Get(ctx, client.ObjectKeyFromObject(eps), &exist)95if err != nil && !k8s_errors.IsNotFound(err) {96return fmt.Errorf("failed to retrieve existing endpoints: %w", err)97}9899if k8s_errors.IsNotFound(err) {100err := c.Create(ctx, eps)101if err != nil {102return fmt.Errorf("failed to create endpoints: %w", err)103}104} else {105eps.ResourceVersion = exist.ResourceVersion106eps.SetOwnerReferences(mergeOwnerReferences(eps.GetOwnerReferences(), exist.GetOwnerReferences()))107eps.SetLabels(mergeMaps(eps.Labels, exist.Labels))108eps.SetAnnotations(mergeMaps(eps.Annotations, exist.Annotations))109110err := c.Update(ctx, eps)111if err != nil && !k8s_errors.IsNotFound(err) {112return fmt.Errorf("failed to update endpoints: %w", err)113}114}115116return nil117}118119// CreateOrUpdateStatefulSet applies the given StatefulSet against the client.120func CreateOrUpdateStatefulSet(ctx context.Context, c client.Client, ss *apps_v1.StatefulSet, l log.Logger) error {121var exist apps_v1.StatefulSet122err := c.Get(ctx, client.ObjectKeyFromObject(ss), &exist)123if err != nil && !k8s_errors.IsNotFound(err) {124return fmt.Errorf("failed to retrieve existing statefulset: %w", err)125}126127if k8s_errors.IsNotFound(err) {128err := c.Create(ctx, ss)129if err != nil {130return fmt.Errorf("failed to create statefulset: %w", err)131}132} else {133ss.ResourceVersion = exist.ResourceVersion134ss.SetOwnerReferences(mergeOwnerReferences(ss.GetOwnerReferences(), exist.GetOwnerReferences()))135ss.SetLabels(mergeMaps(ss.Labels, exist.Labels))136ss.SetAnnotations(mergeMaps(ss.Annotations, exist.Annotations))137138err := c.Update(ctx, ss)139if k8s_errors.IsNotAcceptable(err) || k8s_errors.IsInvalid(err) {140level.Error(l).Log("msg", "error updating StatefulSet. Attempting to recreate", "err", err.Error())141// Resource version should only be set when updating142ss.ResourceVersion = ""143144err = c.Delete(ctx, ss)145if err != nil {146return fmt.Errorf("failed to update statefulset when deleting old statefulset: %w", err)147}148err = c.Create(ctx, ss)149if err != nil {150return fmt.Errorf("failed to update statefulset when creating replacement statefulset: %w", err)151}152} else if err != nil {153return fmt.Errorf("failed to update statefulset: %w", err)154}155}156157return nil158}159160// CreateOrUpdateDaemonSet applies the given DaemonSet against the client.161func CreateOrUpdateDaemonSet(ctx context.Context, c client.Client, ss *apps_v1.DaemonSet, l log.Logger) error {162var exist apps_v1.DaemonSet163err := c.Get(ctx, client.ObjectKeyFromObject(ss), &exist)164if err != nil && !k8s_errors.IsNotFound(err) {165return fmt.Errorf("failed to retrieve existing daemonset: %w", err)166}167168if k8s_errors.IsNotFound(err) {169err := c.Create(ctx, ss)170if err != nil {171return fmt.Errorf("failed to create daemonset: %w", err)172}173} else {174ss.ResourceVersion = exist.ResourceVersion175ss.SetOwnerReferences(mergeOwnerReferences(ss.GetOwnerReferences(), exist.GetOwnerReferences()))176ss.SetLabels(mergeMaps(ss.Labels, exist.Labels))177ss.SetAnnotations(mergeMaps(ss.Annotations, exist.Annotations))178179err := c.Update(ctx, ss)180if k8s_errors.IsNotAcceptable(err) || k8s_errors.IsInvalid(err) {181level.Error(l).Log("msg", "error updating Daemonset. Attempting to recreate", "err", err.Error())182// Resource version should only be set when updating183ss.ResourceVersion = ""184185err = c.Delete(ctx, ss)186if err != nil {187return fmt.Errorf("failed to update daemonset: deleting old daemonset: %w", err)188}189err = c.Create(ctx, ss)190if err != nil {191return fmt.Errorf("failed to update daemonset: creating new deamonset: %w", err)192}193} else if err != nil {194return fmt.Errorf("failed to update daemonset: %w", err)195}196}197198return nil199}200201// CreateOrUpdateDeployment applies the given DaemonSet against the client.202func CreateOrUpdateDeployment(ctx context.Context, c client.Client, d *apps_v1.Deployment, l log.Logger) error {203var exist apps_v1.Deployment204err := c.Get(ctx, client.ObjectKeyFromObject(d), &exist)205if err != nil && !k8s_errors.IsNotFound(err) {206return fmt.Errorf("failed to retrieve existing Deployment: %w", err)207}208209if k8s_errors.IsNotFound(err) {210err := c.Create(ctx, d)211if err != nil {212return fmt.Errorf("failed to create Deployment: %w", err)213}214} else {215d.ResourceVersion = exist.ResourceVersion216d.SetOwnerReferences(mergeOwnerReferences(d.GetOwnerReferences(), exist.GetOwnerReferences()))217d.SetLabels(mergeMaps(d.Labels, exist.Labels))218d.SetAnnotations(mergeMaps(d.Annotations, exist.Annotations))219220err := c.Update(ctx, d)221if k8s_errors.IsNotAcceptable(err) || k8s_errors.IsInvalid(err) {222level.Error(l).Log("msg", "error updating Deployment. Attempting to recreate", "err", err.Error())223// Resource version should only be set when updating224d.ResourceVersion = ""225226err = c.Delete(ctx, d)227if err != nil {228return fmt.Errorf("failed to update Deployment: deleting old Deployment: %w", err)229}230err = c.Create(ctx, d)231if err != nil {232return fmt.Errorf("failed to update Deployment: creating new Deployment: %w", err)233}234} else if err != nil {235return fmt.Errorf("failed to update Deployment: %w", err)236}237}238239return nil240}241242func mergeOwnerReferences(new, old []meta_v1.OwnerReference) []meta_v1.OwnerReference {243existing := make(map[types.UID]bool)244for _, ref := range old {245existing[ref.UID] = true246}247for _, ref := range new {248if _, ok := existing[ref.UID]; !ok {249old = append(old, ref)250}251}252return old253}254255func mergeMaps(new, old map[string]string) map[string]string {256if old == nil {257old = make(map[string]string, len(new))258}259for k, v := range new {260old[k] = v261}262return old263}264265266