package discovery12import (3"context"4"sort"5"sync"6"time"78"github.com/grafana/agent/component"9"github.com/grafana/agent/pkg/cluster"10"github.com/prometheus/prometheus/discovery"11"github.com/prometheus/prometheus/discovery/targetgroup"12"github.com/prometheus/prometheus/model/labels"13"github.com/rfratto/ckit/shard"14)1516// Target refers to a singular discovered endpoint found by a discovery17// component.18type Target map[string]string1920// DistributedTargets uses the node's Lookup method to distribute discovery21// targets when a Flow component runs in a cluster.22type DistributedTargets struct {23useClustering bool24node cluster.Node25targets []Target26}2728// NewDistributedTargets creates the abstraction that allows components to29// dynamically shard targets between components.30func NewDistributedTargets(e bool, n cluster.Node, t []Target) DistributedTargets {31return DistributedTargets{e, n, t}32}3334// Get distributes discovery targets a clustered environment.35//36// If a cluster size is 1, then all targets will be returned.37func (t *DistributedTargets) Get() []Target {38// TODO(@tpaschalis): Make this into a single code-path to simplify logic.39if !t.useClustering || t.node == nil {40return t.targets41}4243res := make([]Target, 0, (len(t.targets)+1)/len(t.node.Peers()))4445// TODO(@tpaschalis): Make sure OpReadWrite is the correct operation;46// eg. this determines how clustering behaves when nodes are shutting down.47for _, tgt := range t.targets {48peers, err := t.node.Lookup(shard.StringKey(tgt.Labels().String()), 1, shard.OpReadWrite)49if err != nil {50// This can only fail in case we ask for more owners than the51// available peers. This will never happen, but in any case we fall52// back to owning the target ourselves.53res = append(res, tgt)54}55if peers[0].Self {56res = append(res, tgt)57}58}5960return res61}6263// Labels converts Target into a set of sorted labels.64func (t Target) Labels() labels.Labels {65var lset labels.Labels66for k, v := range t {67lset = append(lset, labels.Label{Name: k, Value: v})68}69sort.Sort(lset)70return lset71}7273// Exports holds values which are exported by all discovery components.74type Exports struct {75Targets []Target `river:"targets,attr"`76}7778// Discoverer is an alias for Prometheus' Discoverer interface, so users of this package don't need79// to import github.com/prometheus/prometheus/discover as well.80type Discoverer discovery.Discoverer8182// Creator is a function provided by an implementation to create a concrete Discoverer instance.83type Creator func(component.Arguments) (Discoverer, error)8485// Component is a reusable component for any discovery implementation.86// it will handle dynamic updates and exporting targets appropriately for a scrape implementation.87type Component struct {88opts component.Options8990discMut sync.Mutex91latestDisc discovery.Discoverer92newDiscoverer chan struct{}9394creator Creator95}9697// New creates a discovery component given arguments and a concrete Discovery implementation function.98func New(o component.Options, args component.Arguments, creator Creator) (*Component, error) {99c := &Component{100opts: o,101creator: creator,102// buffered to avoid deadlock from the first immediate update103newDiscoverer: make(chan struct{}, 1),104}105return c, c.Update(args)106}107108// Run implements component.Component.109func (c *Component) Run(ctx context.Context) error {110var cancel context.CancelFunc111for {112select {113case <-ctx.Done():114return nil115case <-c.newDiscoverer:116// cancel any previously running discovery117if cancel != nil {118cancel()119}120// create new context so we can cancel it if we get any future updates121// since it is derived from the main run context, it only needs to be122// canceled directly if we receive new updates123newCtx, cancelFunc := context.WithCancel(ctx)124cancel = cancelFunc125126// finally run discovery127c.discMut.Lock()128disc := c.latestDisc129c.discMut.Unlock()130go c.runDiscovery(newCtx, disc)131}132}133}134135// Update implements component.Component.136func (c *Component) Update(args component.Arguments) error {137disc, err := c.creator(args)138if err != nil {139return err140}141c.discMut.Lock()142c.latestDisc = disc143c.discMut.Unlock()144145select {146case c.newDiscoverer <- struct{}{}:147default:148}149150return nil151}152153// maxUpdateFrequency is the minimum time to wait between updating targets.154// Currently not settable, since prometheus uses a static threshold, but155// we could reconsider later.156const maxUpdateFrequency = 5 * time.Second157158// runDiscovery is a utility for consuming and forwarding target groups from a discoverer.159// It will handle collating targets (and clearing), as well as time based throttling of updates.160func (c *Component) runDiscovery(ctx context.Context, d Discoverer) {161// all targets we have seen so far162cache := map[string]*targetgroup.Group{}163164ch := make(chan []*targetgroup.Group)165go d.Run(ctx, ch)166167// function to convert and send targets in format scraper expects168send := func() {169allTargets := []Target{}170for _, group := range cache {171for _, target := range group.Targets {172labels := map[string]string{}173// first add the group labels, and then the174// target labels, so that target labels take precedence.175for k, v := range group.Labels {176labels[string(k)] = string(v)177}178for k, v := range target {179labels[string(k)] = string(v)180}181allTargets = append(allTargets, labels)182}183}184c.opts.OnStateChange(Exports{Targets: allTargets})185}186187ticker := time.NewTicker(maxUpdateFrequency)188// true if we have received new targets and need to send.189haveUpdates := false190for {191select {192case <-ticker.C:193if haveUpdates {194send()195haveUpdates = false196}197case <-ctx.Done():198send()199return200case groups := <-ch:201for _, group := range groups {202// Discoverer will send an empty target set to indicate the group (keyed by Source field)203// should be removed204if len(group.Targets) == 0 {205delete(cache, group.Source)206} else {207cache[group.Source] = group208}209}210haveUpdates = true211}212}213}214215216