Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/formatter.go
4096 views
package gcplogtarget12// This code is copied from Promtail. The gcplogtarget package is used to3// configure and run the targets that can read log entries from cloud resource4// logs like bucket logs, load balancer logs, and Kubernetes cluster logs5// from GCP.67import (8"fmt"9"strings"10"time"1112"github.com/grafana/agent/component/common/loki"13"github.com/grafana/loki/pkg/logproto"14"github.com/grafana/loki/pkg/util"15json "github.com/json-iterator/go"16"github.com/prometheus/common/model"17"github.com/prometheus/prometheus/model/labels"18"github.com/prometheus/prometheus/model/relabel"19)2021// GCPLogEntry that will be written to the pubsub topic according to the following spec.22// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry23type GCPLogEntry struct {24// why was this here?? nolint:revive25LogName string `json:"logName"`26Resource struct {27Type string `json:"type"`28Labels map[string]string `json:"labels"`29} `json:"resource"`30Timestamp string `json:"timestamp"`3132// The time the log entry was received by Logging.33// Its important that `Timestamp` is optional in GCE log entry.34ReceiveTimestamp string `json:"receiveTimestamp"`3536TextPayload string `json:"textPayload"`3738// NOTE(kavi): There are other fields on GCPLogEntry. but we need only need39// above fields for now anyway we will be sending the entire entry to Loki.40}4142func parseGCPLogsEntry(data []byte, other model.LabelSet, otherInternal labels.Labels, useIncomingTimestamp bool, relabelConfig []*relabel.Config) (loki.Entry, error) {43var ge GCPLogEntry4445if err := json.Unmarshal(data, &ge); err != nil {46return loki.Entry{}, err47}4849// Adding mandatory labels for gcplog50lbs := labels.NewBuilder(otherInternal)51lbs.Set("__gcp_logname", ge.LogName)52lbs.Set("__gcp_resource_type", ge.Resource.Type)5354// labels from gcp log entry. Add it as internal labels55for k, v := range ge.Resource.Labels {56lbs.Set("__gcp_resource_labels_"+util.SnakeCase(k), v)57}5859var processed labels.Labels6061// apply relabeling62if len(relabelConfig) > 0 {63processed, _ = relabel.Process(lbs.Labels(nil), relabelConfig...)64} else {65processed = lbs.Labels(nil)66}6768// final labelset that will be sent to loki69labels := make(model.LabelSet)70for _, lbl := range processed {71// ignore internal labels72if strings.HasPrefix(lbl.Name, "__") {73continue74}75// ignore invalid labels76if !model.LabelName(lbl.Name).IsValid() || !model.LabelValue(lbl.Value).IsValid() {77continue78}79labels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)80}8182// add labels coming from scrapeconfig83labels = labels.Merge(other)8485ts := time.Now()86line := string(data)8788if useIncomingTimestamp {89tt := ge.Timestamp90if tt == "" {91tt = ge.ReceiveTimestamp92}93var err error94ts, err = time.Parse(time.RFC3339, tt)95if err != nil {96return loki.Entry{}, fmt.Errorf("invalid timestamp format: %w", err)97}9899if ts.IsZero() {100return loki.Entry{}, fmt.Errorf("no timestamp found in the log entry")101}102}103104// Send only `ge.textPayload` as log line if its present.105if strings.TrimSpace(ge.TextPayload) != "" {106line = ge.TextPayload107}108109return loki.Entry{110Labels: labels,111Entry: logproto.Entry{112Timestamp: ts,113Line: line,114},115}, nil116}117118119