Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/agent.go
4093 views
1
// Package metrics implements a Prometheus-lite client for service discovery,
2
// scraping metrics into a WAL, and remote_write. Clients are broken into a
3
// set of instances, each of which contain their own set of configs.
4
package metrics
5
6
import (
7
"errors"
8
"flag"
9
"fmt"
10
"sync"
11
"time"
12
13
"github.com/go-kit/log"
14
"github.com/go-kit/log/level"
15
"github.com/prometheus/client_golang/prometheus"
16
"go.uber.org/atomic"
17
"google.golang.org/grpc"
18
19
"github.com/grafana/agent/pkg/metrics/cluster"
20
"github.com/grafana/agent/pkg/metrics/cluster/client"
21
"github.com/grafana/agent/pkg/metrics/instance"
22
"github.com/grafana/agent/pkg/util"
23
"github.com/prometheus/prometheus/discovery"
24
)
25
26
// DefaultConfig is the default settings for the Prometheus-lite client.
27
var DefaultConfig = Config{
28
Global: instance.DefaultGlobalConfig,
29
InstanceRestartBackoff: instance.DefaultBasicManagerConfig.InstanceRestartBackoff,
30
WALDir: "data-agent/",
31
WALCleanupAge: DefaultCleanupAge,
32
WALCleanupPeriod: DefaultCleanupPeriod,
33
ServiceConfig: cluster.DefaultConfig,
34
ServiceClientConfig: client.DefaultConfig,
35
InstanceMode: instance.DefaultMode,
36
}
37
38
// Config defines the configuration for the entire set of Prometheus client
// instances, along with a global configuration.
type Config struct {
	// Global settings handed to each instance config's ApplyDefaults.
	Global instance.GlobalConfig `yaml:"global,omitempty"`
	// Base directory to store the WAL in. Required when Configs is non-empty
	// or the scraping service is enabled.
	WALDir string `yaml:"wal_directory,omitempty"`
	// Abandoned (unused) WALs older than this are removed.
	WALCleanupAge time.Duration `yaml:"wal_cleanup_age,omitempty"`
	// How often to check for abandoned WALs.
	WALCleanupPeriod time.Duration `yaml:"wal_cleanup_period,omitempty"`
	// Scraping service (cluster) settings. UnmarshalYAML overwrites its
	// Client field with ServiceClientConfig.
	ServiceConfig cluster.Config `yaml:"scraping_service,omitempty"`
	// Client settings for the scraping service; copied into
	// ServiceConfig.Client after unmarshaling.
	ServiceClientConfig client.Config `yaml:"scraping_service_client,omitempty"`
	// Statically defined instance configs. Must be empty when the scraping
	// service is enabled (enforced by ApplyDefaults).
	Configs []instance.Config `yaml:"configs,omitempty"`
	// How long to wait before restarting a failed instance.
	InstanceRestartBackoff time.Duration `yaml:"instance_restart_backoff,omitempty"`
	// Mode handed to the ModalManager that runs the instances.
	InstanceMode instance.Mode `yaml:"instance_mode,omitempty"`
	// Copied into Global.DisableKeepAlives by ApplyDefaults.
	DisableKeepAlives bool `yaml:"http_disable_keepalives,omitempty"`
	// Copied into Global.IdleConnTimeout by ApplyDefaults.
	IdleConnTimeout time.Duration `yaml:"http_idle_conn_timeout,omitempty"`

	// Unmarshaled is true when the Config was unmarshaled from YAML.
	Unmarshaled bool `yaml:"-"`
}
56
57
// UnmarshalYAML implements yaml.Unmarshaler.
58
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
59
*c = DefaultConfig
60
util.DefaultConfigFromFlags(c)
61
c.Unmarshaled = true
62
63
type plain Config
64
err := unmarshal((*plain)(c))
65
if err != nil {
66
return err
67
}
68
69
c.ServiceConfig.Client = c.ServiceClientConfig
70
return nil
71
}
72
73
// ApplyDefaults applies default values to the Config and validates it.
74
func (c *Config) ApplyDefaults() error {
75
needWAL := len(c.Configs) > 0 || c.ServiceConfig.Enabled
76
if needWAL && c.WALDir == "" {
77
return errors.New("no wal_directory configured")
78
}
79
80
if c.ServiceConfig.Enabled && len(c.Configs) > 0 {
81
return errors.New("cannot use configs when scraping_service mode is enabled")
82
}
83
84
c.Global.DisableKeepAlives = c.DisableKeepAlives
85
c.Global.IdleConnTimeout = c.IdleConnTimeout
86
usedNames := map[string]struct{}{}
87
88
for i := range c.Configs {
89
name := c.Configs[i].Name
90
if err := c.Configs[i].ApplyDefaults(c.Global); err != nil {
91
// Try to show a helpful name in the error
92
if name == "" {
93
name = fmt.Sprintf("at index %d", i)
94
}
95
96
return fmt.Errorf("error validating instance %s: %w", name, err)
97
}
98
99
if _, ok := usedNames[name]; ok {
100
return fmt.Errorf(
101
"prometheus instance names must be unique. found multiple instances with name %s",
102
name,
103
)
104
}
105
usedNames[name] = struct{}{}
106
}
107
108
return nil
109
}
110
111
// RegisterFlags defines flags corresponding to the Config.
112
func (c *Config) RegisterFlags(f *flag.FlagSet) {
113
c.RegisterFlagsWithPrefix("metrics.", f)
114
}
115
116
// RegisterFlagsWithPrefix defines flags with the provided prefix.
117
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
118
f.StringVar(&c.WALDir, prefix+"wal-directory", DefaultConfig.WALDir, "base directory to store the WAL in")
119
f.DurationVar(&c.WALCleanupAge, prefix+"wal-cleanup-age", DefaultConfig.WALCleanupAge, "remove abandoned (unused) WALs older than this")
120
f.DurationVar(&c.WALCleanupPeriod, prefix+"wal-cleanup-period", DefaultConfig.WALCleanupPeriod, "how often to check for abandoned WALs")
121
f.DurationVar(&c.InstanceRestartBackoff, prefix+"instance-restart-backoff", DefaultConfig.InstanceRestartBackoff, "how long to wait before restarting a failed Prometheus instance")
122
123
c.ServiceConfig.RegisterFlagsWithPrefix(prefix+"service.", f)
124
c.ServiceClientConfig.RegisterFlagsWithPrefix(prefix, f)
125
}
126
127
// Agent is an agent for collecting Prometheus metrics. It acts as a
// Prometheus-lite; only running the service discovery, remote_write, and WAL
// components of Prometheus. It is broken down into a series of Instances, each
// of which perform metric collection.
type Agent struct {
	// mut is write-locked by ApplyConfig and Stop and read-locked by
	// newInstance and Validate.
	mut    sync.RWMutex
	cfg    Config
	logger log.Logger
	reg    prometheus.Registerer

	// Store both the basic manager and the modal manager, so we can update their
	// settings independently. Only the ModalManager should be used for mutating
	// configs.
	//
	// cleaner removes abandoned WALs. ApplyConfig recreates it and leaves it
	// nil when no WAL directory is configured.
	bm      *instance.BasicManager
	mm      *instance.ModalManager
	cleaner *WALCleaner

	// instanceFactory creates instances on behalf of newInstance.
	instanceFactory instanceFactory

	// cluster runs the scraping service and receives cfg.ServiceConfig from
	// ApplyConfig.
	cluster *cluster.Cluster

	// actor queues functions that run executes one at a time. Stop closes it
	// once (guarded by stopOnce) and sets stopped. After that, ApplyConfig no
	// longer sends on it.
	stopped  bool
	stopOnce sync.Once
	actor    chan func()

	// initialBootDone becomes true once a queued instance sync has finished.
	// Ready reports false until then.
	initialBootDone atomic.Bool
}
154
155
// New creates and starts a new Agent.
156
func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (*Agent, error) {
157
// This registers discovery metrics with the default registry which should be the reg specified above.
158
discovery.RegisterMetrics()
159
return newAgent(reg, cfg, logger, defaultInstanceFactory)
160
}
161
162
func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact instanceFactory) (*Agent, error) {
163
a := &Agent{
164
logger: log.With(logger, "agent", "prometheus"),
165
instanceFactory: fact,
166
reg: reg,
167
actor: make(chan func(), 1),
168
}
169
170
a.bm = instance.NewBasicManager(instance.BasicManagerConfig{
171
InstanceRestartBackoff: cfg.InstanceRestartBackoff,
172
}, a.logger, a.newInstance)
173
174
var err error
175
a.mm, err = instance.NewModalManager(a.reg, a.logger, a.bm, cfg.InstanceMode)
176
if err != nil {
177
return nil, fmt.Errorf("failed to create modal instance manager: %w", err)
178
}
179
180
a.cluster, err = cluster.New(a.logger, reg, cfg.ServiceConfig, a.mm, a.Validate)
181
if err != nil {
182
return nil, err
183
}
184
185
if err := a.ApplyConfig(cfg); err != nil {
186
return nil, err
187
}
188
go a.run()
189
return a, nil
190
}
191
192
// newInstance creates a new Instance given a config.
193
func (a *Agent) newInstance(c instance.Config) (instance.ManagedInstance, error) {
194
a.mut.RLock()
195
defer a.mut.RUnlock()
196
197
// Controls the label
198
instanceLabel := "instance_name"
199
if a.cfg.InstanceMode == instance.ModeShared {
200
instanceLabel = "instance_group_name"
201
}
202
203
reg := prometheus.WrapRegistererWith(prometheus.Labels{
204
instanceLabel: c.Name,
205
}, a.reg)
206
207
return a.instanceFactory(reg, c, a.cfg.WALDir, a.logger)
208
}
209
210
// Validate will validate the incoming Config and mutate it to apply defaults.
211
func (a *Agent) Validate(c *instance.Config) error {
212
a.mut.RLock()
213
defer a.mut.RUnlock()
214
215
if a.cfg.WALDir == "" {
216
return fmt.Errorf("no wal_directory configured")
217
}
218
219
if err := c.ApplyDefaults(a.cfg.Global); err != nil {
220
return fmt.Errorf("failed to apply defaults to %q: %w", c.Name, err)
221
}
222
return nil
223
}
224
225
// ApplyConfig applies config changes to the Agent.
//
// If cfg is YAML-identical to the current config, it returns nil immediately,
// even after Stop. Otherwise it returns an error once the Agent is stopped.
// Components are updated while a.mut is held, and instance syncing is queued
// onto the actor channel to run in the background.
//
// On error, steps that already ran are not rolled back, and a.cfg keeps its
// previous value.
func (a *Agent) ApplyConfig(cfg Config) error {
	a.mut.Lock()
	defer a.mut.Unlock()

	if util.CompareYAML(a.cfg, cfg) {
		return nil
	}

	if a.stopped {
		return fmt.Errorf("agent stopped")
	}

	// The ordering here is done to minimize the number of instances that need
	// to be restarted. We update components from lowest to highest level:
	//
	// 1. WAL Cleaner
	// 2. Basic manager
	// 3. Modal Manager
	// 4. Cluster
	// 5. Local configs

	// The cleaner is always recreated so it picks up the new WAL settings. No
	// cleaner runs when no WAL directory is configured.
	if a.cleaner != nil {
		a.cleaner.Stop()
		a.cleaner = nil
	}
	if cfg.WALDir != "" {
		a.cleaner = NewWALCleaner(
			a.logger,
			a.mm,
			cfg.WALDir,
			cfg.WALCleanupAge,
			cfg.WALCleanupPeriod,
		)
	}

	a.bm.UpdateManagerConfig(instance.BasicManagerConfig{
		InstanceRestartBackoff: cfg.InstanceRestartBackoff,
	})

	if err := a.mm.SetMode(cfg.InstanceMode); err != nil {
		return err
	}

	if err := a.cluster.ApplyConfig(cfg.ServiceConfig); err != nil {
		return fmt.Errorf("failed to apply cluster config: %w", err)
	}

	// Queue an actor in the background to sync the instances. The sync cannot
	// run inline: creating instances goes through newInstance, which takes
	// a.mut.RLock while this function still holds a.mut.Lock.
	//
	// NOTE(review): the send below happens with a.mut held. Suppose run is
	// inside an earlier queued func, blocked waiting for a.mut, and the
	// channel's buffer (size 1, see newAgent) is already full. Then this send
	// cannot complete. Confirm that back-to-back ApplyConfig calls cannot
	// deadlock here.
	oldConfig := a.cfg

	a.actor <- func() {
		a.syncInstances(oldConfig, cfg)
		a.initialBootDone.Store(true)
	}

	a.cfg = cfg
	return nil
}
285
286
// syncInstances syncs the state of the instance manager to newConfig by
287
// applying all configs from newConfig and deleting any configs from oldConfig
288
// that are not in newConfig.
289
func (a *Agent) syncInstances(oldConfig, newConfig Config) {
290
// Apply the new configs
291
for _, c := range newConfig.Configs {
292
if err := a.mm.ApplyConfig(c); err != nil {
293
level.Error(a.logger).Log("msg", "failed to apply config", "name", c.Name, "err", err)
294
}
295
}
296
297
// Remove any configs from oldConfig that aren't in newConfig.
298
for _, oc := range oldConfig.Configs {
299
foundConfig := false
300
for _, nc := range newConfig.Configs {
301
if nc.Name == oc.Name {
302
foundConfig = true
303
break
304
}
305
}
306
if foundConfig {
307
continue
308
}
309
310
if err := a.mm.DeleteConfig(oc.Name); err != nil {
311
level.Error(a.logger).Log("msg", "failed to delete old config", "name", oc.Name, "err", err)
312
}
313
}
314
}
315
316
// run calls received actor functions in the background.
317
func (a *Agent) run() {
318
for f := range a.actor {
319
f()
320
}
321
}
322
323
// Ready returns true if both the agent and all instances
324
// spawned by a Manager have completed startup.
325
func (a *Agent) Ready() bool {
326
// Wait for the initial load to complete so the instance manager has at least
327
// the base set of expected instances.
328
if !a.initialBootDone.Load() {
329
return false
330
}
331
332
for _, inst := range a.mm.ListInstances() {
333
if !inst.Ready() {
334
return false
335
}
336
}
337
338
return true
339
}
340
341
// WireGRPC wires gRPC services into the provided server.
342
func (a *Agent) WireGRPC(s *grpc.Server) {
343
a.cluster.WireGRPC(s)
344
}
345
346
// Config returns the configuration of this Agent.
347
func (a *Agent) Config() Config { return a.cfg }
348
349
// InstanceManager returns the instance manager used by this Agent.
350
func (a *Agent) InstanceManager() instance.Manager { return a.mm }
351
352
// Stop stops the agent and all its instances.
//
// It holds a.mut for its whole duration. Closing the actor channel lets run
// exit after executing any function still buffered. Stop does not wait for run
// to return. After Stop, ApplyConfig returns an error for any config that
// differs from the current one.
//
// NOTE(review): only the channel close is guarded by stopOnce. A second Stop
// call stops the cluster, cleaner and ModalManager again — confirm those Stop
// methods are safe to call twice.
func (a *Agent) Stop() {
	a.mut.Lock()
	defer a.mut.Unlock()

	// Close the actor channel to stop run.
	a.stopOnce.Do(func() {
		close(a.actor)
	})

	a.cluster.Stop()

	if a.cleaner != nil {
		a.cleaner.Stop()
	}

	// Only need to stop the ModalManager, which will pass through everything to the
	// BasicManager.
	a.mm.Stop()

	// Makes ApplyConfig refuse to send on the now-closed actor channel.
	a.stopped = true
}
374
375
// instanceFactory creates a ManagedInstance from an instance config. When
// called from newInstance, reg is a.reg wrapped with the instance's name
// label, walDir is the Agent's configured WAL directory, and logger is the
// Agent's logger.
type instanceFactory = func(reg prometheus.Registerer, cfg instance.Config, walDir string, logger log.Logger) (instance.ManagedInstance, error)
376
377
// defaultInstanceFactory is the instanceFactory used by New. It creates an
// instance via instance.New, passing every argument through unchanged.
func defaultInstanceFactory(r prometheus.Registerer, c instance.Config, dir string, l log.Logger) (instance.ManagedInstance, error) {
	return instance.New(r, c, dir, l)
}
380
381