Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/pull_target.go
4096 views
1
package gcplogtarget
2
3
// This code is copied from Promtail. The gcplogtarget package is used to
4
// configure and run the targets that can read log entries from cloud resource
5
// logs like bucket logs, load balancer logs, and Kubernetes cluster logs
6
// from GCP.
7
8
import (
9
"context"
10
"io"
11
"sync"
12
"time"
13
14
"cloud.google.com/go/pubsub"
15
"github.com/go-kit/log"
16
"github.com/go-kit/log/level"
17
"github.com/grafana/agent/component/common/loki"
18
"github.com/grafana/dskit/backoff"
19
"github.com/prometheus/common/model"
20
"github.com/prometheus/prometheus/model/relabel"
21
"google.golang.org/api/option"
22
)
23
24
// PullTarget represents a target that scrapes logs from a GCP project id and
25
// subscription and converts them to Loki log entries.
26
type PullTarget struct {
27
metrics *Metrics
28
logger log.Logger
29
handler loki.EntryHandler
30
config *PullConfig
31
relabelConfig []*relabel.Config
32
jobName string
33
34
// lifecycle management
35
ctx context.Context
36
cancel context.CancelFunc
37
wg sync.WaitGroup
38
backoff *backoff.Backoff
39
40
// pubsub
41
ps io.Closer
42
sub pubsubSubscription
43
msgs chan *pubsub.Message
44
}
45
46
// TODO(@tpaschalis) Expose this as River configuration in the future.
47
var defaultBackoff = backoff.Config{
48
MinBackoff: 1 * time.Second,
49
MaxBackoff: 10 * time.Second,
50
MaxRetries: 0, // Retry forever
51
}
52
53
// pubsubSubscription allows us to mock pubsub for testing
54
type pubsubSubscription interface {
55
Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error
56
}
57
58
// NewPullTarget returns the new instance of PullTarget.
59
func NewPullTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, jobName string, config *PullConfig, relabel []*relabel.Config, clientOptions ...option.ClientOption) (*PullTarget, error) {
60
ctx, cancel := context.WithCancel(context.Background())
61
ps, err := pubsub.NewClient(ctx, config.ProjectID, clientOptions...)
62
if err != nil {
63
cancel()
64
return nil, err
65
}
66
67
target := &PullTarget{
68
metrics: metrics,
69
logger: logger,
70
handler: handler,
71
relabelConfig: relabel,
72
config: config,
73
jobName: jobName,
74
ctx: ctx,
75
cancel: cancel,
76
ps: ps,
77
sub: ps.SubscriptionInProject(config.Subscription, config.ProjectID),
78
backoff: backoff.New(ctx, defaultBackoff),
79
msgs: make(chan *pubsub.Message),
80
}
81
82
go func() {
83
err := target.run()
84
if err != nil {
85
level.Error(logger).Log("msg", "loki.source.gcplog pull target shutdown with error", "err", err)
86
}
87
}()
88
89
return target, nil
90
}
91
92
func (t *PullTarget) run() error {
93
t.wg.Add(1)
94
defer t.wg.Done()
95
96
go t.consumeSubscription()
97
98
lbls := make(model.LabelSet, len(t.config.Labels))
99
for k, v := range t.config.Labels {
100
lbls[model.LabelName(k)] = model.LabelValue(v)
101
}
102
103
for {
104
select {
105
case <-t.ctx.Done():
106
return t.ctx.Err()
107
case m := <-t.msgs:
108
entry, err := parseGCPLogsEntry(m.Data, lbls, nil, t.config.UseIncomingTimestamp, t.relabelConfig)
109
if err != nil {
110
level.Error(t.logger).Log("event", "error formating log entry", "cause", err)
111
m.Ack()
112
break
113
}
114
t.handler.Chan() <- entry
115
m.Ack() // Ack only after log is sent.
116
t.metrics.gcplogEntries.WithLabelValues(t.config.ProjectID).Inc()
117
}
118
}
119
}
120
121
func (t *PullTarget) consumeSubscription() {
122
// NOTE(kavi): `cancel` the context as exiting from this goroutine should stop main `run` loop
123
// It makesense as no more messages will be received.
124
defer t.cancel()
125
126
for t.backoff.Ongoing() {
127
err := t.sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) {
128
t.msgs <- m
129
t.backoff.Reset()
130
})
131
if err != nil {
132
level.Error(t.logger).Log("msg", "failed to receive pubsub messages", "error", err)
133
t.metrics.gcplogErrors.WithLabelValues(t.config.ProjectID).Inc()
134
t.metrics.gcplogTargetLastSuccessScrape.WithLabelValues(t.config.ProjectID, t.config.Subscription).SetToCurrentTime()
135
t.backoff.Wait()
136
}
137
}
138
}
139
140
// Labels return the model.LabelSet that the target applies to log entries.
141
func (t *PullTarget) Labels() model.LabelSet {
142
lbls := make(model.LabelSet, len(t.config.Labels))
143
for k, v := range t.config.Labels {
144
lbls[model.LabelName(k)] = model.LabelValue(v)
145
}
146
return lbls
147
}
148
149
// Details returns some debug information about the target.
150
func (t *PullTarget) Details() map[string]string {
151
return map[string]string{
152
"strategy": "pull",
153
"labels": t.Labels().String(),
154
}
155
}
156
157
// Stop shuts the target down.
158
func (t *PullTarget) Stop() error {
159
t.cancel()
160
t.wg.Wait()
161
t.handler.Stop()
162
t.ps.Close()
163
return nil
164
}
165
166