Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/cloudwatch_exporter/config.go
5376 views
1
package cloudwatch_exporter
2
3
import (
4
"crypto/md5"
5
"encoding/hex"
6
"fmt"
7
"time"
8
9
"github.com/go-kit/log"
10
yaceConf "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
11
yaceModel "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
12
"gopkg.in/yaml.v2"
13
14
"github.com/grafana/agent/pkg/integrations"
15
integrations_v2 "github.com/grafana/agent/pkg/integrations/v2"
16
"github.com/grafana/agent/pkg/integrations/v2/metricsutils"
17
)
18
19
// Fixed settings for the exporter. They are only declared in this part of the file; the code that consumes them
// is not visible here, so the per-constant notes below are inferred from the names.
const (
	// metricsPerQuery is presumably the number of metrics requested per CloudWatch query. TODO confirm at the use site.
	metricsPerQuery = 500
	// cloudWatchConcurrency is presumably the number of concurrent CloudWatch requests. TODO confirm at the use site.
	cloudWatchConcurrency = 5
	// tagConcurrency is presumably the number of concurrent tagging requests. TODO confirm at the use site.
	tagConcurrency = 5
	// labelsSnakeCase presumably controls snake_case conversion of label names. TODO confirm at the use site.
	labelsSnakeCase = false
)
25
26
// Flags handed by pointer to every YACE job and metric built in this file. They are variables rather than
// constants because their addresses are taken.
var (
	// addCloudwatchTimestamp is false: metrics are gathered from CloudWatch and written in prometheus during each
	// scrape, so the timestamp used should be the scrape one.
	addCloudwatchTimestamp = false

	// nilToZero is true to avoid producing absence of values in metrics.
	nilToZero = true
)
32
33
func init() {
34
integrations.RegisterIntegration(&Config{})
35
integrations_v2.RegisterLegacy(&Config{}, integrations_v2.TypeMultiplex, metricsutils.NewNamedShim("cloudwatch"))
36
}
37
38
// Config is the configuration for the CloudWatch metrics integration
39
type Config struct {
40
STSRegion string `yaml:"sts_region"`
41
FIPSDisabled bool `yaml:"fips_disabled"`
42
Discovery DiscoveryConfig `yaml:"discovery"`
43
Static []StaticJob `yaml:"static"`
44
}
45
46
// DiscoveryConfig configures scraping jobs that will auto-discover metrics dimensions for a given service.
47
type DiscoveryConfig struct {
48
ExportedTags TagsPerNamespace `yaml:"exported_tags"`
49
Jobs []*DiscoveryJob `yaml:"jobs"`
50
}
51
52
// TagsPerNamespace maps each namespace to the list of tags that will be exported as labels in each of its
// metrics.
type TagsPerNamespace map[string][]string
54
55
// DiscoveryJob configures a discovery job for a given service.
56
type DiscoveryJob struct {
57
InlineRegionAndRoles `yaml:",inline"`
58
InlineCustomTags `yaml:",inline"`
59
SearchTags []Tag `yaml:"search_tags"`
60
Type string `yaml:"type"`
61
Metrics []Metric `yaml:"metrics"`
62
}
63
64
// StaticJob will scrape metrics that match all defined dimensions.
65
type StaticJob struct {
66
InlineRegionAndRoles `yaml:",inline"`
67
InlineCustomTags `yaml:",inline"`
68
Name string `yaml:"name"`
69
Namespace string `yaml:"namespace"`
70
Dimensions []Dimension `yaml:"dimensions"`
71
Metrics []Metric `yaml:"metrics"`
72
}
73
74
// InlineRegionAndRoles exposes for each supported job, the AWS regions and IAM roles in which the agent should perform the
75
// scrape.
76
type InlineRegionAndRoles struct {
77
Regions []string `yaml:"regions"`
78
Roles []Role `yaml:"roles"`
79
}
80
81
type InlineCustomTags struct {
82
CustomTags []Tag `yaml:"custom_tags"`
83
}
84
85
// Role identifies an IAM role by its ARN together with an external ID.
type Role struct {
	// RoleArn is the ARN of the role.
	RoleArn string `yaml:"role_arn"`
	// ExternalID is the external ID passed on alongside the role ARN.
	ExternalID string `yaml:"external_id"`
}
89
90
// Dimension is a name/value pair used by static jobs to select metrics.
type Dimension struct {
	// Name is the dimension name.
	Name string `yaml:"name"`
	// Value is the dimension value.
	Value string `yaml:"value"`
}
94
95
// Tag is a key/value pair, used both for custom tags and for search tags.
type Tag struct {
	// Key is the tag key.
	Key string `yaml:"key"`
	// Value is the tag value.
	Value string `yaml:"value"`
}
99
100
// Metric describes one metric to retrieve.
type Metric struct {
	// Name is the metric name.
	Name string `yaml:"name"`
	// Statistics are the statistics requested for the metric.
	Statistics []string `yaml:"statistics"`
	// Period is converted to whole seconds and used as both the period and the length of the YACE metric.
	Period time.Duration `yaml:"period"`
}
105
106
// Name returns the name of the integration this config is for.
107
func (c *Config) Name() string {
108
return "cloudwatch_exporter"
109
}
110
111
// InstanceKey returns the key for this integration instance, which is the hash of the config computed by
// getHash. The agentKey argument is not used.
func (c *Config) InstanceKey(agentKey string) (string, error) {
	key, err := getHash(c)
	return key, err
}
114
115
// NewIntegration creates a new integration from the config.
116
func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) {
117
exporterConfig, fipsEnabled, err := ToYACEConfig(c)
118
if err != nil {
119
return nil, fmt.Errorf("invalid cloudwatch exporter configuration: %w", err)
120
}
121
return newCloudwatchExporter(c.Name(), l, exporterConfig, fipsEnabled), nil
122
}
123
124
// getHash calculates the MD5 hash of the yaml representation of the config
125
func getHash(c *Config) (string, error) {
126
bytes, err := yaml.Marshal(c)
127
if err != nil {
128
return "", err
129
}
130
hash := md5.Sum(bytes)
131
return hex.EncodeToString(hash[:]), nil
132
}
133
134
// ToYACEConfig converts a Config into YACE's config model. Note that the conversion is not direct, some values
135
// have been opinionated to simplify the config model the agent exposes for this integration.
136
// The returned boolean is whether or not AWS FIPS endpoints will be enabled.
137
func ToYACEConfig(c *Config) (yaceConf.ScrapeConf, bool, error) {
138
discoveryJobs := []*yaceConf.Job{}
139
for _, job := range c.Discovery.Jobs {
140
discoveryJobs = append(discoveryJobs, toYACEDiscoveryJob(job))
141
}
142
staticJobs := []*yaceConf.Static{}
143
for _, stat := range c.Static {
144
staticJobs = append(staticJobs, toYACEStaticJob(stat))
145
}
146
conf := yaceConf.ScrapeConf{
147
APIVersion: "v1alpha1",
148
StsRegion: c.STSRegion,
149
Discovery: yaceConf.Discovery{
150
ExportedTagsOnMetrics: yaceConf.ExportedTagsOnMetrics(c.Discovery.ExportedTags),
151
Jobs: discoveryJobs,
152
},
153
Static: staticJobs,
154
}
155
156
// yaceSess expects a default value of True
157
fipsEnabled := !c.FIPSDisabled
158
159
// Run the exporter's config validation. Between other things, it will check that the service for which a discovery
160
// job is instantiated, it's supported.
161
if err := conf.Validate(); err != nil {
162
return conf, fipsEnabled, err
163
}
164
patchYACEDefaults(&conf)
165
166
return conf, fipsEnabled, nil
167
}
168
169
// patchYACEDefaults overrides some default values YACE applies after validation.
170
func patchYACEDefaults(yc *yaceConf.ScrapeConf) {
171
// YACE doesn't allow during validation a zero-delay in each metrics scrape. Override this behaviour since it's taken
172
// into account by the rounding period.
173
// https://github.com/nerdswords/yet-another-cloudwatch-exporter/blob/7e5949124bb5f26353eeff298724a5897de2a2a4/pkg/config/config.go#L320
174
for _, job := range yc.Discovery.Jobs {
175
for _, metric := range job.Metrics {
176
metric.Delay = 0
177
}
178
}
179
}
180
181
func toYACEStaticJob(job StaticJob) *yaceConf.Static {
182
return &yaceConf.Static{
183
Name: job.Name,
184
Regions: job.Regions,
185
Roles: toYACERoles(job.Roles),
186
Namespace: job.Namespace,
187
CustomTags: toYACETags(job.CustomTags),
188
Dimensions: toYACEDimensions(job.Dimensions),
189
Metrics: toYACEMetrics(job.Metrics),
190
}
191
}
192
193
func toYACEDimensions(dim []Dimension) []yaceConf.Dimension {
194
yaceDims := []yaceConf.Dimension{}
195
for _, d := range dim {
196
yaceDims = append(yaceDims, yaceConf.Dimension{
197
Name: d.Name,
198
Value: d.Value,
199
})
200
}
201
return yaceDims
202
}
203
204
func toYACEDiscoveryJob(job *DiscoveryJob) *yaceConf.Job {
205
roles := toYACERoles(job.Roles)
206
yaceJob := yaceConf.Job{
207
Regions: job.Regions,
208
Roles: roles,
209
CustomTags: toYACETags(job.CustomTags),
210
Type: job.Type,
211
Metrics: toYACEMetrics(job.Metrics),
212
SearchTags: toYACETags(job.SearchTags),
213
214
// By setting RoundingPeriod to nil, the exporter will align the start and end times for retrieving CloudWatch
215
// metrics, with the smallest period in the retrieved batch.
216
RoundingPeriod: nil,
217
218
JobLevelMetricFields: yaceConf.JobLevelMetricFields{
219
// Set to zero job-wide scraping time settings. This should be configured at the metric level to make the data
220
// being fetched more explicit.
221
Period: 0,
222
Length: 0,
223
Delay: 0,
224
NilToZero: &nilToZero,
225
AddCloudwatchTimestamp: &addCloudwatchTimestamp,
226
},
227
}
228
return &yaceJob
229
}
230
231
func toYACEMetrics(metrics []Metric) []*yaceConf.Metric {
232
yaceMetrics := []*yaceConf.Metric{}
233
for _, metric := range metrics {
234
periodSeconds := int64(metric.Period.Seconds())
235
lengthSeconds := periodSeconds
236
yaceMetrics = append(yaceMetrics, &yaceConf.Metric{
237
Name: metric.Name,
238
Statistics: metric.Statistics,
239
240
// Length dictates the size of the window for whom we request metrics, that is, endTime - startTime. Period
241
// dictates the size of the buckets in which we aggregate data, inside that window. Since data will be scraped
242
// by the agent every so often, dictated by the scrapedInterval, CloudWatch should return a single datapoint
243
// for each requested metric. That is if Period >= Length, but is Period > Length, we will be getting not enough
244
// data to fill the whole aggregation bucket. Therefore, Period == Length.
245
Period: periodSeconds,
246
Length: lengthSeconds,
247
248
// Delay moves back the time window for whom CloudWatch is requested data. Since we are already adjusting
249
// this with RoundingPeriod (see toYACEDiscoveryJob), we should omit this setting.
250
Delay: 0,
251
252
NilToZero: &nilToZero,
253
AddCloudwatchTimestamp: &addCloudwatchTimestamp,
254
})
255
}
256
return yaceMetrics
257
}
258
259
func toYACERoles(roles []Role) []yaceConf.Role {
260
yaceRoles := []yaceConf.Role{}
261
// YACE defaults to an empty role, which means the environment configured role is used
262
// https://github.com/nerdswords/yet-another-cloudwatch-exporter/blob/30aeceb2324763cdd024a1311045f83a09c1df36/pkg/config/config.go#L111
263
if len(roles) == 0 {
264
yaceRoles = append(yaceRoles, yaceConf.Role{})
265
}
266
for _, role := range roles {
267
yaceRoles = append(yaceRoles, yaceConf.Role{
268
RoleArn: role.RoleArn,
269
ExternalID: role.ExternalID,
270
})
271
}
272
return yaceRoles
273
}
274
275
func toYACETags(tags []Tag) []yaceModel.Tag {
276
outTags := []yaceModel.Tag{}
277
for _, t := range tags {
278
outTags = append(outTags, yaceModel.Tag{
279
Key: t.Key,
280
Value: t.Value,
281
})
282
}
283
return outTags
284
}
285
286