Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/operator/common/component.go
5295 views
1
package common
2
3
import (
4
"context"
5
"sync"
6
"time"
7
8
"github.com/go-kit/log/level"
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/prometheus/operator"
11
)
12
13
type Component struct {
14
mut sync.Mutex
15
config *operator.Arguments
16
manager *crdManager
17
18
onUpdate chan struct{}
19
opts component.Options
20
healthMut sync.RWMutex
21
health component.Health
22
23
kind string
24
}
25
26
func New(o component.Options, args component.Arguments, kind string) (*Component, error) {
27
c := &Component{
28
opts: o,
29
onUpdate: make(chan struct{}, 1),
30
kind: kind,
31
}
32
return c, c.Update(args)
33
}
34
35
// CurrentHealth returns the health most recently recorded by reportHealth.
func (c *Component) CurrentHealth() component.Health {
	c.healthMut.RLock()
	current := c.health
	c.healthMut.RUnlock()
	return current
}
40
41
// Run implements component.Component.
42
func (c *Component) Run(ctx context.Context) error {
43
// innerCtx gets passed to things we create, so we can restart everything anytime we get an update.
44
// Ideally, this component has very little dynamic config, and won't have frequent updates.
45
var innerCtx context.Context
46
// cancel is the func we use to trigger a stop to all downstream processors we create
47
var cancel func()
48
defer func() {
49
if cancel != nil {
50
cancel()
51
}
52
}()
53
54
c.reportHealth(nil)
55
errChan := make(chan error, 1)
56
for {
57
select {
58
case <-ctx.Done():
59
if cancel != nil {
60
cancel()
61
}
62
return nil
63
case err := <-errChan:
64
c.reportHealth(err)
65
case <-c.onUpdate:
66
if cancel != nil {
67
cancel()
68
}
69
innerCtx, cancel = context.WithCancel(ctx)
70
c.mut.Lock()
71
componentCfg := c.config
72
manager := newCrdManager(c.opts, c.opts.Logger, componentCfg, c.kind)
73
c.manager = manager
74
c.mut.Unlock()
75
go func() {
76
if err := manager.Run(innerCtx); err != nil {
77
level.Error(c.opts.Logger).Log("msg", "error running crd manager", "err", err)
78
errChan <- err
79
}
80
}()
81
}
82
}
83
}
84
85
// Update implements component.Component.
86
func (c *Component) Update(args component.Arguments) error {
87
// TODO(jcreixell): Initialize manager here so we can return errors back early to the caller.
88
// See https://github.com/grafana/agent/pull/2688#discussion_r1152384425
89
c.mut.Lock()
90
cfg := args.(operator.Arguments)
91
c.config = &cfg
92
c.mut.Unlock()
93
select {
94
case c.onUpdate <- struct{}{}:
95
default:
96
}
97
return nil
98
}
99
100
// DebugInfo returns debug information for this component.
101
func (c *Component) DebugInfo() interface{} {
102
return c.manager.DebugInfo()
103
}
104
105
func (c *Component) reportHealth(err error) {
106
c.healthMut.Lock()
107
defer c.healthMut.Unlock()
108
109
if err != nil {
110
c.health = component.Health{
111
Health: component.HealthTypeUnhealthy,
112
Message: err.Error(),
113
UpdateTime: time.Now(),
114
}
115
return
116
} else {
117
c.health = component.Health{
118
Health: component.HealthTypeHealthy,
119
UpdateTime: time.Now(),
120
}
121
}
122
}
123
124