GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/docker/runner.go
package docker

import (
	"context"
	"sync"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	dt "github.com/grafana/agent/component/loki/source/docker/internal/dockertarget"
	"github.com/grafana/agent/pkg/runner"
	"github.com/prometheus/common/model"
)

// A manager manages a set of running tailers.
type manager struct {
	log log.Logger

	mut   sync.Mutex
	opts  *options
	tasks []*tailerTask

	runner *runner.Runner[*tailerTask]
}

// newManager returns a new manager which manages a set of running tailers.
// Options must not be modified after passing them to a manager.
//
// If newManager is called with a nil set of options, no targets will be
// scheduled for running until updateOptions is called.
func newManager(l log.Logger, opts *options) *manager {
	return &manager{
		log:  l,
		opts: opts,
		runner: runner.New(func(t *tailerTask) runner.Worker {
			return newTailer(l, t)
		}),
	}
}
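
// Illustrative usage sketch: a caller could construct a manager roughly as
// follows, assuming a Docker API client, a loki.EntryHandler, and a
// positions.Positions store have already been built; logger, dockerClient,
// entryHandler, and posStore are hypothetical placeholders:
//
//	mgr := newManager(logger, &options{
//		client:    dockerClient,
//		handler:   entryHandler,
//		positions: posStore,
//	})
//
// Passing a nil *options instead leaves the manager idle until updateOptions
// is called with a non-nil set of options.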

// options passed to all tailers.
type options struct {
	// client to use to request logs from Docker.
	client client.APIClient

	// handler to send discovered logs to.
	handler loki.EntryHandler

	// positions interface so tailers can save/restore offsets in log files.
	positions positions.Positions
}

// tailerTask is the payload used to create tailers. It implements runner.Task.
type tailerTask struct {
	options *options
	target  *dt.Target
}

var _ runner.Task = (*tailerTask)(nil)

func (tt *tailerTask) Hash() uint64 { return tt.target.Hash() }

func (tt *tailerTask) Equals(other runner.Task) bool {
	otherTask := other.(*tailerTask)

	// Quick path: pointers are exactly the same.
	if tt == otherTask {
		return true
	}

	// Slow path: check individual fields which are part of the task.
	return tt.options == otherTask.options &&
		tt.target.Labels().String() == otherTask.target.Labels().String()
}

// A tailer tails the logs of a docker container. It is created by a [manager].
type tailer struct {
	log    log.Logger
	opts   *options
	target *dt.Target

	lset model.LabelSet
}

// newTailer returns a new tailer which tails logs from the target specified by
// the task.
func newTailer(l log.Logger, task *tailerTask) *tailer {
	return &tailer{
		log:    log.WithPrefix(l, "target", task.target.Name()),
		opts:   task.options,
		target: task.target,

		lset: task.target.Labels(),
	}
}

func (t *tailer) Run(ctx context.Context) {
	ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit)

	t.target.StartIfNotRunning()

	select {
	case err := <-chErr:
		// Error setting up the Wait request from the client; either failed to
		// read from /containers/{containerID}/wait, or couldn't parse the
		// response. Stop the target and exit the task after logging; if it was
		// a transient error, the target will be retried on the next discovery
		// refresh.
		level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err)
		t.target.Stop()
		return
	case <-ch:
		t.target.Stop()
		return
	}
}
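
// Lifecycle sketch, assuming the runner package invokes each Worker's Run in
// its own goroutine: the log streaming itself is expected to be driven by the
// dockertarget via StartIfNotRunning, while the ContainerWait call above only
// waits for the container's next exit (or for the wait request to fail). In
// either case the target is stopped and Run returns, so there is one blocking
// wait per tracked container while it runs.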

// syncTargets synchronizes the set of running tailers to the set specified by
// targets.
func (m *manager) syncTargets(ctx context.Context, targets []*dt.Target) error {
	m.mut.Lock()
	defer m.mut.Unlock()

	// Convert targets into tasks to give to the runner.
	tasks := make([]*tailerTask, 0, len(targets))
	for _, target := range targets {
		tasks = append(tasks, &tailerTask{
			options: m.opts,
			target:  target,
		})
	}

	// Sync our tasks to the runner. If the manager doesn't have any options,
	// the runner will be cleared of tasks until updateOptions is called with a
	// non-nil set of options.
	switch m.opts {
	default:
		if err := m.runner.ApplyTasks(ctx, tasks); err != nil {
			return err
		}
	case nil:
		if err := m.runner.ApplyTasks(ctx, nil); err != nil {
			return err
		}
	}

	// Delete positions for targets which have gone away.
	newEntries := make(map[positions.Entry]struct{}, len(targets))
	for _, target := range targets {
		newEntries[entryForTarget(target)] = struct{}{}
	}

	for _, task := range m.tasks {
		ent := entryForTarget(task.target)

		// If a task from the last call to syncTargets is no longer in
		// newEntries, remove its entry from the positions file. We do this
		// _after_ calling ApplyTasks to ensure that the old tailers have shut
		// down; otherwise a tailer might write its position again during
		// shutdown, after we removed it.
		if _, found := newEntries[ent]; !found {
			level.Info(m.log).Log("msg", "removing entry from positions file", "path", ent.Path, "labels", ent.Labels)
			m.opts.positions.Remove(ent.Path, ent.Labels)
		}
	}

	m.tasks = tasks
	return nil
}
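
// Illustrative usage sketch: discovery code could call syncTargets on every
// refresh with the full set of currently discovered containers; the runner is
// expected to diff the resulting tasks (via Hash/Equals), and positions
// entries for containers that disappeared are pruned. mgr, logger, and
// refreshTargets are hypothetical placeholders:
//
//	for {
//		targets := refreshTargets() // hypothetical discovery step
//		if err := mgr.syncTargets(ctx, targets); err != nil {
//			level.Error(logger).Log("msg", "failed to sync tailers", "err", err)
//		}
//		// ... wait for the next discovery refresh ...
//	}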

func entryForTarget(t *dt.Target) positions.Entry {
	// The positions entry is keyed by container_id; the path is fed into
	// positions.CursorKey to treat it as a "cursor"; otherwise
	// positions.Positions will try to read the path as a file and delete the
	// entry when it can't find it.
	return positions.Entry{
		Path:   positions.CursorKey(t.Name()),
		Labels: t.Labels().String(),
	}
}

// updateOptions updates the options shared with all tailers. All tailers will
// be updated with the new set of options. Options should not be modified after
// passing them to updateOptions.
//
// If newOptions is nil, all tasks will be cleared until updateOptions is
// called again with a non-nil set of options.
func (m *manager) updateOptions(ctx context.Context, newOptions *options) error {
	m.mut.Lock()
	defer m.mut.Unlock()

	// Iterate through the previous set of tasks and create a new task with the
	// new set of options.
	tasks := make([]*tailerTask, 0, len(m.tasks))
	for _, oldTask := range m.tasks {
		tasks = append(tasks, &tailerTask{
			options: newOptions,
			target:  oldTask.target,
		})
	}

	switch newOptions {
	case nil:
		if err := m.runner.ApplyTasks(ctx, nil); err != nil {
			return err
		}
	default:
		if err := m.runner.ApplyTasks(ctx, tasks); err != nil {
			return err
		}
	}

	m.opts = newOptions
	m.tasks = tasks
	return nil
}
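
// Illustrative usage sketch: when the component's configuration changes (for
// example, a new Docker client must be used), the existing targets can be kept
// while the shared options are swapped out. mgr, newClient, entryHandler, and
// posStore are hypothetical placeholders:
//
//	err := mgr.updateOptions(ctx, &options{
//		client:    newClient,
//		handler:   entryHandler,
//		positions: posStore,
//	})
//
// Calling updateOptions with nil options instead clears all tasks until a
// non-nil set of options is provided again.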

// targets returns the set of targets which are actively being tailed. Targets
// for tailers which have terminated are not included. The returned set of
// targets is deduplicated.
func (m *manager) targets() []*dt.Target {
	tasks := m.runner.Tasks()

	targets := make([]*dt.Target, 0, len(tasks))
	for _, task := range tasks {
		targets = append(targets, task.target)
	}
	return targets
}

// stop stops the manager and all running tailers. It blocks until all tailers
// have exited.
func (m *manager) stop() {
	m.runner.Stop()
}