Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/servicegraphprocessor/processor.go
4096 views
1
package servicegraphprocessor
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"time"
8
9
util "github.com/cortexproject/cortex/pkg/util/log"
10
"github.com/go-kit/log"
11
"github.com/go-kit/log/level"
12
"github.com/grafana/agent/pkg/traces/contextkeys"
13
"github.com/prometheus/client_golang/prometheus"
14
"go.opentelemetry.io/collector/component"
15
"go.opentelemetry.io/collector/consumer"
16
"go.opentelemetry.io/collector/pdata/ptrace"
17
semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
18
"google.golang.org/grpc/codes"
19
)
20
21
// tooManySpansError is returned by consume when one or more spans had to be
// dropped because the edge store reached its maxItems limit.
type tooManySpansError struct {
	// droppedSpans is the number of spans that were not processed.
	droppedSpans int
}

// Error implements the error interface.
func (t tooManySpansError) Error() string {
	return fmt.Sprintf("dropped %d spans", t.droppedSpans)
}
28
29
// edge is an edge between two nodes in the graph
30
type edge struct {
31
key string
32
33
serverService, clientService string
34
serverLatency, clientLatency time.Duration
35
36
// If either the client or the server spans have status code error,
37
// the edge will be considered as failed.
38
failed bool
39
40
// expiration is the time at which the edge expires, expressed as Unix time
41
expiration int64
42
}
43
44
func newEdge(key string, ttl time.Duration) *edge {
45
return &edge{
46
key: key,
47
48
expiration: time.Now().Add(ttl).Unix(),
49
}
50
}
51
52
// isCompleted returns true if the corresponding client and server
53
// pair spans have been processed for the given edge
54
func (e *edge) isCompleted() bool {
55
return len(e.clientService) != 0 && len(e.serverService) != 0
56
}
57
58
func (e *edge) isExpired() bool {
59
return time.Now().Unix() >= e.expiration
60
}
61
62
// Compile-time check that *processor satisfies component.TracesProcessor.
var _ component.TracesProcessor = (*processor)(nil)

// processor builds a service graph out of client/server span pairs and
// exposes it as Prometheus metrics, while forwarding all traces unchanged
// to the next consumer.
type processor struct {
	nextConsumer consumer.Traces
	reg          prometheus.Registerer

	// store holds in-flight edges until they are completed or expire.
	// Initialized in Start, not in newProcessor.
	store *store

	// wait and maxItems are handed to newStore as the edge TTL and the cap
	// on in-flight edges, respectively.
	wait     time.Duration
	maxItems int

	// completed edges are pushed through this channel to be processed.
	collectCh chan string

	serviceGraphRequestTotal           *prometheus.CounterVec
	serviceGraphRequestFailedTotal     *prometheus.CounterVec
	serviceGraphRequestServerHistogram *prometheus.HistogramVec
	serviceGraphRequestClientHistogram *prometheus.HistogramVec
	serviceGraphUnpairedSpansTotal     *prometheus.CounterVec
	serviceGraphDroppedSpansTotal      *prometheus.CounterVec

	// Status codes that should be treated as successful in addition to the
	// defaults (HTTP 2xx, gRPC OK). Keyed by numeric status code.
	httpSuccessCodeMap map[int]struct{}
	grpcSuccessCodeMap map[int]struct{}

	logger log.Logger
	// closeCh, when closed, stops the collector worker goroutines.
	closeCh chan struct{}
}
89
90
func newProcessor(nextConsumer consumer.Traces, cfg *Config) *processor {
91
logger := log.With(util.Logger, "component", "service graphs")
92
93
if cfg.Wait == 0 {
94
cfg.Wait = DefaultWait
95
}
96
if cfg.MaxItems == 0 {
97
cfg.MaxItems = DefaultMaxItems
98
}
99
if cfg.Workers == 0 {
100
cfg.Workers = DefaultWorkers
101
}
102
103
var (
104
httpSuccessCodeMap = make(map[int]struct{})
105
grpcSuccessCodeMap = make(map[int]struct{})
106
)
107
if cfg.SuccessCodes != nil {
108
for _, sc := range cfg.SuccessCodes.http {
109
httpSuccessCodeMap[int(sc)] = struct{}{}
110
}
111
for _, sc := range cfg.SuccessCodes.grpc {
112
grpcSuccessCodeMap[int(sc)] = struct{}{}
113
}
114
}
115
116
p := &processor{
117
nextConsumer: nextConsumer,
118
logger: logger,
119
120
wait: cfg.Wait,
121
maxItems: cfg.MaxItems,
122
httpSuccessCodeMap: httpSuccessCodeMap,
123
grpcSuccessCodeMap: grpcSuccessCodeMap,
124
125
collectCh: make(chan string, cfg.Workers),
126
127
closeCh: make(chan struct{}, 1),
128
}
129
130
for i := 0; i < cfg.Workers; i++ {
131
go func() {
132
for {
133
select {
134
case k := <-p.collectCh:
135
p.store.evictEdgeWithLock(k)
136
137
case <-p.closeCh:
138
return
139
}
140
}
141
}()
142
}
143
144
return p
145
}
146
147
// Start initializes the edge store and registers the service graph metrics
// with the prometheus.Registerer carried in ctx.
func (p *processor) Start(ctx context.Context, _ component.Host) error {
	// initialize store
	p.store = newStore(p.wait, p.maxItems, p.collectEdge)

	reg, ok := ctx.Value(contextkeys.PrometheusRegisterer).(prometheus.Registerer)
	if !ok || reg == nil {
		// The registerer is injected upstream via the context; without it
		// the processor has nowhere to expose its metrics.
		return errors.New("key does not contain a prometheus registerer")
	}
	p.reg = reg
	return p.registerMetrics()
}
158
159
func (p *processor) registerMetrics() error {
160
p.serviceGraphRequestTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
161
Namespace: "traces",
162
Name: "service_graph_request_total",
163
Help: "Total count of requests between two nodes",
164
}, []string{"client", "server"})
165
p.serviceGraphRequestFailedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
166
Namespace: "traces",
167
Name: "service_graph_request_failed_total",
168
Help: "Total count of failed requests between two nodes",
169
}, []string{"client", "server"})
170
p.serviceGraphRequestServerHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
171
Namespace: "traces",
172
Name: "service_graph_request_server_seconds",
173
Help: "Time for a request between two nodes as seen from the server",
174
Buckets: prometheus.ExponentialBuckets(0.01, 2, 12),
175
}, []string{"client", "server"})
176
p.serviceGraphRequestClientHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
177
Namespace: "traces",
178
Name: "service_graph_request_client_seconds",
179
Help: "Time for a request between two nodes as seen from the client",
180
Buckets: prometheus.ExponentialBuckets(0.01, 2, 12),
181
}, []string{"client", "server"})
182
p.serviceGraphUnpairedSpansTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
183
Namespace: "traces",
184
Name: "service_graph_unpaired_spans_total",
185
Help: "Total count of unpaired spans",
186
}, []string{"client", "server"})
187
p.serviceGraphDroppedSpansTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
188
Namespace: "traces",
189
Name: "service_graph_dropped_spans_total",
190
Help: "Total count of dropped spans",
191
}, []string{"client", "server"})
192
193
cs := []prometheus.Collector{
194
p.serviceGraphRequestTotal,
195
p.serviceGraphRequestFailedTotal,
196
p.serviceGraphRequestServerHistogram,
197
p.serviceGraphRequestClientHistogram,
198
p.serviceGraphUnpairedSpansTotal,
199
p.serviceGraphDroppedSpansTotal,
200
}
201
202
for _, c := range cs {
203
if err := p.reg.Register(c); err != nil {
204
return err
205
}
206
}
207
208
return nil
209
}
210
211
// Shutdown signals the collector worker goroutines to stop and unregisters
// the processor's metrics. It always returns nil.
func (p *processor) Shutdown(context.Context) error {
	close(p.closeCh)
	p.unregisterMetrics()
	return nil
}
216
217
func (p *processor) unregisterMetrics() {
218
cs := []prometheus.Collector{
219
p.serviceGraphRequestTotal,
220
p.serviceGraphRequestFailedTotal,
221
p.serviceGraphRequestServerHistogram,
222
p.serviceGraphRequestClientHistogram,
223
p.serviceGraphUnpairedSpansTotal,
224
}
225
226
for _, c := range cs {
227
p.reg.Unregister(c)
228
}
229
}
230
231
// Capabilities reports that this processor does not mutate the trace data
// it consumes.
func (p *processor) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesConsumedData: false}
}
234
235
// ConsumeTraces feeds a batch of traces into the service graph and then
// forwards the batch to the next consumer. Graph-building errors are logged
// rather than propagated so the pipeline keeps flowing.
func (p *processor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	// Evict expired edges
	p.store.expire()

	if err := p.consume(td); err != nil {
		// Bug fix: consume returns a *tooManySpansError, so errors.As needs
		// a pointer-typed target. The previous value-typed target
		// (&tooManySpansError{}) never matched, mis-logging span drops at
		// error level instead of warning.
		var tmsErr *tooManySpansError
		if errors.As(err, &tmsErr) {
			level.Warn(p.logger).Log("msg", "skipped processing of spans", "maxItems", p.maxItems, "err", err)
		} else {
			level.Error(p.logger).Log("msg", "failed consuming traces", "err", err)
		}
		return nil
	}

	return p.nextConsumer.ConsumeTraces(ctx, td)
}
250
251
// collectEdge records the metrics for the given edge. A completed edge
// (both client and server spans seen) is counted as a request and its
// latencies are observed; an edge that expired before completing is counted
// as unpaired. It is passed to newStore as the collection callback.
func (p *processor) collectEdge(e *edge) {
	if e.isCompleted() {
		p.serviceGraphRequestTotal.WithLabelValues(e.clientService, e.serverService).Inc()
		// A single failed span marks the whole edge as failed.
		if e.failed {
			p.serviceGraphRequestFailedTotal.WithLabelValues(e.clientService, e.serverService).Inc()
		}
		p.serviceGraphRequestServerHistogram.WithLabelValues(e.clientService, e.serverService).Observe(e.serverLatency.Seconds())
		p.serviceGraphRequestClientHistogram.WithLabelValues(e.clientService, e.serverService).Observe(e.clientLatency.Seconds())
	} else if e.isExpired() {
		p.serviceGraphUnpairedSpansTotal.WithLabelValues(e.clientService, e.serverService).Inc()
	}
}
265
266
func (p *processor) consume(trace ptrace.Traces) error {
267
var totalDroppedSpans int
268
rSpansSlice := trace.ResourceSpans()
269
for i := 0; i < rSpansSlice.Len(); i++ {
270
rSpan := rSpansSlice.At(i)
271
272
svc, ok := rSpan.Resource().Attributes().Get(semconv.AttributeServiceName)
273
if !ok || svc.Str() == "" {
274
continue
275
}
276
277
ssSlice := rSpan.ScopeSpans()
278
for j := 0; j < ssSlice.Len(); j++ {
279
ils := ssSlice.At(j)
280
281
for k := 0; k < ils.Spans().Len(); k++ {
282
span := ils.Spans().At(k)
283
284
switch span.Kind() {
285
case ptrace.SpanKindClient:
286
k := key(span.TraceID().HexString(), span.SpanID().HexString())
287
288
edge, err := p.store.upsertEdge(k, func(e *edge) {
289
e.clientService = svc.Str()
290
e.clientLatency = spanDuration(span)
291
e.failed = e.failed || p.spanFailed(span) // keep request as failed if any span is failed
292
})
293
294
if errors.Is(err, errTooManyItems) {
295
totalDroppedSpans++
296
p.serviceGraphDroppedSpansTotal.WithLabelValues(svc.Str(), "").Inc()
297
continue
298
}
299
// upsertEdge will only return this errTooManyItems
300
if err != nil {
301
return err
302
}
303
304
if edge.isCompleted() {
305
p.collectCh <- k
306
}
307
308
case ptrace.SpanKindServer:
309
k := key(span.TraceID().HexString(), span.ParentSpanID().HexString())
310
311
edge, err := p.store.upsertEdge(k, func(e *edge) {
312
e.serverService = svc.Str()
313
e.serverLatency = spanDuration(span)
314
e.failed = e.failed || p.spanFailed(span) // keep request as failed if any span is failed
315
})
316
317
if errors.Is(err, errTooManyItems) {
318
totalDroppedSpans++
319
p.serviceGraphDroppedSpansTotal.WithLabelValues("", svc.Str()).Inc()
320
continue
321
}
322
// upsertEdge will only return this errTooManyItems
323
if err != nil {
324
return err
325
}
326
327
if edge.isCompleted() {
328
p.collectCh <- k
329
}
330
331
default:
332
}
333
}
334
}
335
}
336
337
if totalDroppedSpans > 0 {
338
return &tooManySpansError{
339
droppedSpans: totalDroppedSpans,
340
}
341
}
342
return nil
343
}
344
345
// spanFailed decides whether a span counts as a failed request: an HTTP
// status outside 2xx, a gRPC status other than OK (unless the code was
// configured as successful), or an error span status.
func (p *processor) spanFailed(span ptrace.Span) bool {
	// Request considered failed if status is not 2XX or added as a successful status code
	if statusCode, ok := span.Attributes().Get(semconv.AttributeHTTPStatusCode); ok {
		code := int(statusCode.Int())
		_, allowed := p.httpSuccessCodeMap[code]
		if !allowed && code/100 != 2 {
			return true
		}
	}

	// Request considered failed if status is not OK or added as a successful status code
	if statusCode, ok := span.Attributes().Get(semconv.AttributeRPCGRPCStatusCode); ok {
		code := int(statusCode.Int())
		_, allowed := p.grpcSuccessCodeMap[code]
		if !allowed && code != int(codes.OK) {
			return true
		}
	}

	return span.Status().Code() == ptrace.StatusCodeError
}
364
365
// spanDuration returns the elapsed time between a span's start and end
// timestamps.
func spanDuration(span ptrace.Span) time.Duration {
	start := span.StartTimestamp().AsTime()
	end := span.EndTimestamp().AsTime()
	return end.Sub(start)
}
368
369
// key joins a trace ID and a span ID into the store key for an edge.
func key(k1, k2 string) string {
	return fmt.Sprint(k1, "-", k2)
}
372
373