Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/instance/group_manager.go
4094 views
1
package instance
2
3
import (
4
"crypto/md5"
5
"encoding/hex"
6
"fmt"
7
"sort"
8
"sync"
9
10
"github.com/prometheus/prometheus/config"
11
)
12
13
// A GroupManager wraps around another Manager and groups all incoming Configs
14
// into a smaller set of configs, causing less managed instances to be spawned.
15
//
16
// Configs are grouped by all settings for a Config *except* scrape configs.
17
// Any difference found in any flag will cause a Config to be placed in another
18
// group. One exception to this rule is that remote_writes are compared
19
// unordered, but the sets of remote_writes should otherwise be identical.
20
//
21
// GroupManagers drastically improve the performance of the Agent when a
22
// significant number of instances are spawned, as the overhead of each
23
// instance having its own service discovery, WAL, and remote_write can be
24
// significant.
25
//
26
// The config names of instances within the group will be represented by
27
// that group's hash of settings.
28
type GroupManager struct {
29
inner Manager
30
31
mtx sync.Mutex
32
33
// groups is a map of group name to the grouped configs.
34
groups map[string]groupedConfigs
35
36
// groupLookup is a map of config name to group name.
37
groupLookup map[string]string
38
}
39
40
// groupedConfigs holds a set of grouped configs, keyed by the config name.
41
// They are stored in a map rather than a slice to make overriding an existing
42
// config within the group less error-prone.
43
type groupedConfigs map[string]Config
44
45
// Copy returns a shallow copy of the groupedConfigs.
46
func (g groupedConfigs) Copy() groupedConfigs {
47
res := make(groupedConfigs, len(g))
48
for k, v := range g {
49
res[k] = v
50
}
51
return res
52
}
53
54
// NewGroupManager creates a new GroupManager for combining instances of the
55
// same "group."
56
func NewGroupManager(inner Manager) *GroupManager {
57
return &GroupManager{
58
inner: inner,
59
groups: make(map[string]groupedConfigs),
60
groupLookup: make(map[string]string),
61
}
62
}
63
64
// GetInstance gets the underlying grouped instance for a given name.
65
func (m *GroupManager) GetInstance(name string) (ManagedInstance, error) {
66
m.mtx.Lock()
67
defer m.mtx.Unlock()
68
69
group, ok := m.groupLookup[name]
70
if !ok {
71
return nil, fmt.Errorf("instance %s does not exist", name)
72
}
73
74
inst, err := m.inner.GetInstance(group)
75
if err != nil {
76
return nil, fmt.Errorf("failed to get instance for %s: %w", name, err)
77
}
78
return inst, nil
79
}
80
81
// ListInstances returns all currently grouped managed instances. The key
82
// will be the group's hash of shared settings.
83
func (m *GroupManager) ListInstances() map[string]ManagedInstance {
84
return m.inner.ListInstances()
85
}
86
87
// ListConfigs returns the UNGROUPED instance configs with their original
88
// settings. To see the grouped instances, call ListInstances instead.
89
func (m *GroupManager) ListConfigs() map[string]Config {
90
m.mtx.Lock()
91
defer m.mtx.Unlock()
92
93
cfgs := make(map[string]Config)
94
for _, groupedConfigs := range m.groups {
95
for _, cfg := range groupedConfigs {
96
cfgs[cfg.Name] = cfg
97
}
98
}
99
return cfgs
100
}
101
102
// ApplyConfig will determine the group of the Config before applying it to
103
// the group. If no group exists, one will be created. If a group already
104
// exists, the group will have its settings merged with the Config and
105
// will be updated.
106
func (m *GroupManager) ApplyConfig(c Config) error {
107
m.mtx.Lock()
108
defer m.mtx.Unlock()
109
return m.applyConfig(c)
110
}
111
112
// applyConfig does the work of ApplyConfig; the caller must hold m.mtx.
//
// The result is named so the deferred restore below can observe whether the
// final inner apply failed.
func (m *GroupManager) applyConfig(c Config) (err error) {
	groupName, err := hashConfig(c)
	if err != nil {
		return fmt.Errorf("failed to get group name for config %s: %w", c.Name, err)
	}

	// Build the would-be new member set on a copy; m.groups is only updated
	// once the inner Manager has accepted the merged config.
	var grouped groupedConfigs
	if existing := m.groups[groupName]; existing != nil {
		grouped = existing.Copy()
	} else {
		grouped = make(groupedConfigs)
	}
	// Adding c replaces any same-named config already in this group.
	grouped[c.Name] = c

	mergedConfig, err := groupConfigs(groupName, grouped)
	if err != nil {
		return fmt.Errorf("failed to group configs for %s: %w", c.Name, err)
	}

	// A config whose settings changed may now hash into a different group. It
	// must be removed from its old group first; if that fails, c cannot be
	// applied.
	if oldGroup, tracked := m.groupLookup[c.Name]; tracked && oldGroup != groupName {
		// Failures where the internal state can no longer be trusted panic so
		// the Agent restarts from scratch; recoverable failures return errors.

		// Without the old config we could not restore it later, and its absence
		// means the bookkeeping is already inconsistent.
		oldConfig, found := m.groups[oldGroup][c.Name]
		if !found {
			panic("failed to properly move config to new group. THIS IS A BUG!")
		}

		if err = m.deleteConfig(c.Name); err != nil {
			return fmt.Errorf("cannot apply config %s because deleting it from the old group failed: %w", c.Name, err)
		}

		// The old config is gone now; put it back if the new apply fails.
		// Restoring should not fail, because the old config was running before
		// and is therefore valid. If it does fail anyway, a config has been
		// lost entirely, so crash rather than continue in a corrupted state.
		defer func() {
			if err == nil {
				return
			}
			if restoreErr := m.applyConfig(oldConfig); restoreErr != nil {
				panic(fmt.Sprintf("failed to properly restore config. THIS IS A BUG! error: %s", restoreErr))
			}
		}()
	}

	if err = m.inner.ApplyConfig(mergedConfig); err != nil {
		return fmt.Errorf("failed to apply grouped configs for config %s: %w", c.Name, err)
	}

	// The inner Manager accepted the group; commit the bookkeeping.
	m.groups[groupName] = grouped
	m.groupLookup[c.Name] = groupName
	return nil
}
191
192
// DeleteConfig will remove a Config from its associated group. If there are
193
// no more Configs within that group after this Config is deleted, the managed
194
// instance will be stopped. Otherwise, the managed instance will be updated
195
// with the new grouped Config that doesn't include the removed one.
196
func (m *GroupManager) DeleteConfig(name string) error {
197
m.mtx.Lock()
198
defer m.mtx.Unlock()
199
return m.deleteConfig(name)
200
}
201
202
// deleteConfig does the work of DeleteConfig; the caller must hold m.mtx.
// The stored state is only changed after the inner Manager accepts the change.
func (m *GroupManager) deleteConfig(name string) error {
	groupName, ok := m.groupLookup[name]
	if !ok {
		return fmt.Errorf("config does not exist")
	}

	// Work on a copy of the group so a failed inner call leaves m.groups intact.
	remaining := m.groups[groupName].Copy()
	delete(remaining, name)

	if len(remaining) == 0 {
		// name was the last member, so the whole group goes away.
		if err := m.inner.DeleteConfig(groupName); err != nil {
			return fmt.Errorf("failed to delete empty group %s after removing config %s: %w", groupName, name, err)
		}
		delete(m.groups, groupName)
	} else {
		// Other members remain; re-apply the group's merged config without name.
		merged, err := groupConfigs(groupName, remaining)
		if err != nil {
			return fmt.Errorf("failed to regroup configs without %s: %w", name, err)
		}
		if err := m.inner.ApplyConfig(merged); err != nil {
			return fmt.Errorf("failed to apply new group without %s: %w", name, err)
		}
		m.groups[groupName] = remaining
	}

	delete(m.groupLookup, name)
	return nil
}
244
245
// Stop stops the Manager and all of its managed instances.
246
func (m *GroupManager) Stop() {
247
m.mtx.Lock()
248
defer m.mtx.Unlock()
249
250
m.inner.Stop()
251
m.groupLookup = make(map[string]string)
252
m.groups = make(map[string]groupedConfigs)
253
}
254
255
// hashConfig determines the hash of a Config used for grouping. It ignores
256
// the name and scrape_configs and also orders remote_writes by name prior to
257
// hashing.
258
func hashConfig(c Config) (string, error) {
259
// We need a deep copy since we're going to mutate the remote_write
260
// pointers.
261
groupable, err := c.Clone()
262
if err != nil {
263
return "", err
264
}
265
266
// Ignore name and scrape configs when hashing
267
groupable.Name = ""
268
groupable.ScrapeConfigs = nil
269
270
// Assign names to remote_write configs if they're not present already.
271
// This is also done in AssignDefaults but is duplicated here for the sake
272
// of simplifying responsibility of GroupManager.
273
for _, cfg := range groupable.RemoteWrite {
274
if cfg != nil {
275
// We don't care if the names are different, just that the other settings
276
// are the same. Blank out the name here before hashing the remote
277
// write config.
278
cfg.Name = ""
279
280
hash, err := getHash(cfg)
281
if err != nil {
282
return "", err
283
}
284
cfg.Name = hash[:6]
285
}
286
}
287
288
// Now sort remote_writes by name and nil-ness.
289
sort.Slice(groupable.RemoteWrite, func(i, j int) bool {
290
switch {
291
case groupable.RemoteWrite[i] == nil:
292
return true
293
case groupable.RemoteWrite[j] == nil:
294
return false
295
default:
296
return groupable.RemoteWrite[i].Name < groupable.RemoteWrite[j].Name
297
}
298
})
299
300
bb, err := MarshalConfig(&groupable, false)
301
if err != nil {
302
return "", err
303
}
304
hash := md5.Sum(bb)
305
return hex.EncodeToString(hash[:]), nil
306
}
307
308
// groupConfig creates a grouped Config where all fields are copied from
309
// the first config except for scrape_configs, which are appended together.
310
func groupConfigs(groupName string, grouped groupedConfigs) (Config, error) {
311
if len(grouped) == 0 {
312
return Config{}, fmt.Errorf("no configs")
313
}
314
315
// Move the map into a slice and sort it by name so this function
316
// consistently does the same thing.
317
cfgs := make([]Config, 0, len(grouped))
318
for _, cfg := range grouped {
319
cfgs = append(cfgs, cfg)
320
}
321
sort.Slice(cfgs, func(i, j int) bool { return cfgs[i].Name < cfgs[j].Name })
322
323
combined, err := cfgs[0].Clone()
324
if err != nil {
325
return Config{}, err
326
}
327
combined.Name = groupName
328
combined.ScrapeConfigs = []*config.ScrapeConfig{}
329
330
// Assign all remote_write configs in the group a consistent set of remote_names.
331
// If the grouped configs are coming from the scraping service, defaults will have
332
// been applied and the remote names will be prefixed with the old instance config name.
333
for _, rwc := range combined.RemoteWrite {
334
// Blank out the existing name before getting the hash so it doesn't take into
335
// account any existing name.
336
rwc.Name = ""
337
338
hash, err := getHash(rwc)
339
if err != nil {
340
return Config{}, err
341
}
342
343
rwc.Name = groupName[:6] + "-" + hash[:6]
344
}
345
346
// Combine all the scrape configs. It's possible that two different ungrouped
347
// configs had a matching job name, but this will be detected and rejected
348
// (as it should be) when the underlying Manager eventually validates the
349
// combined config.
350
//
351
// TODO(rfratto): should we prepend job names with the name of the original
352
// config? (e.g., job_name = "config_name/job_name").
353
for _, cfg := range cfgs {
354
combined.ScrapeConfigs = append(combined.ScrapeConfigs, cfg.ScrapeConfigs...)
355
}
356
357
return combined, nil
358
}
359
360