Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/api/api.go
4096 views
1
package api
2
3
import (
4
"context"
5
"fmt"
6
"reflect"
7
"sync"
8
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/common/loki"
11
fnet "github.com/grafana/agent/component/common/net"
12
"github.com/grafana/agent/component/common/relabel"
13
"github.com/grafana/agent/component/loki/source/api/internal/lokipush"
14
"github.com/grafana/agent/pkg/util"
15
"github.com/prometheus/client_golang/prometheus"
16
"github.com/prometheus/common/model"
17
)
18
19
func init() {
20
component.Register(component.Registration{
21
Name: "loki.source.api",
22
Args: Arguments{},
23
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
24
return New(opts, args.(Arguments))
25
},
26
})
27
}
28
29
type Arguments struct {
30
Server *fnet.ServerConfig `river:",squash"`
31
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
32
Labels map[string]string `river:"labels,attr,optional"`
33
RelabelRules relabel.Rules `river:"relabel_rules,attr,optional"`
34
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
35
}
36
37
func (a *Arguments) labelSet() model.LabelSet {
38
labelSet := make(model.LabelSet, len(a.Labels))
39
for k, v := range a.Labels {
40
labelSet[model.LabelName(k)] = model.LabelValue(v)
41
}
42
return labelSet
43
}
44
45
type Component struct {
46
opts component.Options
47
entriesChan chan loki.Entry
48
uncheckedCollector *util.UncheckedCollector
49
50
serverMut sync.Mutex
51
server *lokipush.PushAPIServer
52
53
// Use separate receivers mutex to address potential deadlock when Update drains the current server.
54
// e.g. https://github.com/grafana/agent/issues/3391
55
receiversMut sync.RWMutex
56
receivers []loki.LogsReceiver
57
}
58
59
func New(opts component.Options, args Arguments) (component.Component, error) {
60
c := &Component{
61
opts: opts,
62
entriesChan: make(chan loki.Entry),
63
receivers: args.ForwardTo,
64
uncheckedCollector: util.NewUncheckedCollector(nil),
65
}
66
opts.Registerer.MustRegister(c.uncheckedCollector)
67
err := c.Update(args)
68
if err != nil {
69
return nil, err
70
}
71
return c, nil
72
}
73
74
func (c *Component) Run(ctx context.Context) (err error) {
75
defer c.stop()
76
77
for {
78
select {
79
case entry := <-c.entriesChan:
80
c.receiversMut.RLock()
81
receivers := c.receivers
82
c.receiversMut.RUnlock()
83
84
for _, receiver := range receivers {
85
select {
86
case receiver <- entry:
87
case <-ctx.Done():
88
return
89
}
90
}
91
case <-ctx.Done():
92
return
93
}
94
}
95
}
96
97
func (c *Component) Update(args component.Arguments) error {
98
newArgs, ok := args.(Arguments)
99
if !ok {
100
return fmt.Errorf("invalid type of arguments: %T", args)
101
}
102
103
// if no server config provided, we'll use defaults
104
if newArgs.Server == nil {
105
newArgs.Server = &fnet.ServerConfig{}
106
}
107
// to avoid port conflicts, if no GRPC is configured, make sure we use a random port
108
// also, use localhost IP, so we don't require root to run.
109
if newArgs.Server.GRPC == nil {
110
newArgs.Server.GRPC = &fnet.GRPCConfig{
111
ListenPort: 0,
112
ListenAddress: "127.0.0.1",
113
}
114
}
115
116
c.receiversMut.Lock()
117
c.receivers = newArgs.ForwardTo
118
c.receiversMut.Unlock()
119
120
c.serverMut.Lock()
121
defer c.serverMut.Unlock()
122
serverNeedsRestarting := c.server == nil || !reflect.DeepEqual(c.server.ServerConfig(), *newArgs.Server)
123
if serverNeedsRestarting {
124
if c.server != nil {
125
c.server.Shutdown()
126
}
127
128
// [server.Server] registers new metrics every time it is created. To
129
// avoid issues with re-registering metrics with the same name, we create a
130
// new registry for the server every time we create one, and pass it to an
131
// unchecked collector to bypass uniqueness checking.
132
serverRegistry := prometheus.NewRegistry()
133
c.uncheckedCollector.SetCollector(serverRegistry)
134
135
var err error
136
c.server, err = lokipush.NewPushAPIServer(c.opts.Logger, newArgs.Server, loki.NewEntryHandler(c.entriesChan, func() {}), serverRegistry)
137
if err != nil {
138
return fmt.Errorf("failed to create embedded server: %v", err)
139
}
140
err = c.server.Run()
141
if err != nil {
142
return fmt.Errorf("failed to run embedded server: %v", err)
143
}
144
}
145
146
c.server.SetLabels(newArgs.labelSet())
147
c.server.SetRelabelRules(newArgs.RelabelRules)
148
c.server.SetKeepTimestamp(newArgs.UseIncomingTimestamp)
149
150
return nil
151
}
152
153
func (c *Component) stop() {
154
c.serverMut.Lock()
155
defer c.serverMut.Unlock()
156
if c.server != nil {
157
c.server.Shutdown()
158
c.server = nil
159
}
160
}
161
162