Path: blob/main/pkg/traces/promsdprocessor/prom_sd_processor.go
4094 views
package promsdprocessor

import (
	"context"
	"errors"
	"fmt"
	"net"
	"strings"
	"sync"

	util "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
	semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
)

// promServiceDiscoProcessor is a traces processor that decorates resource
// attributes with labels obtained from Prometheus service discovery. Targets
// are matched to incoming resources by host/IP.
type promServiceDiscoProcessor struct {
	nextConsumer     consumer.Traces
	discoveryMgr     *discovery.Manager
	discoveryMgrStop context.CancelFunc
	// discoveryMgrCtx is the context the discovery manager was created with.
	// It is kept so watchServiceDiscovery can exit when Shutdown cancels it.
	discoveryMgrCtx context.Context

	// relabelConfigs holds the relabel rules of each scrape config, keyed by
	// job name.
	relabelConfigs map[string][]*relabel.Config
	// hostLabels maps a target host to the labels to attach. The map is
	// replaced wholesale on every discovery sync; mtx guards the field.
	hostLabels map[string]model.LabelSet
	mtx        sync.Mutex

	operationType   string
	podAssociations []string

	logger log.Logger
}

// newTraceProcessor builds a promServiceDiscoProcessor that forwards to
// nextConsumer.
//
// An empty operationType defaults to OperationTypeUpsert; an empty
// podAssociations defaults to every supported association, in a fixed order.
// An error is returned when the discovery configuration cannot be applied,
// when operationType or any pod association is unknown, or when nextConsumer
// is nil. On every error path the discovery manager's context is cancelled.
func newTraceProcessor(nextConsumer consumer.Traces, operationType string, podAssociations []string, scrapeConfigs []*config.ScrapeConfig) (component.TracesProcessor, error) {
	ctx, cancel := context.WithCancel(context.Background())

	logger := log.With(util.Logger, "component", "traces service disco")
	mgr := discovery.NewManager(ctx, logger, discovery.Name("traces service disco"))

	relabelConfigs := map[string][]*relabel.Config{}
	managerConfig := map[string]discovery.Configs{}
	for _, v := range scrapeConfigs {
		managerConfig[v.JobName] = v.ServiceDiscoveryConfigs
		relabelConfigs[v.JobName] = v.RelabelConfigs
	}

	err := mgr.ApplyConfig(managerConfig)
	if err != nil {
		cancel()
		return nil, err
	}

	switch operationType {
	case OperationTypeUpsert, OperationTypeInsert, OperationTypeUpdate:
	case "": // Use Upsert by default
		operationType = OperationTypeUpsert
	default:
		cancel()
		return nil, fmt.Errorf("unknown operation type %s", operationType)
	}

	for _, podAssociation := range podAssociations {
		switch podAssociation {
		case podAssociationIPLabel, podAssociationOTelIPLabel, podAssociationk8sIPLabel, podAssociationHostnameLabel, podAssociationConnectionIP:
		default:
			cancel()
			return nil, fmt.Errorf("unknown pod association %s", podAssociation)
		}
	}

	if len(podAssociations) == 0 {
		podAssociations = []string{podAssociationIPLabel, podAssociationOTelIPLabel, podAssociationk8sIPLabel, podAssociationHostnameLabel, podAssociationConnectionIP}
	}

	if nextConsumer == nil {
		cancel()
		return nil, component.ErrNilNextConsumer
	}
	return &promServiceDiscoProcessor{
		nextConsumer:     nextConsumer,
		discoveryMgr:     mgr,
		discoveryMgrStop: cancel,
		discoveryMgrCtx:  ctx,
		relabelConfigs:   relabelConfigs,
		hostLabels:       make(map[string]model.LabelSet),
		logger:           logger,
		operationType:    operationType,
		podAssociations:  podAssociations,
	}, nil
}

// ConsumeTraces decorates the resource attributes of every ResourceSpans in td
// and then hands td to the next consumer, returning that consumer's error.
func (p *promServiceDiscoProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	rss := td.ResourceSpans()
	for i := 0; i < rss.Len(); i++ {
		rs := rss.At(i)

		p.processAttributes(ctx, rs.Resource().Attributes())
	}

	return p.nextConsumer.ConsumeTraces(ctx, td)
}

// stringAttributeFromMap returns the string value stored under key in attrs.
// It returns "" when the key is absent or its value is not of string type.
func stringAttributeFromMap(attrs pcommon.Map, key string) string {
	if attr, ok := attrs.Get(key); ok {
		if attr.Type() == pcommon.ValueTypeStr {
			return attr.Str()
		}
	}
	return ""
}

// getConnectionIP returns the host part of the client address found in ctx,
// or "" when the context carries no address. If the address contains a colon
// but cannot be split into host and port, it is returned unchanged.
func (p *promServiceDiscoProcessor) getConnectionIP(ctx context.Context) string {
	c := client.FromContext(ctx)
	if c.Addr == nil {
		return ""
	}

	host := c.Addr.String()
	if strings.Contains(host, ":") {
		splitHost, _, err := net.SplitHostPort(host)
		if err != nil {
			// It's normal for this to fail for IPv6 address strings that don't actually include a port.
			level.Debug(p.logger).Log("msg", "unable to split connection host and port", "host", host, "err", err)
		} else {
			host = splitHost
		}
	}

	return host
}

// getPodIP walks the configured pod associations in order and returns the
// first IP it can resolve, or "" when none of them yields one.
func (p *promServiceDiscoProcessor) getPodIP(ctx context.Context, attrs pcommon.Map) string {
	for _, podAssociation := range p.podAssociations {
		switch podAssociation {
		case podAssociationIPLabel, podAssociationOTelIPLabel, podAssociationk8sIPLabel:
			// These associations name the attribute key that holds the IP.
			ip := stringAttributeFromMap(attrs, podAssociation)
			if ip != "" {
				return ip
			}
		case podAssociationHostnameLabel:
			// The hostname attribute is only usable when it is itself an IP.
			hostname := stringAttributeFromMap(attrs, semconv.AttributeHostName)
			if net.ParseIP(hostname) != nil {
				return hostname
			}
		case podAssociationConnectionIP:
			ip := p.getConnectionIP(ctx)
			if ip != "" {
				return ip
			}
		}
	}
	return ""
}

// processAttributes looks up the discovered labels for the resource described
// by attrs and writes them into attrs according to the configured operation
// type: upsert always writes, insert writes only missing keys, and update
// rewrites only existing keys. attrs is left untouched when no IP can be
// resolved or no labels are known for it.
func (p *promServiceDiscoProcessor) processAttributes(ctx context.Context, attrs pcommon.Map) {
	ip := p.getPodIP(ctx, attrs)
	// have to have an ip for labels lookup
	if ip == "" {
		level.Debug(p.logger).Log("msg", "unable to find ip in span attributes, skipping attribute addition")
		return
	}

	// Hold the lock only for the lookup. watchServiceDiscovery publishes a
	// freshly built map on every sync and label sets are not modified after
	// publication, so the set can be read without the lock and concurrent
	// ConsumeTraces calls do not serialize on attribute mutation.
	p.mtx.Lock()
	hostLabels, ok := p.hostLabels[ip]
	p.mtx.Unlock()
	if !ok {
		level.Debug(p.logger).Log("msg", "unable to find matching hostLabels", "ip", ip)
		return
	}

	for k, v := range hostLabels {
		key, val := string(k), string(v)
		switch p.operationType {
		case OperationTypeUpsert:
			attrs.PutStr(key, val)
		case OperationTypeInsert:
			if _, ok := attrs.Get(key); !ok {
				attrs.PutStr(key, val)
			}
		case OperationTypeUpdate:
			if toVal, ok := attrs.Get(key); ok {
				toVal.SetStr(val)
			}
		}
	}
}

// Capabilities reports that this processor mutates the data it receives.
func (p *promServiceDiscoProcessor) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesData: true}
}

// Start is invoked during service startup. It launches the discovery watcher
// and the discovery manager in background goroutines and always returns nil;
// a failure of the manager is only logged.
func (p *promServiceDiscoProcessor) Start(_ context.Context, _ component.Host) error {
	go p.watchServiceDiscovery()

	go func() {
		err := p.discoveryMgr.Run()
		// Cancellation is the expected outcome of Shutdown; errors.Is also
		// recognizes it when the error has been wrapped.
		if err != nil && !errors.Is(err, context.Canceled) {
			level.Error(p.logger).Log("msg", "failed to start prom svc disco. relabeling disabled", "err", err)
		}
	}()

	return nil
}

// Shutdown is invoked during service shutdown. It cancels the discovery
// manager's context, which also stops watchServiceDiscovery.
func (p *promServiceDiscoProcessor) Shutdown(context.Context) error {
	if p.discoveryMgrStop != nil {
		p.discoveryMgrStop()
	}
	return nil
}

// watchServiceDiscovery rebuilds the host-to-labels map every time the
// discovery manager publishes target groups, and swaps it in under the lock.
// It returns once the discovery manager's context is done.
func (p *promServiceDiscoProcessor) watchServiceDiscovery() {
	for {
		// p.discoveryMgr.SyncCh() is never closed so we need to watch the context as well to properly exit this goroutine
		select {
		case targetGroups := <-p.discoveryMgr.SyncCh():
			hostLabels := make(map[string]model.LabelSet)
			level.Debug(p.logger).Log("msg", "syncing target groups", "count", len(targetGroups))
			for jobName, groups := range targetGroups {
				p.syncGroups(jobName, groups, hostLabels)
			}
			p.mtx.Lock()
			p.hostLabels = hostLabels
			p.mtx.Unlock()
		case <-p.discoveryMgrCtx.Done():
			return
		}
	}
}

// syncGroups adds the targets of every group of the given job to hostLabels.
func (p *promServiceDiscoProcessor) syncGroups(jobName string, groups []*targetgroup.Group, hostLabels map[string]model.LabelSet) {
	level.Debug(p.logger).Log("msg", "syncing target group", "jobName", jobName)
	for _, g := range groups {
		p.syncTargets(jobName, g, hostLabels)
	}
}

// syncTargets relabels every target in group with the job's relabel rules and
// records the resulting labels in hostLabels, keyed by the host part of the
// target's address label. Labels whose name starts with "__" are removed
// before storing. Targets are skipped when relabeling drops them, when they
// have no address label, or when the address cannot be split into host and
// port. The whole group is skipped when the job has no relabel config.
func (p *promServiceDiscoProcessor) syncTargets(jobName string, group *targetgroup.Group, hostLabels map[string]model.LabelSet) {
	level.Debug(p.logger).Log("msg", "syncing targets", "count", len(group.Targets))

	relabelConfig := p.relabelConfigs[jobName]
	if relabelConfig == nil {
		level.Warn(p.logger).Log("msg", "relabel config not found for job. skipping labeling", "jobName", jobName)
		return
	}

	for _, t := range group.Targets {
		discoveredLabels := group.Labels.Merge(t)

		level.Debug(p.logger).Log("discoveredLabels", discoveredLabels)
		// The merged set is only read here, so no defensive copy is needed.
		labelMap := make(map[string]string, len(discoveredLabels))
		for k, v := range discoveredLabels {
			labelMap[string(k)] = string(v)
		}
		processedLabels, keep := relabel.Process(labels.FromMap(labelMap), relabelConfig...)
		level.Debug(p.logger).Log("processedLabels", processedLabels)
		if !keep {
			continue
		}

		// Named targetLabels so the imported labels package is not shadowed.
		targetLabels := make(model.LabelSet)
		for k, v := range processedLabels.Map() {
			targetLabels[model.LabelName(k)] = model.LabelValue(v)
		}

		address, ok := targetLabels[model.AddressLabel]
		if !ok {
			level.Warn(p.logger).Log("msg", "ignoring target, unable to find address", "labels", targetLabels.String())
			continue
		}

		host := string(address)
		if strings.Contains(host, ":") {
			var err error
			host, _, err = net.SplitHostPort(host)
			if err != nil {
				level.Warn(p.logger).Log("msg", "unable to split host port", "address", address, "err", err)
				continue
			}
		}

		// Drop "__"-prefixed labels; they were only needed up to this point
		// (the address lookup above relies on one of them).
		for k := range targetLabels {
			if strings.HasPrefix(string(k), "__") {
				delete(targetLabels, k)
			}
		}

		level.Debug(p.logger).Log("msg", "adding host to hostLabels", "host", host)
		hostLabels[host] = targetLabels
	}
}