Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/cmd/internal/flowmode/cmd_run.go
4094 views
1
package flowmode
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"net"
8
"net/http"
9
"os"
10
"os/signal"
11
"path"
12
"sync"
13
"syscall"
14
15
"github.com/grafana/agent/web/api"
16
"github.com/grafana/agent/web/ui"
17
"github.com/rfratto/ckit/memconn"
18
"go.opentelemetry.io/otel"
19
"golang.org/x/exp/maps"
20
"golang.org/x/net/http2"
21
"golang.org/x/net/http2/h2c"
22
23
"github.com/fatih/color"
24
"github.com/go-kit/log/level"
25
"github.com/gorilla/mux"
26
"github.com/grafana/agent/pkg/cluster"
27
"github.com/grafana/agent/pkg/config/instrumentation"
28
"github.com/grafana/agent/pkg/flow"
29
"github.com/grafana/agent/pkg/flow/logging"
30
"github.com/grafana/agent/pkg/flow/tracing"
31
"github.com/grafana/agent/pkg/river/diag"
32
"github.com/grafana/agent/pkg/usagestats"
33
"github.com/prometheus/client_golang/prometheus"
34
"github.com/prometheus/client_golang/prometheus/promhttp"
35
"github.com/spf13/cobra"
36
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
37
38
// Install Components
39
_ "github.com/grafana/agent/component/all"
40
)
41
42
func runCommand() *cobra.Command {
43
r := &flowRun{
44
inMemoryAddr: "agent.internal:12345",
45
httpListenAddr: "127.0.0.1:12345",
46
storagePath: "data-agent/",
47
uiPrefix: "/",
48
disableReporting: false,
49
}
50
51
cmd := &cobra.Command{
52
Use: "run [flags] file",
53
Short: "Run Grafana Agent Flow",
54
Long: `The run subcommand runs Grafana Agent Flow in the foreground until an interrupt
55
is received.
56
57
run must be provided an argument pointing at the River file to use. If the
58
River file wasn't specified, can't be loaded, or contains errors, run will exit
59
immediately.
60
61
run starts an HTTP server which can be used to debug Grafana Agent Flow or
62
force it to reload (by sending a GET or POST request to /-/reload). The listen
63
address can be changed through the --server.http.listen-addr flag.
64
65
By default, the HTTP server exposes a debugging UI at /. The path of the
66
debugging UI can be changed by providing a different value to
67
--server.http.ui-path-prefix.
68
69
Additionally, the HTTP server exposes the following debug endpoints:
70
71
/debug/pprof Go performance profiling tools
72
73
If reloading the config file fails, Grafana Agent Flow will continue running in
74
its last valid state. Components which failed may be be listed as unhealthy,
75
depending on the nature of the reload error.
76
`,
77
Args: cobra.ExactArgs(1),
78
SilenceUsage: true,
79
80
RunE: func(cmd *cobra.Command, args []string) error {
81
return r.Run(args[0])
82
},
83
}
84
85
cmd.Flags().
86
StringVar(&r.httpListenAddr, "server.http.listen-addr", r.httpListenAddr, "Address to listen for HTTP traffic on")
87
cmd.Flags().StringVar(&r.inMemoryAddr, "server.http.memory-addr", r.inMemoryAddr, "Address to listen for in-memory HTTP traffic on. Change if it collides with a real address")
88
cmd.Flags().StringVar(&r.storagePath, "storage.path", r.storagePath, "Base directory where components can store data")
89
cmd.Flags().StringVar(&r.uiPrefix, "server.http.ui-path-prefix", r.uiPrefix, "Prefix to serve the HTTP UI at")
90
cmd.Flags().
91
BoolVar(&r.clusterEnabled, "cluster.enabled", r.clusterEnabled, "Start in clustered mode")
92
cmd.Flags().
93
StringVar(&r.clusterJoinAddr, "cluster.advertise-address", r.clusterAdvAddr, "Address to advertise to the cluster")
94
cmd.Flags().
95
StringVar(&r.clusterJoinAddr, "cluster.join-addresses", r.clusterJoinAddr, "Comma-separated list of addresses to join the cluster at")
96
cmd.Flags().
97
BoolVar(&r.disableReporting, "disable-reporting", r.disableReporting, "Disable reporting of enabled components to Grafana.")
98
return cmd
99
}
100
101
// flowRun carries the settings of the "run" subcommand. Its fields are
// populated from command-line flags before Run is invoked.
type flowRun struct {
	// inMemoryAddr is the address that in-process HTTP traffic is routed to
	// via an in-memory listener; httpListenAddr is the real TCP address the
	// HTTP server listens on.
	inMemoryAddr, httpListenAddr string
	// storagePath is the base directory handed to the Flow controller for
	// component data.
	storagePath string
	// uiPrefix is the HTTP path prefix the debugging UI is served under.
	uiPrefix string

	// disableReporting turns off the usage-stats reporter when true.
	disableReporting bool

	// Clustering settings, forwarded to cluster.New.
	clusterEnabled bool
	// clusterAdvAddr is the address advertised to the cluster;
	// clusterJoinAddr is a comma-separated list of addresses to join.
	clusterAdvAddr, clusterJoinAddr string
}
111
112
func (fr *flowRun) Run(configFile string) error {
113
var wg sync.WaitGroup
114
defer wg.Wait()
115
116
ctx, cancel := interruptContext()
117
defer cancel()
118
119
if configFile == "" {
120
return fmt.Errorf("file argument not provided")
121
}
122
123
logSink, err := logging.WriterSink(os.Stderr, logging.DefaultSinkOptions)
124
if err != nil {
125
return fmt.Errorf("building logger: %w", err)
126
}
127
l := logging.New(logSink)
128
129
t, err := tracing.New(tracing.DefaultOptions)
130
if err != nil {
131
return fmt.Errorf("building tracer: %w", err)
132
}
133
134
// Set the global tracer provider to catch global traces, but ideally things
135
// use the tracer provider given to them so the appropriate attributes get
136
// injected.
137
otel.SetTracerProvider(t)
138
139
// Immediately start the tracer.
140
go func() {
141
err := t.Run(ctx)
142
if err != nil {
143
level.Error(l).Log("msg", "running tracer returned an error", "err", err)
144
}
145
}()
146
147
// TODO(rfratto): many of the dependencies we import register global metrics,
148
// even when their code isn't being used. To reduce the number of series
149
// generated by the agent, we should switch to a custom registry.
150
//
151
// Before doing this, we need to ensure that anything using the default
152
// registry that we want to keep can be given a custom registry so desired
153
// metrics are still exposed.
154
reg := prometheus.DefaultRegisterer
155
reg.MustRegister(newResourcesCollector(l))
156
157
clusterer, err := cluster.New(l, reg, fr.clusterEnabled, fr.httpListenAddr, fr.clusterAdvAddr, fr.clusterJoinAddr)
158
if err != nil {
159
return fmt.Errorf("building clusterer: %w", err)
160
}
161
162
// In-memory listener, used for inner HTTP traffic without the network.
163
memLis := memconn.NewListener(nil)
164
165
f := flow.New(flow.Options{
166
LogSink: logSink,
167
Tracer: t,
168
Clusterer: clusterer,
169
DataPath: fr.storagePath,
170
Reg: reg,
171
HTTPPathPrefix: "/api/v0/component/",
172
HTTPListenAddr: fr.inMemoryAddr,
173
174
// Send requests to fr.inMemoryAddr directly to our in-memory listener.
175
DialFunc: func(ctx context.Context, network, address string) (net.Conn, error) {
176
switch address {
177
case fr.inMemoryAddr:
178
return memLis.DialContext(ctx)
179
default:
180
return (&net.Dialer{}).DialContext(ctx, network, address)
181
}
182
},
183
})
184
185
reload := func() error {
186
flowCfg, err := loadFlowFile(configFile)
187
defer instrumentation.InstrumentLoad(err == nil)
188
189
if err != nil {
190
return fmt.Errorf("reading config file %q: %w", configFile, err)
191
}
192
if err := f.LoadFile(flowCfg, nil); err != nil {
193
return fmt.Errorf("error during the initial gragent load: %w", err)
194
}
195
196
return nil
197
}
198
199
// Flow controller
200
{
201
wg.Add(1)
202
go func() {
203
defer wg.Done()
204
f.Run(ctx)
205
}()
206
}
207
208
// HTTP server
209
{
210
// Network listener.
211
netLis, err := net.Listen("tcp", fr.httpListenAddr)
212
if err != nil {
213
return fmt.Errorf("failed to listen on %s: %w", fr.httpListenAddr, err)
214
}
215
216
r := mux.NewRouter()
217
r.Use(otelmux.Middleware(
218
"grafana-agent",
219
otelmux.WithTracerProvider(t),
220
))
221
222
r.Handle("/metrics", promhttp.Handler())
223
r.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux)
224
r.PathPrefix("/api/v0/component/{id}/").Handler(f.ComponentHandler())
225
226
// Register routes for the clusterer.
227
cr, ch := clusterer.Node.Handler()
228
r.PathPrefix(cr).Handler(ch)
229
230
r.HandleFunc("/-/ready", func(w http.ResponseWriter, _ *http.Request) {
231
if f.Ready() {
232
w.WriteHeader(http.StatusOK)
233
fmt.Fprintf(w, "Agent is Ready.\n")
234
} else {
235
w.WriteHeader(http.StatusServiceUnavailable)
236
fmt.Fprint(w, "Config failed to load.\n")
237
}
238
})
239
240
r.HandleFunc("/-/reload", func(w http.ResponseWriter, _ *http.Request) {
241
level.Info(l).Log("msg", "reload requested via /-/reload endpoint")
242
defer level.Info(l).Log("msg", "config reloaded")
243
244
err := reload()
245
if err != nil {
246
http.Error(w, err.Error(), http.StatusBadRequest)
247
return
248
}
249
fmt.Fprintln(w, "config reloaded")
250
}).Methods(http.MethodGet, http.MethodPost)
251
252
// Register Routes must be the last
253
fa := api.NewFlowAPI(f, r)
254
fa.RegisterRoutes(path.Join(fr.uiPrefix, "/api/v0/web"), r)
255
256
// NOTE(rfratto): keep this at the bottom of all other routes, otherwise it
257
// will take precedence over anything else mapped in uiPrefix.
258
ui.RegisterRoutes(fr.uiPrefix, r)
259
260
srv := &http.Server{Handler: h2c.NewHandler(r, &http2.Server{})}
261
262
level.Info(l).Log("msg", "now listening for http traffic", "addr", fr.httpListenAddr)
263
264
listeners := []net.Listener{netLis, memLis}
265
for _, lis := range listeners {
266
wg.Add(1)
267
go func(lis net.Listener) {
268
defer wg.Done()
269
defer cancel()
270
271
if err := srv.Serve(lis); err != nil {
272
level.Info(l).Log("msg", "http server closed", "addr", lis.Addr(), "err", err)
273
}
274
}(lis)
275
}
276
277
defer func() { _ = srv.Shutdown(ctx) }()
278
}
279
280
// Report usage of enabled components
281
if !fr.disableReporting {
282
reporter, err := usagestats.NewReporter(l)
283
if err != nil {
284
return fmt.Errorf("failed to create reporter: %w", err)
285
}
286
go func() {
287
err := reporter.Start(ctx, getEnabledComponentsFunc(f))
288
if err != nil {
289
level.Error(l).Log("msg", "failed to start reporter", "err", err)
290
}
291
}()
292
}
293
294
// Perform the initial reload. This is done after starting the HTTP server so
295
// that /metric and pprof endpoints are available while the Flow controller
296
// is loading.
297
if err := reload(); err != nil {
298
var diags diag.Diagnostics
299
if errors.As(err, &diags) {
300
bb, _ := os.ReadFile(configFile)
301
302
p := diag.NewPrinter(diag.PrinterConfig{
303
Color: !color.NoColor,
304
ContextLinesBefore: 1,
305
ContextLinesAfter: 1,
306
})
307
_ = p.Fprint(os.Stderr, map[string][]byte{configFile: bb}, diags)
308
309
// Print newline after the diagnostics.
310
fmt.Println()
311
312
return fmt.Errorf("could not perform the initial load successfully")
313
}
314
315
// Exit if the initial load files
316
return err
317
}
318
319
reloadSignal := make(chan os.Signal, 1)
320
signal.Notify(reloadSignal, syscall.SIGHUP)
321
defer signal.Stop(reloadSignal)
322
323
for {
324
select {
325
case <-ctx.Done():
326
return nil
327
case <-reloadSignal:
328
if err := reload(); err != nil {
329
level.Error(l).Log("msg", "failed to reload config", "err", err)
330
} else {
331
level.Info(l).Log("msg", "config reloaded")
332
}
333
}
334
}
335
}
336
337
// getEnabledComponentsFunc returns a function that gets the current enabled components
338
func getEnabledComponentsFunc(f *flow.Flow) func() map[string]interface{} {
339
return func() map[string]interface{} {
340
infos := f.ComponentInfos()
341
components := map[string]struct{}{}
342
for _, info := range infos {
343
components[info.Name] = struct{}{}
344
}
345
return map[string]interface{}{"enabled-components": maps.Keys(components)}
346
}
347
}
348
349
// loadFlowFile reads filename from disk, passes the raw bytes to
// instrumentation.InstrumentConfig, and parses them with flow.ReadFile.
// A read error is returned unwrapped; callers add their own context.
func loadFlowFile(filename string) (*flow.File, error) {
	raw, readErr := os.ReadFile(filename)
	if readErr != nil {
		return nil, readErr
	}

	instrumentation.InstrumentConfig(raw)
	return flow.ReadFile(filename, raw)
}
359
360
// interruptContext returns a context that is canceled either when the process
// receives os.Interrupt or when the returned CancelFunc is called, whichever
// happens first. "interrupt received" is written to stderr only when the
// cancellation was caused by the signal.
func interruptContext() (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	// Register for the signal before starting the goroutine. If Notify ran
	// inside the goroutine, an interrupt arriving before it was scheduled
	// would hit the default handler and terminate the process.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)

	go func() {
		defer cancel()

		interrupted := false
		select {
		case <-sig:
			interrupted = true
		case <-ctx.Done():
		}
		signal.Stop(sig)

		// Normal cancellation (e.g. the caller returning) is not an interrupt.
		if interrupted {
			fmt.Fprintln(os.Stderr, "interrupt received")
		}
	}()

	return ctx, cancel
}
378
379