// Package instance provides a mini Prometheus scraper and remote_writer.
package instance
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/metrics/wal"
"github.com/grafana/agent/pkg/util"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"
)
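
// Report the Grafana Agent user agent on scrape and remote_write requests,
// and send exemplars over remote_write by default.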
func init() {
remote.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
scrape.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
config.DefaultRemoteWriteConfig.SendExemplars = true
}
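
// DefaultConfig is the set of defaults applied to a Config before
// unmarshaling.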
var (
DefaultConfig = Config{
HostFilter: false,
WALTruncateFrequency: 60 * time.Minute,
MinWALTime: 5 * time.Minute,
MaxWALTime: 4 * time.Hour,
RemoteFlushDeadline: 1 * time.Minute,
WriteStaleOnShutdown: false,
global: DefaultGlobalConfig,
}
)
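
// Config is the configuration for an individual metrics instance: a named
// set of scrape_configs and remote_write rules plus WAL retention settings.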
type Config struct {
Name string `yaml:"name,omitempty"`
HostFilter bool `yaml:"host_filter,omitempty"`
HostFilterRelabelConfigs []*relabel.Config `yaml:"host_filter_relabel_configs,omitempty"`
ScrapeConfigs []*config.ScrapeConfig `yaml:"scrape_configs,omitempty"`
RemoteWrite []*config.RemoteWriteConfig `yaml:"remote_write,omitempty"`
WALTruncateFrequency time.Duration `yaml:"wal_truncate_frequency,omitempty"`
MinWALTime time.Duration `yaml:"min_wal_time,omitempty"`
MaxWALTime time.Duration `yaml:"max_wal_time,omitempty"`
RemoteFlushDeadline time.Duration `yaml:"remote_flush_deadline,omitempty"`
WriteStaleOnShutdown bool `yaml:"write_stale_on_shutdown,omitempty"`
global GlobalConfig `yaml:"-"`
}
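
// UnmarshalYAML unmarshals a Config, applying the values from DefaultConfig
// first.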
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultConfig
type plain Config
return unmarshal((*plain)(c))
}
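
// MarshalYAML implements yaml.Marshaler. The Config is serialized through
// MarshalConfig and re-parsed into a yaml.MapSlice so the emitted field
// order is preserved.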
func (c Config) MarshalYAML() (interface{}, error) {
bb, err := MarshalConfig(&c, true)
if err != nil {
return nil, err
}
var m yaml.MapSlice
if err := yaml.Unmarshal(bb, &m); err != nil {
return nil, err
}
return m, nil
}
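
// ApplyDefaults validates the Config and fills in defaults from the global
// config: scrape interval and timeout fall back to the global Prometheus
// values, the global remote_write section is used when none is given, and
// unnamed remote_write configs are assigned a stable name derived from a
// hash of their contents.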
func (c *Config) ApplyDefaults(global GlobalConfig) error {
c.global = global
switch {
case c.Name == "":
return errors.New("missing instance name")
case c.WALTruncateFrequency <= 0:
return errors.New("wal_truncate_frequency must be greater than 0s")
case c.RemoteFlushDeadline <= 0:
return errors.New("remote_flush_deadline must be greater than 0s")
case c.MinWALTime > c.MaxWALTime:
return errors.New("min_wal_time must be less than or equal to max_wal_time")
}
jobNames := map[string]struct{}{}
for _, sc := range c.ScrapeConfigs {
if sc == nil {
return fmt.Errorf("empty or null scrape config section")
}
if sc.ScrapeInterval == 0 {
sc.ScrapeInterval = c.global.Prometheus.ScrapeInterval
}
if sc.ScrapeTimeout > sc.ScrapeInterval {
return fmt.Errorf("scrape timeout greater than scrape interval for scrape config with job name %q", sc.JobName)
}
if time.Duration(sc.ScrapeInterval) > c.WALTruncateFrequency {
return fmt.Errorf("scrape interval greater than wal_truncate_frequency for scrape config with job name %q", sc.JobName)
}
if sc.ScrapeTimeout == 0 {
if c.global.Prometheus.ScrapeTimeout > sc.ScrapeInterval {
sc.ScrapeTimeout = sc.ScrapeInterval
} else {
sc.ScrapeTimeout = c.global.Prometheus.ScrapeTimeout
}
}
if _, exists := jobNames[sc.JobName]; exists {
return fmt.Errorf("found multiple scrape configs with job name %q", sc.JobName)
}
jobNames[sc.JobName] = struct{}{}
}
rwNames := map[string]struct{}{}
if len(c.RemoteWrite) == 0 {
c.RemoteWrite = c.global.RemoteWrite
}
for _, cfg := range c.RemoteWrite {
if cfg == nil {
return fmt.Errorf("empty or null remote write config section")
}
var generatedName bool
if cfg.Name == "" {
hash, err := getHash(cfg)
if err != nil {
return err
}
cfg.Name = c.Name + "-" + hash[:6]
generatedName = true
}
if _, exists := rwNames[cfg.Name]; exists {
if generatedName {
return fmt.Errorf("found two identical remote_write configs")
}
return fmt.Errorf("found duplicate remote write configs with name %q", cfg.Name)
}
rwNames[cfg.Name] = struct{}{}
}
return nil
}
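
// Clone makes a deep copy of the Config by round-tripping it through YAML,
// carrying over the unexported global settings.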
func (c *Config) Clone() (Config, error) {
bb, err := MarshalConfig(c, false)
if err != nil {
return Config{}, err
}
cp, err := UnmarshalConfig(bytes.NewReader(bb))
if err != nil {
return Config{}, err
}
cp.global = c.global
if cp.ScrapeConfigs == nil && c.ScrapeConfigs != nil {
cp.ScrapeConfigs = []*config.ScrapeConfig{}
}
if cp.RemoteWrite == nil && c.RemoteWrite != nil {
cp.RemoteWrite = []*config.RemoteWriteConfig{}
}
return *cp, nil
}
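
// walStorageFactory creates a walStorage using the given registerer.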
type walStorageFactory func(reg prometheus.Registerer) (walStorage, error)
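
// Instance is an individual metrics collector and remote_writer.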
type Instance struct {
mut sync.Mutex
cfg Config
wal walStorage
discovery *discoveryService
readyScrapeManager *readyScrapeManager
remoteStore *remote.Storage
storage storage.Storage
ready atomic.Bool
hostFilter *HostFilter
logger log.Logger
reg prometheus.Registerer
newWal walStorageFactory
}
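
// New creates a new Instance, storing its WAL in a subdirectory of walDir
// named after the instance. The Instance does not collect or ship metrics
// until Run is called.
//
// A minimal usage sketch; the config, logger, context, and WAL path are
// assumed to be supplied by the caller:
//
//	inst, err := New(prometheus.NewRegistry(), cfg, "/tmp/agent-wal", logger)
//	if err != nil {
//		return err
//	}
//	go func() { _ = inst.Run(ctx) }()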
func New(reg prometheus.Registerer, cfg Config, walDir string, logger log.Logger) (*Instance, error) {
logger = log.With(logger, "instance", cfg.Name)
instWALDir := filepath.Join(walDir, cfg.Name)
newWal := func(reg prometheus.Registerer) (walStorage, error) {
return wal.NewStorage(logger, reg, instWALDir)
}
return newInstance(cfg, reg, logger, newWal)
}
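
// newInstance creates a new Instance with a custom WAL factory, allowing
// callers (such as tests) to substitute the WAL implementation.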
func newInstance(cfg Config, reg prometheus.Registerer, logger log.Logger, newWal walStorageFactory) (*Instance, error) {
hostname, err := Hostname()
if err != nil {
return nil, fmt.Errorf("failed to get hostname: %w", err)
}
i := &Instance{
cfg: cfg,
logger: logger,
hostFilter: NewHostFilter(hostname, cfg.HostFilterRelabelConfigs),
reg: reg,
newWal: newWal,
readyScrapeManager: &readyScrapeManager{},
}
return i, nil
}
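
// Run starts the instance, initializing its Prometheus components, and runs
// until an error occurs or the provided context is canceled. Run may be
// called again after it exits; all components are reinitialized on each
// call.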
func (i *Instance) Run(ctx context.Context) error {
i.mut.Lock()
cfg := i.cfg
i.mut.Unlock()
level.Debug(i.logger).Log("msg", "initializing instance", "name", cfg.Name)
trackingReg := util.WrapWithUnregisterer(i.reg)
defer trackingReg.UnregisterAll()
if err := i.initialize(ctx, trackingReg, &cfg); err != nil {
level.Error(i.logger).Log("msg", "failed to initialize instance", "err", err)
return fmt.Errorf("failed to initialize instance: %w", err)
}
rg := runGroupWithContext(ctx)
{
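// Target discovery.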
rg.Add(i.discovery.Run, i.discovery.Stop)
}
{
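// WAL truncation loop. It runs on a context derived from Background so it
// is stopped by the run group's interrupt function rather than by the
// parent context directly.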
ctx, contextCancel := context.WithCancel(context.Background())
defer contextCancel()
rg.Add(
func() error {
i.truncateLoop(ctx, i.wal, &cfg)
level.Info(i.logger).Log("msg", "truncation loop stopped")
return nil
},
func(err error) {
level.Info(i.logger).Log("msg", "stopping truncation loop...")
contextCancel()
},
)
}
{
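// Scrape manager. On shutdown, staleness markers are optionally written
// before the storage is closed.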
sm, err := i.readyScrapeManager.Get()
if err != nil {
level.Error(i.logger).Log("msg", "failed to get scrape manager")
return err
}
rg.Add(
func() error {
err := sm.Run(i.discovery.SyncCh())
level.Info(i.logger).Log("msg", "scrape manager stopped")
return err
},
func(err error) {
level.Info(i.logger).Log("msg", "stopping scrape manager...")
sm.Stop()
if err == nil && cfg.WriteStaleOnShutdown {
level.Info(i.logger).Log("msg", "writing staleness markers...")
err := i.wal.WriteStalenessMarkers(i.getRemoteWriteTimestamp)
if err != nil {
level.Error(i.logger).Log("msg", "error writing staleness markers", "err", err)
}
}
level.Info(i.logger).Log("msg", "closing storage...")
if err := i.storage.Close(); err != nil {
level.Error(i.logger).Log("msg", "error stopping storage", "err", err)
}
},
)
}
level.Debug(i.logger).Log("msg", "running instance", "name", cfg.Name)
i.ready.Store(true)
err := rg.Run()
if err != nil {
level.Error(i.logger).Log("msg", "agent instance stopped with error", "err", err)
}
return err
}
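
// initialize sets up the various Prometheus components with their initial
// settings. It is invoked on every call to Run.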
func (i *Instance) initialize(ctx context.Context, reg prometheus.Registerer, cfg *Config) error {
i.mut.Lock()
defer i.mut.Unlock()
if cfg.HostFilter {
i.hostFilter.PatchSD(cfg.ScrapeConfigs)
}
var err error
i.wal, err = i.newWal(reg)
if err != nil {
return fmt.Errorf("error creating WAL: %w", err)
}
i.discovery, err = i.newDiscoveryManager(ctx, cfg)
if err != nil {
return fmt.Errorf("error creating discovery manager: %w", err)
}
i.readyScrapeManager = &readyScrapeManager{}
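
// Set up the remote storage, which queues samples read from the WAL and
// ships them to the configured remote_write endpoints.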
remoteLogger := log.With(i.logger, "component", "remote")
i.remoteStore = remote.NewStorage(remoteLogger, reg, i.wal.StartTime, i.wal.Directory(), cfg.RemoteFlushDeadline, i.readyScrapeManager)
err = i.remoteStore.ApplyConfig(&config.Config{
GlobalConfig: cfg.global.Prometheus,
RemoteWriteConfigs: cfg.RemoteWrite,
})
if err != nil {
return fmt.Errorf("failed applying config to remote storage: %w", err)
}
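
// Fan out appends to both the WAL and the remote storage.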
i.storage = storage.NewFanout(i.logger, i.wal, i.remoteStore)
opts := &scrape.Options{
ExtraMetrics: cfg.global.ExtraMetrics,
HTTPClientOptions: []config_util.HTTPClientOption{},
}
if cfg.global.DisableKeepAlives {
opts.HTTPClientOptions = append(opts.HTTPClientOptions, config_util.WithKeepAlivesDisabled())
}
if cfg.global.IdleConnTimeout > 0 {
opts.HTTPClientOptions = append(opts.HTTPClientOptions, config_util.WithIdleConnTimeout(cfg.global.IdleConnTimeout))
}
scrapeManager := newScrapeManager(opts, log.With(i.logger, "component", "scrape manager"), i.storage)
err = scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: cfg.global.Prometheus,
ScrapeConfigs: cfg.ScrapeConfigs,
})
if err != nil {
return fmt.Errorf("failed applying config to scrape manager: %w", err)
}
i.readyScrapeManager.Set(scrapeManager)
return nil
}
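
// Ready indicates if the instance is ready for processing.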
func (i *Instance) Ready() bool {
return i.ready.Load()
}
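
// Update dynamically applies a new Config to a running Instance. Fields
// that cannot be changed at runtime cause an ErrInvalidUpdate to be
// returned.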
func (i *Instance) Update(c Config) (err error) {
i.mut.Lock()
defer i.mut.Unlock()
switch {
case i.cfg.Name != c.Name:
err = errImmutableField{Field: "name"}
case i.cfg.HostFilter != c.HostFilter:
err = errImmutableField{Field: "host_filter"}
case i.cfg.WALTruncateFrequency != c.WALTruncateFrequency:
err = errImmutableField{Field: "wal_truncate_frequency"}
case i.cfg.RemoteFlushDeadline != c.RemoteFlushDeadline:
err = errImmutableField{Field: "remote_flush_deadline"}
case i.cfg.WriteStaleOnShutdown != c.WriteStaleOnShutdown:
err = errImmutableField{Field: "write_stale_on_shutdown"}
}
if err != nil {
return ErrInvalidUpdate{Inner: err}
}
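
// A dynamic update requires the components created by initialize, so the
// instance must be running.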
if i.discovery == nil || i.remoteStore == nil || i.readyScrapeManager == nil {
return ErrInvalidUpdate{
Inner: errors.New("cannot dynamically update because instance is not running"),
}
}
originalConfig := i.cfg
defer func() {
if err != nil {
i.cfg = originalConfig
}
}()
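
// Apply the new settings in dependency order (local config, remote storage,
// scrape manager, then discovery manager) so the flow from discovered
// targets to remote_write keeps working throughout the switch.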
i.cfg = c
i.hostFilter.SetRelabels(c.HostFilterRelabelConfigs)
if c.HostFilter {
i.hostFilter.PatchSD(c.ScrapeConfigs)
}
err = i.remoteStore.ApplyConfig(&config.Config{
GlobalConfig: c.global.Prometheus,
RemoteWriteConfigs: c.RemoteWrite,
})
if err != nil {
return fmt.Errorf("error applying new remote_write configs: %w", err)
}
sm, err := i.readyScrapeManager.Get()
if err != nil {
return fmt.Errorf("couldn't get scrape manager to apply new scrape configs: %w", err)
}
err = sm.ApplyConfig(&config.Config{
GlobalConfig: c.global.Prometheus,
ScrapeConfigs: c.ScrapeConfigs,
})
if err != nil {
return fmt.Errorf("error applying updated configs to scrape manager: %w", err)
}
sdConfigs := map[string]discovery.Configs{}
for _, v := range c.ScrapeConfigs {
sdConfigs[v.JobName] = v.ServiceDiscoveryConfigs
}
err = i.discovery.Manager.ApplyConfig(sdConfigs)
if err != nil {
return fmt.Errorf("failed applying configs to discovery manager: %w", err)
}
return nil
}
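
// TargetsActive returns the set of active scrape targets keyed by job name,
// or nil if the scrape manager is not ready yet.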
func (i *Instance) TargetsActive() map[string][]*scrape.Target {
i.mut.Lock()
defer i.mut.Unlock()
if i.readyScrapeManager == nil {
return nil
}
mgr, err := i.readyScrapeManager.Get()
if err == ErrNotReady {
return nil
} else if err != nil {
level.Error(i.logger).Log("msg", "failed to get scrape manager when collecting active targets", "err", err)
return nil
}
return mgr.TargetsActive()
}
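
// StorageDirectory returns the directory where the instance's WAL is
// stored.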
func (i *Instance) StorageDirectory() string {
return i.wal.Directory()
}
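
// Appender returns a storage.Appender that writes to the instance's WAL.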
func (i *Instance) Appender(ctx context.Context) storage.Appender {
return i.wal.Appender(ctx)
}
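
// discoveryService wraps a discovery.Manager with run/stop hooks and a
// swappable sync channel so host filtering can be layered on top.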
type discoveryService struct {
Manager *discovery.Manager
RunFunc func() error
StopFunc func(err error)
SyncChFunc func() GroupChannel
}
func (s *discoveryService) Run() error { return s.RunFunc() }
func (s *discoveryService) Stop(err error) { s.StopFunc(err) }
func (s *discoveryService) SyncCh() GroupChannel { return s.SyncChFunc() }
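
// newDiscoveryManager builds a service discovery manager for the instance's
// scrape configs, optionally routing discovered target groups through the
// host filter.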
func (i *Instance) newDiscoveryManager(ctx context.Context, cfg *Config) (*discoveryService, error) {
ctx, cancel := context.WithCancel(ctx)
logger := log.With(i.logger, "component", "discovery manager")
manager := discovery.NewManager(ctx, logger, discovery.Name("scrape"))
c := map[string]discovery.Configs{}
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfigs
}
err := manager.ApplyConfig(c)
if err != nil {
cancel()
level.Error(i.logger).Log("msg", "failed applying config to discovery manager", "err", err)
return nil, fmt.Errorf("failed applying config to discovery manager: %w", err)
}
rg := runGroupWithContext(ctx)
rg.Add(func() error {
err := manager.Run()
level.Info(i.logger).Log("msg", "discovery manager stopped")
return err
}, func(err error) {
level.Info(i.logger).Log("msg", "stopping discovery manager...")
cancel()
})
syncChFunc := manager.SyncCh
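// If host filtering is enabled, route discovered targets through the filter
// and hand its sync channel to the scrape manager instead.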
if cfg.HostFilter {
rg.Add(func() error {
i.hostFilter.Run(manager.SyncCh())
level.Info(i.logger).Log("msg", "host filterer stopped")
return nil
}, func(_ error) {
level.Info(i.logger).Log("msg", "stopping host filterer...")
i.hostFilter.Stop()
})
syncChFunc = i.hostFilter.SyncCh
}
return &discoveryService{
Manager: manager,
RunFunc: rg.Run,
StopFunc: rg.Stop,
SyncChFunc: syncChFunc,
}, nil
}
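
// truncateLoop periodically truncates the WAL, removing series and samples
// that have already been shipped via remote_write or that are older than
// MaxWALTime.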
func (i *Instance) truncateLoop(ctx context.Context, wal walStorage, cfg *Config) {
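// lastTs is the timestamp of the previous truncation; truncation is skipped
// whenever it has not advanced.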
var lastTs int64 = math.MinInt64
for {
select {
case <-ctx.Done():
return
case <-time.After(cfg.WALTruncateFrequency):
// MinWALTime and MaxWALTime can change via a dynamic Update, so snapshot
// them under the mutex before use.
i.mut.Lock()
minWALTime, maxWALTime := i.cfg.MinWALTime, i.cfg.MaxWALTime
i.mut.Unlock()
ts := i.getRemoteWriteTimestamp() - minWALTime.Milliseconds()
if ts < 0 {
ts = 0
}
// Don't let WAL data grow older than MaxWALTime, even if remote_write is
// not making progress.
if maxTS := timestamp.FromTime(time.Now().Add(-maxWALTime)); ts < maxTS {
ts = maxTS
}
if ts == lastTs {
level.Debug(i.logger).Log("msg", "not truncating the WAL, remote_write timestamp is unchanged", "ts", ts)
continue
}
lastTs = ts
level.Debug(i.logger).Log("msg", "truncating the WAL", "ts", ts)
err := wal.Truncate(ts)
if err != nil {
level.Warn(i.logger).Log("msg", "could not truncate WAL", "err", err)
}
}
}
}
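
// getRemoteWriteTimestamp looks up the last successfully sent remote_write
// timestamp, which determines the WAL truncation point. If no remote_write
// configs are defined, the current time is returned so the WAL can always
// be truncated.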
func (i *Instance) getRemoteWriteTimestamp() int64 {
i.mut.Lock()
defer i.mut.Unlock()
if len(i.cfg.RemoteWrite) == 0 {
return timestamp.FromTime(time.Now())
}
if i.remoteStore == nil {
return 0
}
return i.remoteStore.LowestSentTimestamp()
}
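
// walStorage is the subset of wal.Storage the Instance relies on, narrowed
// into an interface so it can be replaced in tests.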
type walStorage interface {
storage.Queryable
storage.ChunkQueryable
Directory() string
StartTime() (int64, error)
WriteStalenessMarkers(remoteTsFunc func() int64) error
Appender(context.Context) storage.Appender
Truncate(mint int64) error
Close() error
}
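
// Hostname retrieves the hostname identifying the machine the process is
// running on. It returns the value of $HOSTNAME when defined and falls back
// to os.Hostname otherwise.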
func Hostname() (string, error) {
hostname := os.Getenv("HOSTNAME")
if hostname != "" {
return hostname, nil
}
hostname, err := os.Hostname()
if err != nil {
return "", fmt.Errorf("failed to get hostname: %w", err)
}
return hostname, nil
}
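
// getHash returns a hex-encoded MD5 hash of the JSON encoding of data; it
// is used to derive stable names for unnamed remote_write configs.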
func getHash(data interface{}) (string, error) {
b, err := json.Marshal(data)
if err != nil {
return "", err
}
hash := md5.Sum(b)
return hex.EncodeToString(hash[:]), nil
}
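
// managerMtx guards calls to scrape.NewManager, which modifies shared
// global state inside Prometheus.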
var managerMtx sync.Mutex
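
// newScrapeManager creates a scrape.Manager while holding managerMtx to
// avoid a data race between concurrently starting instances.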
func newScrapeManager(o *scrape.Options, logger log.Logger, app storage.Appendable) *scrape.Manager {
managerMtx.Lock()
defer managerMtx.Unlock()
return scrape.NewManager(o, logger, app)
}
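
// runGroupContext couples a run.Group with a context; canceling the context
// or calling Stop shuts down the whole group.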
type runGroupContext struct {
cancel context.CancelFunc
g *run.Group
}
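
// runGroupWithContext creates a run.Group that stops when the provided
// context is canceled, in addition to stopping when any actor exits.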
func runGroupWithContext(ctx context.Context) *runGroupContext {
ctx, cancel := context.WithCancel(ctx)
var g run.Group
g.Add(func() error {
<-ctx.Done()
return nil
}, func(_ error) {
cancel()
})
return &runGroupContext{cancel: cancel, g: &g}
}
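
// Add registers a new actor with the underlying run.Group.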
func (rg *runGroupContext) Add(execute func() error, interrupt func(error)) {
rg.g.Add(execute, interrupt)
}
func (rg *runGroupContext) Run() error { return rg.g.Run() }
func (rg *runGroupContext) Stop(_ error) { rg.cancel() }
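
// ErrNotReady is returned when the scrape manager is used before it has
// been initialized.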
var ErrNotReady = errors.New("scrape manager not ready")
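
// readyScrapeManager allows a scrape.Manager to be retrieved even when it
// is set at a later point in time.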
type readyScrapeManager struct {
mtx sync.RWMutex
m *scrape.Manager
}
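
// Set stores the wrapped scrape manager, marking it ready for use.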
func (rm *readyScrapeManager) Set(m *scrape.Manager) {
rm.mtx.Lock()
defer rm.mtx.Unlock()
rm.m = m
}
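
// Get returns the scrape manager, or ErrNotReady if it has not been set
// yet.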
func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
rm.mtx.RLock()
defer rm.mtx.RUnlock()
if rm.m != nil {
return rm.m, nil
}
return nil, ErrNotReady
}