package scrape
import (
"errors"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/grafana/agent/component/phlare"
)
var reloadInterval = 5 * time.Second
type Manager struct {
logger log.Logger
graceShut chan struct{}
appendable phlare.Appendable
mtxScrape sync.Mutex
config Arguments
targetsGroups map[string]*scrapePool
targetSets map[string][]*targetgroup.Group
triggerReload chan struct{}
}
func NewManager(appendable phlare.Appendable, logger log.Logger) *Manager {
if logger == nil {
logger = log.NewNopLogger()
}
return &Manager{
logger: logger,
appendable: appendable,
graceShut: make(chan struct{}),
triggerReload: make(chan struct{}, 1),
targetsGroups: make(map[string]*scrapePool),
targetSets: make(map[string][]*targetgroup.Group),
}
}
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
go m.reloader()
for {
select {
case ts := <-tsets:
m.updateTsets(ts)
select {
case m.triggerReload <- struct{}{}:
default:
}
case <-m.graceShut:
return
}
}
}
func (m *Manager) reloader() {
ticker := time.NewTicker(reloadInterval)
defer ticker.Stop()
for {
select {
case <-m.graceShut:
return
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}
}
}
func (m *Manager) reload() {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
var wg sync.WaitGroup
for setName, groups := range m.targetSets {
if _, ok := m.targetsGroups[setName]; !ok {
sp, err := newScrapePool(m.config, m.appendable, log.With(m.logger, "scrape_pool", setName))
if err != nil {
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
continue
}
m.targetsGroups[setName] = sp
}
wg.Add(1)
go func(sp *scrapePool, groups []*targetgroup.Group) {
sp.sync(groups)
wg.Done()
}(m.targetsGroups[setName], groups)
}
wg.Wait()
}
func (m *Manager) ApplyConfig(cfg Arguments) error {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
var failed bool
m.config = cfg
for name, sp := range m.targetsGroups {
err := sp.reload(cfg)
if err != nil {
level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
failed = true
}
}
if failed {
return errors.New("failed to apply the new configuration")
}
return nil
}
func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
m.mtxScrape.Lock()
m.targetSets = tsets
m.mtxScrape.Unlock()
}
func (m *Manager) TargetsAll() map[string][]*Target {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
targets := make(map[string][]*Target, len(m.targetsGroups))
for tset, sp := range m.targetsGroups {
targets[tset] = append(sp.ActiveTargets(), sp.DroppedTargets()...)
}
return targets
}
func (m *Manager) TargetsActive() map[string][]*Target {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
var (
wg sync.WaitGroup
mtx sync.Mutex
)
targets := make(map[string][]*Target, len(m.targetsGroups))
wg.Add(len(m.targetsGroups))
for tset, sp := range m.targetsGroups {
go func(tset string, sp *scrapePool) {
mtx.Lock()
targets[tset] = sp.ActiveTargets()
mtx.Unlock()
wg.Done()
}(tset, sp)
}
wg.Wait()
return targets
}
func (m *Manager) TargetsDropped() map[string][]*Target {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
targets := make(map[string][]*Target, len(m.targetsGroups))
for tset, sp := range m.targetsGroups {
targets[tset] = sp.DroppedTargets()
}
return targets
}
func (m *Manager) Stop() {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
wg := sync.WaitGroup{}
for _, sp := range m.targetsGroups {
wg.Add(1)
go func(sp *scrapePool) {
defer wg.Done()
sp.stop()
}(sp)
}
wg.Wait()
close(m.graceShut)
}