Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/push_translation.go
4096 views
1
package gcplogtarget
2
3
// This code is copied from Promtail. The gcplogtarget package is used to
4
// configure and run the targets that can read log entries from cloud resource
5
// logs like bucket logs, load balancer logs, and Kubernetes cluster logs
6
// from GCP.
7
8
import (
9
"encoding/base64"
10
"fmt"
11
"strings"
12
13
"github.com/grafana/agent/component/common/loki"
14
"github.com/grafana/loki/pkg/util"
15
"github.com/prometheus/common/model"
16
"github.com/prometheus/prometheus/model/labels"
17
"github.com/prometheus/prometheus/model/relabel"
18
)
19
20
// ReservedLabelTenantID reserved to override the tenant ID while processing
21
// pipeline stages
22
const ReservedLabelTenantID = "__tenant_id__"
23
24
// PushMessage is the POST body format sent by GCP PubSub push subscriptions.
25
// See https://cloud.google.com/pubsub/docs/push for details.
26
type PushMessage struct {
27
Message struct {
28
Attributes map[string]string `json:"attributes"`
29
Data string `json:"data"`
30
ID string `json:"message_id"`
31
PublishTimestamp string `json:"publish_time"`
32
} `json:"message"`
33
Subscription string `json:"subscription"`
34
}
35
36
// Validate checks that the required fields of a PushMessage are set.
37
func (pm PushMessage) Validate() error {
38
if pm.Message.Data == "" {
39
return fmt.Errorf("push message has no data")
40
}
41
if pm.Message.ID == "" {
42
return fmt.Errorf("push message has no ID")
43
}
44
if pm.Subscription == "" {
45
return fmt.Errorf("push message has no subscription")
46
}
47
return nil
48
}
49
50
// translate converts a GCP PushMessage into a loki.Entry. It parses the
51
// push-specific labels and delegates the rest to parseGCPLogsEntry.
52
func translate(m PushMessage, other model.LabelSet, useIncomingTimestamp bool, relabelConfigs []*relabel.Config, xScopeOrgID string) (loki.Entry, error) {
53
// Collect all push-specific labels. Every one of them is first configured
54
// as optional, and the user can relabel it if needed. The relabeling and
55
// internal drop is handled in parseGCPLogsEntry.
56
lbs := labels.NewBuilder(nil)
57
lbs.Set("__gcp_message_id", m.Message.ID)
58
lbs.Set("__gcp_subscription_name", m.Subscription)
59
for k, v := range m.Message.Attributes {
60
lbs.Set(fmt.Sprintf("__gcp_attributes_%s", convertToLokiCompatibleLabel(k)), v)
61
}
62
63
// Add fixed labels coming from the target configuration
64
fixedLabels := other.Clone()
65
66
// If the incoming request carries the tenant id, inject it as the reserved
67
// label, so it's used by the remote write client.
68
if xScopeOrgID != "" {
69
// Expose tenant ID through relabel to use as logs or metrics label.
70
lbs.Set(ReservedLabelTenantID, xScopeOrgID)
71
fixedLabels[ReservedLabelTenantID] = model.LabelValue(xScopeOrgID)
72
}
73
74
decodedData, err := base64.StdEncoding.DecodeString(m.Message.Data)
75
if err != nil {
76
return loki.Entry{}, fmt.Errorf("failed to decode data: %w", err)
77
}
78
79
entry, err := parseGCPLogsEntry(decodedData, fixedLabels, lbs.Labels(nil), useIncomingTimestamp, relabelConfigs)
80
if err != nil {
81
return loki.Entry{}, fmt.Errorf("failed to parse logs entry: %w", err)
82
}
83
84
return entry, nil
85
}
86
87
var separatorCharacterReplacer = strings.NewReplacer(".", "_", "-", "_", "/", "_")
88
89
// convertToLokiCompatibleLabel converts an incoming GCP Push message label to
90
// a loki compatible format. There are labels such as
91
// `logging.googleapis.com/timestamp`, which contain non-loki-compatible
92
// characters, which is just alphanumeric and _. The approach taken is to
93
// translate every non-alphanumeric separator character to an underscore.
94
func convertToLokiCompatibleLabel(label string) string {
95
return util.SnakeCase(separatorCharacterReplacer.Replace(label))
96
}
97
98