Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/kubernetes_events/kubernetes_events.go
4096 views
1
// Package kubernetes_events implements the loki.source.kubernetes_events
2
// component.
3
package kubernetes_events //nolint:golint
4
5
import (
6
"context"
7
"fmt"
8
"os"
9
"path/filepath"
10
"reflect"
11
"sync"
12
"time"
13
14
"github.com/go-kit/log"
15
"github.com/go-kit/log/level"
16
"github.com/grafana/agent/component"
17
"github.com/grafana/agent/component/common/config"
18
"github.com/grafana/agent/component/common/kubernetes"
19
"github.com/grafana/agent/component/common/loki"
20
"github.com/grafana/agent/component/common/loki/positions"
21
"github.com/grafana/agent/pkg/river"
22
"github.com/grafana/agent/pkg/runner"
23
"github.com/oklog/run"
24
"k8s.io/client-go/rest"
25
)
26
27
// informerSyncTimeout is a deliberately generous upper bound on how long
// configuring informers may take.
const informerSyncTimeout = 10 * time.Second
29
30
func init() {
31
component.Register(component.Registration{
32
Name: "loki.source.kubernetes_events",
33
Args: Arguments{},
34
35
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
36
return New(opts, args.(Arguments))
37
},
38
})
39
}
40
41
// Arguments holds values which are used to configure the
42
// loki.source.kubernetes_events component.
43
type Arguments struct {
44
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
45
46
JobName string `river:"job_name,attr,optional"`
47
Namespaces []string `river:"namespaces,attr,optional"`
48
49
// Client settings to connect to Kubernetes.
50
Client kubernetes.ClientArguments `river:"client,block,optional"`
51
}
52
53
var _ river.Unmarshaler = (*Arguments)(nil)
54
55
// DefaultArguments holds default settings for loki.source.kubernetes_events.
56
var DefaultArguments = Arguments{
57
JobName: "loki.source.kubernetes_events",
58
59
Client: kubernetes.ClientArguments{
60
HTTPClientConfig: config.DefaultHTTPClientConfig,
61
},
62
}
63
64
// UnmarshalRiver implements river.Unmarshaler and applies defaults.
65
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
66
*args = DefaultArguments
67
68
type arguments Arguments
69
if err := f((*arguments)(args)); err != nil {
70
return err
71
}
72
73
if args.JobName == "" {
74
return fmt.Errorf("job_name must not be an empty string")
75
}
76
return nil
77
}
78
79
// Component implements the loki.source.kubernetes_events component, which
80
// watches events from Kubernetes and forwards received events to other Loki
81
// components.
82
type Component struct {
83
log log.Logger
84
opts component.Options
85
positions positions.Positions
86
handler loki.LogsReceiver
87
runner *runner.Runner[eventControllerTask]
88
newTasksCh chan struct{}
89
90
mut sync.Mutex
91
args Arguments
92
restConfig *rest.Config
93
94
tasksMut sync.RWMutex
95
tasks []eventControllerTask
96
97
receiversMut sync.RWMutex
98
receivers []loki.LogsReceiver
99
}
100
101
var (
102
_ component.Component = (*Component)(nil)
103
_ component.DebugComponent = (*Component)(nil)
104
)
105
106
// New creates a new loki.source.kubernetes_events component.
107
func New(o component.Options, args Arguments) (*Component, error) {
108
err := os.MkdirAll(o.DataPath, 0750)
109
if err != nil && !os.IsExist(err) {
110
return nil, err
111
}
112
positionsFile, err := positions.New(o.Logger, positions.Config{
113
SyncPeriod: 10 * time.Second,
114
PositionsFile: filepath.Join(o.DataPath, "positions.yml"),
115
})
116
if err != nil {
117
return nil, err
118
}
119
120
c := &Component{
121
log: o.Logger,
122
opts: o,
123
positions: positionsFile,
124
handler: make(loki.LogsReceiver),
125
runner: runner.New(func(t eventControllerTask) runner.Worker {
126
return newEventController(t)
127
}),
128
newTasksCh: make(chan struct{}, 1),
129
}
130
if err := c.Update(args); err != nil {
131
return nil, err
132
}
133
return c, nil
134
}
135
136
// Run implements component.Component.
137
func (c *Component) Run(ctx context.Context) error {
138
ctx, cancel := context.WithCancel(ctx)
139
defer cancel()
140
141
defer c.positions.Stop()
142
defer c.runner.Stop()
143
144
var rg run.Group
145
146
// Runner to apply tasks.
147
rg.Add(func() error {
148
for {
149
select {
150
case <-ctx.Done():
151
return nil
152
case <-c.newTasksCh:
153
c.tasksMut.RLock()
154
tasks := c.tasks
155
c.tasksMut.RUnlock()
156
157
if err := c.runner.ApplyTasks(ctx, tasks); err != nil {
158
level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err)
159
}
160
}
161
}
162
}, func(_ error) {
163
cancel()
164
})
165
166
// Runner to forward received logs.
167
rg.Add(func() error {
168
for {
169
select {
170
case <-ctx.Done():
171
return nil
172
case entry := <-c.handler:
173
c.receiversMut.RLock()
174
receivers := c.receivers
175
c.receiversMut.RUnlock()
176
177
for _, receiver := range receivers {
178
receiver <- entry
179
}
180
}
181
}
182
}, func(_ error) {
183
cancel()
184
})
185
186
return rg.Run()
187
}
188
189
// Update implements component.Component.
190
func (c *Component) Update(args component.Arguments) error {
191
c.mut.Lock()
192
defer c.mut.Unlock()
193
194
newArgs := args.(Arguments)
195
196
c.receiversMut.Lock()
197
c.receivers = newArgs.ForwardTo
198
c.receiversMut.Unlock()
199
200
restConfig := c.restConfig
201
202
// Create a new restConfig if we don't have one or if our arguments changed.
203
if restConfig == nil || !reflect.DeepEqual(c.args.Client, newArgs.Client) {
204
var err error
205
restConfig, err = newArgs.Client.BuildRESTConfig(c.log)
206
if err != nil {
207
return fmt.Errorf("building Kubernetes client config: %w", err)
208
}
209
}
210
211
// Create a task for each defined namespace.
212
var newTasks []eventControllerTask
213
for _, namespace := range getNamespaces(newArgs) {
214
newTasks = append(newTasks, eventControllerTask{
215
Log: c.log,
216
Config: restConfig,
217
JobName: newArgs.JobName,
218
InstanceName: c.opts.ID,
219
Namespace: namespace,
220
Receiver: c.handler,
221
Positions: c.positions,
222
})
223
}
224
225
c.tasksMut.Lock()
226
c.tasks = newTasks
227
c.tasksMut.Unlock()
228
229
select {
230
case c.newTasksCh <- struct{}{}:
231
default:
232
// no-op: task reload already queued.
233
}
234
235
c.args = newArgs
236
return nil
237
}
238
239
// getNamespaces gets a list of namespaces to watch from the arguments. If the
240
// list of namespaces is empty, returns a slice to watch all namespaces.
241
func getNamespaces(args Arguments) []string {
242
if len(args.Namespaces) == 0 {
243
return []string{""} // Empty string means to watch all namespaces
244
}
245
return args.Namespaces
246
}
247
248
// DebugInfo implements [component.DebugComponent].
249
func (c *Component) DebugInfo() interface{} {
250
type Info struct {
251
Controllers []controllerInfo `river:"event_controller,block,optional"`
252
}
253
254
var info Info
255
for _, worker := range c.runner.Workers() {
256
info.Controllers = append(info.Controllers, worker.(*eventController).DebugInfo())
257
}
258
return info
259
}
260
261