Path: blob/main/component/loki/source/azure_event_hubs/internal/parser/parser.go
5386 views
package parser12// This code is copied from Promtail. The parser package is used to3// enable parsing entries from Azure Event Hubs entries and forward them4// to other loki components.56import (7"bytes"8"encoding/json"9"errors"10"strings"11"time"1213"github.com/Shopify/sarama"14"github.com/grafana/agent/component/common/loki"15"github.com/grafana/loki/pkg/logproto"16"github.com/prometheus/common/model"17"github.com/prometheus/prometheus/model/labels"18"github.com/prometheus/prometheus/model/relabel"19)2021type azureMonitorResourceLogs struct {22Records []json.RawMessage `json:"records"`23}2425// validate check if message contains records26func (l azureMonitorResourceLogs) validate() error {27if len(l.Records) == 0 {28return errors.New("records are empty")29}3031return nil32}3334// azureMonitorResourceLog used to unmarshal common schema for Azure resource logs35// https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema36type azureMonitorResourceLog struct {37Time string `json:"time"`38Category string `json:"category"`39ResourceID string `json:"resourceId"`40OperationName string `json:"operationName"`41}4243// validate check if fields marked as required by schema for Azure resource log are not empty44func (l azureMonitorResourceLog) validate() error {45valid := len(l.Time) != 0 &&46len(l.Category) != 0 &&47len(l.ResourceID) != 0 &&48len(l.OperationName) != 04950if !valid {51return errors.New("required field or fields is empty")52}5354return nil55}5657type AzureEventHubsTargetMessageParser struct {58DisallowCustomMessages bool59}6061func (e *AzureEventHubsTargetMessageParser) Parse(message *sarama.ConsumerMessage, labelSet model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool) ([]loki.Entry, error) {62messageTime := time.Now()63if useIncomingTimestamp {64messageTime = message.Timestamp65}6667data, err := e.tryUnmarshal(message.Value)68if err == nil {69err = data.validate()70}7172if err != nil {73if e.DisallowCustomMessages {74return []loki.Entry{}, err75}7677return []loki.Entry{e.entryWithCustomPayload(message.Value, labelSet, messageTime)}, nil78}7980return e.processRecords(labelSet, relabels, useIncomingTimestamp, data.Records, messageTime)81}8283// tryUnmarshal tries to unmarshal raw message data, in case of error tries to fix it and unmarshal fixed data.84// If both attempts fail, return the initial unmarshal error.85func (e *AzureEventHubsTargetMessageParser) tryUnmarshal(message []byte) (*azureMonitorResourceLogs, error) {86data := &azureMonitorResourceLogs{}87err := json.Unmarshal(message, data)88if err == nil {89return data, nil90}9192// try fix json as mentioned here:93// https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps?fbclid=IwAR3pK8Nj60GFBtKemqwfpiZyf3rerjowPH_j_qIuNrw_uLDesYvC4mTkfgs94body := bytes.ReplaceAll(message, []byte(`'`), []byte(`"`))95if json.Unmarshal(body, data) != nil {96// return original error97return nil, err98}99100return data, nil101}102103func (e *AzureEventHubsTargetMessageParser) entryWithCustomPayload(body []byte, labelSet model.LabelSet, messageTime time.Time) loki.Entry {104return loki.Entry{105Labels: labelSet,106Entry: logproto.Entry{107Timestamp: messageTime,108Line: string(body),109},110}111}112113// processRecords handles the case when message is a valid json with a key `records`. It can be either a custom payload or a resource log.114func (e *AzureEventHubsTargetMessageParser) processRecords(labelSet model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool, records []json.RawMessage, messageTime time.Time) ([]loki.Entry, error) {115result := make([]loki.Entry, 0, len(records))116for _, m := range records {117entry, err := e.parseRecord(m, labelSet, relabels, useIncomingTimestamp, messageTime)118if err != nil {119return nil, err120}121result = append(result, entry)122}123124return result, nil125}126127// parseRecord parses a single value from the "records" in the original message.128// It can also handle a case when the record contains custom data and doesn't match the schema for Azure resource logs.129func (e *AzureEventHubsTargetMessageParser) parseRecord(record []byte, labelSet model.LabelSet, relabelConfig []*relabel.Config, useIncomingTimestamp bool, messageTime time.Time) (loki.Entry, error) {130logRecord := &azureMonitorResourceLog{}131err := json.Unmarshal(record, logRecord)132if err == nil {133err = logRecord.validate()134}135136if err != nil {137if e.DisallowCustomMessages {138return loki.Entry{}, err139}140141return e.entryWithCustomPayload(record, labelSet, messageTime), nil142}143144logLabels := e.getLabels(logRecord, relabelConfig)145ts := e.getTime(messageTime, useIncomingTimestamp, logRecord)146147return loki.Entry{148Labels: labelSet.Merge(logLabels),149Entry: logproto.Entry{150Timestamp: ts,151Line: string(record),152},153}, nil154}155156func (e *AzureEventHubsTargetMessageParser) getTime(messageTime time.Time, useIncomingTimestamp bool, logRecord *azureMonitorResourceLog) time.Time {157if !useIncomingTimestamp || logRecord.Time == "" {158return messageTime159}160161recordTime, err := time.Parse(time.RFC3339, logRecord.Time)162if err != nil {163return messageTime164}165166return recordTime167}168169func (e *AzureEventHubsTargetMessageParser) getLabels(logRecord *azureMonitorResourceLog, relabelConfig []*relabel.Config) model.LabelSet {170lbs := labels.Labels{171{172Name: "__azure_event_hubs_category",173Value: logRecord.Category,174},175}176177var processed labels.Labels178// apply relabeling179if len(relabelConfig) > 0 {180processed, _ = relabel.Process(lbs, relabelConfig...)181} else {182processed = lbs183}184185// final labelset that will be sent to loki186resultLabels := make(model.LabelSet)187for _, lbl := range processed {188// ignore internal labels189if strings.HasPrefix(lbl.Name, "__") {190continue191}192// ignore invalid labels193if !model.LabelName(lbl.Name).IsValid() || !model.LabelValue(lbl.Value).IsValid() {194continue195}196resultLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)197}198199return resultLabels200}201202203