Path: blob/main/pkg/traces/servicegraphprocessor/processor.go
package servicegraphprocessor

import (
	"context"
	"errors"
	"fmt"
	"time"

	util "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/pkg/traces/contextkeys"
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/ptrace"
	semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
	"google.golang.org/grpc/codes"
)

type tooManySpansError struct {
	droppedSpans int
}

func (t tooManySpansError) Error() string {
	return fmt.Sprintf("dropped %d spans", t.droppedSpans)
}

// edge is an edge between two nodes in the graph
type edge struct {
	key string

	serverService, clientService string
	serverLatency, clientLatency time.Duration

	// If either the client or the server span has an error status code,
	// the edge is considered failed.
	failed bool

	// expiration is the time at which the edge expires, expressed as Unix time
	expiration int64
}

func newEdge(key string, ttl time.Duration) *edge {
	return &edge{
		key: key,

		expiration: time.Now().Add(ttl).Unix(),
	}
}

// isCompleted returns true if the corresponding client and server
// pair spans have been processed for the given edge
func (e *edge) isCompleted() bool {
	return len(e.clientService) != 0 && len(e.serverService) != 0
}

func (e *edge) isExpired() bool {
	return time.Now().Unix() >= e.expiration
}

var _ component.TracesProcessor = (*processor)(nil)

type processor struct {
	nextConsumer consumer.Traces
	reg          prometheus.Registerer

	store *store

	wait     time.Duration
	maxItems int

	// completed edges are pushed through this channel to be processed.
	collectCh chan string

	serviceGraphRequestTotal           *prometheus.CounterVec
	serviceGraphRequestFailedTotal     *prometheus.CounterVec
	serviceGraphRequestServerHistogram *prometheus.HistogramVec
	serviceGraphRequestClientHistogram *prometheus.HistogramVec
	serviceGraphUnpairedSpansTotal     *prometheus.CounterVec
	serviceGraphDroppedSpansTotal      *prometheus.CounterVec

	httpSuccessCodeMap map[int]struct{}
	grpcSuccessCodeMap map[int]struct{}

	logger  log.Logger
	closeCh chan struct{}
}

func newProcessor(nextConsumer consumer.Traces, cfg *Config) *processor {
	logger := log.With(util.Logger, "component", "service graphs")

	if cfg.Wait == 0 {
		cfg.Wait = DefaultWait
	}
	if cfg.MaxItems == 0 {
		cfg.MaxItems = DefaultMaxItems
	}
	if cfg.Workers == 0 {
		cfg.Workers = DefaultWorkers
	}

	var (
		httpSuccessCodeMap = make(map[int]struct{})
		grpcSuccessCodeMap = make(map[int]struct{})
	)
	if cfg.SuccessCodes != nil {
		for _, sc := range cfg.SuccessCodes.http {
			httpSuccessCodeMap[int(sc)] = struct{}{}
		}
		for _, sc := range cfg.SuccessCodes.grpc {
			grpcSuccessCodeMap[int(sc)] = struct{}{}
		}
	}

	p := &processor{
		nextConsumer: nextConsumer,
		logger:       logger,

		wait:               cfg.Wait,
		maxItems:           cfg.MaxItems,
		httpSuccessCodeMap: httpSuccessCodeMap,
		grpcSuccessCodeMap: grpcSuccessCodeMap,

		collectCh: make(chan string, cfg.Workers),

		closeCh: make(chan struct{}, 1),
	}

	for i := 0; i < cfg.Workers; i++ {
		go func() {
			for {
				select {
				case k := <-p.collectCh:
					p.store.evictEdgeWithLock(k)

				case <-p.closeCh:
					return
				}
			}
		}()
	}

	return p
}

func (p *processor) Start(ctx context.Context, _ component.Host) error {
	// initialize store
	p.store = newStore(p.wait, p.maxItems, p.collectEdge)

	reg, ok := ctx.Value(contextkeys.PrometheusRegisterer).(prometheus.Registerer)
	if !ok || reg == nil {
		return fmt.Errorf("key does not contain a prometheus registerer")
	}
	p.reg = reg
	return p.registerMetrics()
}

func (p *processor) registerMetrics() error {
	p.serviceGraphRequestTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "traces",
		Name:      "service_graph_request_total",
		Help:      "Total count of requests between two nodes",
	}, []string{"client", "server"})
	p.serviceGraphRequestFailedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "traces",
		Name:      "service_graph_request_failed_total",
		Help:      "Total count of failed requests between two nodes",
	}, []string{"client", "server"})
	p.serviceGraphRequestServerHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "traces",
		Name:      "service_graph_request_server_seconds",
		Help:      "Time for a request between two nodes as seen from the server",
		Buckets:   prometheus.ExponentialBuckets(0.01, 2, 12),
	}, []string{"client", "server"})
	p.serviceGraphRequestClientHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "traces",
		Name:      "service_graph_request_client_seconds",
		Help:      "Time for a request between two nodes as seen from the client",
		Buckets:   prometheus.ExponentialBuckets(0.01, 2, 12),
	}, []string{"client", "server"})
	p.serviceGraphUnpairedSpansTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "traces",
		Name:      "service_graph_unpaired_spans_total",
		Help:      "Total count of unpaired spans",
	}, []string{"client", "server"})
	p.serviceGraphDroppedSpansTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "traces",
		Name:      "service_graph_dropped_spans_total",
		Help:      "Total count of dropped spans",
	}, []string{"client", "server"})

	cs := []prometheus.Collector{
		p.serviceGraphRequestTotal,
		p.serviceGraphRequestFailedTotal,
		p.serviceGraphRequestServerHistogram,
		p.serviceGraphRequestClientHistogram,
		p.serviceGraphUnpairedSpansTotal,
		p.serviceGraphDroppedSpansTotal,
	}

	for _, c := range cs {
		if err := p.reg.Register(c); err != nil {
			return err
		}
	}

	return nil
}

func (p *processor) Shutdown(context.Context) error {
	close(p.closeCh)
	p.unregisterMetrics()
	return nil
}

func (p *processor) unregisterMetrics() {
	cs := []prometheus.Collector{
		p.serviceGraphRequestTotal,
		p.serviceGraphRequestFailedTotal,
		p.serviceGraphRequestServerHistogram,
		p.serviceGraphRequestClientHistogram,
		p.serviceGraphUnpairedSpansTotal,
	}

	for _, c := range cs {
		p.reg.Unregister(c)
	}
}

func (p *processor) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{}
}

func (p *processor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	// Evict expired edges
	p.store.expire()

	if err := p.consume(td); err != nil {
		if errors.As(err, &tooManySpansError{}) {
			level.Warn(p.logger).Log("msg", "skipped processing of spans", "maxItems", p.maxItems, "err", err)
		} else {
			level.Error(p.logger).Log("msg", "failed consuming traces", "err", err)
		}
		return nil
	}

	return p.nextConsumer.ConsumeTraces(ctx, td)
}

// collectEdge records the metrics for the given edge.
// Completed edges are counted as requests; expired edges are counted as unpaired spans.
func (p *processor) collectEdge(e *edge) {
	if e.isCompleted() {
		p.serviceGraphRequestTotal.WithLabelValues(e.clientService, e.serverService).Inc()
		if e.failed {
			p.serviceGraphRequestFailedTotal.WithLabelValues(e.clientService, e.serverService).Inc()
		}
		p.serviceGraphRequestServerHistogram.WithLabelValues(e.clientService, e.serverService).Observe(e.serverLatency.Seconds())
		p.serviceGraphRequestClientHistogram.WithLabelValues(e.clientService, e.serverService).Observe(e.clientLatency.Seconds())
	} else if e.isExpired() {
		p.serviceGraphUnpairedSpansTotal.WithLabelValues(e.clientService, e.serverService).Inc()
	}
}

func (p *processor) consume(trace ptrace.Traces) error {
	var totalDroppedSpans int
	rSpansSlice := trace.ResourceSpans()
	for i := 0; i < rSpansSlice.Len(); i++ {
		rSpan := rSpansSlice.At(i)

		svc, ok := rSpan.Resource().Attributes().Get(semconv.AttributeServiceName)
		if !ok || svc.Str() == "" {
			continue
		}

		ssSlice := rSpan.ScopeSpans()
		for j := 0; j < ssSlice.Len(); j++ {
			ils := ssSlice.At(j)

			for k := 0; k < ils.Spans().Len(); k++ {
				span := ils.Spans().At(k)

				switch span.Kind() {
				case ptrace.SpanKindClient:
					k := key(span.TraceID().HexString(), span.SpanID().HexString())

					edge, err := p.store.upsertEdge(k, func(e *edge) {
						e.clientService = svc.Str()
						e.clientLatency = spanDuration(span)
						e.failed = e.failed || p.spanFailed(span) // keep request as failed if any span is failed
					})

					if errors.Is(err, errTooManyItems) {
						totalDroppedSpans++
						p.serviceGraphDroppedSpansTotal.WithLabelValues(svc.Str(), "").Inc()
						continue
					}
					// upsertEdge will only return errTooManyItems
					if err != nil {
						return err
					}

					if edge.isCompleted() {
						p.collectCh <- k
					}

				case ptrace.SpanKindServer:
					k := key(span.TraceID().HexString(), span.ParentSpanID().HexString())

					edge, err := p.store.upsertEdge(k, func(e *edge) {
						e.serverService = svc.Str()
						e.serverLatency = spanDuration(span)
						e.failed = e.failed || p.spanFailed(span) // keep request as failed if any span is failed
					})

					if errors.Is(err, errTooManyItems) {
						totalDroppedSpans++
						p.serviceGraphDroppedSpansTotal.WithLabelValues("", svc.Str()).Inc()
						continue
					}
					// upsertEdge will only return errTooManyItems
					if err != nil {
						return err
					}

					if edge.isCompleted() {
						p.collectCh <- k
					}

				default:
				}
			}
		}
	}

	if totalDroppedSpans > 0 {
		// return by value so errors.As(err, &tooManySpansError{}) matches in ConsumeTraces
		return tooManySpansError{
			droppedSpans: totalDroppedSpans,
		}
	}
	return nil
}

func (p *processor) spanFailed(span ptrace.Span) bool {
	// Request considered failed if status is not 2XX or added as a successful status code
	if statusCode, ok := span.Attributes().Get(semconv.AttributeHTTPStatusCode); ok {
		sc := int(statusCode.Int())
		if _, ok := p.httpSuccessCodeMap[sc]; !ok && sc/100 != 2 {
			return true
		}
	}

	// Request considered failed if status is not OK or added as a successful status code
	if statusCode, ok := span.Attributes().Get(semconv.AttributeRPCGRPCStatusCode); ok {
		sc := int(statusCode.Int())
		if _, ok := p.grpcSuccessCodeMap[sc]; !ok && sc != int(codes.OK) {
			return true
		}
	}

	return span.Status().Code() == ptrace.StatusCodeError
}

func spanDuration(span ptrace.Span) time.Duration {
	return span.EndTimestamp().AsTime().Sub(span.StartTimestamp().AsTime())
}

func key(k1, k2 string) string {
	return fmt.Sprintf("%s-%s", k1, k2)
}