Path: blob/main/component/prometheus/remotewrite/remote_write.go
package remotewrite

import (
	"context"
	"fmt"
	"math"
	"os"
	"path/filepath"
	"sync"
	"time"

	"go.uber.org/atomic"

	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"

	"github.com/grafana/agent/component/prometheus"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/pkg/build"
	"github.com/grafana/agent/pkg/metrics/wal"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
)

// Options.
//
// TODO(rfratto): This should be exposed. How do we want to expose this?
var remoteFlushDeadline = 1 * time.Minute

func init() {
	remote.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)

	component.Register(component.Registration{
		Name:    "prometheus.remote_write",
		Args:    Arguments{},
		Exports: Exports{},
		Build: func(o component.Options, c component.Arguments) (component.Component, error) {
			return NewComponent(o, c.(Arguments))
		},
	})
}

// Component is the prometheus.remote_write component.
type Component struct {
	log  log.Logger
	opts component.Options

	walStore    *wal.Storage
	remoteStore *remote.Storage
	storage     storage.Storage
	exited      atomic.Bool

	mut sync.RWMutex
	cfg Arguments

	receiver *prometheus.Interceptor
}

// NewComponent creates a new prometheus.remote_write component.
func NewComponent(o component.Options, c Arguments) (*Component, error) {
	// Older versions of prometheus.remote_write used the subpath below, which
	// added in too many extra unnecessary directories (since o.DataPath is
	// already unique).
	//
	// We best-effort attempt to delete the old path if it already exists to not
	// leak storage space.
	oldDataPath := filepath.Join(o.DataPath, "wal", o.ID)
	_ = os.RemoveAll(oldDataPath)

	walLogger := log.With(o.Logger, "subcomponent", "wal")
	walStorage, err := wal.NewStorage(walLogger, o.Registerer, o.DataPath)
	if err != nil {
		return nil, err
	}

	remoteLogger := log.With(o.Logger, "subcomponent", "rw")
	remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, o.DataPath, remoteFlushDeadline, nil)

	res := &Component{
		log:         o.Logger,
		opts:        o,
		walStore:    walStorage,
		remoteStore: remoteStore,
		storage:     storage.NewFanout(o.Logger, walStorage, remoteStore),
	}
	res.receiver = prometheus.NewInterceptor(
		res.storage,

		// In the methods below, conversion is needed because remote_writes assume
		// they are responsible for generating ref IDs. This means two
		// remote_writes may return the same ref ID for two different series. We
		// treat the remote_write ID as a "local ID" and translate it to a "global
		// ID" to ensure Flow compatibility.

		prometheus.WithAppendHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
			if res.exited.Load() {
				return 0, fmt.Errorf("%s has exited", o.ID)
			}

			localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef))
			newRef, nextErr := next.Append(storage.SeriesRef(localID), l, t, v)
			if localID == 0 {
				prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l)
			}
			return globalRef, nextErr
		}),
		prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
			if res.exited.Load() {
				return 0, fmt.Errorf("%s has exited", o.ID)
			}

			localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef))
			newRef, nextErr := next.UpdateMetadata(storage.SeriesRef(localID), l, m)
			if localID == 0 {
				prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l)
			}
			return globalRef, nextErr
		}),
		prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
			if res.exited.Load() {
				return 0, fmt.Errorf("%s has exited", o.ID)
			}

			localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(globalRef))
			newRef, nextErr := next.AppendExemplar(storage.SeriesRef(localID), l, e)
			if localID == 0 {
				prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newRef), l)
			}
			return globalRef, nextErr
		}),
	)

	// Immediately export the receiver which remains the same for the component
	// lifetime.
	o.OnStateChange(Exports{Receiver: res.receiver})

	if err := res.Update(c); err != nil {
		return nil, err
	}
	return res, nil
}

func startTime() (int64, error) { return 0, nil }

var _ component.Component = (*Component)(nil)

// Run implements Component.
func (c *Component) Run(ctx context.Context) error {
	defer func() {
		c.exited.Store(true)

		level.Debug(c.log).Log("msg", "closing storage")
		err := c.storage.Close()
		level.Debug(c.log).Log("msg", "storage closed")
		if err != nil {
			level.Error(c.log).Log("msg", "error when closing storage", "err", err)
		}
	}()

	// Track the last timestamp we truncated for to prevent segments from getting
	// deleted until at least some new data has been sent.
	var lastTs = int64(math.MinInt64)

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(c.truncateFrequency()):
			// We retrieve the current min/max keepalive time at once, since
			// retrieving them separately could lead to issues where we have an older
			// value for min which is now larger than max.
			c.mut.RLock()
			var (
				minWALTime = c.cfg.WALOptions.MinKeepaliveTime
				maxWALTime = c.cfg.WALOptions.MaxKeepaliveTime
			)
			c.mut.RUnlock()

			// The timestamp ts is used to determine which series are not receiving
			// samples and may be deleted from the WAL. Their most recent append
			// timestamp is compared to ts, and if that timestamp is older than ts,
			// they are considered inactive and may be deleted.
			//
			// Subtracting a duration from ts will delay when it will be considered
			// inactive and scheduled for deletion.
			ts := c.remoteStore.LowestSentTimestamp() - minWALTime.Milliseconds()
			if ts < 0 {
				ts = 0
			}

			// Network issues can prevent the result of LowestSentTimestamp from
			// changing. We don't want data in the WAL to grow forever, so we set a cap
			// on the maximum age data can be. If our ts is older than this cutoff point,
			// we'll shift it forward to start deleting very stale data.
			if maxTS := timestamp.FromTime(time.Now().Add(-maxWALTime)); ts < maxTS {
				ts = maxTS
			}

			if ts == lastTs {
				level.Debug(c.log).Log("msg", "not truncating the WAL, remote_write timestamp is unchanged", "ts", ts)
				continue
			}
			lastTs = ts

			level.Debug(c.log).Log("msg", "truncating the WAL", "ts", ts)
			err := c.walStore.Truncate(ts)
			if err != nil {
				// The only issue here is larger disk usage and a greater replay time,
				// so we'll only log this as a warning.
				level.Warn(c.log).Log("msg", "could not truncate WAL", "err", err)
			}
		}
	}
}

func (c *Component) truncateFrequency() time.Duration {
	c.mut.RLock()
	defer c.mut.RUnlock()
	return c.cfg.WALOptions.TruncateFrequency
}

// Update implements Component.
func (c *Component) Update(newConfig component.Arguments) error {
	cfg := newConfig.(Arguments)

	c.mut.Lock()
	defer c.mut.Unlock()

	convertedConfig, err := convertConfigs(cfg)
	if err != nil {
		return err
	}
	err = c.remoteStore.ApplyConfig(convertedConfig)
	if err != nil {
		return err
	}

	c.cfg = cfg
	return nil
}
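The append, metadata, and exemplar hooks above all apply the same local-to-global ref ID translation described in the comment inside NewComponent: each remote_write hands out its own "local" series refs, so two components can report the same ref for different series, and a shared mapping keeps refs unique across Flow. The standalone sketch below illustrates that idea with a toy mapping; it is only a sketch, and the names refMapping, newRefMapping, and getOrAddLink are hypothetical stand-ins that approximate what prometheus.GlobalRefMapping does in the Grafana Agent codebase.

// Minimal, self-contained sketch of the local-to-global ref ID translation.
// refMapping is a toy stand-in for prometheus.GlobalRefMapping, not the real API.
package main

import "fmt"

// refMapping maps (component ID, local ref) pairs to globally unique ref IDs.
type refMapping struct {
	nextGlobal    uint64
	globalByLocal map[string]map[uint64]uint64 // component ID -> local ref -> global ref
}

func newRefMapping() *refMapping {
	return &refMapping{globalByLocal: map[string]map[uint64]uint64{}}
}

// getOrAddLink returns the global ref for a component-local ref, allocating a
// new global ref the first time the (component ID, local ref) pair is seen.
func (m *refMapping) getOrAddLink(componentID string, localRef uint64) uint64 {
	local, ok := m.globalByLocal[componentID]
	if !ok {
		local = map[uint64]uint64{}
		m.globalByLocal[componentID] = local
	}
	if global, ok := local[localRef]; ok {
		return global
	}
	m.nextGlobal++
	local[localRef] = m.nextGlobal
	return m.nextGlobal
}

func main() {
	mapping := newRefMapping()

	// Two remote_write components each hand out local ref 1 for their first
	// series; without translation those IDs would collide downstream.
	a := mapping.getOrAddLink("prometheus.remote_write.a", 1)
	b := mapping.getOrAddLink("prometheus.remote_write.b", 1)

	fmt.Println(a, b) // prints "1 2": the same local ref maps to distinct global refs
}

Running the sketch prints "1 2": two components that both report local ref 1 end up with distinct global refs, which is the property the hooks above rely on when they call GetLocalRefID and GetOrAddLink.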