Path: blob/main/component/prometheus/scrape/scrape.go
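// Package scrape implements the prometheus.scrape component, which scrapes
// Prometheus metrics from a set of targets and forwards the resulting samples
// to a list of receivers.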
package scrape

import (
	"context"
	"fmt"
	"net/url"
	"sync"
	"time"

	"github.com/alecthomas/units"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	component_config "github.com/grafana/agent/component/common/config"
	"github.com/grafana/agent/component/discovery"
	"github.com/grafana/agent/component/prometheus"
	"github.com/grafana/agent/pkg/build"
	client_prometheus "github.com/prometheus/client_golang/prometheus"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
)

func init() {
	scrape.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)

	component.Register(component.Registration{
		Name: "prometheus.scrape",
		Args: Arguments{},

		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			return New(opts, args.(Arguments))
		},
	})
}

// Arguments holds values which are used to configure the prometheus.scrape
// component.
type Arguments struct {
	Targets   []discovery.Target   `river:"targets,attr"`
	ForwardTo []storage.Appendable `river:"forward_to,attr"`

	// The job name to override the job label with.
	JobName string `river:"job_name,attr,optional"`
	// Indicates whether the scraped metrics should remain unmodified.
	HonorLabels bool `river:"honor_labels,attr,optional"`
	// Indicates whether the scraped timestamps should be respected.
	HonorTimestamps bool `river:"honor_timestamps,attr,optional"`
	// A set of query parameters with which the target is scraped.
	Params url.Values `river:"params,attr,optional"`
	// How frequently to scrape the targets of this scrape config.
	ScrapeInterval time.Duration `river:"scrape_interval,attr,optional"`
	// The timeout for scraping targets of this config.
	ScrapeTimeout time.Duration `river:"scrape_timeout,attr,optional"`
	// The HTTP resource path on which to fetch metrics from targets.
	MetricsPath string `river:"metrics_path,attr,optional"`
	// The URL scheme with which to fetch metrics from targets.
	Scheme string `river:"scheme,attr,optional"`
	// An uncompressed response body larger than this many bytes will cause the
	// scrape to fail. 0 means no limit.
	BodySizeLimit units.Base2Bytes `river:"body_size_limit,attr,optional"`
	// More than this many samples post metric-relabeling will cause the scrape
	// to fail.
	SampleLimit uint `river:"sample_limit,attr,optional"`
	// More than this many targets after the target relabeling will cause the
	// scrapes to fail.
	TargetLimit uint `river:"target_limit,attr,optional"`
	// More than this many labels post metric-relabeling will cause the scrape
	// to fail.
	LabelLimit uint `river:"label_limit,attr,optional"`
	// A label name longer than this many characters post metric-relabeling
	// will cause the scrape to fail.
	LabelNameLengthLimit uint `river:"label_name_length_limit,attr,optional"`
	// A label value longer than this many characters post metric-relabeling
	// will cause the scrape to fail.
	LabelValueLengthLimit uint `river:"label_value_length_limit,attr,optional"`

	HTTPClientConfig component_config.HTTPClientConfig `river:",squash"`

	// Scrape options.
	ExtraMetrics bool `river:"extra_metrics,attr,optional"`

	Clustering Clustering `river:"clustering,block,optional"`
}
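// For reference, a minimal sketch of how this component could be declared in
// a River configuration; the target address and the downstream
// prometheus.remote_write component are hypothetical:
//
//	prometheus.scrape "example" {
//		targets    = [{"__address__" = "demo.example.com:9090"}]
//		forward_to = [prometheus.remote_write.example.receiver]
//	}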
// Clustering holds values that configure clustering-specific behavior.
type Clustering struct {
	// TODO(@tpaschalis) Move this block to a shared place for all components using clustering.
	Enabled bool `river:"enabled,attr"`
}

// DefaultArguments defines the default settings for a scrape job.
var DefaultArguments = Arguments{
	MetricsPath:      "/metrics",
	Scheme:           "http",
	HonorLabels:      false,
	HonorTimestamps:  true,
	HTTPClientConfig: component_config.DefaultHTTPClientConfig,
	ScrapeInterval:   1 * time.Minute,  // From config.DefaultGlobalConfig
	ScrapeTimeout:    10 * time.Second, // From config.DefaultGlobalConfig
}

// UnmarshalRiver implements river.Unmarshaler.
func (arg *Arguments) UnmarshalRiver(f func(interface{}) error) error {
	*arg = DefaultArguments

	// Decode into a local type alias so that f does not recurse back into
	// UnmarshalRiver.
	type args Arguments
	err := f((*args)(arg))
	if err != nil {
		return err
	}

	// We must explicitly call Validate because HTTPClientConfig is squashed,
	// and its own validation won't run otherwise.
	return arg.HTTPClientConfig.Validate()
}

// Component implements the prometheus.scrape component.
type Component struct {
	opts component.Options

	reloadTargets chan struct{}

	mut          sync.RWMutex
	args         Arguments
	scraper      *scrape.Manager
	appendable   *prometheus.Fanout
	targetsGauge client_prometheus.Gauge
}

var _ component.Component = (*Component)(nil)

// New creates a new prometheus.scrape component.
func New(o component.Options, args Arguments) (*Component, error) {
	flowAppendable := prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer)
	scrapeOptions := &scrape.Options{
		ExtraMetrics: args.ExtraMetrics,
		HTTPClientOptions: []config_util.HTTPClientOption{
			config_util.WithDialContextFunc(o.DialFunc),
		},
	}
	scraper := scrape.NewManager(scrapeOptions, o.Logger, flowAppendable)

	targetsGauge := client_prometheus.NewGauge(client_prometheus.GaugeOpts{
		Name: "agent_prometheus_scrape_targets_gauge",
		Help: "Number of targets this component is configured to scrape",
	})
	err := o.Registerer.Register(targetsGauge)
	if err != nil {
		return nil, err
	}

	c := &Component{
		opts:          o,
		reloadTargets: make(chan struct{}, 1),
		scraper:       scraper,
		appendable:    flowAppendable,
		targetsGauge:  targetsGauge,
	}

	// Call Update once at the start to set the receivers and targets.
	if err := c.Update(args); err != nil {
		return nil, err
	}

	return c, nil
}
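// The Flow controller is expected to drive the lifecycle sketched below; this
// is illustrative only, and the opts, args, and ctx values are hypothetical:
//
//	c, err := New(opts, args) // build the component and apply the initial config
//	go c.Run(ctx)             // run the scrape manager until ctx is cancelled
//	err = c.Update(newArgs)   // apply new arguments at runtime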
// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
	defer c.scraper.Stop()

	targetSetsChan := make(chan map[string][]*targetgroup.Group)

	go func() {
		err := c.scraper.Run(targetSetsChan)
		level.Info(c.opts.Logger).Log("msg", "scrape manager stopped")
		if err != nil {
			level.Error(c.opts.Logger).Log("msg", "scrape manager failed", "err", err)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-c.reloadTargets:
			c.mut.RLock()
			var (
				tgs     = c.args.Targets
				jobName = c.opts.ID
				cl      = c.args.Clustering.Enabled
			)
			if c.args.JobName != "" {
				jobName = c.args.JobName
			}
			c.mut.RUnlock()

			// NOTE(@tpaschalis) First approach, manually building the
			// 'clustered' targets implementation every time.
			ct := discovery.NewDistributedTargets(cl, c.opts.Clusterer.Node, tgs)
			promTargets := c.componentTargetsToProm(jobName, ct.Get())

			select {
			case targetSetsChan <- promTargets:
				level.Debug(c.opts.Logger).Log("msg", "passed new targets to scrape manager")
			case <-ctx.Done():
			}
		}
	}
}

// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
	newArgs := args.(Arguments)

	c.mut.Lock()
	defer c.mut.Unlock()
	c.args = newArgs

	c.appendable.UpdateChildren(newArgs.ForwardTo)

	sc := getPromScrapeConfigs(c.opts.ID, newArgs)
	err := c.scraper.ApplyConfig(&config.Config{
		ScrapeConfigs: []*config.ScrapeConfig{sc},
	})
	if err != nil {
		return fmt.Errorf("error applying scrape configs: %w", err)
	}
	level.Debug(c.opts.Logger).Log("msg", "scrape config was updated")

	// Signal the Run loop to reload targets; the buffered channel and default
	// case make this non-blocking when a reload is already pending.
	select {
	case c.reloadTargets <- struct{}{}:
	default:
	}

	c.targetsGauge.Set(float64(len(c.args.Targets)))
	return nil
}

// getPromScrapeConfigs bridges the in-house configuration with the Prometheus
// scrape_config.
// As explained in the Config struct, the following fields are purposefully
// omitted, as they are implemented by other components:
// - RelabelConfigs
// - MetricRelabelConfigs
// - ServiceDiscoveryConfigs
func getPromScrapeConfigs(jobName string, c Arguments) *config.ScrapeConfig {
	dec := config.DefaultScrapeConfig
	if c.JobName != "" {
		dec.JobName = c.JobName
	} else {
		dec.JobName = jobName
	}
	dec.HonorLabels = c.HonorLabels
	dec.HonorTimestamps = c.HonorTimestamps
	dec.Params = c.Params
	dec.ScrapeInterval = model.Duration(c.ScrapeInterval)
	dec.ScrapeTimeout = model.Duration(c.ScrapeTimeout)
	dec.MetricsPath = c.MetricsPath
	dec.Scheme = c.Scheme
	dec.BodySizeLimit = c.BodySizeLimit
	dec.SampleLimit = c.SampleLimit
	dec.TargetLimit = c.TargetLimit
	dec.LabelLimit = c.LabelLimit
	dec.LabelNameLengthLimit = c.LabelNameLengthLimit
	dec.LabelValueLengthLimit = c.LabelValueLengthLimit

	// HTTP scrape client settings.
	dec.HTTPClientConfig = *c.HTTPClientConfig.Convert()
	return &dec
}
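// For illustration, with JobName unset and DefaultArguments applied, the
// generated scrape_config is roughly equivalent to the following Prometheus
// YAML, where job_name comes from the component ID (values hypothetical):
//
//	job_name: 'prometheus.scrape.example'
//	metrics_path: /metrics
//	scheme: http
//	scrape_interval: 1m
//	scrape_timeout: 10s
//	honor_timestamps: true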
// ScraperStatus reports the status of the scraper's jobs.
type ScraperStatus struct {
	TargetStatus []TargetStatus `river:"target,block,optional"`
}

// TargetStatus reports on the status of the latest scrape for a target.
type TargetStatus struct {
	JobName            string            `river:"job,attr"`
	URL                string            `river:"url,attr"`
	Health             string            `river:"health,attr"`
	Labels             map[string]string `river:"labels,attr"`
	LastError          string            `river:"last_error,attr,optional"`
	LastScrape         time.Time         `river:"last_scrape,attr"`
	LastScrapeDuration time.Duration     `river:"last_scrape_duration,attr,optional"`
}

// BuildTargetStatuses transforms the targets from a scrape manager into our
// internal status type for debug info.
func BuildTargetStatuses(targets map[string][]*scrape.Target) []TargetStatus {
	var res []TargetStatus

	for job, stt := range targets {
		for _, st := range stt {
			// Skip nil targets before dereferencing them.
			if st == nil {
				continue
			}
			var lastError string
			if err := st.LastError(); err != nil {
				lastError = err.Error()
			}
			res = append(res, TargetStatus{
				JobName:            job,
				URL:                st.URL().String(),
				Health:             string(st.Health()),
				Labels:             st.Labels().Map(),
				LastError:          lastError,
				LastScrape:         st.LastScrape(),
				LastScrapeDuration: st.LastScrapeDuration(),
			})
		}
	}
	return res
}

// DebugInfo implements component.DebugComponent.
func (c *Component) DebugInfo() interface{} {
	return ScraperStatus{
		TargetStatus: BuildTargetStatuses(c.scraper.TargetsActive()),
	}
}

// ClusterUpdatesRegistration implements component.ClusterComponent.
func (c *Component) ClusterUpdatesRegistration() bool {
	c.mut.RLock()
	defer c.mut.RUnlock()
	return c.args.Clustering.Enabled
}

func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
	promGroup := &targetgroup.Group{Source: jobName}
	for _, tg := range tgs {
		promGroup.Targets = append(promGroup.Targets, convertLabelSet(tg))
	}

	return map[string][]*targetgroup.Group{jobName: {promGroup}}
}

func convertLabelSet(tg discovery.Target) model.LabelSet {
	lset := make(model.LabelSet, len(tg))
	for k, v := range tg {
		lset[model.LabelName(k)] = model.LabelValue(v)
	}
	return lset
}
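// As an example of the conversion above, a hypothetical discovery.Target of
//
//	{"__address__": "10.0.0.1:9090", "env": "prod"}
//
// maps key-for-key into a model.LabelSet, which the scrape manager then uses
// to build the final label set for the target.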