Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/relabel/relabel.go
4096 views
1
package relabel
2
3
import (
4
"context"
5
"reflect"
6
"sync"
7
8
"github.com/go-kit/log/level"
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/common/loki"
11
flow_relabel "github.com/grafana/agent/component/common/relabel"
12
"github.com/grafana/agent/pkg/river"
13
lru "github.com/hashicorp/golang-lru"
14
"github.com/prometheus/common/model"
15
"github.com/prometheus/prometheus/model/labels"
16
"github.com/prometheus/prometheus/model/relabel"
17
)
18
19
func init() {
20
component.Register(component.Registration{
21
Name: "loki.relabel",
22
Args: Arguments{},
23
Exports: Exports{},
24
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
25
return New(opts, args.(Arguments))
26
},
27
})
28
}
29
30
// Arguments holds values which are used to configure the loki.relabel
31
// component.
32
type Arguments struct {
33
// Where the relabeled metrics should be forwarded to.
34
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
35
36
// The relabelling rules to apply to each log entry before it's forwarded.
37
RelabelConfigs []*flow_relabel.Config `river:"rule,block,optional"`
38
39
// The maximum number of items to hold in the component's LRU cache.
40
MaxCacheSize int `river:"max_cache_size,attr,optional"`
41
}
42
43
// DefaultArguments provides the default arguments for the loki.relabel
44
// component.
45
var DefaultArguments = Arguments{
46
MaxCacheSize: 10_000,
47
}
48
49
var _ river.Unmarshaler = (*Arguments)(nil)
50
51
// UnmarshalRiver implements river.Unmarshaler.
52
func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
53
*a = DefaultArguments
54
55
type arguments Arguments
56
return f((*arguments)(a))
57
}
58
59
// Exports holds values which are exported by the loki.relabel component.
60
type Exports struct {
61
Receiver loki.LogsReceiver `river:"receiver,attr"`
62
Rules flow_relabel.Rules `river:"rules,attr"`
63
}
64
65
// Component implements the loki.relabel component.
66
type Component struct {
67
opts component.Options
68
metrics *metrics
69
70
mut sync.RWMutex
71
rcs []*relabel.Config
72
receiver loki.LogsReceiver
73
fanout []loki.LogsReceiver
74
75
cache *lru.Cache
76
maxCacheSize int
77
}
78
79
var (
80
_ component.Component = (*Component)(nil)
81
)
82
83
// New creates a new loki.relabel component.
84
func New(o component.Options, args Arguments) (*Component, error) {
85
cache, err := lru.New(args.MaxCacheSize)
86
if err != nil {
87
return nil, err
88
}
89
90
c := &Component{
91
opts: o,
92
metrics: newMetrics(o.Registerer),
93
cache: cache,
94
maxCacheSize: args.MaxCacheSize,
95
}
96
97
// Create and immediately export the receiver which remains the same for
98
// the component's lifetime.
99
c.receiver = make(loki.LogsReceiver)
100
o.OnStateChange(Exports{Receiver: c.receiver, Rules: args.RelabelConfigs})
101
102
// Call to Update() to set the relabelling rules once at the start.
103
if err := c.Update(args); err != nil {
104
return nil, err
105
}
106
107
return c, nil
108
}
109
110
// Run implements component.Component.
111
func (c *Component) Run(ctx context.Context) error {
112
for {
113
select {
114
case <-ctx.Done():
115
return nil
116
case entry := <-c.receiver:
117
c.metrics.entriesProcessed.Inc()
118
lbls := c.relabel(entry)
119
if len(lbls) == 0 {
120
level.Debug(c.opts.Logger).Log("msg", "dropping entry after relabeling", "labels", entry.Labels.String())
121
continue
122
}
123
124
c.metrics.entriesOutgoing.Inc()
125
entry.Labels = lbls
126
for _, f := range c.fanout {
127
select {
128
case <-ctx.Done():
129
return nil
130
case f <- entry:
131
}
132
}
133
}
134
}
135
}
136
137
// Update implements component.Component.
138
func (c *Component) Update(args component.Arguments) error {
139
c.mut.Lock()
140
defer c.mut.Unlock()
141
142
newArgs := args.(Arguments)
143
newRCS := flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelConfigs)
144
if relabelingChanged(c.rcs, newRCS) {
145
level.Debug(c.opts.Logger).Log("msg", "received new relabel configs, purging cache")
146
c.cache.Purge()
147
c.metrics.cacheSize.Set(0)
148
}
149
if newArgs.MaxCacheSize != c.maxCacheSize {
150
evicted := c.cache.Resize(newArgs.MaxCacheSize)
151
if evicted > 0 {
152
level.Debug(c.opts.Logger).Log("msg", "resizing the cache lead to evicting of items", "len_items_evicted", evicted)
153
}
154
}
155
c.rcs = newRCS
156
c.fanout = newArgs.ForwardTo
157
158
c.opts.OnStateChange(Exports{Receiver: c.receiver, Rules: newArgs.RelabelConfigs})
159
160
return nil
161
}
162
163
func relabelingChanged(prev, next []*relabel.Config) bool {
164
if len(prev) != len(next) {
165
return true
166
}
167
for i := range prev {
168
if !reflect.DeepEqual(prev[i], next[i]) {
169
return true
170
}
171
}
172
return false
173
}
174
175
type cacheItem struct {
176
original model.LabelSet
177
relabeled model.LabelSet
178
}
179
180
// TODO(@tpaschalis) It's unfortunate how we have to cast back and forth
181
// between model.LabelSet (map) and labels.Labels (slice). Promtail does
182
// not have this issue as relabel config rules are only applied to targets.
183
// Do we want to use labels.Labels in loki.Entry instead?
184
func (c *Component) relabel(e loki.Entry) model.LabelSet {
185
hash := e.Labels.Fingerprint()
186
187
// Let's look in the cache for the hash of the entry's labels.
188
val, found := c.cache.Get(hash)
189
190
// We've seen this hash before; let's see if we've already relabeled this
191
// specific entry before and can return early, or if it's a collision.
192
if found {
193
for _, ci := range val.([]cacheItem) {
194
if e.Labels.Equal(ci.original) {
195
c.metrics.cacheHits.Inc()
196
return ci.relabeled
197
}
198
}
199
}
200
201
// Seems like it's either a new entry or a hash collision.
202
c.metrics.cacheMisses.Inc()
203
relabeled := c.process(e)
204
205
// In case it's a new hash, initialize it as a new cacheItem.
206
// If it was a collision, append the result to the cached slice.
207
if !found {
208
val = []cacheItem{{e.Labels, relabeled}}
209
} else {
210
val = append(val.([]cacheItem), cacheItem{e.Labels, relabeled})
211
}
212
213
c.cache.Add(hash, val)
214
c.metrics.cacheSize.Set(float64(c.cache.Len()))
215
216
return relabeled
217
}
218
219
func (c *Component) process(e loki.Entry) model.LabelSet {
220
var lbls labels.Labels
221
for k, v := range e.Labels {
222
lbls = append(lbls, labels.Label{
223
Name: string(k),
224
Value: string(v),
225
})
226
}
227
lbls, _ = relabel.Process(lbls, c.rcs...)
228
229
relabeled := make(model.LabelSet, len(lbls))
230
for i := range lbls {
231
relabeled[model.LabelName(lbls[i].Name)] = model.LabelValue(lbls[i].Value)
232
}
233
return relabeled
234
}
235
236