Path: blob/main/component/loki/source/podlogs/reconciler.go
5371 views
package podlogs12import (3"context"4"fmt"5"sort"6"strings"7"sync"8"time"910"github.com/go-kit/log"11"github.com/go-kit/log/level"12"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"13monitoringv1alpha2 "github.com/grafana/agent/component/loki/source/podlogs/internal/apis/monitoring/v1alpha2"14"github.com/prometheus/common/model"15promlabels "github.com/prometheus/prometheus/model/labels"16"github.com/prometheus/prometheus/model/relabel"17"github.com/prometheus/prometheus/util/strutil"18corev1 "k8s.io/api/core/v1"19metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"20"k8s.io/apimachinery/pkg/labels"21"sigs.k8s.io/controller-runtime/pkg/client"22)2324// The reconciler reconciles the state of PodLogs on Kubernetes with targets to25// collect logs from.26type reconciler struct {27log log.Logger28tailer *kubetail.Manager2930reconcileMut sync.RWMutex31podLogsSelector labels.Selector32podLogsNamespaceSelector labels.Selector3334debugMut sync.RWMutex35debugInfo []DiscoveredPodLogs36}3738// newReconciler creates a new reconciler which synchronizes targets with the39// provided tailer whenever Reconcile is called.40func newReconciler(l log.Logger, tailer *kubetail.Manager) *reconciler {41return &reconciler{42log: l,43tailer: tailer,4445podLogsSelector: labels.Everything(),46podLogsNamespaceSelector: labels.Everything(),47}48}4950// UpdateSelectors updates the selectors used by the reconciler.51func (r *reconciler) UpdateSelectors(podLogs, namespace labels.Selector) {52r.reconcileMut.Lock()53defer r.reconcileMut.Unlock()5455r.podLogsSelector = podLogs56r.podLogsNamespaceSelector = namespace57}5859// Reconcile synchronizes the set of running kubetail targets with the set of60// discovered PodLogs.61func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error {62var newDebugInfo []DiscoveredPodLogs63var newTasks []*kubetail.Target6465listOpts := []client.ListOption{66client.MatchingLabelsSelector{Selector: r.podLogsSelector},67}68var podLogsList monitoringv1alpha2.PodLogsList69if err := cli.List(ctx, &podLogsList, listOpts...); err != nil {70return fmt.Errorf("could not list PodLogs: %w", err)71}7273for _, podLogs := range podLogsList.Items {74key := client.ObjectKeyFromObject(podLogs)7576// Skip over this podLogs if it doesn't match the namespace selector.77podLogsNamespace := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: podLogs.Namespace}}78if err := cli.Get(ctx, client.ObjectKeyFromObject(&podLogsNamespace), &podLogsNamespace); err != nil {79level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "get namespace", "key", key, "err", err)80continue81}82if !r.podLogsNamespaceSelector.Matches(labels.Set(podLogsNamespace.Labels)) {83continue84}8586targets, discoveredPodLogs := r.reconcilePodLogs(ctx, cli, podLogs)8788newTasks = append(newTasks, targets...)89newDebugInfo = append(newDebugInfo, discoveredPodLogs)90}9192if err := r.tailer.SyncTargets(ctx, newTasks); err != nil {93level.Error(r.log).Log("msg", "failed to apply new tailers to run", "err", err)94}9596r.debugMut.Lock()97r.debugInfo = newDebugInfo98r.debugMut.Unlock()99100return nil101}102103func (r *reconciler) reconcilePodLogs(ctx context.Context, cli client.Client, podLogs *monitoringv1alpha2.PodLogs) ([]*kubetail.Target, DiscoveredPodLogs) {104var targets []*kubetail.Target105106discoveredPodLogs := DiscoveredPodLogs{107Namespace: podLogs.Namespace,108Name: podLogs.Name,109LastReconcile: time.Now(),110}111112key := client.ObjectKeyFromObject(podLogs)113level.Debug(r.log).Log("msg", "reconciling PodLogs", "key", key)114115relabelRules, err := convertRelabelConfig(podLogs.Spec.RelabelConfigs)116if err != nil {117discoveredPodLogs.ReconcileError = fmt.Sprintf("invalid relabelings: %s", err)118level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "convert relabelings", "key", key, "err", err)119return targets, discoveredPodLogs120}121122sel, err := metav1.LabelSelectorAsSelector(&podLogs.Spec.Selector)123if err != nil {124discoveredPodLogs.ReconcileError = fmt.Sprintf("invalid Pod selector: %s", err)125level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "convert selector", "key", key, "err", err)126return targets, discoveredPodLogs127}128129opts := []client.ListOption{130client.MatchingLabelsSelector{Selector: sel},131}132133var podList corev1.PodList134if err := cli.List(ctx, &podList, opts...); err != nil {135discoveredPodLogs.ReconcileError = fmt.Sprintf("failed to list Pods: %s", err)136level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "list Pods", "key", key, "err", err)137return targets, discoveredPodLogs138}139140namespaceSel, err := metav1.LabelSelectorAsSelector(&podLogs.Spec.NamespaceSelector)141if err != nil {142discoveredPodLogs.ReconcileError = fmt.Sprintf("invalid Pod namespaceSelector: %s", err)143level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "convert namespaceSelector", "key", key, "err", err)144return targets, discoveredPodLogs145}146147for _, pod := range podList.Items {148discoveredPod := DiscoveredPod{149Namespace: pod.Namespace,150Name: pod.Name,151}152153// Skip over this pod if it doesn't match the namespace selector.154namespace := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: pod.Namespace}}155if err := cli.Get(ctx, client.ObjectKeyFromObject(&namespace), &namespace); err != nil {156level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "get namespace for Pod", "key", key, "err", err)157continue158}159if !namespaceSel.Matches(labels.Set(namespace.Labels)) {160continue161}162163level.Debug(r.log).Log("msg", "found matching Pod", "key", key, "pod", client.ObjectKeyFromObject(&pod))164165handleContainer := func(container *corev1.Container, initContainer bool) {166targetLabels := buildTargetLabels(discoveredContainer{167PodLogs: podLogs,168PodNamespace: &namespace,169Pod: &pod,170Container: container,171InitContainer: initContainer,172})173processedLabels, _ := relabel.Process(targetLabels.Copy(), relabelRules...)174175defaultJob := fmt.Sprintf("%s/%s:%s", podLogs.Namespace, podLogs.Name, container.Name)176finalLabels, err := kubetail.PrepareLabels(processedLabels, defaultJob)177178if err != nil {179discoveredPod.Containers = append(discoveredPod.Containers, DiscoveredContainer{180DiscoveredLabels: targetLabels.Map(),181Labels: processedLabels.Map(),182ReconcileError: fmt.Sprintf("invalid labels: %s", err),183})184return185}186187target := kubetail.NewTarget(targetLabels.Copy(), finalLabels)188if len(processedLabels) != 0 {189targets = append(targets, target)190}191192discoveredPod.Containers = append(discoveredPod.Containers, DiscoveredContainer{193DiscoveredLabels: targetLabels.Map(),194Labels: target.Labels().Map(),195})196}197198for _, container := range pod.Spec.InitContainers {199handleContainer(&container, true)200}201for _, container := range pod.Spec.Containers {202handleContainer(&container, false)203}204205discoveredPodLogs.Pods = append(discoveredPodLogs.Pods, discoveredPod)206}207208return targets, discoveredPodLogs209}210211// DebugInfo returns the current debug information for the reconciler.212func (r *reconciler) DebugInfo() []DiscoveredPodLogs {213r.debugMut.RLock()214defer r.debugMut.RUnlock()215216return r.debugInfo217}218219type discoveredContainer struct {220PodLogs *monitoringv1alpha2.PodLogs221PodNamespace *corev1.Namespace222Pod *corev1.Pod223Container *corev1.Container224InitContainer bool225}226227func buildTargetLabels(opts discoveredContainer) promlabels.Labels {228targetLabels := promlabels.NewBuilder(nil)229230targetLabels.Set("__meta_kubernetes_podlogs_namespace", opts.PodLogs.Namespace)231targetLabels.Set("__meta_kubernetes_podlogs_name", opts.PodLogs.Name)232for key, value := range opts.PodLogs.Labels {233key = strutil.SanitizeLabelName(key)234targetLabels.Set("__meta_kubernetes_podlogs_label_"+key, value)235targetLabels.Set("__meta_kubernetes_podlogs_labelpresent_"+key, "true")236}237for key, value := range opts.PodLogs.Annotations {238key = strutil.SanitizeLabelName(key)239targetLabels.Set("__meta_kubernetes_podlogs_annotation_"+key, value)240targetLabels.Set("__meta_kubernetes_podlogs_annotationpresent_"+key, "true")241}242243targetLabels.Set("__meta_kubernetes_namespace", opts.Pod.Namespace)244for key, value := range opts.PodNamespace.Labels {245key = strutil.SanitizeLabelName(key)246targetLabels.Set("__meta_kubernetes_namespace_label_"+key, value)247targetLabels.Set("__meta_kubernetes_namespace_labelpresent_"+key, "true")248}249for key, value := range opts.PodNamespace.Annotations {250key = strutil.SanitizeLabelName(key)251targetLabels.Set("__meta_kubernetes_namespace_annotation_"+key, value)252targetLabels.Set("__meta_kubernetes_namespace_annotationpresent_"+key, "true")253}254255targetLabels.Set("__meta_kubernetes_pod_name", opts.Pod.Name)256targetLabels.Set("__meta_kubernetes_pod_ip", opts.Pod.Status.PodIP)257for key, value := range opts.Pod.Labels {258key = strutil.SanitizeLabelName(key)259targetLabels.Set("__meta_kubernetes_pod_label_"+key, value)260targetLabels.Set("__meta_kubernetes_pod_labelpresent_"+key, "true")261}262for key, value := range opts.Pod.Annotations {263key = strutil.SanitizeLabelName(key)264targetLabels.Set("__meta_kubernetes_pod_annotation_"+key, value)265targetLabels.Set("__meta_kubernetes_pod_annotationpresent_"+key, "true")266}267targetLabels.Set("__meta_kubernetes_pod_container_init", fmt.Sprint(opts.InitContainer))268targetLabels.Set("__meta_kubernetes_pod_container_name", opts.Container.Name)269targetLabels.Set("__meta_kubernetes_pod_container_image", opts.Container.Image)270targetLabels.Set("__meta_kubernetes_pod_ready", string(podReady(opts.Pod)))271targetLabels.Set("__meta_kubernetes_pod_phase", string(opts.Pod.Status.Phase))272targetLabels.Set("__meta_kubernetes_pod_node_name", opts.Pod.Spec.NodeName)273targetLabels.Set("__meta_kubernetes_pod_host_ip", opts.Pod.Status.HostIP)274targetLabels.Set("__meta_kubernetes_pod_uid", string(opts.Pod.UID))275276for _, ref := range opts.Pod.GetOwnerReferences() {277if ref.Controller != nil && *ref.Controller {278targetLabels.Set("__meta_kubernetes_pod_controller_kind", ref.Kind)279targetLabels.Set("__meta_kubernetes_pod_controller_name", ref.Name)280break281}282}283284// Add labels needed for collecting.285targetLabels.Set(kubetail.LabelPodNamespace, opts.Pod.Namespace)286targetLabels.Set(kubetail.LabelPodName, opts.Pod.Name)287targetLabels.Set(kubetail.LabelPodContainerName, opts.Container.Name)288targetLabels.Set(kubetail.LabelPodUID, string(opts.Pod.GetUID()))289290// Add default labels (job, instance)291targetLabels.Set(model.InstanceLabel, fmt.Sprintf("%s/%s:%s", opts.Pod.Namespace, opts.Pod.Name, opts.Container.Name))292targetLabels.Set(model.JobLabel, fmt.Sprintf("%s/%s", opts.PodLogs.Namespace, opts.PodLogs.Name))293294res := targetLabels.Labels(nil)295sort.Sort(res)296return res297}298299func podReady(pod *corev1.Pod) model.LabelValue {300for _, cond := range pod.Status.Conditions {301if cond.Type == corev1.PodReady {302return model.LabelValue(strings.ToLower(string(cond.Status)))303}304}305return model.LabelValue(strings.ToLower(string(corev1.ConditionUnknown)))306}307308type DiscoveredPodLogs struct {309Namespace string `river:"namespace,attr"`310Name string `river:"name,attr"`311LastReconcile time.Time `river:"last_reconcile,attr,optional"`312ReconcileError string `river:"reconcile_error,attr,optional"`313314Pods []DiscoveredPod `river:"pod,block"`315}316317type DiscoveredPod struct {318Namespace string `river:"namespace,attr"`319Name string `river:"name,attr"`320ReconcileError string `river:"reconcile_error,attr,optional"`321322Containers []DiscoveredContainer `river:"container,block"`323}324325type DiscoveredContainer struct {326DiscoveredLabels map[string]string `river:"discovered_labels,attr"`327Labels map[string]string `river:"labels,attr"`328ReconcileError string `river:"reconcile_error,attr,optional"`329}330331332