Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/cluster/cluster.go
4094 views
1
package cluster
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
8
"github.com/go-kit/log"
9
"github.com/go-kit/log/level"
10
"github.com/golang/protobuf/ptypes/empty"
11
"github.com/gorilla/mux"
12
"github.com/grafana/agent/pkg/agentproto"
13
"github.com/grafana/agent/pkg/metrics/instance"
14
"github.com/grafana/agent/pkg/metrics/instance/configstore"
15
"github.com/grafana/agent/pkg/util"
16
"github.com/prometheus/client_golang/prometheus"
17
"google.golang.org/grpc"
18
)
19
20
// Cluster connects an Agent to other Agents and allows them to distribute
// workload.
type Cluster struct {
	// mut guards all fields below. It is write-locked for the duration of
	// initialization (New), ApplyConfig, and Stop, and read-locked by
	// storeValidate and Reshard.
	mut sync.RWMutex

	log log.Logger
	cfg Config
	// baseValidation is the caller-supplied validation function; it is run
	// before any store-specific checks in storeValidate.
	baseValidation ValidationFunc

	//
	// Internally, Cluster glues together four separate pieces of logic.
	// See comments below to get an understanding of what is going on.
	//

	// node manages membership in the cluster and performs cluster-wide reshards.
	node *node

	// store connects to a configstore for changes. storeAPI is an HTTP API for it.
	store    *configstore.Remote
	storeAPI *configstore.API

	// watcher watches the store and applies changes to an instance.Manager,
	// triggering metrics to be collected and sent. configWatcher also does a
	// complete refresh of its state on an interval.
	watcher *configWatcher
}
46
47
// New creates a new Cluster.
48
func New(
49
l log.Logger,
50
reg prometheus.Registerer,
51
cfg Config,
52
im instance.Manager,
53
validate ValidationFunc,
54
) (*Cluster, error) {
55
56
l = log.With(l, "component", "cluster")
57
58
var (
59
c = &Cluster{log: l, cfg: cfg, baseValidation: validate}
60
err error
61
)
62
63
// Hold the lock for the initialization. This is necessary since newNode will
64
// eventually call Reshard, and we want c.watcher to be initialized when that
65
// happens.
66
c.mut.Lock()
67
defer c.mut.Unlock()
68
69
c.node, err = newNode(reg, l, cfg, c)
70
if err != nil {
71
return nil, fmt.Errorf("failed to initialize node membership: %w", err)
72
}
73
74
c.store, err = configstore.NewRemote(l, reg, cfg.KVStore.Config, cfg.Enabled)
75
if err != nil {
76
return nil, fmt.Errorf("failed to initialize configstore: %w", err)
77
}
78
c.storeAPI = configstore.NewAPI(l, c.store, c.storeValidate, cfg.APIEnableGetConfiguration)
79
reg.MustRegister(c.storeAPI)
80
81
c.watcher, err = newConfigWatcher(l, cfg, c.store, im, c.node.Owns, validate)
82
if err != nil {
83
return nil, fmt.Errorf("failed to initialize configwatcher: %w", err)
84
}
85
86
// NOTE(rfratto): ApplyConfig isn't necessary for the initialization but must
87
// be called for any changes to the configuration.
88
return c, nil
89
}
90
91
// storeValidate is the validation function handed to the config store API.
// It applies the caller-supplied base validation first and then, unless the
// cluster config explicitly allows it, rejects configs that read files.
func (c *Cluster) storeValidate(cfg *instance.Config) error {
	c.mut.RLock()
	defer c.mut.RUnlock()

	err := c.baseValidation(cfg)
	switch {
	case err != nil:
		return err
	case c.cfg.DangerousAllowReadingFiles:
		// Reading files from store-delivered configs is explicitly permitted;
		// nothing further to check.
		return nil
	default:
		// If configs aren't allowed to read from the store, we need to make
		// sure no configs coming in from the API set files for passwords.
		return validateNofiles(cfg)
	}
}
107
108
// Reshard implements agentproto.ScrapingServiceServer, and syncs the state of
109
// configs with the configstore.
110
func (c *Cluster) Reshard(ctx context.Context, _ *agentproto.ReshardRequest) (*empty.Empty, error) {
111
c.mut.RLock()
112
defer c.mut.RUnlock()
113
114
level.Info(c.log).Log("msg", "received reshard notification, requesting refresh")
115
c.watcher.RequestRefresh()
116
return &empty.Empty{}, nil
117
}
118
119
// ApplyConfig applies configuration changes to Cluster.
//
// Changes are pushed to the node membership, the config store, and the
// watcher, in that order. If any step fails, c.cfg is left unchanged so a
// retry re-runs the full sequence; note that earlier components may already
// have accepted the new config at that point.
func (c *Cluster) ApplyConfig(cfg Config) error {
	c.mut.Lock()
	defer c.mut.Unlock()

	// No-op when the new config is identical to the current one.
	if util.CompareYAML(c.cfg, cfg) {
		return nil
	}

	if err := c.node.ApplyConfig(cfg); err != nil {
		return fmt.Errorf("failed to apply config to node membership: %w", err)
	}

	// NOTE(review): New reads the KV config from cfg.KVStore.Config, while this
	// reapplies from cfg.Lifecycler.RingConfig.KVStore — confirm the two are
	// kept in sync.
	if err := c.store.ApplyConfig(cfg.Lifecycler.RingConfig.KVStore, cfg.Enabled); err != nil {
		return fmt.Errorf("failed to apply config to config store: %w", err)
	}

	if err := c.watcher.ApplyConfig(cfg); err != nil {
		return fmt.Errorf("failed to apply config to watcher: %w", err)
	}

	// Record the new config only after every component accepted it.
	c.cfg = cfg

	// Force a refresh so all the configs get updated with new defaults.
	level.Info(c.log).Log("msg", "cluster config changed, queueing refresh")
	c.watcher.RequestRefresh()
	return nil
}
147
148
// WireAPI injects routes into the provided mux router for the config
// management API. Both the config store API and the node register their
// routes on r.
func (c *Cluster) WireAPI(r *mux.Router) {
	c.storeAPI.WireAPI(r)
	c.node.WireAPI(r)
}
154
155
// WireGRPC injects gRPC server handlers into the provided gRPC server.
// Cluster itself is the agentproto.ScrapingServiceServer implementation
// (see Reshard).
func (c *Cluster) WireGRPC(srv *grpc.Server) {
	agentproto.RegisterScrapingServiceServer(srv, c)
}
159
160
// Stop stops the cluster and all of its dependencies.
161
func (c *Cluster) Stop() {
162
c.mut.Lock()
163
defer c.mut.Unlock()
164
165
deps := []struct {
166
name string
167
closer func() error
168
}{
169
{"node", c.node.Stop},
170
{"config store", c.store.Close},
171
{"config watcher", c.watcher.Stop},
172
}
173
for _, dep := range deps {
174
err := dep.closer()
175
if err != nil {
176
level.Error(c.log).Log("msg", "failed to stop dependency", "dependency", dep.name, "err", err)
177
}
178
}
179
}
180
181