package integrations
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"path"
"sort"
"strings"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/agent/pkg/integrations/v2/autoscrape"
"github.com/prometheus/prometheus/discovery"
http_sd "github.com/prometheus/prometheus/discovery/http"
"go.uber.org/atomic"
)
// controllerConfig is the list of integration Configs handed to a controller.
type controllerConfig []Config
// controller holds the configured integrations and hands each newly built set
// to the run loop over the runIntegrations channel.
type controller struct {
logger log.Logger
// mut is held by UpdateController and forEachIntegration while they read or
// write cfg, globals, and integrations.
mut sync.Mutex
// cfg and globals are the values passed to the last successful
// UpdateController call.
cfg controllerConfig
globals Globals
// integrations is the current set of integrations, in config order.
integrations []*controlledIntegration
// runIntegrations carries each freshly built integration set to run.
runIntegrations chan []*controlledIntegration
}
// newController builds a controller and applies the initial configuration to
// it. It returns an error if the initial UpdateController call fails.
func newController(l log.Logger, cfg controllerConfig, globals Globals) (*controller, error) {
	ctrl := new(controller)
	ctrl.logger = l
	// A buffer of one lets the initial update below be queued without a
	// receiver being ready.
	ctrl.runIntegrations = make(chan []*controlledIntegration, 1)

	err := ctrl.UpdateController(cfg, globals)
	if err != nil {
		return nil, err
	}
	return ctrl, nil
}
// run starts a worker pool and reloads it with every integration set received
// on c.runIntegrations. It returns, closing the pool, once ctx is done.
func (c *controller) run(ctx context.Context) {
	workers := newWorkerPool(ctx, c.logger)
	defer workers.Close()

	done := ctx.Done()
	for {
		select {
		case next := <-c.runIntegrations:
			workers.Reload(next)
		case <-done:
			level.Debug(c.logger).Log("msg", "controller exiting")
			return
		}
	}
}
// controlledIntegration pairs a constructed Integration with the ID and Config
// it was built from.
type controlledIntegration struct {
id integrationID
i Integration
c Config
// running is only read in this part of the file; presumably whatever runs
// the integration toggles it — TODO confirm against the worker pool.
running atomic.Bool
}
// Running reports the current value of the integration's running flag.
func (ci *controlledIntegration) Running() bool {
	isRunning := ci.running.Load()
	return isRunning
}
// integrationID identifies one integration instance by integration name and
// instance identifier. It is comparable and is used as a map key.
type integrationID struct {
	Name       string
	Identifier string
}

// String renders the ID as "<Name>/<Identifier>".
func (id integrationID) String() string {
	name, identifier := id.Name, id.Identifier
	return fmt.Sprintf("%s/%s", name, identifier)
}
// UpdateController validates cfg, builds the matching set of integrations,
// and queues that set on c.runIntegrations for the run loop.
//
// Existing integrations are reused when their ID matches and either the
// config is unchanged (per CompareConfigs) or the integration implements
// UpdateIntegration and accepts the new config. Otherwise a new integration
// is constructed.
//
// It returns an error, leaving the controller state untouched, when:
//   - a singleton integration is configured more than once,
//   - an identifier cannot be built or defaults cannot be applied,
//   - two configs resolve to the same name/identifier pair,
//   - a dynamic update fails with anything other than ErrInvalidUpdate, or
//   - a new integration cannot be constructed.
//
// Note that an integration that accepted ApplyConfig before a later config
// caused an error keeps that applied config.
func (c *controller) UpdateController(cfg controllerConfig, globals Globals) error {
	c.mut.Lock()
	defer c.mut.Unlock()

	// Ensure that no singleton integration is defined more than once.
	var (
		duplicatedSingletons []string
		singletonSet         = make(map[string]struct{})
	)
	for _, ic := range cfg {
		// Only the registered type matters here; the second return value of
		// RegisteredType is intentionally ignored.
		t, _ := RegisteredType(ic)
		if t != TypeSingleton {
			continue
		}
		if _, exists := singletonSet[ic.Name()]; exists {
			duplicatedSingletons = append(duplicatedSingletons, ic.Name())
			continue
		}
		singletonSet[ic.Name()] = struct{}{}
	}
	if len(duplicatedSingletons) == 1 {
		return fmt.Errorf("integration %q may only be defined once", duplicatedSingletons[0])
	} else if len(duplicatedSingletons) > 1 {
		list := strings.Join(duplicatedSingletons, ", ")
		return fmt.Errorf("the following integrations may only be defined once each: %s", list)
	}

	integrationIDMap := map[integrationID]struct{}{}
	integrations := make([]*controlledIntegration, 0, len(cfg))

NextConfig:
	for _, ic := range cfg {
		name := ic.Name()

		identifier, err := ic.Identifier(globals)
		if err != nil {
			return fmt.Errorf("could not build identifier for integration %q: %w", name, err)
		}
		if err := ic.ApplyDefaults(globals); err != nil {
			return fmt.Errorf("failed to apply defaults for %s/%s: %w", name, identifier, err)
		}

		// Every name/identifier pair must be unique across the whole config.
		id := integrationID{Name: name, Identifier: identifier}
		if _, exist := integrationIDMap[id]; exist {
			return fmt.Errorf("multiple instance names %q in integration %q", identifier, name)
		}
		integrationIDMap[id] = struct{}{}

		// Try to reuse an existing integration with the same ID.
		for _, ci := range c.integrations {
			if ci.id != id {
				continue
			}

			// Config is unchanged: keep the existing integration as-is.
			if CompareConfigs(ci.c, ic) {
				integrations = append(integrations, ci)
				continue NextConfig
			}

			// Config changed: attempt an in-place update when supported.
			if ui, ok := ci.i.(UpdateIntegration); ok {
				err := ui.ApplyConfig(ic, globals)
				switch {
				case errors.Is(err, ErrInvalidUpdate):
					// Fall through to the break below and recreate it.
					level.Warn(c.logger).Log("msg", "failed to dynamically update integration; will recreate", "integration", name, "instance", identifier, "err", err)
				case err != nil:
					return fmt.Errorf("failed to update %s integration %q: %w", name, identifier, err)
				default:
					integrations = append(integrations, ci)
					continue NextConfig
				}
			}

			// IDs are unique, so there is nothing else to match against.
			break
		}

		// No reusable integration was found; construct a new one.
		logger := log.With(c.logger, "integration", name, "instance", identifier)
		integration, err := ic.NewIntegration(logger, globals)
		if err != nil {
			return fmt.Errorf("failed to construct %s integration %q: %w", name, identifier, err)
		}
		integrations = append(integrations, &controlledIntegration{
			id: id,
			i:  integration,
			c:  ic,
		})
	}

	// Hand the new set to the run loop. This send blocks while the channel's
	// buffer is full, i.e. until a previously queued set has been received.
	c.runIntegrations <- integrations

	c.cfg = cfg
	c.globals = globals
	c.integrations = integrations
	return nil
}
// Handler builds a router that dispatches requests to every integration
// implementing HTTPIntegration. Each integration is mounted at the prefix
// computed by forEachIntegration, both with and without a trailing slash.
//
// Requests for an integration that is not currently running receive a
// 503 Service Unavailable response.
//
// The router is always returned. The returned error is the first failure to
// generate an integration's handler, if any; integrations whose handler could
// not be generated, or whose handler is nil, are not mounted.
func (c *controller) Handler(prefix string) (http.Handler, error) {
	var firstErr error
	saveFirstErr := func(err error) {
		if firstErr == nil {
			firstErr = err
		}
	}

	r := mux.NewRouter()

	err := c.forEachIntegration(prefix, func(ci *controlledIntegration, iprefix string) {
		id := ci.id

		i, ok := ci.i.(HTTPIntegration)
		if !ok {
			return
		}

		handler, err := i.Handler(iprefix + "/")
		if err != nil {
			saveFirstErr(fmt.Errorf("could not generate HTTP handler for %s integration %q: %w", id.Name, id.Identifier, err))
			return
		}
		if handler == nil {
			return
		}

		// The running state is checked per request, since it may change after
		// the router has been built.
		hfunc := func(rw http.ResponseWriter, r *http.Request) {
			if !ci.Running() {
				http.Error(rw, fmt.Sprintf("%s integration instance %q not running", id.Name, id.Identifier), http.StatusServiceUnavailable)
				return
			}
			handler.ServeHTTP(rw, r)
		}

		// Serve both "<prefix>/..." and the bare "<prefix>".
		r.PathPrefix(iprefix + "/").HandlerFunc(hfunc)
		r.HandleFunc(iprefix, hfunc)
	})
	if err != nil {
		level.Warn(c.logger).Log("msg", "error when iterating over integrations to build HTTP handlers", "err", err)
	}

	return r, firstErr
}
// forEachIntegration calls f once per integration, in order, passing the
// integration and its URL prefix. The controller mutex is held for the whole
// iteration, so f must not call back into methods that take it.
//
// The prefix is "<basePrefix>/<name>" when the integration name occurs once,
// and "<basePrefix>/<name>/<identifier>" when it occurs more than once.
//
// Iteration stops with an error when a prefix repeats; f has already been
// called for the repeated prefix by then.
func (c *controller) forEachIntegration(basePrefix string, f func(ci *controlledIntegration, iprefix string)) error {
	c.mut.Lock()
	defer c.mut.Unlock()

	// Count the instances of each integration name to decide whether the
	// identifier is needed to keep prefixes distinct.
	instancesPerName := make(map[string]int)
	for _, ci := range c.integrations {
		instancesPerName[ci.id.Name]++
	}

	seenPrefixes := make(map[string]struct{})
	for _, ci := range c.integrations {
		elems := []string{basePrefix, ci.id.Name}
		if instancesPerName[ci.id.Name] > 1 {
			elems = append(elems, ci.id.Identifier)
		}
		iprefix := path.Join(elems...)

		f(ci, iprefix)

		if _, dup := seenPrefixes[iprefix]; dup {
			return fmt.Errorf("BUG: duplicate integration prefix %q", iprefix)
		}
		seenPrefixes[iprefix] = struct{}{}
	}

	return nil
}
// Targets returns the target groups of every running integration that
// implements MetricsIntegration, sorted by Source.
//
// When opts.Integrations is non-empty, only integrations whose name is listed
// are included. When opts.Instance is non-empty, only integrations with that
// identifier are included.
func (c *controller) Targets(ep Endpoint, opts TargetOptions) []*targetGroup {
	type metricsEntry struct {
		id integrationID
		i  MetricsIntegration
		ep Endpoint
	}

	// Collect candidates while forEachIntegration holds the lock; the
	// integrations themselves are queried afterwards.
	var entries []metricsEntry
	collect := func(ci *controlledIntegration, iprefix string) {
		if !ci.Running() {
			return
		}
		mi, ok := ci.i.(MetricsIntegration)
		if !ok {
			return
		}
		entries = append(entries, metricsEntry{
			id: ci.id,
			i:  mi,
			ep: Endpoint{Host: ep.Host, Prefix: iprefix},
		})
	}
	if err := c.forEachIntegration(ep.Prefix, collect); err != nil {
		level.Warn(c.logger).Log("msg", "error when iterating over integrations to get targets", "err", err)
	}

	var groups []*targetGroup
	for _, entry := range entries {
		switch {
		case len(opts.Integrations) > 0 && !stringSliceContains(opts.Integrations, entry.id.Name):
			continue
		case opts.Instance != "" && entry.id.Identifier != opts.Instance:
			continue
		}

		for _, tgt := range entry.i.Targets(entry.ep) {
			groups = append(groups, (*targetGroup)(tgt))
		}
	}

	sort.Slice(groups, func(a, b int) bool {
		return groups[a].Source < groups[b].Source
	})
	return groups
}
// stringSliceContains reports whether s is an element of ss.
func stringSliceContains(ss []string, s string) bool {
	for i := range ss {
		if ss[i] == s {
			return true
		}
	}
	return false
}
// TargetOptions filters the set of targets returned by the controller.
type TargetOptions struct {
	// Integrations, when non-empty, restricts targets to these integration
	// names.
	Integrations []string
	// Instance, when non-empty, restricts targets to this instance identifier.
	Instance string
}

// TargetOptionsFromParams reads the "integrations" and "instance" keys from u.
//
// Each non-empty value is passed through url.QueryUnescape, which mirrors the
// url.QueryEscape applied by ToParams. "integrations" is then split on commas.
// An error is returned when a value cannot be unescaped; the options parsed up
// to that point are returned alongside it.
func TargetOptionsFromParams(u url.Values) (TargetOptions, error) {
	var to TargetOptions

	if raw := u.Get("integrations"); raw != "" {
		decoded, err := url.QueryUnescape(raw)
		if err != nil {
			return to, fmt.Errorf("invalid value for integrations: %w", err)
		}
		to.Integrations = strings.Split(decoded, ",")
	}

	if raw := u.Get("instance"); raw != "" {
		decoded, err := url.QueryUnescape(raw)
		if err != nil {
			return to, fmt.Errorf("invalid value for instance: %w", err)
		}
		to.Instance = decoded
	}

	return to, nil
}

// ToParams converts the options into URL values. Integrations are joined with
// commas under "integrations" and Instance is stored under "instance"; both
// values are query-escaped before being stored. Unset options are omitted.
func (to TargetOptions) ToParams() url.Values {
	params := url.Values{}

	if len(to.Integrations) > 0 {
		joined := strings.Join(to.Integrations, ",")
		params.Set("integrations", url.QueryEscape(joined))
	}
	if instance := to.Instance; instance != "" {
		params.Set("instance", url.QueryEscape(instance))
	}

	return params
}
// ScrapeConfigs returns the scrape configs of every integration implementing
// MetricsIntegration, sorted by job name.
//
// Each integration receives its own copy of sdConfig whose URL has a query
// string appended that selects only that integration and instance. sdConfig
// must be non-nil and is not modified.
func (c *controller) ScrapeConfigs(prefix string, sdConfig *http_sd.SDConfig) []*autoscrape.ScrapeConfig {
	type metricsEntry struct {
		id     integrationID
		i      MetricsIntegration
		prefix string
	}

	// Collect candidates while forEachIntegration holds the lock; the
	// integrations themselves are queried afterwards.
	var entries []metricsEntry
	collect := func(ci *controlledIntegration, iprefix string) {
		mi, ok := ci.i.(MetricsIntegration)
		if !ok {
			return
		}
		entries = append(entries, metricsEntry{id: ci.id, i: mi, prefix: iprefix})
	}
	if err := c.forEachIntegration(prefix, collect); err != nil {
		level.Warn(c.logger).Log("msg", "error when iterating over integrations to get scrape configs", "err", err)
	}

	var cfgs []*autoscrape.ScrapeConfig
	for _, entry := range entries {
		filter := TargetOptions{
			Integrations: []string{entry.id.Name},
			Instance:     entry.id.Identifier,
		}

		// Work on a copy so the caller's SD config keeps its original URL.
		scoped := *sdConfig
		scoped.URL = sdConfig.URL + "?" + filter.ToParams().Encode()

		cfgs = append(cfgs, entry.i.ScrapeConfigs(discovery.Configs{&scoped})...)
	}

	sort.Slice(cfgs, func(a, b int) bool {
		return cfgs[a].Config.JobName < cfgs[b].Config.JobName
	})
	return cfgs
}