GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/kubernetes/kubetail/kubetail.go
// Package kubetail implements a log file tailer using the Kubernetes API.
package kubetail

import (
	"context"
	"sync"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/agent/pkg/runner"
	"k8s.io/client-go/kubernetes"
)

// Options passed to all tailers.
type Options struct {
	// Client to use to request logs from Kubernetes.
	Client *kubernetes.Clientset

	// Handler to send discovered logs to.
	Handler loki.EntryHandler

	// Positions interface so tailers can save/restore offsets in log files.
	Positions positions.Positions
}

// A Manager manages a set of running Tailers.
type Manager struct {
	log log.Logger

	mut   sync.Mutex
	opts  *Options
	tasks []*tailerTask

	runner *runner.Runner[*tailerTask]
}

// NewManager returns a new Manager which manages a set of running tailers.
// Options must not be modified after passing it to a Manager.
//
// If NewManager is called with a nil set of options, no targets will be
// scheduled for running until UpdateOptions is called.
func NewManager(l log.Logger, opts *Options) *Manager {
	return &Manager{
		log:  l,
		opts: opts,
		runner: runner.New(func(t *tailerTask) runner.Worker {
			return newTailer(l, t)
		}),
	}
}
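
// Example (illustrative sketch, not part of the original file): how a caller
// might wire a Manager together and start tailing. The Kubernetes client,
// entry handler, positions store, and discovered targets are assumed to be
// provided by the surrounding component; runExampleManager is a hypothetical
// helper, not an upstream API.
func runExampleManager(ctx context.Context, l log.Logger, client *kubernetes.Clientset, handler loki.EntryHandler, pos positions.Positions, targets []*Target) error {
	// Share one Options value across every tailer the Manager schedules.
	mgr := NewManager(l, &Options{
		Client:    client,
		Handler:   handler,
		Positions: pos,
	})
	// Stop blocks until every tailer has exited.
	defer mgr.Stop()

	// Schedule one tailer per discovered target; tailers for targets missing
	// from the slice are shut down.
	return mgr.SyncTargets(ctx, targets)
}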

// SyncTargets synchronizes the set of running tailers to the set specified by
// targets.
func (m *Manager) SyncTargets(ctx context.Context, targets []*Target) error {
	m.mut.Lock()
	defer m.mut.Unlock()

	// Convert targets into tasks to give to the runner.
	tasks := make([]*tailerTask, 0, len(targets))
	for _, target := range targets {
		tasks = append(tasks, &tailerTask{
			Options: m.opts,
			Target:  target,
		})
	}

	// Sync our tasks to the runner. If the Manager doesn't have any options, the
	// runner will be cleared of tasks until UpdateOptions is called with a
	// non-nil set of options.
	switch m.opts {
	default:
		if err := m.runner.ApplyTasks(ctx, tasks); err != nil {
			return err
		}
	case nil:
		if err := m.runner.ApplyTasks(ctx, nil); err != nil {
			return err
		}
	}

	// Delete positions for targets which have gone away.
	newEntries := make(map[positions.Entry]struct{}, len(targets))
	for _, target := range targets {
		newEntries[entryForTarget(target)] = struct{}{}
	}

	for _, task := range m.tasks {
		ent := entryForTarget(task.Target)

		// The task from the last call to SyncTargets is no longer in newEntries;
		// remove it from the positions file. We do this _after_ calling ApplyTasks
		// to ensure that the old tailers have shut down, otherwise the tailer
		// might write its position again during shutdown after we removed it.
		if _, found := newEntries[ent]; !found {
			level.Info(m.log).Log("msg", "removing entry from positions file", "path", ent.Path, "labels", ent.Labels)
			m.opts.Positions.Remove(ent.Path, ent.Labels)
		}
	}

	m.tasks = tasks
	return nil
}

func entryForTarget(t *Target) positions.Entry {
	// The positions entry is keyed by UID to ensure that positions from
	// completely distinct "namespace/name:container" instances don't interfere
	// with each other.
	//
	// While it's still technically possible for two containers to have the same
	// "namespace/name:container" string and UID, it's so wildly unlikely that
	// it's probably not worth handling.
	//
	// The path is fed into positions.CursorKey to treat it as a "cursor";
	// otherwise positions.Positions will try to read the path as a file and
	// delete the entry when it can't find it.
	return positions.Entry{
		Path:   positions.CursorKey(t.String() + ":" + t.UID()),
		Labels: t.Labels().String(),
	}
}

// UpdateOptions updates the Options shared with all Tailers. All Tailers will
// be updated with the new set of Options. Options should not be modified after
// passing to UpdateOptions.
//
// If newOptions is nil, all tasks will be cleared until UpdateOptions is
// called again with a non-nil set of options.
func (m *Manager) UpdateOptions(ctx context.Context, newOptions *Options) error {
	m.mut.Lock()
	defer m.mut.Unlock()

	// Iterate through the previous set of tasks and create a new task with the
	// new set of options.
	tasks := make([]*tailerTask, 0, len(m.tasks))
	for _, oldTask := range m.tasks {
		tasks = append(tasks, &tailerTask{
			Options: newOptions,
			Target:  oldTask.Target,
		})
	}

	switch newOptions {
	case nil:
		if err := m.runner.ApplyTasks(ctx, nil); err != nil {
			return err
		}
	default:
		if err := m.runner.ApplyTasks(ctx, tasks); err != nil {
			return err
		}
	}

	m.opts = newOptions
	m.tasks = tasks
	return nil
}
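
// Example (illustrative sketch, not part of the original file): pushing a
// configuration reload to all running tailers. The rebuilt Kubernetes client
// is assumed to come from the caller; reloadExampleOptions is a hypothetical
// helper, not an upstream API.
func reloadExampleOptions(ctx context.Context, mgr *Manager, old *Options, newClient *kubernetes.Clientset) error {
	// Reuse the unchanged handler and positions store, swapping only the
	// client. A fresh Options value is built because Options must not be
	// modified after being passed to the Manager.
	return mgr.UpdateOptions(ctx, &Options{
		Client:    newClient,
		Handler:   old.Handler,
		Positions: old.Positions,
	})
}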

// Targets returns the set of targets which are actively being tailed. Targets
// for tailers which have terminated are not included. The returned set of
// targets is deduplicated.
func (m *Manager) Targets() []*Target {
	tasks := m.runner.Tasks()

	targets := make([]*Target, 0, len(tasks))
	for _, task := range tasks {
		targets = append(targets, task.Target)
	}
	return targets
}

// Stop stops the manager and all running Tailers. It blocks until all Tailers
// have exited.
func (m *Manager) Stop() {
	m.runner.Stop()
}