Path: blob/main/component/phlare/scrape/scrape_loop.go
package scrape

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"reflect"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	commonconfig "github.com/prometheus/common/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/util/pool"
	"golang.org/x/net/context/ctxhttp"

	"github.com/grafana/agent/component/phlare"
	"github.com/grafana/agent/pkg/build"
)

var (
	// payloadBuffers pools scrape payload buffers so they can be reused across scrapes.
	payloadBuffers  = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
	userAgentHeader = fmt.Sprintf("GrafanaAgent/%s", build.Version)
)

type scrapePool struct {
	config Arguments

	logger       log.Logger
	scrapeClient *http.Client
	appendable   phlare.Appendable

	mtx            sync.RWMutex
	activeTargets  map[uint64]*scrapeLoop
	droppedTargets []*Target
}

// newScrapePool builds the scrape HTTP client from the job's configuration
// and returns a pool with no active targets.
func newScrapePool(cfg Arguments, appendable phlare.Appendable, logger log.Logger) (*scrapePool, error) {
	scrapeClient, err := commonconfig.NewClientFromConfig(*cfg.HTTPClientConfig.Convert(), cfg.JobName)
	if err != nil {
		return nil, err
	}

	return &scrapePool{
		config:        cfg,
		logger:        logger,
		scrapeClient:  scrapeClient,
		appendable:    appendable,
		activeTargets: map[uint64]*scrapeLoop{},
	}, nil
}

// sync reconciles the running scrape loops with the latest discovered target
// groups: new targets get a loop started, existing targets get their
// discovered labels refreshed, and vanished targets are stopped and removed.
func (tg *scrapePool) sync(groups []*targetgroup.Group) {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	level.Info(tg.logger).Log("msg", "syncing target groups", "job", tg.config.JobName)
	var actives []*Target
	tg.droppedTargets = []*Target{}
	for _, group := range groups {
		targets, dropped, err := targetsFromGroup(group, tg.config)
		if err != nil {
			level.Error(tg.logger).Log("msg", "creating targets failed", "err", err)
			continue
		}
		for _, t := range targets {
			if t.Labels().Len() > 0 {
				actives = append(actives, t)
			}
		}
		tg.droppedTargets = append(tg.droppedTargets, dropped...)
	}

	for _, t := range actives {
		if _, ok := tg.activeTargets[t.hash()]; !ok {
			loop := newScrapeLoop(t, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger)
			tg.activeTargets[t.hash()] = loop
			loop.start()
		} else {
			tg.activeTargets[t.hash()].SetDiscoveredLabels(t.DiscoveredLabels())
		}
	}

	// Stop and remove inactive targets.
Outer:
	for h, t := range tg.activeTargets {
		for _, at := range actives {
			if h == at.hash() {
				continue Outer
			}
		}
		t.stop(false)
		delete(tg.activeTargets, h)
	}
}

// reload applies a new configuration. If the scrape interval, timeout, or
// HTTP client settings changed, it rebuilds the client and restarts every
// scrape loop; otherwise it only stores the new configuration.
func (tg *scrapePool) reload(cfg Arguments) error {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	if tg.config.ScrapeInterval == cfg.ScrapeInterval &&
		tg.config.ScrapeTimeout == cfg.ScrapeTimeout &&
		reflect.DeepEqual(tg.config.HTTPClientConfig, cfg.HTTPClientConfig) {

		tg.config = cfg
		return nil
	}
	tg.config = cfg

	scrapeClient, err := commonconfig.NewClientFromConfig(*cfg.HTTPClientConfig.Convert(), cfg.JobName)
	if err != nil {
		return err
	}
	tg.scrapeClient = scrapeClient
	for hash, t := range tg.activeTargets {
		// Restart the loop with the new configuration.
		t.stop(false)
		loop := newScrapeLoop(t.Target, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger)
		tg.activeTargets[hash] = loop
		loop.start()
	}
	return nil
}

// stop shuts down all scrape loops concurrently and waits for them to finish.
func (tg *scrapePool) stop() {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	wg := sync.WaitGroup{}
	for _, t := range tg.activeTargets {
		wg.Add(1)
		go func(t *scrapeLoop) {
			defer wg.Done()
			t.stop(true)
		}(t)
	}
	wg.Wait()
}
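
// ActiveTargets returns the targets that are currently being scraped.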
func (tg *scrapePool) ActiveTargets() []*Target {
	tg.mtx.RLock()
	defer tg.mtx.RUnlock()
	result := make([]*Target, 0, len(tg.activeTargets))
	for _, target := range tg.activeTargets {
		result = append(result, target.Target)
	}
	return result
}

// DroppedTargets returns the targets that were discovered but dropped during
// relabeling.
func (tg *scrapePool) DroppedTargets() []*Target {
	tg.mtx.RLock()
	defer tg.mtx.RUnlock()
	result := make([]*Target, 0, len(tg.droppedTargets))
	result = append(result, tg.droppedTargets...)
	return result
}

// A scrapeLoop periodically fetches one profile from a single target and
// pushes the raw payload to the appendable.
type scrapeLoop struct {
	*Target

	lastScrapeSize int

	scrapeClient *http.Client
	appendable   phlare.Appendable

	req               *http.Request
	logger            log.Logger
	interval, timeout time.Duration
	graceShut         chan struct{}
	once              sync.Once
	wg                sync.WaitGroup
}

func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable phlare.Appendable, interval, timeout time.Duration, logger log.Logger) *scrapeLoop {
	return &scrapeLoop{
		Target:       t,
		logger:       logger,
		scrapeClient: scrapeClient,
		appendable:   appendable,
		interval:     interval,
		timeout:      timeout,
	}
}

// start launches the scrape goroutine. It first waits for a per-target offset
// so scrapes are spread across the interval, then scrapes on every tick until
// the loop is stopped.
func (t *scrapeLoop) start() {
	t.graceShut = make(chan struct{})
	t.once = sync.Once{}
	t.wg.Add(1)

	go func() {
		defer t.wg.Done()

		select {
		case <-time.After(t.offset(t.interval)):
		case <-t.graceShut:
			return
		}
		ticker := time.NewTicker(t.interval)
		defer ticker.Stop()

		for {
			select {
			case <-t.graceShut:
				return
			case <-ticker.C:
			}
			t.scrape()
		}
	}()
}

// scrape fetches the target's profile once, reusing a pooled buffer sized by
// the previous scrape, and appends the raw bytes with the target's labels.
func (t *scrapeLoop) scrape() {
	var (
		start             = time.Now()
		b                 = payloadBuffers.Get(t.lastScrapeSize).([]byte)
		buf               = bytes.NewBuffer(b)
		profileType       string
		scrapeCtx, cancel = context.WithTimeout(context.Background(), t.timeout)
	)
	defer cancel()

	for _, l := range t.labels {
		if l.Name == ProfileName {
			profileType = l.Value
			break
		}
	}
	if err := t.fetchProfile(scrapeCtx, profileType, buf); err != nil {
		level.Debug(t.logger).Log("msg", "fetch profile failed", "target", t.Labels().String(), "err", err)
		t.updateTargetStatus(start, err)
		return
	}

	b = buf.Bytes()
	if len(b) > 0 {
		t.lastScrapeSize = len(b)
	}
	if err := t.appendable.Appender().Append(context.Background(), t.labels, []*phlare.RawSample{{RawProfile: b}}); err != nil {
		level.Error(t.logger).Log("msg", "push failed", "labels", t.Labels().String(), "err", err)
		t.updateTargetStatus(start, err)
		return
	}
	t.updateTargetStatus(start, nil)
}

// updateTargetStatus records the outcome of the last scrape: health, last
// error, start time, and duration.
func (t *scrapeLoop) updateTargetStatus(start time.Time, err error) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	if err != nil {
		t.health = HealthBad
		t.lastError = err
	} else {
		t.health = HealthGood
		t.lastError = nil
	}
	t.lastScrape = start
	t.lastScrapeDuration = time.Since(start)
}

// fetchProfile issues the (lazily built and cached) GET request and tees the
// response body into buf. Non-2xx responses and empty bodies are errors.
func (t *scrapeLoop) fetchProfile(ctx context.Context, profileType string, buf io.Writer) error {
	if t.req == nil {
		req, err := http.NewRequest("GET", t.URL().String(), nil)
		if err != nil {
			return err
		}
		req.Header.Set("User-Agent", userAgentHeader)

		t.req = req
	}

	level.Debug(t.logger).Log("msg", "scraping profile", "labels", t.Labels().String(), "url", t.req.URL.String())
	resp, err := ctxhttp.Do(ctx, t.scrapeClient, t.req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	b, err := io.ReadAll(io.TeeReader(resp.Body, buf))
	if err != nil {
		return fmt.Errorf("failed to read body: %w", err)
	}

	if resp.StatusCode/100 != 2 {
		if len(b) > 0 {
			return fmt.Errorf("server returned HTTP status (%d) %v", resp.StatusCode, string(bytes.TrimSpace(b)))
		}
		return fmt.Errorf("server returned HTTP status (%d) %v", resp.StatusCode, resp.Status)
	}

	if len(b) == 0 {
		return fmt.Errorf("empty %s profile from %s", profileType, t.req.URL.String())
	}
	return nil
}
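
// stop signals the scrape loop to shut down. If wait is true, it blocks until
// the loop's goroutine has exited.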
func (t *scrapeLoop) stop(wait bool) {
	t.once.Do(func() {
		close(t.graceShut)
	})
	if wait {
		t.wg.Wait()
	}
}
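
// Usage sketch (illustrative only, not part of the original file): one way a
// caller could wire the pool to a discovery loop. exampleRunPool and groupsCh
// are hypothetical names, not APIs of this package.
func exampleRunPool(args Arguments, appendable phlare.Appendable, logger log.Logger, groupsCh <-chan []*targetgroup.Group) error {
	pool, err := newScrapePool(args, appendable, logger)
	if err != nil {
		return err
	}
	// Stop all loops once discovery updates end.
	defer pool.stop()
	for groups := range groupsCh {
		pool.sync(groups)
	}
	return nil
}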