Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/kubernetes_events/event_controller.go
4096 views
1
package kubernetes_events
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"strings"
8
"time"
9
10
"github.com/cespare/xxhash/v2"
11
"github.com/go-kit/log"
12
"github.com/go-kit/log/level"
13
"github.com/grafana/agent/component/common/loki"
14
"github.com/grafana/agent/component/common/loki/positions"
15
"github.com/grafana/agent/pkg/runner"
16
"github.com/grafana/loki/pkg/logproto"
17
"github.com/prometheus/common/model"
18
corev1 "k8s.io/api/core/v1"
19
"k8s.io/apimachinery/pkg/runtime"
20
"k8s.io/client-go/rest"
21
cachetools "k8s.io/client-go/tools/cache"
22
"sigs.k8s.io/controller-runtime/pkg/cache"
23
"sigs.k8s.io/controller-runtime/pkg/client"
24
)
25
26
type eventControllerTask struct {
27
Log log.Logger
28
Config *rest.Config // Config to connect to Kubernetes.
29
Namespace string // Namespace to watch for events in.
30
JobName string // Label value to use for job.
31
InstanceName string // Label value to use for instance.
32
Receiver loki.LogsReceiver
33
Positions positions.Positions
34
}
35
36
// Hash implements [runner.Task].
37
func (t eventControllerTask) Hash() uint64 {
38
return xxhash.Sum64String(t.Namespace)
39
}
40
41
// Equals implements [runner.Task].
42
func (t eventControllerTask) Equals(other runner.Task) bool {
43
// We can do a direct comparison since the two task types are comparable.
44
return t == other.(eventControllerTask)
45
}
46
47
type eventController struct {
48
log log.Logger
49
task eventControllerTask
50
handler loki.EntryHandler
51
52
positionsKey string
53
initTimestamp time.Time
54
}
55
56
func newEventController(task eventControllerTask) *eventController {
57
var key string
58
if task.Namespace == "" {
59
key = positions.CursorKey("events")
60
} else {
61
key = positions.CursorKey("events-" + task.Namespace)
62
}
63
64
lastTimestamp, _ := task.Positions.Get(key, "")
65
66
return &eventController{
67
log: task.Log,
68
task: task,
69
handler: loki.NewEntryHandler(task.Receiver, func() {}),
70
positionsKey: key,
71
initTimestamp: time.UnixMicro(lastTimestamp),
72
}
73
}
74
75
// Run watches events until runError returns, logging when the watcher starts
// and stops. Any error from runError is logged rather than returned. The
// entry handler is stopped when Run returns.
func (ctrl *eventController) Run(ctx context.Context) {
	defer ctrl.handler.Stop()

	namespace := ctrl.task.Namespace

	level.Info(ctrl.log).Log("msg", "watching events for namespace", "namespace", namespace)
	defer level.Info(ctrl.log).Log("msg", "stopping watcher for events", "namespace", namespace)

	err := ctrl.runError(ctx)
	if err == nil {
		return
	}
	level.Error(ctrl.log).Log("msg", "event watcher exited with error", "err", err)
}
85
86
func (ctrl *eventController) runError(ctx context.Context) error {
87
scheme := runtime.NewScheme()
88
if err := corev1.AddToScheme(scheme); err != nil {
89
return fmt.Errorf("adding core to scheme: %w", err)
90
}
91
92
opts := cache.Options{
93
Scheme: scheme,
94
Namespace: ctrl.task.Namespace,
95
}
96
informers, err := cache.New(ctrl.task.Config, opts)
97
if err != nil {
98
return fmt.Errorf("creating informers cache: %w", err)
99
}
100
101
go func() {
102
err := informers.Start(ctx)
103
if err != nil && ctx.Err() != nil {
104
level.Error(ctrl.log).Log("msg", "failed to start informers", "err", err)
105
}
106
}()
107
108
if !informers.WaitForCacheSync(ctx) {
109
return fmt.Errorf("informer caches failed to sync")
110
}
111
if err := ctrl.configureInformers(ctx, informers); err != nil {
112
return fmt.Errorf("failed to configure informers: %w", err)
113
}
114
115
<-ctx.Done()
116
return nil
117
}
118
119
func (ctrl *eventController) configureInformers(ctx context.Context, informers cache.Informers) error {
120
types := []client.Object{
121
&corev1.Event{},
122
}
123
124
informerCtx, cancel := context.WithTimeout(ctx, informerSyncTimeout)
125
defer cancel()
126
127
for _, ty := range types {
128
informer, err := informers.GetInformer(informerCtx, ty)
129
if err != nil {
130
if errors.Is(informerCtx.Err(), context.DeadlineExceeded) { // Check the context to prevent GetInformer returning a fake timeout
131
return fmt.Errorf("timeout exceeded while configuring informers. Check the connection"+
132
" to the Kubernetes API is stable and that the Agent has appropriate RBAC permissions for %v", ty)
133
}
134
return err
135
}
136
137
_, err = informer.AddEventHandler(cachetools.ResourceEventHandlerFuncs{
138
AddFunc: func(obj interface{}) { ctrl.onAdd(ctx, obj) },
139
UpdateFunc: func(oldObj, newObj interface{}) { ctrl.onUpdate(ctx, oldObj, newObj) },
140
DeleteFunc: func(obj interface{}) { ctrl.onDelete(ctx, obj) },
141
})
142
if err != nil {
143
return err
144
}
145
}
146
return nil
147
}
148
149
// onAdd is the informer callback for newly observed objects. Objects that are
// not *corev1.Event are logged at warn level and dropped; events are passed
// to handleEvent, whose error (if any) is logged.
func (ctrl *eventController) onAdd(ctx context.Context, obj interface{}) {
	event, isEvent := obj.(*corev1.Event)
	if !isEvent {
		level.Warn(ctrl.log).Log("msg", "received an event for a non-Event Kind", "type", fmt.Sprintf("%T", obj))
		return
	}

	if err := ctrl.handleEvent(ctx, event); err != nil {
		level.Error(ctrl.log).Log("msg", "error handling event", "err", err)
	}
}
160
161
// onUpdate is the informer callback for changed objects. Both the old and the
// new object must be *corev1.Event; otherwise a warning is logged and the
// update is dropped. Updates whose resource version did not change are
// ignored; all others are passed to handleEvent with the new event.
func (ctrl *eventController) onUpdate(ctx context.Context, oldObj, newObj interface{}) {
	previous, isEvent := oldObj.(*corev1.Event)
	if !isEvent {
		level.Warn(ctrl.log).Log("msg", "received an event for a non-Event Kind", "type", fmt.Sprintf("%T", oldObj))
		return
	}

	current, isEvent := newObj.(*corev1.Event)
	if !isEvent {
		level.Warn(ctrl.log).Log("msg", "received an event for a non-Event Kind", "type", fmt.Sprintf("%T", newObj))
		return
	}

	currentRV := current.GetResourceVersion()
	if previous.GetResourceVersion() == currentRV {
		level.Debug(ctrl.log).Log("msg", "resource version didn't change, ignoring call to onUpdate", "resource version", currentRV)
		return
	}

	if err := ctrl.handleEvent(ctx, current); err != nil {
		level.Error(ctrl.log).Log("msg", "error handling event", "err", err)
	}
}
183
184
// onDelete is the informer callback for removed objects. It intentionally
// does nothing: an event disappearing from Kubernetes produces nothing to
// log.
func (ctrl *eventController) onDelete(_ context.Context, _ interface{}) {
}
188
189
func (ctrl *eventController) handleEvent(ctx context.Context, event *corev1.Event) error {
190
eventTs := eventTimestamp(event)
191
192
// Events don't have any ordering guarantees, so we can't rely on comparing
193
// the timestamp of this event to any other event received.
194
//
195
// We use a best-effort attempt to not re-deliver any events we've already
196
// logged by checking the timestamp from when the worker started. This may
197
// still cause us to drop some events in between recreating workers, but it
198
// minimizes risk.
199
//
200
// TODO(rfratto): a longer term solution would be to track timestamps for
201
// each involved object (or something similar), but that solution would need
202
// to make sure to not leak those timestamps, and find a way to recognize
203
// that involved objects have been deleted.
204
if !eventTs.After(ctrl.initTimestamp) {
205
return nil
206
}
207
208
lset, msg, err := ctrl.parseEvent(event)
209
if err != nil {
210
return err
211
}
212
213
entry := loki.Entry{
214
Entry: logproto.Entry{
215
Timestamp: eventTs,
216
Line: msg,
217
},
218
Labels: lset,
219
}
220
221
select {
222
case <-ctx.Done():
223
return ctx.Err()
224
case ctrl.handler.Chan() <- entry:
225
// Update position offset only after it's been sent to the next set of
226
// components.
227
ctrl.task.Positions.Put(ctrl.positionsKey, "", eventTs.UnixMicro())
228
return nil
229
}
230
}
231
232
func (ctrl *eventController) parseEvent(event *corev1.Event) (model.LabelSet, string, error) {
233
var (
234
msg strings.Builder
235
lset = make(model.LabelSet)
236
)
237
238
obj := event.InvolvedObject
239
if obj.Name == "" {
240
return nil, "", fmt.Errorf("no involved object for event")
241
}
242
243
lset[model.LabelName("namespace")] = model.LabelValue(obj.Namespace)
244
lset[model.LabelName("job")] = model.LabelValue(ctrl.task.JobName)
245
lset[model.LabelName("instance")] = model.LabelValue(ctrl.task.InstanceName)
246
247
fmt.Fprintf(&msg, "name=%s ", obj.Name)
248
if obj.Kind != "" {
249
fmt.Fprintf(&msg, "kind=%s ", obj.Kind)
250
}
251
if event.Action != "" {
252
fmt.Fprintf(&msg, "action=%s ", event.Action)
253
}
254
if obj.APIVersion != "" {
255
fmt.Fprintf(&msg, "objectAPIversion=%s ", obj.APIVersion)
256
}
257
if obj.ResourceVersion != "" {
258
fmt.Fprintf(&msg, "objectRV=%s ", obj.ResourceVersion)
259
}
260
if event.ResourceVersion != "" {
261
fmt.Fprintf(&msg, "eventRV=%s ", event.ResourceVersion)
262
}
263
if event.ReportingInstance != "" {
264
fmt.Fprintf(&msg, "reportinginstance=%s ", event.ReportingInstance)
265
}
266
if event.ReportingController != "" {
267
fmt.Fprintf(&msg, "reportingcontroller=%s ", event.ReportingController)
268
}
269
if event.Source.Component != "" {
270
fmt.Fprintf(&msg, "sourcecomponent=%s ", event.Source.Component)
271
}
272
if event.Source.Host != "" {
273
fmt.Fprintf(&msg, "sourcehost=%s ", event.Source.Host)
274
}
275
if event.Reason != "" {
276
fmt.Fprintf(&msg, "reason=%s ", event.Reason)
277
}
278
if event.Type != "" {
279
fmt.Fprintf(&msg, "type=%s ", event.Type)
280
}
281
if event.Count != 0 {
282
fmt.Fprintf(&msg, "count=%d ", event.Count)
283
}
284
285
fmt.Fprintf(&msg, "msg=%q ", event.Message)
286
287
return lset, msg.String(), nil
288
}
289
290
// eventTimestamp returns the time to associate with event: its LastTimestamp
// when that is set, and its EventTime otherwise.
func eventTimestamp(event *corev1.Event) time.Time {
	if event.LastTimestamp.IsZero() {
		return event.EventTime.Time
	}
	return event.LastTimestamp.Time
}
296
297
func (ctrl *eventController) DebugInfo() controllerInfo {
298
ts, _ := ctrl.task.Positions.Get(ctrl.positionsKey, "")
299
300
return controllerInfo{
301
Namespace: ctrl.task.Namespace,
302
LastTimestamp: time.UnixMicro(ts),
303
}
304
}
305
306
// controllerInfo is the debug view of one event controller, as returned by
// eventController.DebugInfo.
type controllerInfo struct {
	// Namespace is the namespace the controller watches.
	Namespace string `river:"namespace,attr"`

	// LastTimestamp is the stored position for the controller, as a time.
	LastTimestamp time.Time `river:"last_event_timestamp,attr"`
}
310
311