Path: blob/main/pkg/traces/promsdprocessor/prom_sd_processor_test.go
4094 views
package promsdprocessor12import (3"context"4"net"5"testing"67"github.com/go-kit/log"8"github.com/prometheus/common/model"9"github.com/prometheus/prometheus/discovery/targetgroup"10"github.com/prometheus/prometheus/model/relabel"11"github.com/stretchr/testify/assert"12"github.com/stretchr/testify/require"13"go.opentelemetry.io/collector/client"14"go.opentelemetry.io/collector/consumer/consumertest"15"go.opentelemetry.io/collector/pdata/pcommon"16semconv "go.opentelemetry.io/collector/semconv/v1.6.1"17)1819func TestSyncGroups(t *testing.T) {20tests := []struct {21name string22jobToSync string23relabelCfgs map[string][]*relabel.Config24targets []model.LabelSet25expected map[string]model.LabelSet26}{27{28name: "empty",29jobToSync: "",30relabelCfgs: map[string][]*relabel.Config{},31targets: []model.LabelSet{},32expected: map[string]model.LabelSet{},33},34{35name: "no relabeling",36jobToSync: "job",37relabelCfgs: map[string][]*relabel.Config{38"job": {},39},40targets: []model.LabelSet{41{42"__address__": "127.0.0.1",43},44},45expected: map[string]model.LabelSet{46"127.0.0.1": {},47},48},49{50name: "strip port",51jobToSync: "job",52relabelCfgs: map[string][]*relabel.Config{53"job": {},54},55targets: []model.LabelSet{56{57"__address__": "127.0.0.1:8888",58"label": "val",59},60},61expected: map[string]model.LabelSet{62"127.0.0.1": {63"label": "val",64},65},66},67{68name: "passthrough",69jobToSync: "job",70relabelCfgs: map[string][]*relabel.Config{71"job": {},72},73targets: []model.LabelSet{74{75"__address__": "127.0.0.1",76"label": "val",77},78},79expected: map[string]model.LabelSet{80"127.0.0.1": {81"label": "val",82},83},84},85{86name: "ignore metadata",87jobToSync: "job",88relabelCfgs: map[string][]*relabel.Config{89"job": {},90},91targets: []model.LabelSet{92{93"__address__": "127.0.0.1",94"__ignore": "ignore",95},96},97expected: map[string]model.LabelSet{98"127.0.0.1": {},99},100},101}102103for _, tc := range tests {104t.Run(tc.name, func(t *testing.T) {105groups := []*targetgroup.Group{106{107Targets: tc.targets,108},109}110111p := &promServiceDiscoProcessor{112logger: log.NewNopLogger(),113relabelConfigs: tc.relabelCfgs,114}115116hostLabels := make(map[string]model.LabelSet)117p.syncGroups(tc.jobToSync, groups, hostLabels)118119assert.Equal(t, tc.expected, hostLabels)120})121}122}123124func TestOperationType(t *testing.T) {125const (126attrKey = "key"127attrIP = "1.1.1.1"128)129testCases := []struct {130name string131operationType string132attributeExists bool133newValue string134expectedValue string135}{136{137name: "Upsert updates the attribute already exists",138operationType: OperationTypeUpsert,139attributeExists: true,140newValue: "new-value",141expectedValue: "new-value",142},143{144name: "Update updates the attribute already exists",145operationType: OperationTypeUpdate,146attributeExists: true,147newValue: "new-value",148expectedValue: "new-value",149},150{151name: "Insert does not update the attribute if it's already present",152operationType: OperationTypeInsert,153attributeExists: true,154newValue: "new-value",155expectedValue: "old-value",156},157{158name: "Upsert updates the attribute if it isn't present",159operationType: OperationTypeUpsert,160attributeExists: false,161newValue: "new-value",162expectedValue: "new-value",163},164{165name: "Update updates the attribute already exists",166operationType: OperationTypeUpdate,167attributeExists: false,168newValue: "new-value",169expectedValue: "",170},171{172name: "Insert updates the attribute if it isn't present",173operationType: OperationTypeInsert,174attributeExists: false,175newValue: "new-value",176expectedValue: "new-value",177},178}179180for _, tc := range testCases {181t.Run(tc.name, func(t *testing.T) {182mockProcessor := new(consumertest.TracesSink)183p, err := newTraceProcessor(mockProcessor, tc.operationType, nil, nil)184require.NoError(t, err)185186attrMap := pcommon.NewMap()187if tc.attributeExists {188attrMap.PutStr(attrKey, "old-value")189}190attrMap.PutStr(semconv.AttributeNetHostIP, attrIP)191192hostLabels := map[string]model.LabelSet{193attrIP: {194attrKey: model.LabelValue(tc.newValue),195},196}197p.(*promServiceDiscoProcessor).hostLabels = hostLabels198p.(*promServiceDiscoProcessor).processAttributes(context.TODO(), attrMap)199200actualAttrValue, _ := attrMap.Get(attrKey)201assert.Equal(t, tc.expectedValue, actualAttrValue.Str())202})203}204}205206func TestPodAssociation(t *testing.T) {207const ipStr = "1.1.1.1"208209testCases := []struct {210name string211podAssociations []string212ctxFn func(t *testing.T) context.Context213attrMapFn func(t *testing.T) pcommon.Map214expectedIP string215}{216{217name: "connection IP (HTTP & gRPC)",218ctxFn: func(t *testing.T) context.Context {219info := client.Info{220Addr: &net.IPAddr{221IP: net.ParseIP(net.ParseIP(ipStr).String()),222},223}224return client.NewContext(context.Background(), info)225},226attrMapFn: func(*testing.T) pcommon.Map { return pcommon.NewMap() },227expectedIP: ipStr,228},229{230name: "connection IP that includes a port number",231ctxFn: func(t *testing.T) context.Context {232info := client.Info{233Addr: &net.TCPAddr{234IP: net.ParseIP(net.ParseIP(ipStr).String()),235Port: 1234,236},237}238return client.NewContext(context.Background(), info)239},240attrMapFn: func(*testing.T) pcommon.Map { return pcommon.NewMap() },241expectedIP: ipStr,242},243{244name: "connection IP is empty",245podAssociations: []string{podAssociationConnectionIP},246ctxFn: func(t *testing.T) context.Context {247c := client.FromContext(context.Background())248return client.NewContext(context.Background(), c)249},250attrMapFn: func(*testing.T) pcommon.Map { return pcommon.NewMap() },251expectedIP: "",252},253{254name: "ip attribute",255ctxFn: func(t *testing.T) context.Context { return context.Background() },256attrMapFn: func(*testing.T) pcommon.Map {257attrMap := pcommon.NewMap()258attrMap.PutStr("ip", ipStr)259return attrMap260},261expectedIP: ipStr,262},263{264name: "net.host.ip attribute",265ctxFn: func(t *testing.T) context.Context { return context.Background() },266attrMapFn: func(*testing.T) pcommon.Map {267attrMap := pcommon.NewMap()268attrMap.PutStr(semconv.AttributeNetHostIP, ipStr)269return attrMap270},271expectedIP: ipStr,272},273{274name: "k8s ip attribute",275ctxFn: func(t *testing.T) context.Context { return context.Background() },276attrMapFn: func(*testing.T) pcommon.Map {277attrMap := pcommon.NewMap()278attrMap.PutStr("k8s.pod.ip", ipStr)279return attrMap280},281expectedIP: ipStr,282},283{284name: "ip from hostname",285ctxFn: func(t *testing.T) context.Context { return context.Background() },286attrMapFn: func(*testing.T) pcommon.Map {287attrMap := pcommon.NewMap()288attrMap.PutStr(semconv.AttributeHostName, ipStr)289return attrMap290},291expectedIP: ipStr,292},293{294name: "uses attr before context (default associations)",295ctxFn: func(t *testing.T) context.Context {296info := client.Info{297Addr: &net.IPAddr{298IP: net.ParseIP("2.2.2.2"),299},300}301return client.NewContext(context.Background(), info)302},303attrMapFn: func(*testing.T) pcommon.Map {304attrMap := pcommon.NewMap()305attrMap.PutStr(semconv.AttributeNetHostIP, ipStr)306return attrMap307},308expectedIP: ipStr,309},310{311name: "uses attr before hostname (default associations)",312ctxFn: func(t *testing.T) context.Context { return context.Background() },313attrMapFn: func(*testing.T) pcommon.Map {314attrMap := pcommon.NewMap()315attrMap.PutStr(semconv.AttributeNetHostIP, ipStr)316attrMap.PutStr(semconv.AttributeHostName, "3.3.3.3")317return attrMap318},319expectedIP: ipStr,320},321{322name: "ip attribute but not as pod association",323podAssociations: []string{podAssociationk8sIPLabel},324ctxFn: func(t *testing.T) context.Context { return context.Background() },325attrMapFn: func(*testing.T) pcommon.Map {326attrMap := pcommon.NewMap()327attrMap.PutStr("ip", ipStr)328return attrMap329},330expectedIP: "",331},332{333name: "uses hostname before attribute (reverse order from default)",334podAssociations: []string{podAssociationHostnameLabel, podAssociationOTelIPLabel},335ctxFn: func(t *testing.T) context.Context { return context.Background() },336attrMapFn: func(*testing.T) pcommon.Map {337attrMap := pcommon.NewMap()338attrMap.PutStr(semconv.AttributeNetHostIP, "3.3.3.3")339attrMap.PutStr(semconv.AttributeHostName, ipStr)340return attrMap341},342expectedIP: ipStr,343},344}345346for _, tc := range testCases {347t.Run(tc.name, func(t *testing.T) {348mockProcessor := new(consumertest.TracesSink)349p, err := newTraceProcessor(mockProcessor, "", tc.podAssociations, nil)350require.NoError(t, err)351352ip := p.(*promServiceDiscoProcessor).getPodIP(tc.ctxFn(t), tc.attrMapFn(t))353assert.Equal(t, tc.expectedIP, ip)354})355}356}357358359