Path: blob/main/component/otelcol/receiver/loki/loki.go
4096 views
// Package loki provides an otelcol.receiver.loki component.1package loki23import (4"context"5"path"6"strings"7"sync"89"github.com/go-kit/log"10"github.com/go-kit/log/level"11"github.com/grafana/agent/component"12"github.com/grafana/agent/component/common/loki"13"github.com/grafana/agent/component/otelcol"14"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"15"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"16"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"17"go.opentelemetry.io/collector/consumer"18)1920func init() {21component.Register(component.Registration{22Name: "otelcol.receiver.loki",23Args: Arguments{},24Exports: Exports{},2526Build: func(o component.Options, a component.Arguments) (component.Component, error) {27return NewComponent(o, a.(Arguments))28},29})30}3132var hintAttributes = "loki.attribute.labels"3334// Arguments configures the otelcol.receiver.loki component.35type Arguments struct {36// Output configures where to send received data. Required.37Output *otelcol.ConsumerArguments `river:"output,block"`38}3940// Exports holds the receiver that is used to send log entries to the41// loki.write component.42type Exports struct {43Receiver loki.LogsReceiver `river:"receiver,attr"`44}4546// Component is the otelcol.receiver.loki component.47type Component struct {48log log.Logger49opts component.Options5051mut sync.RWMutex52receiver loki.LogsReceiver53logsSink consumer.Logs54}5556var _ component.Component = (*Component)(nil)5758// NewComponent creates a new otelcol.receiver.loki component.59func NewComponent(o component.Options, c Arguments) (*Component, error) {60// TODO(@tpaschalis) Create a metrics struct to count61// total/successful/errored log entries?62res := &Component{63log: o.Logger,64opts: o,65}6667// Create and immediately export the receiver which remains the same for68// the component's lifetime.69res.receiver = make(loki.LogsReceiver)70o.OnStateChange(Exports{Receiver: res.receiver})7172if err := res.Update(c); err != nil {73return nil, err74}75return res, nil76}7778// Run implements Component.79func (c *Component) Run(ctx context.Context) error {80for {81select {82case <-ctx.Done():83return nil84case entry := <-c.receiver:85stanzaEntry := parsePromtailEntry(entry)86plogEntry := adapter.Convert(stanzaEntry)8788// TODO(@tpaschalis) Is there any more handling to be done here?89err := c.logsSink.ConsumeLogs(ctx, plogEntry)90if err != nil {91level.Error(c.opts.Logger).Log("msg", "failed to consume log entries", "err", err)92}93}94}95}9697// Update implements Component.98func (c *Component) Update(newConfig component.Arguments) error {99c.mut.Lock()100defer c.mut.Unlock()101102cfg := newConfig.(Arguments)103c.logsSink = fanoutconsumer.Logs(cfg.Output.Logs)104105return nil106}107108// parsePromtailEntry creates new stanza.Entry from promtail entry109func parsePromtailEntry(inputEntry loki.Entry) *entry.Entry {110outputEntry := entry.New()111outputEntry.Body = inputEntry.Entry.Line112outputEntry.Timestamp = inputEntry.Entry.Timestamp113114var lbls []string115for key, val := range inputEntry.Labels {116valStr := string(val)117keyStr := string(key)118switch key {119case "filename":120outputEntry.AddAttribute("filename", valStr)121lbls = append(lbls, "filename")122// The `promtailreceiver` from the opentelemetry-collector-contrib123// repo adds these two labels based on these "semantic conventions124// for log media".125// https://opentelemetry.io/docs/reference/specification/logs/semantic_conventions/media/126// We're keeping them as well, but we're also adding the `filename`127// attribute so that it can be used from the128// `loki.attribute.labels` hint for when the opposite OTel -> Loki129// transformation happens.130outputEntry.AddAttribute("log.file.path", valStr)131outputEntry.AddAttribute("log.file.name", path.Base(valStr))132default:133lbls = append(lbls, keyStr)134outputEntry.AddAttribute(keyStr, valStr)135}136}137138if len(lbls) > 0 {139// This hint is defined in the pkg/translator/loki package and the140// opentelemetry-collector-contrib repo, but is not exported so we141// re-define it.142// It is used to detect which attributes should be promoted to labels143// when transforming back from OTel -> Loki.144outputEntry.AddAttribute(hintAttributes, strings.Join(lbls, ","))145}146return outputEntry147}148149150