Path: blob/main/pkg/integrations/v2/app_agent_receiver/logs_exporter.go
5393 views
package app_agent_receiver

import (
	"context"
	"fmt"
	"time"

	kitlog "github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/go-logfmt/logfmt"
	"github.com/grafana/agent/pkg/logs"
	"github.com/grafana/loki/clients/pkg/promtail/api"
	"github.com/grafana/loki/pkg/logproto"
	prommodel "github.com/prometheus/common/model"
)

// logsInstance is an interface with capability to send log entries.
type logsInstance interface {
	SendEntry(entry api.Entry, dur time.Duration) bool
}

// logsInstanceGetter is a function that returns a logsInstance to send log
// entries to.
type logsInstanceGetter func() (logsInstance, error)

// LogsExporterConfig holds the configuration of the logs exporter.
type LogsExporterConfig struct {
	// SendEntryTimeout is passed to SendEntry for every entry.
	SendEntryTimeout time.Duration
	// GetLogsInstance is called once per entry to obtain the instance the
	// entry is sent to.
	GetLogsInstance logsInstanceGetter
	// Labels configures the label set attached to every entry. A non-empty
	// value is used verbatim; an empty value means the label value is looked
	// up in the entry's key/value pairs under the same key.
	Labels map[string]string
}

// LogsExporter will send logs & errors to loki.
type LogsExporter struct {
	getLogsInstance  logsInstanceGetter
	sendEntryTimeout time.Duration
	logger           kitlog.Logger
	labels           map[string]string
	sourceMapStore   SourceMapStore
}

// NewLogsExporter creates a new logs exporter with the given
// configuration.
func NewLogsExporter(logger kitlog.Logger, conf LogsExporterConfig, sourceMapStore SourceMapStore) appAgentReceiverExporter {
	return &LogsExporter{
		logger:           logger,
		getLogsInstance:  conf.GetLogsInstance,
		sendEntryTimeout: conf.SendEntryTimeout,
		labels:           conf.Labels,
		sourceMapStore:   sourceMapStore,
	}
}

// Name of the exporter, for logging purposes.
func (le *LogsExporter) Name() string {
	return "logs exporter"
}

// Export implements the AppDataExporter interface.
//
// Every log, exception, measurement and event in payload is merged with the
// payload metadata and sent to the logs pipeline as one entry. A failure on
// one item does not stop the remaining items from being attempted. The
// returned error is the first failure encountered, or nil when every item was
// sent; previously an earlier failure was masked whenever a later item
// succeeded.
//
// ctx is currently not consulted by this method.
func (le *LogsExporter) Export(ctx context.Context, payload Payload) error {
	meta := payload.Meta.KeyVal()

	var firstErr error
	// emit merges the shared metadata into kv, sends it, and remembers the
	// first error so that it is not overwritten by later sends.
	emit := func(kv *KeyVal) {
		MergeKeyVal(kv, meta)
		if err := le.sendKeyValsToLogsPipeline(kv); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	// log events
	for _, logItem := range payload.Logs {
		emit(logItem.KeyVal())
	}

	// exceptions
	for _, exception := range payload.Exceptions {
		// exception is the loop's copy of the slice element, so the
		// transformer is handed a pointer to that copy rather than to the
		// payload's own entry.
		transformedException := TransformException(le.sourceMapStore, le.logger, &exception, payload.Meta.App.Release)
		emit(transformedException.KeyVal())
	}

	// measurements
	for _, measurement := range payload.Measurements {
		emit(measurement.KeyVal())
	}

	// events
	for _, event := range payload.Events {
		emit(event.KeyVal())
	}

	return firstErr
}

// sendKeyValsToLogsPipeline encodes kv as a logfmt line and sends it, stamped
// with the current time and the configured label set, to the logs instance
// returned by getLogsInstance. It returns an error when encoding fails, when
// no instance can be obtained, or when SendEntry reports the entry as not
// sent.
func (le *LogsExporter) sendKeyValsToLogsPipeline(kv *KeyVal) error {
	line, err := logfmt.MarshalKeyvals(KeyValToInterfaceSlice(kv)...)
	if err != nil {
		level.Error(le.logger).Log("msg", "failed to logfmt a frontend log event", "err", err)
		return err
	}

	instance, err := le.getLogsInstance()
	if err != nil {
		return err
	}

	entry := api.Entry{
		Labels: le.labelSet(kv),
		Entry: logproto.Entry{
			Timestamp: time.Now(),
			Line:      string(line),
		},
	}
	if sent := instance.SendEntry(entry, le.sendEntryTimeout); !sent {
		level.Warn(le.logger).Log("msg", "failed to log frontend log event to logs pipeline")
		return fmt.Errorf("failed to send app event to logs pipeline")
	}
	return nil
}

// labelSet builds the label set for an entry from the configured labels.
// Labels configured with a non-empty value use that value as is. Labels
// configured with an empty value take their value from kv under the same key
// and are omitted when kv has no such key.
func (le *LogsExporter) labelSet(kv *KeyVal) prommodel.LabelSet {
	set := make(prommodel.LabelSet, len(le.labels))

	for k, v := range le.labels {
		if len(v) > 0 {
			set[prommodel.LabelName(k)] = prommodel.LabelValue(v)
		} else if val, ok := kv.Get(k); ok {
			set[prommodel.LabelName(k)] = prommodel.LabelValue(fmt.Sprint(val))
		}
	}

	return set
}

// Static typecheck tests
var (
	_ appAgentReceiverExporter = (*LogsExporter)(nil)
	_ logsInstance             = (*logs.Instance)(nil)
)