Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/phlare/scrape/scrape_test.go
4096 views
1
package scrape
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"net/http/httptest"
8
"strings"
9
"testing"
10
"time"
11
12
"github.com/grafana/agent/component"
13
"github.com/grafana/agent/component/discovery"
14
"github.com/grafana/agent/component/phlare"
15
"github.com/grafana/agent/component/prometheus/scrape"
16
"github.com/grafana/agent/pkg/cluster"
17
"github.com/grafana/agent/pkg/river"
18
"github.com/grafana/agent/pkg/util"
19
"github.com/prometheus/client_golang/prometheus"
20
"github.com/prometheus/common/model"
21
"github.com/stretchr/testify/require"
22
"go.uber.org/atomic"
23
"go.uber.org/goleak"
24
)
25
26
func TestComponent(t *testing.T) {
27
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
28
reloadInterval = 100 * time.Millisecond
29
arg := NewDefaultArguments()
30
arg.JobName = "test"
31
c, err := New(component.Options{
32
Logger: util.TestFlowLogger(t),
33
Registerer: prometheus.NewRegistry(),
34
OnStateChange: func(e component.Exports) {},
35
Clusterer: &cluster.Clusterer{Node: cluster.NewLocalNode("")},
36
}, arg)
37
require.NoError(t, err)
38
ctx, cancel := context.WithCancel(context.Background())
39
defer cancel()
40
go func() {
41
err := c.Run(ctx)
42
require.NoError(t, err)
43
}()
44
45
// trigger an update
46
require.Empty(t, c.appendable.Children())
47
require.Empty(t, c.DebugInfo().(scrape.ScraperStatus).TargetStatus)
48
49
arg.ForwardTo = []phlare.Appendable{phlare.NoopAppendable}
50
arg.Targets = []discovery.Target{
51
{
52
model.AddressLabel: "foo",
53
},
54
{
55
model.AddressLabel: "bar",
56
},
57
}
58
c.Update(arg)
59
60
require.Eventually(t, func() bool {
61
fmt.Println(c.DebugInfo().(scrape.ScraperStatus).TargetStatus)
62
return len(c.appendable.Children()) == 1 && len(c.DebugInfo().(scrape.ScraperStatus).TargetStatus) == 10
63
}, 5*time.Second, 100*time.Millisecond)
64
}
65
66
func TestUnmarshalConfig(t *testing.T) {
67
for name, tt := range map[string]struct {
68
in string
69
expected func() Arguments
70
expectedErr string
71
}{
72
"default": {
73
in: `
74
targets = [
75
{"__address__" = "localhost:9090", "foo" = "bar"},
76
]
77
forward_to = null
78
`,
79
expected: func() Arguments {
80
r := NewDefaultArguments()
81
r.Targets = []discovery.Target{
82
{
83
"__address__": "localhost:9090",
84
"foo": "bar",
85
},
86
}
87
return r
88
},
89
},
90
"custom": {
91
in: `
92
targets = [
93
{"__address__" = "localhost:9090", "foo" = "bar"},
94
{"__address__" = "localhost:8080", "foo" = "buzz"},
95
]
96
forward_to = null
97
profiling_config {
98
path_prefix = "v1/"
99
100
profile.block {
101
enabled = false
102
}
103
104
profile.custom "something" {
105
enabled = true
106
path = "/debug/fgprof"
107
delta = true
108
}
109
}
110
`,
111
expected: func() Arguments {
112
r := NewDefaultArguments()
113
r.Targets = []discovery.Target{
114
{
115
"__address__": "localhost:9090",
116
"foo": "bar",
117
},
118
{
119
"__address__": "localhost:8080",
120
"foo": "buzz",
121
},
122
}
123
r.ProfilingConfig.Block.Enabled = false
124
r.ProfilingConfig.Custom = append(r.ProfilingConfig.Custom, CustomProfilingTarget{
125
Enabled: true,
126
Path: "/debug/fgprof",
127
Delta: true,
128
Name: "something",
129
})
130
r.ProfilingConfig.PprofPrefix = "v1/"
131
return r
132
},
133
},
134
"invalid cpu timeout": {
135
in: `
136
targets = []
137
forward_to = null
138
scrape_timeout = "1s"
139
scrape_interval = "0.5s"
140
`,
141
expectedErr: "process_cpu scrape_timeout must be at least 2 seconds",
142
},
143
"invalid timeout/interval": {
144
in: `
145
targets = []
146
forward_to = null
147
scrape_timeout = "4s"
148
scrape_interval = "5s"
149
`,
150
expectedErr: "scrape_timeout must be greater than scrape_interval",
151
},
152
"invalid HTTPClientConfig": {
153
in: `
154
targets = []
155
forward_to = null
156
scrape_timeout = "5s"
157
scrape_interval = "1s"
158
bearer_token = "token"
159
bearer_token_file = "/path/to/file.token"
160
`,
161
expectedErr: "at most one of bearer_token & bearer_token_file must be configured",
162
},
163
} {
164
tt := tt
165
name := name
166
t.Run(name, func(t *testing.T) {
167
arg := Arguments{}
168
if tt.expectedErr != "" {
169
err := river.Unmarshal([]byte(tt.in), &arg)
170
require.Error(t, err)
171
require.Equal(t, tt.expectedErr, err.Error())
172
return
173
}
174
require.NoError(t, river.Unmarshal([]byte(tt.in), &arg))
175
require.Equal(t, tt.expected(), arg)
176
})
177
}
178
}
179
180
func TestUpdateWhileScraping(t *testing.T) {
181
args := NewDefaultArguments()
182
// speed up reload interval for this tests
183
old := reloadInterval
184
reloadInterval = 1 * time.Microsecond
185
defer func() {
186
reloadInterval = old
187
}()
188
args.ScrapeInterval = 1 * time.Second
189
190
c, err := New(component.Options{
191
Logger: util.TestFlowLogger(t),
192
Registerer: prometheus.NewRegistry(),
193
OnStateChange: func(e component.Exports) {},
194
Clusterer: &cluster.Clusterer{Node: cluster.NewLocalNode("")},
195
}, args)
196
require.NoError(t, err)
197
scraping := atomic.NewBool(false)
198
ctx, cancel := context.WithCancel(context.Background())
199
200
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
201
scraping.Store(true)
202
select {
203
case <-ctx.Done():
204
return
205
case <-time.After(15 * time.Second):
206
}
207
w.WriteHeader(http.StatusOK)
208
}))
209
defer server.Close()
210
211
address := strings.TrimPrefix(server.URL, "http://")
212
213
defer cancel()
214
215
go c.Run(ctx)
216
217
args.Targets = []discovery.Target{
218
{
219
model.AddressLabel: address,
220
"foo": "bar",
221
},
222
{
223
model.AddressLabel: address,
224
"foo": "buz",
225
},
226
}
227
228
c.Update(args)
229
c.scraper.reload()
230
// Wait for the targets to be scraping.
231
require.Eventually(t, func() bool {
232
return scraping.Load()
233
}, 10*time.Second, 1*time.Second)
234
235
// Send updates to the targets.
236
done := make(chan struct{})
237
go func() {
238
for i := 0; i < 100; i++ {
239
args.Targets = []discovery.Target{
240
{
241
model.AddressLabel: address,
242
"foo": fmt.Sprintf("%d", i),
243
},
244
}
245
require.NoError(t, c.Update(args))
246
c.scraper.reload()
247
}
248
close(done)
249
}()
250
select {
251
case <-done:
252
case <-time.After(10 * time.Second):
253
t.Fatal("timed out waiting for updates to finish")
254
}
255
}
256
257