Path: blob/main/pkg/integrations/v2/autoscrape/autoscrape.go
// Package autoscrape implements a scraper for integrations.
package autoscrape

import (
	"context"
	"sync"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/pkg/metrics"
	"github.com/grafana/agent/pkg/metrics/instance"
	"github.com/grafana/agent/pkg/server"
	"github.com/oklog/run"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	prom_config "github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
)

// DefaultGlobal holds default values for Global.
var DefaultGlobal = Global{
	Enable:          true,
	MetricsInstance: "default",
}

// Global holds default settings for metrics integrations that support
// autoscraping. Integrations may override their settings.
type Global struct {
	Enable          bool           `yaml:"enable,omitempty"`           // Whether self-scraping should be enabled.
	MetricsInstance string         `yaml:"metrics_instance,omitempty"` // Metrics instance name to send metrics to.
	ScrapeInterval  model.Duration `yaml:"scrape_interval,omitempty"`  // Self-scraping frequency.
	ScrapeTimeout   model.Duration `yaml:"scrape_timeout,omitempty"`   // Self-scraping timeout.
}

// UnmarshalYAML implements yaml.Unmarshaler.
func (g *Global) UnmarshalYAML(f func(interface{}) error) error {
	*g = DefaultGlobal
	type global Global
	return f((*global)(g))
}

// Config configures autoscrape for an individual integration. Settings here
// override the defaults from Global.
type Config struct {
	Enable          *bool          `yaml:"enable,omitempty"`           // Whether self-scraping should be enabled.
	MetricsInstance string         `yaml:"metrics_instance,omitempty"` // Metrics instance name to send metrics to.
	ScrapeInterval  model.Duration `yaml:"scrape_interval,omitempty"`  // Self-scraping frequency.
	ScrapeTimeout   model.Duration `yaml:"scrape_timeout,omitempty"`   // Self-scraping timeout.

	RelabelConfigs       []*relabel.Config `yaml:"relabel_configs,omitempty"`        // Relabel the autoscrape job.
	MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"` // Relabel individual autoscrape metrics.
}
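
// A hedged example of how these settings decode from YAML. The enclosing
// "autoscrape" key and every value below are illustrative only; where this
// block actually lives in the agent configuration is decided by the code
// that embeds Global and Config:
//
//	autoscrape:
//	  enable: true
//	  metrics_instance: "default"
//	  scrape_interval: 1m
//	  scrape_timeout: 10s
//	  # Per-integration Config additionally accepts relabel rules:
//	  relabel_configs:
//	    - source_labels: [__address__]
//	      target_label: instance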

// InstanceStore is used to find instances to send metrics to. It is a subset
// of the pkg/metrics/instance.Manager interface.
type InstanceStore interface {
	// GetInstance retrieves a ManagedInstance by name.
	GetInstance(name string) (instance.ManagedInstance, error)
}

// ScrapeConfig binds a Prometheus scrape config with an instance to send
// scraped metrics to.
type ScrapeConfig struct {
	Instance string
	Config   prom_config.ScrapeConfig
}

// Scraper is a metrics autoscraper.
type Scraper struct {
	ctx    context.Context
	cancel context.CancelFunc

	log log.Logger
	is  InstanceStore

	// Prometheus doesn't pass contextual information at scrape time that could
	// be used to change the behavior of generating an appender. This means that
	// it's not yet possible for us to just run a single SD + scrape manager for
	// all of our integrations, and we instead need to launch a pair of each for
	// every instance we're writing to.

	iscrapersMut sync.RWMutex
	iscrapers    map[string]*instanceScraper
	dialerFunc   server.DialContextFunc
}

// NewScraper creates a new autoscraper. Scraper will run until Stop is
// called. Instances to send scraped metrics to will be looked up via is.
// Scraping will use the provided dialerFunc to make connections if non-nil.
func NewScraper(l log.Logger, is InstanceStore, dialerFunc server.DialContextFunc) *Scraper {
	l = log.With(l, "component", "autoscraper")

	ctx, cancel := context.WithCancel(context.Background())

	s := &Scraper{
		ctx:    ctx,
		cancel: cancel,

		log:        l,
		is:         is,
		iscrapers:  map[string]*instanceScraper{},
		dialerFunc: dialerFunc,
	}
	return s
}

// ApplyConfig will apply the given jobs. An error will be returned for any
// jobs that failed to be applied.
func (s *Scraper) ApplyConfig(jobs []*ScrapeConfig) error {
	s.iscrapersMut.Lock()
	defer s.iscrapersMut.Unlock()

	var firstError error
	saveError := func(e error) {
		if firstError == nil {
			firstError = e
		}
	}

	// Shard our jobs by target instance.
	shardedJobs := map[string][]*prom_config.ScrapeConfig{}
	for _, j := range jobs {
		_, err := s.is.GetInstance(j.Instance)
		if err != nil {
			level.Error(s.log).Log("msg", "cannot autoscrape integration", "name", j.Config.JobName, "err", err)
			saveError(err)
			continue
		}

		shardedJobs[j.Instance] = append(shardedJobs[j.Instance], &j.Config)
	}

	// Then pass the jobs to each instanceScraper, creating new scrapers as
	// needed.
	for instance, jobs := range shardedJobs {
		is, ok := s.iscrapers[instance]
		if !ok {
			is = newInstanceScraper(s.ctx, s.log, s.is, instance, config_util.DialContextFunc(s.dialerFunc))
			s.iscrapers[instance] = is
		}
		if err := is.ApplyConfig(jobs); err != nil {
			// Not logging here; is.ApplyConfig already logged the errors.
			saveError(err)
		}
	}

	// Garbage collect: if there's a key in s.iscrapers that wasn't in
	// shardedJobs, stop that unused scraper.
	for instance, is := range s.iscrapers {
		_, current := shardedJobs[instance]
		if !current {
			is.Stop()
			delete(s.iscrapers, instance)
		}
	}

	return firstError
}

// TargetsActive returns the set of active scrape targets for all target
// instances.
func (s *Scraper) TargetsActive() map[string]metrics.TargetSet {
	s.iscrapersMut.RLock()
	defer s.iscrapersMut.RUnlock()

	allTargets := make(map[string]metrics.TargetSet, len(s.iscrapers))
	for instance, is := range s.iscrapers {
		allTargets[instance] = is.sm.TargetsActive()
	}
	return allTargets
}

// Stop stops the Scraper.
func (s *Scraper) Stop() {
	s.iscrapersMut.Lock()
	defer s.iscrapersMut.Unlock()

	for instance, is := range s.iscrapers {
		is.Stop()
		delete(s.iscrapers, instance)
	}

	s.cancel()
}
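
// A minimal usage sketch for Scraper, assuming a log.Logger and an
// instance.Manager (which satisfies InstanceStore) are wired up elsewhere;
// logger and instanceManager are placeholder names, not part of this package:
//
//	scraper := NewScraper(logger, instanceManager, nil) // nil => default dialer
//	defer scraper.Stop()
//
//	err := scraper.ApplyConfig([]*ScrapeConfig{{
//		Instance: "default",
//		Config:   prom_config.ScrapeConfig{JobName: "integrations/example"},
//	}})
//	// ApplyConfig returns the first error encountered but still applies the
//	// remaining valid jobs, so a non-nil err means partial application.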

// instanceScraper is a Scraper which always sends to the same instance.
type instanceScraper struct {
	log log.Logger

	sd     *discovery.Manager
	sm     *scrape.Manager
	cancel context.CancelFunc
	exited chan struct{}
}

// newInstanceScraper runs a new instanceScraper. Must be stopped by calling
// Stop.
func newInstanceScraper(
	ctx context.Context,
	l log.Logger,
	s InstanceStore,
	instanceName string,
	dialerFunc config_util.DialContextFunc,
) *instanceScraper {
	ctx, cancel := context.WithCancel(ctx)
	l = log.With(l, "target_instance", instanceName)

	sdOpts := []func(*discovery.Manager){
		discovery.Name("autoscraper/" + instanceName),
		discovery.HTTPClientOptions(
			// If dialerFunc is nil, Go's default dialer is used.
			config_util.WithDialContextFunc(dialerFunc),
		),
	}
	sd := discovery.NewManager(ctx, l, sdOpts...)
	sm := scrape.NewManager(&scrape.Options{
		HTTPClientOptions: []config_util.HTTPClientOption{
			// If dialerFunc is nil, Go's default dialer is used.
			config_util.WithDialContextFunc(dialerFunc),
		},
	}, l, &agentAppender{
		inst: instanceName,
		is:   s,
	})

	is := &instanceScraper{
		log: l,

		sd:     sd,
		sm:     sm,
		cancel: cancel,
		exited: make(chan struct{}),
	}

	go is.run()
	return is
}

// agentAppender implements storage.Appendable by looking up the target
// instance at append time.
type agentAppender struct {
	inst string
	is   InstanceStore
}

func (aa *agentAppender) Appender(ctx context.Context) storage.Appender {
	mi, err := aa.is.GetInstance(aa.inst)
	if err != nil {
		return &failedAppender{instanceName: aa.inst}
	}
	return mi.Appender(ctx)
}

func (is *instanceScraper) run() {
	defer close(is.exited)
	var rg run.Group

	rg.Add(func() error {
		// Service discovery will stop whenever our parent context is canceled or
		// if is.cancel is called.
		err := is.sd.Run()
		if err != nil {
			level.Error(is.log).Log("msg", "autoscrape service discovery exited with error", "err", err)
		}
		return err
	}, func(_ error) {
		is.cancel()
	})

	rg.Add(func() error {
		err := is.sm.Run(is.sd.SyncCh())
		if err != nil {
			level.Error(is.log).Log("msg", "autoscrape scrape manager exited with error", "err", err)
		}
		return err
	}, func(_ error) {
		is.sm.Stop()
	})

	_ = rg.Run()
}

func (is *instanceScraper) ApplyConfig(jobs []*prom_config.ScrapeConfig) error {
	var firstError error
	saveError := func(e error) {
		if firstError == nil && e != nil {
			firstError = e
		}
	}

	var (
		scrapeConfigs = make([]*prom_config.ScrapeConfig, 0, len(jobs))
		sdConfigs     = make(map[string]discovery.Configs, len(jobs))
	)
	for _, job := range jobs {
		sdConfigs[job.JobName] = job.ServiceDiscoveryConfigs
		scrapeConfigs = append(scrapeConfigs, job)
	}
	if err := is.sd.ApplyConfig(sdConfigs); err != nil {
		level.Error(is.log).Log("msg", "error when applying SD to autoscraper", "err", err)
		saveError(err)
	}
	if err := is.sm.ApplyConfig(&prom_config.Config{ScrapeConfigs: scrapeConfigs}); err != nil {
		level.Error(is.log).Log("msg", "error when applying jobs to scraper", "err", err)
		saveError(err)
	}

	return firstError
}

func (is *instanceScraper) Stop() {
	is.cancel()
	<-is.exited
}
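
// A hedged sketch of a fake InstanceStore for tests; fakeStore is a
// hypothetical name, not part of this package:
//
//	type fakeStore struct{ inst instance.ManagedInstance }
//
//	func (f fakeStore) GetInstance(string) (instance.ManagedInstance, error) {
//		return f.inst, nil
//	}
//
// Wiring such a stub into NewScraper exercises the job sharding and garbage
// collection in ApplyConfig without a real metrics subsystem.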