Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/podlogs/controller.go
5302 views
1
package podlogs
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"sync"
8
"time"
9
10
"github.com/go-kit/log"
11
"github.com/go-kit/log/level"
12
monitoringv1alpha2 "github.com/grafana/agent/component/loki/source/podlogs/internal/apis/monitoring/v1alpha2"
13
corev1 "k8s.io/api/core/v1"
14
"k8s.io/apimachinery/pkg/runtime"
15
"k8s.io/client-go/rest"
16
toolscache "k8s.io/client-go/tools/cache"
17
"sigs.k8s.io/controller-runtime/pkg/cache"
18
"sigs.k8s.io/controller-runtime/pkg/client"
19
)
20
21
type controller struct {
22
log log.Logger
23
reconciler *reconciler
24
25
mut sync.RWMutex
26
informers cache.Informers
27
client client.Client
28
reloadCh chan struct{} // Written to when informers or client changes
29
30
reconcileCh chan struct{}
31
doneCh chan struct{}
32
}
33
34
// informerSyncTimeout bounds how long configuring all informers may take. It
// is deliberately generous.
const informerSyncTimeout time.Duration = 10 * time.Second
36
37
// newController creates a new, unstarted controller. The controller will
38
// request a reconcile when the state of Kubernetes changes.
39
func newController(l log.Logger, reconciler *reconciler) *controller {
40
return &controller{
41
log: l,
42
reconciler: reconciler,
43
reloadCh: make(chan struct{}, 1),
44
45
reconcileCh: make(chan struct{}, 1),
46
doneCh: make(chan struct{}),
47
}
48
}
49
50
func (ctrl *controller) UpdateConfig(cfg *rest.Config) error {
51
scheme := runtime.NewScheme()
52
for _, add := range []func(*runtime.Scheme) error{
53
corev1.AddToScheme,
54
monitoringv1alpha2.AddToScheme,
55
} {
56
if err := add(scheme); err != nil {
57
return fmt.Errorf("unable to register scheme: %w", err)
58
}
59
}
60
61
cli, err := client.New(cfg, client.Options{Scheme: scheme})
62
if err != nil {
63
return err
64
}
65
66
cache, err := cache.New(cfg, cache.Options{Scheme: scheme})
67
if err != nil {
68
return err
69
}
70
71
delegateCli, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
72
CacheReader: cache,
73
Client: cli,
74
})
75
if err != nil {
76
return err
77
}
78
79
// Update the stored informers and client and schedule a reload.
80
ctrl.mut.Lock()
81
ctrl.informers = cache
82
ctrl.client = delegateCli
83
ctrl.mut.Unlock()
84
85
select {
86
case ctrl.reloadCh <- struct{}{}:
87
default:
88
// Reload is already scheduled
89
}
90
return nil
91
}
92
93
// Run the controller.
94
func (ctrl *controller) Run(ctx context.Context) error {
95
var (
96
cancel context.CancelFunc
97
informers cache.Informers
98
)
99
100
for {
101
select {
102
case <-ctx.Done():
103
return nil
104
case <-ctrl.reloadCh:
105
ctrl.mut.RLock()
106
var (
107
newInformers = ctrl.informers
108
newClient = ctrl.client
109
)
110
ctrl.mut.RUnlock()
111
112
// Stop old informers.
113
if informers != nil {
114
cancel()
115
}
116
117
informerContext, informerCancel := context.WithCancel(ctx)
118
119
go func() {
120
if err := ctrl.run(informerContext, newInformers, newClient); err != nil {
121
level.Error(ctrl.log).Log("msg", "failed to run controller", "err", err)
122
}
123
}()
124
125
cancel = informerCancel
126
informers = newInformers
127
}
128
}
129
}
130
131
func (ctrl *controller) run(ctx context.Context, informers cache.Informers, client client.Client) error {
132
level.Info(ctrl.log).Log("msg", "starting controller")
133
defer level.Info(ctrl.log).Log("msg", "controller exiting")
134
135
go func() {
136
err := informers.Start(ctx)
137
if err != nil && ctx.Err() != nil {
138
level.Error(ctrl.log).Log("msg", "failed to start informers", "err", err)
139
}
140
}()
141
142
if !informers.WaitForCacheSync(ctx) {
143
return fmt.Errorf("informer caches failed to sync")
144
}
145
146
if err := ctrl.configureInformers(ctx, informers); err != nil {
147
return fmt.Errorf("failed to configure informers: %w", err)
148
}
149
150
for {
151
select {
152
case <-ctx.Done():
153
return nil
154
case <-ctrl.reconcileCh:
155
if err := ctrl.reconciler.Reconcile(ctx, client); err != nil {
156
level.Error(ctrl.log).Log("msg", "reconcile failed", "err", err)
157
}
158
}
159
}
160
}
161
162
// configureInformers starts the informers used by this controller to perform reconciles.
163
func (ctrl *controller) configureInformers(ctx context.Context, informers cache.Informers) error {
164
// We want to re-reconcile the set of PodLogs whenever namespaces, pods, or
165
// PodLogs changes. Reconciling on namespaces and pods is important so that
166
// we can reevaluate selectors defined in PodLogs.
167
types := []client.Object{
168
&corev1.Namespace{},
169
&corev1.Pod{},
170
&monitoringv1alpha2.PodLogs{},
171
}
172
173
informerCtx, cancel := context.WithTimeout(ctx, informerSyncTimeout)
174
defer cancel()
175
176
for _, ty := range types {
177
informer, err := informers.GetInformer(informerCtx, ty)
178
if err != nil {
179
if errors.Is(informerCtx.Err(), context.DeadlineExceeded) { // Check the context to prevent GetInformer returning a fake timeout
180
return fmt.Errorf("Timeout exceeded while configuring informers. Check the connection"+
181
" to the Kubernetes API is stable and that the Agent has appropriate RBAC permissions for %v", ty)
182
}
183
184
return err
185
}
186
_, err = informer.AddEventHandler(onChangeEventHandler{ChangeFunc: ctrl.RequestReconcile})
187
if err != nil {
188
return err
189
}
190
}
191
return nil
192
}
193
194
func (ctrl *controller) RequestReconcile() {
195
select {
196
case ctrl.reconcileCh <- struct{}{}:
197
default:
198
// Reconcile is already queued; do nothing.
199
}
200
}
201
202
// onChangeEventHandler implements [toolscache.ResourceEventHandler], calling
203
// ChangeFunc when any change occurs. Objects are not sent to the handler.
204
type onChangeEventHandler struct {
205
ChangeFunc func()
206
}
207
208
var _ toolscache.ResourceEventHandler = onChangeEventHandler{}
209
210
func (h onChangeEventHandler) OnAdd(_ interface{}) { h.ChangeFunc() }
211
func (h onChangeEventHandler) OnUpdate(_, _ interface{}) { h.ChangeFunc() }
212
func (h onChangeEventHandler) OnDelete(_ interface{}) { h.ChangeFunc() }
213
214