package metrics
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/pkg/metrics/instance"
"github.com/grafana/agent/pkg/metrics/wal"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promwal "github.com/prometheus/prometheus/tsdb/wlog"
)
// Defaults that NewWALCleaner falls back to when the caller passes a
// non-positive minimum age or a negative period.
const (
// DefaultCleanupAge is the default time a WAL's last segment must have gone
// unmodified before an unmanaged WAL is selected for deletion.
DefaultCleanupAge = 12 * time.Hour
// DefaultCleanupPeriod is the default interval between cleanup runs.
DefaultCleanupPeriod = 30 * time.Minute
)
// Metrics describing the WAL cleaner's activity, created through promauto.
var (
// discoveryError is incremented by getAllStorage for every path that fails
// during traversal of the WAL root (other than "does not exist"). The
// "storage" label carries the failing path.
discoveryError = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "agent_metrics_cleaner_storage_error_total",
Help: "Errors encountered discovering local storage paths",
},
[]string{"storage"},
)
// segmentError is incremented by getAbandonedStorage when the last-modified
// time of a storage directory's WAL cannot be determined. The "storage"
// label carries that storage directory.
segmentError = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "agent_metrics_cleaner_segment_error_total",
Help: "Errors encountered finding most recent WAL segments",
},
[]string{"storage"},
)
// managedStorage is set on every cleanup run to the number of distinct
// storage directories reported by the managed instances.
managedStorage = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "agent_metrics_cleaner_managed_storage",
Help: "Number of storage directories associated with managed instances",
},
)
// abandonedStorage is set on every cleanup run to the number of storage
// directories selected for deletion in that run (unmanaged and older than
// the configured minimum age).
abandonedStorage = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "agent_metrics_cleaner_abandoned_storage",
Help: "Number of storage directories not associated with any managed instance",
},
)
// cleanupRunsSuccess is incremented once per storage directory that cleanup
// removes without error.
cleanupRunsSuccess = promauto.NewCounter(
prometheus.CounterOpts{
Name: "agent_metrics_cleaner_success_total",
Help: "Number of successfully removed abandoned WALs",
},
)
// cleanupRunsErrors is incremented once per storage directory that cleanup
// fails to remove.
cleanupRunsErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "agent_metrics_cleaner_errors_total",
Help: "Number of errors removing abandoned WALs",
},
)
// cleanupTimes observes the duration, in seconds, of each cleanup call.
cleanupTimes = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "agent_metrics_cleaner_cleanup_seconds",
Help: "Time spent performing each periodic WAL cleanup",
},
)
)
// lastModifiedFunc returns the last-modified time of the WAL located at path,
// or an error when it cannot be determined. WALCleaner stores one in its
// walLastModified field (lastModified by default), presumably so the lookup
// can be substituted, e.g. in tests.
type lastModifiedFunc func(path string) (time.Time, error)
// lastModified reports the modification time of the last segment file of the
// WAL stored at path.
//
// The WAL is opened through promwal, its segments are listed, and the file
// belonging to the last reported segment index is stat'ed. An error is
// returned when the WAL cannot be opened, when its segments cannot be listed,
// when the last index is -1 (treated as "no usable segment"), or when the
// segment file cannot be stat'ed.
func lastModified(path string) (time.Time, error) {
	var zero time.Time

	w, err := promwal.Open(nil, path)
	if err != nil {
		return zero, err
	}
	// Errors from closing are deliberately ignored: the WAL is only inspected.
	defer func() { _ = w.Close() }()

	_, newest, err := promwal.Segments(w.Dir())
	switch {
	case err != nil:
		return zero, fmt.Errorf("unable to open WAL: %w", err)
	case newest == -1:
		return zero, fmt.Errorf("unable to determine most recent segment for %s", path)
	}

	// Full path of the last segment file inside this WAL.
	segmentPath := promwal.SegmentName(path, newest)
	info, err := os.Stat(segmentPath)
	if err != nil {
		return zero, fmt.Errorf("unable to determine mtime for %s segment: %w", segmentPath, err)
	}
	return info.ModTime(), nil
}
// WALCleaner periodically removes storage directories under a WAL root that
// are not used by any managed instance and whose WAL has not been modified for
// at least a minimum age. Create one with NewWALCleaner and end its background
// loop with Stop.
type WALCleaner struct {
// logger receives the cleaner's log output.
logger log.Logger
// instanceManager is queried on every cleanup run for the instances whose
// storage directories must be kept.
instanceManager instance.Manager
// walDirectory is the root directory whose direct sub-directories are
// treated as per-instance storage directories.
walDirectory string
// walLastModified returns the last-modified time of a WAL directory.
walLastModified lastModifiedFunc
// minAge is how long a WAL must have gone unmodified before an unmanaged
// storage directory is deleted.
minAge time.Duration
// period is the interval between cleanup runs; 0 disables the cleaner.
period time.Duration
// done is closed by Stop to make the run loop exit.
done chan bool
}
// NewWALCleaner builds a WALCleaner for the storage directories found under
// walDirectory and immediately starts its background loop in a new goroutine.
//
// minAge is used only when it is positive; otherwise DefaultCleanupAge
// applies. period is used when it is zero or positive; a negative value falls
// back to DefaultCleanupPeriod. A period of exactly 0 is kept as-is, which
// makes the background loop return without ever running a cleanup.
func NewWALCleaner(logger log.Logger, manager instance.Manager, walDirectory string, minAge time.Duration, period time.Duration) *WALCleaner {
	effectiveAge := DefaultCleanupAge
	if minAge > 0 {
		effectiveAge = minAge
	}

	// Zero is a legitimate value here: it means "disabled", so only negative
	// periods are replaced by the default.
	effectivePeriod := DefaultCleanupPeriod
	if period >= 0 {
		effectivePeriod = period
	}

	cleaner := &WALCleaner{
		logger:          log.With(logger, "component", "cleaner"),
		instanceManager: manager,
		walDirectory:    filepath.Clean(walDirectory),
		walLastModified: lastModified,
		minAge:          effectiveAge,
		period:          effectivePeriod,
		done:            make(chan bool),
	}

	go cleaner.run()
	return cleaner
}
// getManagedStorage returns the set of storage directories used by the given
// instances, keyed by each instance's StorageDirectory(). The keys of the
// instances map are not consulted.
func (c *WALCleaner) getManagedStorage(instances map[string]instance.ManagedInstance) map[string]bool {
	inUse := make(map[string]bool, len(instances))
	for _, managedInst := range instances {
		storageDir := managedInst.StorageDirectory()
		inUse[storageDir] = true
	}
	return inUse
}
// getAllStorage returns every directory located exactly one level below
// c.walDirectory; each of them is treated as an instance storage directory.
//
// The root itself is never returned. Without the explicit check, a root of
// "." or of a filesystem root (where filepath.Dir(root) == root) would be
// reported as a storage directory and could later be removed wholesale by
// cleanup.
//
// Traversal problems never abort the walk: a missing path is logged at debug
// level, any other error is counted in discoveryError and logged as a warning,
// and the walk continues with the remaining entries.
func (c *WALCleaner) getAllStorage() []string {
	var out []string

	// The callback always returns nil, so Walk's own result carries no
	// information and is intentionally discarded.
	_ = filepath.Walk(c.walDirectory, func(p string, info os.FileInfo, err error) error {
		switch {
		case os.IsNotExist(err):
			// Typically the root does not exist yet; the next periodic run
			// simply tries again.
			level.Debug(c.logger).Log("msg", "WAL storage path does not exist", "path", p, "err", err)
		case err != nil:
			// Skipping one unreadable path is preferable to preventing every
			// other WAL from being cleaned up.
			discoveryError.WithLabelValues(p).Inc()
			level.Warn(c.logger).Log("msg", "unable to traverse WAL storage path", "path", p, "err", err)
		case p == c.walDirectory:
			// The walk root is the container of storage directories, never a
			// storage directory itself.
		case info.IsDir() && filepath.Dir(p) == c.walDirectory:
			// Direct children of the root are instance storage directories.
			out = append(out, p)
		}
		return nil
	})

	return out
}
// getAbandonedStorage filters all down to the storage directories that are
// eligible for deletion at time now.
//
// A directory is skipped when it appears in managed, or when the last-modified
// time of its WAL sub-directory cannot be determined (counted in segmentError
// and logged as a warning). Of the remaining directories, only those whose WAL
// was last modified strictly more than c.minAge before now are returned, in
// the order they appear in all.
func (c *WALCleaner) getAbandonedStorage(all []string, managed map[string]bool, now time.Time) []string {
	var abandoned []string

	for i := range all {
		storageDir := all[i]

		if inUse := managed[storageDir]; inUse {
			level.Debug(c.logger).Log("msg", "active WAL", "name", storageDir)
			continue
		}

		lastWrite, err := c.walLastModified(wal.SubDirectory(storageDir))
		if err != nil {
			segmentError.WithLabelValues(storageDir).Inc()
			level.Warn(c.logger).Log("msg", "unable to find segment mtime of WAL", "name", storageDir, "err", err)
			continue
		}

		// Unmanaged and untouched for longer than minAge: candidate for deletion.
		age := now.Sub(lastWrite)
		if c.minAge < age {
			abandoned = append(abandoned, storageDir)
		}

		level.Debug(c.logger).Log("msg", "abandoned WAL", "name", storageDir, "mtime", lastWrite, "diff", age)
	}

	return abandoned
}
// run is the cleaner's background loop. It invokes cleanup once per c.period
// until the done channel is closed, then logs and returns. A period of 0
// disables the cleaner: run returns immediately without creating a ticker.
func (c *WALCleaner) run() {
	interval := c.period
	if interval == 0 {
		return
	}

	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			c.cleanup()
		case <-c.done:
			level.Debug(c.logger).Log("msg", "stopping cleaner...")
			return
		}
	}
}
// cleanup performs a single cleaning pass: it lists the storage directories
// under the WAL root, determines which are used by managed instances, selects
// the abandoned ones, and removes each of those with os.RemoveAll.
//
// The managed/abandoned gauges are updated before any deletion, every removal
// is counted as a success or an error, and the duration of the whole pass is
// recorded in cleanupTimes. A failed removal is logged and does not stop the
// remaining directories from being processed.
func (c *WALCleaner) cleanup() {
	began := time.Now()

	storageDirs := c.getAllStorage()
	instances := c.instanceManager.ListInstances()
	inUse := c.getManagedStorage(instances)
	stale := c.getAbandonedStorage(storageDirs, inUse, time.Now())

	managedStorage.Set(float64(len(inUse)))
	abandonedStorage.Set(float64(len(stale)))

	for _, dir := range stale {
		level.Info(c.logger).Log("msg", "deleting abandoned WAL", "name", dir)

		if err := os.RemoveAll(dir); err != nil {
			level.Error(c.logger).Log("msg", "failed to delete abandoned WAL", "name", dir, "err", err)
			cleanupRunsErrors.Inc()
			continue
		}
		cleanupRunsSuccess.Inc()
	}

	cleanupTimes.Observe(time.Since(began).Seconds())
}
// Stop signals the background loop started by NewWALCleaner to exit by closing
// the done channel.
//
// Calling Stop again after it has returned is a no-op; previously a second
// call closed an already-closed channel and panicked. Nothing is ever sent on
// done, so a successful receive means the channel is already closed. Calls are
// not synchronised with each other: Stop must not be invoked from several
// goroutines at the same time.
func (c *WALCleaner) Stop() {
	select {
	case <-c.done:
		// Already closed by an earlier Stop.
	default:
		close(c.done)
	}
}