package scrape

import (
"context"
"fmt"
"net/url"
"sync"
"time"

"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"

"github.com/grafana/agent/component"
component_config "github.com/grafana/agent/component/common/config"
"github.com/grafana/agent/component/discovery"
"github.com/grafana/agent/component/phlare"
"github.com/grafana/agent/component/prometheus/scrape"
)
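
// Names of the built-in pprof profile kinds the scraper knows how to collect.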
const (
pprofMemory string = "memory"
pprofBlock string = "block"
pprofGoroutine string = "goroutine"
pprofMutex string = "mutex"
pprofProcessCPU string = "process_cpu"
pprofFgprof string = "fgprof"
)
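
// init registers phlare.scrape with the Flow component registry so it can be
// referenced from River configuration files.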
func init() {
component.Register(component.Registration{
Name: "phlare.scrape",
Args: Arguments{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}
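
// Arguments holds the values used to configure a phlare.scrape component.
//
// A minimal illustrative River configuration; the phlare.write component
// feeding forward_to is an assumed, separately defined component:
//
//	phlare.scrape "default" {
//		targets    = [{"__address__" = "localhost:8080"}]
//		forward_to = [phlare.write.default.receiver]
//	}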
type Arguments struct {
Targets []discovery.Target `river:"targets,attr"`
ForwardTo []phlare.Appendable `river:"forward_to,attr"`
JobName string `river:"job_name,attr,optional"`
Params url.Values `river:"params,attr,optional"`
ScrapeInterval time.Duration `river:"scrape_interval,attr,optional"`
ScrapeTimeout time.Duration `river:"scrape_timeout,attr,optional"`
Scheme string `river:"scheme,attr,optional"`
HTTPClientConfig component_config.HTTPClientConfig `river:",squash"`
ProfilingConfig ProfilingConfig `river:"profiling_config,block,optional"`
Clustering scrape.Clustering `river:"clustering,block,optional"`
}
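
// ProfilingConfig describes the profiles to scrape from each target and the
// HTTP paths they are served on.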
type ProfilingConfig struct {
Memory ProfilingTarget `river:"profile.memory,block,optional"`
Block ProfilingTarget `river:"profile.block,block,optional"`
Goroutine ProfilingTarget `river:"profile.goroutine,block,optional"`
Mutex ProfilingTarget `river:"profile.mutex,block,optional"`
ProcessCPU ProfilingTarget `river:"profile.process_cpu,block,optional"`
FGProf ProfilingTarget `river:"profile.fgprof,block,optional"`
Custom []CustomProfilingTarget `river:"profile.custom,block,optional"`
PprofPrefix string `river:"path_prefix,attr,optional"`
}
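
// AllTargets returns the combined set of built-in and custom profiling
// targets, keyed by profile kind name.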
func (cfg ProfilingConfig) AllTargets() map[string]ProfilingTarget {
targets := map[string]ProfilingTarget{
pprofMemory: cfg.Memory,
pprofBlock: cfg.Block,
pprofGoroutine: cfg.Goroutine,
pprofMutex: cfg.Mutex,
pprofProcessCPU: cfg.ProcessCPU,
pprofFgprof: cfg.FGProf,
}
for _, custom := range cfg.Custom {
targets[custom.Name] = ProfilingTarget{
Enabled: custom.Enabled,
Path: custom.Path,
Delta: custom.Delta,
}
}
return targets
}
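
// DefaultProfilingConfig enables every built-in profile except fgprof,
// pointing at the standard net/http/pprof endpoints.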
var DefaultProfilingConfig = ProfilingConfig{
Memory: ProfilingTarget{
Enabled: true,
Path: "/debug/pprof/allocs",
},
Block: ProfilingTarget{
Enabled: true,
Path: "/debug/pprof/block",
},
Goroutine: ProfilingTarget{
Enabled: true,
Path: "/debug/pprof/goroutine",
},
Mutex: ProfilingTarget{
Enabled: true,
Path: "/debug/pprof/mutex",
},
ProcessCPU: ProfilingTarget{
Enabled: true,
Path: "/debug/pprof/profile",
Delta: true,
},
FGProf: ProfilingTarget{
Enabled: false,
Path: "/debug/fgprof",
Delta: true,
},
}
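
// UnmarshalRiver implements river.Unmarshaler. Defaults are applied first so
// that omitted blocks and attributes keep their default values.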
func (cfg *ProfilingConfig) UnmarshalRiver(f func(interface{}) error) error {
*cfg = DefaultProfilingConfig
type args ProfilingConfig
if err := f((*args)(cfg)); err != nil {
return err
}
return nil
}
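
// ProfilingTarget configures a single profile kind to scrape. Delta marks
// profiles that are collected over a time window, such as CPU profiles.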
type ProfilingTarget struct {
Enabled bool `river:"enabled,attr,optional"`
Path string `river:"path,attr,optional"`
Delta bool `river:"delta,attr,optional"`
}
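
// CustomProfilingTarget configures an additional, user-named profile kind to
// scrape.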
type CustomProfilingTarget struct {
Enabled bool `river:"enabled,attr"`
Path string `river:"path,attr"`
Delta bool `river:"delta,attr,optional"`
Name string `river:",label"`
}
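
// DefaultArguments holds the default settings for a phlare.scrape component.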
var DefaultArguments = NewDefaultArguments()
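
// NewDefaultArguments returns a fresh copy of the default arguments so that
// callers can mutate the result without affecting DefaultArguments.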
func NewDefaultArguments() Arguments {
return Arguments{
Scheme: "http",
HTTPClientConfig: component_config.DefaultHTTPClientConfig,
ScrapeInterval: 15 * time.Second,
// Delta profiles such as process_cpu block for roughly one scrape_interval
// while being collected, hence the extra 3 seconds of headroom here.
ScrapeTimeout: 15*time.Second + (3 * time.Second),
ProfilingConfig: DefaultProfilingConfig,
}
}
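
// UnmarshalRiver implements river.Unmarshaler. It applies defaults, decodes
// the user-supplied configuration, and validates the scrape timings.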
func (arg *Arguments) UnmarshalRiver(f func(interface{}) error) error {
*arg = NewDefaultArguments()
type args Arguments
if err := f((*args)(arg)); err != nil {
return err
}
if arg.ScrapeTimeout <= 0 {
return fmt.Errorf("scrape_timeout must be greater than 0")
}
if arg.ScrapeTimeout <= arg.ScrapeInterval {
return fmt.Errorf("scrape_timeout must be greater than scrape_interval")
}
// The process_cpu profile blocks while it is collected, so require a
// minimum timeout of 2 seconds when it is enabled.
if cfg := arg.ProfilingConfig.ProcessCPU; cfg.Enabled && arg.ScrapeTimeout < 2*time.Second {
return fmt.Errorf("%v scrape_timeout must be at least 2 seconds", pprofProcessCPU)
}
return arg.HTTPClientConfig.Validate()
}
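
// Component implements the phlare.scrape component, which scrapes pprof
// profiles from discovered targets and forwards them to the configured
// receivers.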
type Component struct {
opts component.Options
reloadTargets chan struct{}
mut sync.RWMutex
args Arguments
scraper *Manager
appendable *phlare.Fanout
}

var _ component.Component = (*Component)(nil)
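
// New creates a new phlare.scrape component from the given arguments.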
func New(o component.Options, args Arguments) (*Component, error) {
flowAppendable := phlare.NewFanout(args.ForwardTo, o.ID, o.Registerer)
scraper := NewManager(flowAppendable, o.Logger)
c := &Component{
opts: o,
reloadTargets: make(chan struct{}, 1),
scraper: scraper,
appendable: flowAppendable,
}
if err := c.Update(args); err != nil {
return nil, err
}
return c, nil
}
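
// Run implements component.Component. It runs the scrape manager in the
// background and pushes the current target set to it whenever a reload is
// requested.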
func (c *Component) Run(ctx context.Context) error {
defer c.scraper.Stop()
targetSetsChan := make(chan map[string][]*targetgroup.Group)
go func() {
c.scraper.Run(targetSetsChan)
level.Info(c.opts.Logger).Log("msg", "scrape manager stopped")
}()
for {
select {
case <-ctx.Done():
return nil
case <-c.reloadTargets:
c.mut.RLock()
var (
tgs = c.args.Targets
jobName = c.opts.ID
clustering = c.args.Clustering.Enabled
)
if c.args.JobName != "" {
jobName = c.args.JobName
}
c.mut.RUnlock()
ct := discovery.NewDistributedTargets(clustering, c.opts.Clusterer.Node, tgs)
promTargets := c.componentTargetsToProm(jobName, ct.Get())
select {
case targetSetsChan <- promTargets:
level.Debug(c.opts.Logger).Log("msg", "passed new targets to scrape manager")
case <-ctx.Done():
return nil
}
}
}
}
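
// Update implements component.Component. It stores the new arguments,
// re-applies the scrape configuration, and schedules a non-blocking target
// reload.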
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)
c.mut.Lock()
defer c.mut.Unlock()
c.args = newArgs
c.appendable.UpdateChildren(newArgs.ForwardTo)
err := c.scraper.ApplyConfig(newArgs)
if err != nil {
return fmt.Errorf("error applying scrape configs: %w", err)
}
level.Debug(c.opts.Logger).Log("msg", "scrape config was updated")
select {
case c.reloadTargets <- struct{}{}:
default:
}
return nil
}
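
// componentTargetsToProm converts Flow targets into a Prometheus target group
// keyed by job name.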
func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
promGroup := &targetgroup.Group{Source: jobName}
for _, tg := range tgs {
promGroup.Targets = append(promGroup.Targets, convertLabelSet(tg))
}
return map[string][]*targetgroup.Group{jobName: {promGroup}}
}
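
// convertLabelSet converts a discovery.Target into a Prometheus label set.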
func convertLabelSet(tg discovery.Target) model.LabelSet {
lset := make(model.LabelSet, len(tg))
for k, v := range tg {
lset[model.LabelName(k)] = model.LabelValue(v)
}
return lset
}
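
// ClusterUpdatesRegistration reports whether the component should be notified
// of cluster membership changes; this is only the case when clustering is
// enabled.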
func (c *Component) ClusterUpdatesRegistration() bool {
c.mut.RLock()
defer c.mut.RUnlock()
return c.args.Clustering.Enabled
}
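
// DebugInfo implements component.DebugComponent, exposing per-target scrape
// status to the agent UI.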
func (c *Component) DebugInfo() interface{} {
var res []scrape.TargetStatus
for job, stt := range c.scraper.TargetsActive() {
for _, st := range stt {
// Check for nil before touching the target: calling LastError on a
// nil target would panic.
if st == nil {
continue
}
var lastError string
if st.LastError() != nil {
lastError = st.LastError().Error()
}
res = append(res, scrape.TargetStatus{
JobName: job,
URL: st.URL().String(),
Health: string(st.Health()),
Labels: st.discoveredLabels.Map(),
LastError: lastError,
LastScrape: st.LastScrape(),
LastScrapeDuration: st.LastScrapeDuration(),
})
}
}
return scrape.ScraperStatus{TargetStatus: res}
}