Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/phlare/scrape/manager.go
4096 views
1
package scrape
2
3
import (
4
"errors"
5
"sync"
6
"time"
7
8
"github.com/go-kit/log"
9
"github.com/go-kit/log/level"
10
"github.com/prometheus/prometheus/discovery/targetgroup"
11
12
"github.com/grafana/agent/component/phlare"
13
)
14
15
// reloadInterval bounds how often reloader applies pending target-set
// updates; a var rather than a const, presumably so tests can shorten
// it — TODO confirm.
var reloadInterval = 5 * time.Second
16
17
// Manager maintains one scrape pool per discovered target set and keeps
// the pools in sync with the target-group updates delivered to Run.
type Manager struct {
	logger log.Logger

	graceShut  chan struct{}     // closed by Stop; ends Run and reloader
	appendable phlare.Appendable // destination for scraped profiles

	mtxScrape     sync.Mutex // Guards the fields below.
	config        Arguments
	targetsGroups map[string]*scrapePool
	targetSets    map[string][]*targetgroup.Group

	// triggerReload carries a buffered (capacity 1) signal from Run to
	// reloader; Run's send is non-blocking, so at most one reload is
	// ever pending.
	triggerReload chan struct{}
}
30
31
func NewManager(appendable phlare.Appendable, logger log.Logger) *Manager {
32
if logger == nil {
33
logger = log.NewNopLogger()
34
}
35
return &Manager{
36
logger: logger,
37
appendable: appendable,
38
graceShut: make(chan struct{}),
39
triggerReload: make(chan struct{}, 1),
40
targetsGroups: make(map[string]*scrapePool),
41
targetSets: make(map[string][]*targetgroup.Group),
42
}
43
}
44
45
// Run receives and saves target set updates and triggers the scraping loops reloading.
46
// Reloading happens in the background so that it doesn't block receiving targets updates.
47
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
48
go m.reloader()
49
for {
50
select {
51
case ts := <-tsets:
52
m.updateTsets(ts)
53
54
select {
55
case m.triggerReload <- struct{}{}:
56
default:
57
}
58
59
case <-m.graceShut:
60
return
61
}
62
}
63
}
64
65
func (m *Manager) reloader() {
66
ticker := time.NewTicker(reloadInterval)
67
68
defer ticker.Stop()
69
70
for {
71
select {
72
case <-m.graceShut:
73
return
74
case <-ticker.C:
75
select {
76
case <-m.triggerReload:
77
m.reload()
78
case <-m.graceShut:
79
return
80
}
81
}
82
}
83
}
84
85
// reload creates scrape pools for any newly seen target sets and syncs
// every set's groups into its pool.
func (m *Manager) reload() {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	var wg sync.WaitGroup
	for setName, groups := range m.targetSets {
		sp, ok := m.targetsGroups[setName]
		if !ok {
			var err error
			sp, err = newScrapePool(m.config, m.appendable, log.With(m.logger, "scrape_pool", setName))
			if err != nil {
				level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
				continue
			}
			m.targetsGroups[setName] = sp
		}

		wg.Add(1)
		// Run the sync in parallel as these take a while and at high load can't catch up.
		go func(p *scrapePool, g []*targetgroup.Group) {
			defer wg.Done()
			p.sync(g)
		}(sp, groups)
	}
	wg.Wait()
}
109
110
// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
111
func (m *Manager) ApplyConfig(cfg Arguments) error {
112
m.mtxScrape.Lock()
113
defer m.mtxScrape.Unlock()
114
// Cleanup and reload pool if the configuration has changed.
115
var failed bool
116
m.config = cfg
117
118
for name, sp := range m.targetsGroups {
119
err := sp.reload(cfg)
120
if err != nil {
121
level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
122
failed = true
123
}
124
}
125
126
if failed {
127
return errors.New("failed to apply the new configuration")
128
}
129
return nil
130
}
131
132
// updateTsets swaps in the latest target groups received from discovery.
func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()
	m.targetSets = tsets
}
137
138
// TargetsAll returns active and dropped targets grouped by job_name.
139
func (m *Manager) TargetsAll() map[string][]*Target {
140
m.mtxScrape.Lock()
141
defer m.mtxScrape.Unlock()
142
143
targets := make(map[string][]*Target, len(m.targetsGroups))
144
for tset, sp := range m.targetsGroups {
145
targets[tset] = append(sp.ActiveTargets(), sp.DroppedTargets()...)
146
}
147
return targets
148
}
149
150
// TargetsActive returns the active targets currently being scraped.
151
func (m *Manager) TargetsActive() map[string][]*Target {
152
m.mtxScrape.Lock()
153
defer m.mtxScrape.Unlock()
154
155
var (
156
wg sync.WaitGroup
157
mtx sync.Mutex
158
)
159
160
targets := make(map[string][]*Target, len(m.targetsGroups))
161
wg.Add(len(m.targetsGroups))
162
for tset, sp := range m.targetsGroups {
163
// Running in parallel limits the blocking time of scrapePool to scrape
164
// interval when there's an update from SD.
165
go func(tset string, sp *scrapePool) {
166
mtx.Lock()
167
targets[tset] = sp.ActiveTargets()
168
mtx.Unlock()
169
wg.Done()
170
}(tset, sp)
171
}
172
wg.Wait()
173
return targets
174
}
175
176
// TargetsDropped returns the dropped targets during relabelling.
177
func (m *Manager) TargetsDropped() map[string][]*Target {
178
m.mtxScrape.Lock()
179
defer m.mtxScrape.Unlock()
180
181
targets := make(map[string][]*Target, len(m.targetsGroups))
182
for tset, sp := range m.targetsGroups {
183
targets[tset] = sp.DroppedTargets()
184
}
185
return targets
186
}
187
188
// Stop shuts down every scrape pool in parallel, then closes graceShut
// to terminate the Run and reloader loops.
func (m *Manager) Stop() {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	var wg sync.WaitGroup
	for _, sp := range m.targetsGroups {
		wg.Add(1)
		go func(p *scrapePool) {
			p.stop()
			wg.Done()
		}(sp)
	}
	wg.Wait()
	close(m.graceShut)
}
203
204