Path: blob/main/component/loki/source/kubernetes/kubetail/tailer.go
4096 views
package kubetail12import (3"bufio"4"context"5"errors"6"fmt"7"io"8"strings"9"time"1011"github.com/go-kit/log"12"github.com/go-kit/log/level"13"github.com/grafana/agent/component/common/loki"14"github.com/grafana/agent/pkg/runner"15"github.com/grafana/dskit/backoff"16"github.com/grafana/loki/pkg/logproto"17"github.com/prometheus/common/model"18"github.com/prometheus/prometheus/model/labels"19corev1 "k8s.io/api/core/v1"20metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"21kubetypes "k8s.io/apimachinery/pkg/types"22)2324// tailerTask is the payload used to create tailers. It implements runner.Task.25type tailerTask struct {26Options *Options27Target *Target28}2930var _ runner.Task = (*tailerTask)(nil)3132func (tt *tailerTask) Hash() uint64 { return tt.Target.Hash() }3334func (tt *tailerTask) Equals(other runner.Task) bool {35otherTask := other.(*tailerTask)3637// Quick path: pointers are exactly the same.38if tt == otherTask {39return true40}4142// Slow path: check individual fields which are part of the task.43return tt.Options == otherTask.Options &&44tt.Target.UID() == otherTask.Target.UID() &&45labels.Equal(tt.Target.Labels(), otherTask.Target.Labels())46}4748// A tailer tails the logs of a Kubernetes container. It is created by a49// [Manager].50type tailer struct {51log log.Logger52opts *Options53target *Target5455lset model.LabelSet56}5758var _ runner.Worker = (*tailer)(nil)5960// newTailer returns a new Tailer which tails logs from the target specified by61// the task.62func newTailer(l log.Logger, task *tailerTask) *tailer {63return &tailer{64log: log.WithPrefix(l, "target", task.Target.String()),65opts: task.Options,66target: task.Target,6768lset: newLabelSet(task.Target.Labels()),69}70}7172func newLabelSet(l labels.Labels) model.LabelSet {73res := make(model.LabelSet, len(l))74for _, pair := range l {75res[model.LabelName(pair.Name)] = model.LabelValue(pair.Value)76}77return res78}7980var retailBackoff = backoff.Config{81// Since our tailers have a maximum lifetime and are expected to regularly82// terminate to refresh their connection to the Kubernetes API, the minimum83// backoff starts at zero so there's minimum delay between expected84// terminations.85MinBackoff: 0,86MaxBackoff: time.Minute,87}8889func (t *tailer) Run(ctx context.Context) {90ctx, cancel := context.WithCancel(ctx)91defer cancel()9293level.Info(t.log).Log("msg", "tailer running")94defer level.Info(t.log).Log("msg", "tailer exited")9596bo := backoff.New(ctx, retailBackoff)9798handler := loki.NewEntryMutatorHandler(t.opts.Handler, func(e loki.Entry) loki.Entry {99// A log line got read, we can reset the backoff period now.100bo.Reset()101return e102})103defer handler.Stop()104105for bo.Ongoing() {106err := t.tail(ctx, handler)107if err == nil {108terminated, err := t.containerTerminated(ctx)109if terminated {110// The container shut down and won't come back; we can stop tailing it.111return112} else if err != nil {113level.Warn(t.log).Log("msg", "could not determine if container terminated; will retry tailing", "err", err)114}115}116117if err != nil {118t.target.Report(time.Now().UTC(), err)119level.Warn(t.log).Log("msg", "tailer stopped; will retry", "err", err)120}121bo.Wait()122}123}124125func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {126// Set a maximum lifetime of the tail to ensure that connections are127// reestablished. This avoids an issue where the Kubernetes API server stops128// responding with new logs while the connection is kept open.129ctx, cancel := context.WithTimeout(ctx, 1*time.Hour)130defer cancel()131132var (133key = t.target.NamespacedName()134containerName = t.target.ContainerName()135136positionsEnt = entryForTarget(t.target)137)138139var lastReadTime time.Time140141if offset, err := t.opts.Positions.Get(positionsEnt.Path, positionsEnt.Labels); err != nil {142level.Warn(t.log).Log("msg", "failed to load last read offset", "err", err)143} else {144lastReadTime = time.UnixMicro(offset)145}146147// If the last entry for our target is after the positions cache, use that148// instead.149if lastEntry := t.target.LastEntry(); lastEntry.After(lastReadTime) {150lastReadTime = lastEntry151}152153var offsetTime *metav1.Time154if !lastReadTime.IsZero() {155offsetTime = &metav1.Time{Time: lastReadTime}156}157158req := t.opts.Client.CoreV1().Pods(key.Namespace).GetLogs(key.Name, &corev1.PodLogOptions{159Follow: true,160Container: containerName,161SinceTime: offsetTime,162Timestamps: true, // Should be forced to true so we can parse the original timestamp back out.163})164165stream, err := req.Stream(ctx)166if err != nil {167return err168}169go func() {170<-ctx.Done()171_ = stream.Close()172}()173174level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)175176ch := handler.Chan()177reader := bufio.NewReader(stream)178179for {180line, err := reader.ReadString('\n')181182// Try processing the line before handling the error, since data may still183// be returned alongside an EOF.184if len(line) != 0 {185entryTimestamp, entryLine := parseKubernetesLog(line)186if !entryTimestamp.After(lastReadTime) {187continue188}189lastReadTime = entryTimestamp190191entry := loki.Entry{192Labels: t.lset.Clone(),193Entry: logproto.Entry{194Timestamp: entryTimestamp,195Line: entryLine,196},197}198199select {200case <-ctx.Done():201return nil202case ch <- entry:203// Save position after it's been sent over the channel.204t.opts.Positions.Put(positionsEnt.Path, positionsEnt.Labels, entryTimestamp.UnixMicro())205t.target.Report(entryTimestamp, nil)206}207}208209// Return an error if our stream closed. The caller will reopen the tailer210// forever until our tailer is closed.211//212// Even if EOF is returned, we still want to allow the tailer to retry213// until the tailer is shutdown; EOF being returned doesn't necessarily214// indicate that the logs are done, and could point to a brief network215// outage.216if err != nil && (errors.Is(err, io.EOF) || ctx.Err() != nil) {217return nil218} else if err != nil {219return err220}221}222}223224// containerTerminated determines whether the container this tailer was225// watching has terminated and won't restart. If containerTerminated returns226// true, it means that no more logs will appear for the watched target.227func (t *tailer) containerTerminated(ctx context.Context) (terminated bool, err error) {228var (229key = t.target.NamespacedName()230containerName = t.target.ContainerName()231)232233podInfo, err := t.opts.Client.CoreV1().Pods(key.Namespace).Get(ctx, key.Name, metav1.GetOptions{})234if err != nil {235return false, err236}237238// The pod UID is different than the one we were tailing; our UID has239// terminated.240if podInfo.GetUID() != kubetypes.UID(t.target.UID()) {241return true, nil242}243244containerInfo, containerType, found := findContainerStatus(podInfo, containerName)245if !found {246return false, fmt.Errorf("could not find container %q in pod status", containerName)247}248249restartPolicy := podInfo.Spec.RestartPolicy250251switch containerType {252case containerTypeApp:253// An app container will only restart if:254//255// * It is in a waiting (meaning it's waiting to run) or running state256// (meaning it already restarted before we had a chance to check)257// * It terminated with any exit code and restartPolicy is Always258// * It terminated with non-zero exit code and restartPolicy is not Never259//260// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy261switch {262case containerInfo.State.Waiting != nil || containerInfo.State.Running != nil:263return false, nil // Container will restart264case containerInfo.State.Terminated != nil && restartPolicy == corev1.RestartPolicyAlways:265return false, nil // Container will restart266case containerInfo.State.Terminated != nil && containerInfo.State.Terminated.ExitCode != 0 && restartPolicy != corev1.RestartPolicyNever:267return false, nil // Container will restart268default:269return true, nil // Container will *not* restart270}271272case containerTypeInit:273// An init container will only restart if:274//275// * It is in a waiting (meaning it's waiting to run) or running state276// (meaning it already restarted before we had a chance to check)277// * It terminated with an exit code of non-zero and restartPolicy is not278// Never.279//280// https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#understanding-init-containers281switch {282case containerInfo.State.Waiting != nil || containerInfo.State.Running != nil:283return false, nil // Container will restart284case containerInfo.State.Terminated != nil && containerInfo.State.Terminated.ExitCode != 0 && restartPolicy != corev1.RestartPolicyNever:285return false, nil // Container will restart286default:287return true, nil // Container will *not* restart288}289290case containerTypeEphemeral:291// Ephemeral containers never restart.292//293// https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/294switch {295case containerInfo.State.Waiting != nil || containerInfo.State.Running != nil:296return false, nil // Container is running or is about to run297default:298return true, nil // Container will *not* restart299}300}301302return false, nil303}304305// parseKubernetesLog parses a log line returned from the Kubernetes API,306// splitting out the timestamp and the log line. If the timestamp cannot be307// parsed, time.Now() is returned with the original log line intact.308//309// If the timestamp was parsed, it is stripped out of the resulting line of310// text.311func parseKubernetesLog(input string) (timestamp time.Time, line string) {312timestampOffset := strings.IndexByte(input, ' ')313if timestampOffset == -1 {314return time.Now().UTC(), input315}316317var remain string318if timestampOffset < len(input) {319remain = input[timestampOffset+1:]320}321322// Kubernetes can return timestamps in either RFC3339Nano or RFC3339, so we323// try both.324timestampString := input[:timestampOffset]325326if timestamp, err := time.Parse(time.RFC3339Nano, timestampString); err == nil {327return timestamp.UTC(), remain328}329330if timestamp, err := time.Parse(time.RFC3339, timestampString); err == nil {331return timestamp.UTC(), remain332}333334return time.Now().UTC(), input335}336337338