Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/podlogs/reconciler.go
5371 views
1
package podlogs
2
3
import (
4
"context"
5
"fmt"
6
"sort"
7
"strings"
8
"sync"
9
"time"
10
11
"github.com/go-kit/log"
12
"github.com/go-kit/log/level"
13
"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"
14
monitoringv1alpha2 "github.com/grafana/agent/component/loki/source/podlogs/internal/apis/monitoring/v1alpha2"
15
"github.com/prometheus/common/model"
16
promlabels "github.com/prometheus/prometheus/model/labels"
17
"github.com/prometheus/prometheus/model/relabel"
18
"github.com/prometheus/prometheus/util/strutil"
19
corev1 "k8s.io/api/core/v1"
20
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21
"k8s.io/apimachinery/pkg/labels"
22
"sigs.k8s.io/controller-runtime/pkg/client"
23
)
24
25
// The reconciler reconciles the state of PodLogs on Kubernetes with targets to
26
// collect logs from.
27
type reconciler struct {
28
log log.Logger
29
tailer *kubetail.Manager
30
31
reconcileMut sync.RWMutex
32
podLogsSelector labels.Selector
33
podLogsNamespaceSelector labels.Selector
34
35
debugMut sync.RWMutex
36
debugInfo []DiscoveredPodLogs
37
}
38
39
// newReconciler creates a new reconciler which synchronizes targets with the
40
// provided tailer whenever Reconcile is called.
41
func newReconciler(l log.Logger, tailer *kubetail.Manager) *reconciler {
42
return &reconciler{
43
log: l,
44
tailer: tailer,
45
46
podLogsSelector: labels.Everything(),
47
podLogsNamespaceSelector: labels.Everything(),
48
}
49
}
50
51
// UpdateSelectors updates the selectors used by the reconciler.
52
func (r *reconciler) UpdateSelectors(podLogs, namespace labels.Selector) {
53
r.reconcileMut.Lock()
54
defer r.reconcileMut.Unlock()
55
56
r.podLogsSelector = podLogs
57
r.podLogsNamespaceSelector = namespace
58
}
59
60
// Reconcile synchronizes the set of running kubetail targets with the set of
61
// discovered PodLogs.
62
func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error {
63
var newDebugInfo []DiscoveredPodLogs
64
var newTasks []*kubetail.Target
65
66
listOpts := []client.ListOption{
67
client.MatchingLabelsSelector{Selector: r.podLogsSelector},
68
}
69
var podLogsList monitoringv1alpha2.PodLogsList
70
if err := cli.List(ctx, &podLogsList, listOpts...); err != nil {
71
return fmt.Errorf("could not list PodLogs: %w", err)
72
}
73
74
for _, podLogs := range podLogsList.Items {
75
key := client.ObjectKeyFromObject(podLogs)
76
77
// Skip over this podLogs if it doesn't match the namespace selector.
78
podLogsNamespace := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: podLogs.Namespace}}
79
if err := cli.Get(ctx, client.ObjectKeyFromObject(&podLogsNamespace), &podLogsNamespace); err != nil {
80
level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "get namespace", "key", key, "err", err)
81
continue
82
}
83
if !r.podLogsNamespaceSelector.Matches(labels.Set(podLogsNamespace.Labels)) {
84
continue
85
}
86
87
targets, discoveredPodLogs := r.reconcilePodLogs(ctx, cli, podLogs)
88
89
newTasks = append(newTasks, targets...)
90
newDebugInfo = append(newDebugInfo, discoveredPodLogs)
91
}
92
93
if err := r.tailer.SyncTargets(ctx, newTasks); err != nil {
94
level.Error(r.log).Log("msg", "failed to apply new tailers to run", "err", err)
95
}
96
97
r.debugMut.Lock()
98
r.debugInfo = newDebugInfo
99
r.debugMut.Unlock()
100
101
return nil
102
}
103
104
func (r *reconciler) reconcilePodLogs(ctx context.Context, cli client.Client, podLogs *monitoringv1alpha2.PodLogs) ([]*kubetail.Target, DiscoveredPodLogs) {
105
var targets []*kubetail.Target
106
107
discoveredPodLogs := DiscoveredPodLogs{
108
Namespace: podLogs.Namespace,
109
Name: podLogs.Name,
110
LastReconcile: time.Now(),
111
}
112
113
key := client.ObjectKeyFromObject(podLogs)
114
level.Debug(r.log).Log("msg", "reconciling PodLogs", "key", key)
115
116
relabelRules, err := convertRelabelConfig(podLogs.Spec.RelabelConfigs)
117
if err != nil {
118
discoveredPodLogs.ReconcileError = fmt.Sprintf("invalid relabelings: %s", err)
119
level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "convert relabelings", "key", key, "err", err)
120
return targets, discoveredPodLogs
121
}
122
123
sel, err := metav1.LabelSelectorAsSelector(&podLogs.Spec.Selector)
124
if err != nil {
125
discoveredPodLogs.ReconcileError = fmt.Sprintf("invalid Pod selector: %s", err)
126
level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "convert selector", "key", key, "err", err)
127
return targets, discoveredPodLogs
128
}
129
130
opts := []client.ListOption{
131
client.MatchingLabelsSelector{Selector: sel},
132
}
133
134
var podList corev1.PodList
135
if err := cli.List(ctx, &podList, opts...); err != nil {
136
discoveredPodLogs.ReconcileError = fmt.Sprintf("failed to list Pods: %s", err)
137
level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "list Pods", "key", key, "err", err)
138
return targets, discoveredPodLogs
139
}
140
141
namespaceSel, err := metav1.LabelSelectorAsSelector(&podLogs.Spec.NamespaceSelector)
142
if err != nil {
143
discoveredPodLogs.ReconcileError = fmt.Sprintf("invalid Pod namespaceSelector: %s", err)
144
level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "convert namespaceSelector", "key", key, "err", err)
145
return targets, discoveredPodLogs
146
}
147
148
for _, pod := range podList.Items {
149
discoveredPod := DiscoveredPod{
150
Namespace: pod.Namespace,
151
Name: pod.Name,
152
}
153
154
// Skip over this pod if it doesn't match the namespace selector.
155
namespace := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: pod.Namespace}}
156
if err := cli.Get(ctx, client.ObjectKeyFromObject(&namespace), &namespace); err != nil {
157
level.Error(r.log).Log("msg", "failed to reconcile PodLogs", "operation", "get namespace for Pod", "key", key, "err", err)
158
continue
159
}
160
if !namespaceSel.Matches(labels.Set(namespace.Labels)) {
161
continue
162
}
163
164
level.Debug(r.log).Log("msg", "found matching Pod", "key", key, "pod", client.ObjectKeyFromObject(&pod))
165
166
handleContainer := func(container *corev1.Container, initContainer bool) {
167
targetLabels := buildTargetLabels(discoveredContainer{
168
PodLogs: podLogs,
169
PodNamespace: &namespace,
170
Pod: &pod,
171
Container: container,
172
InitContainer: initContainer,
173
})
174
processedLabels, _ := relabel.Process(targetLabels.Copy(), relabelRules...)
175
176
defaultJob := fmt.Sprintf("%s/%s:%s", podLogs.Namespace, podLogs.Name, container.Name)
177
finalLabels, err := kubetail.PrepareLabels(processedLabels, defaultJob)
178
179
if err != nil {
180
discoveredPod.Containers = append(discoveredPod.Containers, DiscoveredContainer{
181
DiscoveredLabels: targetLabels.Map(),
182
Labels: processedLabels.Map(),
183
ReconcileError: fmt.Sprintf("invalid labels: %s", err),
184
})
185
return
186
}
187
188
target := kubetail.NewTarget(targetLabels.Copy(), finalLabels)
189
if len(processedLabels) != 0 {
190
targets = append(targets, target)
191
}
192
193
discoveredPod.Containers = append(discoveredPod.Containers, DiscoveredContainer{
194
DiscoveredLabels: targetLabels.Map(),
195
Labels: target.Labels().Map(),
196
})
197
}
198
199
for _, container := range pod.Spec.InitContainers {
200
handleContainer(&container, true)
201
}
202
for _, container := range pod.Spec.Containers {
203
handleContainer(&container, false)
204
}
205
206
discoveredPodLogs.Pods = append(discoveredPodLogs.Pods, discoveredPod)
207
}
208
209
return targets, discoveredPodLogs
210
}
211
212
// DebugInfo returns the current debug information for the reconciler.
213
func (r *reconciler) DebugInfo() []DiscoveredPodLogs {
214
r.debugMut.RLock()
215
defer r.debugMut.RUnlock()
216
217
return r.debugInfo
218
}
219
220
type discoveredContainer struct {
221
PodLogs *monitoringv1alpha2.PodLogs
222
PodNamespace *corev1.Namespace
223
Pod *corev1.Pod
224
Container *corev1.Container
225
InitContainer bool
226
}
227
228
func buildTargetLabels(opts discoveredContainer) promlabels.Labels {
229
targetLabels := promlabels.NewBuilder(nil)
230
231
targetLabels.Set("__meta_kubernetes_podlogs_namespace", opts.PodLogs.Namespace)
232
targetLabels.Set("__meta_kubernetes_podlogs_name", opts.PodLogs.Name)
233
for key, value := range opts.PodLogs.Labels {
234
key = strutil.SanitizeLabelName(key)
235
targetLabels.Set("__meta_kubernetes_podlogs_label_"+key, value)
236
targetLabels.Set("__meta_kubernetes_podlogs_labelpresent_"+key, "true")
237
}
238
for key, value := range opts.PodLogs.Annotations {
239
key = strutil.SanitizeLabelName(key)
240
targetLabels.Set("__meta_kubernetes_podlogs_annotation_"+key, value)
241
targetLabels.Set("__meta_kubernetes_podlogs_annotationpresent_"+key, "true")
242
}
243
244
targetLabels.Set("__meta_kubernetes_namespace", opts.Pod.Namespace)
245
for key, value := range opts.PodNamespace.Labels {
246
key = strutil.SanitizeLabelName(key)
247
targetLabels.Set("__meta_kubernetes_namespace_label_"+key, value)
248
targetLabels.Set("__meta_kubernetes_namespace_labelpresent_"+key, "true")
249
}
250
for key, value := range opts.PodNamespace.Annotations {
251
key = strutil.SanitizeLabelName(key)
252
targetLabels.Set("__meta_kubernetes_namespace_annotation_"+key, value)
253
targetLabels.Set("__meta_kubernetes_namespace_annotationpresent_"+key, "true")
254
}
255
256
targetLabels.Set("__meta_kubernetes_pod_name", opts.Pod.Name)
257
targetLabels.Set("__meta_kubernetes_pod_ip", opts.Pod.Status.PodIP)
258
for key, value := range opts.Pod.Labels {
259
key = strutil.SanitizeLabelName(key)
260
targetLabels.Set("__meta_kubernetes_pod_label_"+key, value)
261
targetLabels.Set("__meta_kubernetes_pod_labelpresent_"+key, "true")
262
}
263
for key, value := range opts.Pod.Annotations {
264
key = strutil.SanitizeLabelName(key)
265
targetLabels.Set("__meta_kubernetes_pod_annotation_"+key, value)
266
targetLabels.Set("__meta_kubernetes_pod_annotationpresent_"+key, "true")
267
}
268
targetLabels.Set("__meta_kubernetes_pod_container_init", fmt.Sprint(opts.InitContainer))
269
targetLabels.Set("__meta_kubernetes_pod_container_name", opts.Container.Name)
270
targetLabels.Set("__meta_kubernetes_pod_container_image", opts.Container.Image)
271
targetLabels.Set("__meta_kubernetes_pod_ready", string(podReady(opts.Pod)))
272
targetLabels.Set("__meta_kubernetes_pod_phase", string(opts.Pod.Status.Phase))
273
targetLabels.Set("__meta_kubernetes_pod_node_name", opts.Pod.Spec.NodeName)
274
targetLabels.Set("__meta_kubernetes_pod_host_ip", opts.Pod.Status.HostIP)
275
targetLabels.Set("__meta_kubernetes_pod_uid", string(opts.Pod.UID))
276
277
for _, ref := range opts.Pod.GetOwnerReferences() {
278
if ref.Controller != nil && *ref.Controller {
279
targetLabels.Set("__meta_kubernetes_pod_controller_kind", ref.Kind)
280
targetLabels.Set("__meta_kubernetes_pod_controller_name", ref.Name)
281
break
282
}
283
}
284
285
// Add labels needed for collecting.
286
targetLabels.Set(kubetail.LabelPodNamespace, opts.Pod.Namespace)
287
targetLabels.Set(kubetail.LabelPodName, opts.Pod.Name)
288
targetLabels.Set(kubetail.LabelPodContainerName, opts.Container.Name)
289
targetLabels.Set(kubetail.LabelPodUID, string(opts.Pod.GetUID()))
290
291
// Add default labels (job, instance)
292
targetLabels.Set(model.InstanceLabel, fmt.Sprintf("%s/%s:%s", opts.Pod.Namespace, opts.Pod.Name, opts.Container.Name))
293
targetLabels.Set(model.JobLabel, fmt.Sprintf("%s/%s", opts.PodLogs.Namespace, opts.PodLogs.Name))
294
295
res := targetLabels.Labels(nil)
296
sort.Sort(res)
297
return res
298
}
299
300
func podReady(pod *corev1.Pod) model.LabelValue {
301
for _, cond := range pod.Status.Conditions {
302
if cond.Type == corev1.PodReady {
303
return model.LabelValue(strings.ToLower(string(cond.Status)))
304
}
305
}
306
return model.LabelValue(strings.ToLower(string(corev1.ConditionUnknown)))
307
}
308
309
type DiscoveredPodLogs struct {
310
Namespace string `river:"namespace,attr"`
311
Name string `river:"name,attr"`
312
LastReconcile time.Time `river:"last_reconcile,attr,optional"`
313
ReconcileError string `river:"reconcile_error,attr,optional"`
314
315
Pods []DiscoveredPod `river:"pod,block"`
316
}
317
318
type DiscoveredPod struct {
319
Namespace string `river:"namespace,attr"`
320
Name string `river:"name,attr"`
321
ReconcileError string `river:"reconcile_error,attr,optional"`
322
323
Containers []DiscoveredContainer `river:"container,block"`
324
}
325
326
type DiscoveredContainer struct {
327
DiscoveredLabels map[string]string `river:"discovered_labels,attr"`
328
Labels map[string]string `river:"labels,attr"`
329
ReconcileError string `river:"reconcile_error,attr,optional"`
330
}
331
332