GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/cluster/node.go
package cluster

import (
	"context"
	"fmt"
	"hash/fnv"
	"net/http"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gorilla/mux"
	pb "github.com/grafana/agent/pkg/agentproto"
	"github.com/grafana/agent/pkg/metrics/cluster/client"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/weaveworks/common/user"
)

const (
	// agentKey is the key used for storing the hash ring.
	agentKey = "agent"
)

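// backoffConfig bounds the retries used for ring lookups and reshard
// notifications: up to 10 attempts, waiting between one second and two
// minutes between them.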
var backoffConfig = backoff.Config{
	MinBackoff: time.Second,
	MaxBackoff: 2 * time.Minute,
	MaxRetries: 10,
}

// node manages membership within a ring. When a node joins or leaves the
// ring, it informs the other nodes to reshard their workloads. After a node
// joins the ring, it also informs the local service to reshard.
type node struct {
	log log.Logger
	reg *util.Unregisterer
	srv pb.ScrapingServiceServer

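	// mut guards the fields below; ApplyConfig and Stop take the write lock,
	// while the read paths take the read lock before touching the config,
	// ring, or lifecycler.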
	mut  sync.RWMutex
	cfg  Config
	ring *ring.Ring
	lc   *ring.Lifecycler

	exited bool
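	// reload signals the run goroutine to perform a cluster-wide reshard
	// after a config change.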
	reload chan struct{}
}

// newNode creates a new node and registers it to the ring.
func newNode(reg prometheus.Registerer, log log.Logger, cfg Config, s pb.ScrapingServiceServer) (*node, error) {
	n := &node{
		reg: util.WrapWithUnregisterer(reg),
		srv: s,
		log: log,

		reload: make(chan struct{}, 1),
	}
	if err := n.ApplyConfig(cfg); err != nil {
		return nil, err
	}
	go n.run()
	return n, nil
}

func (n *node) ApplyConfig(cfg Config) error {
	n.mut.Lock()
	defer n.mut.Unlock()

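	// The context below bounds how long we wait for the old ring and
	// lifecycler to shut down before giving up.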
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// Detect if the config changed.
	if util.CompareYAML(n.cfg, cfg) {
		return nil
	}

	if n.exited {
		return fmt.Errorf("node already exited")
	}

	level.Info(n.log).Log("msg", "applying config")

	// Shut down old components before re-creating the updated ones.
	n.reg.UnregisterAll()

	if n.lc != nil {
		// Note that this will call performClusterReshard and will block until it
		// completes.
		err := services.StopAndAwaitTerminated(ctx, n.lc)
		if err != nil {
			return fmt.Errorf("failed to stop lifecycler: %w", err)
		}
		n.lc = nil
	}

	if n.ring != nil {
		err := services.StopAndAwaitTerminated(ctx, n.ring)
		if err != nil {
			return fmt.Errorf("failed to stop ring: %w", err)
		}
		n.ring = nil
	}

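	// Clustering is disabled: store the new config and leave the ring and
	// lifecycler unset.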
	if !cfg.Enabled {
		n.cfg = cfg
		return nil
	}

	r, err := newRing(cfg.Lifecycler.RingConfig, "agent_viewer", agentKey, n.reg, n.log)
	if err != nil {
		return fmt.Errorf("failed to create ring: %w", err)
	}

	if err := services.StartAndAwaitRunning(context.Background(), r); err != nil {
		return fmt.Errorf("failed to start ring: %w", err)
	}
	n.ring = r

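	// The node registers itself as the lifecycler's FlushTransferer, so
	// stopping the lifecycler calls TransferOut (and with it a cluster
	// reshard).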
	lc, err := ring.NewLifecycler(cfg.Lifecycler.LifecyclerConfig, n, "agent", agentKey, false, n.log, prometheus.WrapRegistererWithPrefix("agent_dskit_", n.reg))
	if err != nil {
		return fmt.Errorf("failed to create lifecycler: %w", err)
	}
	if err := services.StartAndAwaitRunning(context.Background(), lc); err != nil {
		if err := services.StopAndAwaitTerminated(ctx, r); err != nil {
			level.Error(n.log).Log("msg", "failed to stop ring when returning error. next config reload will fail", "err", err)
		}
		return fmt.Errorf("failed to start lifecycler: %w", err)
	}
	n.lc = lc

	n.cfg = cfg

	// Reload and reshard the cluster.
	n.reload <- struct{}{}
	return nil
}

// newRing creates a new Cortex Ring that ignores unhealthy nodes.
func newRing(cfg ring.Config, name, key string, reg prometheus.Registerer, log log.Logger) (*ring.Ring, error) {
	codec := ring.GetCodec()
	store, err := kv.NewClient(
		cfg.KVStore,
		codec,
		kv.RegistererWithKVName(reg, name+"-ring"),
		log,
	)
	if err != nil {
		return nil, err
	}
	return ring.NewWithStoreClientAndStrategy(cfg, name, key, store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("agent_dskit_", reg), log)
}

// run listens for reload requests and triggers a cluster-wide reshard for
// each one, kickstarting the join process after a config change.
func (n *node) run() {
	for range n.reload {
		n.mut.RLock()

		if err := n.performClusterReshard(context.Background(), true); err != nil {
			level.Warn(n.log).Log("msg", "dynamic cluster reshard did not succeed", "err", err)
		}

		n.mut.RUnlock()
	}

	level.Info(n.log).Log("msg", "node run loop exiting")
}

// performClusterReshard informs the other nodes in the cluster to immediately
// reshard their workloads. If joining is true, the server provided to newNode
// is informed as well.
func (n *node) performClusterReshard(ctx context.Context, joining bool) error {
	if n.ring == nil || n.lc == nil {
		level.Info(n.log).Log("msg", "node disabled, not resharding")
		return nil
	}

	if n.cfg.ClusterReshardEventTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, n.cfg.ClusterReshardEventTimeout)
		defer cancel()
	}

	var (
		rs  ring.ReplicationSet
		err error
	)

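	// Keep retrying the ring lookup until it succeeds, the context is
	// canceled, or the retries allowed by backoffConfig run out.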
	backoff := backoff.New(ctx, backoffConfig)
	for backoff.Ongoing() {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		rs, err = n.ring.GetAllHealthy(ring.Read)
		if err == nil {
			break
		}
		backoff.Wait()
	}

	if len(rs.Instances) > 0 {
		level.Info(n.log).Log("msg", "informing remote nodes to reshard")
	}

	// These are not run in a goroutine due to a potential race condition with n.lc.Addr.
	_, err = rs.Do(ctx, 500*time.Millisecond, func(c context.Context, id *ring.InstanceDesc) (interface{}, error) {
		// Skip over ourselves.
		if id.Addr == n.lc.Addr {
			return nil, nil
		}

		notifyCtx := user.InjectOrgID(c, "fake")
		return nil, n.notifyReshard(notifyCtx, id)
	})

	if err != nil {
		level.Error(n.log).Log("msg", "notifying other nodes failed", "err", err)
	}

	if joining {
		level.Info(n.log).Log("msg", "running local reshard")
		if _, err := n.srv.Reshard(ctx, &pb.ReshardRequest{}); err != nil {
			level.Warn(n.log).Log("msg", "dynamic local reshard did not succeed", "err", err)
		}
	}
	return err
}

// notifyReshard informs an individual node to reshard.
func (n *node) notifyReshard(ctx context.Context, id *ring.InstanceDesc) error {
	cli, err := client.New(n.cfg.Client, id.Addr)
	if err != nil {
		return err
	}
	defer cli.Close()

	level.Info(n.log).Log("msg", "attempting to notify remote agent to reshard", "addr", id.Addr)

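	// Retry the notification until it succeeds, the context is canceled, or
	// the retries allowed by backoffConfig run out; backoff.Err reports the
	// final outcome.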
	backoff := backoff.New(ctx, backoffConfig)
	for backoff.Ongoing() {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		_, err := cli.Reshard(ctx, &pb.ReshardRequest{})
		if err == nil {
			break
		}

		level.Warn(n.log).Log("msg", "reshard notification attempt failed", "addr", id.Addr, "err", err, "attempt", backoff.NumRetries())
		backoff.Wait()
	}

	return backoff.Err()
}

// WaitJoined waits for the node to join the cluster and enter the
// ACTIVE state.
func (n *node) WaitJoined(ctx context.Context) error {
	n.mut.RLock()
	defer n.mut.RUnlock()

	level.Info(n.log).Log("msg", "waiting for the node to join the cluster")
	defer level.Info(n.log).Log("msg", "node has joined the cluster")

	if n.ring == nil || n.lc == nil {
		return fmt.Errorf("node disabled")
	}

	return waitJoined(ctx, agentKey, n.ring.KVClient, n.lc.ID)
}

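// waitJoined watches the ring stored at key and returns once the instance
// with the given id reports ACTIVE, or once ctx is canceled.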
func waitJoined(ctx context.Context, key string, kvClient kv.Client, id string) error {
	kvClient.WatchKey(ctx, key, func(value interface{}) bool {
		if value == nil {
			return true
		}

		desc := value.(*ring.Desc)
		for ingID, ing := range desc.Ingesters {
			if ingID == id && ing.State == ring.ACTIVE {
				return false
			}
		}

		return true
	})

	return ctx.Err()
}

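// WireAPI registers the /debug/ring endpoint on the router. It serves the
// ring's status page when clustering is enabled and responds with 404
// otherwise.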
func (n *node) WireAPI(r *mux.Router) {
	r.HandleFunc("/debug/ring", func(rw http.ResponseWriter, r *http.Request) {
		n.mut.RLock()
		defer n.mut.RUnlock()

		if n.ring == nil {
			http.NotFoundHandler().ServeHTTP(rw, r)
			return
		}

		n.ring.ServeHTTP(rw, r)
	})
}

// Stop shuts the node down. The node cannot be used again once Stop is
// called.
func (n *node) Stop() error {
	n.mut.Lock()
	defer n.mut.Unlock()

	if n.exited {
		return fmt.Errorf("node already exited")
	}
	n.exited = true

	level.Info(n.log).Log("msg", "shutting down node")

	// Shut down dependencies. The lifecycler *MUST* be shut down first since n.ring is
	// used during the shutdown process to inform other nodes to reshard.
	//
	// Note that stopping the lifecycler will call performClusterReshard and will block
	// until it completes.
	var (
		firstError error
		deps       []services.Service
	)

	if n.lc != nil {
		deps = append(deps, n.lc)
	}
	if n.ring != nil {
		deps = append(deps, n.ring)
	}
	for _, dep := range deps {
		err := services.StopAndAwaitTerminated(context.Background(), dep)
		if err != nil && firstError == nil {
			firstError = err
		}
	}

	close(n.reload)
	level.Info(n.log).Log("msg", "node shut down")
	return firstError
}

// Flush implements ring.FlushTransferer. It's a no-op.
func (n *node) Flush() {}

// TransferOut implements ring.FlushTransferer. It connects to all other healthy agents and
// tells them to reshard. TransferOut should NOT be called manually unless the mutex is
// held.
func (n *node) TransferOut(ctx context.Context) error {
	return n.performClusterReshard(ctx, false)
}

// Owns checks whether a key is owned by this node. Owns returns an error if
// the ring is empty or if there aren't enough healthy nodes.
func (n *node) Owns(key string) (bool, error) {
	n.mut.RLock()
	defer n.mut.RUnlock()

	rs, err := n.ring.Get(keyHash(key), ring.Write, nil, nil, nil)
	if err != nil {
		return false, err
	}
	for _, r := range rs.Instances {
		if r.Addr == n.lc.Addr {
			return true, nil
		}
	}
	return false, nil
}

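// keyHash hashes key with 32-bit FNV-1 to produce a ring token.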
func keyHash(key string) uint32 {
	h := fnv.New32()
	_, _ = h.Write([]byte(key))
	return h.Sum32()
}