Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/cloudflare/cloudflare.go
4096 views
1
package cloudflare
2
3
import (
4
"context"
5
"fmt"
6
"os"
7
"path/filepath"
8
"sync"
9
"time"
10
11
"github.com/go-kit/log/level"
12
"github.com/grafana/agent/component"
13
"github.com/grafana/agent/component/common/loki"
14
"github.com/grafana/agent/component/common/loki/positions"
15
cft "github.com/grafana/agent/component/loki/source/cloudflare/internal/cloudflaretarget"
16
"github.com/grafana/agent/pkg/river"
17
"github.com/grafana/agent/pkg/river/rivertypes"
18
"github.com/prometheus/common/model"
19
)
20
21
func init() {
22
component.Register(component.Registration{
23
Name: "loki.source.cloudflare",
24
Args: Arguments{},
25
26
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
27
return New(opts, args.(Arguments))
28
},
29
})
30
}
31
32
// Arguments holds values which are used to configure the
33
// loki.source.cloudflare component.
34
type Arguments struct {
35
APIToken rivertypes.Secret `river:"api_token,attr"`
36
ZoneID string `river:"zone_id,attr"`
37
Labels map[string]string `river:"labels,attr,optional"`
38
Workers int `river:"workers,attr,optional"`
39
PullRange time.Duration `river:"pull_range,attr,optional"`
40
FieldsType string `river:"fields_type,attr,optional"`
41
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
42
}
43
44
// Convert returns a cloudflaretarget Config struct from the Arguments.
45
func (c Arguments) Convert() *cft.Config {
46
lbls := make(model.LabelSet, len(c.Labels))
47
for k, v := range c.Labels {
48
lbls[model.LabelName(k)] = model.LabelValue(v)
49
}
50
return &cft.Config{
51
APIToken: string(c.APIToken),
52
ZoneID: c.ZoneID,
53
Labels: lbls,
54
Workers: c.Workers,
55
PullRange: model.Duration(c.PullRange),
56
FieldsType: c.FieldsType,
57
}
58
}
59
60
// DefaultArguments sets the configuration defaults.
61
var DefaultArguments = Arguments{
62
Workers: 3,
63
PullRange: 1 * time.Minute,
64
FieldsType: string(cft.FieldsTypeDefault),
65
}
66
67
var _ river.Unmarshaler = (*Arguments)(nil)
68
69
// UnmarshalRiver implements the unmarshaller
70
func (c *Arguments) UnmarshalRiver(f func(v interface{}) error) error {
71
*c = DefaultArguments
72
type args Arguments
73
err := f((*args)(c))
74
if err != nil {
75
return err
76
}
77
if c.PullRange < 0 {
78
return fmt.Errorf("pull_range must be a positive duration")
79
}
80
_, err = cft.Fields(cft.FieldsType(c.FieldsType))
81
if err != nil {
82
return fmt.Errorf("invalid fields_type set; the available values are 'default', 'minimal', 'extended' and 'all'")
83
}
84
return nil
85
}
86
87
// Component implements the loki.source.cloudflare component.
88
type Component struct {
89
opts component.Options
90
metrics *cft.Metrics
91
92
mut sync.RWMutex
93
fanout []loki.LogsReceiver
94
target *cft.Target
95
96
posFile positions.Positions
97
handler loki.LogsReceiver
98
}
99
100
// New creates a new loki.source.cloudflare component.
101
func New(o component.Options, args Arguments) (*Component, error) {
102
err := os.MkdirAll(o.DataPath, 0750)
103
if err != nil && !os.IsExist(err) {
104
return nil, err
105
}
106
positionsFile, err := positions.New(o.Logger, positions.Config{
107
SyncPeriod: 10 * time.Second,
108
PositionsFile: filepath.Join(o.DataPath, "positions.yml"),
109
IgnoreInvalidYaml: false,
110
ReadOnly: false,
111
})
112
if err != nil {
113
return nil, err
114
}
115
116
c := &Component{
117
opts: o,
118
metrics: cft.NewMetrics(o.Registerer),
119
handler: make(loki.LogsReceiver),
120
fanout: args.ForwardTo,
121
posFile: positionsFile,
122
}
123
124
// Call to Update() to start readers and set receivers once at the start.
125
if err := c.Update(args); err != nil {
126
return nil, err
127
}
128
129
return c, nil
130
}
131
132
// Run implements component.Component.
133
func (c *Component) Run(ctx context.Context) error {
134
defer func() {
135
c.mut.RLock()
136
level.Info(c.opts.Logger).Log("msg", "loki.source.cloudflare component shutting down, stopping the target")
137
c.target.Stop()
138
c.mut.RUnlock()
139
}()
140
141
for {
142
select {
143
case <-ctx.Done():
144
return nil
145
case entry := <-c.handler:
146
c.mut.RLock()
147
for _, receiver := range c.fanout {
148
receiver <- entry
149
}
150
c.mut.RUnlock()
151
}
152
}
153
}
154
155
// Update implements component.Component.
156
func (c *Component) Update(args component.Arguments) error {
157
c.mut.Lock()
158
defer c.mut.Unlock()
159
160
newArgs := args.(Arguments)
161
c.fanout = newArgs.ForwardTo
162
163
if c.target != nil {
164
c.target.Stop()
165
}
166
entryHandler := loki.NewEntryHandler(c.handler, func() {})
167
168
t, err := cft.NewTarget(c.metrics, c.opts.Logger, entryHandler, c.posFile, newArgs.Convert())
169
if err != nil {
170
level.Error(c.opts.Logger).Log("msg", "failed to create cloudflare target with provided config", "err", err)
171
return err
172
}
173
c.target = t
174
175
return nil
176
}
177
178
// DebugInfo returns information about the status of targets.
179
func (c *Component) DebugInfo() interface{} {
180
c.mut.RLock()
181
defer c.mut.RUnlock()
182
183
lbls := make(map[string]string, len(c.target.Labels()))
184
for k, v := range c.target.Labels() {
185
lbls[string(k)] = string(v)
186
}
187
return targetDebugInfo{
188
Ready: c.target.Ready(),
189
Details: c.target.Details(),
190
}
191
}
192
193
// targetDebugInfo is the value returned by Component.DebugInfo.
type targetDebugInfo struct {
	// Ready carries the result of the target's Ready method.
	Ready bool `river:"ready,attr"`

	// Details carries the result of the target's Details method and is
	// exposed under the "target_info" attribute.
	Details map[string]string `river:"target_info,attr"`
}
197
198