Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/exporter/loki/internal/convert/convert.go
4100 views
1
// Package convert implements conversion utilities to convert between
2
// OpenTelemetry Collector and Loki data.
3
//
4
// It follows the [OpenTelemetry Logs Data Model] and the [loki translator]
5
// package for implementing the conversion.
6
//
7
// [OpenTelemetry Logs Data Model]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md
8
// [loki translator]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/loki
9
package convert
10
11
import (
12
"context"
13
"fmt"
14
"strings"
15
"sync"
16
17
"github.com/go-kit/log"
18
"github.com/go-kit/log/level"
19
"github.com/grafana/agent/component/common/loki"
20
"github.com/prometheus/client_golang/prometheus"
21
"go.opentelemetry.io/collector/consumer"
22
"go.opentelemetry.io/collector/pdata/pcommon"
23
"go.opentelemetry.io/collector/pdata/plog"
24
)
25
26
// Converter implements consumer.Logs and converts received OTel logs into
27
// Loki-compatible log entries.
28
type Converter struct {
29
log log.Logger
30
metrics *metrics
31
32
mut sync.RWMutex
33
next []loki.LogsReceiver // Location to write converted logs.
34
}
35
36
var _ consumer.Logs = (*Converter)(nil)
37
38
// New returns a new Converter. Converted logs are passed to the provided list
39
// of LogsReceivers.
40
func New(l log.Logger, r prometheus.Registerer, next []loki.LogsReceiver) *Converter {
41
if l == nil {
42
l = log.NewNopLogger()
43
}
44
m := newMetrics(r)
45
return &Converter{log: l, metrics: m, next: next}
46
}
47
48
// Capabilities implements consumer.Logs.
49
func (conv *Converter) Capabilities() consumer.Capabilities {
50
return consumer.Capabilities{
51
MutatesData: false,
52
}
53
}
54
55
// ConsumeLogs converts the provided OpenTelemetry Collector-formatted logs
56
// into Loki-compatible entries. Each call to ConsumeLogs will forward
57
// converted entries to the list of channels in the `next` field.
58
// This is reusing the logic from the OpenTelemetry Collector "contrib"
59
// distribution and its LogsToLokiRequests function.
60
func (conv *Converter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
61
var entries []loki.Entry
62
63
rls := ld.ResourceLogs()
64
for i := 0; i < rls.Len(); i++ {
65
ills := rls.At(i).ScopeLogs()
66
for j := 0; j < ills.Len(); j++ {
67
logs := ills.At(j).LogRecords()
68
for k := 0; k < logs.Len(); k++ {
69
conv.metrics.entriesTotal.Inc()
70
71
// we may remove attributes, so to avoid mutating the original
72
// log entry, we make our own copy and change that instead.
73
log := plog.NewLogRecord()
74
logs.At(k).CopyTo(log)
75
76
// similarly, we may remove resources, so to avoid mutating the
77
// original log entry, we make and use our own copy instead.
78
resource := pcommon.NewResource()
79
rls.At(i).Resource().CopyTo(resource)
80
81
// adds level attribute from log.severityNumber
82
addLogLevelAttributeAndHint(log)
83
84
// TODO (@tpaschalis) If we want to pre-populate a tenant
85
// label from the OTel hint, it should happen here. with the
86
// upstream getTenantFromTenantHint helper.
87
88
format := getFormatFromFormatHint(log.Attributes(), resource.Attributes())
89
90
mergedLabels := convertAttributesAndMerge(log.Attributes(), resource.Attributes())
91
// remove the attributes that were promoted to labels
92
removeAttributes(log.Attributes(), mergedLabels)
93
removeAttributes(resource.Attributes(), mergedLabels)
94
95
entry, err := convertLogToLokiEntry(log, resource, format)
96
if err != nil {
97
level.Error(conv.log).Log("msg", "failed to convert log to loki entry", "err", err)
98
conv.metrics.entriesFailed.Inc()
99
continue
100
}
101
102
conv.metrics.entriesProcessed.Inc()
103
entries = append(entries, loki.Entry{
104
Labels: mergedLabels,
105
Entry: *entry,
106
})
107
}
108
}
109
}
110
111
for _, entry := range entries {
112
conv.mut.RLock()
113
for _, receiver := range conv.next {
114
select {
115
case <-ctx.Done():
116
return nil
117
case receiver <- entry:
118
// no-op, send the entry along
119
}
120
}
121
conv.mut.RUnlock()
122
}
123
return nil
124
}
125
126
// UpdateFanout sets the locations the converter forwards log entries to.
127
func (conv *Converter) UpdateFanout(fanout []loki.LogsReceiver) {
128
conv.mut.Lock()
129
defer conv.mut.Unlock()
130
131
conv.next = fanout
132
}
133
134
// addLogLevelAttributeAndHint derives a level attribute from the record's
// severity number. Records with an unspecified severity are left untouched.
// Otherwise the hint attribute is updated through addHint and, unless the
// record already carries a level attribute, one is added using the name
// looked up in severityNumberToLevel.
func addLogLevelAttributeAndHint(log plog.LogRecord) {
	severity := log.SeverityNumber()
	if severity == plog.SeverityNumberUnspecified {
		return
	}

	addHint(log)

	attrs := log.Attributes()
	if _, exists := attrs.Get(levelAttributeName); exists {
		// An explicit level attribute takes precedence over the severity.
		return
	}
	attrs.PutStr(levelAttributeName, severityNumberToLevel[severity.String()])
}
144
145
// addHint makes sure the record's hint attribute mentions the level
// attribute name.
//
//   - If the hint attribute is missing or empty, it is set to the level
//     attribute name.
//   - If it exists but does not contain the level attribute name, the name is
//     appended after a comma.
//   - If it already contains the level attribute name, it is left unchanged so
//     that the other hinted attributes are not lost.
func addHint(log plog.LogRecord) {
	attrs := log.Attributes()

	value, found := attrs.Get(hintAttributes)
	if !found {
		attrs.PutStr(hintAttributes, levelAttributeName)
		return
	}

	existing := value.AsString()
	switch {
	case existing == "":
		// Avoid producing a hint with a leading comma.
		attrs.PutStr(hintAttributes, levelAttributeName)
	case strings.Contains(existing, levelAttributeName):
		// Already hinted; overwriting here would drop the other entries.
	default:
		attrs.PutStr(hintAttributes, fmt.Sprintf("%s,%s", existing, levelAttributeName))
	}
}
152
153
var severityNumberToLevel = map[string]string{
154
plog.SeverityNumberUnspecified.String(): "UNSPECIFIED",
155
plog.SeverityNumberTrace.String(): "TRACE",
156
plog.SeverityNumberTrace2.String(): "TRACE2",
157
plog.SeverityNumberTrace3.String(): "TRACE3",
158
plog.SeverityNumberTrace4.String(): "TRACE4",
159
plog.SeverityNumberDebug.String(): "DEBUG",
160
plog.SeverityNumberDebug2.String(): "DEBUG2",
161
plog.SeverityNumberDebug3.String(): "DEBUG3",
162
plog.SeverityNumberDebug4.String(): "DEBUG4",
163
plog.SeverityNumberInfo.String(): "INFO",
164
plog.SeverityNumberInfo2.String(): "INFO2",
165
plog.SeverityNumberInfo3.String(): "INFO3",
166
plog.SeverityNumberInfo4.String(): "INFO4",
167
plog.SeverityNumberWarn.String(): "WARN",
168
plog.SeverityNumberWarn2.String(): "WARN2",
169
plog.SeverityNumberWarn3.String(): "WARN3",
170
plog.SeverityNumberWarn4.String(): "WARN4",
171
plog.SeverityNumberError.String(): "ERROR",
172
plog.SeverityNumberError2.String(): "ERROR2",
173
plog.SeverityNumberError3.String(): "ERROR3",
174
plog.SeverityNumberError4.String(): "ERROR4",
175
plog.SeverityNumberFatal.String(): "FATAL",
176
plog.SeverityNumberFatal2.String(): "FATAL2",
177
plog.SeverityNumberFatal3.String(): "FATAL3",
178
plog.SeverityNumberFatal4.String(): "FATAL4",
179
}
180
181
// getFormatFromFormatHint returns the format named by the format hint
// attribute. The resource attributes are consulted first and the log
// attributes second; when neither carries the hint, formatJSON is returned.
func getFormatFromFormatHint(logAttr pcommon.Map, resourceAttr pcommon.Map) string {
	if hint, ok := resourceAttr.Get(hintFormat); ok {
		return hint.AsString()
	}
	if hint, ok := logAttr.Get(hintFormat); ok {
		return hint.AsString()
	}
	return formatJSON
}
193
194