Path: blob/main/pkg/integrations/v2/eventhandler/eventhandler.go
// Package eventhandler watches for Kubernetes Event objects and hands them off to
// Agent's Logs subsystem (embedded promtail)
package eventhandler

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/pkg/integrations/v2"
	"github.com/grafana/agent/pkg/logs"
	"github.com/grafana/loki/clients/pkg/promtail/api"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
)

const (
	cacheFileMode = 0600
)

// EventHandler watches for Kubernetes Event objects and hands them off to
// Agent's logs subsystem (embedded promtail).
type EventHandler struct {
	LogsClient    *logs.Logs
	LogsInstance  string
	Log           log.Logger
	CachePath     string
	LastEvent     *ShippedEvents
	InitEvent     *ShippedEvents
	EventInformer cache.SharedIndexInformer
	SendTimeout   time.Duration
	ticker        *time.Ticker
	instance      string
	extraLabels   labels.Labels
	sync.Mutex
}

// ShippedEvents stores a timestamp and map of event ResourceVersions shipped for that timestamp.
// Used to avoid double-shipping events upon restart.
type ShippedEvents struct {
	// shipped event's timestamp
	Timestamp time.Time `json:"ts"`
	// map of event RVs (resource versions) already "shipped" (handed off) for this timestamp.
	// this is to handle the case of a timestamp having multiple events,
	// which happens quite frequently.
	RvMap map[string]struct{} `json:"resourceVersion"`
}

func newEventHandler(l log.Logger, globals integrations.Globals, c *Config) (integrations.Integration, error) {
	var (
		config  *rest.Config
		err     error
		factory informers.SharedInformerFactory
		id      string
	)

	// Try using KubeconfigPath or inClusterConfig
	config, err = clientcmd.BuildConfigFromFlags("", c.KubeconfigPath)
	if err != nil {
		level.Error(l).Log("msg", "Loading from KubeconfigPath or inClusterConfig failed", "err", err)
		// Trying default home location
		if home := homedir.HomeDir(); home != "" {
			kubeconfigPath := filepath.Join(home, ".kube", "config")
			config, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath)
			if err != nil {
				level.Error(l).Log("msg", "Could not load a kubeconfig", "err", err)
				return nil, err
			}
		} else {
			err = fmt.Errorf("could not load a kubeconfig")
			return nil, err
		}
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	// get an informer
	if c.Namespace == "" {
		factory = informers.NewSharedInformerFactory(clientset, time.Duration(c.InformerResync)*time.Second)
	} else {
		factory = informers.NewSharedInformerFactoryWithOptions(clientset, time.Duration(c.InformerResync)*time.Second, informers.WithNamespace(c.Namespace))
	}

	eventInformer := factory.Core().V1().Events().Informer()
	id, _ = c.Identifier(globals)

	eh := &EventHandler{
		LogsClient:    globals.Logs,
		LogsInstance:  c.LogsInstance,
		Log:           l,
		CachePath:     c.CachePath,
		EventInformer: eventInformer,
		SendTimeout:   time.Duration(c.SendTimeout) * time.Second,
		instance:      id,
		extraLabels:   c.ExtraLabels,
	}
	// set the resource handler fns
	if err := eh.initInformer(eventInformer); err != nil {
		return nil, err
	}
	eh.ticker = time.NewTicker(time.Duration(c.FlushInterval) * time.Second)
	return eh, nil
}

// Initialize informer by setting event handler fns
func (eh *EventHandler) initInformer(eventsInformer cache.SharedIndexInformer) error {
	_, err := eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    eh.addEvent,
		UpdateFunc: eh.updateEvent,
		DeleteFunc: eh.deleteEvent,
	})
	return err
}

// Handles new event objects
func (eh *EventHandler) addEvent(obj interface{}) {
	event, _ := obj.(*v1.Event)

	err := eh.handleEvent(event)
	if err != nil {
		level.Error(eh.Log).Log("msg", "Error handling event", "err", err, "event", event)
	}
}

// Handles event object updates. Note that this get triggered on informer resyncs and also
// events occurring more than once (in which case .count is incremented)
func (eh *EventHandler) updateEvent(objOld interface{}, objNew interface{}) {
	eOld, _ := objOld.(*v1.Event)
	eNew, _ := objNew.(*v1.Event)

	if eOld.GetResourceVersion() == eNew.GetResourceVersion() {
		// ignore resync updates
		level.Debug(eh.Log).Log("msg", "Event RV didn't change, ignoring", "eRV", eNew.ResourceVersion)
		return
	}

	err := eh.handleEvent(eNew)
	if err != nil {
		level.Error(eh.Log).Log("msg", "Error handling event", "err", err, "event", eNew)
	}
}

func (eh *EventHandler) handleEvent(event *v1.Event) error {
	eventTs := getTimestamp(event)

	// if event is older than the one stored in cache on startup, we've shipped it
	if eventTs.Before(eh.InitEvent.Timestamp) {
		return nil
	}
	// if event is equal and is in map, we've shipped it
	if eventTs.Equal(eh.InitEvent.Timestamp) {
		if _, ok := eh.InitEvent.RvMap[event.ResourceVersion]; ok {
			return nil
		}
	}

	labels, msg, err := eh.extractEvent(event)
	if err != nil {
		return err
	}

	entry := newEntry(msg, eventTs, labels)
	ok := eh.LogsClient.Instance(eh.LogsInstance).SendEntry(entry, eh.SendTimeout)
	if !ok {
		err = fmt.Errorf("msg=%s entry=%s", "error handing entry off to promtail", entry)
		return err
	}
	level.Info(eh.Log).Log("msg", "Shipped entry", "eventRV", event.ResourceVersion, "eventMsg", event.Message)

	// update cache with new "last" event
	err = eh.updateLastEvent(event, eventTs)
	if err != nil {
		return err
	}
	return nil
}

// Called when event objects are removed from etcd, can safely ignore this
func (eh *EventHandler) deleteEvent(obj interface{}) {
}

// extract data from event fields and create labels, etc.
// TODO: ship JSON blobs and allow users to configure using pipelines etc.
// instead of hardcoding labels here
func (eh *EventHandler) extractEvent(event *v1.Event) (model.LabelSet, string, error) {
	var (
		msg    strings.Builder
		labels = make(model.LabelSet)
	)

	obj := event.InvolvedObject
	if obj.Name == "" {
		return nil, "", fmt.Errorf("no involved object for event")
	}
	msg.WriteString(fmt.Sprintf("name=%s ", obj.Name))

	labels[model.LabelName("namespace")] = model.LabelValue(obj.Namespace)
	// TODO(hjet) omit "kubernetes"
	labels[model.LabelName("job")] = model.LabelValue("integrations/kubernetes/eventhandler")
	labels[model.LabelName("instance")] = model.LabelValue(eh.instance)
	labels[model.LabelName("agent_hostname")] = model.LabelValue(eh.instance)
	for _, lbl := range eh.extraLabels {
		labels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
	}

	// we add these fields to the log line to reduce label bloat and cardinality
	if obj.Kind != "" {
		msg.WriteString(fmt.Sprintf("kind=%s ", obj.Kind))
	}
	if event.Action != "" {
		msg.WriteString(fmt.Sprintf("action=%s ", event.Action))
	}
	if obj.APIVersion != "" {
		msg.WriteString(fmt.Sprintf("objectAPIversion=%s ", obj.APIVersion))
	}
	if obj.ResourceVersion != "" {
		msg.WriteString(fmt.Sprintf("objectRV=%s ", obj.ResourceVersion))
	}
	if event.ResourceVersion != "" {
		msg.WriteString(fmt.Sprintf("eventRV=%s ", event.ResourceVersion))
	}
	if event.ReportingInstance != "" {
		msg.WriteString(fmt.Sprintf("reportinginstance=%s ", event.ReportingInstance))
	}
	if event.ReportingController != "" {
		msg.WriteString(fmt.Sprintf("reportingcontroller=%s ", event.ReportingController))
	}
	if event.Source.Component != "" {
		msg.WriteString(fmt.Sprintf("sourcecomponent=%s ", event.Source.Component))
	}
	if event.Source.Host != "" {
		msg.WriteString(fmt.Sprintf("sourcehost=%s ", event.Source.Host))
	}
	if event.Reason != "" {
		msg.WriteString(fmt.Sprintf("reason=%s ", event.Reason))
	}
	if event.Type != "" {
		msg.WriteString(fmt.Sprintf("type=%s ", event.Type))
	}
	if event.Count != 0 {
		msg.WriteString(fmt.Sprintf("count=%d ", event.Count))
	}

	msg.WriteString(fmt.Sprintf("msg=%q", event.Message))

	return labels, msg.String(), nil
}

func getTimestamp(event *v1.Event) time.Time {
	if !event.LastTimestamp.IsZero() {
		return event.LastTimestamp.Time
	}
	return event.EventTime.Time
}

func newEntry(msg string, ts time.Time, labels model.LabelSet) api.Entry {
	entry := logproto.Entry{Timestamp: ts, Line: msg}
	return api.Entry{Labels: labels, Entry: entry}
}

// maintain "last event" state
func (eh *EventHandler) updateLastEvent(e *v1.Event, eventTs time.Time) error {
	eh.Lock()
	defer eh.Unlock()

	eventRv := e.ResourceVersion

	if eh.LastEvent == nil {
		// startup
		eh.LastEvent = &ShippedEvents{Timestamp: eventTs, RvMap: make(map[string]struct{})}
		eh.LastEvent.RvMap[eventRv] = struct{}{}
		return nil
	}

	// if timestamp is the same, add to map
	if eh.LastEvent != nil && eventTs.Equal(eh.LastEvent.Timestamp) {
		eh.LastEvent.RvMap[eventRv] = struct{}{}
		return nil
	}

	// if timestamp is different, create a new ShippedEvents struct
	eh.LastEvent = &ShippedEvents{Timestamp: eventTs, RvMap: make(map[string]struct{})}
	eh.LastEvent.RvMap[eventRv] = struct{}{}
	return nil
}

func (eh *EventHandler) writeOutLastEvent() error {
	level.Info(eh.Log).Log("msg", "Flushing last event to disk")

	eh.Lock()
	defer eh.Unlock()

	if eh.LastEvent == nil {
		level.Info(eh.Log).Log("msg", "No last event to flush, returning")
		return nil
	}

	temp := eh.CachePath + "-new"
	buf, err := json.Marshal(&eh.LastEvent)
	if err != nil {
		return err
	}

	err = os.WriteFile(temp, buf, os.FileMode(cacheFileMode))
	if err != nil {
		return err
	}

	if err = os.Rename(temp, eh.CachePath); err != nil {
		return err
	}
	level.Info(eh.Log).Log("msg", "Flushed last event to disk")
	return nil
}

// RunIntegration runs the eventhandler integration
func (eh *EventHandler) RunIntegration(ctx context.Context) error {
	var wg sync.WaitGroup

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Quick check to make sure logs instance exists
	if i := eh.LogsClient.Instance(eh.LogsInstance); i == nil {
		level.Error(eh.Log).Log("msg", "Logs instance not configured", "instance", eh.LogsInstance)
		cancel()
	}

	cacheDir := filepath.Dir(eh.CachePath)
	if err := os.MkdirAll(cacheDir, 0755); err != nil {
		level.Error(eh.Log).Log("msg", "Failed to create cache dir", "err", err)
		cancel()
	}

	// cache file to store events shipped (prevents double shipping on restart)
	cacheFile, err := os.OpenFile(eh.CachePath, os.O_RDWR|os.O_CREATE, cacheFileMode)
	if err != nil {
		level.Error(eh.Log).Log("msg", "Failed to open or create cache file", "err", err)
		cancel()
	}

	// attempt to read last timestamp from cache file into a ShippedEvents struct
	initEvent, err := readInitEvent(cacheFile, eh.Log)
	if err != nil {
		level.Error(eh.Log).Log("msg", "Failed to read last event from cache file", "err", err)
		cancel()
	}
	eh.InitEvent = initEvent

	if err = cacheFile.Close(); err != nil {
		level.Error(eh.Log).Log("msg", "Failed to close cache file", "err", err)
		cancel()
	}

	go func() {
		level.Info(eh.Log).Log("msg", "Waiting for cache to sync (initial List of events)")
		isSynced := cache.WaitForCacheSync(ctx.Done(), eh.EventInformer.HasSynced)
		if !isSynced {
			level.Error(eh.Log).Log("msg", "Failed to sync informer cache")
			// maybe want to bail here
			return
		}
		level.Info(eh.Log).Log("msg", "Informer cache synced")
	}()

	// start the informer
	// technically we should prob use the factory here, but since we
	// only have one informer atm, this likely doesn't matter
	go eh.EventInformer.Run(ctx.Done())

	// wait for last event to flush before returning
	wg.Add(1)
	go func() {
		defer wg.Done()
		eh.runTicker(ctx.Done())
	}()
	wg.Wait()

	return nil
}

// write out last event every FlushInterval
func (eh *EventHandler) runTicker(stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			if err := eh.writeOutLastEvent(); err != nil {
				level.Error(eh.Log).Log("msg", "Failed to flush last event", "err", err)
			}
			return
		case <-eh.ticker.C:
			if err := eh.writeOutLastEvent(); err != nil {
				level.Error(eh.Log).Log("msg", "Failed to flush last event", "err", err)
			}
		}
	}
}

func readInitEvent(file *os.File, logger log.Logger) (*ShippedEvents, error) {
	var (
		initEvent = new(ShippedEvents)
	)

	stat, err := file.Stat()
	if err != nil {
		return nil, err
	}
	if stat.Size() == 0 {
		level.Info(logger).Log("msg", "Cache file empty, setting zero-valued initEvent")
		return initEvent, nil
	}

	dec := json.NewDecoder(file)
	err = dec.Decode(&initEvent)
	if err != nil {
		err = fmt.Errorf("could not read init event from cache: %s. Please delete the cache file", err)
		return nil, err
	}
	level.Info(logger).Log("msg", "Loaded init event from cache file", "initEventTime", initEvent.Timestamp)
	return initEvent, nil
}