Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/pull_target_test.go
4096 views
1
package gcplogtarget
2
3
import (
4
"context"
5
"errors"
6
"io"
7
"testing"
8
"time"
9
10
"github.com/grafana/agent/component/common/loki/client/fake"
11
12
"cloud.google.com/go/pubsub"
13
"github.com/go-kit/log"
14
"github.com/grafana/dskit/backoff"
15
"github.com/prometheus/client_golang/prometheus"
16
"github.com/stretchr/testify/require"
17
"gotest.tools/assert"
18
)
19
20
func TestPullTarget_RunStop(t *testing.T) {
21
t.Run("it sends messages to the promclient and stopps when Stop() is called", func(t *testing.T) {
22
tc := testPullTarget(t)
23
24
runErr := make(chan error)
25
go func() {
26
runErr <- tc.target.run()
27
}()
28
29
tc.sub.messages <- &pubsub.Message{Data: []byte(gcpLogEntry)}
30
require.Eventually(t, func() bool {
31
return len(tc.promClient.Received()) > 0
32
}, time.Second, 50*time.Millisecond)
33
34
require.NoError(t, tc.target.Stop())
35
require.EqualError(t, <-runErr, "context canceled")
36
})
37
38
t.Run("it retries when there is an error", func(t *testing.T) {
39
tc := testPullTarget(t)
40
41
runErr := make(chan error)
42
go func() {
43
runErr <- tc.target.run()
44
}()
45
46
tc.sub.errors <- errors.New("something bad")
47
tc.sub.messages <- &pubsub.Message{Data: []byte(gcpLogEntry)}
48
require.Eventually(t, func() bool {
49
return len(tc.promClient.Received()) > 0
50
}, time.Second, 50*time.Millisecond)
51
52
require.NoError(t, tc.target.Stop())
53
54
require.Eventually(t, func() bool {
55
select {
56
case e := <-runErr:
57
return e.Error() == "context canceled"
58
default:
59
return false
60
}
61
}, time.Second, 50*time.Millisecond)
62
})
63
64
t.Run("a successful message resets retries", func(t *testing.T) {
65
tc := testPullTarget(t)
66
67
runErr := make(chan error)
68
go func() {
69
runErr <- tc.target.run()
70
}()
71
72
tc.sub.errors <- errors.New("something bad")
73
tc.sub.errors <- errors.New("something bad")
74
tc.sub.errors <- errors.New("something bad")
75
tc.sub.errors <- errors.New("something bad")
76
tc.sub.messages <- &pubsub.Message{Data: []byte(gcpLogEntry)}
77
tc.sub.errors <- errors.New("something bad")
78
tc.sub.errors <- errors.New("something bad")
79
tc.sub.messages <- &pubsub.Message{Data: []byte(gcpLogEntry)}
80
81
require.Eventually(t, func() bool {
82
return len(tc.promClient.Received()) > 1
83
}, time.Second, 50*time.Millisecond)
84
85
require.NoError(t, tc.target.Stop())
86
})
87
}
88
89
// func TestPullTarget_Ready(t *testing.T) {
90
// tc := testPullTarget(t)
91
// assert.Equal(t, true, tc.target.Ready())
92
// }
93
94
func TestPullTarget_Labels(t *testing.T) {
95
tc := testPullTarget(t)
96
97
assert.Equal(t, `{job="test-gcplogtarget"}`, tc.target.Labels().String())
98
}
99
100
type testContext struct {
101
target *PullTarget
102
promClient *fake.Client
103
sub *fakeSubscription
104
}
105
106
func testPullTarget(t *testing.T) *testContext {
107
t.Helper()
108
109
ctx, cancel := context.WithCancel(context.Background())
110
sub := newFakeSubscription()
111
promClient := fake.NewClient(func() {})
112
target := &PullTarget{
113
metrics: NewMetrics(prometheus.NewRegistry()),
114
logger: log.NewNopLogger(),
115
handler: promClient,
116
relabelConfig: nil,
117
ctx: ctx,
118
cancel: cancel,
119
config: testConfig,
120
jobName: t.Name() + "job-test-gcplogtarget",
121
ps: io.NopCloser(nil),
122
sub: sub,
123
msgs: make(chan *pubsub.Message),
124
backoff: backoff.New(ctx, testBackoff),
125
}
126
127
return &testContext{
128
target: target,
129
promClient: promClient,
130
sub: sub,
131
}
132
}
133
134
// project and subscription are the identifiers placed into testConfig.
const (
	project      = "test-project"
	subscription = "test-subscription"
)

// gcpLogEntry is a sample JSON log entry used as the payload of the fake
// Pub/Sub messages sent by the tests in this file.
const gcpLogEntry = `
{
"insertId": "ajv4d1f1ch8dr",
"logName": "projects/grafanalabs-dev/logs/cloudaudit.googleapis.com%2Fdata_access",
"protoPayload": {
"@type": "type.googleapis.com/google.cloud.audit.AuditLog",
"authenticationInfo": {
"principalEmail": "[email protected]",
"serviceAccountDelegationInfo": [
{
"firstPartyPrincipal": {
"principalEmail": "[email protected]"
}
}
]
},
"authorizationInfo": [
{
"granted": true,
"permission": "storage.objects.list",
"resource": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev",
"resourceAttributes": {
}
},
{
"permission": "storage.objects.get",
"resource": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev/objects/load-generator-20/01EM34PFBC2SCV3ETBGRAQZ090/deletion-mark.json",
"resourceAttributes": {
}
}
],
"methodName": "storage.objects.get",
"requestMetadata": {
"callerIp": "34.66.19.193",
"callerNetwork": "//compute.googleapis.com/projects/grafanalabs-dev/global/networks/__unknown__",
"callerSuppliedUserAgent": "thanos-store-gateway/1.5.0 (go1.14.9),gzip(gfe)",
"destinationAttributes": {
},
"requestAttributes": {
"auth": {
},
"time": "2021-01-01T02:17:10.661405637Z"
}
},
"resourceLocation": {
"currentLocations": [
"us-central1"
]
},
"resourceName": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev/objects/load-generator-20/01EM34PFBC2SCV3ETBGRAQZ090/deletion-mark.json",
"serviceName": "storage.googleapis.com",
"status": {
}
},
"receiveTimestamp": "2021-01-01T02:17:10.82013623Z",
"resource": {
"labels": {
"bucket_name": "dev-us-central1-cortex-tsdb-dev",
"location": "us-central1",
"project_id": "grafanalabs-dev"
},
"type": "gcs_bucket"
},
"severity": "INFO",
"timestamp": "2021-01-01T02:17:10.655982344Z"
}
`
205
206
var testConfig = &PullConfig{
207
ProjectID: project,
208
Subscription: subscription,
209
Labels: map[string]string{
210
"job": "test-gcplogtarget",
211
},
212
}
213
214
func newFakeSubscription() *fakeSubscription {
215
return &fakeSubscription{
216
messages: make(chan *pubsub.Message),
217
errors: make(chan error),
218
}
219
}
220
221
type fakeSubscription struct {
222
messages chan *pubsub.Message
223
errors chan error
224
}
225
226
func (s *fakeSubscription) Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error {
227
for {
228
select {
229
case m := <-s.messages:
230
f(ctx, m)
231
case e := <-s.errors:
232
return e
233
}
234
}
235
}
236
237
var testBackoff = backoff.Config{
238
MinBackoff: 1 * time.Millisecond,
239
MaxBackoff: 10 * time.Millisecond,
240
}
241
242