Path: blob/main/component/loki/source/cloudflare/cloudflare.go
4096 views
package cloudflare12import (3"context"4"fmt"5"os"6"path/filepath"7"sync"8"time"910"github.com/go-kit/log/level"11"github.com/grafana/agent/component"12"github.com/grafana/agent/component/common/loki"13"github.com/grafana/agent/component/common/loki/positions"14cft "github.com/grafana/agent/component/loki/source/cloudflare/internal/cloudflaretarget"15"github.com/grafana/agent/pkg/river"16"github.com/grafana/agent/pkg/river/rivertypes"17"github.com/prometheus/common/model"18)1920func init() {21component.Register(component.Registration{22Name: "loki.source.cloudflare",23Args: Arguments{},2425Build: func(opts component.Options, args component.Arguments) (component.Component, error) {26return New(opts, args.(Arguments))27},28})29}3031// Arguments holds values which are used to configure the32// loki.source.cloudflare component.33type Arguments struct {34APIToken rivertypes.Secret `river:"api_token,attr"`35ZoneID string `river:"zone_id,attr"`36Labels map[string]string `river:"labels,attr,optional"`37Workers int `river:"workers,attr,optional"`38PullRange time.Duration `river:"pull_range,attr,optional"`39FieldsType string `river:"fields_type,attr,optional"`40ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`41}4243// Convert returns a cloudflaretarget Config struct from the Arguments.44func (c Arguments) Convert() *cft.Config {45lbls := make(model.LabelSet, len(c.Labels))46for k, v := range c.Labels {47lbls[model.LabelName(k)] = model.LabelValue(v)48}49return &cft.Config{50APIToken: string(c.APIToken),51ZoneID: c.ZoneID,52Labels: lbls,53Workers: c.Workers,54PullRange: model.Duration(c.PullRange),55FieldsType: c.FieldsType,56}57}5859// DefaultArguments sets the configuration defaults.60var DefaultArguments = Arguments{61Workers: 3,62PullRange: 1 * time.Minute,63FieldsType: string(cft.FieldsTypeDefault),64}6566var _ river.Unmarshaler = (*Arguments)(nil)6768// UnmarshalRiver implements the unmarshaller69func (c *Arguments) UnmarshalRiver(f func(v interface{}) error) error {70*c = DefaultArguments71type args Arguments72err := f((*args)(c))73if err != nil {74return err75}76if c.PullRange < 0 {77return fmt.Errorf("pull_range must be a positive duration")78}79_, err = cft.Fields(cft.FieldsType(c.FieldsType))80if err != nil {81return fmt.Errorf("invalid fields_type set; the available values are 'default', 'minimal', 'extended' and 'all'")82}83return nil84}8586// Component implements the loki.source.cloudflare component.87type Component struct {88opts component.Options89metrics *cft.Metrics9091mut sync.RWMutex92fanout []loki.LogsReceiver93target *cft.Target9495posFile positions.Positions96handler loki.LogsReceiver97}9899// New creates a new loki.source.cloudflare component.100func New(o component.Options, args Arguments) (*Component, error) {101err := os.MkdirAll(o.DataPath, 0750)102if err != nil && !os.IsExist(err) {103return nil, err104}105positionsFile, err := positions.New(o.Logger, positions.Config{106SyncPeriod: 10 * time.Second,107PositionsFile: filepath.Join(o.DataPath, "positions.yml"),108IgnoreInvalidYaml: false,109ReadOnly: false,110})111if err != nil {112return nil, err113}114115c := &Component{116opts: o,117metrics: cft.NewMetrics(o.Registerer),118handler: make(loki.LogsReceiver),119fanout: args.ForwardTo,120posFile: positionsFile,121}122123// Call to Update() to start readers and set receivers once at the start.124if err := c.Update(args); err != nil {125return nil, err126}127128return c, nil129}130131// Run implements component.Component.132func (c *Component) Run(ctx context.Context) error {133defer func() {134c.mut.RLock()135level.Info(c.opts.Logger).Log("msg", "loki.source.cloudflare component shutting down, stopping the target")136c.target.Stop()137c.mut.RUnlock()138}()139140for {141select {142case <-ctx.Done():143return nil144case entry := <-c.handler:145c.mut.RLock()146for _, receiver := range c.fanout {147receiver <- entry148}149c.mut.RUnlock()150}151}152}153154// Update implements component.Component.155func (c *Component) Update(args component.Arguments) error {156c.mut.Lock()157defer c.mut.Unlock()158159newArgs := args.(Arguments)160c.fanout = newArgs.ForwardTo161162if c.target != nil {163c.target.Stop()164}165entryHandler := loki.NewEntryHandler(c.handler, func() {})166167t, err := cft.NewTarget(c.metrics, c.opts.Logger, entryHandler, c.posFile, newArgs.Convert())168if err != nil {169level.Error(c.opts.Logger).Log("msg", "failed to create cloudflare target with provided config", "err", err)170return err171}172c.target = t173174return nil175}176177// DebugInfo returns information about the status of targets.178func (c *Component) DebugInfo() interface{} {179c.mut.RLock()180defer c.mut.RUnlock()181182lbls := make(map[string]string, len(c.target.Labels()))183for k, v := range c.target.Labels() {184lbls[string(k)] = string(v)185}186return targetDebugInfo{187Ready: c.target.Ready(),188Details: c.target.Details(),189}190}191192type targetDebugInfo struct {193Ready bool `river:"ready,attr"`194Details map[string]string `river:"target_info,attr"`195}196197198