Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/register.go
5340 views
1
package integrations
2
3
import (
4
"fmt"
5
"reflect"
6
"strings"
7
"testing"
8
9
"gopkg.in/yaml.v2"
10
11
v1 "github.com/grafana/agent/pkg/integrations"
12
"github.com/grafana/agent/pkg/integrations/v2/common"
13
"github.com/grafana/agent/pkg/util"
14
)
15
16
var (
17
integrationByName = make(map[string]interface{}) // Cache of field names for uniqueness checking.
18
integrationTypes = make(map[reflect.Type]Type) // Map of registered type to Type
19
nameByType = make(map[reflect.Type]string) // Map of Type to registered name
20
21
// Registered integrations. Registered integrations may be either a Config or
22
// a v1.Config. v1.Configs must have a corresponding upgrader for their type.
23
registered = []interface{}{}
24
upgraders = make(map[reflect.Type]UpgradeFunc)
25
26
emptyStructType = reflect.TypeOf(struct{}{})
27
configsType = reflect.TypeOf(Configs{})
28
)
29
30
// Register dynamically registers a new integration. The Config
31
// will represent the configuration that controls the specific integration.
32
// Registered Configs may be loaded using UnmarshalYAML or manually
33
// constructed.
34
//
35
// ty controls how the integration can be unmarshaled from YAML.
36
//
37
// Register panics if cfg is not a pointer.
38
func Register(cfg Config, ty Type) {
39
registerIntegration(cfg, cfg.Name(), ty, nil)
40
}
41
42
func registerIntegration(v interface{}, name string, ty Type, upgrader UpgradeFunc) {
43
if reflect.TypeOf(v).Kind() != reflect.Ptr {
44
panic(fmt.Sprintf("Register must be given a pointer, got %T", v))
45
}
46
if _, exist := integrationByName[name]; exist {
47
panic(fmt.Sprintf("Integration %q registered twice", name))
48
}
49
integrationByName[name] = v
50
51
registered = append(registered, v)
52
53
configTy := reflect.TypeOf(v)
54
integrationTypes[configTy] = ty
55
upgraders[configTy] = upgrader
56
nameByType[configTy] = name
57
}
58
59
// RegisterLegacy registers a v1.Config. upgrader will be used to upgrade it.
60
// upgrader will only be invoked after unmarshaling cfg from YAML, and the
61
// upgraded Config will be unwrapped again when marshaling back to YAML.
62
//
63
// RegisterLegacy only exists for the transition period where the v2
64
// integrations subsystem is an experiment. RegisterLegacy will be removed at a
65
// later date.
66
func RegisterLegacy(cfg v1.Config, ty Type, upgrader UpgradeFunc) {
67
realConfig := upgrader(cfg, common.MetricsConfig{})
68
registerIntegration(cfg, realConfig.Name(), ty, upgrader)
69
}
70
71
// UpgradeFunc upgrades cfg to a UpgradedConfig.
72
type UpgradeFunc func(cfg v1.Config, common common.MetricsConfig) UpgradedConfig
73
74
// UpgradedConfig is a v2 Config that was constructed through a legacy
75
// v1.Config. It allows unwrapping to retrieve the original config for
76
// the purposes of marshaling or unmarshaling.
77
type UpgradedConfig interface {
78
Config
79
80
// LegacyConfig returns the old v1.Config.
81
LegacyConfig() (v1.Config, common.MetricsConfig)
82
}
83
84
// Type determines a specific type of integration.
85
type Type int
86
87
const (
88
// TypeInvalid is an invalid type.
89
TypeInvalid Type = iota
90
91
// TypeSingleton is an integration that can only be defined exactly once in
92
// the config, unmarshaled through "<integration name>"
93
TypeSingleton
94
95
// TypeMultiplex is an integration that can only be defined through an array,
96
// unmarshaled through "<integration name>_configs"
97
TypeMultiplex
98
99
// TypeEither is an integration that can be unmarshaled either as a singleton
100
// or as an array, but not both.
101
//
102
// Deprecated. Use either TypeSingleton or TypeMultiplex for new integrations.
103
TypeEither
104
)
105
106
// setRegistered is used by tests to temporarily set integrations. Registered
107
// integrations will be unregistered after the test completes.
108
//
109
// setRegistered must not be used with parallelized tests.
110
func setRegistered(t *testing.T, cc map[Config]Type) {
111
clear := func() {
112
integrationByName = make(map[string]interface{})
113
integrationTypes = make(map[reflect.Type]Type)
114
registered = registered[:0]
115
upgraders = make(map[reflect.Type]UpgradeFunc)
116
nameByType = make(map[reflect.Type]string)
117
}
118
119
t.Cleanup(clear)
120
clear()
121
122
for c, t := range cc {
123
Register(c, t)
124
}
125
}
126
127
// Registered all Configs that were passed to Register or RegisterLegacy. Each
128
// call will generate a new set of configs.
129
func Registered() []Config {
130
res := make([]Config, 0, len(registered))
131
for _, r := range registered {
132
res = append(res, cloneConfig(r))
133
}
134
return res
135
}
136
137
// RegisteredType returns the registered integrations.Type for c.
138
func RegisteredType(c Config) (Type, bool) {
139
// We want to look up the registered type. Integrations are always registered
140
// as pointers, so we need to add indirection here if a non-pointer is loaded
141
// into the subsystem.
142
cType := reflect.TypeOf(c)
143
if cType.Kind() != reflect.Ptr {
144
cType = reflect.PtrTo(cType)
145
}
146
147
t, ok := integrationTypes[cType]
148
return t, ok
149
}
150
151
func cloneConfig(r interface{}) Config {
152
switch v := r.(type) {
153
case Config:
154
return cloneValue(v).(Config)
155
case v1.Config:
156
mut, ok := upgraders[reflect.TypeOf(v)]
157
if !ok || mut == nil {
158
panic(fmt.Sprintf("Could not find transformer for legacy integration %T", r))
159
}
160
return mut(cloneValue(r).(v1.Config), common.MetricsConfig{})
161
default:
162
panic(fmt.Sprintf("unexpected type %T", r))
163
}
164
}
165
166
func cloneValue(in interface{}) interface{} {
167
return reflect.New(reflect.TypeOf(in).Elem()).Interface()
168
}
169
170
// Configs is a list of integrations. Note that Configs does not implement
171
// yaml.Unmarshaler or yaml.Marshaler. Use the UnmarshalYAML or MarshalYAML
172
// methods to deal with integrations defined from YAML.
173
type Configs []Config
174
175
// MarshalYAML helps implement yaml.Marshaler for structs that have a Configs
176
// field that should be inlined in the YAML string.
177
func MarshalYAML(v interface{}) (interface{}, error) {
178
inVal := reflect.ValueOf(v)
179
for inVal.Kind() == reflect.Ptr {
180
inVal = inVal.Elem()
181
}
182
if inVal.Kind() != reflect.Struct {
183
return nil, fmt.Errorf("integrations: can only marshal a struct, got %T", v)
184
}
185
inType := inVal.Type()
186
187
var (
188
outType = getConfigTypeForIntegrations(inType)
189
outPointer = reflect.New(outType)
190
outVal = outPointer.Elem()
191
)
192
193
// Copy over any existing value from inVal to cfgVal.
194
//
195
// The ordering of fields in inVal and cfgVal match identically up until the
196
// extra fields appended to the end of cfgVal.
197
var configs Configs
198
for i, n := 0, inType.NumField(); i < n; i++ {
199
if inType.Field(i).Type == configsType {
200
configs = inVal.Field(i).Interface().(Configs)
201
if configs == nil {
202
configs = Configs{}
203
}
204
}
205
if outType.Field(i).PkgPath != "" {
206
continue // Field is unexported: ignore.
207
}
208
outVal.Field(i).Set(inVal.Field(i))
209
}
210
if configs == nil {
211
return nil, fmt.Errorf("integrations: Configs field not found in type: %T", v)
212
}
213
214
// Map of discovered singleton integration names. A singleton integration may
215
// not be defined in Configs more than once.
216
uniqueSingletons := make(map[string]struct{})
217
218
for _, c := range configs {
219
fieldName := c.Name()
220
221
var data interface{} = c
222
if wc, ok := c.(UpgradedConfig); ok {
223
data, _ = wc.LegacyConfig()
224
}
225
226
integrationType, ok := integrationTypes[reflect.TypeOf(data)]
227
if !ok {
228
panic(fmt.Sprintf("config not registered: %T", data))
229
}
230
231
if _, exists := uniqueSingletons[fieldName]; exists {
232
return nil, fmt.Errorf("integration %q may not be defined more than once", fieldName)
233
}
234
uniqueSingletons[fieldName] = struct{}{}
235
236
// TODO(rfratto): make sure that TypeSingleton integrations are unique on
237
// marshaling out
238
239
// Generate the *util.RawYAML to marshal out with.
240
var (
241
bb []byte
242
err error
243
)
244
switch v := c.(type) {
245
case UpgradedConfig:
246
inner, common := v.LegacyConfig()
247
bb, err = util.MarshalYAMLMerged(common, inner)
248
default:
249
bb, err = yaml.Marshal(v)
250
}
251
if err != nil {
252
return nil, fmt.Errorf("failed to marshal integration %q: %w", fieldName, err)
253
}
254
raw := util.RawYAML(bb)
255
256
switch integrationType {
257
case TypeSingleton:
258
field := outVal.FieldByName("XXX_Config_" + fieldName)
259
field.Set(reflect.ValueOf(&raw))
260
case TypeMultiplex, TypeEither:
261
field := outVal.FieldByName("XXX_Configs_" + fieldName)
262
field.Set(reflect.Append(field, reflect.ValueOf(&raw)))
263
}
264
}
265
266
return outPointer.Interface(), nil
267
}
268
269
// UnmarshalYAML helps implement yaml.Unmarshaller for structs that have a
270
// Configs field that should be inlined in the YAML string. Code adapted from
271
// Prometheus:
272
//
273
// https://github.com/prometheus/prometheus/blob/511511324adfc4f4178f064cc104c2deac3335de/discovery/registry.go#L111
274
func UnmarshalYAML(out interface{}, unmarshal func(interface{}) error) error {
275
outVal := reflect.ValueOf(out)
276
if outVal.Kind() != reflect.Ptr {
277
return fmt.Errorf("integrations: can only unmarshal into a struct pointer, got %T", out)
278
}
279
outVal = outVal.Elem()
280
if outVal.Kind() != reflect.Struct {
281
return fmt.Errorf("integrations: can only unmarshal into a struct pointer, got %T", out)
282
}
283
outType := outVal.Type()
284
285
var (
286
cfgType = getConfigTypeForIntegrations(outType)
287
cfgPointer = reflect.New(cfgType)
288
cfgVal = cfgPointer.Elem()
289
)
290
291
// Copy over any existing value from outVal to cfgVal.
292
//
293
// The ordering of fields in outVal and cfgVal match identically up until the
294
// extra fields appended to the end of cfgVal.
295
var configs *Configs
296
for i := 0; i < outVal.NumField(); i++ {
297
if outType.Field(i).Type == configsType {
298
if configs != nil {
299
return fmt.Errorf("integrations: Multiple Configs fields found in %T", out)
300
}
301
configs = outVal.Field(i).Addr().Interface().(*Configs)
302
continue
303
}
304
if cfgType.Field(i).PkgPath != "" {
305
// Ignore unexported fields
306
continue
307
}
308
cfgVal.Field(i).Set(outVal.Field(i))
309
}
310
if configs == nil {
311
return fmt.Errorf("integrations: No Configs field found in %T", out)
312
}
313
314
// Unmarshal into our dynamic type.
315
if err := unmarshal(cfgPointer.Interface()); err != nil {
316
return replaceYAMLTypeError(err, cfgType, outType)
317
}
318
319
// Copy back unmarshaled fields that were originally in outVal.
320
for i := 0; i < outVal.NumField(); i++ {
321
if cfgType.Field(i).PkgPath != "" {
322
// Ignore unexported fields
323
continue
324
}
325
outVal.Field(i).Set(cfgVal.Field(i))
326
}
327
328
// Iterate through the remainder of our fields, which should all be
329
// either a Config or a slice of types that implement Config.
330
for i := outVal.NumField(); i < cfgVal.NumField(); i++ {
331
// Our integrations are unmarshaled as *util.RawYAML or []*util.RawYAML. If
332
// it's nil, we treat it as not defined.
333
fieldType := cfgVal.Type().Field(i)
334
field := cfgVal.Field(i)
335
if field.IsNil() {
336
continue
337
}
338
339
switch field.Kind() {
340
case reflect.Slice:
341
configName := strings.TrimPrefix(fieldType.Name, "XXX_Configs_")
342
configReference, ok := integrationByName[configName]
343
if !ok {
344
return fmt.Errorf("integration %q not registered", configName)
345
}
346
347
for i := 0; i < field.Len(); i++ {
348
if field.Index(i).IsNil() {
349
continue
350
}
351
raw := field.Index(i).Interface().(*util.RawYAML)
352
c, err := deferredConfigUnmarshal(*raw, configReference)
353
if err != nil {
354
return err
355
}
356
*configs = append(*configs, c)
357
}
358
default:
359
configName := strings.TrimPrefix(fieldType.Name, "XXX_Config_")
360
configReference, ok := integrationByName[configName]
361
if !ok {
362
return fmt.Errorf("integration %q not registered", configName)
363
}
364
raw := field.Interface().(*util.RawYAML)
365
c, err := deferredConfigUnmarshal(*raw, configReference)
366
if err != nil {
367
return err
368
}
369
*configs = append(*configs, c)
370
}
371
}
372
373
return nil
374
}
375
376
// deferredConfigUnmarshal performs a deferred unmarshal of raw into a Config.
377
// ref must be either Config or v1.Config.
378
func deferredConfigUnmarshal(raw util.RawYAML, ref interface{}) (Config, error) {
379
switch ref := ref.(type) {
380
case Config:
381
out := cloneValue(ref).(Config)
382
err := yaml.UnmarshalStrict(raw, out)
383
return out, err
384
case v1.Config:
385
var (
386
common common.MetricsConfig
387
out = cloneValue(ref).(v1.Config)
388
)
389
mut, ok := upgraders[reflect.TypeOf(out)]
390
if !ok {
391
panic(fmt.Sprintf("unexpected type %T", ref))
392
}
393
err := util.UnmarshalYAMLMerged(raw, &common, out)
394
return mut(out, common), err
395
default:
396
panic(fmt.Sprintf("unexpected type %T", ref))
397
}
398
}
399
400
// getConfigTypeForIntegrations returns a dynamic struct type that has all of
401
// the same fields as out including the fields for the provided integrations.
402
//
403
// integrations are unmarshaled to *util.RawYAML for deferred unmarshaling.
404
func getConfigTypeForIntegrations(out reflect.Type) reflect.Type {
405
// Initial exported fields map one-to-one.
406
var fields []reflect.StructField
407
for i, n := 0, out.NumField(); i < n; i++ {
408
switch field := out.Field(i); {
409
case field.PkgPath == "" && field.Type != configsType:
410
fields = append(fields, field)
411
default:
412
fields = append(fields, reflect.StructField{
413
Name: "_" + field.Name, // Field must be unexported.
414
PkgPath: out.PkgPath(),
415
Type: emptyStructType,
416
})
417
}
418
}
419
420
for _, reg := range registered {
421
// Fields use a prefix that's unlikely to collide with anything else.
422
configTy := reflect.TypeOf(reg)
423
fieldName := nameByType[configTy]
424
425
singletonType := reflect.PtrTo(reflect.TypeOf(util.RawYAML{}))
426
427
fields = append(fields, reflect.StructField{
428
Name: "XXX_Config_" + fieldName,
429
Tag: reflect.StructTag(fmt.Sprintf(`yaml:"%s,omitempty"`, fieldName)),
430
Type: singletonType,
431
})
432
fields = append(fields, reflect.StructField{
433
Name: "XXX_Configs_" + fieldName,
434
Tag: reflect.StructTag(fmt.Sprintf(`yaml:"%s_configs,omitempty"`, fieldName)),
435
Type: reflect.SliceOf(singletonType),
436
})
437
}
438
return reflect.StructOf(fields)
439
}
440
441
func replaceYAMLTypeError(err error, oldTyp, newTyp reflect.Type) error {
442
if e, ok := err.(*yaml.TypeError); ok {
443
oldStr := oldTyp.String()
444
newStr := newTyp.String()
445
for i, s := range e.Errors {
446
e.Errors[i] = strings.ReplaceAll(s, oldStr, newStr)
447
}
448
}
449
return err
450
}
451
452