Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/operator/resources_integrations.go
4094 views
1
package operator
2
3
import (
4
"fmt"
5
6
gragent "github.com/grafana/agent/pkg/operator/apis/monitoring/v1alpha1"
7
"github.com/grafana/agent/pkg/operator/assets"
8
"github.com/grafana/agent/pkg/operator/config"
9
apps_v1 "k8s.io/api/apps/v1"
10
core_v1 "k8s.io/api/core/v1"
11
)
12
13
func newIntegrationsDaemonSet(cfg *Config, name string, d gragent.Deployment) (*apps_v1.DaemonSet, error) {
14
opts := integrationsPodTemplateOptions(name, d, true)
15
tmpl, selector, err := generatePodTemplate(cfg, name, d, opts)
16
if err != nil {
17
return nil, err
18
}
19
20
spec := apps_v1.DaemonSetSpec{
21
UpdateStrategy: apps_v1.DaemonSetUpdateStrategy{
22
Type: apps_v1.RollingUpdateDaemonSetStrategyType,
23
},
24
Selector: selector,
25
Template: tmpl,
26
}
27
28
return &apps_v1.DaemonSet{
29
ObjectMeta: metadataFromPodTemplate(name, d, tmpl),
30
Spec: spec,
31
}, nil
32
}
33
34
func newIntegrationsDeployment(cfg *Config, name string, d gragent.Deployment) (*apps_v1.Deployment, error) {
35
opts := integrationsPodTemplateOptions(name, d, false)
36
tmpl, selector, err := generatePodTemplate(cfg, name, d, opts)
37
if err != nil {
38
return nil, err
39
}
40
41
spec := apps_v1.DeploymentSpec{
42
Strategy: apps_v1.DeploymentStrategy{
43
Type: apps_v1.RollingUpdateDeploymentStrategyType,
44
},
45
Selector: selector,
46
Template: tmpl,
47
}
48
49
return &apps_v1.Deployment{
50
ObjectMeta: metadataFromPodTemplate(name, d, tmpl),
51
Spec: spec,
52
}, nil
53
}
54
55
func integrationsPodTemplateOptions(name string, d gragent.Deployment, daemonset bool) podTemplateOptions {
56
// Integrations expect that the metrics and logs instances exist. This means
57
// that we have to merge the podTemplateOptions used for metrics and logs
58
// with the options used for integrations.
59
60
// Since integrations may be running as a DaemonSet, it's not possible for us
61
// to rely on a PVC template that metrics might be using. We'll force the WAL
62
// to use an empty volume.
63
d.Agent.Spec.Storage = nil
64
65
integrationOpts := podTemplateOptions{
66
ExtraSelectorLabels: map[string]string{
67
agentTypeLabel: "integrations",
68
},
69
Privileged: daemonset,
70
}
71
72
// We need to iterate over all of our integrations to append extra Volumes,
73
// VolumesMounts, and references to Secrets or ConfigMaps from the resource
74
// hierarchy.
75
var (
76
secretsPaths []core_v1.KeyToPath
77
mountedKeys = map[assets.Key]struct{}{}
78
)
79
80
for _, i := range d.Integrations {
81
inst := i.Instance
82
volumePrefix := fmt.Sprintf("%s-%s-", inst.Namespace, inst.Name)
83
84
for _, v := range inst.Spec.Volumes {
85
// Prefix the key of the Integration CR so it doesn't potentially collide
86
// with other loaded Integration CRs.
87
v = *v.DeepCopy()
88
v.Name = volumePrefix + v.Name
89
90
integrationOpts.ExtraVolumes = append(integrationOpts.ExtraVolumes, v)
91
}
92
for _, vm := range inst.Spec.VolumeMounts {
93
// Prefix the key of the Integration CR so it doesn't potentially collide
94
// with other loaded Integration CRs.
95
vm = *vm.DeepCopy()
96
vm.Name = volumePrefix + vm.Name
97
98
integrationOpts.ExtraVolumeMounts = append(integrationOpts.ExtraVolumeMounts, vm)
99
}
100
101
for _, s := range inst.Spec.Secrets {
102
// We need to determine what the value for this Secret was in the shared
103
// Secret resource.
104
key := assets.KeyForSecret(inst.Namespace, &s)
105
if _, mounted := mountedKeys[key]; mounted {
106
continue
107
}
108
mountedKeys[key] = struct{}{}
109
110
secretsPaths = append(secretsPaths, core_v1.KeyToPath{
111
Key: config.SanitizeLabelName(string(key)),
112
Path: fmt.Sprintf("secrets/%s/%s/%s", inst.Namespace, s.Name, s.Key),
113
})
114
}
115
116
for _, cm := range inst.Spec.ConfigMaps {
117
// We need to determine what the value for this ConfigMap was in the shared
118
// Secret resource.
119
key := assets.KeyForConfigMap(inst.Namespace, &cm)
120
if _, mounted := mountedKeys[key]; mounted {
121
continue
122
}
123
mountedKeys[key] = struct{}{}
124
125
secretsPaths = append(secretsPaths, core_v1.KeyToPath{
126
Key: config.SanitizeLabelName(string(key)),
127
Path: fmt.Sprintf("configMaps/%s/%s/%s", inst.Namespace, cm.Name, cm.Key),
128
})
129
}
130
}
131
132
if len(secretsPaths) > 0 {
133
// Load in references to Secrets and ConfigMaps.
134
integrationSecretsName := fmt.Sprintf("%s-integrations-secrets", d.Agent.Name)
135
136
integrationOpts.ExtraVolumes = append(integrationOpts.ExtraVolumes, core_v1.Volume{
137
Name: integrationSecretsName,
138
VolumeSource: core_v1.VolumeSource{
139
Secret: &core_v1.SecretVolumeSource{
140
// The reconcile-wide Secret holds all secrets and config maps
141
// integrations may have used.
142
SecretName: fmt.Sprintf("%s-secrets", d.Agent.Name),
143
Items: secretsPaths,
144
},
145
},
146
})
147
148
integrationOpts.ExtraVolumeMounts = append(integrationOpts.ExtraVolumeMounts, core_v1.VolumeMount{
149
Name: integrationSecretsName,
150
MountPath: "/etc/grafana-agent/integrations",
151
ReadOnly: true,
152
})
153
}
154
155
// Extra options to merge in.
156
//
157
// NOTE(rfratto): Merge order is important, as subsequent podTemplateOptions
158
// have placeholders necessary to generate configs.
159
var (
160
metricsOpts = metricsPodTemplateOptions(name, d, 0)
161
logsOpts = logsPodTemplateOptions()
162
)
163
return mergePodTemplateOptions(&integrationOpts, &metricsOpts, &logsOpts)
164
}
165
166
// mergePodTemplateOptions merges the provided inputs into a single
167
// podTemplateOptions. Precedence for existing values is taken in input order;
168
// if an environment variable is defined in both inputs[0] and inputs[1], the
169
// value from inputs[0] is used.
170
func mergePodTemplateOptions(inputs ...*podTemplateOptions) podTemplateOptions {
171
res := podTemplateOptions{
172
ExtraSelectorLabels: make(map[string]string),
173
}
174
175
// Volumes are unique by both mount path or name. If a mount path already
176
// exists, we want to ignore that volume and the respective volume mount
177
// that uses it.
178
179
var (
180
mountNames = map[string]struct{}{} // Consumed mount names
181
mountPaths = map[string]struct{}{} // Consumed mount paths
182
volumeNames = map[string]struct{}{} // Consumed volume names
183
varNames = map[string]struct{}{} // Consumed variable names
184
)
185
186
for _, input := range inputs {
187
for k, v := range input.ExtraSelectorLabels {
188
if _, exist := res.ExtraSelectorLabels[k]; exist {
189
continue
190
}
191
res.ExtraSelectorLabels[k] = v
192
}
193
194
// Merge in VolumeMounts before Volumes, allowing us to detect what volume
195
// names specific to this input should be ignored.
196
ignoreVolumes := map[string]struct{}{}
197
198
for _, vm := range input.ExtraVolumeMounts {
199
// Ignore a volume if the mount path or volume name already exists.
200
var (
201
_, exists = mountNames[vm.Name]
202
_, mounted = mountPaths[vm.MountPath]
203
)
204
if exists || mounted {
205
ignoreVolumes[vm.Name] = struct{}{}
206
continue
207
}
208
209
res.ExtraVolumeMounts = append(res.ExtraVolumeMounts, vm)
210
mountNames[vm.Name] = struct{}{}
211
mountPaths[vm.MountPath] = struct{}{}
212
}
213
214
// Merge in volumes that haven't been ignored or have a unique name.
215
for _, v := range input.ExtraVolumes {
216
if _, ignored := ignoreVolumes[v.Name]; ignored {
217
continue
218
} else if _, exists := volumeNames[v.Name]; exists {
219
continue
220
}
221
222
res.ExtraVolumes = append(res.ExtraVolumes, v)
223
volumeNames[v.Name] = struct{}{}
224
}
225
226
for _, ev := range input.ExtraEnvVars {
227
if _, exists := varNames[ev.Name]; exists {
228
continue
229
}
230
231
res.ExtraEnvVars = append(res.ExtraEnvVars, ev)
232
varNames[ev.Name] = struct{}{}
233
}
234
}
235
236
return res
237
}
238
239