Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/config/agentmanagement.go
4095 views
1
package config
2
3
import (
4
"crypto/sha256"
5
"encoding/hex"
6
"encoding/json"
7
"errors"
8
"flag"
9
"fmt"
10
"math/rand"
11
"net/url"
12
"os"
13
"path/filepath"
14
"time"
15
16
"github.com/go-kit/log/level"
17
"github.com/grafana/agent/pkg/config/instrumentation"
18
"github.com/grafana/agent/pkg/server"
19
"github.com/prometheus/common/config"
20
"gopkg.in/yaml.v2"
21
)
22
23
// Fixed names used by the agent management remote config code.
const (
	// cacheFilename is the name of the cache file; it is joined onto the
	// configured cache location to form the full cache path.
	cacheFilename = "remote-config-cache.yaml"

	// apiPath is the path prefix placed after the host when building the
	// Agent Management API URL.
	apiPath = "/agent-management/api/agent/v2"
)
25
26
// remoteConfigProvider abstracts where a remote config comes from and how a
// local copy of it is stored and read back.
type remoteConfigProvider interface {
	// GetCachedRemoteConfig returns the raw bytes of the locally cached
	// remote config.
	GetCachedRemoteConfig() ([]byte, error)

	// CacheRemoteConfig stores remoteConfigBytes as the local cached copy of
	// the remote config.
	CacheRemoteConfig(remoteConfigBytes []byte) error

	// FetchRemoteConfig returns the raw bytes of the remote config obtained
	// from its source.
	FetchRemoteConfig() ([]byte, error)
}
31
32
type remoteConfigHTTPProvider struct {
33
InitialConfig *AgentManagementConfig
34
}
35
36
func newRemoteConfigHTTPProvider(c *Config) (*remoteConfigHTTPProvider, error) {
37
err := c.AgentManagement.Validate()
38
if err != nil {
39
return nil, err
40
}
41
return &remoteConfigHTTPProvider{
42
InitialConfig: &c.AgentManagement,
43
}, nil
44
}
45
46
// remoteConfigCache is the JSON document stored in the cache file.
type remoteConfigCache struct {
	// InitialConfigHash is the hash of the initial config that was in effect
	// when the cache entry was written.
	InitialConfigHash string `json:"initial_config_hash"`

	// Config is the raw remote config, stored as a string.
	Config string `json:"config"`
}
50
51
// hashInitialConfig returns the hex-encoded SHA-256 digest of the YAML
// serialization of am. It returns an error if am cannot be marshalled.
func hashInitialConfig(am AgentManagementConfig) (string, error) {
	serialized, err := yaml.Marshal(am)
	if err != nil {
		return "", fmt.Errorf("could not marshal initial config: %w", err)
	}

	// Writing to a hash.Hash never returns an error, so the result is ignored.
	hasher := sha256.New()
	hasher.Write(serialized)
	return hex.EncodeToString(hasher.Sum(nil)), nil
}
59
60
// initialConfigHashCheck checks if the hash of initialConfig matches what is stored in configCache.InitialConfigHash.
61
// If an error is encountered while hashing initialConfig or the hashes do not match, initialConfigHashCheck
62
// returns an error. Otherwise, it returns nil.
63
func initialConfigHashCheck(initialConfig AgentManagementConfig, configCache remoteConfigCache) error {
64
initialConfigHash, err := hashInitialConfig(initialConfig)
65
if err != nil {
66
return err
67
}
68
69
if !(configCache.InitialConfigHash == initialConfigHash) {
70
return errors.New("invalid remote config cache: initial config hashes don't match")
71
}
72
return nil
73
}
74
75
// GetCachedRemoteConfig retrieves the cached remote config from the location specified
76
// in r.AgentManagement.CacheLocation
77
func (r remoteConfigHTTPProvider) GetCachedRemoteConfig() ([]byte, error) {
78
cachePath := filepath.Join(r.InitialConfig.RemoteConfiguration.CacheLocation, cacheFilename)
79
80
var configCache remoteConfigCache
81
buf, err := os.ReadFile(cachePath)
82
83
if err != nil {
84
return nil, fmt.Errorf("error reading remote config cache: %w", err)
85
}
86
87
if err := json.Unmarshal(buf, &configCache); err != nil {
88
return nil, fmt.Errorf("error trying to load cached remote config from file: %w", err)
89
}
90
91
if err = initialConfigHashCheck(*r.InitialConfig, configCache); err != nil {
92
return nil, err
93
}
94
95
return []byte(configCache.Config), nil
96
}
97
98
// CacheRemoteConfig caches the remote config to the location specified in
99
// r.AgentManagement.CacheLocation
100
func (r remoteConfigHTTPProvider) CacheRemoteConfig(remoteConfigBytes []byte) error {
101
cachePath := filepath.Join(r.InitialConfig.RemoteConfiguration.CacheLocation, cacheFilename)
102
initialConfigHash, err := hashInitialConfig(*r.InitialConfig)
103
if err != nil {
104
return err
105
}
106
configCache := remoteConfigCache{
107
InitialConfigHash: initialConfigHash,
108
Config: string(remoteConfigBytes),
109
}
110
marshalled, err := json.Marshal(configCache)
111
if err != nil {
112
return fmt.Errorf("could not marshal remote config cache: %w", err)
113
}
114
return os.WriteFile(cachePath, marshalled, 0666)
115
}
116
117
// FetchRemoteConfig fetches the raw bytes of the config from a remote API using
118
// the values in r.AgentManagement.
119
//
120
// Sleeps for a short period of time to apply jitter to API requests.
121
func (r remoteConfigHTTPProvider) FetchRemoteConfig() ([]byte, error) {
122
httpClientConfig := &config.HTTPClientConfig{
123
BasicAuth: &r.InitialConfig.BasicAuth,
124
}
125
126
dir, err := os.Getwd()
127
if err != nil {
128
return nil, fmt.Errorf("failed to get current working directory: %w", err)
129
}
130
httpClientConfig.SetDirectory(dir)
131
132
remoteOpts := &remoteOpts{
133
HTTPClientConfig: httpClientConfig,
134
}
135
136
url, err := r.InitialConfig.fullUrl()
137
if err != nil {
138
return nil, fmt.Errorf("error trying to create full url: %w", err)
139
}
140
rc, err := newRemoteProvider(url, remoteOpts)
141
if err != nil {
142
return nil, fmt.Errorf("error reading remote config: %w", err)
143
}
144
145
bb, err := rc.retrieve()
146
if err != nil {
147
return nil, fmt.Errorf("error retrieving remote config: %w", err)
148
}
149
return bb, nil
150
}
151
152
// labelMap holds label names mapped to their values.
type labelMap map[string]string
153
154
type RemoteConfiguration struct {
155
Labels labelMap `yaml:"labels"`
156
Namespace string `yaml:"namespace"`
157
CacheLocation string `yaml:"cache_location"`
158
}
159
160
type AgentManagementConfig struct {
161
Enabled bool `yaml:"-"` // Derived from enable-features=agent-management
162
Host string `yaml:"host"`
163
BasicAuth config.BasicAuth `yaml:"basic_auth"`
164
Protocol string `yaml:"protocol"`
165
PollingInterval time.Duration `yaml:"polling_interval"`
166
167
RemoteConfiguration RemoteConfiguration `yaml:"remote_configuration"`
168
}
169
170
// getRemoteConfig gets the remote config specified in the initial config, falling back to a local, cached copy
171
// of the remote config if the request to the remote fails. If both fail, an empty config and an
172
// error will be returned.
173
func getRemoteConfig(expandEnvVars bool, configProvider remoteConfigProvider, log *server.Logger, fs *flag.FlagSet, retry bool) (*Config, error) {
174
remoteConfigBytes, err := configProvider.FetchRemoteConfig()
175
if err != nil {
176
var retryAfterErr retryAfterError
177
if errors.As(err, &retryAfterErr) && retry {
178
level.Error(log).Log("msg", "received retry-after from API, sleeping and falling back to cache", "retry-after", retryAfterErr.retryAfter)
179
time.Sleep(retryAfterErr.retryAfter)
180
return getRemoteConfig(expandEnvVars, configProvider, log, fs, false)
181
}
182
level.Error(log).Log("msg", "could not fetch from API, falling back to cache", "err", err)
183
return getCachedRemoteConfig(expandEnvVars, configProvider, fs, log)
184
}
185
186
config, err := loadRemoteConfig(remoteConfigBytes, expandEnvVars, fs)
187
if err != nil {
188
level.Error(log).Log("msg", "could not load remote config, falling back to cache", "err", err)
189
return getCachedRemoteConfig(expandEnvVars, configProvider, fs, log)
190
}
191
192
level.Info(log).Log("msg", "fetched and loaded remote config from API")
193
194
if err = configProvider.CacheRemoteConfig(remoteConfigBytes); err != nil {
195
level.Error(log).Log("err", fmt.Errorf("could not cache config locally: %w", err))
196
}
197
return config, nil
198
}
199
200
// getCachedRemoteConfig gets the cached remote config, falling back to the default config if the cache is invalid or not found.
201
func getCachedRemoteConfig(expandEnvVars bool, configProvider remoteConfigProvider, fs *flag.FlagSet, log *server.Logger) (*Config, error) {
202
rc, err := configProvider.GetCachedRemoteConfig()
203
if err != nil {
204
level.Error(log).Log("msg", "could not get cached remote config, falling back to default (empty) config", "err", err)
205
d := DefaultConfig()
206
instrumentation.InstrumentAgentManagementConfigFallback("empty_config")
207
return &d, nil
208
}
209
instrumentation.InstrumentAgentManagementConfigFallback("cache")
210
return loadRemoteConfig(rc, expandEnvVars, fs)
211
}
212
213
// loadRemoteConfig parses and validates the remote config, both syntactically and semantically.
214
func loadRemoteConfig(remoteConfigBytes []byte, expandEnvVars bool, fs *flag.FlagSet) (*Config, error) {
215
expandedRemoteConfigBytes, err := performEnvVarExpansion(remoteConfigBytes, expandEnvVars)
216
if err != nil {
217
instrumentation.InstrumentInvalidRemoteConfig("env_var_expansion")
218
return nil, fmt.Errorf("could not expand env vars for remote config: %w", err)
219
}
220
221
remoteConfig, err := NewRemoteConfig(expandedRemoteConfigBytes)
222
if err != nil {
223
instrumentation.InstrumentInvalidRemoteConfig("invalid_yaml")
224
return nil, fmt.Errorf("could not unmarshal remote config: %w", err)
225
}
226
227
config, err := remoteConfig.BuildAgentConfig()
228
if err != nil {
229
instrumentation.InstrumentInvalidRemoteConfig("invalid_remote_config")
230
return nil, fmt.Errorf("could not build agent config: %w", err)
231
}
232
233
if err = config.Validate(fs); err != nil {
234
instrumentation.InstrumentInvalidRemoteConfig("semantically_invalid_agent_config")
235
return nil, fmt.Errorf("semantically invalid config received from the API: %w", err)
236
}
237
return config, nil
238
}
239
240
// newRemoteConfigProvider creates a remoteConfigProvider based on the protocol
241
// specified in c.AgentManagement
242
func newRemoteConfigProvider(c *Config) (*remoteConfigHTTPProvider, error) {
243
switch p := c.AgentManagement.Protocol; {
244
case p == "https" || p == "http":
245
return newRemoteConfigHTTPProvider(c)
246
default:
247
return nil, fmt.Errorf("unsupported protocol for agent management api: %s", p)
248
}
249
}
250
251
// fullUrl creates and returns the URL that should be used when querying the Agent Management API,
252
// including the namespace, base config id, and any labels that have been specified.
253
func (am *AgentManagementConfig) fullUrl() (string, error) {
254
fullPath, err := url.JoinPath(am.Protocol+"://", am.Host, apiPath, "namespace", am.RemoteConfiguration.Namespace, "remote_config")
255
if err != nil {
256
return "", fmt.Errorf("error trying to join url: %w", err)
257
}
258
u, err := url.Parse(fullPath)
259
if err != nil {
260
return "", fmt.Errorf("error trying to parse url: %w", err)
261
}
262
q := u.Query()
263
for label, value := range am.RemoteConfiguration.Labels {
264
q.Add(label, value)
265
}
266
u.RawQuery = q.Encode()
267
return u.String(), nil
268
}
269
270
// SleepTime returns the duration in between config fetches.
271
func (am *AgentManagementConfig) SleepTime() time.Duration {
272
return am.PollingInterval
273
}
274
275
// jitterTime returns a random duration in the range [0, am.PollingInterval).
276
func (am *AgentManagementConfig) JitterTime() time.Duration {
277
return time.Duration(rand.Int63n(int64(am.PollingInterval)))
278
}
279
280
// Validate checks that necessary portions of the config have been set.
281
func (am *AgentManagementConfig) Validate() error {
282
if am.BasicAuth.Username == "" || am.BasicAuth.PasswordFile == "" {
283
return errors.New("both username and password_file fields must be specified")
284
}
285
286
if am.PollingInterval <= 0 {
287
return fmt.Errorf("polling interval must be >0")
288
}
289
290
if am.RemoteConfiguration.Namespace == "" {
291
return errors.New("namespace must be specified in 'remote_configuration' block of the config")
292
}
293
294
if am.RemoteConfiguration.CacheLocation == "" {
295
return errors.New("path to cache must be specified in 'agent_management.remote_configuration.cache_location'")
296
}
297
298
return nil
299
}
300
301