package cluster
import (
"context"
"fmt"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/protobuf/ptypes/empty"
"github.com/gorilla/mux"
"github.com/grafana/agent/pkg/agentproto"
"github.com/grafana/agent/pkg/metrics/instance"
"github.com/grafana/agent/pkg/metrics/instance/configstore"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
// Cluster wires together the pieces needed for an agent to take part in the
// scraping-service cluster: ring membership, the remote config store, the
// store's HTTP API, and a watcher that applies owned configs to an
// instance.Manager.
type Cluster struct {
	mut sync.RWMutex // guards cfg and the dependency fields below

	log log.Logger
	cfg Config
	// baseValidation is the caller-supplied config check run before the
	// cluster's own file-access validation (see storeValidate).
	baseValidation ValidationFunc

	node     *node                // ring membership / config ownership
	store    *configstore.Remote  // remote KV-backed config store
	storeAPI *configstore.API     // HTTP API over store
	watcher  *configWatcher       // applies owned configs to the instance.Manager
}
// New builds a Cluster: it joins ring membership, connects to the remote
// config store, exposes the store over HTTP (registering its metrics with
// reg), and starts a watcher that applies owned configs to im. validate is
// invoked on configs before they are accepted into the store.
//
// A non-nil error means one of those dependencies failed to initialize.
func New(
	l log.Logger,
	reg prometheus.Registerer,
	cfg Config,
	im instance.Manager,
	validate ValidationFunc,
) (*Cluster, error) {
	l = log.With(l, "component", "cluster")

	c := &Cluster{log: l, cfg: cfg, baseValidation: validate}

	// Hold the write lock for the whole construction so callbacks handed
	// out below (e.g. storeValidate via the store API) block until c is
	// fully built.
	c.mut.Lock()
	defer c.mut.Unlock()

	var err error
	if c.node, err = newNode(reg, l, cfg, c); err != nil {
		return nil, fmt.Errorf("failed to initialize node membership: %w", err)
	}
	// NOTE(review): the store is created from cfg.KVStore.Config here but
	// re-applied from cfg.Lifecycler.RingConfig.KVStore in ApplyConfig —
	// confirm both refer to the same KV configuration.
	if c.store, err = configstore.NewRemote(l, reg, cfg.KVStore.Config, cfg.Enabled); err != nil {
		return nil, fmt.Errorf("failed to initialize configstore: %w", err)
	}

	c.storeAPI = configstore.NewAPI(l, c.store, c.storeValidate, cfg.APIEnableGetConfiguration)
	reg.MustRegister(c.storeAPI)

	if c.watcher, err = newConfigWatcher(l, cfg, c.store, im, c.node.Owns, validate); err != nil {
		return nil, fmt.Errorf("failed to initialize configwatcher: %w", err)
	}
	return c, nil
}
// storeValidate is the validation hook handed to the config store API. It
// runs the caller-supplied base validation first and then, unless reading
// files has been explicitly allowed via DangerousAllowReadingFiles, rejects
// configs that reference files on disk.
func (c *Cluster) storeValidate(cfg *instance.Config) error {
	c.mut.RLock()
	defer c.mut.RUnlock()

	err := c.baseValidation(cfg)
	if err == nil && !c.cfg.DangerousAllowReadingFiles {
		err = validateNofiles(cfg)
	}
	return err
}
// Reshard implements the ScrapingServiceServer interface (see WireGRPC) and
// is called by a peer when the distribution of configs across the cluster may
// have changed. It does not reshard inline; it only queues a refresh on the
// config watcher.
func (c *Cluster) Reshard(_ context.Context, _ *agentproto.ReshardRequest) (*empty.Empty, error) {
	c.mut.RLock()
	defer c.mut.RUnlock()

	level.Info(c.log).Log("msg", "received reshard notification, requesting refresh")
	c.watcher.RequestRefresh()
	return &empty.Empty{}, nil
}
// ApplyConfig applies cfg to the node membership, the config store, and the
// watcher, in that order, then queues a config refresh. It is a no-op when
// cfg is YAML-identical to the currently applied config. On error, earlier
// steps are not rolled back.
func (c *Cluster) ApplyConfig(cfg Config) error {
	c.mut.Lock()
	defer c.mut.Unlock()

	if util.CompareYAML(c.cfg, cfg) {
		return nil
	}

	// Dependencies are updated in a fixed order; the first failure aborts.
	steps := []struct {
		target string
		apply  func() error
	}{
		{"node membership", func() error { return c.node.ApplyConfig(cfg) }},
		{"config store", func() error { return c.store.ApplyConfig(cfg.Lifecycler.RingConfig.KVStore, cfg.Enabled) }},
		{"watcher", func() error { return c.watcher.ApplyConfig(cfg) }},
	}
	for _, step := range steps {
		if err := step.apply(); err != nil {
			return fmt.Errorf("failed to apply config to %s: %w", step.target, err)
		}
	}

	c.cfg = cfg

	level.Info(c.log).Log("msg", "cluster config changed, queueing refresh")
	c.watcher.RequestRefresh()
	return nil
}
// WireAPI mounts the config store's HTTP routes and the node's routes onto
// router.
func (c *Cluster) WireAPI(router *mux.Router) {
	c.storeAPI.WireAPI(router)
	c.node.WireAPI(router)
}
// WireGRPC registers the Cluster as the ScrapingServiceServer implementation
// (see Reshard) on server.
func (c *Cluster) WireGRPC(server *grpc.Server) {
	agentproto.RegisterScrapingServiceServer(server, c)
}
// Stop shuts down the Cluster's dependencies — node membership, the config
// store, and the config watcher, in that order. Failures are logged rather
// than returned so every dependency gets a chance to stop.
func (c *Cluster) Stop() {
	c.mut.Lock()
	defer c.mut.Unlock()

	// stopDep runs a dependency's shutdown function and logs any failure.
	stopDep := func(name string, closer func() error) {
		if err := closer(); err != nil {
			level.Error(c.log).Log("msg", "failed to stop dependency", "dependency", name, "err", err)
		}
	}

	stopDep("node", c.node.Stop)
	stopDep("config store", c.store.Close)
	stopDep("config watcher", c.watcher.Stop)
}