Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/internal/controller/queue.go
4096 views
1
package controller
2
3
import "sync"
4
5
// Queue is an unordered queue of components.
6
//
7
// Queue is intended for tracking components that have updated their Exports
8
// for later reevaluation.
9
type Queue struct {
10
mut sync.Mutex
11
queued map[*ComponentNode]struct{}
12
13
updateCh chan struct{}
14
}
15
16
// NewQueue returns a new unordered component queue.
17
func NewQueue() *Queue {
18
return &Queue{
19
updateCh: make(chan struct{}, 1),
20
queued: make(map[*ComponentNode]struct{}),
21
}
22
}
23
24
// Enqueue inserts a new component into the Queue. Enqueue is a no-op if the
25
// component is already in the Queue.
26
func (q *Queue) Enqueue(c *ComponentNode) {
27
q.mut.Lock()
28
defer q.mut.Unlock()
29
q.queued[c] = struct{}{}
30
select {
31
case q.updateCh <- struct{}{}:
32
default:
33
}
34
}
35
36
// Chan returns a channel which is written to when the queue is non-empty.
37
func (q *Queue) Chan() <-chan struct{} { return q.updateCh }
38
39
// TryDequeue dequeues a randomly queued component. TryDequeue will return nil
40
// if the queue is empty.
41
func (q *Queue) TryDequeue() *ComponentNode {
42
q.mut.Lock()
43
defer q.mut.Unlock()
44
45
for c := range q.queued {
46
delete(q.queued, c)
47
return c
48
}
49
50
return nil
51
}
52
53