Path: blob/main/component/otelcol/exporter/loki/internal/convert/convert.go
4100 views
// Package convert implements conversion utilities to convert between1// OpenTelemetry Collector and Loki data.2//3// It follows the [OpenTelemetry Logs Data Model] and the [loki translator]4// package for implementing the conversion.5//6// [OpenTelemetry Logs Data Model]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md7// [loki translator]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/loki8package convert910import (11"context"12"fmt"13"strings"14"sync"1516"github.com/go-kit/log"17"github.com/go-kit/log/level"18"github.com/grafana/agent/component/common/loki"19"github.com/prometheus/client_golang/prometheus"20"go.opentelemetry.io/collector/consumer"21"go.opentelemetry.io/collector/pdata/pcommon"22"go.opentelemetry.io/collector/pdata/plog"23)2425// Converter implements consumer.Logs and converts received OTel logs into26// Loki-compatible log entries.27type Converter struct {28log log.Logger29metrics *metrics3031mut sync.RWMutex32next []loki.LogsReceiver // Location to write converted logs.33}3435var _ consumer.Logs = (*Converter)(nil)3637// New returns a new Converter. Converted logs are passed to the provided list38// of LogsReceivers.39func New(l log.Logger, r prometheus.Registerer, next []loki.LogsReceiver) *Converter {40if l == nil {41l = log.NewNopLogger()42}43m := newMetrics(r)44return &Converter{log: l, metrics: m, next: next}45}4647// Capabilities implements consumer.Logs.48func (conv *Converter) Capabilities() consumer.Capabilities {49return consumer.Capabilities{50MutatesData: false,51}52}5354// ConsumeLogs converts the provided OpenTelemetry Collector-formatted logs55// into Loki-compatible entries. Each call to ConsumeLogs will forward56// converted entries to the list of channels in the `next` field.57// This is reusing the logic from the OpenTelemetry Collector "contrib"58// distribution and its LogsToLokiRequests function.59func (conv *Converter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {60var entries []loki.Entry6162rls := ld.ResourceLogs()63for i := 0; i < rls.Len(); i++ {64ills := rls.At(i).ScopeLogs()65for j := 0; j < ills.Len(); j++ {66logs := ills.At(j).LogRecords()67for k := 0; k < logs.Len(); k++ {68conv.metrics.entriesTotal.Inc()6970// we may remove attributes, so to avoid mutating the original71// log entry, we make our own copy and change that instead.72log := plog.NewLogRecord()73logs.At(k).CopyTo(log)7475// similarly, we may remove resources, so to avoid mutating the76// original log entry, we make and use our own copy instead.77resource := pcommon.NewResource()78rls.At(i).Resource().CopyTo(resource)7980// adds level attribute from log.severityNumber81addLogLevelAttributeAndHint(log)8283// TODO (@tpaschalis) If we want to pre-populate a tenant84// label from the OTel hint, it should happen here. with the85// upstream getTenantFromTenantHint helper.8687format := getFormatFromFormatHint(log.Attributes(), resource.Attributes())8889mergedLabels := convertAttributesAndMerge(log.Attributes(), resource.Attributes())90// remove the attributes that were promoted to labels91removeAttributes(log.Attributes(), mergedLabels)92removeAttributes(resource.Attributes(), mergedLabels)9394entry, err := convertLogToLokiEntry(log, resource, format)95if err != nil {96level.Error(conv.log).Log("msg", "failed to convert log to loki entry", "err", err)97conv.metrics.entriesFailed.Inc()98continue99}100101conv.metrics.entriesProcessed.Inc()102entries = append(entries, loki.Entry{103Labels: mergedLabels,104Entry: *entry,105})106}107}108}109110for _, entry := range entries {111conv.mut.RLock()112for _, receiver := range conv.next {113select {114case <-ctx.Done():115return nil116case receiver <- entry:117// no-op, send the entry along118}119}120conv.mut.RUnlock()121}122return nil123}124125// UpdateFanout sets the locations the converter forwards log entries to.126func (conv *Converter) UpdateFanout(fanout []loki.LogsReceiver) {127conv.mut.Lock()128defer conv.mut.Unlock()129130conv.next = fanout131}132133func addLogLevelAttributeAndHint(log plog.LogRecord) {134if log.SeverityNumber() == plog.SeverityNumberUnspecified {135return136}137addHint(log)138if _, found := log.Attributes().Get(levelAttributeName); !found {139level := severityNumberToLevel[log.SeverityNumber().String()]140log.Attributes().PutStr(levelAttributeName, level)141}142}143144func addHint(log plog.LogRecord) {145if value, found := log.Attributes().Get(hintAttributes); found && !strings.Contains(value.AsString(), levelAttributeName) {146log.Attributes().PutStr(hintAttributes, fmt.Sprintf("%s,%s", value.AsString(), levelAttributeName))147} else {148log.Attributes().PutStr(hintAttributes, levelAttributeName)149}150}151152var severityNumberToLevel = map[string]string{153plog.SeverityNumberUnspecified.String(): "UNSPECIFIED",154plog.SeverityNumberTrace.String(): "TRACE",155plog.SeverityNumberTrace2.String(): "TRACE2",156plog.SeverityNumberTrace3.String(): "TRACE3",157plog.SeverityNumberTrace4.String(): "TRACE4",158plog.SeverityNumberDebug.String(): "DEBUG",159plog.SeverityNumberDebug2.String(): "DEBUG2",160plog.SeverityNumberDebug3.String(): "DEBUG3",161plog.SeverityNumberDebug4.String(): "DEBUG4",162plog.SeverityNumberInfo.String(): "INFO",163plog.SeverityNumberInfo2.String(): "INFO2",164plog.SeverityNumberInfo3.String(): "INFO3",165plog.SeverityNumberInfo4.String(): "INFO4",166plog.SeverityNumberWarn.String(): "WARN",167plog.SeverityNumberWarn2.String(): "WARN2",168plog.SeverityNumberWarn3.String(): "WARN3",169plog.SeverityNumberWarn4.String(): "WARN4",170plog.SeverityNumberError.String(): "ERROR",171plog.SeverityNumberError2.String(): "ERROR2",172plog.SeverityNumberError3.String(): "ERROR3",173plog.SeverityNumberError4.String(): "ERROR4",174plog.SeverityNumberFatal.String(): "FATAL",175plog.SeverityNumberFatal2.String(): "FATAL2",176plog.SeverityNumberFatal3.String(): "FATAL3",177plog.SeverityNumberFatal4.String(): "FATAL4",178}179180func getFormatFromFormatHint(logAttr pcommon.Map, resourceAttr pcommon.Map) string {181format := formatJSON182formatVal, found := resourceAttr.Get(hintFormat)183if !found {184formatVal, found = logAttr.Get(hintFormat)185}186187if found {188format = formatVal.AsString()189}190return format191}192193194