GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/remotewrite/remote_write.go
package remotewrite

import (
	"context"
	"fmt"
	"math"
	"os"
	"path/filepath"
	"sync"
	"time"

	"go.uber.org/atomic"

	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"

	"github.com/grafana/agent/component/prometheus"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/pkg/build"
	"github.com/grafana/agent/pkg/metrics/wal"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
)

// Options.
//
// TODO(rfratto): This should be exposed. How do we want to expose this?
var remoteFlushDeadline = 1 * time.Minute

func init() {
	remote.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)

	component.Register(component.Registration{
		Name:    "prometheus.remote_write",
		Args:    Arguments{},
		Exports: Exports{},
		Build: func(o component.Options, c component.Arguments) (component.Component, error) {
			return NewComponent(o, c.(Arguments))
		},
	})
}
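
// For reference, a minimal Flow configuration that instantiates this component
// might look like the sketch below. It is illustrative only: the exact block
// and attribute names are defined by the Arguments type elsewhere in this
// package, not by this file.
//
//	prometheus.remote_write "default" {
//	  endpoint {
//	    url = "https://example.com/api/prom/push"
//	  }
//	}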

// Component is the prometheus.remote_write component.
type Component struct {
	log  log.Logger
	opts component.Options

	walStore    *wal.Storage
	remoteStore *remote.Storage
	storage     storage.Storage
	exited      atomic.Bool

	mut sync.RWMutex
	cfg Arguments

	receiver *prometheus.Interceptor
}

// NewComponent creates a new prometheus.remote_write component.
func NewComponent(o component.Options, c Arguments) (*Component, error) {
	// Older versions of prometheus.remote_write used the subpath below, which
	// added in too many extra unnecessary directories (since o.DataPath is
	// already unique).
	//
	// We best-effort attempt to delete the old path if it already exists to not
	// leak storage space.
	oldDataPath := filepath.Join(o.DataPath, "wal", o.ID)
	_ = os.RemoveAll(oldDataPath)

	walLogger := log.With(o.Logger, "subcomponent", "wal")
	walStorage, err := wal.NewStorage(walLogger, o.Registerer, o.DataPath)
	if err != nil {
		return nil, err
	}

	remoteLogger := log.With(o.Logger, "subcomponent", "rw")
	remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, o.DataPath, remoteFlushDeadline, nil)

	res := &Component{
		log:         o.Logger,
		opts:        o,
		walStore:    walStorage,
		remoteStore: remoteStore,
		storage:     storage.NewFanout(o.Logger, walStorage, remoteStore),
	}
	res.receiver = prometheus.NewInterceptor(
		res.storage,

		// In the methods below, conversion is needed because remote_writes assume
		// they are responsible for generating ref IDs. This means two
		// remote_writes may return the same ref ID for two different series. We
		// treat the remote_write ID as a "local ID" and translate it to a "global
		// ID" to ensure Flow compatibility.

		prometheus.WithAppendHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
			if res.exited.Load() {
				return 0, fmt.Errorf("%s has exited", o.ID)
			}

			localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef))
			newRef, nextErr := next.Append(storage.SeriesRef(localID), l, t, v)
			if localID == 0 {
				prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l)
			}
			return globalRef, nextErr
		}),
		prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
			if res.exited.Load() {
				return 0, fmt.Errorf("%s has exited", o.ID)
			}

			localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef))
			newRef, nextErr := next.UpdateMetadata(storage.SeriesRef(localID), l, m)
			if localID == 0 {
				prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l)
			}
			return globalRef, nextErr
		}),
		prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
			if res.exited.Load() {
				return 0, fmt.Errorf("%s has exited", o.ID)
			}

			localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef))
			newRef, nextErr := next.AppendExemplar(storage.SeriesRef(localID), l, e)
			if localID == 0 {
				prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l)
			}
			return globalRef, nextErr
		}),
	)

	// Immediately export the receiver which remains the same for the component
	// lifetime.
	o.OnStateChange(Exports{Receiver: res.receiver})

	if err := res.Update(c); err != nil {
		return nil, err
	}
	return res, nil
}
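
// startTime is the start-time callback handed to remote.NewStorage in
// NewComponent. It always reports 0, meaning there is no lower bound on the
// timestamps this storage claims to have data for.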
func startTime() (int64, error) { return 0, nil }

var _ component.Component = (*Component)(nil)

// Run implements Component.
func (c *Component) Run(ctx context.Context) error {
	defer func() {
		c.exited.Store(true)

		level.Debug(c.log).Log("msg", "closing storage")
		err := c.storage.Close()
		level.Debug(c.log).Log("msg", "storage closed")
		if err != nil {
			level.Error(c.log).Log("msg", "error when closing storage", "err", err)
		}
	}()

	// Track the last timestamp we truncated for to prevent segments from getting
	// deleted until at least some new data has been sent.
	var lastTs = int64(math.MinInt64)

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(c.truncateFrequency()):
			// We retrieve the current min/max keepalive time at once, since
			// retrieving them separately could lead to issues where we have an older
			// value for min which is now larger than max.
			c.mut.RLock()
			var (
				minWALTime = c.cfg.WALOptions.MinKeepaliveTime
				maxWALTime = c.cfg.WALOptions.MaxKeepaliveTime
			)
			c.mut.RUnlock()

			// The timestamp ts is used to determine which series are not receiving
			// samples and may be deleted from the WAL. Their most recent append
			// timestamp is compared to ts, and if that timestamp is older than ts,
			// they are considered inactive and may be deleted.
			//
			// Subtracting a duration from ts will delay when it will be considered
			// inactive and scheduled for deletion.
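			//
			// For example (numbers are illustrative only): with a lowest sent
			// timestamp of 1_000_000 ms and MinKeepaliveTime of 5m, ts becomes
			// 1_000_000 - 300_000 = 700_000, so only series whose most recent
			// append is older than 700_000 ms are candidates for deletion.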
			ts := c.remoteStore.LowestSentTimestamp() - minWALTime.Milliseconds()
			if ts < 0 {
				ts = 0
			}

			// Network issues can prevent the result of LowestSentTimestamp from
			// changing. We don't want data in the WAL to grow forever, so we set a cap
			// on the maximum age data can be. If our ts is older than this cutoff point,
			// we'll shift it forward to start deleting very stale data.
			if maxTS := timestamp.FromTime(time.Now().Add(-maxWALTime)); ts < maxTS {
				ts = maxTS
			}

			if ts == lastTs {
				level.Debug(c.log).Log("msg", "not truncating the WAL, remote_write timestamp is unchanged", "ts", ts)
				continue
			}
			lastTs = ts

			level.Debug(c.log).Log("msg", "truncating the WAL", "ts", ts)
			err := c.walStore.Truncate(ts)
			if err != nil {
				// The only issue here is larger disk usage and a greater replay time,
				// so we'll only log this as a warning.
				level.Warn(c.log).Log("msg", "could not truncate WAL", "err", err)
			}
		}
	}
}
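
// truncateFrequency returns the configured WAL truncation interval. It takes
// the read lock since Update may change the configuration concurrently.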
func (c *Component) truncateFrequency() time.Duration {
	c.mut.RLock()
	defer c.mut.RUnlock()
	return c.cfg.WALOptions.TruncateFrequency
}

// Update implements Component.
func (c *Component) Update(newConfig component.Arguments) error {
	cfg := newConfig.(Arguments)

	c.mut.Lock()
	defer c.mut.Unlock()

	convertedConfig, err := convertConfigs(cfg)
	if err != nil {
		return err
	}
	err = c.remoteStore.ApplyConfig(convertedConfig)
	if err != nil {
		return err
	}

	c.cfg = cfg
	return nil
}