Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/kubernetes/kubernetes.go
4096 views
1
// Package kubernetes implements the loki.source.kubernetes component.
2
package kubernetes
3
4
import (
5
"context"
6
"fmt"
7
"os"
8
"path/filepath"
9
"reflect"
10
"sync"
11
"time"
12
13
"github.com/go-kit/log"
14
"github.com/go-kit/log/level"
15
"github.com/grafana/agent/component"
16
"github.com/grafana/agent/component/common/config"
17
commonk8s "github.com/grafana/agent/component/common/kubernetes"
18
"github.com/grafana/agent/component/common/loki"
19
"github.com/grafana/agent/component/common/loki/positions"
20
"github.com/grafana/agent/component/discovery"
21
"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"
22
"github.com/grafana/agent/pkg/river"
23
"k8s.io/client-go/kubernetes"
24
)
25
26
func init() {
27
component.Register(component.Registration{
28
Name: "loki.source.kubernetes",
29
Args: Arguments{},
30
31
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
32
return New(opts, args.(Arguments))
33
},
34
})
35
}
36
37
// Arguments holds values which are used to configure the loki.source.kubernetes
38
// component.
39
type Arguments struct {
40
Targets []discovery.Target `river:"targets,attr"`
41
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
42
43
// Client settings to connect to Kubernetes.
44
Client commonk8s.ClientArguments `river:"client,block,optional"`
45
}
46
47
var _ river.Unmarshaler = (*Arguments)(nil)
48
49
// DefaultArguments holds default settings for loki.source.kubernetes.
50
var DefaultArguments = Arguments{
51
Client: commonk8s.ClientArguments{
52
HTTPClientConfig: config.DefaultHTTPClientConfig,
53
},
54
}
55
56
// UnmarshalRiver implements river.Unmarshaler and applies defaults.
57
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
58
*args = DefaultArguments
59
60
type arguments Arguments
61
return f((*arguments)(args))
62
}
63
64
// Component implements the loki.source.kubernetes component.
65
type Component struct {
66
log log.Logger
67
opts component.Options
68
positions positions.Positions
69
70
mut sync.Mutex
71
args Arguments
72
tailer *kubetail.Manager
73
lastOptions *kubetail.Options
74
75
handler loki.LogsReceiver
76
77
receiversMut sync.RWMutex
78
receivers []loki.LogsReceiver
79
}
80
81
var (
82
_ component.Component = (*Component)(nil)
83
_ component.DebugComponent = (*Component)(nil)
84
)
85
86
// New creates a new loki.source.kubernetes component.
87
func New(o component.Options, args Arguments) (*Component, error) {
88
err := os.MkdirAll(o.DataPath, 0750)
89
if err != nil && !os.IsExist(err) {
90
return nil, err
91
}
92
positionsFile, err := positions.New(o.Logger, positions.Config{
93
SyncPeriod: 10 * time.Second,
94
PositionsFile: filepath.Join(o.DataPath, "positions.yml"),
95
})
96
97
if err != nil {
98
return nil, err
99
}
100
101
c := &Component{
102
log: o.Logger,
103
opts: o,
104
handler: make(loki.LogsReceiver),
105
positions: positionsFile,
106
}
107
if err := c.Update(args); err != nil {
108
return nil, err
109
}
110
return c, nil
111
}
112
113
// Run implements component.Component.
114
func (c *Component) Run(ctx context.Context) error {
115
defer c.positions.Stop()
116
117
defer func() {
118
c.mut.Lock()
119
defer c.mut.Unlock()
120
121
// Guard for safety, but it's not possible for Run to be called without
122
// c.tailer being initialized.
123
if c.tailer != nil {
124
c.tailer.Stop()
125
}
126
}()
127
128
for {
129
select {
130
case <-ctx.Done():
131
return nil
132
case entry := <-c.handler:
133
c.receiversMut.RLock()
134
receivers := c.receivers
135
c.receiversMut.RUnlock()
136
137
for _, receiver := range receivers {
138
receiver <- entry
139
}
140
}
141
}
142
}
143
144
// Update implements component.Component.
145
func (c *Component) Update(args component.Arguments) error {
146
newArgs := args.(Arguments)
147
148
// Update the receivers before anything else, just in case something fails.
149
c.receiversMut.Lock()
150
c.receivers = newArgs.ForwardTo
151
c.receiversMut.Unlock()
152
153
c.mut.Lock()
154
defer c.mut.Unlock()
155
156
managerOpts, err := c.getTailerOptions(newArgs)
157
if err != nil {
158
return err
159
}
160
161
switch {
162
case c.tailer == nil:
163
// First call to Update; build the tailer.
164
c.tailer = kubetail.NewManager(c.log, managerOpts)
165
166
case managerOpts != c.lastOptions:
167
// Options changed; pass it to the tailer.
168
//
169
// This will never fail because it only fails if the context gets canceled.
170
//
171
// TODO(rfratto): should we have a generous update timeout to prevent this
172
// from potentially hanging forever?
173
_ = c.tailer.UpdateOptions(context.Background(), managerOpts)
174
c.lastOptions = managerOpts
175
176
default:
177
// No-op: manager already exists and options didn't change.
178
}
179
180
// Convert input targets into targets to give to tailer.
181
targets := make([]*kubetail.Target, 0, len(newArgs.Targets))
182
183
for _, inTarget := range newArgs.Targets {
184
lset := inTarget.Labels()
185
processed, err := kubetail.PrepareLabels(lset, c.opts.ID)
186
if err != nil {
187
// TODO(rfratto): should this set the health of the component?
188
level.Error(c.log).Log("msg", "failed to process input target", "target", lset.String(), "err", err)
189
continue
190
}
191
targets = append(targets, kubetail.NewTarget(lset, processed))
192
}
193
194
// This will never fail because it only fails if the context gets canceled.
195
//
196
// TODO(rfratto): should we have a generous update timeout to prevent this
197
// from potentially hanging forever?
198
_ = c.tailer.SyncTargets(context.Background(), targets)
199
200
c.args = newArgs
201
return nil
202
}
203
204
// getTailerOptions gets tailer options from arguments. If args hasn't changed
205
// from the last call to getTailerOptions, c.lastOptions is returned.
206
// c.lastOptions must be updated by the caller.
207
//
208
// getTailerOptions must only be called when c.mut is held.
209
func (c *Component) getTailerOptions(args Arguments) (*kubetail.Options, error) {
210
if reflect.DeepEqual(c.args.Client, args.Client) && c.lastOptions != nil {
211
return c.lastOptions, nil
212
}
213
214
cfg, err := args.Client.BuildRESTConfig(c.log)
215
if err != nil {
216
return c.lastOptions, fmt.Errorf("building Kubernetes config: %w", err)
217
}
218
clientSet, err := kubernetes.NewForConfig(cfg)
219
if err != nil {
220
return c.lastOptions, fmt.Errorf("building Kubernetes client: %w", err)
221
}
222
223
return &kubetail.Options{
224
Client: clientSet,
225
Handler: loki.NewEntryHandler(c.handler, func() {}),
226
Positions: c.positions,
227
}, nil
228
}
229
230
// DebugInfo returns debug information for loki.source.kubernetes.
231
func (c *Component) DebugInfo() interface{} {
232
var info DebugInfo
233
234
for _, target := range c.tailer.Targets() {
235
var lastError string
236
if err := target.LastError(); err != nil {
237
lastError = err.Error()
238
}
239
240
info.Targets = append(info.Targets, DebugInfoTarget{
241
Labels: target.Labels().Map(),
242
DiscoveryLabels: target.DiscoveryLabels().Map(),
243
LastError: lastError,
244
UpdateTime: target.LastEntry().Local(),
245
})
246
}
247
248
return info
249
}
250
251
// DebugInfo represents debug information for loki.source.kubernetes.
252
type DebugInfo struct {
253
Targets []DebugInfoTarget `river:"target,block,optional"`
254
}
255
256
// DebugInfoTarget is debug information for an individual target being tailed
257
// for logs.
258
type DebugInfoTarget struct {
259
Labels map[string]string `river:"labels,attr,optional"`
260
DiscoveryLabels map[string]string `river:"discovery_labels,attr,optional"`
261
LastError string `river:"last_error,attr,optional"`
262
UpdateTime time.Time `river:"update_time,attr,optional"`
263
}
264
265