Path: blob/main/component/loki/source/docker/runner.go
4096 views
package docker12import (3"context"4"sync"56"github.com/docker/docker/api/types/container"7"github.com/docker/docker/client"8"github.com/go-kit/log"9"github.com/go-kit/log/level"10"github.com/grafana/agent/component/common/loki"11"github.com/grafana/agent/component/common/loki/positions"12dt "github.com/grafana/agent/component/loki/source/docker/internal/dockertarget"13"github.com/grafana/agent/pkg/runner"14"github.com/prometheus/common/model"15)1617// A manager manages a set of running tailers.18type manager struct {19log log.Logger2021mut sync.Mutex22opts *options23tasks []*tailerTask2425runner *runner.Runner[*tailerTask]26}2728// newManager returns a new Manager which manages a set of running tailers.29// Options must not be modified after passing it to a Manager.30//31// If newManager is called with a nil set of options, no targets will be32// scheduled for running until UpdateOptions is called.33func newManager(l log.Logger, opts *options) *manager {34return &manager{35log: l,36opts: opts,37runner: runner.New(func(t *tailerTask) runner.Worker {38return newTailer(l, t)39}),40}41}4243// options passed to all tailers.44type options struct {45// client to use to request logs from Docker.46client client.APIClient4748// handler to send discovered logs to.49handler loki.EntryHandler5051// positions interface so tailers can save/restore offsets in log files.52positions positions.Positions53}5455// tailerTask is the payload used to create tailers. It implements runner.Task.56type tailerTask struct {57options *options58target *dt.Target59}6061var _ runner.Task = (*tailerTask)(nil)6263func (tt *tailerTask) Hash() uint64 { return tt.target.Hash() }6465func (tt *tailerTask) Equals(other runner.Task) bool {66otherTask := other.(*tailerTask)6768// Quick path: pointers are exactly the same.69if tt == otherTask {70return true71}7273// Slow path: check individual fields which are part of the task.74return tt.options == otherTask.options &&75tt.target.Labels().String() == otherTask.target.Labels().String()76}7778// A tailer tails the logs of a docker container. It is created by a [Manager].79type tailer struct {80log log.Logger81opts *options82target *dt.Target8384lset model.LabelSet85}8687// newTailer returns a new tailer which tails logs from the target specified by88// the task.89func newTailer(l log.Logger, task *tailerTask) *tailer {90return &tailer{91log: log.WithPrefix(l, "target", task.target.Name()),92opts: task.options,93target: task.target,9495lset: task.target.Labels(),96}97}9899func (t *tailer) Run(ctx context.Context) {100ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit)101102t.target.StartIfNotRunning()103104select {105case err := <-chErr:106// Error setting up the Wait request from the client; either failed to107// read from /containers/{containerID}/wait, or couldn't parse the108// response. Stop the target and exit the task after logging; if it was109// a transient error, the target will be retried on the next discovery110// refresh.111level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err)112t.target.Stop()113return114case <-ch:115t.target.Stop()116return117}118}119120// syncTargets synchronizes the set of running tailers to the set specified by121// targets.122func (m *manager) syncTargets(ctx context.Context, targets []*dt.Target) error {123m.mut.Lock()124defer m.mut.Unlock()125126// Convert targets into tasks to give to the runner.127tasks := make([]*tailerTask, 0, len(targets))128for _, target := range targets {129tasks = append(tasks, &tailerTask{130options: m.opts,131target: target,132})133}134135// Sync our tasks to the runner. If the Manager doesn't have any options,136// the runner will be cleared of tasks until UpdateOptions is called with a137// non-nil set of options.138switch m.opts {139default:140if err := m.runner.ApplyTasks(ctx, tasks); err != nil {141return err142}143case nil:144if err := m.runner.ApplyTasks(ctx, nil); err != nil {145return err146}147}148149// Delete positions for targets which have gone away.150newEntries := make(map[positions.Entry]struct{}, len(targets))151for _, target := range targets {152newEntries[entryForTarget(target)] = struct{}{}153}154155for _, task := range m.tasks {156ent := entryForTarget(task.target)157158// The task from the last call to SyncTargets is no longer in newEntries;159// remove it from the positions file. We do this _after_ calling ApplyTasks160// to ensure that the old tailers have shut down, otherwise the tailer161// might write its position again during shutdown after we removed it.162if _, found := newEntries[ent]; !found {163level.Info(m.log).Log("msg", "removing entry from positions file", "path", ent.Path, "labels", ent.Labels)164m.opts.positions.Remove(ent.Path, ent.Labels)165}166}167168m.tasks = tasks169return nil170}171172func entryForTarget(t *dt.Target) positions.Entry {173// The positions entry is keyed by container_id; the path is fed into174// positions.CursorKey to treat it as a "cursor"; otherwise175// positions.Positions will try to read the path as a file and delete the176// entry when it can't find it.177return positions.Entry{178Path: positions.CursorKey(t.Name()),179Labels: t.Labels().String(),180}181}182183// updateOptions updates the Options shared with all Tailers. All Tailers will184// be updated with the new set of Options. Options should not be modified after185// passing to updateOptions.186//187// If newOptions is nil, all tasks will be cleared until updateOptions is188// called again with a non-nil set of options.189func (m *manager) updateOptions(ctx context.Context, newOptions *options) error {190m.mut.Lock()191defer m.mut.Unlock()192193// Iterate through the previous set of tasks and create a new task with the194// new set of options.195tasks := make([]*tailerTask, 0, len(m.tasks))196for _, oldTask := range m.tasks {197tasks = append(tasks, &tailerTask{198options: newOptions,199target: oldTask.target,200})201}202203switch newOptions {204case nil:205if err := m.runner.ApplyTasks(ctx, nil); err != nil {206return err207}208default:209if err := m.runner.ApplyTasks(ctx, tasks); err != nil {210return err211}212}213214m.opts = newOptions215m.tasks = tasks216return nil217}218219// targets returns the set of targets which are actively being tailed. targets220// for tailers which have terminated are not included. The returned set of221// targets are deduplicated.222func (m *manager) targets() []*dt.Target {223tasks := m.runner.Tasks()224225targets := make([]*dt.Target, 0, len(tasks))226for _, task := range tasks {227targets = append(targets, task.target)228}229return targets230}231232// stop stops the manager and all running Tailers. It blocks until all Tailers233// have exited.234func (m *manager) stop() {235m.runner.Stop()236}237238239