Path: blob/main/component/loki/source/docker/internal/dockertarget/target.go
4096 views
// Package dockertarget is used to configure and run the targets that can read
// logs from Docker containers and forward them to other loki components.
//
// This code is copied from Promtail.
package dockertarget

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"time"

	docker_types "github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
	"go.uber.org/atomic"
)

const (
	// See github.com/prometheus/prometheus/discovery/moby
	dockerLabel                = model.MetaLabelPrefix + "docker_"
	dockerLabelContainerPrefix = dockerLabel + "container_"
	dockerLabelLogStream       = dockerLabelContainerPrefix + "log_stream"
)

// Target enables reading Docker container logs.
type Target struct {
	logger        log.Logger
	handler       loki.EntryHandler
	since         int64 // position passed as the "Since" option when requesting logs
	positions     positions.Positions
	containerName string
	labels        model.LabelSet
	relabelConfig []*relabel.Config
	metrics       *Metrics

	cancel  context.CancelFunc // set by StartIfNotRunning; nil until then
	client  client.APIClient
	wg      sync.WaitGroup
	running *atomic.Bool
	err     error // last error recorded by processLoop, surfaced by Details
}

// NewTarget creates a new target to read logs from a given container ID.
//
// The read offset is restored from the positions store, keyed by the container
// ID and the string form of labels; an error from that lookup is returned
// as-is. The returned target is not started: call StartIfNotRunning to begin
// tailing.
func NewTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, position positions.Positions, containerID string, labels model.LabelSet, relabelConfig []*relabel.Config, client client.APIClient) (*Target, error) {
	pos, err := position.Get(positions.CursorKey(containerID), labels.String())
	if err != nil {
		return nil, err
	}

	t := &Target{
		logger:        logger,
		handler:       handler,
		since:         pos, // zero when no position was stored
		positions:     position,
		containerName: containerID,
		labels:        labels,
		relabelConfig: relabelConfig,
		metrics:       metrics,

		client:  client,
		running: atomic.NewBool(false),
	}

	// NOTE (@tpaschalis) The original Promtail implementation would call
	// t.StartIfNotRunning() right here to start tailing.
	// We manage targets from a task's Run method.
	return t, nil
}

// processLoop inspects the container, opens its log stream and fans the
// stream out to one process goroutine per output stream (stdout and stderr).
// It blocks until ctx is cancelled and then closes the log stream. If the
// container cannot be inspected or its logs cannot be fetched, the error is
// logged, stored in t.err and the loop returns early.
func (t *Target) processLoop(ctx context.Context) {
	t.running.Store(true)
	defer t.running.Store(false)

	// NOTE(review): this Add runs inside the goroutine started by
	// StartIfNotRunning, so a concurrent Stop could reach wg.Wait before it.
	// Left unchanged here; confirm against callers.
	t.wg.Add(1)
	defer t.wg.Done()

	opts := docker_types.ContainerLogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Follow:     true,
		Timestamps: true,
		Since:      strconv.FormatInt(t.since, 10),
	}
	inspectInfo, err := t.client.ContainerInspect(ctx, t.containerName)
	if err != nil {
		level.Error(t.logger).Log("msg", "could not inspect container info", "container", t.containerName, "err", err)
		t.err = err
		return
	}
	logs, err := t.client.ContainerLogs(ctx, t.containerName, opts)
	if err != nil {
		level.Error(t.logger).Log("msg", "could not fetch logs for container", "container", t.containerName, "err", err)
		t.err = err
		return
	}

	// Start transferring
	rstdout, wstdout := io.Pipe()
	rstderr, wstderr := io.Pipe()
	t.wg.Add(1)
	go func() {
		defer func() {
			// Done must come before Stop, because Stop waits on the same
			// WaitGroup. Closing the pipe writers lets the process
			// goroutines observe io.EOF and exit.
			t.wg.Done()
			wstdout.Close()
			wstderr.Close()
			t.Stop()
		}()
		var written int64
		var err error
		if inspectInfo.Config.Tty {
			// With a TTY the stream is copied verbatim to the stdout pipe.
			written, err = io.Copy(wstdout, logs)
		} else {
			// Without a TTY, StdCopy splits the stream into stdout and stderr.
			written, err = stdcopy.StdCopy(wstdout, wstderr, logs)
		}
		if err != nil {
			level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err)
		} else {
			level.Info(t.logger).Log("msg", "finished transferring logs", "written", written, "container", t.containerName)
		}
	}()

	// Start processing
	t.wg.Add(2)
	go t.process(rstdout, "stdout")
	go t.process(rstderr, "stderr")

	// Wait until done
	<-ctx.Done()
	logs.Close()
	level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerName)
}

// extractTs tries to read the timestamp from the beginning of the log line.
// It's expected to follow the format 2006-01-02T15:04:05.999999999Z07:00.
//
// On success it returns the parsed timestamp and the remainder of the line
// after the first space. On failure it returns the current time, the
// unmodified line and a non-nil error.
func extractTs(line string) (time.Time, string, error) {
	pair := strings.SplitN(line, " ", 2)
	if len(pair) != 2 {
		return time.Now(), line, fmt.Errorf("Could not find timestamp in '%s'", line)
	}
	ts, err := time.Parse("2006-01-02T15:04:05.999999999Z07:00", pair[0])
	if err != nil {
		return time.Now(), line, fmt.Errorf("Could not parse timestamp from '%s': %w", pair[0], err)
	}
	return ts, pair[1], nil
}

// readLine reads one full line from r, stitching together the fragments that
// bufio.Reader.ReadLine returns when a line does not fit in the buffer.
// It returns whatever was accumulated together with the first error seen.
//
// https://devmarkpro.com/working-big-files-golang
func readLine(r *bufio.Reader) (string, error) {
	var (
		isPrefix = true
		err      error
		line, ln []byte
	)

	for isPrefix && err == nil {
		line, isPrefix, err = r.ReadLine()
		ln = append(ln, line...)
	}

	return string(ln), err
}

// process reads lines from r until io.EOF, parses the leading timestamp of
// each line, and sends the resulting entry to the handler's channel tagged
// with logStream. After each entry it records the entry's timestamp (in Unix
// seconds) in the positions store. It calls t.wg.Done when it returns.
func (t *Target) process(r io.Reader, logStream string) {
	defer t.wg.Done()

	reader := bufio.NewReader(r)
	for {
		line, err := readLine(reader)
		if err != nil {
			if err == io.EOF {
				break
			}
			level.Error(t.logger).Log("msg", "error reading docker log line, skipping line", "err", err)
			t.metrics.dockerErrors.Inc()
			// Actually skip the line, as the message above states; the
			// content returned alongside a read error may be incomplete.
			continue
		}

		ts, line, err := extractTs(line)
		if err != nil {
			level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err)
			t.metrics.dockerErrors.Inc()
			continue
		}

		// Add all labels from the config, relabel and filter them.
		lb := labels.NewBuilder(nil)
		for k, v := range t.labels {
			lb.Set(string(k), string(v))
		}
		lb.Set(dockerLabelLogStream, logStream)
		// NOTE(review): the second return value is discarded; confirm
		// whether it signals that the entry should be dropped.
		processed, _ := relabel.Process(lb.Labels(nil), t.relabelConfig...)

		// Labels starting with "__" are not forwarded.
		filtered := make(model.LabelSet)
		for _, lbl := range processed {
			if strings.HasPrefix(lbl.Name, "__") {
				continue
			}
			filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
		}

		t.handler.Chan() <- loki.Entry{
			Labels: filtered,
			Entry: logproto.Entry{
				Timestamp: ts,
				Line:      line,
			},
		}
		t.metrics.dockerEntries.Inc()

		// NOTE(@tpaschalis) We don't save the positions entry with the
		// filtered labels, but with the default label set, as this is the one
		// used to find the original read offset from the client. This might be
		// problematic if we have the same container with a different set of
		// labels (e.g. duplicated and relabeled), but this shouldn't be the
		// case anyway.
		t.positions.Put(positions.CursorKey(t.containerName), t.labels.String(), ts.Unix())
	}
}

// StartIfNotRunning starts processing container logs. The operation is
// idempotent, i.e. the processing cannot be started twice.
func (t *Target) StartIfNotRunning() {
	if t.running.CompareAndSwap(false, true) {
		level.Debug(t.logger).Log("msg", "starting process loop", "container", t.containerName)
		ctx, cancel := context.WithCancel(context.Background())
		t.cancel = cancel
		go t.processLoop(ctx)
	} else {
		level.Debug(t.logger).Log("msg", "attempted to start process loop but it's already running", "container", t.containerName)
	}
}

// Stop shuts down the target: it cancels the process loop's context, if one
// was created, and waits for the target's goroutines to finish.
func (t *Target) Stop() {
	// cancel is only assigned by StartIfNotRunning; guard against Stop being
	// called on a target that was never started.
	if t.cancel != nil {
		t.cancel()
	}
	t.wg.Wait()
	level.Debug(t.logger).Log("msg", "stopped Docker target", "container", t.containerName)
}

// Ready reports whether the target is running.
func (t *Target) Ready() bool {
	return t.running.Load()
}

// Labels reports the target's labels.
func (t *Target) Labels() model.LabelSet {
	return t.labels
}

// Name reports the container name.
func (t *Target) Name() string {
	return t.containerName
}

// Hash is used when comparing targets in tasks.
func (t *Target) Hash() uint64 {
	return uint64(t.labels.Fingerprint())
}

// Path returns the container name.
func (t *Target) Path() string {
	return t.containerName
}

// Details returns target-specific details: the container ID, the last
// recorded error message (empty if none), the stored read position and
// whether the target is currently running.
func (t *Target) Details() interface{} {
	var errMsg string
	if t.err != nil {
		errMsg = t.err.Error()
	}
	return map[string]string{
		"id":       t.containerName,
		"error":    errMsg,
		"position": t.positions.GetString(positions.CursorKey(t.containerName), t.labels.String()),
		"running":  strconv.FormatBool(t.running.Load()),
	}
}