GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/instance/instance.go

// Package instance provides a mini Prometheus scraper and remote_writer.
package instance

import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/pkg/build"
	"github.com/grafana/agent/pkg/metrics/wal"
	"github.com/grafana/agent/pkg/util"
	"github.com/oklog/run"
	"github.com/prometheus/client_golang/prometheus"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
	"go.uber.org/atomic"
	"gopkg.in/yaml.v2"
)

func init() {
	remote.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
	scrape.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)

	// default remote_write send_exemplars to true
	config.DefaultRemoteWriteConfig.SendExemplars = true
}

// Default configuration values
var (
	DefaultConfig = Config{
		HostFilter:           false,
		WALTruncateFrequency: 60 * time.Minute,
		MinWALTime:           5 * time.Minute,
		MaxWALTime:           4 * time.Hour,
		RemoteFlushDeadline:  1 * time.Minute,
		WriteStaleOnShutdown: false,
		global:               DefaultGlobalConfig,
	}
)

// Config is a specific agent that runs within the overall Prometheus
// agent. It has its own set of scrape_configs and remote_write rules.
type Config struct {
	Name                     string                      `yaml:"name,omitempty"`
	HostFilter               bool                        `yaml:"host_filter,omitempty"`
	HostFilterRelabelConfigs []*relabel.Config           `yaml:"host_filter_relabel_configs,omitempty"`
	ScrapeConfigs            []*config.ScrapeConfig      `yaml:"scrape_configs,omitempty"`
	RemoteWrite              []*config.RemoteWriteConfig `yaml:"remote_write,omitempty"`

	// How frequently the WAL should be truncated.
	WALTruncateFrequency time.Duration `yaml:"wal_truncate_frequency,omitempty"`

	// Minimum and maximum time that series should exist in the WAL for.
	MinWALTime time.Duration `yaml:"min_wal_time,omitempty"`
	MaxWALTime time.Duration `yaml:"max_wal_time,omitempty"`

	RemoteFlushDeadline  time.Duration `yaml:"remote_flush_deadline,omitempty"`
	WriteStaleOnShutdown bool          `yaml:"write_stale_on_shutdown,omitempty"`

	global GlobalConfig `yaml:"-"`
}
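
// An instance config is usually supplied as YAML. A minimal sketch of such a
// config follows; the values shown are illustrative only and are not defaults
// of this package:
//
//	name: primary
//	host_filter: false
//	scrape_configs:
//	  - job_name: node
//	    static_configs:
//	      - targets: ['localhost:9100']
//	remote_write:
//	  - url: http://localhost:9009/api/prom/push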

// UnmarshalYAML implements yaml.Unmarshaler.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	*c = DefaultConfig

	type plain Config
	return unmarshal((*plain)(c))
}

// MarshalYAML implements yaml.Marshaler.
func (c Config) MarshalYAML() (interface{}, error) {
	// We want users to be able to marshal instance.Configs directly without
	// *needing* to call instance.MarshalConfig, so we call it internally
	// here and return a map.
	bb, err := MarshalConfig(&c, true)
	if err != nil {
		return nil, err
	}

	// Use a yaml.MapSlice rather than a map[string]interface{} so
	// order of keys is retained compared to just calling MarshalConfig.
	var m yaml.MapSlice
	if err := yaml.Unmarshal(bb, &m); err != nil {
		return nil, err
	}
	return m, nil
}

// ApplyDefaults applies default values to any fields in the configuration
// that have been left at their zero value. ApplyDefaults also validates the
// config.
//
// The value for global will be saved.
func (c *Config) ApplyDefaults(global GlobalConfig) error {
	c.global = global

	switch {
	case c.Name == "":
		return errors.New("missing instance name")
	case c.WALTruncateFrequency <= 0:
		return errors.New("wal_truncate_frequency must be greater than 0s")
	case c.RemoteFlushDeadline <= 0:
		return errors.New("remote_flush_deadline must be greater than 0s")
	case c.MinWALTime > c.MaxWALTime:
		return errors.New("min_wal_time must be less than max_wal_time")
	}

	jobNames := map[string]struct{}{}
	for _, sc := range c.ScrapeConfigs {
		if sc == nil {
			return fmt.Errorf("empty or null scrape config section")
		}

		// First set the correct scrape interval, then check that the timeout
		// (inferred or explicit) is not greater than that.
		if sc.ScrapeInterval == 0 {
			sc.ScrapeInterval = c.global.Prometheus.ScrapeInterval
		}
		if sc.ScrapeTimeout > sc.ScrapeInterval {
			return fmt.Errorf("scrape timeout greater than scrape interval for scrape config with job name %q", sc.JobName)
		}
		if time.Duration(sc.ScrapeInterval) > c.WALTruncateFrequency {
			return fmt.Errorf("scrape interval greater than wal_truncate_frequency for scrape config with job name %q", sc.JobName)
		}
		if sc.ScrapeTimeout == 0 {
			if c.global.Prometheus.ScrapeTimeout > sc.ScrapeInterval {
				sc.ScrapeTimeout = sc.ScrapeInterval
			} else {
				sc.ScrapeTimeout = c.global.Prometheus.ScrapeTimeout
			}
		}

		if _, exists := jobNames[sc.JobName]; exists {
			return fmt.Errorf("found multiple scrape configs with job name %q", sc.JobName)
		}
		jobNames[sc.JobName] = struct{}{}
	}

	rwNames := map[string]struct{}{}

	// If the instance remote write is not filled in, then apply the Prometheus remote_write config.
	if len(c.RemoteWrite) == 0 {
		c.RemoteWrite = c.global.RemoteWrite
	}
	for _, cfg := range c.RemoteWrite {
		if cfg == nil {
			return fmt.Errorf("empty or null remote write config section")
		}

		// Typically Prometheus ignores empty names here, but we need to assign a
		// unique name to the config so we can pull metrics from it when running
		// an instance.
		var generatedName bool
		if cfg.Name == "" {
			hash, err := getHash(cfg)
			if err != nil {
				return err
			}

			// We have to add the name of the instance to ensure that generated metrics
			// are unique across multiple agent instances. The remote write queues currently
			// globally register their metrics so we can't inject labels here.
			cfg.Name = c.Name + "-" + hash[:6]
			generatedName = true
		}

		if _, exists := rwNames[cfg.Name]; exists {
			if generatedName {
				return fmt.Errorf("found two identical remote_write configs")
			}
			return fmt.Errorf("found duplicate remote write configs with name %q", cfg.Name)
		}
		rwNames[cfg.Name] = struct{}{}
	}

	return nil
}
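
// As an example of the name generation above: an unnamed remote_write entry
// in an instance named "primary" gets a name of the form
// "primary-<first 6 hex chars of the MD5 of its JSON-encoded config>", e.g.
// "primary-3a7bd4" (the hash value here is purely illustrative).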

// Clone makes a deep copy of the config along with global settings.
func (c *Config) Clone() (Config, error) {
	bb, err := MarshalConfig(c, false)
	if err != nil {
		return Config{}, err
	}
	cp, err := UnmarshalConfig(bytes.NewReader(bb))
	if err != nil {
		return Config{}, err
	}
	cp.global = c.global

	// Some tests will trip up on this; the marshal/unmarshal cycle might set
	// an empty slice to nil. Set it back to an empty slice if we detect this
	// happening.
	if cp.ScrapeConfigs == nil && c.ScrapeConfigs != nil {
		cp.ScrapeConfigs = []*config.ScrapeConfig{}
	}
	if cp.RemoteWrite == nil && c.RemoteWrite != nil {
		cp.RemoteWrite = []*config.RemoteWriteConfig{}
	}

	return *cp, nil
}

type walStorageFactory func(reg prometheus.Registerer) (walStorage, error)

// Instance is an individual metrics collector and remote_writer.
type Instance struct {
	// All fields in the following block may be accessed and modified by
	// concurrently running goroutines.
	//
	// Note that all Prometheus components listed here may be nil at any
	// given time; methods reading them should take care to do nil checks.
	mut                sync.Mutex
	cfg                Config
	wal                walStorage
	discovery          *discoveryService
	readyScrapeManager *readyScrapeManager
	remoteStore        *remote.Storage
	storage            storage.Storage

	// ready is set to true after the initialization process finishes
	ready atomic.Bool

	hostFilter *HostFilter

	logger log.Logger

	reg    prometheus.Registerer
	newWal walStorageFactory
}

// New creates a new Instance with a directory for storing the WAL. The instance
// will not start until Run is called on the instance.
func New(reg prometheus.Registerer, cfg Config, walDir string, logger log.Logger) (*Instance, error) {
	logger = log.With(logger, "instance", cfg.Name)

	instWALDir := filepath.Join(walDir, cfg.Name)

	newWal := func(reg prometheus.Registerer) (walStorage, error) {
		return wal.NewStorage(logger, reg, instWALDir)
	}

	return newInstance(cfg, reg, logger, newWal)
}
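
// A minimal usage sketch of New and Run; the registerer, WAL directory, and
// logger shown here are placeholders chosen for illustration:
//
//	inst, err := New(prometheus.DefaultRegisterer, cfg, "/tmp/agent-wal", logger)
//	if err != nil {
//		// handle error
//	}
//	go func() { _ = inst.Run(ctx) }()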

func newInstance(cfg Config, reg prometheus.Registerer, logger log.Logger, newWal walStorageFactory) (*Instance, error) {
	hostname, err := Hostname()
	if err != nil {
		return nil, fmt.Errorf("failed to get hostname: %w", err)
	}

	i := &Instance{
		cfg:        cfg,
		logger:     logger,
		hostFilter: NewHostFilter(hostname, cfg.HostFilterRelabelConfigs),

		reg:    reg,
		newWal: newWal,

		readyScrapeManager: &readyScrapeManager{},
	}

	return i, nil
}

// Run starts the instance, initializing Prometheus components, and will
// continue to run until an error happens during execution or the provided
// context is cancelled.
//
// Run may be re-called after exiting, as components will be reinitialized each
// time Run is called.
func (i *Instance) Run(ctx context.Context) error {
	// i.cfg may change at any point in the middle of this method but not in a way
	// that affects any of the code below; rather than grabbing a mutex every time
	// we want to read the config, we'll simplify the access and just grab a copy
	// now.
	i.mut.Lock()
	cfg := i.cfg
	i.mut.Unlock()

	level.Debug(i.logger).Log("msg", "initializing instance", "name", cfg.Name)

	// trackingReg wraps the register for the instance to make sure that if Run
	// exits, any metrics Prometheus registers are removed and can be
	// re-registered if Run is called again.
	trackingReg := util.WrapWithUnregisterer(i.reg)
	defer trackingReg.UnregisterAll()

	if err := i.initialize(ctx, trackingReg, &cfg); err != nil {
		level.Error(i.logger).Log("msg", "failed to initialize instance", "err", err)
		return fmt.Errorf("failed to initialize instance: %w", err)
	}

	// The actors defined here are defined in the order we want them to shut down.
	// Primarily, we want to ensure that the following shutdown order is
	// maintained:
	//   1. The scrape manager stops
	//   2. WAL storage is closed
	//   3. Remote write storage is closed
	// This is done to allow the instance to write stale markers for all active
	// series.
	rg := runGroupWithContext(ctx)

	{
		// Target Discovery
		rg.Add(i.discovery.Run, i.discovery.Stop)
	}
	{
		// Truncation loop
		ctx, contextCancel := context.WithCancel(context.Background())
		defer contextCancel()
		rg.Add(
			func() error {
				i.truncateLoop(ctx, i.wal, &cfg)
				level.Info(i.logger).Log("msg", "truncation loop stopped")
				return nil
			},
			func(err error) {
				level.Info(i.logger).Log("msg", "stopping truncation loop...")
				contextCancel()
			},
		)
	}
	{
		sm, err := i.readyScrapeManager.Get()
		if err != nil {
			level.Error(i.logger).Log("msg", "failed to get scrape manager")
			return err
		}

		// Scrape manager
		rg.Add(
			func() error {
				err := sm.Run(i.discovery.SyncCh())
				level.Info(i.logger).Log("msg", "scrape manager stopped")
				return err
			},
			func(err error) {
				// The scrape manager is closed first to allow us to write staleness
				// markers without receiving new samples from scraping in the meantime.
				level.Info(i.logger).Log("msg", "stopping scrape manager...")
				sm.Stop()

				// On a graceful shutdown, write staleness markers. If something went
				// wrong, then the instance will be relaunched.
				if err == nil && cfg.WriteStaleOnShutdown {
					level.Info(i.logger).Log("msg", "writing staleness markers...")
					err := i.wal.WriteStalenessMarkers(i.getRemoteWriteTimestamp)
					if err != nil {
						level.Error(i.logger).Log("msg", "error writing staleness markers", "err", err)
					}
				}

				// Closing the storage closes both the WAL storage and remote write
				// storage.
				level.Info(i.logger).Log("msg", "closing storage...")
				if err := i.storage.Close(); err != nil {
					level.Error(i.logger).Log("msg", "error stopping storage", "err", err)
				}
			},
		)
	}

	level.Debug(i.logger).Log("msg", "running instance", "name", cfg.Name)
	i.ready.Store(true)
	err := rg.Run()
	if err != nil {
		level.Error(i.logger).Log("msg", "agent instance stopped with error", "err", err)
	}
	return err
}

// initialize sets up the various Prometheus components with their initial
// settings. initialize will be called each time the Instance is run. Prometheus
// components cannot be reused after they are stopped so we need to recreate them
// each run.
func (i *Instance) initialize(ctx context.Context, reg prometheus.Registerer, cfg *Config) error {
	i.mut.Lock()
	defer i.mut.Unlock()

	if cfg.HostFilter {
		i.hostFilter.PatchSD(cfg.ScrapeConfigs)
	}

	var err error

	i.wal, err = i.newWal(reg)
	if err != nil {
		return fmt.Errorf("error creating WAL: %w", err)
	}

	i.discovery, err = i.newDiscoveryManager(ctx, cfg)
	if err != nil {
		return fmt.Errorf("error creating discovery manager: %w", err)
	}

	i.readyScrapeManager = &readyScrapeManager{}

	// Set up the remote storage
	remoteLogger := log.With(i.logger, "component", "remote")
	i.remoteStore = remote.NewStorage(remoteLogger, reg, i.wal.StartTime, i.wal.Directory(), cfg.RemoteFlushDeadline, i.readyScrapeManager)
	err = i.remoteStore.ApplyConfig(&config.Config{
		GlobalConfig:       cfg.global.Prometheus,
		RemoteWriteConfigs: cfg.RemoteWrite,
	})
	if err != nil {
		return fmt.Errorf("failed applying config to remote storage: %w", err)
	}

	i.storage = storage.NewFanout(i.logger, i.wal, i.remoteStore)

	opts := &scrape.Options{
		ExtraMetrics:      cfg.global.ExtraMetrics,
		HTTPClientOptions: []config_util.HTTPClientOption{},
	}

	if cfg.global.DisableKeepAlives {
		opts.HTTPClientOptions = append(opts.HTTPClientOptions, config_util.WithKeepAlivesDisabled())
	}
	if cfg.global.IdleConnTimeout > 0 {
		opts.HTTPClientOptions = append(opts.HTTPClientOptions, config_util.WithIdleConnTimeout(cfg.global.IdleConnTimeout))
	}
	scrapeManager := newScrapeManager(opts, log.With(i.logger, "component", "scrape manager"), i.storage)
	err = scrapeManager.ApplyConfig(&config.Config{
		GlobalConfig:  cfg.global.Prometheus,
		ScrapeConfigs: cfg.ScrapeConfigs,
	})
	if err != nil {
		return fmt.Errorf("failed applying config to scrape manager: %w", err)
	}

	i.readyScrapeManager.Set(scrapeManager)

	return nil
}

// Ready returns true if the Instance has been initialized and is ready
// to start scraping and delivering metrics.
func (i *Instance) Ready() bool {
	return i.ready.Load()
}

// Update accepts a new Config for the Instance and will dynamically update any
// running Prometheus components with the new values from Config. Update will
// return an ErrInvalidUpdate if the Update could not be applied.
func (i *Instance) Update(c Config) (err error) {
	i.mut.Lock()
	defer i.mut.Unlock()

	// It's only (currently) valid to update scrape_configs and remote_write, so
	// if any other field has changed here, return the error.
	switch {
	// This first case should never happen in practice but it's included here for
	// completion's sake.
	case i.cfg.Name != c.Name:
		err = errImmutableField{Field: "name"}
	case i.cfg.HostFilter != c.HostFilter:
		err = errImmutableField{Field: "host_filter"}
	case i.cfg.WALTruncateFrequency != c.WALTruncateFrequency:
		err = errImmutableField{Field: "wal_truncate_frequency"}
	case i.cfg.RemoteFlushDeadline != c.RemoteFlushDeadline:
		err = errImmutableField{Field: "remote_flush_deadline"}
	case i.cfg.WriteStaleOnShutdown != c.WriteStaleOnShutdown:
		err = errImmutableField{Field: "write_stale_on_shutdown"}
	}
	if err != nil {
		return ErrInvalidUpdate{Inner: err}
	}

	// Check to see if the components exist yet.
	if i.discovery == nil || i.remoteStore == nil || i.readyScrapeManager == nil {
		return ErrInvalidUpdate{
			Inner: fmt.Errorf("cannot dynamically update because instance is not running"),
		}
	}

	// NOTE(rfratto): Prometheus applies configs in a specific order to ensure
	// flow from service discovery down to the WAL continues working properly.
	//
	// Keep the following order below:
	//
	// 1. Local config
	// 2. Remote Store
	// 3. Scrape Manager
	// 4. Discovery Manager

	originalConfig := i.cfg
	defer func() {
		if err != nil {
			i.cfg = originalConfig
		}
	}()
	i.cfg = c

	i.hostFilter.SetRelabels(c.HostFilterRelabelConfigs)
	if c.HostFilter {
		// N.B.: only call PatchSD if HostFilter is enabled since it
		// mutates what targets will be discovered.
		i.hostFilter.PatchSD(c.ScrapeConfigs)
	}

	err = i.remoteStore.ApplyConfig(&config.Config{
		GlobalConfig:       c.global.Prometheus,
		RemoteWriteConfigs: c.RemoteWrite,
	})
	if err != nil {
		return fmt.Errorf("error applying new remote_write configs: %w", err)
	}

	sm, err := i.readyScrapeManager.Get()
	if err != nil {
		return fmt.Errorf("couldn't get scrape manager to apply new scrape configs: %w", err)
	}
	err = sm.ApplyConfig(&config.Config{
		GlobalConfig:  c.global.Prometheus,
		ScrapeConfigs: c.ScrapeConfigs,
	})
	if err != nil {
		return fmt.Errorf("error applying updated configs to scrape manager: %w", err)
	}

	sdConfigs := map[string]discovery.Configs{}
	for _, v := range c.ScrapeConfigs {
		sdConfigs[v.JobName] = v.ServiceDiscoveryConfigs
	}
	err = i.discovery.Manager.ApplyConfig(sdConfigs)
	if err != nil {
		return fmt.Errorf("failed applying configs to discovery manager: %w", err)
	}

	return nil
}

// TargetsActive returns the set of active targets from the scrape manager. Returns nil
// if the scrape manager is not ready yet.
func (i *Instance) TargetsActive() map[string][]*scrape.Target {
	i.mut.Lock()
	defer i.mut.Unlock()

	if i.readyScrapeManager == nil {
		return nil
	}

	mgr, err := i.readyScrapeManager.Get()
	if err == ErrNotReady {
		return nil
	} else if err != nil {
		level.Error(i.logger).Log("msg", "failed to get scrape manager when collecting active targets", "err", err)
		return nil
	}
	return mgr.TargetsActive()
}

// StorageDirectory returns the directory where this Instance is writing series
// and samples to for the WAL.
func (i *Instance) StorageDirectory() string {
	return i.wal.Directory()
}

// Appender returns a storage.Appender from the instance's WAL
func (i *Instance) Appender(ctx context.Context) storage.Appender {
	return i.wal.Appender(ctx)
}

type discoveryService struct {
	Manager *discovery.Manager

	RunFunc    func() error
	StopFunc   func(err error)
	SyncChFunc func() GroupChannel
}

func (s *discoveryService) Run() error           { return s.RunFunc() }
func (s *discoveryService) Stop(err error)       { s.StopFunc(err) }
func (s *discoveryService) SyncCh() GroupChannel { return s.SyncChFunc() }

// newDiscoveryManager returns an implementation of a runnable service
// that outputs discovered targets to a channel. The implementation
// uses the Prometheus Discovery Manager. Targets will be filtered
// if the instance is configured to perform host filtering.
func (i *Instance) newDiscoveryManager(ctx context.Context, cfg *Config) (*discoveryService, error) {
	ctx, cancel := context.WithCancel(ctx)

	logger := log.With(i.logger, "component", "discovery manager")
	manager := discovery.NewManager(ctx, logger, discovery.Name("scrape"))

	// TODO(rfratto): refactor this to a function?
	// TODO(rfratto): ensure job name is unique
	c := map[string]discovery.Configs{}
	for _, v := range cfg.ScrapeConfigs {
		c[v.JobName] = v.ServiceDiscoveryConfigs
	}
	err := manager.ApplyConfig(c)
	if err != nil {
		cancel()
		level.Error(i.logger).Log("msg", "failed applying config to discovery manager", "err", err)
		return nil, fmt.Errorf("failed applying config to discovery manager: %w", err)
	}

	rg := runGroupWithContext(ctx)

	// Run the manager
	rg.Add(func() error {
		err := manager.Run()
		level.Info(i.logger).Log("msg", "discovery manager stopped")
		return err
	}, func(err error) {
		level.Info(i.logger).Log("msg", "stopping discovery manager...")
		cancel()
	})

	syncChFunc := manager.SyncCh

	// If host filtering is enabled, run it and use its channel for discovered
	// targets.
	if cfg.HostFilter {
		rg.Add(func() error {
			i.hostFilter.Run(manager.SyncCh())
			level.Info(i.logger).Log("msg", "host filterer stopped")
			return nil
		}, func(_ error) {
			level.Info(i.logger).Log("msg", "stopping host filterer...")
			i.hostFilter.Stop()
		})

		syncChFunc = i.hostFilter.SyncCh
	}

	return &discoveryService{
		Manager: manager,

		RunFunc:    rg.Run,
		StopFunc:   rg.Stop,
		SyncChFunc: syncChFunc,
	}, nil
}

func (i *Instance) truncateLoop(ctx context.Context, wal walStorage, cfg *Config) {
	// Track the last timestamp we truncated for to prevent segments from getting
	// deleted until at least some new data has been sent.
	var lastTs int64 = math.MinInt64

	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(cfg.WALTruncateFrequency):
			// The timestamp ts is used to determine which series are not receiving
			// samples and may be deleted from the WAL. Their most recent append
			// timestamp is compared to ts, and if that timestamp is older than ts,
			// they are considered inactive and may be deleted.
			//
			// Subtracting a duration from ts will delay when it will be considered
			// inactive and scheduled for deletion.
			ts := i.getRemoteWriteTimestamp() - i.cfg.MinWALTime.Milliseconds()
			if ts < 0 {
				ts = 0
			}

			// Network issues can prevent the result of getRemoteWriteTimestamp from
			// changing. We don't want data in the WAL to grow forever, so we set a cap
			// on the maximum age data can be. If our ts is older than this cutoff point,
			// we'll shift it forward to start deleting very stale data.
			if maxTS := timestamp.FromTime(time.Now().Add(-i.cfg.MaxWALTime)); ts < maxTS {
				ts = maxTS
			}

			if ts == lastTs {
				level.Debug(i.logger).Log("msg", "not truncating the WAL, remote_write timestamp is unchanged", "ts", ts)
				continue
			}
			lastTs = ts

			level.Debug(i.logger).Log("msg", "truncating the WAL", "ts", ts)
			err := wal.Truncate(ts)
			if err != nil {
				// The only issue here is larger disk usage and a greater replay time,
				// so we'll only log this as a warning.
				level.Warn(i.logger).Log("msg", "could not truncate WAL", "err", err)
			}
		}
	}
}
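
// As a worked example of the truncation cutoff above (the times are
// illustrative): with the defaults of min_wal_time=5m and max_wal_time=4h, if
// remote_write has delivered everything up to 12:00, series with no samples
// after 11:55 become candidates for truncation. If remote_write has been
// stuck for more than four hours, the cutoff is instead raised to now-4h so
// the WAL cannot grow without bound.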

// getRemoteWriteTimestamp looks up the last successful remote write timestamp.
// This is passed to wal.Storage for its truncation. If no remote write sections
// are configured, getRemoteWriteTimestamp returns the current time.
func (i *Instance) getRemoteWriteTimestamp() int64 {
	i.mut.Lock()
	defer i.mut.Unlock()

	if len(i.cfg.RemoteWrite) == 0 {
		return timestamp.FromTime(time.Now())
	}

	if i.remoteStore == nil {
		// Instance still being initialized; start at 0.
		return 0
	}
	return i.remoteStore.LowestSentTimestamp()
}

// walStorage is an interface satisfied by wal.Storage, and created for testing.
type walStorage interface {
	// walStorage implements Queryable/ChunkQueryable for compatibility, but is unused.
	storage.Queryable
	storage.ChunkQueryable

	Directory() string

	StartTime() (int64, error)
	WriteStalenessMarkers(remoteTsFunc func() int64) error
	Appender(context.Context) storage.Appender
	Truncate(mint int64) error

	Close() error
}

// Hostname retrieves the hostname identifying the machine the process is
// running on. It will return the value of $HOSTNAME, if defined, and fall
// back to Go's os.Hostname.
func Hostname() (string, error) {
	hostname := os.Getenv("HOSTNAME")
	if hostname != "" {
		return hostname, nil
	}

	hostname, err := os.Hostname()
	if err != nil {
		return "", fmt.Errorf("failed to get hostname: %w", err)
	}
	return hostname, nil
}

func getHash(data interface{}) (string, error) {
	bytes, err := json.Marshal(data)
	if err != nil {
		return "", err
	}
	hash := md5.Sum(bytes)
	return hex.EncodeToString(hash[:]), nil
}

var managerMtx sync.Mutex

func newScrapeManager(o *scrape.Options, logger log.Logger, app storage.Appendable) *scrape.Manager {
	// scrape.NewManager modifies a global variable in Prometheus. To avoid a
	// data race of modifying that global, we lock a mutex here briefly.
	managerMtx.Lock()
	defer managerMtx.Unlock()
	return scrape.NewManager(o, logger, app)
}

type runGroupContext struct {
	cancel context.CancelFunc

	g *run.Group
}

// runGroupWithContext creates a new run.Group that will be stopped if the
// context gets canceled in addition to the normal behavior of stopping
// when any of the actors stop.
func runGroupWithContext(ctx context.Context) *runGroupContext {
	ctx, cancel := context.WithCancel(ctx)

	var g run.Group
	g.Add(func() error {
		<-ctx.Done()
		return nil
	}, func(_ error) {
		cancel()
	})

	return &runGroupContext{cancel: cancel, g: &g}
}

func (rg *runGroupContext) Add(execute func() error, interrupt func(error)) {
	rg.g.Add(execute, interrupt)
}

func (rg *runGroupContext) Run() error   { return rg.g.Run() }
func (rg *runGroupContext) Stop(_ error) { rg.cancel() }

// ErrNotReady is returned when the scrape manager is used but has not been
// initialized yet.
var ErrNotReady = errors.New("Scrape manager not ready")

// readyScrapeManager allows a scrape manager to be retrieved even if it's set
// at a later point in time.
type readyScrapeManager struct {
	mtx sync.RWMutex
	m   *scrape.Manager
}

// Set the scrape manager.
func (rm *readyScrapeManager) Set(m *scrape.Manager) {
	rm.mtx.Lock()
	defer rm.mtx.Unlock()

	rm.m = m
}

// Get the scrape manager. If it is not ready yet, return an error.
func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
	rm.mtx.RLock()
	defer rm.mtx.RUnlock()

	if rm.m != nil {
		return rm.m, nil
	}

	return nil, ErrNotReady
}