Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/automaticloggingprocessor/automaticloggingprocessor.go
4096 views
1
package automaticloggingprocessor
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"strconv"
8
"time"
9
10
util "github.com/cortexproject/cortex/pkg/util/log"
11
"github.com/go-kit/log"
12
"github.com/go-kit/log/level"
13
"github.com/go-logfmt/logfmt"
14
"github.com/grafana/agent/pkg/logs"
15
"github.com/grafana/agent/pkg/operator/config"
16
"github.com/grafana/agent/pkg/traces/contextkeys"
17
"github.com/grafana/loki/clients/pkg/promtail/api"
18
"github.com/grafana/loki/pkg/logproto"
19
"github.com/prometheus/common/model"
20
"go.opentelemetry.io/collector/component"
21
"go.opentelemetry.io/collector/consumer"
22
"go.opentelemetry.io/collector/pdata/pcommon"
23
"go.opentelemetry.io/collector/pdata/ptrace"
24
semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
25
"go.uber.org/atomic"
26
)
27
28
// Default names used when the matching cfg.Overrides field is left empty.
const (
	defaultLogsTag     = "traces"
	defaultServiceKey  = "svc"
	defaultSpanNameKey = "span"
	defaultStatusKey   = "status"
	defaultDurationKey = "dur"
	defaultTraceIDKey  = "tid"
)

// defaultTimeout is used for cfg.Timeout when it is unset; that timeout is
// passed to every SendEntry call made by exportToLogsInstance.
const defaultTimeout = time.Millisecond

// Entry kinds. exportToLogsInstance stores one of these under the LogsTag
// label to say what an entry describes.
const (
	typeSpan    = "span"
	typeRoot    = "root"
	typeProcess = "process"
)
42
43
type automaticLoggingProcessor struct {
44
nextConsumer consumer.Traces
45
46
cfg *AutomaticLoggingConfig
47
logToStdout bool
48
logsInstance *logs.Instance
49
done atomic.Bool
50
51
labels map[string]struct{}
52
53
logger log.Logger
54
}
55
56
func newTraceProcessor(nextConsumer consumer.Traces, cfg *AutomaticLoggingConfig) (component.TracesProcessor, error) {
57
logger := log.With(util.Logger, "component", "traces automatic logging")
58
59
if nextConsumer == nil {
60
return nil, component.ErrNilNextConsumer
61
}
62
63
if !cfg.Roots && !cfg.Processes && !cfg.Spans {
64
return nil, errors.New("automaticLoggingProcessor requires one of roots, processes, or spans to be enabled")
65
}
66
67
if cfg.Timeout == 0 {
68
cfg.Timeout = defaultTimeout
69
}
70
71
if cfg.Backend == "" {
72
cfg.Backend = BackendStdout
73
}
74
75
if cfg.Backend != BackendLogs && cfg.Backend != BackendStdout {
76
return nil, fmt.Errorf("automaticLoggingProcessor requires a backend of type '%s' or '%s'", BackendLogs, BackendStdout)
77
}
78
79
logToStdout := false
80
if cfg.Backend == BackendStdout {
81
logToStdout = true
82
}
83
84
cfg.Overrides.LogsTag = override(cfg.Overrides.LogsTag, defaultLogsTag)
85
cfg.Overrides.ServiceKey = override(cfg.Overrides.ServiceKey, defaultServiceKey)
86
cfg.Overrides.SpanNameKey = override(cfg.Overrides.SpanNameKey, defaultSpanNameKey)
87
cfg.Overrides.StatusKey = override(cfg.Overrides.StatusKey, defaultStatusKey)
88
cfg.Overrides.DurationKey = override(cfg.Overrides.DurationKey, defaultDurationKey)
89
cfg.Overrides.TraceIDKey = override(cfg.Overrides.TraceIDKey, defaultTraceIDKey)
90
91
labels := make(map[string]struct{}, len(cfg.Labels))
92
for _, l := range cfg.Labels {
93
labels[l] = struct{}{}
94
}
95
96
return &automaticLoggingProcessor{
97
nextConsumer: nextConsumer,
98
cfg: cfg,
99
logToStdout: logToStdout,
100
logger: logger,
101
done: atomic.Bool{},
102
labels: labels,
103
}, nil
104
}
105
106
// ConsumeTraces emits the configured automatic log entries for td and then
// forwards td unchanged to the next consumer, returning its error.
//
// For every span it emits:
//   - a "span" entry (span + process key/values) when cfg.Spans is set;
//   - a "root" entry (span + process key/values) when cfg.Roots is set and
//     the span has no parent;
//   - a "process" entry (process key/values only) when cfg.Processes is set
//     and the trace ID differs from the previous span's in the same
//     ScopeSpans.
func (p *automaticLoggingProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	rss := td.ResourceSpans()
	for i := 0; i < rss.Len(); i++ {
		rs := rss.At(i)

		var svc string
		if svcAtt, ok := rs.Resource().Attributes().Get(semconv.AttributeServiceName); ok {
			svc = svcAtt.Str()
		}

		// The process key/values depend only on the resource, so build them
		// once per ResourceSpans instead of once per span. Clip the slice so
		// that any append performed on it downstream copies instead of
		// writing into this shared backing array.
		processKVs := p.processKeyVals(rs.Resource(), svc)
		processKVs = processKVs[:len(processKVs):len(processKVs)]

		sss := rs.ScopeSpans()
		for j := 0; j < sss.Len(); j++ {
			spans := sss.At(j).Spans()

			lastTraceID := ""
			for k := 0; k < spans.Len(); k++ {
				span := spans.At(k)
				traceID := span.TraceID().HexString()

				logSpan := p.cfg.Spans
				logRoot := p.cfg.Roots && span.ParentSpanID().IsEmpty()
				if logSpan || logRoot {
					// Build the span key/values once even when both a span and a
					// root entry are emitted; clipped for the same reason as above.
					keyValues := append(p.spanKeyVals(span), processKVs...)
					keyValues = keyValues[:len(keyValues):len(keyValues)]

					// Each export gets its own label map: exportToLogsInstance
					// mutates it and hands it to the logs pipeline.
					if logSpan {
						p.exportToLogsInstance(typeSpan, traceID, p.spanLabels(keyValues), keyValues...)
					}
					if logRoot {
						p.exportToLogsInstance(typeRoot, traceID, p.spanLabels(keyValues), keyValues...)
					}
				}

				if p.cfg.Processes && lastTraceID != traceID {
					lastTraceID = traceID
					p.exportToLogsInstance(typeProcess, traceID, p.spanLabels(processKVs), processKVs...)
				}
			}
		}
	}

	return p.nextConsumer.ConsumeTraces(ctx, td)
}
148
149
func (p *automaticLoggingProcessor) spanLabels(keyValues []interface{}) model.LabelSet {
150
if len(keyValues) == 0 {
151
return model.LabelSet{}
152
}
153
ls := make(map[model.LabelName]model.LabelValue, len(keyValues)/2)
154
var (
155
k, v string
156
ok bool
157
)
158
for i := 0; i < len(keyValues); i += 2 {
159
if k, ok = keyValues[i].(string); !ok {
160
// Should never happen, all keys are strings
161
level.Error(p.logger).Log("msg", "error casting label key to string", "key", keyValues[i])
162
continue
163
}
164
// Try to cast value to string
165
if v, ok = keyValues[i+1].(string); !ok {
166
// If it's not a string, format it to its string representation
167
v = fmt.Sprintf("%v", keyValues[i+1])
168
}
169
if _, ok := p.labels[k]; ok {
170
// Loki does not accept "." as a valid character for labels
171
// Dots . are replaced by underscores _
172
k = config.SanitizeLabelName(k)
173
174
ls[model.LabelName(k)] = model.LabelValue(v)
175
}
176
}
177
return ls
178
}
179
180
// Capabilities reports the zero-valued capabilities: MutatesData stays false
// because ConsumeTraces only reads the traces it forwards.
func (p *automaticLoggingProcessor) Capabilities() consumer.Capabilities {
	var caps consumer.Capabilities
	return caps
}
183
184
// Start is invoked during service startup.
185
func (p *automaticLoggingProcessor) Start(ctx context.Context, _ component.Host) error {
186
if !p.logToStdout {
187
logs, ok := ctx.Value(contextkeys.Logs).(*logs.Logs)
188
if !ok {
189
return fmt.Errorf("key does not contain a logs instance")
190
}
191
p.logsInstance = logs.Instance(p.cfg.LogsName)
192
if p.logsInstance == nil {
193
return fmt.Errorf("logs instance %s not found", p.cfg.LogsName)
194
}
195
}
196
return nil
197
}
198
199
// Shutdown is invoked during service shutdown.
200
func (p *automaticLoggingProcessor) Shutdown(context.Context) error {
201
p.done.Store(true)
202
203
return nil
204
}
205
206
// processKeyVals returns the flat key/value list describing the process
// (resource) that emitted a batch of spans. It starts with svc under the
// configured ServiceKey, followed by every attribute listed in
// cfg.ProcessAttributes that is present on resource. Absent attributes are
// skipped.
func (p *automaticLoggingProcessor) processKeyVals(resource pcommon.Resource, svc string) []interface{} {
	// 2 slots for the service name plus at most 2 per configured attribute,
	// so the appends below never need to grow the slice.
	atts := make([]interface{}, 0, 2+2*len(p.cfg.ProcessAttributes))

	// name
	atts = append(atts, p.cfg.Overrides.ServiceKey, svc)

	rsAtts := resource.Attributes()
	for _, name := range p.cfg.ProcessAttributes {
		if att, ok := rsAtts.Get(name); ok {
			// name/key val pairs
			atts = append(atts, name, attributeValue(att))
		}
	}

	return atts
}
225
226
// spanKeyVals returns the flat key/value list describing a single span:
//   - its name under the configured SpanNameKey;
//   - its duration (see spanDuration) under DurationKey;
//   - its status code under StatusKey, omitted when the code is Unset;
//   - every attribute listed in cfg.SpanAttributes that is present on the span.
func (p *automaticLoggingProcessor) spanKeyVals(span ptrace.Span) []interface{} {
	// 6 slots for name, duration and status, plus at most 2 per configured
	// span attribute. (Service name is not part of this list; it comes from
	// processKeyVals.)
	atts := make([]interface{}, 0, 6+2*len(p.cfg.SpanAttributes))

	atts = append(atts,
		p.cfg.Overrides.SpanNameKey, span.Name(),
		p.cfg.Overrides.DurationKey, spanDuration(span),
	)

	// Skip STATUS_CODE_UNSET to be less spammy
	if code := span.Status().Code(); code != ptrace.StatusCodeUnset {
		atts = append(atts, p.cfg.Overrides.StatusKey, code)
	}

	spanAtts := span.Attributes()
	for _, name := range p.cfg.SpanAttributes {
		if att, ok := spanAtts.Get(name); ok {
			atts = append(atts, name, attributeValue(att))
		}
	}

	return atts
}
251
252
func (p *automaticLoggingProcessor) exportToLogsInstance(kind string, traceID string, labels model.LabelSet, keyvals ...interface{}) {
253
if p.done.Load() {
254
return
255
}
256
257
keyvals = append(keyvals, []interface{}{p.cfg.Overrides.TraceIDKey, traceID}...)
258
line, err := logfmt.MarshalKeyvals(keyvals...)
259
if err != nil {
260
level.Warn(p.logger).Log("msg", "unable to marshal keyvals", "err", err)
261
return
262
}
263
264
// if we're logging to stdout, log and bail
265
if p.logToStdout {
266
level.Info(p.logger).Log(keyvals...)
267
return
268
}
269
270
// Add logs instance label
271
labels[model.LabelName(p.cfg.Overrides.LogsTag)] = model.LabelValue(kind)
272
273
sent := p.logsInstance.SendEntry(api.Entry{
274
Labels: labels,
275
Entry: logproto.Entry{
276
Timestamp: time.Now(),
277
Line: string(line),
278
},
279
}, p.cfg.Timeout)
280
281
if !sent {
282
level.Warn(p.logger).Log("msg", "failed to autolog to logs pipeline", "kind", kind, "traceid", traceID)
283
}
284
}
285
286
// spanDuration formats the span's length (EndTimestamp minus StartTimestamp)
// as an integer nanosecond count followed by "ns", e.g. "1500ns".
func spanDuration(span ptrace.Span) string {
	nanos := int64(span.EndTimestamp() - span.StartTimestamp())
	// 20 bytes covers any int64 including its sign, plus 2 for the suffix.
	buf := make([]byte, 0, 22)
	buf = strconv.AppendInt(buf, nanos, 10)
	buf = append(buf, "ns"...)
	return string(buf)
}
290
291
func attributeValue(att pcommon.Value) interface{} {
292
switch att.Type() {
293
case pcommon.ValueTypeStr:
294
return att.Str()
295
case pcommon.ValueTypeInt:
296
return att.Int()
297
case pcommon.ValueTypeDouble:
298
return att.Double()
299
case pcommon.ValueTypeBool:
300
return att.Bool()
301
case pcommon.ValueTypeMap:
302
return att.Map()
303
case pcommon.ValueTypeSlice:
304
return att.Slice()
305
}
306
return nil
307
}
308
309
// override returns cfgValue when it is non-empty and falls back to
// defaultValue otherwise.
func override(cfgValue string, defaultValue string) string {
	if cfgValue != "" {
		return cfgValue
	}
	return defaultValue
}
315
316