Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/write/write.go
4096 views
1
package write
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
8
"github.com/grafana/agent/component"
9
"github.com/grafana/agent/component/common/loki"
10
"github.com/grafana/agent/component/common/loki/client"
11
"github.com/grafana/agent/pkg/build"
12
)
13
14
// streamLagLabels lists the label names handed to the client metrics and to
// every client created by this component.
var streamLagLabels = []string{
	"filename",
}
15
16
func init() {
17
component.Register(component.Registration{
18
Name: "loki.write",
19
Args: Arguments{},
20
Exports: Exports{},
21
22
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
23
return New(opts, args.(Arguments))
24
},
25
})
26
27
client.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
28
}
29
30
// Arguments holds values which are used to configure the loki.write component.
31
type Arguments struct {
32
Endpoints []EndpointOptions `river:"endpoint,block,optional"`
33
ExternalLabels map[string]string `river:"external_labels,attr,optional"`
34
MaxStreams int `river:"max_streams,attr,optional"`
35
}
36
37
// Exports holds the receiver that is used to send log entries to the
38
// loki.write component.
39
type Exports struct {
40
Receiver loki.LogsReceiver `river:"receiver,attr"`
41
}
42
43
var (
44
_ component.Component = (*Component)(nil)
45
)
46
47
// Component implements the loki.write component.
48
type Component struct {
49
opts component.Options
50
metrics *client.Metrics
51
52
mut sync.RWMutex
53
args Arguments
54
receiver loki.LogsReceiver
55
clients []client.Client
56
}
57
58
// New creates a new loki.write component.
59
func New(o component.Options, args Arguments) (*Component, error) {
60
c := &Component{
61
opts: o,
62
metrics: client.NewMetrics(o.Registerer, streamLagLabels),
63
}
64
65
// Create and immediately export the receiver which remains the same for
66
// the component's lifetime.
67
c.receiver = make(loki.LogsReceiver)
68
o.OnStateChange(Exports{Receiver: c.receiver})
69
70
// Call to Update() to start readers and set receivers once at the start.
71
if err := c.Update(args); err != nil {
72
return nil, err
73
}
74
75
return c, nil
76
}
77
78
// Run implements component.Component.
79
func (c *Component) Run(ctx context.Context) error {
80
for {
81
select {
82
case <-ctx.Done():
83
return nil
84
case entry := <-c.receiver:
85
for _, client := range c.clients {
86
if client != nil {
87
select {
88
case <-ctx.Done():
89
return nil
90
case client.Chan() <- entry:
91
// no-op
92
}
93
}
94
}
95
}
96
}
97
}
98
99
// Update implements component.Component.
100
func (c *Component) Update(args component.Arguments) error {
101
newArgs := args.(Arguments)
102
103
c.mut.Lock()
104
defer c.mut.Unlock()
105
c.args = newArgs
106
107
for _, client := range c.clients {
108
if client != nil {
109
client.Stop()
110
}
111
}
112
c.clients = make([]client.Client, len(newArgs.Endpoints))
113
114
cfgs := newArgs.convertClientConfigs()
115
// TODO (@tpaschalis) We could use a client.NewMulti here to push the
116
// fanout logic back to the client layer, but I opted to keep it explicit
117
// here a) for easier debugging and b) possible improvements in the future.
118
for _, cfg := range cfgs {
119
client, err := client.New(c.metrics, cfg, streamLagLabels, newArgs.MaxStreams, c.opts.Logger)
120
if err != nil {
121
return err
122
}
123
c.clients = append(c.clients, client)
124
}
125
126
return nil
127
}
128
129