Path: blob/main/component/loki/source/kubernetes/kubernetes.go
4096 views
// Package kubernetes implements the loki.source.kubernetes component.1package kubernetes23import (4"context"5"fmt"6"os"7"path/filepath"8"reflect"9"sync"10"time"1112"github.com/go-kit/log"13"github.com/go-kit/log/level"14"github.com/grafana/agent/component"15"github.com/grafana/agent/component/common/config"16commonk8s "github.com/grafana/agent/component/common/kubernetes"17"github.com/grafana/agent/component/common/loki"18"github.com/grafana/agent/component/common/loki/positions"19"github.com/grafana/agent/component/discovery"20"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"21"github.com/grafana/agent/pkg/river"22"k8s.io/client-go/kubernetes"23)2425func init() {26component.Register(component.Registration{27Name: "loki.source.kubernetes",28Args: Arguments{},2930Build: func(opts component.Options, args component.Arguments) (component.Component, error) {31return New(opts, args.(Arguments))32},33})34}3536// Arguments holds values which are used to configure the loki.source.kubernetes37// component.38type Arguments struct {39Targets []discovery.Target `river:"targets,attr"`40ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`4142// Client settings to connect to Kubernetes.43Client commonk8s.ClientArguments `river:"client,block,optional"`44}4546var _ river.Unmarshaler = (*Arguments)(nil)4748// DefaultArguments holds default settings for loki.source.kubernetes.49var DefaultArguments = Arguments{50Client: commonk8s.ClientArguments{51HTTPClientConfig: config.DefaultHTTPClientConfig,52},53}5455// UnmarshalRiver implements river.Unmarshaler and applies defaults.56func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {57*args = DefaultArguments5859type arguments Arguments60return f((*arguments)(args))61}6263// Component implements the loki.source.kubernetes component.64type Component struct {65log log.Logger66opts component.Options67positions positions.Positions6869mut sync.Mutex70args Arguments71tailer *kubetail.Manager72lastOptions *kubetail.Options7374handler loki.LogsReceiver7576receiversMut sync.RWMutex77receivers []loki.LogsReceiver78}7980var (81_ component.Component = (*Component)(nil)82_ component.DebugComponent = (*Component)(nil)83)8485// New creates a new loki.source.kubernetes component.86func New(o component.Options, args Arguments) (*Component, error) {87err := os.MkdirAll(o.DataPath, 0750)88if err != nil && !os.IsExist(err) {89return nil, err90}91positionsFile, err := positions.New(o.Logger, positions.Config{92SyncPeriod: 10 * time.Second,93PositionsFile: filepath.Join(o.DataPath, "positions.yml"),94})9596if err != nil {97return nil, err98}99100c := &Component{101log: o.Logger,102opts: o,103handler: make(loki.LogsReceiver),104positions: positionsFile,105}106if err := c.Update(args); err != nil {107return nil, err108}109return c, nil110}111112// Run implements component.Component.113func (c *Component) Run(ctx context.Context) error {114defer c.positions.Stop()115116defer func() {117c.mut.Lock()118defer c.mut.Unlock()119120// Guard for safety, but it's not possible for Run to be called without121// c.tailer being initialized.122if c.tailer != nil {123c.tailer.Stop()124}125}()126127for {128select {129case <-ctx.Done():130return nil131case entry := <-c.handler:132c.receiversMut.RLock()133receivers := c.receivers134c.receiversMut.RUnlock()135136for _, receiver := range receivers {137receiver <- entry138}139}140}141}142143// Update implements component.Component.144func (c *Component) Update(args component.Arguments) error {145newArgs := args.(Arguments)146147// Update the receivers before anything else, just in case something fails.148c.receiversMut.Lock()149c.receivers = newArgs.ForwardTo150c.receiversMut.Unlock()151152c.mut.Lock()153defer c.mut.Unlock()154155managerOpts, err := c.getTailerOptions(newArgs)156if err != nil {157return err158}159160switch {161case c.tailer == nil:162// First call to Update; build the tailer.163c.tailer = kubetail.NewManager(c.log, managerOpts)164165case managerOpts != c.lastOptions:166// Options changed; pass it to the tailer.167//168// This will never fail because it only fails if the context gets canceled.169//170// TODO(rfratto): should we have a generous update timeout to prevent this171// from potentially hanging forever?172_ = c.tailer.UpdateOptions(context.Background(), managerOpts)173c.lastOptions = managerOpts174175default:176// No-op: manager already exists and options didn't change.177}178179// Convert input targets into targets to give to tailer.180targets := make([]*kubetail.Target, 0, len(newArgs.Targets))181182for _, inTarget := range newArgs.Targets {183lset := inTarget.Labels()184processed, err := kubetail.PrepareLabels(lset, c.opts.ID)185if err != nil {186// TODO(rfratto): should this set the health of the component?187level.Error(c.log).Log("msg", "failed to process input target", "target", lset.String(), "err", err)188continue189}190targets = append(targets, kubetail.NewTarget(lset, processed))191}192193// This will never fail because it only fails if the context gets canceled.194//195// TODO(rfratto): should we have a generous update timeout to prevent this196// from potentially hanging forever?197_ = c.tailer.SyncTargets(context.Background(), targets)198199c.args = newArgs200return nil201}202203// getTailerOptions gets tailer options from arguments. If args hasn't changed204// from the last call to getTailerOptions, c.lastOptions is returned.205// c.lastOptions must be updated by the caller.206//207// getTailerOptions must only be called when c.mut is held.208func (c *Component) getTailerOptions(args Arguments) (*kubetail.Options, error) {209if reflect.DeepEqual(c.args.Client, args.Client) && c.lastOptions != nil {210return c.lastOptions, nil211}212213cfg, err := args.Client.BuildRESTConfig(c.log)214if err != nil {215return c.lastOptions, fmt.Errorf("building Kubernetes config: %w", err)216}217clientSet, err := kubernetes.NewForConfig(cfg)218if err != nil {219return c.lastOptions, fmt.Errorf("building Kubernetes client: %w", err)220}221222return &kubetail.Options{223Client: clientSet,224Handler: loki.NewEntryHandler(c.handler, func() {}),225Positions: c.positions,226}, nil227}228229// DebugInfo returns debug information for loki.source.kubernetes.230func (c *Component) DebugInfo() interface{} {231var info DebugInfo232233for _, target := range c.tailer.Targets() {234var lastError string235if err := target.LastError(); err != nil {236lastError = err.Error()237}238239info.Targets = append(info.Targets, DebugInfoTarget{240Labels: target.Labels().Map(),241DiscoveryLabels: target.DiscoveryLabels().Map(),242LastError: lastError,243UpdateTime: target.LastEntry().Local(),244})245}246247return info248}249250// DebugInfo represents debug information for loki.source.kubernetes.251type DebugInfo struct {252Targets []DebugInfoTarget `river:"target,block,optional"`253}254255// DebugInfoTarget is debug information for an individual target being tailed256// for logs.257type DebugInfoTarget struct {258Labels map[string]string `river:"labels,attr,optional"`259DiscoveryLabels map[string]string `river:"discovery_labels,attr,optional"`260LastError string `river:"last_error,attr,optional"`261UpdateTime time.Time `river:"update_time,attr,optional"`262}263264265