Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/echo/echo.go
4096 views
1
package echo
2
3
import (
4
"context"
5
"sync"
6
7
"github.com/go-kit/log/level"
8
"github.com/grafana/agent/component"
9
"github.com/grafana/agent/component/common/loki"
10
)
11
12
func init() {
13
component.Register(component.Registration{
14
Name: "loki.echo",
15
Args: Arguments{},
16
Exports: Exports{},
17
18
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
19
return New(opts, args.(Arguments))
20
},
21
})
22
}
23
24
// Arguments holds values which are used to configure the loki.echo
25
// component.
26
type Arguments struct{}
27
28
// Exports holds the values exported by the loki.echo component.
29
type Exports struct {
30
Receiver loki.LogsReceiver `river:"receiver,attr"`
31
}
32
33
// DefaultArguments defines the default settings for log scraping.
34
var DefaultArguments = Arguments{}
35
36
// UnmarshalRiver implements river.Unmarshaler.
37
func (arg *Arguments) UnmarshalRiver(f func(interface{}) error) error {
38
*arg = DefaultArguments
39
40
type args Arguments
41
return f((*args)(arg))
42
}
43
44
var (
45
_ component.Component = (*Component)(nil)
46
)
47
48
// Component implements the loki.source.file component.
49
type Component struct {
50
opts component.Options
51
52
mut sync.RWMutex
53
args Arguments
54
receiver loki.LogsReceiver
55
}
56
57
// New creates a new loki.echo component.
58
func New(o component.Options, args Arguments) (*Component, error) {
59
ch := make(chan loki.Entry)
60
c := &Component{
61
opts: o,
62
receiver: ch,
63
}
64
65
// Call to Update() once at the start.
66
if err := c.Update(args); err != nil {
67
return nil, err
68
}
69
70
// Immediately export the receiver which remains the same for the component
71
// lifetime.
72
o.OnStateChange(Exports{Receiver: c.receiver})
73
74
return c, nil
75
}
76
77
// Run implements component.Component.
78
func (c *Component) Run(ctx context.Context) error {
79
for {
80
select {
81
case <-ctx.Done():
82
return nil
83
case entry := <-c.receiver:
84
level.Info(c.opts.Logger).Log("receiver", c.opts.ID, "entry", entry.Line, "labels", entry.Labels.String())
85
}
86
}
87
}
88
89
// Update implements component.Component.
90
func (c *Component) Update(args component.Arguments) error {
91
newArgs := args.(Arguments)
92
93
c.mut.Lock()
94
defer c.mut.Unlock()
95
c.args = newArgs
96
97
return nil
98
}
99
100