Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/kubernetes/kubetail/tailer.go
4096 views
1
package kubetail
2
3
import (
4
"bufio"
5
"context"
6
"errors"
7
"fmt"
8
"io"
9
"strings"
10
"time"
11
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"github.com/grafana/agent/component/common/loki"
15
"github.com/grafana/agent/pkg/runner"
16
"github.com/grafana/dskit/backoff"
17
"github.com/grafana/loki/pkg/logproto"
18
"github.com/prometheus/common/model"
19
"github.com/prometheus/prometheus/model/labels"
20
corev1 "k8s.io/api/core/v1"
21
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22
kubetypes "k8s.io/apimachinery/pkg/types"
23
)
24
25
// tailerTask is the payload used to create tailers. It implements runner.Task.
type tailerTask struct {
	Options *Options // Shared tailing options (client, positions store, entry handler).
	Target  *Target  // The specific pod container to tail.
}

// Compile-time check that tailerTask satisfies runner.Task.
var _ runner.Task = (*tailerTask)(nil)
32
33
// Hash implements runner.Task by deferring to the target's hash.
func (tt *tailerTask) Hash() uint64 { return tt.Target.Hash() }
34
35
// Equals implements runner.Task. Two tasks are equal when they share the same
// options and describe the same target (matching UID and labels).
func (tt *tailerTask) Equals(other runner.Task) bool {
	otherTask := other.(*tailerTask)

	// Identical pointers trivially describe the same task.
	if tt == otherTask {
		return true
	}

	// Otherwise compare each field that makes up the task, cheapest first.
	if tt.Options != otherTask.Options {
		return false
	}
	if tt.Target.UID() != otherTask.Target.UID() {
		return false
	}
	return labels.Equal(tt.Target.Labels(), otherTask.Target.Labels())
}
48
49
// A tailer tails the logs of a Kubernetes container. It is created by a
// [Manager].
type tailer struct {
	log    log.Logger
	opts   *Options
	target *Target

	// lset is the target's labels converted once to model.LabelSet form; a
	// clone of it is attached to every forwarded log entry.
	lset model.LabelSet
}

// Compile-time check that tailer satisfies runner.Worker.
var _ runner.Worker = (*tailer)(nil)
60
61
// newTailer returns a new Tailer which tails logs from the target specified by
62
// the task.
63
func newTailer(l log.Logger, task *tailerTask) *tailer {
64
return &tailer{
65
log: log.WithPrefix(l, "target", task.Target.String()),
66
opts: task.Options,
67
target: task.Target,
68
69
lset: newLabelSet(task.Target.Labels()),
70
}
71
}
72
73
// newLabelSet converts Prometheus labels into the equivalent model.LabelSet.
func newLabelSet(l labels.Labels) model.LabelSet {
	set := make(model.LabelSet, len(l))
	for i := range l {
		set[model.LabelName(l[i].Name)] = model.LabelValue(l[i].Value)
	}
	return set
}
80
81
// retailBackoff controls how quickly Run reopens a log stream after it
// closes.
var retailBackoff = backoff.Config{
	// Since our tailers have a maximum lifetime and are expected to regularly
	// terminate to refresh their connection to the Kubernetes API, the minimum
	// backoff starts at zero so there's minimum delay between expected
	// terminations.
	MinBackoff: 0,
	MaxBackoff: time.Minute,
}
89
90
// Run implements runner.Worker. It repeatedly tails the target's logs with a
// backoff between attempts, returning only when ctx is canceled or the
// watched container is known to have terminated for good.
func (t *tailer) Run(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	level.Info(t.log).Log("msg", "tailer running")
	defer level.Info(t.log).Log("msg", "tailer exited")

	bo := backoff.New(ctx, retailBackoff)

	// Wrap the handler so that every log line that flows through resets the
	// backoff; only attempts that make no progress increase the wait.
	handler := loki.NewEntryMutatorHandler(t.opts.Handler, func(e loki.Entry) loki.Entry {
		// A log line got read, we can reset the backoff period now.
		bo.Reset()
		return e
	})
	defer handler.Stop()

	for bo.Ongoing() {
		err := t.tail(ctx, handler)
		if err == nil {
			// NOTE(review): := shadows err here, so a containerTerminated
			// error never reaches the Report/Warn branch below; it is only
			// surfaced by the Warn log in this scope. Confirm intentional.
			terminated, err := t.containerTerminated(ctx)
			if terminated {
				// The container shut down and won't come back; we can stop tailing it.
				return
			} else if err != nil {
				level.Warn(t.log).Log("msg", "could not determine if container terminated; will retry tailing", "err", err)
			}
		}

		if err != nil {
			t.target.Report(time.Now().UTC(), err)
			level.Warn(t.log).Log("msg", "tailer stopped; will retry", "err", err)
		}
		bo.Wait()
	}
}
125
126
// tail opens a single log stream against the Kubernetes API and forwards
// entries to handler until the stream closes, ctx is canceled, or the
// one-hour lifetime expires. A nil return means the stream ended without an
// unexpected error; the caller (Run) decides whether to reopen it.
func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
	// Set a maximum lifetime of the tail to ensure that connections are
	// reestablished. This avoids an issue where the Kubernetes API server stops
	// responding with new logs while the connection is kept open.
	ctx, cancel := context.WithTimeout(ctx, 1*time.Hour)
	defer cancel()

	var (
		key           = t.target.NamespacedName()
		containerName = t.target.ContainerName()

		positionsEnt = entryForTarget(t.target)
	)

	// lastReadTime is the timestamp of the newest entry already shipped; it is
	// used both as the stream's resume point and to drop duplicate lines.
	var lastReadTime time.Time

	// The positions file stores the offset as microseconds since the epoch.
	if offset, err := t.opts.Positions.Get(positionsEnt.Path, positionsEnt.Labels); err != nil {
		level.Warn(t.log).Log("msg", "failed to load last read offset", "err", err)
	} else {
		lastReadTime = time.UnixMicro(offset)
	}

	// If the last entry for our target is after the positions cache, use that
	// instead.
	if lastEntry := t.target.LastEntry(); lastEntry.After(lastReadTime) {
		lastReadTime = lastEntry
	}

	// Only set SinceTime when we actually have a resume point; otherwise
	// request the full log.
	var offsetTime *metav1.Time
	if !lastReadTime.IsZero() {
		offsetTime = &metav1.Time{Time: lastReadTime}
	}

	req := t.opts.Client.CoreV1().Pods(key.Namespace).GetLogs(key.Name, &corev1.PodLogOptions{
		Follow:     true,
		Container:  containerName,
		SinceTime:  offsetTime,
		Timestamps: true, // Should be forced to true so we can parse the original timestamp back out.
	})

	stream, err := req.Stream(ctx)
	if err != nil {
		return err
	}
	// Close the stream on cancellation so the blocking read below unblocks;
	// the goroutine exits once ctx is done (ctx always ends via the timeout
	// or cancel above, so this goroutine cannot leak).
	go func() {
		<-ctx.Done()
		_ = stream.Close()
	}()

	level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)

	ch := handler.Chan()
	reader := bufio.NewReader(stream)

	for {
		line, err := reader.ReadString('\n')

		// Try processing the line before handling the error, since data may still
		// be returned alongside an EOF.
		if len(line) != 0 {
			entryTimestamp, entryLine := parseKubernetesLog(line)
			// The server may replay entries at or before our resume point;
			// skip anything not strictly newer than the last shipped entry.
			if !entryTimestamp.After(lastReadTime) {
				continue
			}
			lastReadTime = entryTimestamp

			entry := loki.Entry{
				Labels: t.lset.Clone(),
				Entry: logproto.Entry{
					Timestamp: entryTimestamp,
					Line:      entryLine,
				},
			}

			select {
			case <-ctx.Done():
				return nil
			case ch <- entry:
				// Save position after it's been sent over the channel.
				t.opts.Positions.Put(positionsEnt.Path, positionsEnt.Labels, entryTimestamp.UnixMicro())
				t.target.Report(entryTimestamp, nil)
			}
		}

		// Return an error if our stream closed. The caller will reopen the tailer
		// forever until our tailer is closed.
		//
		// Even if EOF is returned, we still want to allow the tailer to retry
		// until the tailer is shutdown; EOF being returned doesn't necessarily
		// indicate that the logs are done, and could point to a brief network
		// outage.
		if err != nil && (errors.Is(err, io.EOF) || ctx.Err() != nil) {
			return nil
		} else if err != nil {
			return err
		}
	}
}
224
225
// containerTerminated determines whether the container this tailer was
// watching has terminated and won't restart. If containerTerminated returns
// true, it means that no more logs will appear for the watched target.
func (t *tailer) containerTerminated(ctx context.Context) (terminated bool, err error) {
	var (
		key           = t.target.NamespacedName()
		containerName = t.target.ContainerName()
	)

	podInfo, err := t.opts.Client.CoreV1().Pods(key.Namespace).Get(ctx, key.Name, metav1.GetOptions{})
	if err != nil {
		return false, err
	}

	// The pod UID is different than the one we were tailing; our UID has
	// terminated.
	if podInfo.GetUID() != kubetypes.UID(t.target.UID()) {
		return true, nil
	}

	containerInfo, containerType, found := findContainerStatus(podInfo, containerName)
	if !found {
		return false, fmt.Errorf("could not find container %q in pod status", containerName)
	}

	restartPolicy := podInfo.Spec.RestartPolicy

	// Decide per container class, since Kubernetes applies restart semantics
	// differently to app, init, and ephemeral containers.
	switch containerType {
	case containerTypeApp:
		// An app container will only restart if:
		//
		// * It is in a waiting (meaning it's waiting to run) or running state
		//   (meaning it already restarted before we had a chance to check)
		// * It terminated with any exit code and restartPolicy is Always
		// * It terminated with non-zero exit code and restartPolicy is not Never
		//
		// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy
		switch {
		case containerInfo.State.Waiting != nil || containerInfo.State.Running != nil:
			return false, nil // Container will restart
		case containerInfo.State.Terminated != nil && restartPolicy == corev1.RestartPolicyAlways:
			return false, nil // Container will restart
		case containerInfo.State.Terminated != nil && containerInfo.State.Terminated.ExitCode != 0 && restartPolicy != corev1.RestartPolicyNever:
			return false, nil // Container will restart
		default:
			return true, nil // Container will *not* restart
		}

	case containerTypeInit:
		// An init container will only restart if:
		//
		// * It is in a waiting (meaning it's waiting to run) or running state
		//   (meaning it already restarted before we had a chance to check)
		// * It terminated with an exit code of non-zero and restartPolicy is not
		//   Never.
		//
		// https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#understanding-init-containers
		switch {
		case containerInfo.State.Waiting != nil || containerInfo.State.Running != nil:
			return false, nil // Container will restart
		case containerInfo.State.Terminated != nil && containerInfo.State.Terminated.ExitCode != 0 && restartPolicy != corev1.RestartPolicyNever:
			return false, nil // Container will restart
		default:
			return true, nil // Container will *not* restart
		}

	case containerTypeEphemeral:
		// Ephemeral containers never restart.
		//
		// https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/
		switch {
		case containerInfo.State.Waiting != nil || containerInfo.State.Running != nil:
			return false, nil // Container is running or is about to run
		default:
			return true, nil // Container will *not* restart
		}
	}

	// Unknown container type: conservatively report not terminated so the
	// caller keeps trying to tail it.
	return false, nil
}
305
306
// parseKubernetesLog parses a log line returned from the Kubernetes API,
307
// splitting out the timestamp and the log line. If the timestamp cannot be
308
// parsed, time.Now() is returned with the original log line intact.
309
//
310
// If the timestamp was parsed, it is stripped out of the resulting line of
311
// text.
312
func parseKubernetesLog(input string) (timestamp time.Time, line string) {
313
timestampOffset := strings.IndexByte(input, ' ')
314
if timestampOffset == -1 {
315
return time.Now().UTC(), input
316
}
317
318
var remain string
319
if timestampOffset < len(input) {
320
remain = input[timestampOffset+1:]
321
}
322
323
// Kubernetes can return timestamps in either RFC3339Nano or RFC3339, so we
324
// try both.
325
timestampString := input[:timestampOffset]
326
327
if timestamp, err := time.Parse(time.RFC3339Nano, timestampString); err == nil {
328
return timestamp.UTC(), remain
329
}
330
331
if timestamp, err := time.Parse(time.RFC3339, timestampString); err == nil {
332
return timestamp.UTC(), remain
333
}
334
335
return time.Now().UTC(), input
336
}
337
338