Path: blob/main/pkg/flow/internal/controller/queue.go
4096 views
package controller12import "sync"34// Queue is an unordered queue of components.5//6// Queue is intended for tracking components that have updated their Exports7// for later reevaluation.8type Queue struct {9mut sync.Mutex10queued map[*ComponentNode]struct{}1112updateCh chan struct{}13}1415// NewQueue returns a new unordered component queue.16func NewQueue() *Queue {17return &Queue{18updateCh: make(chan struct{}, 1),19queued: make(map[*ComponentNode]struct{}),20}21}2223// Enqueue inserts a new component into the Queue. Enqueue is a no-op if the24// component is already in the Queue.25func (q *Queue) Enqueue(c *ComponentNode) {26q.mut.Lock()27defer q.mut.Unlock()28q.queued[c] = struct{}{}29select {30case q.updateCh <- struct{}{}:31default:32}33}3435// Chan returns a channel which is written to when the queue is non-empty.36func (q *Queue) Chan() <-chan struct{} { return q.updateCh }3738// TryDequeue dequeues a randomly queued component. TryDequeue will return nil39// if the queue is empty.40func (q *Queue) TryDequeue() *ComponentNode {41q.mut.Lock()42defer q.mut.Unlock()4344for c := range q.queued {45delete(q.queued, c)46return c47}4849return nil50}515253