Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/cluster/cluster.go
4093 views
1
// Package cluster enables an agent-wide cluster mechanism which subsystems can
2
// use to determine ownership of some key.
3
package cluster
4
5
import (
6
"context"
7
"crypto/tls"
8
"fmt"
9
"net"
10
"net/http"
11
"strconv"
12
"strings"
13
"time"
14
15
"github.com/go-kit/log"
16
"github.com/go-kit/log/level"
17
"github.com/prometheus/client_golang/prometheus"
18
"github.com/rfratto/ckit"
19
"github.com/rfratto/ckit/peer"
20
"github.com/rfratto/ckit/shard"
21
"golang.org/x/net/http2"
22
)
23
24
// Node is a read-only view of a cluster node.
25
type Node interface {
26
// Lookup determines the set of replicationFactor owners for a given key.
27
// peer.Peer.Self can be used to determine if the local node is the owner,
28
// allowing for short-circuiting logic to connect directly to the local node
29
// instead of using the network.
30
//
31
// Callers can use github.com/rfratto/ckit/shard.StringKey or
32
// shard.NewKeyBuilder to create a key.
33
Lookup(key shard.Key, replicationFactor int, op shard.Op) ([]peer.Peer, error)
34
35
// Observe registers an Observer to receive notifications when the set of
36
// Peers for a Node changes.
37
Observe(ckit.Observer)
38
39
// Peers returns the current set of peers for a Node.
40
Peers() []peer.Peer
41
42
Handler() (string, http.Handler)
43
}
44
45
// NewLocalNode returns a Node which forms a single-node cluster and never
46
// connects to other nodes.
47
//
48
// selfAddr is the address for a Node to use to connect to itself over gRPC.
49
func NewLocalNode(selfAddr string) Node {
50
p := peer.Peer{
51
Name: "local",
52
Addr: selfAddr,
53
Self: true,
54
State: peer.StateParticipant,
55
}
56
57
return &localNode{self: p}
58
}
59
60
// localNode is the Node returned by NewLocalNode: a cluster whose only
// member is the node itself.
type localNode struct {
	self peer.Peer // the sole peer this node ever reports
}
61
62
// Lookup implements Node. A local node is the only member of its cluster,
// so it can satisfy a replication factor of at most 1:
//
//   - a negative replicationFactor is rejected as invalid;
//   - replicationFactor == 0 returns no owners and no error;
//   - replicationFactor == 1 returns the local node as the sole owner;
//   - any larger value returns an error because only one node exists.
//
// key and op are ignored: every key maps to the same single node.
func (ln *localNode) Lookup(key shard.Key, replicationFactor int, op shard.Op) ([]peer.Peer, error) {
	switch {
	case replicationFactor < 0:
		return nil, fmt.Errorf("invalid replication factor %d", replicationFactor)
	case replicationFactor == 0:
		return nil, nil
	case replicationFactor > 1:
		return nil, fmt.Errorf("need %d nodes; only 1 available", replicationFactor)
	default:
		return []peer.Peer{ln.self}, nil
	}
}
71
72
// Observe implements Node. The observer is discarded: a local-only node's
// peer set never changes, so there is never anything to notify it about.
func (ln *localNode) Observe(_ ckit.Observer) {}
75
76
// Peers implements Node by reporting the local node as the only peer. A new
// slice is built on every call.
func (ln *localNode) Peers() []peer.Peer {
	peers := make([]peer.Peer, 1)
	peers[0] = ln.self
	return peers
}
79
80
// Handler implements Node. Clustering is disabled for a local node, so the
// returned handler answers requests with 400 Bad Request and the body
// "clustering is disabled". The returned prefix is the path under which the
// cluster transport would normally be served.
func (ln *localNode) Handler() (string, http.Handler) {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		// The status must be set before the body is written: the first Write
		// implicitly sends 200 OK, after which WriteHeader has no effect.
		w.WriteHeader(http.StatusBadRequest)
		// Nothing useful can be done if the client has already gone away.
		_, _ = w.Write([]byte("clustering is disabled"))
	})

	return "/api/v1/ckit/transport/", mux
}
89
90
// Clusterer implements the behavior required for operating Flow controllers
91
// in a distributed fashion.
92
type Clusterer struct {
93
Node Node
94
}
95
96
// getJoinAddr expands one user-supplied join address, appends the result to
// addrs, and returns the extended slice.
//
// in is first trimmed of surrounding whitespace (so "a, b" style lists
// work), and ignored if that leaves it empty. It is then handled as follows:
//   - "host:port" (including "[ipv6]:port") is appended unchanged;
//   - a bare IP address is appended in canonical form, without a port;
//   - anything else is treated as a DNS name and looked up as an SRV record,
//     appending each record's target host (the SRV port is not used);
//   - if the SRV lookup yields no records, the name is appended unchanged so
//     a plain A/AAAA hostname is not silently dropped.
//
// Entries without a port are presumably completed later by
// GossipConfig.ApplyDefaults (see New) — TODO confirm.
func getJoinAddr(addrs []string, in string) []string {
	in = strings.TrimSpace(in)
	if in == "" {
		return addrs
	}

	if _, _, err := net.SplitHostPort(in); err == nil {
		return append(addrs, in)
	}

	if ip := net.ParseIP(in); ip != nil {
		return append(addrs, ip.String())
	}

	// With empty service and proto, LookupSRV queries `in` directly. The
	// error is deliberately not checked: a failed lookup returns no records
	// and falls through to the raw name below, while a partially invalid
	// response still returns its valid records alongside the error.
	_, srvs, _ := net.LookupSRV("", "", in)
	if len(srvs) == 0 {
		return append(addrs, in)
	}
	for _, srv := range srvs {
		addrs = append(addrs, srv.Target)
	}
	return addrs
}
118
119
// New creates a Clusterer.
//
// When clusterEnabled is false, the returned Clusterer wraps a local-only
// Node (see NewLocalNode) addressed at listenAddr, and nothing is dialed.
//
// Otherwise a gossip node is built from DefaultGossipConfig:
//   - the port of listenAddr (80 when listenAddr has no port) is passed to
//     GossipConfig.ApplyDefaults;
//   - advertiseAddr, if non-empty, overrides the advertised address;
//   - joinAddr, if non-empty, is a comma-separated list of peers, each
//     expanded by getJoinAddr.
//
// The node first tries to join the configured peers; if that fails it
// retries with no join peers, bootstrapping a new cluster on its own. It
// then moves itself to the Participant state and logs every peer change.
func New(logger log.Logger, reg prometheus.Registerer, clusterEnabled bool, listenAddr, advertiseAddr, joinAddr string) (*Clusterer, error) {
	// Standalone node.
	if !clusterEnabled {
		return &Clusterer{Node: NewLocalNode(listenAddr)}, nil
	}

	gossipConfig := DefaultGossipConfig

	defaultPort := 80
	if _, portStr, err := net.SplitHostPort(listenAddr); err == nil { // there was a port
		port, convErr := strconv.Atoi(portStr)
		if convErr != nil {
			return nil, fmt.Errorf("parsing port of listen address %q: %w", listenAddr, convErr)
		}
		defaultPort = port
	}

	if advertiseAddr != "" {
		gossipConfig.AdvertiseAddr = advertiseAddr
	}

	if joinAddr != "" {
		gossipConfig.JoinPeers = []string{}
		for _, jaddr := range strings.Split(joinAddr, ",") {
			gossipConfig.JoinPeers = getJoinAddr(gossipConfig.JoinPeers, jaddr)
		}
	}

	if err := gossipConfig.ApplyDefaults(defaultPort); err != nil {
		return nil, err
	}

	cli := &http.Client{
		Transport: &http2.Transport{
			// HTTP/2 without TLS (h2c): AllowHTTP permits plain-text "http"
			// URLs, and DialTLSContext is overridden to return an unencrypted
			// TCP connection.
			AllowHTTP: true,
			DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
				// Set a maximum timeout for establishing the connection. If our
				// context has a deadline earlier than our timeout, we shrink the
				// timeout to it.
				//
				// TODO(rfratto): consider making the max timeout configurable.
				timeout := 30 * time.Second
				if dur, ok := deadlineDuration(ctx); ok && dur < timeout {
					timeout = dur
				}

				// DialContext (unlike net.DialTimeout) also aborts the dial
				// when ctx is canceled.
				dialer := net.Dialer{Timeout: timeout}
				return dialer.DialContext(ctx, network, addr)
			},
		},
	}

	level.Info(logger).Log("msg", "starting a new gossip node", "join-peers", gossipConfig.JoinPeers)

	gossipNode, err := NewGossipNode(logger, reg, cli, &gossipConfig)
	if err != nil {
		return nil, err
	}

	// Attempt to start the Node by connecting to the peers in gossipConfig.
	// If we cannot connect to any peers, fall back to bootstrapping a new
	// cluster by ourselves.
	if err := gossipNode.Start(); err != nil {
		// Surface this: with join peers configured, bootstrapping alone means
		// this node forms a separate cluster.
		level.Warn(logger).Log("msg", "failed to connect to peers; bootstrapping a new cluster", "err", err)
		// NOTE(review): this relies on gossipNode reading JoinPeers through the
		// &gossipConfig pointer passed to NewGossipNode — confirm in Start.
		gossipConfig.JoinPeers = nil
		if err := gossipNode.Start(); err != nil {
			return nil, fmt.Errorf("bootstrapping a new cluster: %w", err)
		}
	}

	// Nodes initially join the cluster in the Viewer state. We can move to the
	// Participant state to signal that we wish to participate in reading or
	// writing data.
	if err := gossipNode.ChangeState(context.Background(), peer.StateParticipant); err != nil {
		return nil, fmt.Errorf("changing node state to participant: %w", err)
	}

	gossipNode.Observe(ckit.FuncObserver(func(peers []peer.Peer) (reregister bool) {
		names := make([]string, len(peers))
		for i, p := range peers {
			names[i] = p.Name
		}
		level.Info(logger).Log("msg", "peers changed", "new_peers", strings.Join(names, ","))
		// Returning true keeps this observer registered for later changes.
		return true
	}))

	return &Clusterer{Node: gossipNode}, nil
}
214
215
// deadlineDuration reports how long remains until ctx's deadline. ok is
// false (and d is zero) when ctx has no deadline. d is negative if the
// deadline has already passed.
func deadlineDuration(ctx context.Context) (d time.Duration, ok bool) {
	deadline, hasDeadline := ctx.Deadline()
	if !hasDeadline {
		return 0, false
	}
	return time.Until(deadline), true
}
221
222