Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/cluster/gossip.go
4094 views
1
package cluster
2
3
import (
4
"context"
5
"fmt"
6
"io"
7
stdlog "log"
8
"net"
9
"net/http"
10
"os"
11
12
"github.com/go-kit/log"
13
"github.com/grafana/dskit/flagext"
14
"github.com/hashicorp/go-discover"
15
"github.com/hashicorp/go-discover/provider/k8s"
16
"github.com/prometheus/client_golang/prometheus"
17
"github.com/rfratto/ckit"
18
"github.com/rfratto/ckit/advertise"
19
"github.com/rfratto/ckit/peer"
20
"github.com/rfratto/ckit/shard"
21
"go.uber.org/atomic"
22
)
23
24
// extraDiscoverProviders used in tests.
25
var extraDiscoverProviders map[string]discover.Provider
26
27
// tokensPerNode is the number of tokens each node is given in the hash ring.
// Every node must use the same value; nodes using different values would have
// different views of the ring and assign work differently.
//
// 256 tokens is a trade-off between distribution accuracy and memory
// consumption. For a cluster of 1,000 nodes, the hash ring requires 6MB with
// 256 tokens per node and 12MB with 512 tokens per node.
//
// Distribution accuracy measures how close a node was to being responsible
// for exactly 1/N keys during simulation. Simulation tests used a cluster of
// 10 nodes hashing 100,000 random keys:
//
//	256 tokens per node: min 94.0%, median 96.3%, max 115.3%
//	512 tokens per node: min 96.1%, median 99.9%, max 103.2%
//
// 512 tokens per node is closer to a perfect distribution, but 256 tokens per
// node is good enough and optimizes for lower memory usage.
const tokensPerNode = 256
45
46
// GossipConfig controls clustering of Agents through gRPC-based gossip.
47
// GossipConfig cannot be changed at runtime.
48
type GossipConfig struct {
49
// Name of the node within the cluster. Must be unique cluster-wide.
50
NodeName string
51
52
// host:port address to advertise to peers to connect to. When unset, the
53
// first discovered IP from AdvertiseInterfaces will be used to find.
54
AdvertiseAddr string
55
56
// Slice of interface names to infer an advertise IP from. Must be set if
57
// AdvertiseAddr is unset.
58
AdvertiseInterfaces flagext.StringSlice
59
60
// List of one or more host:port peer addresses to connect to. Mutually
61
// exclusive with DiscoverPeers.
62
//
63
// If an agent connects to no peers, it will form a one-node cluster until a
64
// peer connects to it explicitly.
65
JoinPeers flagext.StringSlice
66
67
// Discover peers to connect to using go-discover. Mutually exclusive with
68
// JoinPeers.
69
DiscoverPeers string
70
}
71
72
// DefaultGossipConfig holds default GossipConfig options.
73
var DefaultGossipConfig = GossipConfig{
74
AdvertiseInterfaces: advertise.DefaultInterfaces,
75
}
76
77
// ApplyDefaults mutates c with default settings applied. defaultPort is
78
// added as the default port for addresses that do not have port numbers
79
// assigned.
80
//
81
// An error will be returned if the configuration is invalid or if an error
82
// occurred while applying defaults.
83
func (c *GossipConfig) ApplyDefaults(defaultPort int) error {
84
if c.NodeName == "" {
85
hn, err := os.Hostname()
86
if err != nil {
87
return fmt.Errorf("generating node name: %w", err)
88
}
89
c.NodeName = hn
90
}
91
92
if c.AdvertiseAddr == "" {
93
if len(c.AdvertiseInterfaces) == 0 {
94
return fmt.Errorf("one of advertise address or advertise interfaces must be set")
95
}
96
97
addr, err := advertise.FirstAddress(c.AdvertiseInterfaces)
98
if err != nil {
99
return fmt.Errorf("determining advertise address: %w", err)
100
}
101
c.AdvertiseAddr = fmt.Sprintf("%s:%d", addr.String(), defaultPort)
102
} else {
103
c.AdvertiseAddr = appendDefaultPort(c.AdvertiseAddr, defaultPort)
104
}
105
106
if len(c.JoinPeers) > 0 && c.DiscoverPeers != "" {
107
return fmt.Errorf("at most one of join peers and discover peers may be set")
108
} else if c.DiscoverPeers != "" {
109
providers := make(map[string]discover.Provider, len(discover.Providers)+1)
110
for k, v := range discover.Providers {
111
providers[k] = v
112
}
113
// Extra providers used by tests
114
for k, v := range extraDiscoverProviders {
115
providers[k] = v
116
}
117
118
// Custom providers that aren't enabled by default
119
providers["k8s"] = &k8s.Provider{}
120
121
d, err := discover.New(discover.WithProviders(providers))
122
if err != nil {
123
return fmt.Errorf("bootstrapping peer discovery: %w", err)
124
}
125
126
addrs, err := d.Addrs(c.DiscoverPeers, stdlog.New(io.Discard, "", 0)) // TODO(rfratto): log to log.Logger?
127
if err != nil {
128
return fmt.Errorf("discovering peers: %w", err)
129
}
130
c.JoinPeers = addrs
131
}
132
133
for i := range c.JoinPeers {
134
// Default to using the same advertise port as the local node. This may
135
// break in some cases, so the user should make sure the port numbers
136
// align on as many nodes as possible.
137
c.JoinPeers[i] = appendDefaultPort(c.JoinPeers[i], defaultPort)
138
}
139
140
return nil
141
}
142
143
// appendDefaultPort returns addr unchanged when it already parses as
// host:port, and otherwise returns addr with port appended.
//
// A bare IPv6 literal such as "::1" is wrapped in brackets so the result
// remains a valid host:port ("[::1]:80"). A host that is already bracketed,
// such as "[::1]", keeps its brackets and only gains the port.
func appendDefaultPort(addr string, port int) string {
	if _, _, err := net.SplitHostPort(addr); err == nil {
		// No error means there was a port in the string
		return addr
	}

	portStr := fmt.Sprint(port)

	// Already-bracketed hosts must not be passed to JoinHostPort, which would
	// add a second pair of brackets around them.
	if len(addr) >= 2 && addr[0] == '[' && addr[len(addr)-1] == ']' {
		return addr + ":" + portStr
	}
	return net.JoinHostPort(addr, portStr)
}
151
152
// GossipNode is a Node which uses gRPC and gossip to discover peers.
153
type GossipNode struct {
154
// NOTE(rfratto): GossipNode is a *very* thin wrapper over ckit.Node, but it
155
// still abstracted out as its own type to have more agent-specific control
156
// over the exposed API.
157
158
cfg *GossipConfig
159
innerNode *ckit.Node
160
log log.Logger
161
sharder shard.Sharder
162
163
started atomic.Bool
164
}
165
166
// NewGossipNode creates an unstarted GossipNode. The GossipNode will use the
167
// passed http.Client to create a new HTTP/2-compatible Transport that can
168
// communicate with other nodes over HTTP/2. GossipConfig is expected to be
169
// valid and have already had ApplyDefaults called on it.
170
//
171
// GossipNode operations are unavailable until the node is started.
172
func NewGossipNode(l log.Logger, reg prometheus.Registerer, cli *http.Client, c *GossipConfig) (*GossipNode, error) {
173
if l == nil {
174
l = log.NewNopLogger()
175
}
176
177
sharder := shard.Ring(tokensPerNode)
178
179
ckitConfig := ckit.Config{
180
Name: c.NodeName,
181
AdvertiseAddr: c.AdvertiseAddr,
182
Sharder: sharder,
183
Log: l,
184
}
185
186
inner, err := ckit.NewNode(cli, ckitConfig)
187
if err != nil {
188
return nil, err
189
}
190
reg.MustRegister(inner.Metrics())
191
192
return &GossipNode{
193
cfg: c,
194
innerNode: inner,
195
log: l,
196
sharder: sharder,
197
}, nil
198
}
199
200
// ChangeState changes the state of n. ChangeState will block until the state
201
// change has been received by another node; cancel the context to stop
202
// waiting. ChangeState will fail if the current state cannot move to the
203
// target state.
204
//
205
// Nodes must be a StateParticipant to receive writes.
206
func (n *GossipNode) ChangeState(ctx context.Context, to peer.State) error {
207
if !n.started.Load() {
208
return fmt.Errorf("node not started")
209
}
210
return n.innerNode.ChangeState(ctx, to)
211
}
212
213
// CurrentState returns the current state of the node. Note that other nodes
214
// may have an older view of the state while a state change propagates
215
// throughout the cluster.
216
func (n *GossipNode) CurrentState() peer.State {
217
return n.innerNode.CurrentState()
218
}
219
220
// Lookup implements Node and returns numOwners Peers that are responsible for
221
// key. Only peers in StateParticipant are considered during a lookup; if no
222
// peers are in StateParticipant, the Lookup will fail.
223
func (n *GossipNode) Lookup(key shard.Key, numOwners int, op shard.Op) ([]peer.Peer, error) {
224
if !n.started.Load() {
225
return nil, fmt.Errorf("node not started")
226
}
227
return n.sharder.Lookup(key, numOwners, op)
228
}
229
230
// Observe registers o to be informed when the cluster changes, including peers
231
// appearing, disappearing, or changing state.
232
//
233
// Calls will have to filter events if they are only interested in a subset of
234
// changes.
235
func (n *GossipNode) Observe(o ckit.Observer) {
236
n.innerNode.Observe(o)
237
}
238
239
// Peers returns the current set of Peers.
240
func (n *GossipNode) Peers() []peer.Peer {
241
return n.innerNode.Peers()
242
}
243
244
// Handler returns the base route and HTTP handlers to register for this node.
245
func (n *GossipNode) Handler() (string, http.Handler) {
246
return n.innerNode.Handler()
247
}
248
249
// Start starts the node. Start will connect to peers if configured to do so.
250
//
251
// Start must only be called after the gRPC server is running, otherwise Start
252
// will block forever.
253
func (n *GossipNode) Start() (err error) {
254
defer func() {
255
if err == nil {
256
n.started.Store(true)
257
}
258
}()
259
return n.innerNode.Start(n.cfg.JoinPeers)
260
}
261
262
// Stop leaves the cluster and terminates n. n cannot be re-used after
263
// stopping.
264
//
265
// It is advisable to ChangeState to StateTerminating and StateGone before
266
// stopping so the local node has an opportunity to move work to other nodes.
267
func (n *GossipNode) Stop() error {
268
return n.innerNode.Stop()
269
}
270
271