Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/process/process.go
4096 views
1
package process
2
3
import (
4
"context"
5
"reflect"
6
"sync"
7
8
"github.com/grafana/agent/component"
9
"github.com/grafana/agent/component/common/loki"
10
"github.com/grafana/agent/component/loki/process/internal/stages"
11
)
12
13
func init() {
14
component.Register(component.Registration{
15
Name: "loki.process",
16
Args: Arguments{},
17
Exports: Exports{},
18
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
19
return New(opts, args.(Arguments))
20
},
21
})
22
}
23
24
// Arguments holds values which are used to configure the loki.process
25
// component.
26
type Arguments struct {
27
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
28
Stages []stages.StageConfig `river:"stage,enum,optional"`
29
}
30
31
// Exports exposes the receiver that can be used to send log entries to
32
// loki.process.
33
type Exports struct {
34
Receiver loki.LogsReceiver `river:"receiver,attr"`
35
}
36
37
var (
38
_ component.Component = (*Component)(nil)
39
)
40
41
// Component implements the loki.process component.
42
type Component struct {
43
opts component.Options
44
45
mut sync.RWMutex
46
receiver loki.LogsReceiver
47
fanout []loki.LogsReceiver
48
processIn chan<- loki.Entry
49
processOut chan loki.Entry
50
entryHandler loki.EntryHandler
51
stages []stages.StageConfig
52
}
53
54
// New creates a new loki.process component.
55
func New(o component.Options, args Arguments) (*Component, error) {
56
c := &Component{
57
opts: o,
58
}
59
60
// Create and immediately export the receiver which remains the same for
61
// the component's lifetime.
62
c.receiver = make(loki.LogsReceiver)
63
c.processOut = make(loki.LogsReceiver)
64
o.OnStateChange(Exports{Receiver: c.receiver})
65
66
// Call to Update() to start readers and set receivers once at the start.
67
if err := c.Update(args); err != nil {
68
return nil, err
69
}
70
71
return c, nil
72
}
73
74
// Run implements component.Component.
75
func (c *Component) Run(ctx context.Context) error {
76
defer func() {
77
c.mut.RLock()
78
if c.entryHandler != nil {
79
c.entryHandler.Stop()
80
}
81
close(c.processOut)
82
close(c.processIn)
83
c.mut.RUnlock()
84
}()
85
wg := &sync.WaitGroup{}
86
wg.Add(2)
87
go c.handleIn(ctx, wg)
88
go c.handleOut(ctx, wg)
89
90
wg.Wait()
91
return nil
92
}
93
94
// Update implements component.Component.
95
func (c *Component) Update(args component.Arguments) error {
96
newArgs := args.(Arguments)
97
98
c.mut.Lock()
99
defer c.mut.Unlock()
100
101
// We want to create a new pipeline if the config changed or if this is the
102
// first load. This will allow a component with no stages to function
103
// properly.
104
if stagesChanged(c.stages, newArgs.Stages) || c.stages == nil {
105
if c.entryHandler != nil {
106
c.entryHandler.Stop()
107
}
108
109
pipeline, err := stages.NewPipeline(c.opts.Logger, newArgs.Stages, &c.opts.ID, c.opts.Registerer)
110
if err != nil {
111
return err
112
}
113
c.entryHandler = loki.NewEntryHandler(c.processOut, func() {})
114
c.processIn = pipeline.Wrap(c.entryHandler).Chan()
115
c.stages = newArgs.Stages
116
}
117
118
c.fanout = newArgs.ForwardTo
119
120
return nil
121
}
122
123
func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) {
124
defer wg.Done()
125
for {
126
select {
127
case <-ctx.Done():
128
return
129
case entry := <-c.receiver:
130
c.mut.RLock()
131
select {
132
case <-ctx.Done():
133
return
134
case c.processIn <- entry:
135
// no-op
136
}
137
c.mut.RUnlock()
138
}
139
}
140
}
141
142
func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) {
143
defer wg.Done()
144
for {
145
select {
146
case <-ctx.Done():
147
return
148
case entry := <-c.processOut:
149
c.mut.RLock()
150
for _, f := range c.fanout {
151
select {
152
case <-ctx.Done():
153
return
154
case f <- entry:
155
// no-op
156
}
157
}
158
c.mut.RUnlock()
159
}
160
}
161
}
162
163
// stagesChanged reports whether next differs from prev. The two lists are
// considered different when their lengths differ or when any pair of stage
// configurations at the same index is not deeply equal.
func stagesChanged(prev, next []stages.StageConfig) bool {
	n := len(prev)
	if n != len(next) {
		return true
	}
	for i := 0; i < n; i++ {
		if same := reflect.DeepEqual(prev[i], next[i]); !same {
			return true
		}
	}
	return false
}
174
175