Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gcplog/gcplog.go
4096 views
1
package gcplog
2
3
import (
4
"context"
5
"fmt"
6
"strings"
7
"sync"
8
9
"github.com/go-kit/log/level"
10
"github.com/prometheus/client_golang/prometheus"
11
"github.com/prometheus/prometheus/model/relabel"
12
13
"github.com/grafana/agent/component"
14
"github.com/grafana/agent/component/common/loki"
15
flow_relabel "github.com/grafana/agent/component/common/relabel"
16
gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget"
17
"github.com/grafana/agent/pkg/util"
18
)
19
20
func init() {
21
component.Register(component.Registration{
22
Name: "loki.source.gcplog",
23
Args: Arguments{},
24
25
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
26
return New(opts, args.(Arguments))
27
},
28
})
29
}
30
31
// Arguments holds values which are used to configure the loki.source.gcplog
32
// component.
33
type Arguments struct {
34
// TODO(@tpaschalis) Having these types defined in an internal package
35
// means that an external caller cannot build this component's Arguments
36
// by hand for now.
37
PullTarget *gt.PullConfig `river:"pull,block,optional"`
38
PushTarget *gt.PushConfig `river:"push,block,optional"`
39
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
40
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
41
}
42
43
// UnmarshalRiver implements the unmarshaller
44
func (a *Arguments) UnmarshalRiver(f func(v interface{}) error) error {
45
*a = Arguments{}
46
type arguments Arguments
47
err := f((*arguments)(a))
48
if err != nil {
49
return err
50
}
51
52
if (a.PullTarget != nil) == (a.PushTarget != nil) {
53
return fmt.Errorf("exactly one of 'push' or 'pull' must be provided")
54
}
55
return nil
56
}
57
58
// Component implements the loki.source.gcplog component.
59
type Component struct {
60
opts component.Options
61
metrics *gt.Metrics
62
serverMetrics *util.UncheckedCollector
63
64
mut sync.RWMutex
65
fanout []loki.LogsReceiver
66
target gt.Target
67
68
handler loki.LogsReceiver
69
}
70
71
// New creates a new loki.source.gcplog component.
72
func New(o component.Options, args Arguments) (*Component, error) {
73
c := &Component{
74
opts: o,
75
metrics: gt.NewMetrics(o.Registerer),
76
handler: make(loki.LogsReceiver),
77
fanout: args.ForwardTo,
78
serverMetrics: util.NewUncheckedCollector(nil),
79
}
80
81
o.Registerer.MustRegister(c.serverMetrics)
82
83
// Call to Update() to start readers and set receivers once at the start.
84
if err := c.Update(args); err != nil {
85
return nil, err
86
}
87
88
return c, nil
89
}
90
91
// Run implements component.Component.
92
func (c *Component) Run(ctx context.Context) error {
93
defer func() {
94
level.Info(c.opts.Logger).Log("msg", "loki.source.gcplog component shutting down, stopping the targets")
95
c.mut.RLock()
96
err := c.target.Stop()
97
if err != nil {
98
level.Error(c.opts.Logger).Log("msg", "error while stopping gcplog target", "err", err)
99
}
100
c.mut.RUnlock()
101
}()
102
103
for {
104
select {
105
case <-ctx.Done():
106
return nil
107
case entry := <-c.handler:
108
c.mut.RLock()
109
for _, receiver := range c.fanout {
110
receiver <- entry
111
}
112
c.mut.RUnlock()
113
}
114
}
115
}
116
117
// Update implements component.Component.
118
func (c *Component) Update(args component.Arguments) error {
119
c.mut.Lock()
120
defer c.mut.Unlock()
121
122
newArgs := args.(Arguments)
123
c.fanout = newArgs.ForwardTo
124
125
var rcs []*relabel.Config
126
if newArgs.RelabelRules != nil && len(newArgs.RelabelRules) > 0 {
127
rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
128
}
129
130
if c.target != nil {
131
err := c.target.Stop()
132
if err != nil {
133
level.Error(c.opts.Logger).Log("msg", "error while stopping gcplog target", "err", err)
134
}
135
}
136
entryHandler := loki.NewEntryHandler(c.handler, func() {})
137
jobName := strings.Replace(c.opts.ID, ".", "_", -1)
138
139
if newArgs.PullTarget != nil {
140
// TODO(@tpaschalis) Are there any options from "google.golang.org/api/option"
141
// we should expose as configuration and pass here?
142
t, err := gt.NewPullTarget(c.metrics, c.opts.Logger, entryHandler, jobName, newArgs.PullTarget, rcs)
143
if err != nil {
144
level.Error(c.opts.Logger).Log("msg", "failed to create gcplog target with provided config", "err", err)
145
return err
146
}
147
c.target = t
148
}
149
if newArgs.PushTarget != nil {
150
// [gt.NewPushTarget] registers new metrics every time it is called. To
151
// avoid issues with re-registering metrics with the same name, we create a
152
// new registry for the target every time we create one, and pass it to an
153
// unchecked collector to bypass uniqueness checking.
154
registry := prometheus.NewRegistry()
155
c.serverMetrics.SetCollector(registry)
156
157
t, err := gt.NewPushTarget(c.metrics, c.opts.Logger, entryHandler, jobName, newArgs.PushTarget, rcs, registry)
158
if err != nil {
159
level.Error(c.opts.Logger).Log("msg", "failed to create gcplog target with provided config", "err", err)
160
return err
161
}
162
c.target = t
163
}
164
165
return nil
166
}
167
168
// DebugInfo returns information about the status of targets.
169
func (c *Component) DebugInfo() interface{} {
170
c.mut.RLock()
171
defer c.mut.RUnlock()
172
return targetDebugInfo{Details: c.target.Details()}
173
}
174
175
type targetDebugInfo struct {
176
Details map[string]string `river:"target_info,attr"`
177
}
178
179