Path: blob/main/component/loki/source/kubernetes_events/event_controller.go
package kubernetes_events

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/cespare/xxhash/v2"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/agent/pkg/runner"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
	cachetools "k8s.io/client-go/tools/cache"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// eventControllerTask holds the settings used to run an eventController. It
// implements [runner.Task] so one controller can be scheduled per watched
// namespace.
type eventControllerTask struct {
	Log          log.Logger
	Config       *rest.Config // Config to connect to Kubernetes.
	Namespace    string       // Namespace to watch for events in.
	JobName      string       // Label value to use for job.
	InstanceName string       // Label value to use for instance.
	Receiver     loki.LogsReceiver
	Positions    positions.Positions
}

// Hash implements [runner.Task].
func (t eventControllerTask) Hash() uint64 {
	return xxhash.Sum64String(t.Namespace)
}

// Equals implements [runner.Task].
func (t eventControllerTask) Equals(other runner.Task) bool {
	// We can do a direct comparison since the two task types are comparable.
	return t == other.(eventControllerTask)
}

// eventController watches Kubernetes Events in a namespace and forwards them
// to a Loki receiver as log entries.
type eventController struct {
	log     log.Logger
	task    eventControllerTask
	handler loki.EntryHandler

	positionsKey  string
	initTimestamp time.Time
}

// newEventController creates an eventController from a task, resuming from the
// last timestamp recorded in positions.
func newEventController(task eventControllerTask) *eventController {
	var key string
	if task.Namespace == "" {
		key = positions.CursorKey("events")
	} else {
		key = positions.CursorKey("events-" + task.Namespace)
	}

	lastTimestamp, _ := task.Positions.Get(key, "")

	return &eventController{
		log:           task.Log,
		task:          task,
		handler:       loki.NewEntryHandler(task.Receiver, func() {}),
		positionsKey:  key,
		initTimestamp: time.UnixMicro(lastTimestamp),
	}
}

// Run watches events until ctx is canceled.
func (ctrl *eventController) Run(ctx context.Context) {
	defer ctrl.handler.Stop()

	level.Info(ctrl.log).Log("msg", "watching events for namespace", "namespace", ctrl.task.Namespace)
	defer level.Info(ctrl.log).Log("msg", "stopping watcher for events", "namespace", ctrl.task.Namespace)

	if err := ctrl.runError(ctx); err != nil {
		level.Error(ctrl.log).Log("msg", "event watcher exited with error", "err", err)
	}
}

// runError sets up the informer cache and blocks until ctx is canceled,
// returning an error if the watch could not be established.
func (ctrl *eventController) runError(ctx context.Context) error {
	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		return fmt.Errorf("adding core to scheme: %w", err)
	}

	opts := cache.Options{
		Scheme:    scheme,
		Namespace: ctrl.task.Namespace,
	}
	informers, err := cache.New(ctrl.task.Config, opts)
	if err != nil {
		return fmt.Errorf("creating informers cache: %w", err)
	}

	go func() {
		err := informers.Start(ctx)
		// Informers return once ctx is canceled; only report the error if it
		// happened while the context was still live.
		if err != nil && ctx.Err() == nil {
			level.Error(ctrl.log).Log("msg", "failed to start informers", "err", err)
		}
	}()

	if !informers.WaitForCacheSync(ctx) {
		return fmt.Errorf("informer caches failed to sync")
	}
	if err := ctrl.configureInformers(ctx, informers); err != nil {
		return fmt.Errorf("failed to configure informers: %w", err)
	}

	<-ctx.Done()
	return nil
}

// configureInformers registers event handlers for the watched resource types.
func (ctrl *eventController) configureInformers(ctx context.Context, informers cache.Informers) error {
	types := []client.Object{
		&corev1.Event{},
	}

	informerCtx, cancel := context.WithTimeout(ctx, informerSyncTimeout)
	defer cancel()

	for _, ty := range types {
		informer, err := informers.GetInformer(informerCtx, ty)
		if err != nil {
			// Check the context to prevent GetInformer from returning a fake timeout.
			if errors.Is(informerCtx.Err(), context.DeadlineExceeded) {
				return fmt.Errorf("timeout exceeded while configuring informers; check that the connection"+
					" to the Kubernetes API is stable and that the Agent has appropriate RBAC permissions for %v", ty)
			}
			return err
		}

		_, err = informer.AddEventHandler(cachetools.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { ctrl.onAdd(ctx, obj) },
			UpdateFunc: func(oldObj, newObj interface{}) { ctrl.onUpdate(ctx, oldObj, newObj) },
			DeleteFunc: func(obj interface{}) { ctrl.onDelete(ctx, obj) },
		})
		if err != nil {
			return err
		}
	}
	return nil
}

func (ctrl *eventController) onAdd(ctx context.Context, obj interface{}) {
	event, ok := obj.(*corev1.Event)
	if !ok {
		level.Warn(ctrl.log).Log("msg", "received an event for a non-Event Kind", "type", fmt.Sprintf("%T", obj))
		return
	}
	err := ctrl.handleEvent(ctx, event)
	if err != nil {
		level.Error(ctrl.log).Log("msg", "error handling event", "err", err)
	}
}

func (ctrl *eventController) onUpdate(ctx context.Context, oldObj, newObj interface{}) {
	oldEvent, ok := oldObj.(*corev1.Event)
	if !ok {
		level.Warn(ctrl.log).Log("msg", "received an event for a non-Event Kind", "type", fmt.Sprintf("%T", oldObj))
		return
	}
	newEvent, ok := newObj.(*corev1.Event)
	if !ok {
		level.Warn(ctrl.log).Log("msg", "received an event for a non-Event Kind", "type", fmt.Sprintf("%T", newObj))
		return
	}

	if oldEvent.GetResourceVersion() == newEvent.GetResourceVersion() {
		level.Debug(ctrl.log).Log("msg", "resource version didn't change, ignoring call to onUpdate", "resource version", newEvent.GetResourceVersion())
		return
	}

	err := ctrl.handleEvent(ctx, newEvent)
	if err != nil {
		level.Error(ctrl.log).Log("msg", "error handling event", "err", err)
	}
}

func (ctrl *eventController) onDelete(ctx context.Context, obj interface{}) {
	// no-op: the event got deleted from Kubernetes, but there's nothing to log
	// when this happens.
}

// handleEvent converts a Kubernetes Event into a Loki entry, forwards it to
// the entry handler, and records the position on success.
func (ctrl *eventController) handleEvent(ctx context.Context, event *corev1.Event) error {
	eventTs := eventTimestamp(event)

	// Events don't have any ordering guarantees, so we can't rely on comparing
	// the timestamp of this event to any other event received.
	//
	// We use a best-effort attempt to not re-deliver any events we've already
	// logged by checking the timestamp from when the worker started. This may
	// still cause us to drop some events in between recreating workers, but it
	// minimizes risk.
	//
	// TODO(rfratto): a longer term solution would be to track timestamps for
	// each involved object (or something similar), but that solution would need
	// to make sure to not leak those timestamps, and find a way to recognize
	// that involved objects have been deleted.
	if !eventTs.After(ctrl.initTimestamp) {
		return nil
	}

	lset, msg, err := ctrl.parseEvent(event)
	if err != nil {
		return err
	}

	entry := loki.Entry{
		Entry: logproto.Entry{
			Timestamp: eventTs,
			Line:      msg,
		},
		Labels: lset,
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case ctrl.handler.Chan() <- entry:
		// Update position offset only after it's been sent to the next set of
		// components.
		ctrl.task.Positions.Put(ctrl.positionsKey, "", eventTs.UnixMicro())
		return nil
	}
}

// parseEvent builds the label set and the space-separated key=value message
// line for an event.
func (ctrl *eventController) parseEvent(event *corev1.Event) (model.LabelSet, string, error) {
	var (
		msg  strings.Builder
		lset = make(model.LabelSet)
	)

	obj := event.InvolvedObject
	if obj.Name == "" {
		return nil, "", fmt.Errorf("no involved object for event")
	}

	lset[model.LabelName("namespace")] = model.LabelValue(obj.Namespace)
	lset[model.LabelName("job")] = model.LabelValue(ctrl.task.JobName)
	lset[model.LabelName("instance")] = model.LabelValue(ctrl.task.InstanceName)

	fmt.Fprintf(&msg, "name=%s ", obj.Name)
	if obj.Kind != "" {
		fmt.Fprintf(&msg, "kind=%s ", obj.Kind)
	}
	if event.Action != "" {
		fmt.Fprintf(&msg, "action=%s ", event.Action)
	}
	if obj.APIVersion != "" {
		fmt.Fprintf(&msg, "objectAPIversion=%s ", obj.APIVersion)
	}
	if obj.ResourceVersion != "" {
		fmt.Fprintf(&msg, "objectRV=%s ", obj.ResourceVersion)
	}
	if event.ResourceVersion != "" {
		fmt.Fprintf(&msg, "eventRV=%s ", event.ResourceVersion)
	}
	if event.ReportingInstance != "" {
		fmt.Fprintf(&msg, "reportinginstance=%s ", event.ReportingInstance)
	}
	if event.ReportingController != "" {
		fmt.Fprintf(&msg, "reportingcontroller=%s ", event.ReportingController)
	}
	if event.Source.Component != "" {
		fmt.Fprintf(&msg, "sourcecomponent=%s ", event.Source.Component)
	}
	if event.Source.Host != "" {
		fmt.Fprintf(&msg, "sourcehost=%s ", event.Source.Host)
	}
	if event.Reason != "" {
		fmt.Fprintf(&msg, "reason=%s ", event.Reason)
	}
	if event.Type != "" {
		fmt.Fprintf(&msg, "type=%s ", event.Type)
	}
	if event.Count != 0 {
		fmt.Fprintf(&msg, "count=%d ", event.Count)
	}

	fmt.Fprintf(&msg, "msg=%q ", event.Message)

	return lset, msg.String(), nil
}

// eventTimestamp returns the timestamp of an event, preferring LastTimestamp
// and falling back to EventTime.
func eventTimestamp(event *corev1.Event) time.Time {
	if !event.LastTimestamp.IsZero() {
		return event.LastTimestamp.Time
	}
	return event.EventTime.Time
}

// DebugInfo returns debug information for the controller.
func (ctrl *eventController) DebugInfo() controllerInfo {
	ts, _ := ctrl.task.Positions.Get(ctrl.positionsKey, "")

	return controllerInfo{
		Namespace:     ctrl.task.Namespace,
		LastTimestamp: time.UnixMicro(ts),
	}
}

type controllerInfo struct {
	Namespace     string    `river:"namespace,attr"`
	LastTimestamp time.Time `river:"last_event_timestamp,attr"`
}