Path: blob/main/component/loki/source/podlogs/podlogs.go
5340 views
// Package podlogs implements the loki.source.podlogs component.
package podlogs

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"reflect"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/common/config"
	commonk8s "github.com/grafana/agent/component/common/kubernetes"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/agent/component/loki/source/kubernetes"
	"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"
	"github.com/grafana/agent/pkg/river"
	"github.com/oklog/run"
	kubeclient "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// init registers the loki.source.podlogs component so it can be built from
// Arguments.
func init() {
	component.Register(component.Registration{
		Name: "loki.source.podlogs",
		Args: Arguments{},

		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			return New(opts, args.(Arguments))
		},
	})
}

// Arguments holds values which are used to configure the loki.source.podlogs
// component.
type Arguments struct {
	// ForwardTo is the list of receivers that log entries are sent to.
	ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`

	// Client settings to connect to Kubernetes.
	Client commonk8s.ClientArguments `river:"client,block,optional"`

	// Selector and NamespaceSelector are converted into selectors and handed
	// to the reconciler whenever either of them changes.
	Selector          config.LabelSelector `river:"selector,block,optional"`
	NamespaceSelector config.LabelSelector `river:"namespace_selector,block,optional"`
}

var _ river.Unmarshaler = (*Arguments)(nil)

// DefaultArguments holds default settings for loki.source.podlogs.
var DefaultArguments = Arguments{
	Client: commonk8s.ClientArguments{
		HTTPClientConfig: config.DefaultHTTPClientConfig,
	},
}

// UnmarshalRiver implements river.Unmarshaler and applies defaults.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
	*args = DefaultArguments

	// Decode through a locally defined type that does not carry this method,
	// so that decoding does not call UnmarshalRiver again.
	type arguments Arguments
	return f((*arguments)(args))
}

// Component implements the loki.source.podlogs component.
type Component struct {
	log  log.Logger
	opts component.Options

	tailer     *kubetail.Manager
	reconciler *reconciler
	controller *controller

	positions positions.Positions
	// handler receives entries produced by the tailer; runHandler forwards
	// them to receivers.
	handler loki.LogsReceiver

	// mut is held while Update applies new arguments. The update* helpers
	// read and write args, lastOptions and restConfig under it.
	mut         sync.RWMutex
	args        Arguments
	lastOptions *kubetail.Options
	restConfig  *rest.Config

	// receiversMut protects receivers.
	receiversMut sync.RWMutex
	receivers    []loki.LogsReceiver
}

var (
	_ component.Component      = (*Component)(nil)
	_ component.DebugComponent = (*Component)(nil)
)

// New creates a new loki.source.podlogs component.
//
// It creates o.DataPath, opens a positions file inside it, wires together the
// tailer, reconciler and controller, and applies args through Update. If any
// step fails, a nil component and the error are returned.
func New(o component.Options, args Arguments) (*Component, error) {
	err := os.MkdirAll(o.DataPath, 0750)
	if err != nil && !os.IsExist(err) {
		return nil, err
	}
	positionsFile, err := positions.New(o.Logger, positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: filepath.Join(o.DataPath, "positions.yml"),
	})
	if err != nil {
		return nil, err
	}

	var (
		tailer     = kubetail.NewManager(o.Logger, nil)
		reconciler = newReconciler(o.Logger, tailer)
		controller = newController(o.Logger, reconciler)
	)

	c := &Component{
		log:  o.Logger,
		opts: o,

		tailer:     tailer,
		reconciler: reconciler,
		controller: controller,

		positions: positionsFile,
		handler:   make(loki.LogsReceiver),
	}
	if err := c.Update(args); err != nil {
		// The positions file is otherwise only stopped by Run, which is never
		// called for a component that failed to build. Stop it here so it is
		// not left behind.
		positionsFile.Stop()
		return nil, err
	}
	return c, nil
}

// Run implements component.Component.
//
// It runs the entry handler and the controller until ctx is canceled or one
// of them exits, then stops the tailer and the positions file. The returned
// error is whatever the run group reports.
func (c *Component) Run(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	defer c.positions.Stop()

	defer func() {
		c.mut.RLock()
		defer c.mut.RUnlock()

		// Guard for safety, but it's not possible for Run to be called without
		// c.tailer being initialized.
		if c.tailer != nil {
			c.tailer.Stop()
		}
	}()

	var g run.Group

	g.Add(func() error {
		c.runHandler(ctx)
		return nil
	}, func(_ error) {
		cancel()
	})

	g.Add(func() error {
		err := c.controller.Run(ctx)
		if err != nil {
			level.Error(c.log).Log("msg", "controller exited with error", "err", err)
		}
		return err
	}, func(_ error) {
		cancel()
	})

	return g.Run()
}

// runHandler forwards every entry received on c.handler to the current set of
// receivers. It returns once ctx is canceled.
func (c *Component) runHandler(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case entry := <-c.handler:
			c.receiversMut.RLock()
			receivers := c.receivers
			c.receiversMut.RUnlock()

			for _, receiver := range receivers {
				// A receiver that is not being read from must not keep this
				// loop (and therefore Run) from exiting on cancellation.
				select {
				case <-ctx.Done():
					return
				case receiver <- entry:
				}
			}
		}
	}
}

// Update implements component.Component.
//
// The receivers are replaced first; the tailer, reconciler and controller are
// then updated in that order. The new arguments are only stored when all
// three updates succeed.
func (c *Component) Update(args component.Arguments) error {
	newArgs := args.(Arguments)

	// Update the receivers before anything else, just in case something fails.
	c.receiversMut.Lock()
	c.receivers = newArgs.ForwardTo
	c.receiversMut.Unlock()

	c.mut.Lock()
	defer c.mut.Unlock()

	if err := c.updateTailer(newArgs); err != nil {
		return err
	}
	if err := c.updateReconciler(newArgs); err != nil {
		return err
	}
	if err := c.updateController(newArgs); err != nil {
		return err
	}

	c.args = newArgs
	return nil
}

// updateTailer updates the state of the tailer. mut must be held when calling.
//
// Nothing is done when the client arguments are unchanged and options have
// already been built once.
func (c *Component) updateTailer(args Arguments) error {
	if reflect.DeepEqual(c.args.Client, args.Client) && c.lastOptions != nil {
		return nil
	}

	cfg, err := args.Client.BuildRESTConfig(c.log)
	if err != nil {
		return fmt.Errorf("building Kubernetes config: %w", err)
	}
	clientSet, err := kubeclient.NewForConfig(cfg)
	if err != nil {
		return fmt.Errorf("building Kubernetes client: %w", err)
	}

	managerOpts := &kubetail.Options{
		Client:    clientSet,
		Handler:   loki.NewEntryHandler(c.handler, func() {}),
		Positions: c.positions,
	}
	c.lastOptions = managerOpts

	// Options changed; pass it to the tailer. This will never fail because it
	// only fails if the context gets canceled.
	//
	// TODO(rfratto): should we have a generous update timeout to prevent this
	// from potentially hanging forever?
	_ = c.tailer.UpdateOptions(context.Background(), managerOpts)
	return nil
}

// updateReconciler updates the state of the reconciler. This must only be
// called after updateTailer. mut must be held when calling.
//
// Nothing is done when neither selector differs from the stored arguments.
func (c *Component) updateReconciler(args Arguments) error {
	var (
		selectorChanged          = !reflect.DeepEqual(c.args.Selector, args.Selector)
		namespaceSelectorChanged = !reflect.DeepEqual(c.args.NamespaceSelector, args.NamespaceSelector)
	)
	if !selectorChanged && !namespaceSelectorChanged {
		return nil
	}

	sel, err := args.Selector.BuildSelector()
	if err != nil {
		return err
	}
	nsSel, err := args.NamespaceSelector.BuildSelector()
	if err != nil {
		return err
	}

	c.reconciler.UpdateSelectors(sel, nsSel)

	// Request a reconcile so the new selectors get applied.
	c.controller.RequestReconcile()
	return nil
}

// updateController updates the state of the controller. This must only be
// called after updateReconciler. mut must be held when calling.
func (c *Component) updateController(args Arguments) error {
	// The controller only needs updating when no rest config has been
	// generated yet or the client args changed since the last call; skip the
	// update otherwise.
	if reflect.DeepEqual(c.args.Client, args.Client) && c.restConfig != nil {
		return nil
	}

	cfg, err := args.Client.BuildRESTConfig(c.log)
	if err != nil {
		return fmt.Errorf("building Kubernetes config: %w", err)
	}
	c.restConfig = cfg

	return c.controller.UpdateConfig(cfg)
}

// DebugInfo returns debug information for loki.source.podlogs.
//
// The result combines the reconciler's debug info with one entry per tailer
// target.
func (c *Component) DebugInfo() interface{} {
	var info DebugInfo

	info.DiscoveredPodLogs = c.reconciler.DebugInfo()

	for _, target := range c.tailer.Targets() {
		var lastError string
		if err := target.LastError(); err != nil {
			lastError = err.Error()
		}

		info.Targets = append(info.Targets, kubernetes.DebugInfoTarget{
			Labels:          target.Labels().Map(),
			DiscoveryLabels: target.DiscoveryLabels().Map(),
			LastError:       lastError,
			UpdateTime:      target.LastEntry().Local(),
		})
	}

	return info
}

// DebugInfo stores debug information for loki.source.podlogs.
type DebugInfo struct {
	DiscoveredPodLogs []DiscoveredPodLogs          `river:"pod_logs,block"`
	Targets           []kubernetes.DebugInfoTarget `river:"target,block,optional"`
}