package cluster12import (3"context"4"fmt"5"io"6stdlog "log"7"net"8"net/http"9"os"1011"github.com/go-kit/log"12"github.com/grafana/dskit/flagext"13"github.com/hashicorp/go-discover"14"github.com/hashicorp/go-discover/provider/k8s"15"github.com/prometheus/client_golang/prometheus"16"github.com/rfratto/ckit"17"github.com/rfratto/ckit/advertise"18"github.com/rfratto/ckit/peer"19"github.com/rfratto/ckit/shard"20"go.uber.org/atomic"21)2223// extraDiscoverProviders used in tests.24var extraDiscoverProviders map[string]discover.Provider2526// tokensPerNode is used to decide how many tokens each node should be given in27// the hash ring. All nodes must use the same value, otherwise they will have28// different views of the ring and assign work differently.29//30// Using 256 tokens strikes a good balance between distribution accuracy and31// memory consumption. A cluster of 1,000 nodes with 256 tokens per node32// requires 6MB for the hash ring, while 12MB is used for 512 tokens per node.33//34// Distribution accuracy measures how close a node was to being responsible for35// exactly 1/N keys during simulation. Simulation tests used a cluster of 1036// nodes and hashing 100,000 random keys:37//38// 256 tokens per node: min 94.0%, median 96.3%, max 115.3%39// 512 tokens per node: min 96.1%, median 99.9%, max 103.2%40//41// While 512 tokens per node is closer to perfect distribution, 256 tokens per42// node is good enough, optimizing for lower memory usage.43const tokensPerNode = 2564445// GossipConfig controls clustering of Agents through gRPC-based gossip.46// GossipConfig cannot be changed at runtime.47type GossipConfig struct {48// Name of the node within the cluster. Must be unique cluster-wide.49NodeName string5051// host:port address to advertise to peers to connect to. When unset, the52// first discovered IP from AdvertiseInterfaces will be used to find.53AdvertiseAddr string5455// Slice of interface names to infer an advertise IP from. Must be set if56// AdvertiseAddr is unset.57AdvertiseInterfaces flagext.StringSlice5859// List of one or more host:port peer addresses to connect to. Mutually60// exclusive with DiscoverPeers.61//62// If an agent connects to no peers, it will form a one-node cluster until a63// peer connects to it explicitly.64JoinPeers flagext.StringSlice6566// Discover peers to connect to using go-discover. Mutually exclusive with67// JoinPeers.68DiscoverPeers string69}7071// DefaultGossipConfig holds default GossipConfig options.72var DefaultGossipConfig = GossipConfig{73AdvertiseInterfaces: advertise.DefaultInterfaces,74}7576// ApplyDefaults mutates c with default settings applied. defaultPort is77// added as the default port for addresses that do not have port numbers78// assigned.79//80// An error will be returned if the configuration is invalid or if an error81// occurred while applying defaults.82func (c *GossipConfig) ApplyDefaults(defaultPort int) error {83if c.NodeName == "" {84hn, err := os.Hostname()85if err != nil {86return fmt.Errorf("generating node name: %w", err)87}88c.NodeName = hn89}9091if c.AdvertiseAddr == "" {92if len(c.AdvertiseInterfaces) == 0 {93return fmt.Errorf("one of advertise address or advertise interfaces must be set")94}9596addr, err := advertise.FirstAddress(c.AdvertiseInterfaces)97if err != nil {98return fmt.Errorf("determining advertise address: %w", err)99}100c.AdvertiseAddr = fmt.Sprintf("%s:%d", addr.String(), defaultPort)101} else {102c.AdvertiseAddr = appendDefaultPort(c.AdvertiseAddr, defaultPort)103}104105if len(c.JoinPeers) > 0 && c.DiscoverPeers != "" {106return fmt.Errorf("at most one of join peers and discover peers may be set")107} else if c.DiscoverPeers != "" {108providers := make(map[string]discover.Provider, len(discover.Providers)+1)109for k, v := range discover.Providers {110providers[k] = v111}112// Extra providers used by tests113for k, v := range extraDiscoverProviders {114providers[k] = v115}116117// Custom providers that aren't enabled by default118providers["k8s"] = &k8s.Provider{}119120d, err := discover.New(discover.WithProviders(providers))121if err != nil {122return fmt.Errorf("bootstrapping peer discovery: %w", err)123}124125addrs, err := d.Addrs(c.DiscoverPeers, stdlog.New(io.Discard, "", 0)) // TODO(rfratto): log to log.Logger?126if err != nil {127return fmt.Errorf("discovering peers: %w", err)128}129c.JoinPeers = addrs130}131132for i := range c.JoinPeers {133// Default to using the same advertise port as the local node. This may134// break in some cases, so the user should make sure the port numbers135// align on as many nodes as possible.136c.JoinPeers[i] = appendDefaultPort(c.JoinPeers[i], defaultPort)137}138139return nil140}141142func appendDefaultPort(addr string, port int) string {143_, _, err := net.SplitHostPort(addr)144if err == nil {145// No error means there was a port in the string146return addr147}148return fmt.Sprintf("%s:%d", addr, port)149}150151// GossipNode is a Node which uses gRPC and gossip to discover peers.152type GossipNode struct {153// NOTE(rfratto): GossipNode is a *very* thin wrapper over ckit.Node, but it154// still abstracted out as its own type to have more agent-specific control155// over the exposed API.156157cfg *GossipConfig158innerNode *ckit.Node159log log.Logger160sharder shard.Sharder161162started atomic.Bool163}164165// NewGossipNode creates an unstarted GossipNode. The GossipNode will use the166// passed http.Client to create a new HTTP/2-compatible Transport that can167// communicate with other nodes over HTTP/2. GossipConfig is expected to be168// valid and have already had ApplyDefaults called on it.169//170// GossipNode operations are unavailable until the node is started.171func NewGossipNode(l log.Logger, reg prometheus.Registerer, cli *http.Client, c *GossipConfig) (*GossipNode, error) {172if l == nil {173l = log.NewNopLogger()174}175176sharder := shard.Ring(tokensPerNode)177178ckitConfig := ckit.Config{179Name: c.NodeName,180AdvertiseAddr: c.AdvertiseAddr,181Sharder: sharder,182Log: l,183}184185inner, err := ckit.NewNode(cli, ckitConfig)186if err != nil {187return nil, err188}189reg.MustRegister(inner.Metrics())190191return &GossipNode{192cfg: c,193innerNode: inner,194log: l,195sharder: sharder,196}, nil197}198199// ChangeState changes the state of n. ChangeState will block until the state200// change has been received by another node; cancel the context to stop201// waiting. ChangeState will fail if the current state cannot move to the202// target state.203//204// Nodes must be a StateParticipant to receive writes.205func (n *GossipNode) ChangeState(ctx context.Context, to peer.State) error {206if !n.started.Load() {207return fmt.Errorf("node not started")208}209return n.innerNode.ChangeState(ctx, to)210}211212// CurrentState returns the current state of the node. Note that other nodes213// may have an older view of the state while a state change propagates214// throughout the cluster.215func (n *GossipNode) CurrentState() peer.State {216return n.innerNode.CurrentState()217}218219// Lookup implements Node and returns numOwners Peers that are responsible for220// key. Only peers in StateParticipant are considered during a lookup; if no221// peers are in StateParticipant, the Lookup will fail.222func (n *GossipNode) Lookup(key shard.Key, numOwners int, op shard.Op) ([]peer.Peer, error) {223if !n.started.Load() {224return nil, fmt.Errorf("node not started")225}226return n.sharder.Lookup(key, numOwners, op)227}228229// Observe registers o to be informed when the cluster changes, including peers230// appearing, disappearing, or changing state.231//232// Calls will have to filter events if they are only interested in a subset of233// changes.234func (n *GossipNode) Observe(o ckit.Observer) {235n.innerNode.Observe(o)236}237238// Peers returns the current set of Peers.239func (n *GossipNode) Peers() []peer.Peer {240return n.innerNode.Peers()241}242243// Handler returns the base route and HTTP handlers to register for this node.244func (n *GossipNode) Handler() (string, http.Handler) {245return n.innerNode.Handler()246}247248// Start starts the node. Start will connect to peers if configured to do so.249//250// Start must only be called after the gRPC server is running, otherwise Start251// will block forever.252func (n *GossipNode) Start() (err error) {253defer func() {254if err == nil {255n.started.Store(true)256}257}()258return n.innerNode.Start(n.cfg.JoinPeers)259}260261// Stop leaves the cluster and terminates n. n cannot be re-used after262// stopping.263//264// It is advisable to ChangeState to StateTerminating and StateGone before265// stopping so the local node has an opportunity to move work to other nodes.266func (n *GossipNode) Stop() error {267return n.innerNode.Stop()268}269270271