Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/podlogs/podlogs.go
5340 views
1
package podlogs
2
3
import (
4
"context"
5
"fmt"
6
"os"
7
"path/filepath"
8
"reflect"
9
"sync"
10
"time"
11
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"github.com/grafana/agent/component"
15
"github.com/grafana/agent/component/common/config"
16
commonk8s "github.com/grafana/agent/component/common/kubernetes"
17
"github.com/grafana/agent/component/common/loki"
18
"github.com/grafana/agent/component/common/loki/positions"
19
"github.com/grafana/agent/component/loki/source/kubernetes"
20
"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"
21
"github.com/grafana/agent/pkg/river"
22
"github.com/oklog/run"
23
kubeclient "k8s.io/client-go/kubernetes"
24
"k8s.io/client-go/rest"
25
)
26
27
func init() {
28
component.Register(component.Registration{
29
Name: "loki.source.podlogs",
30
Args: Arguments{},
31
32
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
33
return New(opts, args.(Arguments))
34
},
35
})
36
}
37
38
// Arguments holds values which are used to configure the loki.source.podlogs
39
// component.
40
type Arguments struct {
41
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
42
43
// Client settings to connect to Kubernetes.
44
Client commonk8s.ClientArguments `river:"client,block,optional"`
45
46
Selector config.LabelSelector `river:"selector,block,optional"`
47
NamespaceSelector config.LabelSelector `river:"namespace_selector,block,optional"`
48
}
49
50
// Compile-time check that *Arguments implements river.Unmarshaler.
var _ river.Unmarshaler = (*Arguments)(nil)
51
52
// DefaultArguments holds default settings for loki.source.kubernetes.
53
var DefaultArguments = Arguments{
54
Client: commonk8s.ClientArguments{
55
HTTPClientConfig: config.DefaultHTTPClientConfig,
56
},
57
}
58
59
// UnmarshalRiver implements river.Unmarshaler and applies defaults.
60
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
61
*args = DefaultArguments
62
63
type arguments Arguments
64
return f((*arguments)(args))
65
}
66
67
// Component implements the loki.source.podlogs component.
68
type Component struct {
69
log log.Logger
70
opts component.Options
71
72
tailer *kubetail.Manager
73
reconciler *reconciler
74
controller *controller
75
76
positions positions.Positions
77
handler loki.LogsReceiver
78
79
mut sync.RWMutex
80
args Arguments
81
lastOptions *kubetail.Options
82
restConfig *rest.Config
83
84
receiversMut sync.RWMutex
85
receivers []loki.LogsReceiver
86
}
87
88
var (
89
_ component.Component = (*Component)(nil)
90
_ component.DebugComponent = (*Component)(nil)
91
)
92
93
// New creates a new loki.source.podlogs component.
94
func New(o component.Options, args Arguments) (*Component, error) {
95
err := os.MkdirAll(o.DataPath, 0750)
96
if err != nil && !os.IsExist(err) {
97
return nil, err
98
}
99
positionsFile, err := positions.New(o.Logger, positions.Config{
100
SyncPeriod: 10 * time.Second,
101
PositionsFile: filepath.Join(o.DataPath, "positions.yml"),
102
})
103
if err != nil {
104
return nil, err
105
}
106
107
var (
108
tailer = kubetail.NewManager(o.Logger, nil)
109
reconciler = newReconciler(o.Logger, tailer)
110
controller = newController(o.Logger, reconciler)
111
)
112
113
c := &Component{
114
log: o.Logger,
115
opts: o,
116
117
tailer: tailer,
118
reconciler: reconciler,
119
controller: controller,
120
121
positions: positionsFile,
122
handler: make(loki.LogsReceiver),
123
}
124
if err := c.Update(args); err != nil {
125
return nil, err
126
}
127
return c, nil
128
}
129
130
// Run implements component.Component.
131
func (c *Component) Run(ctx context.Context) error {
132
ctx, cancel := context.WithCancel(ctx)
133
defer cancel()
134
135
defer c.positions.Stop()
136
137
defer func() {
138
c.mut.RLock()
139
defer c.mut.RUnlock()
140
141
// Guard for safety, but it's not possible for Run to be called without
142
// c.tailer being initialized.
143
if c.tailer != nil {
144
c.tailer.Stop()
145
}
146
}()
147
148
var g run.Group
149
150
g.Add(func() error {
151
c.runHandler(ctx)
152
return nil
153
}, func(_ error) {
154
cancel()
155
})
156
157
g.Add(func() error {
158
err := c.controller.Run(ctx)
159
if err != nil {
160
level.Error(c.log).Log("msg", "controller exited with error", "err", err)
161
}
162
return err
163
}, func(_ error) {
164
cancel()
165
})
166
167
return g.Run()
168
}
169
170
func (c *Component) runHandler(ctx context.Context) {
171
for {
172
select {
173
case <-ctx.Done():
174
return
175
case entry := <-c.handler:
176
c.receiversMut.RLock()
177
receivers := c.receivers
178
c.receiversMut.RUnlock()
179
180
for _, receiver := range receivers {
181
receiver <- entry
182
}
183
}
184
}
185
}
186
187
// Update implements component.Component.
188
func (c *Component) Update(args component.Arguments) error {
189
newArgs := args.(Arguments)
190
191
// Update the receivers before anything else, just in case something fails.
192
c.receiversMut.Lock()
193
c.receivers = newArgs.ForwardTo
194
c.receiversMut.Unlock()
195
196
c.mut.Lock()
197
defer c.mut.Unlock()
198
199
if err := c.updateTailer(newArgs); err != nil {
200
return err
201
}
202
if err := c.updateReconciler(newArgs); err != nil {
203
return err
204
}
205
if err := c.updateController(newArgs); err != nil {
206
return err
207
}
208
209
c.args = newArgs
210
return nil
211
}
212
213
// updateTailer updates the state of the tailer. mut must be held when calling.
214
func (c *Component) updateTailer(args Arguments) error {
215
if reflect.DeepEqual(c.args.Client, args.Client) && c.lastOptions != nil {
216
return nil
217
}
218
219
cfg, err := args.Client.BuildRESTConfig(c.log)
220
if err != nil {
221
return fmt.Errorf("building Kubernetes config: %w", err)
222
}
223
clientSet, err := kubeclient.NewForConfig(cfg)
224
if err != nil {
225
return fmt.Errorf("building Kubernetes client: %w", err)
226
}
227
228
managerOpts := &kubetail.Options{
229
Client: clientSet,
230
Handler: loki.NewEntryHandler(c.handler, func() {}),
231
Positions: c.positions,
232
}
233
c.lastOptions = managerOpts
234
235
// Options changed; pass it to the tailer. This will never fail because it
236
// only fails if the context gets canceled.
237
//
238
// TODO(rfratto): should we have a generous update timeout to prevent this
239
// from potentially hanging forever?
240
_ = c.tailer.UpdateOptions(context.Background(), managerOpts)
241
return nil
242
}
243
244
// updateReconciler updates the state of the reconciler. This must only be
245
// called after updateTailer. mut must be held when calling.
246
func (c *Component) updateReconciler(args Arguments) error {
247
var (
248
selectorChanged = !reflect.DeepEqual(c.args.Selector, args.Selector)
249
namespaceSelectorChanged = !reflect.DeepEqual(c.args.NamespaceSelector, args.NamespaceSelector)
250
)
251
if !selectorChanged && !namespaceSelectorChanged {
252
return nil
253
}
254
255
sel, err := args.Selector.BuildSelector()
256
if err != nil {
257
return err
258
}
259
nsSel, err := args.NamespaceSelector.BuildSelector()
260
if err != nil {
261
return err
262
}
263
264
c.reconciler.UpdateSelectors(sel, nsSel)
265
266
// Request a reconcile so the new selectors get applied.
267
c.controller.RequestReconcile()
268
return nil
269
}
270
271
// updateController updates the state of the controller. This must only be
272
// called after updateReconciler. mut must be held when calling.
273
func (c *Component) updateController(args Arguments) error {
274
// We only need to update the controller if we already have a rest config
275
// generated and our client args haven't changed since the last call.
276
if reflect.DeepEqual(c.args.Client, args.Client) && c.restConfig != nil {
277
return nil
278
}
279
280
cfg, err := args.Client.BuildRESTConfig(c.log)
281
if err != nil {
282
return fmt.Errorf("building Kubernetes config: %w", err)
283
}
284
c.restConfig = cfg
285
286
return c.controller.UpdateConfig(cfg)
287
}
288
289
// DebugInfo returns debug information for loki.source.podlogs.
290
func (c *Component) DebugInfo() interface{} {
291
var info DebugInfo
292
293
info.DiscoveredPodLogs = c.reconciler.DebugInfo()
294
295
for _, target := range c.tailer.Targets() {
296
var lastError string
297
if err := target.LastError(); err != nil {
298
lastError = err.Error()
299
}
300
301
info.Targets = append(info.Targets, kubernetes.DebugInfoTarget{
302
Labels: target.Labels().Map(),
303
DiscoveryLabels: target.DiscoveryLabels().Map(),
304
LastError: lastError,
305
UpdateTime: target.LastEntry().Local(),
306
})
307
}
308
309
return info
310
}
311
312
// DebugInfo stores debug information for loki.source.podlogs.
313
type DebugInfo struct {
314
DiscoveredPodLogs []DiscoveredPodLogs `river:"pod_logs,block"`
315
Targets []kubernetes.DebugInfoTarget `river:"target,block,optional"`
316
}
317
318