package hierarchy
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// Notifier keeps a registry of Watchers, grouped by the GroupVersionKind of
// the object each one watches. Its EventHandler consults that registry to
// enqueue reconcile requests for the owners of matching watchers.
type Notifier struct {
// log receives error messages produced while handling events.
log log.Logger
// client supplies the scheme used for GVK lookups and is passed to
// Selector.Matches when events are evaluated.
client client.Client
// watchersMut guards watchers.
watchersMut sync.RWMutex
// watchers holds the registered Watchers, keyed by the GVK of Watcher.Object.
watchers map[schema.GroupVersionKind][]Watcher
}
// Watcher describes one registration with a Notifier: which kind of object to
// watch, which owner to enqueue, and how to decide whether an event applies.
type Watcher struct {
// Object identifies the kind to watch; Notify resolves its GVK through the
// client's scheme and files the Watcher under that GVK.
Object client.Object
// Owner is the key placed in the reconcile request when Selector matches.
// StopNotify removes Watchers by comparing against this key.
Owner client.ObjectKey
// Selector decides, via its Matches method, whether an event's object is
// relevant to Owner.
Selector Selector
}
// NewNotifier returns a Notifier with an empty watcher registry. l is used
// for logging and cli provides the scheme for GVK lookups and is handed to
// selectors when events are evaluated.
func NewNotifier(l log.Logger, cli client.Client) *Notifier {
	notifier := new(Notifier)
	notifier.log = l
	notifier.client = cli
	notifier.watchers = map[schema.GroupVersionKind][]Watcher{}
	return notifier
}
func (n *Notifier) EventHandler() handler.EventHandler {
return ¬ifierEventHandler{Notifier: n}
}
// Notify registers watchers so that events for objects of each watcher's kind
// are evaluated against it by the Notifier's event handler.
//
// The GVK of every watcher's Object is resolved through the client's scheme
// before anything is stored, so registration is all-or-nothing: if any lookup
// fails, the wrapped error is returned and none of the watchers from this
// call are registered. This keeps a failed call from leaving behind a partial
// registration that a retry would then duplicate.
func (n *Notifier) Notify(watchers ...Watcher) error {
	// Resolve all GVKs up front, without holding the lock, so an error cannot
	// leave the registry half-updated.
	gvks := make([]schema.GroupVersionKind, len(watchers))
	for i, w := range watchers {
		gvk, err := apiutil.GVKForObject(w.Object, n.client.Scheme())
		if err != nil {
			return fmt.Errorf("could not get GVK: %w", err)
		}
		gvks[i] = gvk
	}

	n.watchersMut.Lock()
	defer n.watchersMut.Unlock()

	for i, w := range watchers {
		n.watchers[gvks[i]] = append(n.watchers[gvks[i]], w)
	}
	return nil
}
// StopNotify removes every registered watcher whose Owner equals owner,
// across all kinds. Watchers belonging to other owners are kept in their
// original order.
func (n *Notifier) StopNotify(owner client.ObjectKey) {
	n.watchersMut.Lock()
	defer n.watchersMut.Unlock()

	for gvk := range n.watchers {
		current := n.watchers[gvk]

		// Build a fresh slice rather than filtering in place.
		kept := make([]Watcher, 0, len(current))
		for i := range current {
			if current[i].Owner == owner {
				continue
			}
			kept = append(kept, current[i])
		}
		n.watchers[gvk] = kept
	}
}
// notifierEventHandler adapts a Notifier to the handler.EventHandler
// interface. It embeds the Notifier so its methods can read the Notifier's
// logger, client and watcher registry directly.
type notifierEventHandler struct {
*Notifier
}
// Compile-time check that *notifierEventHandler implements handler.EventHandler.
var _ handler.EventHandler = (*notifierEventHandler)(nil)
// Create handles a create event by evaluating the created object against the
// registered watchers.
func (h *notifierEventHandler) Create(evt event.CreateEvent, queue workqueue.RateLimitingInterface) {
	created := evt.Object
	h.handleEvent(created, queue)
}
// Update handles an update event by evaluating first the old and then the new
// version of the object against the registered watchers, so an owner is
// notified if either version matches its selector.
func (h *notifierEventHandler) Update(evt event.UpdateEvent, queue workqueue.RateLimitingInterface) {
	for _, obj := range []client.Object{evt.ObjectOld, evt.ObjectNew} {
		h.handleEvent(obj, queue)
	}
}
// Delete handles a delete event by evaluating the deleted object against the
// registered watchers.
func (h *notifierEventHandler) Delete(evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
	deleted := evt.Object
	h.handleEvent(deleted, queue)
}
// Generic handles a generic event by evaluating the event's object against
// the registered watchers.
func (h *notifierEventHandler) Generic(evt event.GenericEvent, queue workqueue.RateLimitingInterface) {
	target := evt.Object
	h.handleEvent(target, queue)
}
// handleEvent evaluates obj against every watcher registered for obj's kind
// and adds a reconcile request for the Owner of each watcher whose Selector
// matches.
//
// The GVK of obj is resolved through the client's scheme; if that fails the
// error is logged and nothing is enqueued. All selector evaluations for one
// event share a single 5-second context.
//
// A selector error is logged and the remaining watchers are still evaluated,
// so one failing selector does not stop other owners from being notified.
// Evaluation stops early only once the shared context has ended, because
// later calls would be made with an already-finished context.
func (h *notifierEventHandler) handleEvent(obj client.Object, q workqueue.RateLimitingInterface) {
	h.watchersMut.RLock()
	defer h.watchersMut.RUnlock()

	gvk, err := apiutil.GVKForObject(obj, h.client.Scheme())
	if err != nil {
		level.Error(h.log).Log("msg", "failed to get gvk for object", "err", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	for _, watcher := range h.watchers[gvk] {
		matches, err := watcher.Selector.Matches(ctx, h.client, obj)
		if err != nil {
			level.Error(h.log).Log("msg", "failed to handle notifier event", "err", err)
			if ctx.Err() != nil {
				// The shared deadline has passed; stop instead of logging
				// one error per remaining watcher.
				return
			}
			continue
		}
		if matches {
			q.Add(reconcile.Request{NamespacedName: watcher.Owner})
		}
	}
}