Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/internal/scheduler/scheduler.go
4096 views
1
// Package scheduler exposes utilities for scheduling and running OpenTelemetry
2
// Collector components.
3
package scheduler
4
5
import (
6
"context"
7
"fmt"
8
"sync"
9
"time"
10
11
"github.com/go-kit/log"
12
"github.com/go-kit/log/level"
13
"github.com/grafana/agent/component"
14
otelcomponent "go.opentelemetry.io/collector/component"
15
"go.uber.org/multierr"
16
)
17
18
// Scheduler implements manages a set of OpenTelemetry Collector components.
19
// Scheduler is intended to be used from Flow components which need to schedule
20
// OpenTelemetry Collector components; it does not implement the full
21
// component.Component interface.
22
//
23
// Each OpenTelemetry Collector component has one instance per supported
24
// telemetry signal, which is why Scheduler supports multiple components. For
25
// example, when creating the otlpreceiver component, you would have three
26
// total instances: one for logs, one for metrics, and one for traces.
27
// Scheduler should only be used to manage the different signals of the same
28
// OpenTelemetry Collector component; this means that otlpreceiver and
29
// jaegerreceiver should not share the same Scheduler.
30
type Scheduler struct {
31
log log.Logger
32
33
healthMut sync.RWMutex
34
health component.Health
35
36
schedMut sync.Mutex
37
schedComponents []otelcomponent.Component // Most recently created components
38
host otelcomponent.Host
39
40
// newComponentsCh is written to when schedComponents gets updated.
41
newComponentsCh chan struct{}
42
}
43
44
// New creates a new unstarted Scheduler. Call Run to start it, and call
45
// Schedule to schedule components to run.
46
func New(l log.Logger) *Scheduler {
47
return &Scheduler{
48
log: l,
49
newComponentsCh: make(chan struct{}, 1),
50
}
51
}
52
53
// Schedule schedules a new set of OpenTelemetry Components to run. Components
54
// will only be scheduled when the Scheduler is running.
55
//
56
// Schedule completely overrides the set of previously running components;
57
// components which have been removed since the last call to Schedule will be
58
// stopped.
59
func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) {
60
cs.schedMut.Lock()
61
defer cs.schedMut.Unlock()
62
63
cs.schedComponents = cc
64
cs.host = h
65
66
select {
67
case cs.newComponentsCh <- struct{}{}:
68
// Queued new message.
69
default:
70
// A message is already queued for refreshing running components so we
71
// don't have to do anything here.
72
}
73
}
74
75
// Run starts the Scheduler. Run will watch for schedule components to appear
76
// and run them, terminating previously running components if they exist.
77
func (cs *Scheduler) Run(ctx context.Context) error {
78
var components []otelcomponent.Component
79
80
// Make sure we terminate all of our running components on shutdown.
81
defer func() {
82
cs.stopComponents(context.Background(), components...)
83
}()
84
85
// Wait for a write to cs.newComponentsCh. The initial list of components is
86
// always empty so there's nothing to do until cs.newComponentsCh is written
87
// to.
88
for {
89
select {
90
case <-ctx.Done():
91
return nil
92
case <-cs.newComponentsCh:
93
// Stop the old components before running new scheduled ones.
94
cs.stopComponents(ctx, components...)
95
96
cs.schedMut.Lock()
97
components = cs.schedComponents
98
host := cs.host
99
cs.schedMut.Unlock()
100
101
level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components))
102
components = cs.startComponents(ctx, host, components...)
103
}
104
}
105
}
106
107
// stopComponents calls Shutdown on every component in cc, in order. A failed
// shutdown is logged and does not prevent the remaining components from being
// shut down.
func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) {
	for i := range cc {
		err := cc[i].Shutdown(ctx)
		if err == nil {
			continue
		}
		level.Error(cs.log).Log("msg", "failed to stop scheduled component; future updates may fail", "err", err)
	}
}
114
115
// startComponent schedules the provided components from cc. It then returns
116
// the list of components which started successfully.
117
func (cs *Scheduler) startComponents(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) (started []otelcomponent.Component) {
118
var errs error
119
120
for _, c := range cc {
121
if err := c.Start(ctx, h); err != nil {
122
level.Error(cs.log).Log("msg", "failed to start scheduled component", "err", err)
123
errs = multierr.Append(errs, err)
124
} else {
125
started = append(started, c)
126
}
127
}
128
129
if errs != nil {
130
cs.setHealth(component.Health{
131
Health: component.HealthTypeUnhealthy,
132
Message: fmt.Sprintf("failed to create components: %s", errs),
133
UpdateTime: time.Now(),
134
})
135
} else {
136
cs.setHealth(component.Health{
137
Health: component.HealthTypeHealthy,
138
Message: "started scheduled components",
139
UpdateTime: time.Now(),
140
})
141
}
142
143
return started
144
}
145
146
// CurrentHealth implements component.HealthComponent. The component is
147
// reported as healthy when the most recent set of scheduled components were
148
// started successfully.
149
func (cs *Scheduler) CurrentHealth() component.Health {
150
cs.healthMut.RLock()
151
defer cs.healthMut.RUnlock()
152
return cs.health
153
}
154
155
// setHealth stores h as the Scheduler's current health while holding
// healthMut for writing.
func (cs *Scheduler) setHealth(h component.Health) {
	cs.healthMut.Lock()
	cs.health = h
	cs.healthMut.Unlock()
}
160
161