Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/push_translation.go
4096 views
package gcplogtarget12// This code is copied from Promtail. The gcplogtarget package is used to3// configure and run the targets that can read log entries from cloud resource4// logs like bucket logs, load balancer logs, and Kubernetes cluster logs5// from GCP.67import (8"encoding/base64"9"fmt"10"strings"1112"github.com/grafana/agent/component/common/loki"13"github.com/grafana/loki/pkg/util"14"github.com/prometheus/common/model"15"github.com/prometheus/prometheus/model/labels"16"github.com/prometheus/prometheus/model/relabel"17)1819// ReservedLabelTenantID reserved to override the tenant ID while processing20// pipeline stages21const ReservedLabelTenantID = "__tenant_id__"2223// PushMessage is the POST body format sent by GCP PubSub push subscriptions.24// See https://cloud.google.com/pubsub/docs/push for details.25type PushMessage struct {26Message struct {27Attributes map[string]string `json:"attributes"`28Data string `json:"data"`29ID string `json:"message_id"`30PublishTimestamp string `json:"publish_time"`31} `json:"message"`32Subscription string `json:"subscription"`33}3435// Validate checks that the required fields of a PushMessage are set.36func (pm PushMessage) Validate() error {37if pm.Message.Data == "" {38return fmt.Errorf("push message has no data")39}40if pm.Message.ID == "" {41return fmt.Errorf("push message has no ID")42}43if pm.Subscription == "" {44return fmt.Errorf("push message has no subscription")45}46return nil47}4849// translate converts a GCP PushMessage into a loki.Entry. It parses the50// push-specific labels and delegates the rest to parseGCPLogsEntry.51func translate(m PushMessage, other model.LabelSet, useIncomingTimestamp bool, relabelConfigs []*relabel.Config, xScopeOrgID string) (loki.Entry, error) {52// Collect all push-specific labels. Every one of them is first configured53// as optional, and the user can relabel it if needed. The relabeling and54// internal drop is handled in parseGCPLogsEntry.55lbs := labels.NewBuilder(nil)56lbs.Set("__gcp_message_id", m.Message.ID)57lbs.Set("__gcp_subscription_name", m.Subscription)58for k, v := range m.Message.Attributes {59lbs.Set(fmt.Sprintf("__gcp_attributes_%s", convertToLokiCompatibleLabel(k)), v)60}6162// Add fixed labels coming from the target configuration63fixedLabels := other.Clone()6465// If the incoming request carries the tenant id, inject it as the reserved66// label, so it's used by the remote write client.67if xScopeOrgID != "" {68// Expose tenant ID through relabel to use as logs or metrics label.69lbs.Set(ReservedLabelTenantID, xScopeOrgID)70fixedLabels[ReservedLabelTenantID] = model.LabelValue(xScopeOrgID)71}7273decodedData, err := base64.StdEncoding.DecodeString(m.Message.Data)74if err != nil {75return loki.Entry{}, fmt.Errorf("failed to decode data: %w", err)76}7778entry, err := parseGCPLogsEntry(decodedData, fixedLabels, lbs.Labels(nil), useIncomingTimestamp, relabelConfigs)79if err != nil {80return loki.Entry{}, fmt.Errorf("failed to parse logs entry: %w", err)81}8283return entry, nil84}8586var separatorCharacterReplacer = strings.NewReplacer(".", "_", "-", "_", "/", "_")8788// convertToLokiCompatibleLabel converts an incoming GCP Push message label to89// a loki compatible format. There are labels such as90// `logging.googleapis.com/timestamp`, which contain non-loki-compatible91// characters, which is just alphanumeric and _. The approach taken is to92// translate every non-alphanumeric separator character to an underscore.93func convertToLokiCompatibleLabel(label string) string {94return util.SnakeCase(separatorCharacterReplacer.Replace(label))95}969798