Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/cluster/config_watcher_test.go
4094 views
1
package cluster
2
3
import (
4
"context"
5
"testing"
6
"time"
7
8
"github.com/grafana/agent/pkg/metrics/instance"
9
"github.com/grafana/agent/pkg/metrics/instance/configstore"
10
"github.com/grafana/agent/pkg/util"
11
"github.com/stretchr/testify/mock"
12
"github.com/stretchr/testify/require"
13
)
14
15
func Test_configWatcher_Refresh(t *testing.T) {
16
var (
17
log = util.TestLogger(t)
18
19
cfg = DefaultConfig
20
store = configstore.Mock{
21
WatchFunc: func() <-chan configstore.WatchEvent {
22
return make(chan configstore.WatchEvent)
23
},
24
}
25
26
im mockConfigManager
27
28
validate = func(*instance.Config) error { return nil }
29
owned = func(key string) (bool, error) { return true, nil }
30
)
31
cfg.Enabled = true
32
cfg.ReshardInterval = time.Hour
33
34
w, err := newConfigWatcher(log, cfg, &store, &im, owned, validate)
35
require.NoError(t, err)
36
t.Cleanup(func() { _ = w.Stop() })
37
38
im.On("ApplyConfig", mock.Anything).Return(nil)
39
im.On("DeleteConfig", mock.Anything).Return(nil)
40
41
// First: return a "hello" config.
42
store.AllFunc = func(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
43
ch := make(chan instance.Config)
44
go func() {
45
ch <- instance.Config{Name: "hello"}
46
close(ch)
47
}()
48
return ch, nil
49
}
50
51
err = w.refresh(context.Background())
52
require.NoError(t, err)
53
54
// Then: return a "new" config.
55
store.AllFunc = func(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
56
ch := make(chan instance.Config, 1)
57
go func() {
58
ch <- instance.Config{Name: "new"}
59
close(ch)
60
}()
61
return ch, nil
62
}
63
64
err = w.refresh(context.Background())
65
require.NoError(t, err)
66
67
// "hello" and "new" should've been applied, and "hello" should've been deleted
68
// from the second refresh.
69
im.AssertCalled(t, "ApplyConfig", instance.Config{Name: "hello"})
70
im.AssertCalled(t, "ApplyConfig", instance.Config{Name: "new"})
71
im.AssertCalled(t, "DeleteConfig", "hello")
72
}
73
74
func Test_configWatcher_handleEvent(t *testing.T) {
75
var (
76
cfg = DefaultConfig
77
store = configstore.Mock{
78
WatchFunc: func() <-chan configstore.WatchEvent {
79
return make(chan configstore.WatchEvent)
80
},
81
}
82
83
validate = func(*instance.Config) error { return nil }
84
85
owned = func(key string) (bool, error) { return true, nil }
86
unowned = func(key string) (bool, error) { return false, nil }
87
)
88
cfg.Enabled = true
89
90
t.Run("new owned config", func(t *testing.T) {
91
var (
92
log = util.TestLogger(t)
93
im mockConfigManager
94
)
95
96
w, err := newConfigWatcher(log, cfg, &store, &im, owned, validate)
97
require.NoError(t, err)
98
t.Cleanup(func() { _ = w.Stop() })
99
100
im.On("ApplyConfig", mock.Anything).Return(nil)
101
im.On("DeleteConfig", mock.Anything).Return(nil)
102
103
err = w.handleEvent(configstore.WatchEvent{Key: "new", Config: &instance.Config{}})
104
require.NoError(t, err)
105
106
im.AssertNumberOfCalls(t, "ApplyConfig", 1)
107
})
108
109
t.Run("updated owned config", func(t *testing.T) {
110
var (
111
log = util.TestLogger(t)
112
im mockConfigManager
113
)
114
115
w, err := newConfigWatcher(log, cfg, &store, &im, owned, validate)
116
require.NoError(t, err)
117
t.Cleanup(func() { _ = w.Stop() })
118
119
im.On("ApplyConfig", mock.Anything).Return(nil)
120
im.On("DeleteConfig", mock.Anything).Return(nil)
121
122
// One for create, one for update
123
err = w.handleEvent(configstore.WatchEvent{Key: "update", Config: &instance.Config{}})
124
require.NoError(t, err)
125
126
err = w.handleEvent(configstore.WatchEvent{Key: "update", Config: &instance.Config{}})
127
require.NoError(t, err)
128
129
im.AssertNumberOfCalls(t, "ApplyConfig", 2)
130
})
131
132
t.Run("new unowned config", func(t *testing.T) {
133
var (
134
log = util.TestLogger(t)
135
im mockConfigManager
136
)
137
138
w, err := newConfigWatcher(log, cfg, &store, &im, unowned, validate)
139
require.NoError(t, err)
140
t.Cleanup(func() { _ = w.Stop() })
141
142
im.On("ApplyConfig", mock.Anything).Return(nil)
143
im.On("DeleteConfig", mock.Anything).Return(nil)
144
145
// One for create, one for update
146
err = w.handleEvent(configstore.WatchEvent{Key: "unowned", Config: &instance.Config{}})
147
require.NoError(t, err)
148
149
im.AssertNumberOfCalls(t, "ApplyConfig", 0)
150
})
151
152
t.Run("lost ownership", func(t *testing.T) {
153
var (
154
log = util.TestLogger(t)
155
156
im mockConfigManager
157
158
isOwned = true
159
owns = func(key string) (bool, error) { return isOwned, nil }
160
)
161
162
w, err := newConfigWatcher(log, cfg, &store, &im, owns, validate)
163
require.NoError(t, err)
164
t.Cleanup(func() { _ = w.Stop() })
165
166
im.On("ApplyConfig", mock.Anything).Return(nil)
167
im.On("DeleteConfig", mock.Anything).Return(nil)
168
169
// One for create, then one for ownership change
170
err = w.handleEvent(configstore.WatchEvent{Key: "disappear", Config: &instance.Config{}})
171
require.NoError(t, err)
172
173
// Mark the config as unowned. The re-apply should then delete it.
174
isOwned = false
175
176
err = w.handleEvent(configstore.WatchEvent{Key: "disappear", Config: &instance.Config{}})
177
require.NoError(t, err)
178
179
im.AssertNumberOfCalls(t, "ApplyConfig", 1)
180
im.AssertNumberOfCalls(t, "DeleteConfig", 1)
181
})
182
183
t.Run("deleted running config", func(t *testing.T) {
184
var (
185
log = util.TestLogger(t)
186
187
im mockConfigManager
188
)
189
190
w, err := newConfigWatcher(log, cfg, &store, &im, owned, validate)
191
require.NoError(t, err)
192
t.Cleanup(func() { _ = w.Stop() })
193
194
im.On("ApplyConfig", mock.Anything).Return(nil)
195
im.On("DeleteConfig", mock.Anything).Return(nil)
196
197
// One for create, then one for deleted.
198
err = w.handleEvent(configstore.WatchEvent{Key: "new-key", Config: &instance.Config{}})
199
require.NoError(t, err)
200
201
err = w.handleEvent(configstore.WatchEvent{Key: "new-key", Config: nil})
202
require.NoError(t, err)
203
204
im.AssertNumberOfCalls(t, "ApplyConfig", 1)
205
im.AssertNumberOfCalls(t, "DeleteConfig", 1)
206
})
207
}
208
209
func Test_configWatcher_nextReshard(t *testing.T) {
210
watcher := &configWatcher{
211
log: util.TestLogger(t),
212
cfg: Config{ReshardInterval: time.Second},
213
}
214
215
t.Run("past time", func(t *testing.T) {
216
select {
217
case <-watcher.nextReshard(time.Time{}):
218
case <-time.After(250 * time.Millisecond):
219
require.FailNow(t, "nextReshard did not return an already ready channel")
220
}
221
})
222
223
t.Run("future time", func(t *testing.T) {
224
select {
225
case <-watcher.nextReshard(time.Now()):
226
case <-time.After(1500 * time.Millisecond):
227
require.FailNow(t, "nextReshard took too long to return")
228
}
229
})
230
}
231
232
type mockConfigManager struct {
233
mock.Mock
234
}
235
236
// GetInstance records the call on the embedded mock and returns the values
// the test configured for it.
//
// The call is recorded without arguments (name is not forwarded), so
// expectations must be registered as On("GetInstance") with no argument
// matchers.
//
// A nil first return value, as in Return(nil, someErr), is passed through as
// a nil ManagedInstance rather than panicking on the type assertion, so error
// paths can be stubbed.
func (m *mockConfigManager) GetInstance(name string) (instance.ManagedInstance, error) {
	args := m.Mock.Called()

	var inst instance.ManagedInstance
	if v := args.Get(0); v != nil {
		// A non-nil value of the wrong type still panics here, so a
		// misconfigured expectation fails loudly.
		inst = v.(instance.ManagedInstance)
	}
	return inst, args.Error(1)
}
240
241
// ListInstances records the call on the embedded mock and returns the map the
// test configured for it.
//
// A nil return value, as in Return(nil), yields a nil map instead of
// panicking on the type assertion.
func (m *mockConfigManager) ListInstances() map[string]instance.ManagedInstance {
	args := m.Mock.Called()

	var instances map[string]instance.ManagedInstance
	if v := args.Get(0); v != nil {
		// A non-nil value of the wrong type still panics here, so a
		// misconfigured expectation fails loudly.
		instances = v.(map[string]instance.ManagedInstance)
	}
	return instances
}
245
246
// ListConfigs implements Manager.
247
func (m *mockConfigManager) ListConfigs() map[string]instance.Config {
248
args := m.Mock.Called()
249
return args.Get(0).(map[string]instance.Config)
250
}
251
252
// ApplyConfig implements Manager.
253
func (m *mockConfigManager) ApplyConfig(c instance.Config) error {
254
args := m.Mock.Called(c)
255
return args.Error(0)
256
}
257
258
// DeleteConfig implements Manager.
259
func (m *mockConfigManager) DeleteConfig(name string) error {
260
args := m.Mock.Called(name)
261
return args.Error(0)
262
}
263
264
// Stop implements Manager.
265
func (m *mockConfigManager) Stop() {
266
m.Mock.Called()
267
}
268
269