Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/operator/operator.go
4094 views
1
package operator
2
3
import (
4
"context"
5
"flag"
6
"fmt"
7
"strings"
8
"sync"
9
10
"github.com/go-kit/log"
11
"github.com/go-kit/log/level"
12
"github.com/weaveworks/common/logging"
13
"k8s.io/apimachinery/pkg/labels"
14
"k8s.io/apimachinery/pkg/runtime"
15
controller "sigs.k8s.io/controller-runtime"
16
"sigs.k8s.io/controller-runtime/pkg/builder"
17
"sigs.k8s.io/controller-runtime/pkg/healthz"
18
"sigs.k8s.io/controller-runtime/pkg/manager"
19
"sigs.k8s.io/controller-runtime/pkg/predicate"
20
"sigs.k8s.io/controller-runtime/pkg/reconcile"
21
"sigs.k8s.io/controller-runtime/pkg/source"
22
23
gragent "github.com/grafana/agent/pkg/operator/apis/monitoring/v1alpha1"
24
"github.com/grafana/agent/pkg/operator/hierarchy"
25
promop_v1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
26
promop "github.com/prometheus-operator/prometheus-operator/pkg/operator"
27
apps_v1 "k8s.io/api/apps/v1"
28
core_v1 "k8s.io/api/core/v1"
29
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
31
// Needed for clients.
32
_ "k8s.io/client-go/plugin/pkg/client/auth"
33
"k8s.io/client-go/rest"
34
)
35
36
// Config controls the configuration of the Operator.
37
type Config struct {
38
LogLevel logging.Level
39
LogFormat logging.Format
40
Labels promop.Labels
41
Controller controller.Options
42
AgentSelector string
43
KubelsetServiceName string
44
45
agentLabelSelector labels.Selector
46
47
// RestConfig used to connect to cluster. One will be generated based on the
48
// environment if not set.
49
RestConfig *rest.Config
50
51
// TODO(rfratto): extra settings from Prometheus Operator:
52
//
53
// 1. Reloader container image/requests/limits
54
// 2. Namespaces allow/denylist.
55
// 3. Namespaces for Prometheus resources.
56
}
57
58
// NewConfig creates a new Config and initializes default values.
59
// Flags will be registered against f if it is non-nil.
60
func NewConfig(f *flag.FlagSet) (*Config, error) {
61
if f == nil {
62
f = flag.NewFlagSet("temp", flag.PanicOnError)
63
}
64
65
var c Config
66
err := c.registerFlags(f)
67
if err != nil {
68
return nil, err
69
}
70
return &c, nil
71
}
72
73
func (c *Config) registerFlags(f *flag.FlagSet) error {
74
c.LogLevel.RegisterFlags(f)
75
c.LogFormat.RegisterFlags(f)
76
f.Var(&c.Labels, "labels", "Labels to add to all created operator resources")
77
f.StringVar(&c.AgentSelector, "agent-selector", "", "Label selector to discover GrafanaAgent CRs. Defaults to all GrafanaAgent CRs.")
78
79
f.StringVar(&c.Controller.Namespace, "namespace", "", "Namespace to restrict the Operator to.")
80
f.StringVar(&c.Controller.Host, "listen-host", "", "Host to listen on. Empty string means all interfaces.")
81
f.IntVar(&c.Controller.Port, "listen-port", 9443, "Port to listen on.")
82
f.StringVar(&c.Controller.MetricsBindAddress, "metrics-listen-address", ":8080", "Address to expose Operator metrics on")
83
f.StringVar(&c.Controller.HealthProbeBindAddress, "health-listen-address", "", "Address to expose Operator health probes on")
84
85
f.StringVar(&c.KubelsetServiceName, "kubelet-service", "", "Service and Endpoints objects to write kubelets into. Allows for monitoring Kubelet and cAdvisor metrics using a ServiceMonitor. Must be in format \"namespace/name\". If empty, nothing will be created.")
86
87
// Custom initial values for the endpoint names.
88
c.Controller.ReadinessEndpointName = "/-/ready"
89
c.Controller.LivenessEndpointName = "/-/healthy"
90
91
c.Controller.Scheme = runtime.NewScheme()
92
for _, add := range []func(*runtime.Scheme) error{
93
core_v1.AddToScheme,
94
apps_v1.AddToScheme,
95
gragent.AddToScheme,
96
promop_v1.AddToScheme,
97
} {
98
if err := add(c.Controller.Scheme); err != nil {
99
return fmt.Errorf("unable to register scheme: %w", err)
100
}
101
}
102
103
return nil
104
}
105
106
// Operator is the Grafana Agent Operator.
107
type Operator struct {
108
log log.Logger
109
manager manager.Manager
110
111
// New creates reconcilers to reconcile creating the kubelet service (if
112
// configured) and Grafana Agent deployments. We store them as
113
// lazyReconcilers so tests can update what the underlying reconciler
114
// implementation is.
115
116
kubeletReconciler *lazyReconciler // Unused if kubelet service unconfigured
117
agentReconciler *lazyReconciler
118
}
119
120
// New creates a new Operator.
121
func New(l log.Logger, c *Config) (*Operator, error) {
122
var (
123
lazyKubeletReconciler, lazyAgentReconciler lazyReconciler
124
)
125
126
restConfig := c.RestConfig
127
if restConfig == nil {
128
restConfig = controller.GetConfigOrDie()
129
}
130
manager, err := controller.NewManager(restConfig, c.Controller)
131
if err != nil {
132
return nil, fmt.Errorf("failed to create manager: %w", err)
133
}
134
135
if err := manager.AddReadyzCheck("running", healthz.Ping); err != nil {
136
level.Warn(l).Log("msg", "failed to set up 'running' readyz check", "err", err)
137
}
138
if err := manager.AddHealthzCheck("running", healthz.Ping); err != nil {
139
level.Warn(l).Log("msg", "failed to set up 'running' healthz check", "err", err)
140
}
141
142
var (
143
agentPredicates []predicate.Predicate
144
145
notifier = hierarchy.NewNotifier(log.With(l, "component", "hierarchy_notifier"), manager.GetClient())
146
notifierHandler = notifier.EventHandler()
147
)
148
149
// Initialize agentPredicates if an GrafanaAgent selector is configured.
150
if c.AgentSelector != "" {
151
sel, err := meta_v1.ParseToLabelSelector(c.AgentSelector)
152
if err != nil {
153
return nil, fmt.Errorf("unable to create predicate for selecting GrafanaAgent CRs: %w", err)
154
}
155
c.agentLabelSelector, err = meta_v1.LabelSelectorAsSelector(sel)
156
if err != nil {
157
return nil, fmt.Errorf("unable to create predicate for selecting GrafanaAgent CRs: %w", err)
158
}
159
selPredicate, err := predicate.LabelSelectorPredicate(*sel)
160
if err != nil {
161
return nil, fmt.Errorf("unable to create predicate for selecting GrafanaAgent CRs: %w", err)
162
}
163
agentPredicates = append(agentPredicates, selPredicate)
164
}
165
166
if c.KubelsetServiceName != "" {
167
parts := strings.Split(c.KubelsetServiceName, "/")
168
if len(parts) != 2 {
169
return nil, fmt.Errorf("invalid format for kubelet-service %q, must be formatted as \"namespace/name\"", c.KubelsetServiceName)
170
}
171
kubeletNamespace := parts[0]
172
kubeletName := parts[1]
173
174
err := controller.NewControllerManagedBy(manager).
175
For(&core_v1.Node{}).
176
Owns(&core_v1.Service{}).
177
Owns(&core_v1.Endpoints{}).
178
Complete(&lazyKubeletReconciler)
179
if err != nil {
180
return nil, fmt.Errorf("failed to create kubelet controller: %w", err)
181
}
182
183
lazyKubeletReconciler.Set(&kubeletReconciler{
184
Client: manager.GetClient(),
185
186
kubeletNamespace: kubeletNamespace,
187
kubeletName: kubeletName,
188
})
189
}
190
191
err = controller.NewControllerManagedBy(manager).
192
For(&gragent.GrafanaAgent{}, builder.WithPredicates(agentPredicates...)).
193
Owns(&apps_v1.StatefulSet{}).
194
Owns(&apps_v1.DaemonSet{}).
195
Owns(&apps_v1.Deployment{}).
196
Owns(&core_v1.Secret{}).
197
Owns(&core_v1.Service{}).
198
Watches(&source.Kind{Type: &core_v1.Secret{}}, notifierHandler).
199
Watches(&source.Kind{Type: &gragent.LogsInstance{}}, notifierHandler).
200
Watches(&source.Kind{Type: &gragent.PodLogs{}}, notifierHandler).
201
Watches(&source.Kind{Type: &gragent.MetricsInstance{}}, notifierHandler).
202
Watches(&source.Kind{Type: &gragent.Integration{}}, notifierHandler).
203
Watches(&source.Kind{Type: &promop_v1.PodMonitor{}}, notifierHandler).
204
Watches(&source.Kind{Type: &promop_v1.Probe{}}, notifierHandler).
205
Watches(&source.Kind{Type: &promop_v1.ServiceMonitor{}}, notifierHandler).
206
Watches(&source.Kind{Type: &core_v1.Secret{}}, notifierHandler).
207
Watches(&source.Kind{Type: &core_v1.ConfigMap{}}, notifierHandler).
208
Complete(&lazyAgentReconciler)
209
if err != nil {
210
return nil, fmt.Errorf("failed to create GrafanaAgent controller: %w", err)
211
}
212
213
lazyAgentReconciler.Set(&reconciler{
214
Client: manager.GetClient(),
215
scheme: manager.GetScheme(),
216
notifier: notifier,
217
config: c,
218
})
219
220
return &Operator{
221
log: l,
222
manager: manager,
223
224
kubeletReconciler: &lazyKubeletReconciler,
225
agentReconciler: &lazyAgentReconciler,
226
}, nil
227
}
228
229
// Start starts the operator. It will run until ctx is canceled.
230
func (o *Operator) Start(ctx context.Context) error {
231
return o.manager.Start(ctx)
232
}
233
234
type lazyReconciler struct {
235
mut sync.RWMutex
236
inner reconcile.Reconciler
237
}
238
239
// Get returns the current reconciler.
240
func (lr *lazyReconciler) Get() reconcile.Reconciler {
241
lr.mut.RLock()
242
defer lr.mut.RUnlock()
243
return lr.inner
244
}
245
246
// Set updates the current reconciler.
247
func (lr *lazyReconciler) Set(inner reconcile.Reconciler) {
248
lr.mut.Lock()
249
defer lr.mut.Unlock()
250
lr.inner = inner
251
}
252
253
// Wrap wraps the current reconciler with a middleware.
254
func (lr *lazyReconciler) Wrap(mw func(next reconcile.Reconciler) reconcile.Reconciler) {
255
lr.mut.Lock()
256
defer lr.mut.Unlock()
257
lr.inner = mw(lr.inner)
258
}
259
260
// Reconcile calls Reconcile against the current reconciler.
261
func (lr *lazyReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
262
lr.mut.RLock()
263
defer lr.mut.RUnlock()
264
if lr.inner == nil {
265
return reconcile.Result{}, fmt.Errorf("no reconciler")
266
}
267
return lr.inner.Reconcile(ctx, req)
268
}
269
270