Path: blob/main/component/loki/source/kubernetes/kubetail/kubetail.go
// Package kubetail implements a log file tailer using the Kubernetes API.
package kubetail

import (
	"context"
	"sync"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/agent/pkg/runner"
	"k8s.io/client-go/kubernetes"
)

// Options passed to all tailers.
type Options struct {
	// Client to use to request logs from Kubernetes.
	Client *kubernetes.Clientset

	// Handler to send discovered logs to.
	Handler loki.EntryHandler

	// Positions interface so tailers can save/restore offsets in log files.
	Positions positions.Positions
}

// A Manager manages a set of running Tailers.
type Manager struct {
	log log.Logger

	mut   sync.Mutex
	opts  *Options
	tasks []*tailerTask

	runner *runner.Runner[*tailerTask]
}

// NewManager returns a new Manager which manages a set of running tailers.
// Options must not be modified after passing it to a Manager.
//
// If NewManager is called with a nil set of options, no targets will be
// scheduled for running until UpdateOptions is called.
func NewManager(l log.Logger, opts *Options) *Manager {
	return &Manager{
		log:  l,
		opts: opts,
		runner: runner.New(func(t *tailerTask) runner.Worker {
			return newTailer(l, t)
		}),
	}
}

// SyncTargets synchronizes the set of running tailers to the set specified by
// targets.
func (m *Manager) SyncTargets(ctx context.Context, targets []*Target) error {
	m.mut.Lock()
	defer m.mut.Unlock()

	// Convert targets into tasks to give to the runner.
	tasks := make([]*tailerTask, 0, len(targets))
	for _, target := range targets {
		tasks = append(tasks, &tailerTask{
			Options: m.opts,
			Target:  target,
		})
	}

	// Sync our tasks to the runner. If the Manager doesn't have any options,
	// the runner will be cleared of tasks until UpdateOptions is called with a
	// non-nil set of options.
	switch m.opts {
	default:
		if err := m.runner.ApplyTasks(ctx, tasks); err != nil {
			return err
		}
	case nil:
		if err := m.runner.ApplyTasks(ctx, nil); err != nil {
			return err
		}
	}

	// Delete positions for targets which have gone away.
	newEntries := make(map[positions.Entry]struct{}, len(targets))
	for _, target := range targets {
		newEntries[entryForTarget(target)] = struct{}{}
	}

	for _, task := range m.tasks {
		ent := entryForTarget(task.Target)

		// If a task from the last call to SyncTargets is no longer in
		// newEntries, remove it from the positions file. We do this _after_
		// calling ApplyTasks to ensure that the old tailers have shut down;
		// otherwise a tailer might write its position again during shutdown
		// after we removed it.
		if _, found := newEntries[ent]; !found {
			level.Info(m.log).Log("msg", "removing entry from positions file", "path", ent.Path, "labels", ent.Labels)
			m.opts.Positions.Remove(ent.Path, ent.Labels)
		}
	}

	m.tasks = tasks
	return nil
}

func entryForTarget(t *Target) positions.Entry {
	// The positions entry is keyed by UID to ensure that positions from
	// completely distinct "namespace/name:container" instances don't interfere
	// with each other.
	//
	// While it's still technically possible for two containers to have the same
	// "namespace/name:container" string and UID, it's so wildly unlikely that
	// it's probably not worth handling.
	//
	// The path is fed into positions.CursorKey to treat it as a "cursor";
	// otherwise positions.Positions will try to read the path as a file and
	// delete the entry when it can't find it.
	return positions.Entry{
		Path:   positions.CursorKey(t.String() + ":" + t.UID()),
		Labels: t.Labels().String(),
	}
}

// UpdateOptions updates the Options shared with all Tailers. All Tailers will
// be updated with the new set of Options. Options should not be modified after
// passing to UpdateOptions.
//
// If newOptions is nil, all tasks will be cleared until UpdateOptions is
// called again with a non-nil set of options.
func (m *Manager) UpdateOptions(ctx context.Context, newOptions *Options) error {
	m.mut.Lock()
	defer m.mut.Unlock()

	// Iterate through the previous set of tasks and create a new task with the
	// new set of options.
	tasks := make([]*tailerTask, 0, len(m.tasks))
	for _, oldTask := range m.tasks {
		tasks = append(tasks, &tailerTask{
			Options: newOptions,
			Target:  oldTask.Target,
		})
	}

	switch newOptions {
	case nil:
		if err := m.runner.ApplyTasks(ctx, nil); err != nil {
			return err
		}
	default:
		if err := m.runner.ApplyTasks(ctx, tasks); err != nil {
			return err
		}
	}

	m.opts = newOptions
	m.tasks = tasks
	return nil
}

// Targets returns the set of targets which are actively being tailed. Targets
// for tailers which have terminated are not included. The returned set of
// targets is deduplicated.
func (m *Manager) Targets() []*Target {
	tasks := m.runner.Tasks()

	targets := make([]*Target, 0, len(tasks))
	for _, task := range tasks {
		targets = append(targets, task.Target)
	}
	return targets
}

// Stop stops the manager and all running Tailers. It blocks until all Tailers
// have exited.
func (m *Manager) Stop() {
	m.runner.Stop()
}
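
// exampleManagerLifecycle is a minimal usage sketch, not part of the original
// file: it shows how a caller might drive the Manager API above. It assumes
// the Options fields (client, handler, positions) and the targets slice have
// been built elsewhere, e.g. by Kubernetes service discovery; the function
// name itself is hypothetical.
func exampleManagerLifecycle(ctx context.Context, l log.Logger, opts, newOpts *Options, targets []*Target) error {
	mgr := NewManager(l, opts)
	// Stop blocks until every tailer has exited, so defer it to guarantee a
	// clean shutdown.
	defer mgr.Stop()

	// Schedule one tailer per target; positions entries for targets that have
	// gone away are pruned, as documented in SyncTargets.
	if err := mgr.SyncTargets(ctx, targets); err != nil {
		return err
	}

	// Later, swap in a new shared Options value for all running tailers.
	// Passing nil instead would clear every scheduled tailer until a non-nil
	// Options is provided again.
	return mgr.UpdateOptions(ctx, newOpts)
}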