GitHub Repository: aos/grafana-agent
Path: component/phlare/scrape/scrape_loop.go (branch: main)
package scrape

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"reflect"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	commonconfig "github.com/prometheus/common/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/util/pool"
	"golang.org/x/net/context/ctxhttp"

	"github.com/grafana/agent/component/phlare"
	"github.com/grafana/agent/pkg/build"
)
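
// payloadBuffers hands out reusable byte slices in size buckets from 1e3 to
// 1e6 bytes (growth factor 3), so each scrape can request a buffer sized to
// the previous payload instead of allocating a fresh one.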
var (
	payloadBuffers  = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
	userAgentHeader = fmt.Sprintf("GrafanaAgent/%s", build.Version)
)
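
// scrapePool holds one scrape loop per active target of a single scrape job
// and remembers the targets dropped during relabeling.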
type scrapePool struct {
	config Arguments

	logger       log.Logger
	scrapeClient *http.Client
	appendable   phlare.Appendable

	mtx            sync.RWMutex
	activeTargets  map[uint64]*scrapeLoop
	droppedTargets []*Target
}
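
// newScrapePool builds the HTTP client for the job and returns an empty pool;
// targets are attached later through sync.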
func newScrapePool(cfg Arguments, appendable phlare.Appendable, logger log.Logger) (*scrapePool, error) {
	scrapeClient, err := commonconfig.NewClientFromConfig(*cfg.HTTPClientConfig.Convert(), cfg.JobName)
	if err != nil {
		return nil, err
	}

	return &scrapePool{
		config:        cfg,
		logger:        logger,
		scrapeClient:  scrapeClient,
		appendable:    appendable,
		activeTargets: map[uint64]*scrapeLoop{},
	}, nil
}
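
// sync reconciles the pool with the latest discovered target groups: new
// active targets get a scrape loop started, existing ones get their
// discovered labels refreshed, and loops for vanished targets are stopped
// and removed.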
func (tg *scrapePool) sync(groups []*targetgroup.Group) {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	level.Info(tg.logger).Log("msg", "syncing target groups", "job", tg.config.JobName)
	var actives []*Target
	tg.droppedTargets = []*Target{}
	for _, group := range groups {
		targets, dropped, err := targetsFromGroup(group, tg.config)
		if err != nil {
			level.Error(tg.logger).Log("msg", "creating targets failed", "err", err)
			continue
		}
		for _, t := range targets {
			if t.Labels().Len() > 0 {
				actives = append(actives, t)
			}
		}
		tg.droppedTargets = append(tg.droppedTargets, dropped...)
	}

	for _, t := range actives {
		if _, ok := tg.activeTargets[t.hash()]; !ok {
			loop := newScrapeLoop(t, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger)
			tg.activeTargets[t.hash()] = loop
			loop.start()
		} else {
			tg.activeTargets[t.hash()].SetDiscoveredLabels(t.DiscoveredLabels())
		}
	}

	// Removes inactive targets.
Outer:
	for h, t := range tg.activeTargets {
		for _, at := range actives {
			if h == at.hash() {
				continue Outer
			}
		}
		t.stop(false)
		delete(tg.activeTargets, h)
	}
}
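
// reload applies a new configuration. If interval, timeout, and HTTP client
// settings are unchanged it only swaps the stored config; otherwise it
// rebuilds the HTTP client and restarts every active scrape loop.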
func (tg *scrapePool) reload(cfg Arguments) error {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	if tg.config.ScrapeInterval == cfg.ScrapeInterval &&
		tg.config.ScrapeTimeout == cfg.ScrapeTimeout &&
		reflect.DeepEqual(tg.config.HTTPClientConfig, cfg.HTTPClientConfig) {

		tg.config = cfg
		return nil
	}
	tg.config = cfg

	scrapeClient, err := commonconfig.NewClientFromConfig(*cfg.HTTPClientConfig.Convert(), cfg.JobName)
	if err != nil {
		return err
	}
	tg.scrapeClient = scrapeClient
	for hash, t := range tg.activeTargets {
		// restart the loop with the new configuration
		t.stop(false)
		loop := newScrapeLoop(t.Target, tg.scrapeClient, tg.appendable, tg.config.ScrapeInterval, tg.config.ScrapeTimeout, tg.logger)
		tg.activeTargets[hash] = loop
		loop.start()
	}
	return nil
}
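
// stop shuts down all active scrape loops in parallel and waits for each of
// them to finish.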
func (tg *scrapePool) stop() {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	wg := sync.WaitGroup{}
	for _, t := range tg.activeTargets {
		wg.Add(1)
		go func(t *scrapeLoop) {
			defer wg.Done()
			t.stop(true)
		}(t)
	}
	wg.Wait()
}
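
// ActiveTargets returns a snapshot of the targets currently being scraped.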
func (tg *scrapePool) ActiveTargets() []*Target {
	tg.mtx.RLock()
	defer tg.mtx.RUnlock()
	result := make([]*Target, 0, len(tg.activeTargets))
	for _, target := range tg.activeTargets {
		result = append(result, target.Target)
	}
	return result
}
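
// DroppedTargets returns a snapshot of the targets dropped by the last sync.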
func (tg *scrapePool) DroppedTargets() []*Target {
	tg.mtx.RLock()
	defer tg.mtx.RUnlock()
	result := make([]*Target, 0, len(tg.droppedTargets))
	result = append(result, tg.droppedTargets...)
	return result
}
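
// scrapeLoop periodically scrapes a single profiling target and pushes the
// raw profile bytes to the appendable. It embeds *Target, so scrape results
// update the target's health and timing state in place.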
type scrapeLoop struct {
	*Target

	lastScrapeSize int

	scrapeClient *http.Client
	appendable   phlare.Appendable

	req               *http.Request
	logger            log.Logger
	interval, timeout time.Duration
	graceShut         chan struct{}
	once              sync.Once
	wg                sync.WaitGroup
}
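
// newScrapeLoop returns a loop for t; nothing runs until start is called.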
func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable phlare.Appendable, interval, timeout time.Duration, logger log.Logger) *scrapeLoop {
	return &scrapeLoop{
		Target:       t,
		logger:       logger,
		scrapeClient: scrapeClient,
		appendable:   appendable,
		interval:     interval,
		timeout:      timeout,
	}
}
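
// start launches the scrape goroutine. The first scrape waits for the
// target's offset, which spreads the scrapes of many targets across the
// interval; after that a ticker fires every interval.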
func (t *scrapeLoop) start() {
	t.graceShut = make(chan struct{})
	t.once = sync.Once{}
	t.wg.Add(1)

	go func() {
		defer t.wg.Done()

		select {
		case <-time.After(t.offset(t.interval)):
		case <-t.graceShut:
			return
		}
		ticker := time.NewTicker(t.interval)
		defer ticker.Stop()

		for {
			select {
			case <-t.graceShut:
				return
			case <-ticker.C:
			}
			t.scrape()
		}
	}()
}
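
// scrape fetches one profile into a pooled buffer sized after the previous
// scrape, appends it as a raw sample, and records the outcome on the target.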
func (t *scrapeLoop) scrape() {
	var (
		start             = time.Now()
		b                 = payloadBuffers.Get(t.lastScrapeSize).([]byte)
		buf               = bytes.NewBuffer(b)
		profileType       string
		scrapeCtx, cancel = context.WithTimeout(context.Background(), t.timeout)
	)
	defer cancel()

	for _, l := range t.labels {
		if l.Name == ProfileName {
			profileType = l.Value
			break
		}
	}
	if err := t.fetchProfile(scrapeCtx, profileType, buf); err != nil {
		level.Debug(t.logger).Log("msg", "fetch profile failed", "target", t.Labels().String(), "err", err)
		t.updateTargetStatus(start, err)
		return
	}

	b = buf.Bytes()
	if len(b) > 0 {
		t.lastScrapeSize = len(b)
	}
	if err := t.appendable.Appender().Append(context.Background(), t.labels, []*phlare.RawSample{{RawProfile: b}}); err != nil {
		level.Error(t.logger).Log("msg", "push failed", "labels", t.Labels().String(), "err", err)
		t.updateTargetStatus(start, err)
		return
	}
	t.updateTargetStatus(start, nil)
}
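
// updateTargetStatus records the result of a scrape on the embedded target:
// health, last error, last scrape time, and scrape duration.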
func (t *scrapeLoop) updateTargetStatus(start time.Time, err error) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	if err != nil {
		t.health = HealthBad
		t.lastError = err
	} else {
		t.health = HealthGood
		t.lastError = nil
	}
	t.lastScrape = start
	t.lastScrapeDuration = time.Since(start)
}
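
// fetchProfile GETs the target's profile URL and tees the response body into
// buf. The request is built once and reused across scrapes; non-2xx responses
// and empty bodies are treated as errors.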
func (t *scrapeLoop) fetchProfile(ctx context.Context, profileType string, buf io.Writer) error {
	if t.req == nil {
		req, err := http.NewRequest("GET", t.URL().String(), nil)
		if err != nil {
			return err
		}
		req.Header.Set("User-Agent", userAgentHeader)

		t.req = req
	}

	level.Debug(t.logger).Log("msg", "scraping profile", "labels", t.Labels().String(), "url", t.req.URL.String())
	resp, err := ctxhttp.Do(ctx, t.scrapeClient, t.req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	b, err := io.ReadAll(io.TeeReader(resp.Body, buf))
	if err != nil {
		return fmt.Errorf("failed to read body: %w", err)
	}

	if resp.StatusCode/100 != 2 {
		if len(b) > 0 {
			return fmt.Errorf("server returned HTTP status (%d) %v", resp.StatusCode, string(bytes.TrimSpace(b)))
		}
		return fmt.Errorf("server returned HTTP status (%d) %v", resp.StatusCode, resp.Status)
	}

	if len(b) == 0 {
		return fmt.Errorf("empty %s profile from %s", profileType, t.req.URL.String())
	}
	return nil
}
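
// stop signals shutdown; the sync.Once guard makes it safe to call more than
// once. With wait set, it blocks until the scrape goroutine has exited.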
func (t *scrapeLoop) stop(wait bool) {
	t.once.Do(func() {
		close(t.graceShut)
	})
	if wait {
		t.wg.Wait()
	}
}