Path: blob/main/component/loki/source/kubernetes_events/kubernetes_events.go
4096 views
// Package kubernetes_events implements the loki.source.kubernetes_events1// component.2package kubernetes_events //nolint:golint34import (5"context"6"fmt"7"os"8"path/filepath"9"reflect"10"sync"11"time"1213"github.com/go-kit/log"14"github.com/go-kit/log/level"15"github.com/grafana/agent/component"16"github.com/grafana/agent/component/common/config"17"github.com/grafana/agent/component/common/kubernetes"18"github.com/grafana/agent/component/common/loki"19"github.com/grafana/agent/component/common/loki/positions"20"github.com/grafana/agent/pkg/river"21"github.com/grafana/agent/pkg/runner"22"github.com/oklog/run"23"k8s.io/client-go/rest"24)2526// Generous timeout period for configuring informers27const informerSyncTimeout = 10 * time.Second2829func init() {30component.Register(component.Registration{31Name: "loki.source.kubernetes_events",32Args: Arguments{},3334Build: func(opts component.Options, args component.Arguments) (component.Component, error) {35return New(opts, args.(Arguments))36},37})38}3940// Arguments holds values which are used to configure the41// loki.source.kubernetes_events component.42type Arguments struct {43ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`4445JobName string `river:"job_name,attr,optional"`46Namespaces []string `river:"namespaces,attr,optional"`4748// Client settings to connect to Kubernetes.49Client kubernetes.ClientArguments `river:"client,block,optional"`50}5152var _ river.Unmarshaler = (*Arguments)(nil)5354// DefaultArguments holds default settings for loki.source.kubernetes_events.55var DefaultArguments = Arguments{56JobName: "loki.source.kubernetes_events",5758Client: kubernetes.ClientArguments{59HTTPClientConfig: config.DefaultHTTPClientConfig,60},61}6263// UnmarshalRiver implements river.Unmarshaler and applies defaults.64func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {65*args = DefaultArguments6667type arguments Arguments68if err := f((*arguments)(args)); err != nil {69return err70}7172if args.JobName == "" {73return fmt.Errorf("job_name must not be an empty string")74}75return nil76}7778// Component implements the loki.source.kubernetes_events component, which79// watches events from Kubernetes and forwards received events to other Loki80// components.81type Component struct {82log log.Logger83opts component.Options84positions positions.Positions85handler loki.LogsReceiver86runner *runner.Runner[eventControllerTask]87newTasksCh chan struct{}8889mut sync.Mutex90args Arguments91restConfig *rest.Config9293tasksMut sync.RWMutex94tasks []eventControllerTask9596receiversMut sync.RWMutex97receivers []loki.LogsReceiver98}99100var (101_ component.Component = (*Component)(nil)102_ component.DebugComponent = (*Component)(nil)103)104105// New creates a new loki.source.kubernetes_events component.106func New(o component.Options, args Arguments) (*Component, error) {107err := os.MkdirAll(o.DataPath, 0750)108if err != nil && !os.IsExist(err) {109return nil, err110}111positionsFile, err := positions.New(o.Logger, positions.Config{112SyncPeriod: 10 * time.Second,113PositionsFile: filepath.Join(o.DataPath, "positions.yml"),114})115if err != nil {116return nil, err117}118119c := &Component{120log: o.Logger,121opts: o,122positions: positionsFile,123handler: make(loki.LogsReceiver),124runner: runner.New(func(t eventControllerTask) runner.Worker {125return newEventController(t)126}),127newTasksCh: make(chan struct{}, 1),128}129if err := c.Update(args); err != nil {130return nil, err131}132return c, nil133}134135// Run implements component.Component.136func (c *Component) Run(ctx context.Context) error {137ctx, cancel := context.WithCancel(ctx)138defer cancel()139140defer c.positions.Stop()141defer c.runner.Stop()142143var rg run.Group144145// Runner to apply tasks.146rg.Add(func() error {147for {148select {149case <-ctx.Done():150return nil151case <-c.newTasksCh:152c.tasksMut.RLock()153tasks := c.tasks154c.tasksMut.RUnlock()155156if err := c.runner.ApplyTasks(ctx, tasks); err != nil {157level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err)158}159}160}161}, func(_ error) {162cancel()163})164165// Runner to forward received logs.166rg.Add(func() error {167for {168select {169case <-ctx.Done():170return nil171case entry := <-c.handler:172c.receiversMut.RLock()173receivers := c.receivers174c.receiversMut.RUnlock()175176for _, receiver := range receivers {177receiver <- entry178}179}180}181}, func(_ error) {182cancel()183})184185return rg.Run()186}187188// Update implements component.Component.189func (c *Component) Update(args component.Arguments) error {190c.mut.Lock()191defer c.mut.Unlock()192193newArgs := args.(Arguments)194195c.receiversMut.Lock()196c.receivers = newArgs.ForwardTo197c.receiversMut.Unlock()198199restConfig := c.restConfig200201// Create a new restConfig if we don't have one or if our arguments changed.202if restConfig == nil || !reflect.DeepEqual(c.args.Client, newArgs.Client) {203var err error204restConfig, err = newArgs.Client.BuildRESTConfig(c.log)205if err != nil {206return fmt.Errorf("building Kubernetes client config: %w", err)207}208}209210// Create a task for each defined namespace.211var newTasks []eventControllerTask212for _, namespace := range getNamespaces(newArgs) {213newTasks = append(newTasks, eventControllerTask{214Log: c.log,215Config: restConfig,216JobName: newArgs.JobName,217InstanceName: c.opts.ID,218Namespace: namespace,219Receiver: c.handler,220Positions: c.positions,221})222}223224c.tasksMut.Lock()225c.tasks = newTasks226c.tasksMut.Unlock()227228select {229case c.newTasksCh <- struct{}{}:230default:231// no-op: task reload already queued.232}233234c.args = newArgs235return nil236}237238// getNamespaces gets a list of namespaces to watch from the arguments. If the239// list of namespaces is empty, returns a slice to watch all namespaces.240func getNamespaces(args Arguments) []string {241if len(args.Namespaces) == 0 {242return []string{""} // Empty string means to watch all namespaces243}244return args.Namespaces245}246247// DebugInfo implements [component.DebugComponent].248func (c *Component) DebugInfo() interface{} {249type Info struct {250Controllers []controllerInfo `river:"event_controller,block,optional"`251}252253var info Info254for _, worker := range c.runner.Workers() {255info.Controllers = append(info.Controllers, worker.(*eventController).DebugInfo())256}257return info258}259260261