Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/remote/vault/refresher.go
4096 views
1
package vault
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
"time"
8
9
"github.com/go-kit/log"
10
"github.com/go-kit/log/level"
11
"github.com/grafana/agent/component"
12
vault "github.com/hashicorp/vault/api"
13
"github.com/prometheus/client_golang/prometheus"
14
)
15
16
// tokenManagerInitializeTimeout bounds how long a blocking token fetch (the
// initial fetch in newTokenManager, or the fetch triggered by SetClient) may
// take before it is abandoned.
const tokenManagerInitializeTimeout = 60 * time.Second
17
18
// getTokenFunc obtains a token secret from Vault using client. The rest of
// this file treats a nil secret (with a nil error) as "no secret needed": no
// lifetime watcher is started and the debug info carries a warning instead.
type getTokenFunc func(ctx context.Context, client *vault.Client) (*vault.Secret, error)
19
20
// A tokenManager retrieves and manages the lifecycle of tokens. tokenManager,
21
// when running, will renew tokens before expiry, and will retrieve new tokens
22
// once expired tokens can no longer be renewed.
23
type tokenManager struct {
24
log log.Logger
25
refreshTicker *ticker
26
getter getTokenFunc
27
onStateChange chan struct{} // Written to when cli or token changes.
28
29
readCounter prometheus.Counter
30
refreshCounter prometheus.Counter
31
32
mut sync.RWMutex
33
cli *vault.Client
34
token *vault.Secret
35
36
healthMut sync.RWMutex
37
health component.Health
38
39
debugMut sync.RWMutex
40
debugInfo secretInfo
41
}
42
43
type tokenManagerOptions struct {
44
Log log.Logger
45
Getter getTokenFunc
46
47
ReadCounter, RefreshCounter prometheus.Counter
48
49
Client *vault.Client
50
RefreshInterval time.Duration
51
}
52
53
// newTokenManager creates a new, unstarted tokenManager. tokenManager will
54
// retrieve the initial token from getter.
55
func newTokenManager(opts tokenManagerOptions) (*tokenManager, error) {
56
ctx, cancel := context.WithTimeout(context.Background(), tokenManagerInitializeTimeout)
57
defer cancel()
58
59
tm := &tokenManager{
60
log: opts.Log,
61
refreshTicker: newTicker(opts.RefreshInterval),
62
getter: opts.Getter,
63
onStateChange: make(chan struct{}, 1),
64
65
readCounter: opts.ReadCounter,
66
refreshCounter: opts.RefreshCounter,
67
68
cli: opts.Client,
69
}
70
if err := tm.updateToken(ctx); err != nil {
71
return nil, fmt.Errorf("failed to get token: %w", err)
72
}
73
return tm, nil
74
}
75
76
// updateToken attempts to update the token, logging an error if getting the
77
// token failed.
78
func (tm *tokenManager) updateToken(ctx context.Context) (err error) {
79
defer func() {
80
if err != nil {
81
tm.updateHealth(component.Health{
82
Health: component.HealthTypeUnhealthy,
83
Message: fmt.Sprintf("failed to retrieve token: %s", err),
84
UpdateTime: time.Now(),
85
})
86
} else {
87
tm.updateHealth(component.Health{
88
Health: component.HealthTypeHealthy,
89
Message: "retrieved token",
90
UpdateTime: time.Now(),
91
})
92
}
93
94
tm.updateDebugInfo(time.Now())
95
}()
96
97
tm.mut.Lock()
98
defer tm.mut.Unlock()
99
100
token, err := tm.getter(ctx, tm.cli)
101
if err != nil {
102
level.Error(tm.log).Log("msg", "failed to get token", "err", err)
103
return err
104
}
105
106
tm.readCounter.Inc()
107
108
tm.token = token
109
110
select {
111
case tm.onStateChange <- struct{}{}:
112
default:
113
}
114
115
return nil
116
}
117
118
// Run runs the tokenManager, blocking until the provided context is canceled.
119
func (tm *tokenManager) Run(ctx context.Context) {
120
var cancelLifecycleWatcher context.CancelFunc
121
defer func() {
122
if cancelLifecycleWatcher != nil {
123
cancelLifecycleWatcher()
124
}
125
}()
126
127
for {
128
select {
129
case <-ctx.Done():
130
return
131
132
case <-tm.refreshTicker.Chan():
133
level.Info(tm.log).Log("msg", "refreshing token")
134
// Error is handled via setting health and debug info.
135
_ = tm.updateToken(ctx)
136
137
case <-tm.onStateChange:
138
if cancelLifecycleWatcher != nil {
139
cancelLifecycleWatcher()
140
}
141
142
ctx, cancel := context.WithCancel(ctx)
143
cancelLifecycleWatcher = cancel
144
145
tm.updateLifecycleWatcher(ctx)
146
}
147
}
148
}
149
150
// updateHealth stores h as the manager's current health under healthMut.
func (tm *tokenManager) updateHealth(h component.Health) {
	tm.healthMut.Lock()
	tm.health = h
	tm.healthMut.Unlock()
}
156
157
// updateDebugInfo rebuilds the debug info from the current token, stamping it
// with updateTime. It takes tm.mut only to snapshot the token and holds
// debugMut only while writing.
func (tm *tokenManager) updateDebugInfo(updateTime time.Time) {
	tm.mut.RLock()
	current := tm.token
	tm.mut.RUnlock()

	tm.debugMut.Lock()
	tm.debugInfo = getSecretInfo(current, updateTime)
	tm.debugMut.Unlock()
}
167
168
func (tm *tokenManager) updateLifecycleWatcher(ctx context.Context) {
169
tm.mut.RLock()
170
defer tm.mut.RUnlock()
171
172
if !needsLifecycleWatcher(tm.token) {
173
return
174
}
175
176
lw, err := tm.cli.NewLifetimeWatcher(&vault.LifetimeWatcherInput{
177
Secret: tm.token,
178
RenewBehavior: vault.RenewBehaviorIgnoreErrors,
179
})
180
if err != nil {
181
level.Error(tm.log).Log("msg", "failed to create lifetime watcher, lease will not renew automatically", "err", err)
182
return
183
}
184
185
go lw.Start()
186
187
go func() {
188
for {
189
select {
190
case <-ctx.Done():
191
lw.Stop()
192
return
193
194
case <-lw.DoneCh():
195
if ctx.Err() != nil {
196
return
197
}
198
// Error is logged as health and debug info.
199
_ = tm.updateToken(ctx)
200
201
case output := <-lw.RenewCh():
202
tm.refreshCounter.Inc()
203
level.Debug(tm.log).Log("msg", "token has renewed")
204
tm.updateDebugInfo(output.RenewedAt)
205
}
206
}
207
}()
208
}
209
210
// needsLifecycleWatcher determines if a secret needs a lifecycle watcher.
211
// Secrets only need a lifecycle watcher if they are renewable or have a lease
212
// duration.
213
func needsLifecycleWatcher(secret *vault.Secret) bool {
214
if secret == nil {
215
return false
216
}
217
218
if secret.Auth != nil {
219
return secret.Auth.Renewable || secret.Auth.LeaseDuration > 0
220
}
221
return secret.Renewable || secret.LeaseDuration > 0
222
}
223
224
// SetClient updates the client associated with the tokenManager. This will
225
// force a new retrieval of the token.
226
func (tm *tokenManager) SetClient(cli *vault.Client) {
227
tm.mut.Lock()
228
tm.cli = cli
229
tm.mut.Unlock()
230
231
ctx, cancel := context.WithTimeout(context.Background(), tokenManagerInitializeTimeout)
232
defer cancel()
233
234
// Error is handled via setting health and debug info.
235
_ = tm.updateToken(ctx)
236
}
237
238
// SetRefreshInterval sets a forced refresh interval, separate from automatic
239
// renewal based on the token lease.
240
func (tm *tokenManager) SetRefreshInterval(interval time.Duration) {
241
tm.refreshTicker.Reset(interval)
242
}
243
244
// CurrentHealth returns the health of the tokenManager.
245
func (tm *tokenManager) CurrentHealth() component.Health {
246
tm.healthMut.RLock()
247
defer tm.healthMut.RUnlock()
248
249
return tm.health
250
}
251
252
// DebugInfo returns the current DebugInfo for the tokenManager.
253
func (tm *tokenManager) DebugInfo() secretInfo {
254
tm.debugMut.RLock()
255
defer tm.debugMut.RUnlock()
256
257
return tm.debugInfo
258
}
259
260
// secretInfo is the debug view of the most recently retrieved secret, as
// produced by getSecretInfo.
type secretInfo struct {
	LatestRequestID  string    `river:"latest_request_id,attr"`  // Vault request ID of the secret.
	LastUpdateTime   time.Time `river:"last_update_time,attr"`   // When this info was last refreshed.
	SecretExpireTime time.Time `river:"secret_expire_time,attr"` // Estimated expiry; zero if unknown.
	Renewable        bool      `river:"renewable,attr"`          // Whether the secret is renewable.
	Warnings         []string  `river:"warnings,attr"`           // Warnings attached to the secret.
}
267
268
func getSecretInfo(secret *vault.Secret, updateTime time.Time) secretInfo {
269
if secret == nil {
270
return secretInfo{
271
LastUpdateTime: updateTime,
272
Warnings: []string{"no secret necessary for configured auth mechanism"},
273
}
274
}
275
276
return secretInfo{
277
LatestRequestID: secret.RequestID,
278
LastUpdateTime: updateTime,
279
SecretExpireTime: secretExpireTime(secret),
280
Renewable: secret.Renewable,
281
Warnings: secret.Warnings,
282
}
283
}
284
285
// secretExpireTime estimates when secret expires as the current UTC time plus
// its token TTL. It returns the zero time when the TTL cannot be determined or
// is zero.
func secretExpireTime(secret *vault.Secret) time.Time {
	ttl, err := secret.TokenTTL()
	if err == nil && ttl != 0 {
		return time.Now().UTC().Add(ttl)
	}

	var unknown time.Time
	return unknown
}
293
294