Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/azure_event_hubs/internal/parser/parser.go
5386 views
1
package parser
2
3
// This code is copied from Promtail. The parser package is used to
4
// enable parsing entries from Azure Event Hubs entries and forward them
5
// to other loki components.
6
7
import (
8
"bytes"
9
"encoding/json"
10
"errors"
11
"strings"
12
"time"
13
14
"github.com/Shopify/sarama"
15
"github.com/grafana/agent/component/common/loki"
16
"github.com/grafana/loki/pkg/logproto"
17
"github.com/prometheus/common/model"
18
"github.com/prometheus/prometheus/model/labels"
19
"github.com/prometheus/prometheus/model/relabel"
20
)
21
22
type azureMonitorResourceLogs struct {
23
Records []json.RawMessage `json:"records"`
24
}
25
26
// validate check if message contains records
27
func (l azureMonitorResourceLogs) validate() error {
28
if len(l.Records) == 0 {
29
return errors.New("records are empty")
30
}
31
32
return nil
33
}
34
35
// azureMonitorResourceLog used to unmarshal common schema for Azure resource logs
36
// https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema
37
type azureMonitorResourceLog struct {
38
Time string `json:"time"`
39
Category string `json:"category"`
40
ResourceID string `json:"resourceId"`
41
OperationName string `json:"operationName"`
42
}
43
44
// validate check if fields marked as required by schema for Azure resource log are not empty
45
func (l azureMonitorResourceLog) validate() error {
46
valid := len(l.Time) != 0 &&
47
len(l.Category) != 0 &&
48
len(l.ResourceID) != 0 &&
49
len(l.OperationName) != 0
50
51
if !valid {
52
return errors.New("required field or fields is empty")
53
}
54
55
return nil
56
}
57
58
type AzureEventHubsTargetMessageParser struct {
59
DisallowCustomMessages bool
60
}
61
62
func (e *AzureEventHubsTargetMessageParser) Parse(message *sarama.ConsumerMessage, labelSet model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool) ([]loki.Entry, error) {
63
messageTime := time.Now()
64
if useIncomingTimestamp {
65
messageTime = message.Timestamp
66
}
67
68
data, err := e.tryUnmarshal(message.Value)
69
if err == nil {
70
err = data.validate()
71
}
72
73
if err != nil {
74
if e.DisallowCustomMessages {
75
return []loki.Entry{}, err
76
}
77
78
return []loki.Entry{e.entryWithCustomPayload(message.Value, labelSet, messageTime)}, nil
79
}
80
81
return e.processRecords(labelSet, relabels, useIncomingTimestamp, data.Records, messageTime)
82
}
83
84
// tryUnmarshal tries to unmarshal raw message data, in case of error tries to fix it and unmarshal fixed data.
85
// If both attempts fail, return the initial unmarshal error.
86
func (e *AzureEventHubsTargetMessageParser) tryUnmarshal(message []byte) (*azureMonitorResourceLogs, error) {
87
data := &azureMonitorResourceLogs{}
88
err := json.Unmarshal(message, data)
89
if err == nil {
90
return data, nil
91
}
92
93
// try fix json as mentioned here:
94
// https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps?fbclid=IwAR3pK8Nj60GFBtKemqwfpiZyf3rerjowPH_j_qIuNrw_uLDesYvC4mTkfgs
95
body := bytes.ReplaceAll(message, []byte(`'`), []byte(`"`))
96
if json.Unmarshal(body, data) != nil {
97
// return original error
98
return nil, err
99
}
100
101
return data, nil
102
}
103
104
func (e *AzureEventHubsTargetMessageParser) entryWithCustomPayload(body []byte, labelSet model.LabelSet, messageTime time.Time) loki.Entry {
105
return loki.Entry{
106
Labels: labelSet,
107
Entry: logproto.Entry{
108
Timestamp: messageTime,
109
Line: string(body),
110
},
111
}
112
}
113
114
// processRecords handles the case when message is a valid json with a key `records`. It can be either a custom payload or a resource log.
115
func (e *AzureEventHubsTargetMessageParser) processRecords(labelSet model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool, records []json.RawMessage, messageTime time.Time) ([]loki.Entry, error) {
116
result := make([]loki.Entry, 0, len(records))
117
for _, m := range records {
118
entry, err := e.parseRecord(m, labelSet, relabels, useIncomingTimestamp, messageTime)
119
if err != nil {
120
return nil, err
121
}
122
result = append(result, entry)
123
}
124
125
return result, nil
126
}
127
128
// parseRecord parses a single value from the "records" in the original message.
129
// It can also handle a case when the record contains custom data and doesn't match the schema for Azure resource logs.
130
func (e *AzureEventHubsTargetMessageParser) parseRecord(record []byte, labelSet model.LabelSet, relabelConfig []*relabel.Config, useIncomingTimestamp bool, messageTime time.Time) (loki.Entry, error) {
131
logRecord := &azureMonitorResourceLog{}
132
err := json.Unmarshal(record, logRecord)
133
if err == nil {
134
err = logRecord.validate()
135
}
136
137
if err != nil {
138
if e.DisallowCustomMessages {
139
return loki.Entry{}, err
140
}
141
142
return e.entryWithCustomPayload(record, labelSet, messageTime), nil
143
}
144
145
logLabels := e.getLabels(logRecord, relabelConfig)
146
ts := e.getTime(messageTime, useIncomingTimestamp, logRecord)
147
148
return loki.Entry{
149
Labels: labelSet.Merge(logLabels),
150
Entry: logproto.Entry{
151
Timestamp: ts,
152
Line: string(record),
153
},
154
}, nil
155
}
156
157
func (e *AzureEventHubsTargetMessageParser) getTime(messageTime time.Time, useIncomingTimestamp bool, logRecord *azureMonitorResourceLog) time.Time {
158
if !useIncomingTimestamp || logRecord.Time == "" {
159
return messageTime
160
}
161
162
recordTime, err := time.Parse(time.RFC3339, logRecord.Time)
163
if err != nil {
164
return messageTime
165
}
166
167
return recordTime
168
}
169
170
func (e *AzureEventHubsTargetMessageParser) getLabels(logRecord *azureMonitorResourceLog, relabelConfig []*relabel.Config) model.LabelSet {
171
lbs := labels.Labels{
172
{
173
Name: "__azure_event_hubs_category",
174
Value: logRecord.Category,
175
},
176
}
177
178
var processed labels.Labels
179
// apply relabeling
180
if len(relabelConfig) > 0 {
181
processed, _ = relabel.Process(lbs, relabelConfig...)
182
} else {
183
processed = lbs
184
}
185
186
// final labelset that will be sent to loki
187
resultLabels := make(model.LabelSet)
188
for _, lbl := range processed {
189
// ignore internal labels
190
if strings.HasPrefix(lbl.Name, "__") {
191
continue
192
}
193
// ignore invalid labels
194
if !model.LabelName(lbl.Name).IsValid() || !model.LabelValue(lbl.Value).IsValid() {
195
continue
196
}
197
resultLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
198
}
199
200
return resultLabels
201
}
202
203