Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/operator/reconciler_metrics.go
4094 views
1
package operator
2
3
import (
4
"bytes"
5
"compress/gzip"
6
"context"
7
"errors"
8
"fmt"
9
"os"
10
11
"github.com/go-kit/log"
12
"github.com/go-kit/log/level"
13
"github.com/google/go-jsonnet"
14
gragent "github.com/grafana/agent/pkg/operator/apis/monitoring/v1alpha1"
15
"github.com/grafana/agent/pkg/operator/clientutil"
16
"github.com/grafana/agent/pkg/operator/config"
17
apps_v1 "k8s.io/api/apps/v1"
18
core_v1 "k8s.io/api/core/v1"
19
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
"k8s.io/apimachinery/pkg/labels"
21
"k8s.io/apimachinery/pkg/types"
22
"k8s.io/utils/pointer"
23
"sigs.k8s.io/controller-runtime/pkg/client"
24
)
25
26
// createMetricsConfigurationSecret creates the Grafana Agent metrics configuration and stores
27
// it into a secret.
28
func (r *reconciler) createMetricsConfigurationSecret(
29
ctx context.Context,
30
l log.Logger,
31
d gragent.Deployment,
32
) error {
33
34
name := fmt.Sprintf("%s-config", d.Agent.Name)
35
return r.createTelemetryConfigurationSecret(ctx, l, name, d, config.MetricsType)
36
}
37
38
func (r *reconciler) createTelemetryConfigurationSecret(
39
ctx context.Context,
40
l log.Logger,
41
name string,
42
d gragent.Deployment,
43
ty config.Type,
44
) error {
45
46
key := types.NamespacedName{
47
Namespace: d.Agent.Namespace,
48
Name: name,
49
}
50
51
var shouldCreate bool
52
switch ty {
53
case config.MetricsType:
54
shouldCreate = len(d.Metrics) > 0
55
case config.LogsType:
56
shouldCreate = len(d.Logs) > 0
57
case config.IntegrationsType:
58
shouldCreate = len(d.Integrations) > 0
59
default:
60
return fmt.Errorf("unknown telemetry type %s", ty)
61
}
62
63
// Delete the old Secret if one exists and we have nothing to create.
64
if !shouldCreate {
65
var secret core_v1.Secret
66
return deleteManagedResource(ctx, r.Client, key, &secret)
67
}
68
69
rawConfig, err := config.BuildConfig(&d, ty)
70
71
var jsonnetError jsonnet.RuntimeError
72
if errors.As(err, &jsonnetError) {
73
// Dump Jsonnet errors to the console to retain newlines and make them
74
// easier to digest.
75
fmt.Fprintf(os.Stderr, "%s", jsonnetError.Error())
76
}
77
if err != nil {
78
return fmt.Errorf("unable to build config: %w", err)
79
}
80
81
const maxUncompressed = 100 * 1024 // only compress secrets over 100kB
82
rawBytes := []byte(rawConfig)
83
if len(rawBytes) > maxUncompressed {
84
buf := &bytes.Buffer{}
85
w := gzip.NewWriter(buf)
86
if _, err = w.Write(rawBytes); err != nil {
87
return fmt.Errorf("unable to compress config: %w", err)
88
}
89
if err = w.Close(); err != nil {
90
return fmt.Errorf("closing gzip writer: %w", err)
91
}
92
rawBytes = buf.Bytes()
93
}
94
95
secret := core_v1.Secret{
96
ObjectMeta: v1.ObjectMeta{
97
Namespace: key.Namespace,
98
Name: key.Name,
99
Labels: r.config.Labels.Merge(managedByOperatorLabels),
100
OwnerReferences: []v1.OwnerReference{{
101
APIVersion: d.Agent.APIVersion,
102
BlockOwnerDeletion: pointer.Bool(true),
103
Kind: d.Agent.Kind,
104
Name: d.Agent.Name,
105
UID: d.Agent.UID,
106
}},
107
},
108
Data: map[string][]byte{"agent.yml": rawBytes},
109
}
110
111
level.Info(l).Log("msg", "reconciling secret", "secret", secret.Name)
112
err = clientutil.CreateOrUpdateSecret(ctx, r.Client, &secret)
113
if err != nil {
114
return fmt.Errorf("failed to reconcile secret: %w", err)
115
}
116
return nil
117
}
118
119
// createMetricsGoverningService creates the service that governs the (eventual)
120
// StatefulSet. It must be created before the StatefulSet.
121
func (r *reconciler) createMetricsGoverningService(
122
ctx context.Context,
123
l log.Logger,
124
d gragent.Deployment,
125
) error {
126
127
svc := generateMetricsStatefulSetService(r.config, d)
128
129
// Delete the old Service if one exists and we have no prometheus instances.
130
if len(d.Metrics) == 0 {
131
var service core_v1.Service
132
key := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
133
return deleteManagedResource(ctx, r.Client, key, &service)
134
}
135
136
level.Info(l).Log("msg", "reconciling statefulset service", "service", svc.Name)
137
err := clientutil.CreateOrUpdateService(ctx, r.Client, svc)
138
if err != nil {
139
return fmt.Errorf("failed to reconcile statefulset governing service: %w", err)
140
}
141
return nil
142
}
143
144
// createMetricsStatefulSets creates a set of Grafana Agent StatefulSets, one per shard.
145
func (r *reconciler) createMetricsStatefulSets(
146
ctx context.Context,
147
l log.Logger,
148
d gragent.Deployment,
149
) error {
150
151
shards := minShards
152
if reqShards := d.Agent.Spec.Metrics.Shards; reqShards != nil && *reqShards > 1 {
153
shards = *reqShards
154
}
155
156
// Keep track of generated stateful sets so we can delete ones that should
157
// no longer exist.
158
generated := make(map[string]struct{})
159
160
for shard := int32(0); shard < shards; shard++ {
161
// Don't generate anything if there weren't any instances.
162
if len(d.Metrics) == 0 {
163
continue
164
}
165
166
name := d.Agent.Name
167
if shard > 0 {
168
name = fmt.Sprintf("%s-shard-%d", name, shard)
169
}
170
171
ss, err := generateMetricsStatefulSet(r.config, name, d, shard)
172
if err != nil {
173
return fmt.Errorf("failed to generate statefulset for shard: %w", err)
174
}
175
176
level.Info(l).Log("msg", "reconciling statefulset", "statefulset", ss.Name)
177
err = clientutil.CreateOrUpdateStatefulSet(ctx, r.Client, ss, l)
178
if err != nil {
179
return fmt.Errorf("failed to reconcile statefulset for shard: %w", err)
180
}
181
generated[ss.Name] = struct{}{}
182
}
183
184
// Clean up statefulsets that should no longer exist.
185
var statefulSets apps_v1.StatefulSetList
186
err := r.List(ctx, &statefulSets, &client.ListOptions{
187
LabelSelector: labels.SelectorFromSet(labels.Set{
188
managedByOperatorLabel: managedByOperatorLabelValue,
189
agentNameLabelName: d.Agent.Name,
190
}),
191
})
192
if err != nil {
193
return fmt.Errorf("failed to list statefulsets: %w", err)
194
}
195
for _, ss := range statefulSets.Items {
196
if _, keep := generated[ss.Name]; keep || !isManagedResource(&ss) {
197
continue
198
}
199
level.Info(l).Log("msg", "deleting stale statefulset", "name", ss.Name)
200
if err := r.Delete(ctx, &ss); err != nil {
201
return fmt.Errorf("failed to delete stale statefulset %s: %w", ss.Name, err)
202
}
203
}
204
205
return nil
206
}
207
208