Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/eventhandler/eventhandler.go
5287 views
1
// Package eventhandler watches for Kubernetes Event objects and hands them off to
2
// Agent's Logs subsystem (embedded promtail)
3
package eventhandler
4
5
import (
6
"context"
7
"encoding/json"
8
"fmt"
9
"os"
10
"path/filepath"
11
"strings"
12
"sync"
13
"time"
14
15
v1 "k8s.io/api/core/v1"
16
"k8s.io/client-go/informers"
17
"k8s.io/client-go/kubernetes"
18
"k8s.io/client-go/rest"
19
"k8s.io/client-go/tools/cache"
20
"k8s.io/client-go/tools/clientcmd"
21
"k8s.io/client-go/util/homedir"
22
23
"github.com/go-kit/log"
24
"github.com/go-kit/log/level"
25
"github.com/grafana/agent/pkg/integrations/v2"
26
"github.com/grafana/agent/pkg/logs"
27
"github.com/grafana/loki/clients/pkg/promtail/api"
28
"github.com/grafana/loki/pkg/logproto"
29
"github.com/prometheus/common/model"
30
"github.com/prometheus/prometheus/model/labels"
31
)
32
33
const (
	// cacheFileMode is the permission mode for the last-shipped-event cache
	// file; 0600 restricts it to the agent's own user.
	cacheFileMode = 0600
)
36
37
// EventHandler watches for Kubernetes Event objects and hands them off to
// Agent's logs subsystem (embedded promtail).
type EventHandler struct {
	// LogsClient is the Agent logs subsystem used to ship event entries.
	LogsClient *logs.Logs
	// LogsInstance names the logs instance that entries are sent to.
	LogsInstance string
	Log          log.Logger
	// CachePath is the file where the last-shipped-event state is persisted.
	CachePath string
	// LastEvent tracks the most recently shipped event; guarded by the
	// embedded mutex.
	LastEvent *ShippedEvents
	// InitEvent is the state loaded from CachePath at startup, consulted by
	// handleEvent to avoid re-shipping events after a restart.
	InitEvent     *ShippedEvents
	EventInformer cache.SharedIndexInformer
	// SendTimeout bounds each SendEntry hand-off to promtail.
	SendTimeout time.Duration
	// ticker drives periodic flushes of LastEvent to CachePath.
	ticker *time.Ticker
	// instance is the integration identifier; used for the "instance" and
	// "agent_hostname" labels.
	instance string
	// extraLabels are user-configured labels appended to every entry.
	extraLabels labels.Labels
	sync.Mutex
}
53
54
// ShippedEvents stores a timestamp and map of event ResourceVersions shipped for that timestamp.
// Used to avoid double-shipping events upon restart.
type ShippedEvents struct {
	// Timestamp is the shipped event's timestamp.
	Timestamp time.Time `json:"ts"`
	// RvMap is the set of event RVs (resource versions) already "shipped"
	// (handed off) for this timestamp. This handles the case of a timestamp
	// having multiple events, which happens quite frequently.
	RvMap map[string]struct{} `json:"resourceVersion"`
}
64
65
func newEventHandler(l log.Logger, globals integrations.Globals, c *Config) (integrations.Integration, error) {
66
var (
67
config *rest.Config
68
err error
69
factory informers.SharedInformerFactory
70
id string
71
)
72
73
// Try using KubeconfigPath or inClusterConfig
74
config, err = clientcmd.BuildConfigFromFlags("", c.KubeconfigPath)
75
if err != nil {
76
level.Error(l).Log("msg", "Loading from KubeconfigPath or inClusterConfig failed", "err", err)
77
// Trying default home location
78
if home := homedir.HomeDir(); home != "" {
79
kubeconfigPath := filepath.Join(home, ".kube", "config")
80
config, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath)
81
if err != nil {
82
level.Error(l).Log("msg", "Could not load a kubeconfig", "err", err)
83
return nil, err
84
}
85
} else {
86
err = fmt.Errorf("could not load a kubeconfig")
87
return nil, err
88
}
89
}
90
91
clientset, err := kubernetes.NewForConfig(config)
92
if err != nil {
93
return nil, err
94
}
95
96
// get an informer
97
if c.Namespace == "" {
98
factory = informers.NewSharedInformerFactory(clientset, time.Duration(c.InformerResync)*time.Second)
99
} else {
100
factory = informers.NewSharedInformerFactoryWithOptions(clientset, time.Duration(c.InformerResync)*time.Second, informers.WithNamespace(c.Namespace))
101
}
102
103
eventInformer := factory.Core().V1().Events().Informer()
104
id, _ = c.Identifier(globals)
105
106
eh := &EventHandler{
107
LogsClient: globals.Logs,
108
LogsInstance: c.LogsInstance,
109
Log: l,
110
CachePath: c.CachePath,
111
EventInformer: eventInformer,
112
SendTimeout: time.Duration(c.SendTimeout) * time.Second,
113
instance: id,
114
extraLabels: c.ExtraLabels,
115
}
116
// set the resource handler fns
117
if err := eh.initInformer(eventInformer); err != nil {
118
return nil, err
119
}
120
eh.ticker = time.NewTicker(time.Duration(c.FlushInterval) * time.Second)
121
return eh, nil
122
}
123
124
// Initialize informer by setting event handler fns
125
func (eh *EventHandler) initInformer(eventsInformer cache.SharedIndexInformer) error {
126
_, err := eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
127
AddFunc: eh.addEvent,
128
UpdateFunc: eh.updateEvent,
129
DeleteFunc: eh.deleteEvent,
130
})
131
return err
132
}
133
134
// Handles new event objects
135
func (eh *EventHandler) addEvent(obj interface{}) {
136
event, _ := obj.(*v1.Event)
137
138
err := eh.handleEvent(event)
139
if err != nil {
140
level.Error(eh.Log).Log("msg", "Error handling event", "err", err, "event", event)
141
}
142
}
143
144
// Handles event object updates. Note that this get triggered on informer resyncs and also
145
// events occurring more than once (in which case .count is incremented)
146
func (eh *EventHandler) updateEvent(objOld interface{}, objNew interface{}) {
147
eOld, _ := objOld.(*v1.Event)
148
eNew, _ := objNew.(*v1.Event)
149
150
if eOld.GetResourceVersion() == eNew.GetResourceVersion() {
151
// ignore resync updates
152
level.Debug(eh.Log).Log("msg", "Event RV didn't change, ignoring", "eRV", eNew.ResourceVersion)
153
return
154
}
155
156
err := eh.handleEvent(eNew)
157
if err != nil {
158
level.Error(eh.Log).Log("msg", "Error handling event", "err", err, "event", eNew)
159
}
160
}
161
162
// handleEvent ships a single event to the configured logs instance unless the
// startup cache (InitEvent) shows it was already shipped, then records it as
// the most recently shipped event.
func (eh *EventHandler) handleEvent(event *v1.Event) error {
	eventTs := getTimestamp(event)

	// Anything strictly older than the cached startup timestamp was shipped
	// by a previous run.
	if eventTs.Before(eh.InitEvent.Timestamp) {
		return nil
	}
	// Events sharing the startup timestamp are disambiguated by resource
	// version: present in the map means already shipped.
	if eventTs.Equal(eh.InitEvent.Timestamp) {
		if _, shipped := eh.InitEvent.RvMap[event.ResourceVersion]; shipped {
			return nil
		}
	}

	lbls, msg, err := eh.extractEvent(event)
	if err != nil {
		return err
	}

	entry := newEntry(msg, eventTs, lbls)
	if ok := eh.LogsClient.Instance(eh.LogsInstance).SendEntry(entry, eh.SendTimeout); !ok {
		return fmt.Errorf("msg=%s entry=%s", "error handing entry off to promtail", entry)
	}
	level.Info(eh.Log).Log("msg", "Shipped entry", "eventRV", event.ResourceVersion, "eventMsg", event.Message)

	// Remember this event so a restart won't double-ship it.
	return eh.updateLastEvent(event, eventTs)
}
196
197
// deleteEvent is called when event objects are removed from etcd (e.g. on TTL
// expiry); deletions carry no new log data, so this is deliberately a no-op.
func (eh *EventHandler) deleteEvent(obj interface{}) {
}
200
201
// extract data from event fields and create labels, etc.
202
// TODO: ship JSON blobs and allow users to configure using pipelines etc.
203
// instead of hardcoding labels here
204
func (eh *EventHandler) extractEvent(event *v1.Event) (model.LabelSet, string, error) {
205
var (
206
msg strings.Builder
207
labels = make(model.LabelSet)
208
)
209
210
obj := event.InvolvedObject
211
if obj.Name == "" {
212
return nil, "", fmt.Errorf("no involved object for event")
213
}
214
msg.WriteString(fmt.Sprintf("name=%s ", obj.Name))
215
216
labels[model.LabelName("namespace")] = model.LabelValue(obj.Namespace)
217
// TODO(hjet) omit "kubernetes"
218
labels[model.LabelName("job")] = model.LabelValue("integrations/kubernetes/eventhandler")
219
labels[model.LabelName("instance")] = model.LabelValue(eh.instance)
220
labels[model.LabelName("agent_hostname")] = model.LabelValue(eh.instance)
221
for _, lbl := range eh.extraLabels {
222
labels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
223
}
224
225
// we add these fields to the log line to reduce label bloat and cardinality
226
if obj.Kind != "" {
227
msg.WriteString(fmt.Sprintf("kind=%s ", obj.Kind))
228
}
229
if event.Action != "" {
230
msg.WriteString(fmt.Sprintf("action=%s ", event.Action))
231
}
232
if obj.APIVersion != "" {
233
msg.WriteString(fmt.Sprintf("objectAPIversion=%s ", obj.APIVersion))
234
}
235
if obj.ResourceVersion != "" {
236
msg.WriteString(fmt.Sprintf("objectRV=%s ", obj.ResourceVersion))
237
}
238
if event.ResourceVersion != "" {
239
msg.WriteString(fmt.Sprintf("eventRV=%s ", event.ResourceVersion))
240
}
241
if event.ReportingInstance != "" {
242
msg.WriteString(fmt.Sprintf("reportinginstance=%s ", event.ReportingInstance))
243
}
244
if event.ReportingController != "" {
245
msg.WriteString(fmt.Sprintf("reportingcontroller=%s ", event.ReportingController))
246
}
247
if event.Source.Component != "" {
248
msg.WriteString(fmt.Sprintf("sourcecomponent=%s ", event.Source.Component))
249
}
250
if event.Source.Host != "" {
251
msg.WriteString(fmt.Sprintf("sourcehost=%s ", event.Source.Host))
252
}
253
if event.Reason != "" {
254
msg.WriteString(fmt.Sprintf("reason=%s ", event.Reason))
255
}
256
if event.Type != "" {
257
msg.WriteString(fmt.Sprintf("type=%s ", event.Type))
258
}
259
if event.Count != 0 {
260
msg.WriteString(fmt.Sprintf("count=%d ", event.Count))
261
}
262
263
msg.WriteString(fmt.Sprintf("msg=%q", event.Message))
264
265
return labels, msg.String(), nil
266
}
267
268
// getTimestamp selects the timestamp to ship for an event: LastTimestamp when
// populated, otherwise EventTime.
func getTimestamp(event *v1.Event) time.Time {
	if event.LastTimestamp.IsZero() {
		return event.EventTime.Time
	}
	return event.LastTimestamp.Time
}
274
275
// newEntry wraps a message, timestamp, and label set into a promtail
// api.Entry ready for SendEntry.
func newEntry(msg string, ts time.Time, labels model.LabelSet) api.Entry {
	return api.Entry{
		Labels: labels,
		Entry:  logproto.Entry{Timestamp: ts, Line: msg},
	}
}
279
280
// maintain "last event" state
281
func (eh *EventHandler) updateLastEvent(e *v1.Event, eventTs time.Time) error {
282
eh.Lock()
283
defer eh.Unlock()
284
285
eventRv := e.ResourceVersion
286
287
if eh.LastEvent == nil {
288
// startup
289
eh.LastEvent = &ShippedEvents{Timestamp: eventTs, RvMap: make(map[string]struct{})}
290
eh.LastEvent.RvMap[eventRv] = struct{}{}
291
return nil
292
}
293
294
// if timestamp is the same, add to map
295
if eh.LastEvent != nil && eventTs.Equal(eh.LastEvent.Timestamp) {
296
eh.LastEvent.RvMap[eventRv] = struct{}{}
297
return nil
298
}
299
300
// if timestamp is different, create a new ShippedEvents struct
301
eh.LastEvent = &ShippedEvents{Timestamp: eventTs, RvMap: make(map[string]struct{})}
302
eh.LastEvent.RvMap[eventRv] = struct{}{}
303
return nil
304
}
305
306
// writeOutLastEvent persists the current "last shipped event" state to the
// cache file, writing a sibling temp file first and renaming it into place so
// the cache is never observed partially written.
func (eh *EventHandler) writeOutLastEvent() error {
	level.Info(eh.Log).Log("msg", "Flushing last event to disk")

	eh.Lock()
	defer eh.Unlock()

	if eh.LastEvent == nil {
		level.Info(eh.Log).Log("msg", "No last event to flush, returning")
		return nil
	}

	buf, err := json.Marshal(&eh.LastEvent)
	if err != nil {
		return err
	}

	tmpPath := eh.CachePath + "-new"
	if err := os.WriteFile(tmpPath, buf, os.FileMode(cacheFileMode)); err != nil {
		return err
	}
	if err := os.Rename(tmpPath, eh.CachePath); err != nil {
		return err
	}

	level.Info(eh.Log).Log("msg", "Flushed last event to disk")
	return nil
}
334
335
// RunIntegration runs the eventhandler integration: it loads the
// shipped-event cache from disk, starts the Events informer, and runs the
// periodic cache-flush ticker until ctx is canceled. It blocks until the
// ticker goroutine performs its final flush.
//
// NOTE(review): the setup error paths below call cancel() but do not return,
// so execution continues with a canceled context — presumably to fall
// through to an immediate, orderly shutdown (the ticker goroutine exits as
// soon as it observes the closed Done channel); confirm this is intentional.
func (eh *EventHandler) RunIntegration(ctx context.Context) error {
	var wg sync.WaitGroup

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Quick check to make sure logs instance exists
	if i := eh.LogsClient.Instance(eh.LogsInstance); i == nil {
		level.Error(eh.Log).Log("msg", "Logs instance not configured", "instance", eh.LogsInstance)
		cancel()
	}

	// Ensure the directory that holds the cache file exists.
	cacheDir := filepath.Dir(eh.CachePath)
	if err := os.MkdirAll(cacheDir, 0755); err != nil {
		level.Error(eh.Log).Log("msg", "Failed to create cache dir", "err", err)
		cancel()
	}

	// cache file to store events shipped (prevents double shipping on restart)
	cacheFile, err := os.OpenFile(eh.CachePath, os.O_RDWR|os.O_CREATE, cacheFileMode)
	if err != nil {
		level.Error(eh.Log).Log("msg", "Failed to open or create cache file", "err", err)
		cancel()
	}

	// attempt to read last timestamp from cache file into a ShippedEvents struct
	initEvent, err := readInitEvent(cacheFile, eh.Log)
	if err != nil {
		level.Error(eh.Log).Log("msg", "Failed to read last event from cache file", "err", err)
		cancel()
	}
	// handleEvent consults InitEvent to skip events shipped before a restart.
	eh.InitEvent = initEvent

	if err = cacheFile.Close(); err != nil {
		level.Error(eh.Log).Log("msg", "Failed to close cache file", "err", err)
		cancel()
	}

	// Log when the informer's initial List completes; purely informational.
	go func() {
		level.Info(eh.Log).Log("msg", "Waiting for cache to sync (initial List of events)")
		isSynced := cache.WaitForCacheSync(ctx.Done(), eh.EventInformer.HasSynced)
		if !isSynced {
			level.Error(eh.Log).Log("msg", "Failed to sync informer cache")
			// maybe want to bail here
			return
		}
		level.Info(eh.Log).Log("msg", "Informer cache synced")
	}()

	// start the informer
	// technically we should prob use the factory here, but since we
	// only have one informer atm, this likely doesn't matter
	go eh.EventInformer.Run(ctx.Done())

	// wait for last event to flush before returning
	wg.Add(1)
	go func() {
		defer wg.Done()
		eh.runTicker(ctx.Done())
	}()
	wg.Wait()

	return nil
}
400
401
// write out last event every FlushInterval
402
func (eh *EventHandler) runTicker(stopCh <-chan struct{}) {
403
for {
404
select {
405
case <-stopCh:
406
if err := eh.writeOutLastEvent(); err != nil {
407
level.Error(eh.Log).Log("msg", "Failed to flush last event", "err", err)
408
}
409
return
410
case <-eh.ticker.C:
411
if err := eh.writeOutLastEvent(); err != nil {
412
level.Error(eh.Log).Log("msg", "Failed to flush last event", "err", err)
413
}
414
}
415
}
416
}
417
418
func readInitEvent(file *os.File, logger log.Logger) (*ShippedEvents, error) {
419
var (
420
initEvent = new(ShippedEvents)
421
)
422
423
stat, err := file.Stat()
424
if err != nil {
425
return nil, err
426
}
427
if stat.Size() == 0 {
428
level.Info(logger).Log("msg", "Cache file empty, setting zero-valued initEvent")
429
return initEvent, nil
430
}
431
432
dec := json.NewDecoder(file)
433
err = dec.Decode(&initEvent)
434
if err != nil {
435
err = fmt.Errorf("could not read init event from cache: %s. Please delete the cache file", err)
436
return nil, err
437
}
438
level.Info(logger).Log("msg", "Loaded init event from cache file", "initEventTime", initEvent.Timestamp)
439
return initEvent, nil
440
}
441
442