Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/phlare/write/write_test.go
4096 views
1
package write
2
3
import (
4
"context"
5
"errors"
6
"net/http"
7
"net/http/httptest"
8
"sync"
9
"testing"
10
"time"
11
12
"github.com/bufbuild/connect-go"
13
"github.com/grafana/agent/component"
14
"github.com/grafana/agent/component/phlare"
15
"github.com/grafana/agent/pkg/river"
16
"github.com/grafana/agent/pkg/util"
17
pushv1 "github.com/grafana/phlare/api/gen/proto/go/push/v1"
18
pushv1connect "github.com/grafana/phlare/api/gen/proto/go/push/v1/pushv1connect"
19
typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
20
"github.com/prometheus/client_golang/prometheus"
21
"github.com/prometheus/prometheus/model/labels"
22
"github.com/stretchr/testify/require"
23
"go.uber.org/atomic"
24
)
25
26
type PushFunc func(context.Context, *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error)
27
28
func (p PushFunc) Push(ctx context.Context, r *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) {
29
return p(ctx, r)
30
}
31
32
func Test_Write_FanOut(t *testing.T) {
33
var (
34
export Exports
35
argument = DefaultArguments()
36
pushTotal = atomic.NewInt32(0)
37
serverCount = int32(10)
38
servers []*httptest.Server = make([]*httptest.Server, serverCount)
39
endpoints []*EndpointOptions = make([]*EndpointOptions, 0, serverCount)
40
)
41
argument.ExternalLabels = map[string]string{"foo": "buzz"}
42
handlerFn := func(err error) http.Handler {
43
_, handler := pushv1connect.NewPusherServiceHandler(PushFunc(
44
func(_ context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) {
45
pushTotal.Inc()
46
require.Equal(t, "test", req.Header()["X-Test-Header"][0])
47
require.Contains(t, req.Header()["User-Agent"][0], "GrafanaAgent/")
48
require.Equal(t, []*typesv1.LabelPair{
49
{Name: "__name__", Value: "test"},
50
{Name: "foo", Value: "buzz"},
51
{Name: "job", Value: "foo"},
52
}, req.Msg.Series[0].Labels)
53
require.Equal(t, []byte("pprofraw"), req.Msg.Series[0].Samples[0].RawProfile)
54
return &connect.Response[pushv1.PushResponse]{}, err
55
},
56
))
57
return handler
58
}
59
60
for i := int32(0); i < serverCount; i++ {
61
if i == 0 {
62
servers[i] = httptest.NewServer(handlerFn(errors.New("test")))
63
} else {
64
servers[i] = httptest.NewServer(handlerFn(nil))
65
}
66
endpoints = append(endpoints, &EndpointOptions{
67
URL: servers[i].URL,
68
MinBackoff: 100 * time.Millisecond,
69
MaxBackoff: 200 * time.Millisecond,
70
MaxBackoffRetries: 1,
71
RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout,
72
Headers: map[string]string{
73
"X-Test-Header": "test",
74
},
75
})
76
}
77
defer func() {
78
for _, s := range servers {
79
s.Close()
80
}
81
}()
82
createReceiver := func(t *testing.T, arg Arguments) phlare.Appendable {
83
t.Helper()
84
var wg sync.WaitGroup
85
wg.Add(1)
86
c, err := NewComponent(component.Options{
87
ID: "1",
88
Logger: util.TestFlowLogger(t),
89
Registerer: prometheus.NewRegistry(),
90
OnStateChange: func(e component.Exports) {
91
defer wg.Done()
92
export = e.(Exports)
93
},
94
}, arg)
95
require.NoError(t, err)
96
ctx, cancel := context.WithCancel(context.Background())
97
defer cancel()
98
go c.Run(ctx)
99
wg.Wait() // wait for the state change to happen
100
require.NotNil(t, export.Receiver)
101
return export.Receiver
102
}
103
104
t.Run("with_failure", func(t *testing.T) {
105
argument.Endpoints = endpoints
106
r := createReceiver(t, argument)
107
pushTotal.Store(0)
108
err := r.Appender().Append(context.Background(), labels.FromMap(map[string]string{
109
"__name__": "test",
110
"__type__": "type",
111
"job": "foo",
112
"foo": "bar",
113
}), []*phlare.RawSample{
114
{RawProfile: []byte("pprofraw")},
115
})
116
require.EqualErrorf(t, err, "unknown: test", "expected error to be test")
117
require.Equal(t, serverCount, pushTotal.Load())
118
})
119
120
t.Run("all_success", func(t *testing.T) {
121
argument.Endpoints = endpoints[1:]
122
r := createReceiver(t, argument)
123
pushTotal.Store(0)
124
err := r.Appender().Append(context.Background(), labels.FromMap(map[string]string{
125
"__name__": "test",
126
"__type__": "type",
127
"job": "foo",
128
"foo": "bar",
129
}), []*phlare.RawSample{
130
{RawProfile: []byte("pprofraw")},
131
})
132
require.NoError(t, err)
133
require.Equal(t, serverCount-1, pushTotal.Load())
134
})
135
136
t.Run("with_backoff", func(t *testing.T) {
137
argument.Endpoints = endpoints[:1]
138
argument.Endpoints[0].MaxBackoffRetries = 3
139
r := createReceiver(t, argument)
140
pushTotal.Store(0)
141
err := r.Appender().Append(context.Background(), labels.FromMap(map[string]string{
142
"__name__": "test",
143
"__type__": "type",
144
"job": "foo",
145
"foo": "bar",
146
}), []*phlare.RawSample{
147
{RawProfile: []byte("pprofraw")},
148
})
149
require.Error(t, err)
150
require.Equal(t, int32(3), pushTotal.Load())
151
})
152
}
153
154
func Test_Write_Update(t *testing.T) {
155
var (
156
export Exports
157
argument = DefaultArguments()
158
pushTotal = atomic.NewInt32(0)
159
)
160
var wg sync.WaitGroup
161
wg.Add(1)
162
c, err := NewComponent(component.Options{
163
ID: "1",
164
Logger: util.TestFlowLogger(t),
165
Registerer: prometheus.NewRegistry(),
166
OnStateChange: func(e component.Exports) {
167
defer wg.Done()
168
export = e.(Exports)
169
},
170
}, argument)
171
require.NoError(t, err)
172
ctx, cancel := context.WithCancel(context.Background())
173
defer cancel()
174
go c.Run(ctx)
175
wg.Wait() // wait for the state change to happen
176
require.NotNil(t, export.Receiver)
177
// First one is a noop
178
err = export.Receiver.Appender().Append(context.Background(), labels.FromMap(map[string]string{
179
"__name__": "test",
180
}), []*phlare.RawSample{
181
{RawProfile: []byte("pprofraw")},
182
})
183
require.NoError(t, err)
184
185
_, handler := pushv1connect.NewPusherServiceHandler(PushFunc(
186
func(_ context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) {
187
pushTotal.Inc()
188
return &connect.Response[pushv1.PushResponse]{}, err
189
},
190
))
191
server := httptest.NewServer(handler)
192
defer server.Close()
193
argument.Endpoints = []*EndpointOptions{
194
{
195
URL: server.URL,
196
RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout,
197
},
198
}
199
wg.Add(1)
200
require.NoError(t, c.Update(argument))
201
wg.Wait()
202
err = export.Receiver.Appender().Append(context.Background(), labels.FromMap(map[string]string{
203
"__name__": "test",
204
}), []*phlare.RawSample{
205
{RawProfile: []byte("pprofraw")},
206
})
207
require.NoError(t, err)
208
require.Equal(t, int32(1), pushTotal.Load())
209
}
210
211
func Test_Unmarshal_Config(t *testing.T) {
212
var arg Arguments
213
river.Unmarshal([]byte(`
214
endpoint {
215
url = "http://localhost:4100"
216
remote_timeout = "10s"
217
}
218
endpoint {
219
url = "http://localhost:4200"
220
remote_timeout = "5s"
221
min_backoff_period = "1s"
222
max_backoff_period = "10s"
223
max_backoff_retries = 10
224
}
225
external_labels = {
226
"foo" = "bar",
227
}`), &arg)
228
require.Equal(t, "http://localhost:4100", arg.Endpoints[0].URL)
229
require.Equal(t, "http://localhost:4200", arg.Endpoints[1].URL)
230
require.Equal(t, time.Second*10, arg.Endpoints[0].RemoteTimeout)
231
require.Equal(t, time.Second*5, arg.Endpoints[1].RemoteTimeout)
232
require.Equal(t, "bar", arg.ExternalLabels["foo"])
233
require.Equal(t, time.Second, arg.Endpoints[1].MinBackoff)
234
require.Equal(t, time.Second*10, arg.Endpoints[1].MaxBackoff)
235
require.Equal(t, 10, arg.Endpoints[1].MaxBackoffRetries)
236
}
237
238
func TestBadRiverConfig(t *testing.T) {
239
exampleRiverConfig := `
240
endpoint {
241
url = "http://localhost:4100"
242
remote_timeout = "10s"
243
bearer_token = "token"
244
bearer_token_file = "/path/to/file.token"
245
}
246
external_labels = {
247
"foo" = "bar",
248
}
249
`
250
251
// Make sure the squashed HTTPClientConfig Validate function is being utilized correctly
252
var args Arguments
253
err := river.Unmarshal([]byte(exampleRiverConfig), &args)
254
require.ErrorContains(t, err, "at most one of bearer_token & bearer_token_file must be configured")
255
}
256
257