Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/operator/hierarchy/hierarchy.go
4094 views
1
// Package hierarchy provides tools to discover a resource hierarchy. A
// resource hierarchy is formed when a resource has a set of rules for
// discovering other resources.
4
package hierarchy
5
6
import (
7
"context"
8
"fmt"
9
"sync"
10
"time"
11
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"k8s.io/apimachinery/pkg/runtime/schema"
15
"k8s.io/client-go/util/workqueue"
16
"sigs.k8s.io/controller-runtime/pkg/client"
17
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
18
"sigs.k8s.io/controller-runtime/pkg/event"
19
"sigs.k8s.io/controller-runtime/pkg/handler"
20
"sigs.k8s.io/controller-runtime/pkg/reconcile"
21
)
22
23
// Notifier can be attached to a controller and generate reconciles when
24
// objects inside of a resource hierarchy change.
25
type Notifier struct {
26
log log.Logger
27
client client.Client
28
29
watchersMut sync.RWMutex
30
watchers map[schema.GroupVersionKind][]Watcher
31
}
32
33
// Watcher is something watching for changes to a resource.
34
type Watcher struct {
35
Object client.Object // Object to watch for events against.
36
Owner client.ObjectKey // Owner to receive a reconcile for.
37
Selector Selector // Selector to use to match changed objects.
38
}
39
40
// NewNotifier creates a new Notifier which uses the provided client for
41
// performing hierarchy lookups.
42
func NewNotifier(l log.Logger, cli client.Client) *Notifier {
43
return &Notifier{
44
log: l,
45
client: cli,
46
watchers: make(map[schema.GroupVersionKind][]Watcher),
47
}
48
}
49
50
// EventHandler returns an event handler that can be given to
51
// controller.Watches.
52
//
53
// controller.Watches should be called once per type in the resource hierarchy.
54
// Each call to controller.Watches should use the same Notifier.
55
func (n *Notifier) EventHandler() handler.EventHandler {
56
// TODO(rfratto): It's possible to create a custom implementation of
57
// source.Source so we wouldn't have to call controller.Watches a bunch of
58
// times. I played around a little with an implementation but it was going to
59
// be a lot of work to dynamically spin up/down informers, so I put it aside
60
// for now. Maybe it's an improvement for the future.
61
return &notifierEventHandler{Notifier: n}
62
}
63
64
// Notify configures reconciles to be generated for a set of watchers when
65
// watched resources change.
66
//
67
// Notify appends to the list of watchers. To remove out notifications for a
68
// specific owner, call StopNotify.
69
func (n *Notifier) Notify(watchers ...Watcher) error {
70
n.watchersMut.Lock()
71
defer n.watchersMut.Unlock()
72
73
for _, w := range watchers {
74
gvk, err := apiutil.GVKForObject(w.Object, n.client.Scheme())
75
if err != nil {
76
return fmt.Errorf("could not get GVK: %w", err)
77
}
78
79
n.watchers[gvk] = append(n.watchers[gvk], w)
80
}
81
82
return nil
83
}
84
85
// StopNotify removes all watches for a specific owner.
86
func (n *Notifier) StopNotify(owner client.ObjectKey) {
87
n.watchersMut.Lock()
88
defer n.watchersMut.Unlock()
89
90
for key, watchers := range n.watchers {
91
rem := make([]Watcher, 0, len(watchers))
92
for _, w := range watchers {
93
if w.Owner != owner {
94
rem = append(rem, w)
95
}
96
}
97
n.watchers[key] = rem
98
}
99
}
100
101
type notifierEventHandler struct {
102
*Notifier
103
}
104
105
var _ handler.EventHandler = (*notifierEventHandler)(nil)
106
107
// Create passes the created object to handleEvent so that owners with a
// matching watcher are queued for reconciliation.
func (h *notifierEventHandler) Create(ev event.CreateEvent, q workqueue.RateLimitingInterface) {
	created := ev.Object
	h.handleEvent(created, q)
}
110
111
// Update passes both the old and the new version of the object to
// handleEvent, old version first, so that watchers matching either state are
// considered.
func (h *notifierEventHandler) Update(ev event.UpdateEvent, q workqueue.RateLimitingInterface) {
	previous, current := ev.ObjectOld, ev.ObjectNew

	h.handleEvent(previous, q)
	h.handleEvent(current, q)
}
115
116
// Delete passes the deleted object to handleEvent so that owners with a
// matching watcher are queued for reconciliation.
func (h *notifierEventHandler) Delete(ev event.DeleteEvent, q workqueue.RateLimitingInterface) {
	deleted := ev.Object
	h.handleEvent(deleted, q)
}
119
120
// Generic passes the object carried by a generic event to handleEvent so
// that owners with a matching watcher are queued for reconciliation.
func (h *notifierEventHandler) Generic(ev event.GenericEvent, q workqueue.RateLimitingInterface) {
	target := ev.Object
	h.handleEvent(target, q)
}
123
124
// handleEvent queues a reconcile request on q for the owner of every
// registered watcher whose selector matches obj.
//
// The watchers consulted are those registered for obj's GroupVersionKind, as
// resolved through the client's scheme. If the kind cannot be resolved, the
// error is logged and nothing is queued. All selector evaluations for a single
// call share one context with a 5 second timeout.
func (h *notifierEventHandler) handleEvent(obj client.Object, q workqueue.RateLimitingInterface) {
	h.watchersMut.RLock()
	defer h.watchersMut.RUnlock()

	gvk, err := apiutil.GVKForObject(obj, h.client.Scheme())
	if err != nil {
		level.Error(h.log).Log("msg", "failed to get gvk for object", "err", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Iterate through all of the watchers for the gvk and check to see if we
	// should trigger a reconcile.
	for _, watcher := range h.watchers[gvk] {
		matches, err := watcher.Selector.Matches(ctx, h.client, obj)
		if err != nil {
			// A failing selector must not prevent the remaining watchers,
			// which may belong to other owners, from being evaluated.
			level.Error(h.log).Log("msg", "failed to handle notifier event", "err", err)
			continue
		}
		if matches {
			q.Add(reconcile.Request{NamespacedName: watcher.Owner})
		}
	}
}
150
151