package integrations
import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gorilla/mux"
	"github.com/grafana/agent/pkg/metrics"
	"github.com/grafana/agent/pkg/metrics/instance"
	"github.com/grafana/agent/pkg/metrics/instance/configstore"
	"github.com/grafana/agent/pkg/server"
	"github.com/grafana/agent/pkg/util"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	promConfig "github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/model/relabel"
)
var (
integrationAbnormalExits = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "agent_metrics_integration_abnormal_exits_total",
Help: "Total number of times an agent integration exited unexpectedly, causing it to be restarted.",
}, []string{"integration_name"})
)
// CurrentManagerConfig holds the currently active ManagerConfig, initialized
// to the defaults.
var CurrentManagerConfig = DefaultManagerConfig()
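
// DefaultManagerConfig returns the default settings for the integrations
// manager.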
func DefaultManagerConfig() ManagerConfig {
return ManagerConfig{
ScrapeIntegrations: true,
IntegrationRestartBackoff: 5 * time.Second,
UseHostnameLabel: true,
ReplaceInstanceLabel: true,
}
}
// ManagerConfig holds the full set of integrations and the settings
// controlling how they run and how they are scraped.
type ManagerConfig struct {
	// When true, integrations are scraped by default. Individual integrations
	// may override this through their common settings.
	ScrapeIntegrations bool `yaml:"scrape_integrations,omitempty"`
	// Integrations is populated by the package's custom unmarshaling logic
	// rather than directly from this struct's tags.
	Integrations Configs `yaml:"-"`
	// Extra labels added to all integration samples.
	Labels model.LabelSet `yaml:"labels,omitempty"`
	// Remote write configs used by all integrations. Defaults to the global
	// metrics remote write configs when empty.
	PrometheusRemoteWrite []*promConfig.RemoteWriteConfig `yaml:"prometheus_remote_write,omitempty"`

	IntegrationRestartBackoff time.Duration `yaml:"integration_restart_backoff,omitempty"`

	// ListenPort, ListenHost, ServerUsingTLS, and PrometheusGlobalConfig are
	// filled in at runtime by ApplyDefaults rather than unmarshaled from YAML.
	ListenPort             int                     `yaml:"-"`
	ListenHost             string                  `yaml:"-"`
	TLSConfig              config_util.TLSConfig   `yaml:"http_tls_config,omitempty"`
	ServerUsingTLS         bool                    `yaml:"-"`
	PrometheusGlobalConfig promConfig.GlobalConfig `yaml:"-"`

	ReplaceInstanceLabel bool `yaml:"replace_instance_label,omitempty"`
	UseHostnameLabel     bool `yaml:"use_hostname_label,omitempty"`
}
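
// For illustration only, a minimal YAML snippet for the tagged fields above
// (field names come from the yaml tags; the values are made up):
//
//	scrape_integrations: true
//	labels:
//	  env: prod
//	prometheus_remote_write:
//	  - url: http://localhost:9090/api/v1/write

// MarshalYAML implements yaml.Marshaler for ManagerConfig.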
func (c ManagerConfig) MarshalYAML() (interface{}, error) {
return MarshalYAML(c)
}
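
// UnmarshalYAML implements yaml.Unmarshaler for ManagerConfig, applying the
// default settings before unmarshaling.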
func (c *ManagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultManagerConfig()
return UnmarshalYAML(c, unmarshal)
}
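
// DefaultRelabelConfigs returns the relabel config that rewrites the instance
// label of every scraped target to instanceKey. Illustratively, a target
// discovered with __address__="127.0.0.1:12345" and
// instanceKey="myhost:12345" ends up with instance="myhost:12345".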
func (c *ManagerConfig) DefaultRelabelConfigs(instanceKey string) []*relabel.Config {
return []*relabel.Config{{
SourceLabels: model.LabelNames{model.AddressLabel},
Action: relabel.Replace,
Separator: ";",
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: instanceKey,
TargetLabel: model.InstanceLabel,
}}
}
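
// ApplyDefaults fills in the runtime-only fields of the config (listen host
// and port, TLS usage, and the remote write and global Prometheus defaults)
// and validates that a WAL directory is configured whenever at least one
// enabled integration will be scraped.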
func (c *ManagerConfig) ApplyDefaults(sflags *server.Flags, mcfg *metrics.Config) error {
host, port, err := sflags.HTTP.ListenHostPort()
if err != nil {
return fmt.Errorf("reading HTTP host:port: %w", err)
}
c.ListenHost = host
c.ListenPort = port
c.ServerUsingTLS = sflags.HTTP.UseTLS
if len(c.PrometheusRemoteWrite) == 0 {
c.PrometheusRemoteWrite = mcfg.Global.RemoteWrite
}
c.PrometheusGlobalConfig = mcfg.Global.Prometheus
for _, ic := range c.Integrations {
if !ic.Common.Enabled {
continue
}
scrapeIntegration := c.ScrapeIntegrations
if common := ic.Common; common.ScrapeIntegration != nil {
scrapeIntegration = *common.ScrapeIntegration
}
if scrapeIntegration && mcfg.WALDir == "" {
return fmt.Errorf("no wal_directory configured")
}
}
return nil
}
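
// Manager manages a set of integrations: it runs each enabled integration,
// generates the scrape configs that make the agent scrape itself, and serves
// the integrations' metrics endpoints over HTTP.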
type Manager struct {
logger log.Logger
cfgMut sync.RWMutex
cfg ManagerConfig
hostname string
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
im instance.Manager
validator configstore.Validator
integrationsMut sync.RWMutex
integrations map[string]*integrationProcess
handlerMut sync.Mutex
handlerCache map[string]handlerCacheEntry
}
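
// NewManager creates a new integrations Manager and applies cfg, starting all
// enabled integrations. A minimal usage sketch (logger, instanceManager, and
// validator are assumed to exist):
//
//	m, err := NewManager(cfg, logger, instanceManager, validator)
//	if err != nil {
//		return err
//	}
//	defer m.Stop()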
func NewManager(cfg ManagerConfig, logger log.Logger, im instance.Manager, validate configstore.Validator) (*Manager, error) {
ctx, cancel := context.WithCancel(context.Background())
m := &Manager{
logger: logger,
ctx: ctx,
cancel: cancel,
im: im,
validator: validate,
integrations: make(map[string]*integrationProcess, len(cfg.Integrations)),
handlerCache: make(map[string]handlerCacheEntry),
}
var err error
m.hostname, err = instance.Hostname()
if err != nil {
return nil, err
}
if err := m.ApplyConfig(cfg); err != nil {
return nil, fmt.Errorf("failed applying config: %w", err)
}
return m, nil
}
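
// ApplyConfig applies a new ManagerConfig: it starts newly enabled
// integrations, restarts ones whose config changed, stops removed ones, and
// syncs the generated scrape configs with the instance Manager.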
func (m *Manager) ApplyConfig(cfg ManagerConfig) error {
var failed bool
m.cfgMut.Lock()
defer m.cfgMut.Unlock()
m.integrationsMut.Lock()
defer m.integrationsMut.Unlock()
	// PrometheusGlobalConfig is tagged yaml:"-", so it must be compared
	// explicitly on top of the rest of the config.
	if util.CompareYAML(m.cfg, cfg) && util.CompareYAML(m.cfg.PrometheusGlobalConfig, cfg.PrometheusGlobalConfig) {
		level.Debug(m.logger).Log("msg", "integrations config is unchanged; skipping apply")
		return nil
	}
level.Debug(m.logger).Log("msg", "Applying integrations config changes")
select {
case <-m.ctx.Done():
return fmt.Errorf("Manager already stopped")
default:
}
for _, ic := range cfg.Integrations {
if !ic.Common.Enabled {
continue
}
key := integrationKey(ic.Name())
if p, exist := m.integrations[key]; exist {
if util.CompareYAMLWithHook(p.cfg, ic, noScrubbedSecretsHook) {
continue
}
p.stop()
delete(m.integrations, key)
}
l := log.With(m.logger, "integration", ic.Name())
i, err := ic.NewIntegration(l)
if err != nil {
level.Error(m.logger).Log("msg", "failed to initialize integration. it will not run or be scraped", "integration", ic.Name(), "err", err)
failed = true
_ = m.im.DeleteConfig(key)
continue
}
var instanceKey string
if kp := ic.Common.InstanceKey; kp != nil {
instanceKey = strings.TrimSpace(*kp)
} else {
instanceKey, err = ic.InstanceKey(fmt.Sprintf("%s:%d", m.hostname, cfg.ListenPort))
if err != nil {
level.Error(m.logger).Log("msg", "failed to get instance key for integration. it will not run or be scraped", "integration", ic.Name(), "err", err)
failed = true
_ = m.im.DeleteConfig(key)
continue
}
}
ctx, cancel := context.WithCancel(m.ctx)
p := &integrationProcess{
log: m.logger,
cfg: ic,
i: i,
instanceKey: instanceKey,
ctx: ctx,
stop: cancel,
wg: &m.wg,
wait: m.instanceBackoff,
}
		m.wg.Add(1) // register before starting so a concurrent Stop can't Wait too early
		go p.Run()
m.integrations[key] = p
}
for key, process := range m.integrations {
foundConfig := false
for _, ic := range cfg.Integrations {
if integrationKey(ic.Name()) == key {
if !ic.Common.Enabled {
break
}
foundConfig = true
break
}
}
if foundConfig {
continue
}
_ = m.im.DeleteConfig(key)
process.stop()
delete(m.integrations, key)
}
for key, p := range m.integrations {
shouldCollect := cfg.ScrapeIntegrations
if common := p.cfg.Common; common.ScrapeIntegration != nil {
shouldCollect = *common.ScrapeIntegration
}
		if !shouldCollect {
			_ = m.im.DeleteConfig(key)
			continue
		}
		instanceConfig := m.instanceConfigForIntegration(p, cfg)
		if err := m.validator(&instanceConfig); err != nil {
			level.Error(p.log).Log("msg", "failed to validate generated scrape config for integration. integration will not be scraped", "err", err, "integration", p.cfg.Name())
			failed = true
			continue
		}
		if err := m.im.ApplyConfig(instanceConfig); err != nil {
			level.Error(p.log).Log("msg", "failed to apply integration. integration will not be scraped", "err", err, "integration", p.cfg.Name())
			failed = true
		}
}
m.cfg = cfg
if failed {
return fmt.Errorf("not all integrations were correctly updated")
}
return nil
}
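
// noScrubbedSecretsHook is a marshaling hook for util.CompareYAMLWithHook that
// emits secrets and URLs as their raw values rather than scrubbed
// placeholders, so config comparisons detect when a secret changes.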
func noScrubbedSecretsHook(in interface{}) (ok bool, out interface{}, err error) {
switch v := in.(type) {
case config_util.Secret:
return true, string(v), nil
case *config_util.URL:
return true, v.String(), nil
default:
return false, nil, nil
}
}
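
// integrationProcess is a running integration along with the context and
// WaitGroup that manage its lifetime.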
type integrationProcess struct {
log log.Logger
ctx context.Context
stop context.CancelFunc
cfg UnmarshaledConfig
instanceKey string
i Integration
wg *sync.WaitGroup
wait func(cfg Config, err error)
}
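
// Run runs the integration until its context is canceled. Abnormal exits are
// reported through p.wait, which applies the restart backoff.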
func (p *integrationProcess) Run() {
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("%v", r)
level.Error(p.log).Log("msg", "integration has panicked. THIS IS A BUG!", "err", err, "integration", p.cfg.Name())
}
}()
	defer p.wg.Done() // the matching Add happens in ApplyConfig, before the goroutine starts
for {
err := p.i.Run(p.ctx)
if err != nil && err != context.Canceled {
p.wait(p.cfg, err)
} else {
level.Info(p.log).Log("msg", "stopped integration", "integration", p.cfg.Name())
break
}
}
}
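
// instanceBackoff is invoked when an integration exits abnormally; it records
// the exit and sleeps for the configured restart backoff.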
func (m *Manager) instanceBackoff(cfg Config, err error) {
m.cfgMut.RLock()
defer m.cfgMut.RUnlock()
integrationAbnormalExits.WithLabelValues(cfg.Name()).Inc()
level.Error(m.logger).Log("msg", "integration stopped abnormally, restarting after backoff", "err", err, "integration", cfg.Name(), "backoff", m.cfg.IntegrationRestartBackoff)
time.Sleep(m.cfg.IntegrationRestartBackoff)
}
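
// instanceConfigForIntegration generates the metrics instance config used to
// scrape the given integration through the agent's own HTTP server.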
func (m *Manager) instanceConfigForIntegration(p *integrationProcess, cfg ManagerConfig) instance.Config {
common := p.cfg.Common
relabelConfigs := append(cfg.DefaultRelabelConfigs(p.instanceKey), common.RelabelConfigs...)
schema := "http"
var httpClientConfig config_util.HTTPClientConfig
if cfg.ServerUsingTLS {
schema = "https"
httpClientConfig.TLSConfig = cfg.TLSConfig
}
var scrapeConfigs []*promConfig.ScrapeConfig
for _, isc := range p.i.ScrapeConfigs() {
sc := &promConfig.ScrapeConfig{
JobName: fmt.Sprintf("integrations/%s", isc.JobName),
MetricsPath: path.Join("/integrations", p.cfg.Name(), isc.MetricsPath),
Params: isc.QueryParams,
			Scheme:          scheme,
HonorLabels: false,
HonorTimestamps: true,
ScrapeInterval: model.Duration(common.ScrapeInterval),
ScrapeTimeout: model.Duration(common.ScrapeTimeout),
ServiceDiscoveryConfigs: m.scrapeServiceDiscovery(cfg),
RelabelConfigs: relabelConfigs,
MetricRelabelConfigs: common.MetricRelabelConfigs,
HTTPClientConfig: httpClientConfig,
}
scrapeConfigs = append(scrapeConfigs, sc)
}
instanceCfg := instance.DefaultConfig
instanceCfg.Name = integrationKey(p.cfg.Name())
instanceCfg.ScrapeConfigs = scrapeConfigs
instanceCfg.RemoteWrite = cfg.PrometheusRemoteWrite
if common.WALTruncateFrequency > 0 {
instanceCfg.WALTruncateFrequency = common.WALTruncateFrequency
}
return instanceCfg
}
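
// integrationKey returns the key that identifies an integration in the
// instance Manager and in the handler cache.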
func integrationKey(name string) string {
return fmt.Sprintf("integration/%s", name)
}
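
// scrapeServiceDiscovery returns a static service discovery config that
// targets the agent's own listen address, labeled with agent_hostname plus
// any configured extra labels. With defaults it resolves to something like
// (values illustrative):
//
//	- targets: ["127.0.0.1:12345"]
//	  labels:
//	    agent_hostname: myhost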
func (m *Manager) scrapeServiceDiscovery(cfg ManagerConfig) discovery.Configs {
newHost := cfg.ListenHost
if newHost == "" {
newHost = "127.0.0.1"
}
localAddr := fmt.Sprintf("%s:%d", newHost, cfg.ListenPort)
labels := model.LabelSet{}
labels[model.LabelName("agent_hostname")] = model.LabelValue(m.hostname)
for k, v := range cfg.Labels {
labels[k] = v
}
return discovery.Configs{
discovery.StaticConfig{{
Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(localAddr)}},
Labels: labels,
}},
}
}
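
// WireAPI registers the integrations' HTTP endpoints with r. Each
// integration's metrics are exposed at /integrations/{name}/metrics.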
func (m *Manager) WireAPI(r *mux.Router) {
r.HandleFunc("/integrations/{name}/metrics", func(rw http.ResponseWriter, r *http.Request) {
m.integrationsMut.RLock()
defer m.integrationsMut.RUnlock()
key := integrationKey(mux.Vars(r)["name"])
handler := m.loadHandler(key)
handler.ServeHTTP(rw, r)
})
}
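
// loadHandler returns the cached HTTP handler for the integration identified
// by key, creating and caching one on demand. Unknown integrations get a 404
// handler, and handler construction failures get a 500 handler.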
func (m *Manager) loadHandler(key string) http.Handler {
m.handlerMut.Lock()
defer m.handlerMut.Unlock()
p, ok := m.integrations[key]
if !ok {
delete(m.handlerCache, key)
return http.NotFoundHandler()
}
cacheEntry, ok := m.handlerCache[key]
if ok && cacheEntry.process == p {
return cacheEntry.handler
}
handler, err := p.i.MetricsHandler()
if err != nil {
level.Error(m.logger).Log("msg", "could not create http handler for integration", "integration", p.cfg.Name(), "err", err)
return http.HandlerFunc(internalServiceError)
}
cacheEntry = handlerCacheEntry{handler: handler, process: p}
m.handlerCache[key] = cacheEntry
return cacheEntry.handler
}
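
// internalServiceError writes a generic 500 response.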
func internalServiceError(w http.ResponseWriter, r *http.Request) {
http.Error(w, "500 Internal Server Error", http.StatusInternalServerError)
}
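
// Stop stops the Manager and all running integrations, blocking until every
// integration goroutine has exited.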
func (m *Manager) Stop() {
m.cancel()
m.wg.Wait()
}
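
// handlerCacheEntry pairs a cached HTTP handler with the integration process
// it was built for, so stale handlers are dropped when a process is replaced.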
type handlerCacheEntry struct {
handler http.Handler
process *integrationProcess
}