Path: blob/main/component/loki/source/docker/docker.go
4096 views
package docker12import (3"context"4"fmt"5"os"6"path/filepath"7"reflect"8"sync"9"time"1011"github.com/docker/docker/client"12"github.com/go-kit/log"13"github.com/go-kit/log/level"14"github.com/grafana/agent/component"15"github.com/grafana/agent/component/common/loki"16"github.com/grafana/agent/component/common/loki/positions"17flow_relabel "github.com/grafana/agent/component/common/relabel"18"github.com/grafana/agent/component/discovery"19dt "github.com/grafana/agent/component/loki/source/docker/internal/dockertarget"20"github.com/prometheus/common/model"21"github.com/prometheus/prometheus/model/relabel"22)2324func init() {25component.Register(component.Registration{26Name: "loki.source.docker",27Args: Arguments{},2829Build: func(opts component.Options, args component.Arguments) (component.Component, error) {30return New(opts, args.(Arguments))31},32})33}3435const (36dockerLabel = model.MetaLabelPrefix + "docker_"37dockerLabelContainerPrefix = dockerLabel + "container_"38dockerLabelContainerID = dockerLabelContainerPrefix + "id"39)4041// Arguments holds values which are used to configure the loki.source.docker42// component.43type Arguments struct {44Host string `river:"host,attr"`45Targets []discovery.Target `river:"targets,attr"`46ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`47Labels map[string]string `river:"labels,attr,optional"`48RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`49}5051var (52_ component.Component = (*Component)(nil)53_ component.DebugComponent = (*Component)(nil)54)5556// Component implements the loki.source.file component.57type Component struct {58opts component.Options59metrics *dt.Metrics6061mut sync.RWMutex62args Arguments63manager *manager64lastOptions *options65handler loki.LogsReceiver66posFile positions.Positions67rcs []*relabel.Config68defaultLabels model.LabelSet6970receiversMut sync.RWMutex71receivers []loki.LogsReceiver72}7374// New creates a new loki.source.file component.75func New(o component.Options, args Arguments) (*Component, error) {76err := os.MkdirAll(o.DataPath, 0750)77if err != nil && !os.IsExist(err) {78return nil, err79}80positionsFile, err := positions.New(o.Logger, positions.Config{81SyncPeriod: 10 * time.Second,82PositionsFile: filepath.Join(o.DataPath, "positions.yml"),83IgnoreInvalidYaml: false,84ReadOnly: false,85})86if err != nil {87return nil, err88}8990c := &Component{91opts: o,92metrics: dt.NewMetrics(o.Registerer),9394handler: make(loki.LogsReceiver),95manager: newManager(o.Logger, nil),96receivers: args.ForwardTo,97posFile: positionsFile,98}99100// Call to Update() to start readers and set receivers once at the start.101if err := c.Update(args); err != nil {102return nil, err103}104105return c, nil106}107108// Run implements component.Component.109func (c *Component) Run(ctx context.Context) error {110defer c.posFile.Stop()111112defer func() {113c.mut.Lock()114defer c.mut.Unlock()115116// Guard for safety, but it's not possible for Run to be called without117// c.tailer being initialized.118if c.manager != nil {119c.manager.stop()120}121}()122123for {124select {125case <-ctx.Done():126return nil127case entry := <-c.handler:128c.receiversMut.RLock()129receivers := c.receivers130c.receiversMut.RUnlock()131for _, receiver := range receivers {132receiver <- entry133}134}135}136}137138// Update implements component.Component.139func (c *Component) Update(args component.Arguments) error {140newArgs := args.(Arguments)141142// Update the receivers before anything else, just in case something fails.143c.receiversMut.Lock()144c.receivers = newArgs.ForwardTo145c.receiversMut.Unlock()146147c.mut.Lock()148defer c.mut.Unlock()149150managerOpts, err := c.getManagerOptions(newArgs)151if err != nil {152return err153}154155if managerOpts != c.lastOptions {156// Options changed; pass it to the tailer.157// This will never fail because it only fails if the context gets canceled.158_ = c.manager.updateOptions(context.Background(), managerOpts)159c.lastOptions = managerOpts160}161162defaultLabels := make(model.LabelSet, len(newArgs.Labels))163for k, v := range newArgs.Labels {164defaultLabels[model.LabelName(k)] = model.LabelValue(v)165}166c.defaultLabels = defaultLabels167168if newArgs.RelabelRules != nil && len(newArgs.RelabelRules) > 0 {169c.rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)170} else {171c.rcs = []*relabel.Config{}172}173174// Convert input targets into targets to give to tailer.175targets := make([]*dt.Target, 0, len(newArgs.Targets))176177for _, target := range newArgs.Targets {178containerID, ok := target[dockerLabelContainerID]179if !ok {180level.Debug(c.opts.Logger).Log("msg", "docker target did not include container ID label:"+dockerLabelContainerID)181continue182}183184var labels = make(model.LabelSet)185for k, v := range target {186labels[model.LabelName(k)] = model.LabelValue(v)187}188189tgt, err := dt.NewTarget(190c.metrics,191log.With(c.opts.Logger, "target", fmt.Sprintf("docker/%s", containerID)),192c.manager.opts.handler,193c.manager.opts.positions,194containerID,195labels.Merge(c.defaultLabels),196c.rcs,197c.manager.opts.client,198)199if err != nil {200return err201}202targets = append(targets, tgt)203204// This will never fail because it only fails if the context gets canceled.205_ = c.manager.syncTargets(context.Background(), targets)206}207208c.args = newArgs209return nil210}211212// getTailerOptions gets tailer options from arguments. If args hasn't changed213// from the last call to getTailerOptions, c.lastOptions is returned.214// c.lastOptions must be updated by the caller.215//216// getTailerOptions must only be called when c.mut is held.217func (c *Component) getManagerOptions(args Arguments) (*options, error) {218if reflect.DeepEqual(c.args.Host, args.Host) && c.lastOptions != nil {219return c.lastOptions, nil220}221222opts := []client.Opt{223client.WithHost(args.Host),224client.WithAPIVersionNegotiation(),225}226client, err := client.NewClientWithOpts(opts...)227if err != nil {228level.Error(c.opts.Logger).Log("msg", "could not create new Docker client", "err", err)229return c.lastOptions, fmt.Errorf("failed to build docker client: %w", err)230}231232return &options{233client: client,234handler: loki.NewEntryHandler(c.handler, func() {}),235positions: c.posFile,236}, nil237}238239// DebugInfo returns information about the status of tailed targets.240func (c *Component) DebugInfo() interface{} {241var res readerDebugInfo242for _, tgt := range c.manager.targets() {243details := tgt.Details().(map[string]string)244res.TargetsInfo = append(res.TargetsInfo, targetInfo{245Labels: tgt.Labels().String(),246ID: details["id"],247LastError: details["error"],248IsRunning: details["running"],249ReadOffset: details["position"],250})251}252return res253}254255type readerDebugInfo struct {256TargetsInfo []targetInfo `river:"targets_info,block"`257}258259type targetInfo struct {260ID string `river:"id,attr"`261LastError string `river:"last_error,attr"`262Labels string `river:"labels,attr"`263IsRunning string `river:"is_running,attr"`264ReadOffset string `river:"read_offset,attr"`265}266267268