Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/receiver/loki/loki.go
4096 views
1
// Package loki provides an otelcol.receiver.loki component.
2
package loki
3
4
import (
5
"context"
6
"path"
7
"strings"
8
"sync"
9
10
"github.com/go-kit/log"
11
"github.com/go-kit/log/level"
12
"github.com/grafana/agent/component"
13
"github.com/grafana/agent/component/common/loki"
14
"github.com/grafana/agent/component/otelcol"
15
"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"
16
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
17
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
18
"go.opentelemetry.io/collector/consumer"
19
)
20
21
func init() {
22
component.Register(component.Registration{
23
Name: "otelcol.receiver.loki",
24
Args: Arguments{},
25
Exports: Exports{},
26
27
Build: func(o component.Options, a component.Arguments) (component.Component, error) {
28
return NewComponent(o, a.(Arguments))
29
},
30
})
31
}
32
33
// hintAttributes is the attribute key under which parsePromtailEntry stores
// the comma-separated names of the attributes that were created from Loki
// labels. It is never reassigned, so it is declared as a constant rather than
// as mutable package-level state.
const hintAttributes = "loki.attribute.labels"
34
35
// Arguments configures the otelcol.receiver.loki component.
36
type Arguments struct {
37
// Output configures where to send received data. Required.
38
Output *otelcol.ConsumerArguments `river:"output,block"`
39
}
40
41
// Exports holds the receiver that is used to send log entries to the
42
// loki.write component.
43
type Exports struct {
44
Receiver loki.LogsReceiver `river:"receiver,attr"`
45
}
46
47
// Component is the otelcol.receiver.loki component.
48
type Component struct {
49
log log.Logger
50
opts component.Options
51
52
mut sync.RWMutex
53
receiver loki.LogsReceiver
54
logsSink consumer.Logs
55
}
56
57
var _ component.Component = (*Component)(nil)
58
59
// NewComponent creates a new otelcol.receiver.loki component.
60
func NewComponent(o component.Options, c Arguments) (*Component, error) {
61
// TODO(@tpaschalis) Create a metrics struct to count
62
// total/successful/errored log entries?
63
res := &Component{
64
log: o.Logger,
65
opts: o,
66
}
67
68
// Create and immediately export the receiver which remains the same for
69
// the component's lifetime.
70
res.receiver = make(loki.LogsReceiver)
71
o.OnStateChange(Exports{Receiver: res.receiver})
72
73
if err := res.Update(c); err != nil {
74
return nil, err
75
}
76
return res, nil
77
}
78
79
// Run implements Component.
80
func (c *Component) Run(ctx context.Context) error {
81
for {
82
select {
83
case <-ctx.Done():
84
return nil
85
case entry := <-c.receiver:
86
stanzaEntry := parsePromtailEntry(entry)
87
plogEntry := adapter.Convert(stanzaEntry)
88
89
// TODO(@tpaschalis) Is there any more handling to be done here?
90
err := c.logsSink.ConsumeLogs(ctx, plogEntry)
91
if err != nil {
92
level.Error(c.opts.Logger).Log("msg", "failed to consume log entries", "err", err)
93
}
94
}
95
}
96
}
97
98
// Update implements Component.
99
func (c *Component) Update(newConfig component.Arguments) error {
100
c.mut.Lock()
101
defer c.mut.Unlock()
102
103
cfg := newConfig.(Arguments)
104
c.logsSink = fanoutconsumer.Logs(cfg.Output.Logs)
105
106
return nil
107
}
108
109
// parsePromtailEntry creates new stanza.Entry from promtail entry
110
func parsePromtailEntry(inputEntry loki.Entry) *entry.Entry {
111
outputEntry := entry.New()
112
outputEntry.Body = inputEntry.Entry.Line
113
outputEntry.Timestamp = inputEntry.Entry.Timestamp
114
115
var lbls []string
116
for key, val := range inputEntry.Labels {
117
valStr := string(val)
118
keyStr := string(key)
119
switch key {
120
case "filename":
121
outputEntry.AddAttribute("filename", valStr)
122
lbls = append(lbls, "filename")
123
// The `promtailreceiver` from the opentelemetry-collector-contrib
124
// repo adds these two labels based on these "semantic conventions
125
// for log media".
126
// https://opentelemetry.io/docs/reference/specification/logs/semantic_conventions/media/
127
// We're keeping them as well, but we're also adding the `filename`
128
// attribute so that it can be used from the
129
// `loki.attribute.labels` hint for when the opposite OTel -> Loki
130
// transformation happens.
131
outputEntry.AddAttribute("log.file.path", valStr)
132
outputEntry.AddAttribute("log.file.name", path.Base(valStr))
133
default:
134
lbls = append(lbls, keyStr)
135
outputEntry.AddAttribute(keyStr, valStr)
136
}
137
}
138
139
if len(lbls) > 0 {
140
// This hint is defined in the pkg/translator/loki package and the
141
// opentelemetry-collector-contrib repo, but is not exported so we
142
// re-define it.
143
// It is used to detect which attributes should be promoted to labels
144
// when transforming back from OTel -> Loki.
145
outputEntry.AddAttribute(hintAttributes, strings.Join(lbls, ","))
146
}
147
return outputEntry
148
}
149
150