Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/cleaner.go
4093 views
1
package metrics
2
3
import (
4
"fmt"
5
"os"
6
"path/filepath"
7
"time"
8
9
"github.com/go-kit/log"
10
"github.com/go-kit/log/level"
11
"github.com/grafana/agent/pkg/metrics/instance"
12
"github.com/grafana/agent/pkg/metrics/wal"
13
"github.com/prometheus/client_golang/prometheus"
14
"github.com/prometheus/client_golang/prometheus/promauto"
15
promwal "github.com/prometheus/prometheus/tsdb/wlog"
16
)
17
18
// Default settings for the WAL cleaner, used by NewWALCleaner when the caller
// supplies a non-positive minimum age or a negative period.
const (
	// DefaultCleanupAge is how long an unmanaged WAL must go without its most
	// recent segment being modified before it becomes eligible for deletion.
	DefaultCleanupAge = 12 * time.Hour

	// DefaultCleanupPeriod is the interval between periodic cleanup runs.
	DefaultCleanupPeriod = 30 * time.Minute
)
23
24
var (
25
discoveryError = promauto.NewCounterVec(
26
prometheus.CounterOpts{
27
Name: "agent_metrics_cleaner_storage_error_total",
28
Help: "Errors encountered discovering local storage paths",
29
},
30
[]string{"storage"},
31
)
32
33
segmentError = promauto.NewCounterVec(
34
prometheus.CounterOpts{
35
Name: "agent_metrics_cleaner_segment_error_total",
36
Help: "Errors encountered finding most recent WAL segments",
37
},
38
[]string{"storage"},
39
)
40
41
managedStorage = promauto.NewGauge(
42
prometheus.GaugeOpts{
43
Name: "agent_metrics_cleaner_managed_storage",
44
Help: "Number of storage directories associated with managed instances",
45
},
46
)
47
48
abandonedStorage = promauto.NewGauge(
49
prometheus.GaugeOpts{
50
Name: "agent_metrics_cleaner_abandoned_storage",
51
Help: "Number of storage directories not associated with any managed instance",
52
},
53
)
54
55
cleanupRunsSuccess = promauto.NewCounter(
56
prometheus.CounterOpts{
57
Name: "agent_metrics_cleaner_success_total",
58
Help: "Number of successfully removed abandoned WALs",
59
},
60
)
61
62
cleanupRunsErrors = promauto.NewCounter(
63
prometheus.CounterOpts{
64
Name: "agent_metrics_cleaner_errors_total",
65
Help: "Number of errors removing abandoned WALs",
66
},
67
)
68
69
cleanupTimes = promauto.NewHistogram(
70
prometheus.HistogramOpts{
71
Name: "agent_metrics_cleaner_cleanup_seconds",
72
Help: "Time spent performing each periodic WAL cleanup",
73
},
74
)
75
)
76
77
// lastModifiedFunc gets the last modified time of the most recent segment of a WAL
78
type lastModifiedFunc func(path string) (time.Time, error)
79
80
func lastModified(path string) (time.Time, error) {
81
existing, err := promwal.Open(nil, path)
82
if err != nil {
83
return time.Time{}, err
84
}
85
86
// We don't care if there are errors closing the abandoned WAL
87
defer func() { _ = existing.Close() }()
88
89
_, last, err := promwal.Segments(existing.Dir())
90
if err != nil {
91
return time.Time{}, fmt.Errorf("unable to open WAL: %w", err)
92
}
93
94
if last == -1 {
95
return time.Time{}, fmt.Errorf("unable to determine most recent segment for %s", path)
96
}
97
98
// full path to the most recent segment in this WAL
99
lastSegment := promwal.SegmentName(path, last)
100
segmentFile, err := os.Stat(lastSegment)
101
if err != nil {
102
return time.Time{}, fmt.Errorf("unable to determine mtime for %s segment: %w", lastSegment, err)
103
}
104
105
return segmentFile.ModTime(), nil
106
}
107
108
// WALCleaner periodically checks for Write Ahead Logs (WALs) that are not associated
109
// with any active instance.ManagedInstance and have not been written to in some configured
110
// amount of time and deletes them.
111
type WALCleaner struct {
112
logger log.Logger
113
instanceManager instance.Manager
114
walDirectory string
115
walLastModified lastModifiedFunc
116
minAge time.Duration
117
period time.Duration
118
done chan bool
119
}
120
121
// NewWALCleaner creates a new cleaner that looks for abandoned WALs in the given
122
// directory and removes them if they haven't been modified in over minAge. Starts
123
// a goroutine to periodically run the cleanup method in a loop
124
func NewWALCleaner(logger log.Logger, manager instance.Manager, walDirectory string, minAge time.Duration, period time.Duration) *WALCleaner {
125
c := &WALCleaner{
126
logger: log.With(logger, "component", "cleaner"),
127
instanceManager: manager,
128
walDirectory: filepath.Clean(walDirectory),
129
walLastModified: lastModified,
130
minAge: DefaultCleanupAge,
131
period: DefaultCleanupPeriod,
132
done: make(chan bool),
133
}
134
135
if minAge > 0 {
136
c.minAge = minAge
137
}
138
139
// We allow a period of 0 here because '0' means "don't run the task". This
140
// is handled by not running a ticker at all in the run method.
141
if period >= 0 {
142
c.period = period
143
}
144
145
go c.run()
146
return c
147
}
148
149
// getManagedStorage gets storage directories used for each ManagedInstance
150
func (c *WALCleaner) getManagedStorage(instances map[string]instance.ManagedInstance) map[string]bool {
151
out := make(map[string]bool)
152
153
for _, inst := range instances {
154
out[inst.StorageDirectory()] = true
155
}
156
157
return out
158
}
159
160
// getAllStorage gets all storage directories under walDirectory
161
func (c *WALCleaner) getAllStorage() []string {
162
var out []string
163
164
_ = filepath.Walk(c.walDirectory, func(p string, info os.FileInfo, err error) error {
165
if os.IsNotExist(err) {
166
// The root WAL directory doesn't exist. Maybe this Agent isn't responsible for any
167
// instances yet. Log at debug since this isn't a big deal. We'll just try to crawl
168
// the direction again on the next periodic run.
169
level.Debug(c.logger).Log("msg", "WAL storage path does not exist", "path", p, "err", err)
170
} else if err != nil {
171
// Just log any errors traversing the WAL directory. This will potentially result
172
// in a WAL (that has incorrect permissions or some similar problem) not being cleaned
173
// up. This is better than preventing *all* other WALs from being cleaned up.
174
discoveryError.WithLabelValues(p).Inc()
175
level.Warn(c.logger).Log("msg", "unable to traverse WAL storage path", "path", p, "err", err)
176
} else if info.IsDir() && filepath.Dir(p) == c.walDirectory {
177
// Single level below the root are instance storage directories (including WALs)
178
out = append(out, p)
179
}
180
181
return nil
182
})
183
184
return out
185
}
186
187
// getAbandonedStorage gets the full path of storage directories that aren't associated with
188
// an active instance and haven't been written to within a configured duration (usually several
189
// hours or more).
190
func (c *WALCleaner) getAbandonedStorage(all []string, managed map[string]bool, now time.Time) []string {
191
var out []string
192
193
for _, dir := range all {
194
if managed[dir] {
195
level.Debug(c.logger).Log("msg", "active WAL", "name", dir)
196
continue
197
}
198
199
walDir := wal.SubDirectory(dir)
200
mtime, err := c.walLastModified(walDir)
201
if err != nil {
202
segmentError.WithLabelValues(dir).Inc()
203
level.Warn(c.logger).Log("msg", "unable to find segment mtime of WAL", "name", dir, "err", err)
204
continue
205
}
206
207
diff := now.Sub(mtime)
208
if diff > c.minAge {
209
// The last segment for this WAL was modified more than $minAge (positive number of hours)
210
// in the past. This makes it a candidate for deletion since it's also not associated with
211
// any Instances this agent knows about.
212
out = append(out, dir)
213
}
214
215
level.Debug(c.logger).Log("msg", "abandoned WAL", "name", dir, "mtime", mtime, "diff", diff)
216
}
217
218
return out
219
}
220
221
// run cleans up abandoned WALs (if period != 0) in a loop periodically until stopped
222
func (c *WALCleaner) run() {
223
// A period of 0 means don't run a cleanup task
224
if c.period == 0 {
225
return
226
}
227
228
ticker := time.NewTicker(c.period)
229
defer ticker.Stop()
230
231
for {
232
select {
233
case <-c.done:
234
level.Debug(c.logger).Log("msg", "stopping cleaner...")
235
return
236
case <-ticker.C:
237
c.cleanup()
238
}
239
}
240
}
241
242
// cleanup removes any abandoned and unused WAL directories. Note that it shouldn't be
243
// necessary to call this method explicitly in most cases since it will be run periodically
244
// in a goroutine (started when WALCleaner is created).
245
func (c *WALCleaner) cleanup() {
246
start := time.Now()
247
all := c.getAllStorage()
248
managed := c.getManagedStorage(c.instanceManager.ListInstances())
249
abandoned := c.getAbandonedStorage(all, managed, time.Now())
250
251
managedStorage.Set(float64(len(managed)))
252
abandonedStorage.Set(float64(len(abandoned)))
253
254
for _, a := range abandoned {
255
level.Info(c.logger).Log("msg", "deleting abandoned WAL", "name", a)
256
err := os.RemoveAll(a)
257
if err != nil {
258
level.Error(c.logger).Log("msg", "failed to delete abandoned WAL", "name", a, "err", err)
259
cleanupRunsErrors.Inc()
260
} else {
261
cleanupRunsSuccess.Inc()
262
}
263
}
264
265
cleanupTimes.Observe(time.Since(start).Seconds())
266
}
267
268
// Stop the cleaner and any background tasks running
269
func (c *WALCleaner) Stop() {
270
close(c.done)
271
}
272
273