Path: blob/main/component/loki/source/gcplog/gcplog.go
4096 views
package gcplog

import (
	"context"
	"fmt"
	"strings"
	"sync"

	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/relabel"

	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/common/loki"
	flow_relabel "github.com/grafana/agent/component/common/relabel"
	gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget"
	"github.com/grafana/agent/pkg/util"
)

// init registers the loki.source.gcplog component so that it can be built
// from an Arguments value.
func init() {
	component.Register(component.Registration{
		Name: "loki.source.gcplog",
		Args: Arguments{},

		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			return New(opts, args.(Arguments))
		},
	})
}

// Arguments holds values which are used to configure the loki.source.gcplog
// component.
//
// Exactly one of PullTarget and PushTarget must be set; UnmarshalRiver
// rejects any other combination.
type Arguments struct {
	// TODO(@tpaschalis) Having these types defined in an internal package
	// means that an external caller cannot build this component's Arguments
	// by hand for now.
	PullTarget   *gt.PullConfig      `river:"pull,block,optional"`
	PushTarget   *gt.PushConfig      `river:"push,block,optional"`
	ForwardTo    []loki.LogsReceiver `river:"forward_to,attr"`
	RelabelRules flow_relabel.Rules  `river:"relabel_rules,attr,optional"`
}

// UnmarshalRiver implements the unmarshaller. It resets a to its zero value,
// decodes into it through f, and then validates that exactly one of the
// 'pull' and 'push' blocks was provided.
func (a *Arguments) UnmarshalRiver(f func(v interface{}) error) error {
	*a = Arguments{}

	// The local type has the same fields as Arguments but none of its
	// methods; presumably this keeps f from calling back into UnmarshalRiver.
	type arguments Arguments
	if err := f((*arguments)(a)); err != nil {
		return err
	}

	// Both set and both unset are equally invalid.
	if (a.PullTarget != nil) == (a.PushTarget != nil) {
		return fmt.Errorf("exactly one of 'push' or 'pull' must be provided")
	}
	return nil
}

// Component implements the loki.source.gcplog component.
type Component struct {
	opts          component.Options
	metrics       *gt.Metrics
	serverMetrics *util.UncheckedCollector

	// mut guards fanout and target, which are replaced by Update while Run
	// and DebugInfo read them.
	mut    sync.RWMutex
	fanout []loki.LogsReceiver
	target gt.Target

	// handler is the channel the target writes entries to; Run drains it and
	// forwards every entry to the receivers in fanout.
	handler loki.LogsReceiver
}

// New creates a new loki.source.gcplog component.
//
// It registers the component's server metrics collector with o.Registerer
// and then calls Update once to build the initial target. An error from
// Update is returned as is, together with a nil component.
func New(o component.Options, args Arguments) (*Component, error) {
	c := &Component{
		opts:          o,
		metrics:       gt.NewMetrics(o.Registerer),
		handler:       make(loki.LogsReceiver),
		fanout:        args.ForwardTo,
		serverMetrics: util.NewUncheckedCollector(nil),
	}

	o.Registerer.MustRegister(c.serverMetrics)

	// Call to Update() to start readers and set receivers once at the start.
	if err := c.Update(args); err != nil {
		return nil, err
	}

	return c, nil
}

// Run implements component.Component.
//
// It forwards every entry received on the component's handler to all
// receivers currently in the fanout list until ctx is done, and always
// returns nil. On return the current target, if any, is stopped; a failure to
// stop it is logged rather than returned.
func (c *Component) Run(ctx context.Context) error {
	defer func() {
		level.Info(c.opts.Logger).Log("msg", "loki.source.gcplog component shutting down, stopping the targets")

		c.mut.RLock()
		defer c.mut.RUnlock()

		// There is no target when none was ever built or when the last
		// Update stopped the previous one and failed to build a replacement.
		if c.target == nil {
			return
		}
		if err := c.target.Stop(); err != nil {
			level.Error(c.opts.Logger).Log("msg", "error while stopping gcplog target", "err", err)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return nil
		case entry := <-c.handler:
			// Hold the read lock so Update cannot swap the receiver list in
			// the middle of a fan-out.
			c.mut.RLock()
			for _, receiver := range c.fanout {
				receiver <- entry
			}
			c.mut.RUnlock()
		}
	}
}

// Update implements component.Component.
//
// It replaces the receiver list, stops the currently running target (if any)
// and builds a new pull or push target from args. args must hold an Arguments
// value; any other type makes the type assertion panic. Errors from building
// the new target are logged and returned; in that case the component is left
// without a target.
func (c *Component) Update(args component.Arguments) error {
	c.mut.Lock()
	defer c.mut.Unlock()

	newArgs := args.(Arguments)
	c.fanout = newArgs.ForwardTo

	// len of a nil value is 0, so no separate nil check is needed.
	var rcs []*relabel.Config
	if len(newArgs.RelabelRules) > 0 {
		rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
	}

	if c.target != nil {
		if err := c.target.Stop(); err != nil {
			level.Error(c.opts.Logger).Log("msg", "error while stopping gcplog target", "err", err)
		}
		// Drop the stopped target so that a failed rebuild below does not
		// leave it behind to be stopped again on shutdown or reported by
		// DebugInfo.
		c.target = nil
	}

	entryHandler := loki.NewEntryHandler(c.handler, func() {})
	jobName := strings.ReplaceAll(c.opts.ID, ".", "_")

	if newArgs.PullTarget != nil {
		// TODO(@tpaschalis) Are there any options from "google.golang.org/api/option"
		// we should expose as configuration and pass here?
		t, err := gt.NewPullTarget(c.metrics, c.opts.Logger, entryHandler, jobName, newArgs.PullTarget, rcs)
		if err != nil {
			level.Error(c.opts.Logger).Log("msg", "failed to create gcplog target with provided config", "err", err)
			return err
		}
		c.target = t
	}
	if newArgs.PushTarget != nil {
		// [gt.NewPushTarget] registers new metrics every time it is called. To
		// avoid issues with re-registering metrics with the same name, we create a
		// new registry for the target every time we create one, and pass it to an
		// unchecked collector to bypass uniqueness checking.
		registry := prometheus.NewRegistry()
		c.serverMetrics.SetCollector(registry)

		t, err := gt.NewPushTarget(c.metrics, c.opts.Logger, entryHandler, jobName, newArgs.PushTarget, rcs, registry)
		if err != nil {
			level.Error(c.opts.Logger).Log("msg", "failed to create gcplog target with provided config", "err", err)
			return err
		}
		c.target = t
	}

	return nil
}

// DebugInfo returns information about the status of targets. When the
// component currently has no target, the returned value carries no details.
func (c *Component) DebugInfo() interface{} {
	c.mut.RLock()
	defer c.mut.RUnlock()

	if c.target == nil {
		return targetDebugInfo{}
	}
	return targetDebugInfo{Details: c.target.Details()}
}

// targetDebugInfo is the value returned by DebugInfo; Details holds whatever
// the current target's Details method reports.
type targetDebugInfo struct {
	Details map[string]string `river:"target_info,attr"`
}