Path: blob/main/component/loki/source/kubernetes/kubetail/target.go
4096 views
package kubetail

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/cespare/xxhash/v2"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"k8s.io/apimachinery/pkg/types"
)

// Internal labels which indicate what container to tail logs from.
const (
	LabelPodNamespace     = "__pod_namespace__"
	LabelPodName          = "__pod_name__"
	LabelPodContainerName = "__pod_container_name__"
	LabelPodUID           = "__pod_uid__"

	// Discovery meta labels used by PrepareLabels as fallbacks when the
	// corresponding internal label above has no value.
	kubePodNamespace     = "__meta_kubernetes_namespace"
	kubePodName          = "__meta_kubernetes_pod_name"
	kubePodContainerName = "__meta_kubernetes_pod_container_name"
	kubePodUID           = "__meta_kubernetes_pod_uid"
)

// Target represents an individual container being tailed for logs.
type Target struct {
	origLabels   labels.Labels // Original discovery labels
	publicLabels labels.Labels // Labels with private labels omitted

	namespacedName types.NamespacedName
	containerName  string
	id             string // String representation of "namespace/pod:container"; not fully unique
	uid            string // UID from pod
	hash           uint64 // Hash of public labels, id, and uid

	mut       sync.RWMutex // Guards lastError and lastEntry
	lastError error
	lastEntry time.Time
}

// NewTarget creates a new Target which can be passed to a tailer.
//
// origLabels is stored as-is and returned by DiscoveryLabels. lset is the
// label set the target is derived from: the pod namespace, pod name, container
// name, and pod UID are read from the [LabelPodNamespace], [LabelPodName],
// [LabelPodContainerName], and [LabelPodUID] labels of lset.
func NewTarget(origLabels labels.Labels, lset labels.Labels) *Target {
	// Precompute some values based on labels so we don't have to continually
	// search them.
	var (
		namespacedName = types.NamespacedName{
			Namespace: lset.Get(LabelPodNamespace),
			Name:      lset.Get(LabelPodName),
		}

		containerName = lset.Get(LabelPodContainerName)
		uid           = lset.Get(LabelPodUID)

		id     = fmt.Sprintf("%s:%s", namespacedName, containerName)
		public = publicLabels(lset)
	)

	// Precompute the hash of the target from the public labels, the ID of the
	// target, and the pod UID. The label hash is written as a zero-padded
	// decimal number ahead of the two strings.
	hasher := xxhash.New()
	fmt.Fprintf(hasher, "%016d", public.Hash())
	fmt.Fprint(hasher, id)
	fmt.Fprint(hasher, uid)
	hash := hasher.Sum64()

	return &Target{
		origLabels:   origLabels,
		publicLabels: public,

		namespacedName: namespacedName,
		containerName:  containerName,
		id:             id,
		uid:            uid,
		hash:           hash,
	}
}

// publicLabels returns a copy of lset with every label whose name starts with
// model.ReservedLabelPrefix removed.
func publicLabels(lset labels.Labels) labels.Labels {
	lb := labels.NewBuilder(lset)

	for _, l := range lset {
		if strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
			lb.Del(l.Name)
		}
	}

	return lb.Labels(nil)
}

// NamespacedName returns the key of the Pod being targeted.
func (t *Target) NamespacedName() types.NamespacedName { return t.namespacedName }

// ContainerName returns the container name being targeted.
func (t *Target) ContainerName() string { return t.containerName }

// String returns a string representing the target in the form
// "namespace/name:container".
func (t *Target) String() string { return t.id }

// DiscoveryLabels returns the set of original labels prior to processing or
// relabeling.
func (t *Target) DiscoveryLabels() labels.Labels { return t.origLabels }

// Labels returns the set of public labels for the target.
func (t *Target) Labels() labels.Labels { return t.publicLabels }

// Hash returns an identifying hash for the target.
func (t *Target) Hash() uint64 { return t.hash }

// UID returns the UID for this target, based on the pod's UID.
func (t *Target) UID() string { return t.uid }

// Report reports information about the target. It records time as the last
// entry time and err as the last error, replacing whatever was stored by the
// previous call.
func (t *Target) Report(time time.Time, err error) {
	t.mut.Lock()
	defer t.mut.Unlock()

	t.lastError = err
	t.lastEntry = time
}

// LastError returns the error passed to the most recent call to Report, or
// nil if Report has not been called.
func (t *Target) LastError() error {
	t.mut.RLock()
	defer t.mut.RUnlock()

	return t.lastError
}

// LastEntry returns the time passed to the most recent call to Report, i.e.
// the time the most recent log line was read or when the most recent error
// occurred.
func (t *Target) LastEntry() time.Time {
	t.mut.RLock()
	defer t.mut.RUnlock()

	return t.lastEntry
}

// PrepareLabels builds a label set with default labels applied from the
// default label set. It validates that the input label set is valid.
//
// The namespace of the pod to tail logs from is determined by the
// [LabelPodNamespace] label. If this label isn't present, PrepareLabels falls
// back to __meta_kubernetes_namespace.
//
// The name of the pod to tail logs from is determined by the [LabelPodName]
// label. If this label isn't present, PrepareLabels falls back to
// __meta_kubernetes_pod_name.
//
// The name of the container to tail logs from is determined by the
// [LabelPodContainerName] label. If this label isn't present, PrepareLabels
// falls back to __meta_kubernetes_pod_container_name.
//
// The UID of the pod is determined by the [LabelPodUID] label. If this label
// isn't present, PrepareLabels falls back to __meta_kubernetes_pod_uid.
//
// A label whose value is empty is treated the same as a label that isn't
// present, both when resolving the values above and when writing them to the
// result.
//
// Validation of lset fails if there is no label indicating the pod namespace,
// pod name, container name, or pod UID, or if any label value in the result
// is not a valid label value.
func PrepareLabels(lset labels.Labels, defaultJob string) (res labels.Labels, err error) {
	tailLabels := []labels.Label{
		{Name: model.JobLabel, Value: defaultJob},
	}
	lb := labels.NewBuilder(lset)

	// Add default labels to lb if they're not in lset.
	for _, l := range tailLabels {
		if !lset.Has(l.Name) {
			lb.Set(l.Name, l.Value)
		}
	}

	// firstLabelValue returns the value of the first label in labelNames which
	// has a non-empty value in lset, or "" if none do.
	firstLabelValue := func(labelNames ...string) string {
		for _, labelName := range labelNames {
			if lv := lset.Get(labelName); lv != "" {
				return lv
			}
		}
		return ""
	}

	var (
		podNamespace     = firstLabelValue(LabelPodNamespace, kubePodNamespace)
		podName          = firstLabelValue(LabelPodName, kubePodName)
		podContainerName = firstLabelValue(LabelPodContainerName, kubePodContainerName)
		podUID           = firstLabelValue(LabelPodUID, kubePodUID)
	)

	switch {
	case podNamespace == "":
		return nil, fmt.Errorf("missing pod namespace label")
	case podName == "":
		return nil, fmt.Errorf("missing pod name label")
	case podContainerName == "":
		return nil, fmt.Errorf("missing pod container name label")
	case podUID == "":
		return nil, fmt.Errorf("missing pod UID label")
	}

	// Make sure that LabelPodNamespace, LabelPodName, LabelPodContainerName, and
	// LabelPodUID are set on the final target.
	//
	// The check is on the label's value rather than its presence so that it
	// agrees with firstLabelValue: when an internal label exists with an empty
	// value, the value resolved from the fallback label must still be written.
	resolved := []labels.Label{
		{Name: LabelPodNamespace, Value: podNamespace},
		{Name: LabelPodName, Value: podName},
		{Name: LabelPodContainerName, Value: podContainerName},
		{Name: LabelPodUID, Value: podUID},
	}
	for _, l := range resolved {
		if lset.Get(l.Name) == "" {
			lb.Set(l.Name, l.Value)
		}
	}

	// Meta labels are deleted after relabelling. Other internal labels propagate
	// to the target which decides whether they will be part of their label set.
	for _, l := range lset {
		if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
			lb.Del(l.Name)
		}
	}

	// Default the instance label to the target address.
	if !lset.Has(model.InstanceLabel) {
		defaultInstance := fmt.Sprintf("%s/%s:%s", podNamespace, podName, podContainerName)
		lb.Set(model.InstanceLabel, defaultInstance)
	}

	res = lb.Labels(nil)
	for _, l := range res {
		// Check label values are valid, drop the target if not.
		if !model.LabelValue(l.Value).IsValid() {
			return nil, fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value)
		}
	}
	return res, nil
}