Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/app_agent_receiver/logs_exporter.go
5393 views
1
package app_agent_receiver
2
3
import (
4
"context"
5
"fmt"
6
"time"
7
8
kitlog "github.com/go-kit/log"
9
"github.com/go-kit/log/level"
10
"github.com/go-logfmt/logfmt"
11
"github.com/grafana/agent/pkg/logs"
12
"github.com/grafana/loki/clients/pkg/promtail/api"
13
"github.com/grafana/loki/pkg/logproto"
14
prommodel "github.com/prometheus/common/model"
15
)
16
17
// logsInstance is an interface with capability to send log entries
18
type logsInstance interface {
19
SendEntry(entry api.Entry, dur time.Duration) bool
20
}
21
22
// logsInstanceGetter is a function that returns a LogsInstance to send log entries to
23
type logsInstanceGetter func() (logsInstance, error)
24
25
// LogsExporterConfig holds the configuration of the logs exporter
26
type LogsExporterConfig struct {
27
SendEntryTimeout time.Duration
28
GetLogsInstance logsInstanceGetter
29
Labels map[string]string
30
}
31
32
// LogsExporter will send logs & errors to loki
33
type LogsExporter struct {
34
getLogsInstance logsInstanceGetter
35
sendEntryTimeout time.Duration
36
logger kitlog.Logger
37
labels map[string]string
38
sourceMapStore SourceMapStore
39
}
40
41
// NewLogsExporter creates a new logs exporter with the given
42
// configuration
43
func NewLogsExporter(logger kitlog.Logger, conf LogsExporterConfig, sourceMapStore SourceMapStore) appAgentReceiverExporter {
44
return &LogsExporter{
45
logger: logger,
46
getLogsInstance: conf.GetLogsInstance,
47
sendEntryTimeout: conf.SendEntryTimeout,
48
labels: conf.Labels,
49
sourceMapStore: sourceMapStore,
50
}
51
}
52
53
// Name of the exporter, for logging purposes
54
func (le *LogsExporter) Name() string {
55
return "logs exporter"
56
}
57
58
// Export implements the AppDataExporter interface
59
func (le *LogsExporter) Export(ctx context.Context, payload Payload) error {
60
meta := payload.Meta.KeyVal()
61
62
var err error
63
64
// log events
65
for _, logItem := range payload.Logs {
66
kv := logItem.KeyVal()
67
MergeKeyVal(kv, meta)
68
err = le.sendKeyValsToLogsPipeline(kv)
69
}
70
71
// exceptions
72
for _, exception := range payload.Exceptions {
73
transformedException := TransformException(le.sourceMapStore, le.logger, &exception, payload.Meta.App.Release)
74
kv := transformedException.KeyVal()
75
MergeKeyVal(kv, meta)
76
err = le.sendKeyValsToLogsPipeline(kv)
77
}
78
79
// measurements
80
for _, measurement := range payload.Measurements {
81
kv := measurement.KeyVal()
82
MergeKeyVal(kv, meta)
83
err = le.sendKeyValsToLogsPipeline(kv)
84
}
85
86
// events
87
for _, event := range payload.Events {
88
kv := event.KeyVal()
89
MergeKeyVal(kv, meta)
90
err = le.sendKeyValsToLogsPipeline(kv)
91
}
92
93
return err
94
}
95
96
func (le *LogsExporter) sendKeyValsToLogsPipeline(kv *KeyVal) error {
97
line, err := logfmt.MarshalKeyvals(KeyValToInterfaceSlice(kv)...)
98
if err != nil {
99
level.Error(le.logger).Log("msg", "failed to logfmt a frontend log event", "err", err)
100
return err
101
}
102
instance, err := le.getLogsInstance()
103
if err != nil {
104
return err
105
}
106
sent := instance.SendEntry(api.Entry{
107
Labels: le.labelSet(kv),
108
Entry: logproto.Entry{
109
Timestamp: time.Now(),
110
Line: string(line),
111
},
112
}, le.sendEntryTimeout)
113
if !sent {
114
level.Warn(le.logger).Log("msg", "failed to log frontend log event to logs pipeline")
115
return fmt.Errorf("failed to send app event to logs pipeline")
116
}
117
return nil
118
}
119
120
func (le *LogsExporter) labelSet(kv *KeyVal) prommodel.LabelSet {
121
set := make(prommodel.LabelSet, len(le.labels))
122
123
for k, v := range le.labels {
124
if len(v) > 0 {
125
set[prommodel.LabelName(k)] = prommodel.LabelValue(v)
126
} else {
127
if val, ok := kv.Get(k); ok {
128
set[prommodel.LabelName(k)] = prommodel.LabelValue(fmt.Sprint(val))
129
}
130
}
131
}
132
133
return set
134
}
135
136
// Static typecheck tests
137
var (
138
_ appAgentReceiverExporter = (*LogsExporter)(nil)
139
_ logsInstance = (*logs.Instance)(nil)
140
)
141
142