Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/instance/manager.go
4094 views
1
package instance
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"sync"
8
"time"
9
10
"github.com/go-kit/log"
11
"github.com/go-kit/log/level"
12
"github.com/prometheus/client_golang/prometheus"
13
"github.com/prometheus/client_golang/prometheus/promauto"
14
"github.com/prometheus/prometheus/scrape"
15
"github.com/prometheus/prometheus/storage"
16
)
17
18
var (
19
instanceAbnormalExits = promauto.NewCounterVec(prometheus.CounterOpts{
20
Name: "agent_metrics_instance_abnormal_exits_total",
21
Help: "Total number of times a Prometheus instance exited unexpectedly, causing it to be restarted.",
22
}, []string{"instance_name"})
23
24
currentActiveInstances = promauto.NewGauge(prometheus.GaugeOpts{
25
Name: "agent_metrics_active_instances",
26
Help: "Current number of active instances being used by the agent.",
27
})
28
29
// DefaultBasicManagerConfig is the default config for the BasicManager.
30
DefaultBasicManagerConfig = BasicManagerConfig{
31
InstanceRestartBackoff: 5 * time.Second,
32
}
33
)
34
35
// Manager represents a set of methods for manipulating running instances at
36
// runtime.
37
type Manager interface {
38
// GetInstance retrieves a ManagedInstance by name.
39
GetInstance(name string) (ManagedInstance, error)
40
41
// ListInstances returns all currently managed instances running
42
// within the Manager. The key will be the instance name from their config.
43
ListInstances() map[string]ManagedInstance
44
45
// ListConfigs returns the config objects associated with a managed
46
// instance. The key will be the Name field from Config.
47
ListConfigs() map[string]Config
48
49
// ApplyConfig creates a new Config or updates an existing Config if
50
// one with Config.Name already exists.
51
ApplyConfig(Config) error
52
53
// DeleteConfig deletes a given managed instance based on its Config.Name.
54
DeleteConfig(name string) error
55
56
// Stop stops the Manager and all managed instances.
57
Stop()
58
}
59
60
// ManagedInstance is implemented by Instance. It is defined as an interface
61
// for the sake of testing from Manager implementations.
62
type ManagedInstance interface {
63
Run(ctx context.Context) error
64
Ready() bool
65
Update(c Config) error
66
TargetsActive() map[string][]*scrape.Target
67
StorageDirectory() string
68
Appender(ctx context.Context) storage.Appender
69
}
70
71
// BasicManagerConfig controls the operations of a BasicManager.
type BasicManagerConfig struct {
	// InstanceRestartBackoff is how long the manager waits before restarting
	// an instance whose Run exited with an unexpected error.
	InstanceRestartBackoff time.Duration
}
75
76
// BasicManager creates a new BasicManager, implementing the Manager interface.
77
// BasicManager will directly launch instances and perform no extra processing.
78
//
79
// Other implementations of Manager usually wrap a BasicManager.
80
type BasicManager struct {
81
cfgMut sync.Mutex
82
cfg BasicManagerConfig
83
logger log.Logger
84
85
// Take care when locking mut: if you hold onto a lock of mut while calling
86
// Stop on a process, you will deadlock.
87
mut sync.Mutex
88
processes map[string]*managedProcess
89
90
launch Factory
91
}
92
93
// managedProcess represents a goroutine running a ManagedInstance. cancel
94
// requests that the goroutine should shutdown. done will be closed after the
95
// goroutine exists.
96
type managedProcess struct {
97
cfg Config
98
inst ManagedInstance
99
cancel context.CancelFunc
100
done chan bool
101
}
102
103
// Stop cancels the process's context and blocks until its run loop has
// returned, which is signalled by done being closed.
func (p managedProcess) Stop() {
	stopRun, finished := p.cancel, p.done
	stopRun()
	<-finished
}
107
108
// Factory should return an unstarted instance given some config.
109
type Factory func(c Config) (ManagedInstance, error)
110
111
// NewBasicManager creates a new BasicManager. The launch function will be
112
// invoked any time a new Config is applied.
113
//
114
// The lifecycle of any ManagedInstance returned by the launch function will
115
// be handled by the BasicManager. Instances will be automatically restarted
116
// if stopped, updated if the config changes, or removed when the Config is
117
// deleted.
118
func NewBasicManager(cfg BasicManagerConfig, logger log.Logger, launch Factory) *BasicManager {
119
return &BasicManager{
120
cfg: cfg,
121
logger: logger,
122
processes: make(map[string]*managedProcess),
123
launch: launch,
124
}
125
}
126
127
// UpdateManagerConfig updates the BasicManagerConfig.
128
func (m *BasicManager) UpdateManagerConfig(c BasicManagerConfig) {
129
m.cfgMut.Lock()
130
defer m.cfgMut.Unlock()
131
m.cfg = c
132
}
133
134
// GetInstance returns the given instance by name.
135
func (m *BasicManager) GetInstance(name string) (ManagedInstance, error) {
136
m.mut.Lock()
137
defer m.mut.Unlock()
138
139
process, ok := m.processes[name]
140
if !ok {
141
return nil, fmt.Errorf("instance %s does not exist", name)
142
}
143
return process.inst, nil
144
}
145
146
// ListInstances returns the current active instances managed by BasicManager.
147
func (m *BasicManager) ListInstances() map[string]ManagedInstance {
148
m.mut.Lock()
149
defer m.mut.Unlock()
150
151
res := make(map[string]ManagedInstance, len(m.processes))
152
for name, process := range m.processes {
153
if process == nil {
154
continue
155
}
156
res[name] = process.inst
157
}
158
return res
159
}
160
161
// ListConfigs lists the current active configs managed by BasicManager.
162
func (m *BasicManager) ListConfigs() map[string]Config {
163
m.mut.Lock()
164
defer m.mut.Unlock()
165
166
res := make(map[string]Config, len(m.processes))
167
for name, process := range m.processes {
168
res[name] = process.cfg
169
}
170
return res
171
}
172
173
// ApplyConfig takes a Config and either starts a new managed instance or
174
// updates an existing managed instance. The value for Name in c is used to
175
// uniquely identify the Config and determine whether the Config has an
176
// existing associated managed instance.
177
func (m *BasicManager) ApplyConfig(c Config) error {
178
m.mut.Lock()
179
defer m.mut.Unlock()
180
181
// If the config already exists, we need to update it.
182
proc, ok := m.processes[c.Name]
183
if ok {
184
err := proc.inst.Update(c)
185
186
// If the instance could not be dynamically updated, we need to force the
187
// update by restarting it. If it failed for another reason, something
188
// serious went wrong and we'll completely give up without stopping the
189
// existing job.
190
if errors.Is(err, ErrInvalidUpdate{}) {
191
level.Info(m.logger).Log("msg", "could not dynamically update instance, will manually restart", "instance", c.Name, "reason", err)
192
193
// NOTE: we don't return here; we fall through to spawn the new instance.
194
proc.Stop()
195
} else if err != nil {
196
return fmt.Errorf("failed to update instance %s: %w", c.Name, err)
197
} else {
198
level.Info(m.logger).Log("msg", "dynamically updated instance", "instance", c.Name)
199
200
proc.cfg = c
201
return nil
202
}
203
}
204
205
// Spawn a new process for the new config.
206
err := m.spawnProcess(c)
207
if err != nil {
208
return err
209
}
210
211
currentActiveInstances.Inc()
212
return nil
213
}
214
215
func (m *BasicManager) spawnProcess(c Config) error {
216
inst, err := m.launch(c)
217
if err != nil {
218
return err
219
}
220
221
ctx, cancel := context.WithCancel(context.Background())
222
done := make(chan bool)
223
224
proc := &managedProcess{
225
cancel: cancel,
226
done: done,
227
cfg: c,
228
inst: inst,
229
}
230
m.processes[c.Name] = proc
231
232
go func() {
233
m.runProcess(ctx, c.Name, inst)
234
close(done)
235
236
// Now that the process has stopped, we can remove it from our managed
237
// list.
238
//
239
// However, it's possible that a new Config may have been applied and
240
// overwrote the initial value in our map. We only want to delete the
241
// process from the map if it hasn't changed from what we initially
242
// set it to.
243
//
244
// We only use the instance for comparing (which will never change) because
245
// the instance may have dynamically been given a new config since this
246
// goroutine started.
247
m.mut.Lock()
248
if storedProc, exist := m.processes[c.Name]; exist && storedProc.inst == inst {
249
delete(m.processes, c.Name)
250
}
251
m.mut.Unlock()
252
253
currentActiveInstances.Dec()
254
}()
255
256
return nil
257
}
258
259
// runProcess runs and instance and keeps it alive until it is explicitly stopped
260
// by cancelling the context.
261
func (m *BasicManager) runProcess(ctx context.Context, name string, inst ManagedInstance) {
262
for {
263
err := inst.Run(ctx)
264
if err != nil && err != context.Canceled {
265
backoff := m.instanceRestartBackoff()
266
267
instanceAbnormalExits.WithLabelValues(name).Inc()
268
level.Error(m.logger).Log("msg", "instance stopped abnormally, restarting after backoff period", "err", err, "backoff", backoff, "instance", name)
269
time.Sleep(backoff)
270
} else {
271
level.Info(m.logger).Log("msg", "stopped instance", "instance", name)
272
break
273
}
274
}
275
}
276
277
// instanceRestartBackoff reads the current restart backoff under cfgMut, since
// the config may be swapped concurrently by UpdateManagerConfig.
func (m *BasicManager) instanceRestartBackoff() time.Duration {
	m.cfgMut.Lock()
	backoff := m.cfg.InstanceRestartBackoff
	m.cfgMut.Unlock()
	return backoff
}
282
283
// DeleteConfig removes a managed instance by its config name. Returns an error
284
// if there is no such managed instance with the given name.
285
func (m *BasicManager) DeleteConfig(name string) error {
286
m.mut.Lock()
287
proc, ok := m.processes[name]
288
if !ok {
289
m.mut.Unlock()
290
return errors.New("config does not exist")
291
}
292
m.mut.Unlock()
293
294
// spawnProcess is responsible for removing the process from the map after it
295
// stops so we don't need to delete anything from m.processes here.
296
proc.Stop()
297
return nil
298
}
299
300
// Stop stops the BasicManager and stops all active processes for configs.
301
func (m *BasicManager) Stop() {
302
var wg sync.WaitGroup
303
304
// We don't need to change m.processes here; processes remove themselves
305
// from the map (in spawnProcess).
306
m.mut.Lock()
307
wg.Add(len(m.processes))
308
for _, proc := range m.processes {
309
go func(proc *managedProcess) {
310
proc.Stop()
311
wg.Done()
312
}(proc)
313
}
314
m.mut.Unlock()
315
316
wg.Wait()
317
}
318
319
// MockManager exposes methods of the Manager interface as struct fields.
320
// Useful for tests.
321
type MockManager struct {
322
GetInstanceFunc func(name string) (ManagedInstance, error)
323
ListInstancesFunc func() map[string]ManagedInstance
324
ListConfigsFunc func() map[string]Config
325
ApplyConfigFunc func(Config) error
326
DeleteConfigFunc func(name string) error
327
StopFunc func()
328
}
329
330
// GetInstance implements Manager.
331
func (m MockManager) GetInstance(name string) (ManagedInstance, error) {
332
if m.GetInstanceFunc != nil {
333
return m.GetInstanceFunc(name)
334
}
335
panic("GetInstanceFunc not implemented")
336
}
337
338
// ListInstances implements Manager.
339
func (m MockManager) ListInstances() map[string]ManagedInstance {
340
if m.ListInstancesFunc != nil {
341
return m.ListInstancesFunc()
342
}
343
panic("ListInstancesFunc not implemented")
344
}
345
346
// ListConfigs implements Manager.
347
func (m MockManager) ListConfigs() map[string]Config {
348
if m.ListConfigsFunc != nil {
349
return m.ListConfigsFunc()
350
}
351
panic("ListConfigsFunc not implemented")
352
}
353
354
// ApplyConfig implements Manager.
355
func (m MockManager) ApplyConfig(c Config) error {
356
if m.ApplyConfigFunc != nil {
357
return m.ApplyConfigFunc(c)
358
}
359
panic("ApplyConfigFunc not implemented")
360
}
361
362
// DeleteConfig implements Manager.
363
func (m MockManager) DeleteConfig(name string) error {
364
if m.DeleteConfigFunc != nil {
365
return m.DeleteConfigFunc(name)
366
}
367
panic("DeleteConfigFunc not implemented")
368
}
369
370
// Stop implements Manager.
371
func (m MockManager) Stop() {
372
if m.StopFunc != nil {
373
m.StopFunc()
374
return
375
}
376
panic("StopFunc not implemented")
377
}
378
379