Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/kubernetes/kubetail/target.go
4096 views
1
package kubetail
2
3
import (
4
"fmt"
5
"strings"
6
"sync"
7
"time"
8
9
"github.com/cespare/xxhash/v2"
10
"github.com/prometheus/common/model"
11
"github.com/prometheus/prometheus/model/labels"
12
"k8s.io/apimachinery/pkg/types"
13
)
14
15
// Internal labels which indicate what container to tail logs from.
16
const (
17
LabelPodNamespace = "__pod_namespace__"
18
LabelPodName = "__pod_name__"
19
LabelPodContainerName = "__pod_container_name__"
20
LabelPodUID = "__pod_uid__"
21
22
kubePodNamespace = "__meta_kubernetes_namespace"
23
kubePodName = "__meta_kubernetes_pod_name"
24
kubePodContainerName = "__meta_kubernetes_pod_container_name"
25
kubePodUID = "__meta_kubernetes_pod_uid"
26
)
27
28
// Target represents an individual container being tailed for logs.
29
type Target struct {
30
origLabels labels.Labels // Original discovery labels
31
publicLabels labels.Labels // Labels with private labels omitted
32
33
namespacedName types.NamespacedName
34
containerName string
35
id string // String representation of "namespace/pod:container"; not fully unique
36
uid string // UID from pod
37
hash uint64 // Hash of public labels and id
38
39
mut sync.RWMutex
40
lastError error
41
lastEntry time.Time
42
}
43
44
// NewTarget creates a new Target which can be passed to a tailer.
45
func NewTarget(origLabels labels.Labels, lset labels.Labels) *Target {
46
// Precompute some values based on labels so we don't have to continually
47
// search them.
48
var (
49
namespacedName = types.NamespacedName{
50
Namespace: lset.Get(LabelPodNamespace),
51
Name: lset.Get(LabelPodName),
52
}
53
54
containerName = lset.Get(LabelPodContainerName)
55
uid = lset.Get(LabelPodUID)
56
57
id = fmt.Sprintf("%s:%s", namespacedName, containerName)
58
publicLabels = publicLabels(lset)
59
)
60
61
// Precompute the hash of the target from the public labels and the ID of the
62
// target.
63
hasher := xxhash.New()
64
fmt.Fprintf(hasher, "%016d", publicLabels.Hash())
65
fmt.Fprint(hasher, id)
66
fmt.Fprint(hasher, uid)
67
hash := hasher.Sum64()
68
69
return &Target{
70
origLabels: origLabels,
71
publicLabels: publicLabels,
72
73
namespacedName: namespacedName,
74
containerName: containerName,
75
id: id,
76
uid: uid,
77
hash: hash,
78
}
79
}
80
81
func publicLabels(lset labels.Labels) labels.Labels {
82
lb := labels.NewBuilder(lset)
83
84
for _, l := range lset {
85
if strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
86
lb.Del(l.Name)
87
}
88
}
89
90
return lb.Labels(nil)
91
}
92
93
// NamespacedName returns the key of the Pod being targeted.
94
func (t *Target) NamespacedName() types.NamespacedName { return t.namespacedName }
95
96
// ContainerName returns the container name being targeted.
97
func (t *Target) ContainerName() string { return t.containerName }
98
99
// String returns a string representing the target in the form
100
// "namespace/name:container".
101
func (t *Target) String() string { return t.id }
102
103
// DiscoveryLabels returns the set of original labels prior to processing or
104
// relabeling.
105
func (t *Target) DiscoveryLabels() labels.Labels { return t.origLabels }
106
107
// Labels returns the set of public labels for the target.
108
func (t *Target) Labels() labels.Labels { return t.publicLabels }
109
110
// Hash returns an identifying hash for the target.
111
func (t *Target) Hash() uint64 { return t.hash }
112
113
// UID returns the UID for this target, based on the pod's UID.
114
func (t *Target) UID() string { return t.uid }
115
116
// Report reports information about the target.
117
func (t *Target) Report(time time.Time, err error) {
118
t.mut.Lock()
119
defer t.mut.Unlock()
120
121
t.lastError = err
122
t.lastEntry = time
123
}
124
125
// LastError returns the most recent error if the target is unhealthy.
126
func (t *Target) LastError() error {
127
t.mut.RLock()
128
defer t.mut.RUnlock()
129
130
return t.lastError
131
}
132
133
// LastEntry returns the time the most recent log line was read or when the
134
// most recent error occurred.
135
func (t *Target) LastEntry() time.Time {
136
t.mut.RLock()
137
defer t.mut.RUnlock()
138
139
return t.lastEntry
140
}
141
142
// PrepareLabels builds a label set with default labels applied from the
143
// default label set. It validates that the input label set is valid.
144
//
145
// The namespace of the pod to tail logs from is determined by the
146
// [LabelPodNamespace] label. If this label isn't present, PrepareLabels falls
147
// back to __meta_kubernetes_namespace.
148
//
149
// The name of the pod to tail logs from is determined by the [LabelPodName]
150
// label. If this label isn't present, PrepareLabels falls back to
151
// __meta_kubernetes_pod_name.
152
//
153
// The name of the container to tail logs from is determined by the
154
// [LabelPodContainerName] label. If this label isn't present, PrepareLabels
155
// falls back to __meta_kubernetes_pod_container_name.
156
//
157
// Validation of lset fails if there is no label indicating the pod namespace,
158
// pod name, or container name.
159
func PrepareLabels(lset labels.Labels, defaultJob string) (res labels.Labels, err error) {
160
tailLabels := []labels.Label{
161
{Name: model.JobLabel, Value: defaultJob},
162
}
163
lb := labels.NewBuilder(lset)
164
165
// Add default labels to lb if they're not in lset.
166
for _, l := range tailLabels {
167
if !lset.Has(l.Name) {
168
lb.Set(l.Name, l.Value)
169
}
170
}
171
172
firstLabelValue := func(labelNames ...string) string {
173
for _, labelName := range labelNames {
174
if lv := lset.Get(labelName); lv != "" {
175
return lv
176
}
177
}
178
return ""
179
}
180
181
var (
182
podNamespace = firstLabelValue(LabelPodNamespace, kubePodNamespace)
183
podName = firstLabelValue(LabelPodName, kubePodName)
184
podContainerName = firstLabelValue(LabelPodContainerName, kubePodContainerName)
185
podUID = firstLabelValue(LabelPodUID, kubePodUID)
186
)
187
188
switch {
189
case podNamespace == "":
190
return nil, fmt.Errorf("missing pod namespace label")
191
case podName == "":
192
return nil, fmt.Errorf("missing pod name label")
193
case podContainerName == "":
194
return nil, fmt.Errorf("missing pod container name label")
195
case podUID == "":
196
return nil, fmt.Errorf("missing pod UID label")
197
}
198
199
// Make sure that LabelPodNamespace, LabelPodName, LabelPodContainerName, and
200
// LabelPodUID are set on the final target.
201
if !lset.Has(LabelPodNamespace) {
202
lb.Set(LabelPodNamespace, podNamespace)
203
}
204
if !lset.Has(LabelPodName) {
205
lb.Set(LabelPodName, podName)
206
}
207
if !lset.Has(LabelPodContainerName) {
208
lb.Set(LabelPodContainerName, podContainerName)
209
}
210
if !lset.Has(LabelPodUID) {
211
lb.Set(LabelPodUID, podUID)
212
}
213
214
// Meta labels are deleted after relabelling. Other internal labels propagate
215
// to the target which decides whether they will be part of their label set.
216
for _, l := range lset {
217
if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
218
lb.Del(l.Name)
219
}
220
}
221
222
// Default the instance label to the target address.
223
if !lset.Has(model.InstanceLabel) {
224
defaultInstance := fmt.Sprintf("%s/%s:%s", podNamespace, podName, podContainerName)
225
lb.Set(model.InstanceLabel, defaultInstance)
226
}
227
228
res = lb.Labels(nil)
229
for _, l := range res {
230
// Check label values are valid, drop the target if not.
231
if !model.LabelValue(l.Value).IsValid() {
232
return nil, fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value)
233
}
234
}
235
return res, nil
236
}
237
238