Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/formatter.go
4096 views
1
package gcplogtarget
2
3
// This code is copied from Promtail. The gcplogtarget package is used to
4
// configure and run the targets that can read log entries from cloud resource
5
// logs like bucket logs, load balancer logs, and Kubernetes cluster logs
6
// from GCP.
7
8
import (
9
"fmt"
10
"strings"
11
"time"
12
13
"github.com/grafana/agent/component/common/loki"
14
"github.com/grafana/loki/pkg/logproto"
15
"github.com/grafana/loki/pkg/util"
16
json "github.com/json-iterator/go"
17
"github.com/prometheus/common/model"
18
"github.com/prometheus/prometheus/model/labels"
19
"github.com/prometheus/prometheus/model/relabel"
20
)
21
22
// GCPLogEntry that will be written to the pubsub topic according to the following spec.
23
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
24
type GCPLogEntry struct {
25
// why was this here?? nolint:revive
26
LogName string `json:"logName"`
27
Resource struct {
28
Type string `json:"type"`
29
Labels map[string]string `json:"labels"`
30
} `json:"resource"`
31
Timestamp string `json:"timestamp"`
32
33
// The time the log entry was received by Logging.
34
// Its important that `Timestamp` is optional in GCE log entry.
35
ReceiveTimestamp string `json:"receiveTimestamp"`
36
37
TextPayload string `json:"textPayload"`
38
39
// NOTE(kavi): There are other fields on GCPLogEntry. but we need only need
40
// above fields for now anyway we will be sending the entire entry to Loki.
41
}
42
43
func parseGCPLogsEntry(data []byte, other model.LabelSet, otherInternal labels.Labels, useIncomingTimestamp bool, relabelConfig []*relabel.Config) (loki.Entry, error) {
44
var ge GCPLogEntry
45
46
if err := json.Unmarshal(data, &ge); err != nil {
47
return loki.Entry{}, err
48
}
49
50
// Adding mandatory labels for gcplog
51
lbs := labels.NewBuilder(otherInternal)
52
lbs.Set("__gcp_logname", ge.LogName)
53
lbs.Set("__gcp_resource_type", ge.Resource.Type)
54
55
// labels from gcp log entry. Add it as internal labels
56
for k, v := range ge.Resource.Labels {
57
lbs.Set("__gcp_resource_labels_"+util.SnakeCase(k), v)
58
}
59
60
var processed labels.Labels
61
62
// apply relabeling
63
if len(relabelConfig) > 0 {
64
processed, _ = relabel.Process(lbs.Labels(nil), relabelConfig...)
65
} else {
66
processed = lbs.Labels(nil)
67
}
68
69
// final labelset that will be sent to loki
70
labels := make(model.LabelSet)
71
for _, lbl := range processed {
72
// ignore internal labels
73
if strings.HasPrefix(lbl.Name, "__") {
74
continue
75
}
76
// ignore invalid labels
77
if !model.LabelName(lbl.Name).IsValid() || !model.LabelValue(lbl.Value).IsValid() {
78
continue
79
}
80
labels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
81
}
82
83
// add labels coming from scrapeconfig
84
labels = labels.Merge(other)
85
86
ts := time.Now()
87
line := string(data)
88
89
if useIncomingTimestamp {
90
tt := ge.Timestamp
91
if tt == "" {
92
tt = ge.ReceiveTimestamp
93
}
94
var err error
95
ts, err = time.Parse(time.RFC3339, tt)
96
if err != nil {
97
return loki.Entry{}, fmt.Errorf("invalid timestamp format: %w", err)
98
}
99
100
if ts.IsZero() {
101
return loki.Entry{}, fmt.Errorf("no timestamp found in the log entry")
102
}
103
}
104
105
// Send only `ge.textPayload` as log line if its present.
106
if strings.TrimSpace(ge.TextPayload) != "" {
107
line = ge.TextPayload
108
}
109
110
return loki.Entry{
111
Labels: labels,
112
Entry: logproto.Entry{
113
Timestamp: ts,
114
Line: line,
115
},
116
}, nil
117
}
118
119