Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/workers.go
5304 views
1
package integrations
2
3
import (
4
"context"
5
"sync"
6
7
"github.com/go-kit/log"
8
"github.com/go-kit/log/level"
9
)
10
11
type workerPool struct {
12
log log.Logger
13
parentCtx context.Context
14
15
mut sync.Mutex
16
workers map[*controlledIntegration]worker
17
18
runningWorkers sync.WaitGroup
19
}
20
21
type worker struct {
22
ci *controlledIntegration
23
stop context.CancelFunc
24
exited chan struct{}
25
}
26
27
func newWorkerPool(ctx context.Context, l log.Logger) *workerPool {
28
return &workerPool{
29
log: l,
30
parentCtx: ctx,
31
32
workers: make(map[*controlledIntegration]worker),
33
}
34
}
35
36
func (p *workerPool) Reload(newIntegrations []*controlledIntegration) {
37
p.mut.Lock()
38
defer p.mut.Unlock()
39
40
level.Debug(p.log).Log("msg", "updating running integrations", "prev_count", len(p.workers), "new_count", len(newIntegrations))
41
42
// Shut down workers whose integrations have gone away.
43
var stopped []worker
44
for ci, w := range p.workers {
45
var found bool
46
for _, current := range newIntegrations {
47
if ci == current {
48
found = true
49
break
50
}
51
}
52
if !found {
53
w.stop()
54
stopped = append(stopped, w)
55
}
56
}
57
for _, w := range stopped {
58
// Wait for stopped integrations to fully exit. We do this in a separate
59
// loop so context cancellations can be handled simultaneously, allowing
60
// the wait to complete faster.
61
<-w.exited
62
}
63
64
// Spawn new workers for integrations that don't have them.
65
for _, current := range newIntegrations {
66
if _, workerExists := p.workers[current]; workerExists {
67
continue
68
}
69
// This integration doesn't have an existing worker; schedule a new one.
70
p.scheduleWorker(current)
71
}
72
}
73
74
func (p *workerPool) Close() {
75
p.mut.Lock()
76
defer p.mut.Unlock()
77
78
level.Debug(p.log).Log("msg", "stopping all integrations")
79
80
defer p.runningWorkers.Wait()
81
for _, w := range p.workers {
82
w.stop()
83
}
84
}
85
86
func (p *workerPool) scheduleWorker(ci *controlledIntegration) {
87
p.runningWorkers.Add(1)
88
89
ctx, cancel := context.WithCancel(p.parentCtx)
90
91
w := worker{
92
ci: ci,
93
stop: cancel,
94
exited: make(chan struct{}),
95
}
96
p.workers[ci] = w
97
98
go func() {
99
ci.running.Store(true)
100
101
// When the integration stops running, we want to free any of our
102
// resources that will notify watchers waiting for the worker to stop.
103
//
104
// Afterwards, we'll block until we remove ourselves from the map; having
105
// a worker remove itself on shutdown allows exited integrations to
106
// re-start when the config is reloaded.
107
defer func() {
108
ci.running.Store(false)
109
close(w.exited)
110
p.runningWorkers.Done()
111
112
p.mut.Lock()
113
defer p.mut.Unlock()
114
delete(p.workers, ci)
115
}()
116
117
err := ci.i.RunIntegration(ctx)
118
if err != nil {
119
level.Error(p.log).Log("msg", "integration exited with error", "id", ci.id, "err", err)
120
}
121
}()
122
}
123
124