Path: blob/main/pkg/traces/automaticloggingprocessor/automaticloggingprocessor.go
4096 views
package automaticloggingprocessor12import (3"context"4"errors"5"fmt"6"strconv"7"time"89util "github.com/cortexproject/cortex/pkg/util/log"10"github.com/go-kit/log"11"github.com/go-kit/log/level"12"github.com/go-logfmt/logfmt"13"github.com/grafana/agent/pkg/logs"14"github.com/grafana/agent/pkg/operator/config"15"github.com/grafana/agent/pkg/traces/contextkeys"16"github.com/grafana/loki/clients/pkg/promtail/api"17"github.com/grafana/loki/pkg/logproto"18"github.com/prometheus/common/model"19"go.opentelemetry.io/collector/component"20"go.opentelemetry.io/collector/consumer"21"go.opentelemetry.io/collector/pdata/pcommon"22"go.opentelemetry.io/collector/pdata/ptrace"23semconv "go.opentelemetry.io/collector/semconv/v1.6.1"24"go.uber.org/atomic"25)2627const (28defaultLogsTag = "traces"29defaultServiceKey = "svc"30defaultSpanNameKey = "span"31defaultStatusKey = "status"32defaultDurationKey = "dur"33defaultTraceIDKey = "tid"3435defaultTimeout = time.Millisecond3637typeSpan = "span"38typeRoot = "root"39typeProcess = "process"40)4142type automaticLoggingProcessor struct {43nextConsumer consumer.Traces4445cfg *AutomaticLoggingConfig46logToStdout bool47logsInstance *logs.Instance48done atomic.Bool4950labels map[string]struct{}5152logger log.Logger53}5455func newTraceProcessor(nextConsumer consumer.Traces, cfg *AutomaticLoggingConfig) (component.TracesProcessor, error) {56logger := log.With(util.Logger, "component", "traces automatic logging")5758if nextConsumer == nil {59return nil, component.ErrNilNextConsumer60}6162if !cfg.Roots && !cfg.Processes && !cfg.Spans {63return nil, errors.New("automaticLoggingProcessor requires one of roots, processes, or spans to be enabled")64}6566if cfg.Timeout == 0 {67cfg.Timeout = defaultTimeout68}6970if cfg.Backend == "" {71cfg.Backend = BackendStdout72}7374if cfg.Backend != BackendLogs && cfg.Backend != BackendStdout {75return nil, fmt.Errorf("automaticLoggingProcessor requires a backend of type '%s' or '%s'", BackendLogs, BackendStdout)76}7778logToStdout := false79if cfg.Backend == BackendStdout {80logToStdout = true81}8283cfg.Overrides.LogsTag = override(cfg.Overrides.LogsTag, defaultLogsTag)84cfg.Overrides.ServiceKey = override(cfg.Overrides.ServiceKey, defaultServiceKey)85cfg.Overrides.SpanNameKey = override(cfg.Overrides.SpanNameKey, defaultSpanNameKey)86cfg.Overrides.StatusKey = override(cfg.Overrides.StatusKey, defaultStatusKey)87cfg.Overrides.DurationKey = override(cfg.Overrides.DurationKey, defaultDurationKey)88cfg.Overrides.TraceIDKey = override(cfg.Overrides.TraceIDKey, defaultTraceIDKey)8990labels := make(map[string]struct{}, len(cfg.Labels))91for _, l := range cfg.Labels {92labels[l] = struct{}{}93}9495return &automaticLoggingProcessor{96nextConsumer: nextConsumer,97cfg: cfg,98logToStdout: logToStdout,99logger: logger,100done: atomic.Bool{},101labels: labels,102}, nil103}104105func (p *automaticLoggingProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {106rsLen := td.ResourceSpans().Len()107for i := 0; i < rsLen; i++ {108rs := td.ResourceSpans().At(i)109ssLen := rs.ScopeSpans().Len()110111var svc string112svcAtt, ok := rs.Resource().Attributes().Get(semconv.AttributeServiceName)113if ok {114svc = svcAtt.Str()115}116117for j := 0; j < ssLen; j++ {118ss := rs.ScopeSpans().At(j)119spanLen := ss.Spans().Len()120121lastTraceID := ""122for k := 0; k < spanLen; k++ {123span := ss.Spans().At(k)124traceID := span.TraceID().HexString()125126if p.cfg.Spans {127keyValues := append(p.spanKeyVals(span), p.processKeyVals(rs.Resource(), svc)...)128p.exportToLogsInstance(typeSpan, traceID, p.spanLabels(keyValues), keyValues...)129}130131if p.cfg.Roots && span.ParentSpanID().IsEmpty() {132keyValues := append(p.spanKeyVals(span), p.processKeyVals(rs.Resource(), svc)...)133p.exportToLogsInstance(typeRoot, traceID, p.spanLabels(keyValues), keyValues...)134}135136if p.cfg.Processes && lastTraceID != traceID {137lastTraceID = traceID138keyValues := p.processKeyVals(rs.Resource(), svc)139p.exportToLogsInstance(typeProcess, traceID, p.spanLabels(keyValues), keyValues...)140}141}142}143}144145return p.nextConsumer.ConsumeTraces(ctx, td)146}147148func (p *automaticLoggingProcessor) spanLabels(keyValues []interface{}) model.LabelSet {149if len(keyValues) == 0 {150return model.LabelSet{}151}152ls := make(map[model.LabelName]model.LabelValue, len(keyValues)/2)153var (154k, v string155ok bool156)157for i := 0; i < len(keyValues); i += 2 {158if k, ok = keyValues[i].(string); !ok {159// Should never happen, all keys are strings160level.Error(p.logger).Log("msg", "error casting label key to string", "key", keyValues[i])161continue162}163// Try to cast value to string164if v, ok = keyValues[i+1].(string); !ok {165// If it's not a string, format it to its string representation166v = fmt.Sprintf("%v", keyValues[i+1])167}168if _, ok := p.labels[k]; ok {169// Loki does not accept "." as a valid character for labels170// Dots . are replaced by underscores _171k = config.SanitizeLabelName(k)172173ls[model.LabelName(k)] = model.LabelValue(v)174}175}176return ls177}178179func (p *automaticLoggingProcessor) Capabilities() consumer.Capabilities {180return consumer.Capabilities{}181}182183// Start is invoked during service startup.184func (p *automaticLoggingProcessor) Start(ctx context.Context, _ component.Host) error {185if !p.logToStdout {186logs, ok := ctx.Value(contextkeys.Logs).(*logs.Logs)187if !ok {188return fmt.Errorf("key does not contain a logs instance")189}190p.logsInstance = logs.Instance(p.cfg.LogsName)191if p.logsInstance == nil {192return fmt.Errorf("logs instance %s not found", p.cfg.LogsName)193}194}195return nil196}197198// Shutdown is invoked during service shutdown.199func (p *automaticLoggingProcessor) Shutdown(context.Context) error {200p.done.Store(true)201202return nil203}204205func (p *automaticLoggingProcessor) processKeyVals(resource pcommon.Resource, svc string) []interface{} {206atts := make([]interface{}, 0, 2) // 2 for service name207rsAtts := resource.Attributes()208209// name210atts = append(atts, p.cfg.Overrides.ServiceKey)211atts = append(atts, svc)212213for _, name := range p.cfg.ProcessAttributes {214att, ok := rsAtts.Get(name)215if ok {216// name/key val pairs217atts = append(atts, name)218atts = append(atts, attributeValue(att))219}220}221222return atts223}224225func (p *automaticLoggingProcessor) spanKeyVals(span ptrace.Span) []interface{} {226atts := make([]interface{}, 0, 8) // 8 for name, duration, service name and status227228atts = append(atts, p.cfg.Overrides.SpanNameKey)229atts = append(atts, span.Name())230231atts = append(atts, p.cfg.Overrides.DurationKey)232atts = append(atts, spanDuration(span))233234// Skip STATUS_CODE_UNSET to be less spammy235if span.Status().Code() != ptrace.StatusCodeUnset {236atts = append(atts, p.cfg.Overrides.StatusKey)237atts = append(atts, span.Status().Code())238}239240for _, name := range p.cfg.SpanAttributes {241att, ok := span.Attributes().Get(name)242if ok {243atts = append(atts, name)244atts = append(atts, attributeValue(att))245}246}247248return atts249}250251func (p *automaticLoggingProcessor) exportToLogsInstance(kind string, traceID string, labels model.LabelSet, keyvals ...interface{}) {252if p.done.Load() {253return254}255256keyvals = append(keyvals, []interface{}{p.cfg.Overrides.TraceIDKey, traceID}...)257line, err := logfmt.MarshalKeyvals(keyvals...)258if err != nil {259level.Warn(p.logger).Log("msg", "unable to marshal keyvals", "err", err)260return261}262263// if we're logging to stdout, log and bail264if p.logToStdout {265level.Info(p.logger).Log(keyvals...)266return267}268269// Add logs instance label270labels[model.LabelName(p.cfg.Overrides.LogsTag)] = model.LabelValue(kind)271272sent := p.logsInstance.SendEntry(api.Entry{273Labels: labels,274Entry: logproto.Entry{275Timestamp: time.Now(),276Line: string(line),277},278}, p.cfg.Timeout)279280if !sent {281level.Warn(p.logger).Log("msg", "failed to autolog to logs pipeline", "kind", kind, "traceid", traceID)282}283}284285func spanDuration(span ptrace.Span) string {286dur := int64(span.EndTimestamp() - span.StartTimestamp())287return strconv.FormatInt(dur, 10) + "ns"288}289290func attributeValue(att pcommon.Value) interface{} {291switch att.Type() {292case pcommon.ValueTypeStr:293return att.Str()294case pcommon.ValueTypeInt:295return att.Int()296case pcommon.ValueTypeDouble:297return att.Double()298case pcommon.ValueTypeBool:299return att.Bool()300case pcommon.ValueTypeMap:301return att.Map()302case pcommon.ValueTypeSlice:303return att.Slice()304}305return nil306}307308func override(cfgValue string, defaultValue string) string {309if cfgValue == "" {310return defaultValue311}312return cfgValue313}314315316