GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/cluster/node_test.go
package cluster

import (
	"context"
	"flag"
	"fmt"
	"math/rand"
	"net"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/grafana/agent/pkg/agentproto"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"
	"go.uber.org/atomic"
	"google.golang.org/grpc"
	"gopkg.in/yaml.v2"
)

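// Test_node_Join checks that a node joining the cluster triggers a reshard
// against both the joining local node and the existing remote node.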
func Test_node_Join(t *testing.T) {
	var (
		reg    = prometheus.NewRegistry()
		logger = util.TestLogger(t)

		localReshard  = make(chan struct{}, 2)
		remoteReshard = make(chan struct{}, 2)
	)

	local := &agentproto.FuncScrapingServiceServer{
		ReshardFunc: func(c context.Context, rr *agentproto.ReshardRequest) (*empty.Empty, error) {
			localReshard <- struct{}{}
			return &empty.Empty{}, nil
		},
	}

	remote := &agentproto.FuncScrapingServiceServer{
		ReshardFunc: func(c context.Context, rr *agentproto.ReshardRequest) (*empty.Empty, error) {
			remoteReshard <- struct{}{}
			return &empty.Empty{}, nil
		},
	}
	startNode(t, remote, logger)

	nodeConfig := DefaultConfig
	nodeConfig.Enabled = true
	nodeConfig.Lifecycler.LifecyclerConfig = testLifecyclerConfig(t)

	n, err := newNode(reg, logger, nodeConfig, local)
	require.NoError(t, err)
	t.Cleanup(func() { _ = n.Stop() })

	require.NoError(t, n.WaitJoined(context.Background()))

	waitAll(t, remoteReshard, localReshard)
}

// waitAll waits for a message on all channels.
func waitAll(t *testing.T, chs ...chan struct{}) {
	timeoutCh := time.After(5 * time.Second)
	for _, ch := range chs {
		select {
		case <-timeoutCh:
			require.FailNow(t, "timeout exceeded")
		case <-ch:
		}
	}
}

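// Test_node_Leave checks that a node leaving the cluster triggers a reshard
// against the remaining remote node as it shuts down.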
func Test_node_Leave(t *testing.T) {
	var (
		reg    = prometheus.NewRegistry()
		logger = util.TestLogger(t)

		sendReshard   = atomic.NewBool(false)
		remoteReshard = make(chan struct{}, 2)
	)

	local := &agentproto.FuncScrapingServiceServer{
		ReshardFunc: func(c context.Context, rr *agentproto.ReshardRequest) (*empty.Empty, error) {
			return &empty.Empty{}, nil
		},
	}

	remote := &agentproto.FuncScrapingServiceServer{
		ReshardFunc: func(c context.Context, rr *agentproto.ReshardRequest) (*empty.Empty, error) {
			if sendReshard.Load() {
				remoteReshard <- struct{}{}
			}
			return &empty.Empty{}, nil
		},
	}
	startNode(t, remote, logger)

	nodeConfig := DefaultConfig
	nodeConfig.Enabled = true
	nodeConfig.Lifecycler.LifecyclerConfig = testLifecyclerConfig(t)

	n, err := newNode(reg, logger, nodeConfig, local)
	require.NoError(t, err)
	require.NoError(t, n.WaitJoined(context.Background()))

	// Update the reshard function to write to remoteReshard on shutdown.
	sendReshard.Store(true)

	// Stop the node so it transfers data outward.
	require.NoError(t, n.Stop(), "failed to stop the node")

	level.Info(logger).Log("msg", "waiting for remote reshard to occur")
	waitAll(t, remoteReshard)
}

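// Test_node_ApplyConfig checks that applying a new config causes the node to
// re-join the cluster, observed as a second local reshard.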
func Test_node_ApplyConfig(t *testing.T) {
	var (
		reg    = prometheus.NewRegistry()
		logger = util.TestLogger(t)

		localReshard = make(chan struct{}, 10)
	)

	local := &agentproto.FuncScrapingServiceServer{
		ReshardFunc: func(c context.Context, rr *agentproto.ReshardRequest) (*empty.Empty, error) {
			localReshard <- struct{}{}
			return &empty.Empty{}, nil
		},
	}

	nodeConfig := DefaultConfig
	nodeConfig.Enabled = true
	nodeConfig.Lifecycler.LifecyclerConfig = testLifecyclerConfig(t)

	n, err := newNode(reg, logger, nodeConfig, local)
	require.NoError(t, err)
	t.Cleanup(func() { _ = n.Stop() })
	require.NoError(t, n.WaitJoined(context.Background()))

	// Wait for the initial join to trigger.
	waitAll(t, localReshard)

	// An ApplyConfig working correctly should re-join the cluster, which can be
	// detected by local resharding applying twice.
	nodeConfig.Lifecycler.NumTokens = 1
	require.NoError(t, n.ApplyConfig(nodeConfig), "failed to apply new config")
	require.NoError(t, n.WaitJoined(context.Background()))

	waitAll(t, localReshard)
}

// startNode launches srv as a gRPC server and registers it to the ring.
func startNode(t *testing.T, srv agentproto.ScrapingServiceServer, logger log.Logger) {
	t.Helper()

	l, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)

	grpcServer := grpc.NewServer()
	agentproto.RegisterScrapingServiceServer(grpcServer, srv)

	go func() {
		_ = grpcServer.Serve(l)
	}()
	t.Cleanup(func() { grpcServer.Stop() })

	lcConfig := testLifecyclerConfig(t)
	lcConfig.Addr = l.Addr().(*net.TCPAddr).IP.String()
	lcConfig.Port = l.Addr().(*net.TCPAddr).Port

	lc, err := ring.NewLifecycler(lcConfig, ring.NewNoopFlushTransferer(), "agent", "agent", false, logger, nil)
	require.NoError(t, err)

	err = services.StartAndAwaitRunning(context.Background(), lc)
	require.NoError(t, err)

	// Wait for the new node to be in the ring.
	joinWaitCtx, joinWaitCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer joinWaitCancel()
	err = waitJoined(joinWaitCtx, agentKey, lc.KVStore, lc.ID)
	require.NoError(t, err)

	t.Cleanup(func() {
		_ = services.StopAndAwaitTerminated(context.Background(), lc)
	})
}

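// testLifecyclerConfig generates a lifecycler config for tests: an in-memory
// KV store scoped to the test name, a random instance ID, and a placeholder
// address/port that tests override when they expect incoming traffic.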
func testLifecyclerConfig(t *testing.T) ring.LifecyclerConfig {
	t.Helper()

	cfgText := util.Untab(fmt.Sprintf(`
ring:
  kvstore:
    store: inmemory
    prefix: tests/%s
final_sleep: 0s
min_ready_duration: 0s
`, t.Name()))

	// Apply default values by registering to a fake flag set.
	var lc ring.LifecyclerConfig
	lc.RegisterFlagsWithPrefix("", flag.NewFlagSet("", flag.ContinueOnError), log.NewNopLogger())

	err := yaml.Unmarshal([]byte(cfgText), &lc)
	require.NoError(t, err)

	// Assign a random default ID.
	var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
	name := make([]rune, 10)
	for i := range name {
		name[i] = letters[rand.Intn(len(letters))]
	}
	lc.ID = string(name)

	// Add an invalid default address/port. Tests can override if they expect
	// incoming traffic.
	lc.Addr = "x.x.x.x"
	lc.Port = -1

	return lc
}