// Package metrics implements a Prometheus-lite client for service discovery,
// scraping metrics into a WAL, and remote_write. Clients are broken into a
// set of instances, each of which contains its own set of configs.
package metrics

import (
	"errors"
	"flag"
	"fmt"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/pkg/metrics/cluster"
	"github.com/grafana/agent/pkg/metrics/cluster/client"
	"github.com/grafana/agent/pkg/metrics/instance"
	"github.com/grafana/agent/pkg/util"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/discovery"
	"go.uber.org/atomic"
	"google.golang.org/grpc"
)
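
// DefaultConfig is the default settings for the metrics subsystem.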
var DefaultConfig = Config{
	Global:                 instance.DefaultGlobalConfig,
	InstanceRestartBackoff: instance.DefaultBasicManagerConfig.InstanceRestartBackoff,
	WALDir:                 "data-agent/",
	WALCleanupAge:          DefaultCleanupAge,
	WALCleanupPeriod:       DefaultCleanupPeriod,
	ServiceConfig:          cluster.DefaultConfig,
	ServiceClientConfig:    client.DefaultConfig,
	InstanceMode:           instance.DefaultMode,
}
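
// Config defines the configuration for the entire set of Prometheus client
// instances, along with global defaults applied to every instance.
//
// An illustrative (not exhaustive) YAML form, using the field names from the
// yaml tags below:
//
//	wal_directory: /tmp/agent/wal
//	configs:
//	  - name: default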
type Config struct {
	Global                 instance.GlobalConfig `yaml:"global,omitempty"`
	WALDir                 string                `yaml:"wal_directory,omitempty"`
	WALCleanupAge          time.Duration         `yaml:"wal_cleanup_age,omitempty"`
	WALCleanupPeriod       time.Duration         `yaml:"wal_cleanup_period,omitempty"`
	ServiceConfig          cluster.Config        `yaml:"scraping_service,omitempty"`
	ServiceClientConfig    client.Config         `yaml:"scraping_service_client,omitempty"`
	Configs                []instance.Config     `yaml:"configs,omitempty"`
	InstanceRestartBackoff time.Duration         `yaml:"instance_restart_backoff,omitempty"`
	InstanceMode           instance.Mode         `yaml:"instance_mode,omitempty"`
	DisableKeepAlives      bool                  `yaml:"http_disable_keepalives,omitempty"`
	IdleConnTimeout        time.Duration         `yaml:"http_idle_conn_timeout,omitempty"`

	// Unmarshaled is true when the Config was loaded through UnmarshalYAML.
	Unmarshaled bool `yaml:"-"`
}
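
// UnmarshalYAML unmarshals the config, applying defaults from DefaultConfig
// and from any registered flags before reading the YAML document.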
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultConfig
util.DefaultConfigFromFlags(c)
c.Unmarshaled = true
type plain Config
err := unmarshal((*plain)(c))
if err != nil {
return err
}
c.ServiceConfig.Client = c.ServiceClientConfig
return nil
}
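
// ApplyDefaults applies default values to the Config and validates it. It
// requires a WAL directory whenever instance configs or the scraping service
// are used, and rejects duplicate instance names.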
func (c *Config) ApplyDefaults() error {
needWAL := len(c.Configs) > 0 || c.ServiceConfig.Enabled
if needWAL && c.WALDir == "" {
return errors.New("no wal_directory configured")
}
if c.ServiceConfig.Enabled && len(c.Configs) > 0 {
return errors.New("cannot use configs when scraping_service mode is enabled")
}
c.Global.DisableKeepAlives = c.DisableKeepAlives
c.Global.IdleConnTimeout = c.IdleConnTimeout
usedNames := map[string]struct{}{}
for i := range c.Configs {
name := c.Configs[i].Name
if err := c.Configs[i].ApplyDefaults(c.Global); err != nil {
if name == "" {
name = fmt.Sprintf("at index %d", i)
}
return fmt.Errorf("error validating instance %s: %w", name, err)
}
if _, ok := usedNames[name]; ok {
return fmt.Errorf(
"prometheus instance names must be unique. found multiple instances with name %s",
name,
)
}
usedNames[name] = struct{}{}
}
return nil
}
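
// RegisterFlags defines flags corresponding to the Config, prefixed with
// "metrics.".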
func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.RegisterFlagsWithPrefix("metrics.", f)
}
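
// RegisterFlagsWithPrefix defines flags corresponding to the Config with the
// given prefix applied to each flag name.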
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.WALDir, prefix+"wal-directory", DefaultConfig.WALDir, "base directory to store the WAL in")
f.DurationVar(&c.WALCleanupAge, prefix+"wal-cleanup-age", DefaultConfig.WALCleanupAge, "remove abandoned (unused) WALs older than this")
f.DurationVar(&c.WALCleanupPeriod, prefix+"wal-cleanup-period", DefaultConfig.WALCleanupPeriod, "how often to check for abandoned WALs")
f.DurationVar(&c.InstanceRestartBackoff, prefix+"instance-restart-backoff", DefaultConfig.InstanceRestartBackoff, "how long to wait before restarting a failed Prometheus instance")
c.ServiceConfig.RegisterFlagsWithPrefix(prefix+"service.", f)
c.ServiceClientConfig.RegisterFlagsWithPrefix(prefix, f)
}
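
// Agent is the root of the metrics subsystem. It manages a set of Prometheus
// instances, an optional scraping service cluster, and a WAL cleaner.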
type Agent struct {
	mut    sync.RWMutex
	cfg    Config
	logger log.Logger
	reg    prometheus.Registerer

	// Instance managers and the WAL cleaner; the cleaner is recreated on
	// each ApplyConfig.
	bm      *instance.BasicManager
	mm      *instance.ModalManager
	cleaner *WALCleaner

	instanceFactory instanceFactory

	cluster *cluster.Cluster

	stopped  bool
	stopOnce sync.Once

	// actor serializes config syncs: functions sent here are run one at a
	// time by the run goroutine.
	actor chan func()

	// initialBootDone is set once the first config sync has completed.
	initialBootDone atomic.Bool
}
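
// New creates and starts a new Agent.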
func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (*Agent, error) {
discovery.RegisterMetrics()
return newAgent(reg, cfg, logger, defaultInstanceFactory)
}
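
// newAgent builds an Agent with the given instanceFactory; tests use it to
// substitute a fake factory.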
func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact instanceFactory) (*Agent, error) {
a := &Agent{
logger: log.With(logger, "agent", "prometheus"),
instanceFactory: fact,
reg: reg,
actor: make(chan func(), 1),
}
a.bm = instance.NewBasicManager(instance.BasicManagerConfig{
InstanceRestartBackoff: cfg.InstanceRestartBackoff,
}, a.logger, a.newInstance)
var err error
a.mm, err = instance.NewModalManager(a.reg, a.logger, a.bm, cfg.InstanceMode)
if err != nil {
return nil, fmt.Errorf("failed to create modal instance manager: %w", err)
}
a.cluster, err = cluster.New(a.logger, reg, cfg.ServiceConfig, a.mm, a.Validate)
if err != nil {
return nil, err
}
if err := a.ApplyConfig(cfg); err != nil {
return nil, err
}
go a.run()
return a, nil
}
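
// newInstance creates a new Instance given a config. Metrics for the
// instance are registered under a per-instance label.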
func (a *Agent) newInstance(c instance.Config) (instance.ManagedInstance, error) {
a.mut.RLock()
defer a.mut.RUnlock()
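	// In shared mode, instances represent groups of configs, so metrics are
	// labeled by group name instead of instance name.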
instanceLabel := "instance_name"
if a.cfg.InstanceMode == instance.ModeShared {
instanceLabel = "instance_group_name"
}
reg := prometheus.WrapRegistererWith(prometheus.Labels{
instanceLabel: c.Name,
}, a.reg)
return a.instanceFactory(reg, c, a.cfg.WALDir, a.logger)
}
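
// Validate validates the incoming Config and mutates it to apply defaults
// from the Agent's global config.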
func (a *Agent) Validate(c *instance.Config) error {
a.mut.RLock()
defer a.mut.RUnlock()
if a.cfg.WALDir == "" {
return fmt.Errorf("no wal_directory configured")
}
if err := c.ApplyDefaults(a.cfg.Global); err != nil {
return fmt.Errorf("failed to apply defaults to %q: %w", c.Name, err)
}
return nil
}
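
// ApplyConfig applies config changes to the Agent, reconfiguring the WAL
// cleaner, instance managers, and cluster, and then queueing a sync of the
// local instance configs.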
func (a *Agent) ApplyConfig(cfg Config) error {
a.mut.Lock()
defer a.mut.Unlock()
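	// Nothing to do if the new config matches the currently applied one.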
if util.CompareYAML(a.cfg, cfg) {
return nil
}
if a.stopped {
return fmt.Errorf("agent stopped")
}
if a.cleaner != nil {
a.cleaner.Stop()
a.cleaner = nil
}
if cfg.WALDir != "" {
a.cleaner = NewWALCleaner(
a.logger,
a.mm,
cfg.WALDir,
cfg.WALCleanupAge,
cfg.WALCleanupPeriod,
)
}
a.bm.UpdateManagerConfig(instance.BasicManagerConfig{
InstanceRestartBackoff: cfg.InstanceRestartBackoff,
})
if err := a.mm.SetMode(cfg.InstanceMode); err != nil {
return err
}
if err := a.cluster.ApplyConfig(cfg.ServiceConfig); err != nil {
return fmt.Errorf("failed to apply cluster config: %w", err)
}
oldConfig := a.cfg
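	// Queue the sync on the actor channel so it runs on the run goroutine,
	// serialized with any previously queued syncs.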
a.actor <- func() {
a.syncInstances(oldConfig, cfg)
a.initialBootDone.Store(true)
}
a.cfg = cfg
return nil
}
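
// syncInstances applies every config from newConfig to the instance manager
// and deletes configs that exist in oldConfig but not in newConfig.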
func (a *Agent) syncInstances(oldConfig, newConfig Config) {
for _, c := range newConfig.Configs {
if err := a.mm.ApplyConfig(c); err != nil {
level.Error(a.logger).Log("msg", "failed to apply config", "name", c.Name, "err", err)
}
}
for _, oc := range oldConfig.Configs {
foundConfig := false
for _, nc := range newConfig.Configs {
if nc.Name == oc.Name {
foundConfig = true
break
}
}
if foundConfig {
continue
}
if err := a.mm.DeleteConfig(oc.Name); err != nil {
level.Error(a.logger).Log("msg", "failed to delete old config", "name", oc.Name, "err", err)
}
}
}
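
// run processes functions from the actor channel until Stop closes it.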
func (a *Agent) run() {
for f := range a.actor {
f()
}
}
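
// Ready returns true once the initial config sync has completed and all
// running instances report ready.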
func (a *Agent) Ready() bool {
if !a.initialBootDone.Load() {
return false
}
for _, inst := range a.mm.ListInstances() {
if !inst.Ready() {
return false
}
}
return true
}
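
// WireGRPC wires the scraping service gRPC endpoints into the provided
// server.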
func (a *Agent) WireGRPC(s *grpc.Server) {
a.cluster.WireGRPC(s)
}
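
// Config returns the most recently applied Config.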
func (a *Agent) Config() Config { return a.cfg }
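
// InstanceManager returns the instance.Manager that manages the Agent's
// instances.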
func (a *Agent) InstanceManager() instance.Manager { return a.mm }
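
// Stop stops the Agent, its cluster, the WAL cleaner, and all managed
// instances.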
func (a *Agent) Stop() {
a.mut.Lock()
defer a.mut.Unlock()
a.stopOnce.Do(func() {
close(a.actor)
})
a.cluster.Stop()
if a.cleaner != nil {
a.cleaner.Stop()
}
a.mm.Stop()
a.stopped = true
}
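
// instanceFactory creates a ManagedInstance from a config; it exists so
// tests can swap in a fake instance implementation.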
type instanceFactory = func(reg prometheus.Registerer, cfg instance.Config, walDir string, logger log.Logger) (instance.ManagedInstance, error)
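
// defaultInstanceFactory is the instanceFactory used outside of tests.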
func defaultInstanceFactory(reg prometheus.Registerer, cfg instance.Config, walDir string, logger log.Logger) (instance.ManagedInstance, error) {
return instance.New(reg, cfg, walDir, logger)
}