Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/discovery/discovery.go
4094 views
1
package discovery
2
3
import (
4
"context"
5
"sort"
6
"sync"
7
"time"
8
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/pkg/cluster"
11
"github.com/prometheus/prometheus/discovery"
12
"github.com/prometheus/prometheus/discovery/targetgroup"
13
"github.com/prometheus/prometheus/model/labels"
14
"github.com/rfratto/ckit/shard"
15
)
16
17
// Target refers to a singular discovered endpoint found by a discovery
18
// component.
19
// Each key is a label name and each value is that label's value.
type Target map[string]string
20
21
// DistributedTargets uses the node's Lookup method to distribute discovery
22
// targets when a Flow component runs in a cluster.
23
type DistributedTargets struct {
24
useClustering bool
25
node cluster.Node
26
targets []Target
27
}
28
29
// NewDistributedTargets creates the abstraction that allows components to
30
// dynamically shard targets between components.
31
func NewDistributedTargets(e bool, n cluster.Node, t []Target) DistributedTargets {
32
return DistributedTargets{e, n, t}
33
}
34
35
// Get distributes discovery targets a clustered environment.
36
//
37
// If a cluster size is 1, then all targets will be returned.
38
func (t *DistributedTargets) Get() []Target {
39
// TODO(@tpaschalis): Make this into a single code-path to simplify logic.
40
if !t.useClustering || t.node == nil {
41
return t.targets
42
}
43
44
res := make([]Target, 0, (len(t.targets)+1)/len(t.node.Peers()))
45
46
// TODO(@tpaschalis): Make sure OpReadWrite is the correct operation;
47
// eg. this determines how clustering behaves when nodes are shutting down.
48
for _, tgt := range t.targets {
49
peers, err := t.node.Lookup(shard.StringKey(tgt.Labels().String()), 1, shard.OpReadWrite)
50
if err != nil {
51
// This can only fail in case we ask for more owners than the
52
// available peers. This will never happen, but in any case we fall
53
// back to owning the target ourselves.
54
res = append(res, tgt)
55
}
56
if peers[0].Self {
57
res = append(res, tgt)
58
}
59
}
60
61
return res
62
}
63
64
// Labels converts Target into a set of sorted labels.
65
func (t Target) Labels() labels.Labels {
66
var lset labels.Labels
67
for k, v := range t {
68
lset = append(lset, labels.Label{Name: k, Value: v})
69
}
70
sort.Sort(lset)
71
return lset
72
}
73
74
// Exports holds values which are exported by all discovery components.
75
type Exports struct {
76
Targets []Target `river:"targets,attr"`
77
}
78
79
// Discoverer is an alias for Prometheus' Discoverer interface, so users of this package don't need
80
// to import github.com/prometheus/prometheus/discover as well.
81
type Discoverer discovery.Discoverer
82
83
// Creator is a function provided by an implementation to create a concrete Discoverer instance.
84
type Creator func(component.Arguments) (Discoverer, error)
85
86
// Component is a reusable component for any discovery implementation.
87
// it will handle dynamic updates and exporting targets appropriately for a scrape implementation.
88
type Component struct {
89
opts component.Options
90
91
discMut sync.Mutex
92
latestDisc discovery.Discoverer
93
newDiscoverer chan struct{}
94
95
creator Creator
96
}
97
98
// New creates a discovery component given arguments and a concrete Discovery implementation function.
99
func New(o component.Options, args component.Arguments, creator Creator) (*Component, error) {
100
c := &Component{
101
opts: o,
102
creator: creator,
103
// buffered to avoid deadlock from the first immediate update
104
newDiscoverer: make(chan struct{}, 1),
105
}
106
return c, c.Update(args)
107
}
108
109
// Run implements component.Component.
110
func (c *Component) Run(ctx context.Context) error {
111
var cancel context.CancelFunc
112
for {
113
select {
114
case <-ctx.Done():
115
return nil
116
case <-c.newDiscoverer:
117
// cancel any previously running discovery
118
if cancel != nil {
119
cancel()
120
}
121
// create new context so we can cancel it if we get any future updates
122
// since it is derived from the main run context, it only needs to be
123
// canceled directly if we receive new updates
124
newCtx, cancelFunc := context.WithCancel(ctx)
125
cancel = cancelFunc
126
127
// finally run discovery
128
c.discMut.Lock()
129
disc := c.latestDisc
130
c.discMut.Unlock()
131
go c.runDiscovery(newCtx, disc)
132
}
133
}
134
}
135
136
// Update implements component.Component.
137
func (c *Component) Update(args component.Arguments) error {
138
disc, err := c.creator(args)
139
if err != nil {
140
return err
141
}
142
c.discMut.Lock()
143
c.latestDisc = disc
144
c.discMut.Unlock()
145
146
select {
147
case c.newDiscoverer <- struct{}{}:
148
default:
149
}
150
151
return nil
152
}
153
154
// maxUpdateFrequency is the minimum time to wait between updating targets.
155
// Currently not settable, since prometheus uses a static threshold, but
156
// we could reconsider later.
157
const maxUpdateFrequency time.Duration = 5 * time.Second
158
159
// runDiscovery is a utility for consuming and forwarding target groups from a discoverer.
160
// It will handle collating targets (and clearing), as well as time based throttling of updates.
161
func (c *Component) runDiscovery(ctx context.Context, d Discoverer) {
162
// all targets we have seen so far
163
cache := map[string]*targetgroup.Group{}
164
165
ch := make(chan []*targetgroup.Group)
166
go d.Run(ctx, ch)
167
168
// function to convert and send targets in format scraper expects
169
send := func() {
170
allTargets := []Target{}
171
for _, group := range cache {
172
for _, target := range group.Targets {
173
labels := map[string]string{}
174
// first add the group labels, and then the
175
// target labels, so that target labels take precedence.
176
for k, v := range group.Labels {
177
labels[string(k)] = string(v)
178
}
179
for k, v := range target {
180
labels[string(k)] = string(v)
181
}
182
allTargets = append(allTargets, labels)
183
}
184
}
185
c.opts.OnStateChange(Exports{Targets: allTargets})
186
}
187
188
ticker := time.NewTicker(maxUpdateFrequency)
189
// true if we have received new targets and need to send.
190
haveUpdates := false
191
for {
192
select {
193
case <-ticker.C:
194
if haveUpdates {
195
send()
196
haveUpdates = false
197
}
198
case <-ctx.Done():
199
send()
200
return
201
case groups := <-ch:
202
for _, group := range groups {
203
// Discoverer will send an empty target set to indicate the group (keyed by Source field)
204
// should be removed
205
if len(group.Targets) == 0 {
206
delete(cache, group.Source)
207
} else {
208
cache[group.Source] = group
209
}
210
}
211
haveUpdates = true
212
}
213
}
214
}
215
216