Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/module/module.go
4094 views
1
package module
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"path"
8
"sync"
9
"time"
10
11
"github.com/gorilla/mux"
12
"github.com/grafana/agent/component"
13
"github.com/grafana/agent/pkg/flow"
14
"github.com/grafana/agent/pkg/flow/logging"
15
"github.com/grafana/agent/pkg/flow/tracing"
16
"github.com/grafana/agent/web/api"
17
"github.com/prometheus/client_golang/prometheus"
18
)
19
20
// ModuleComponent holds the common properties for module components.
21
type ModuleComponent struct {
22
opts component.Options
23
ctrl *flow.Flow
24
25
mut sync.RWMutex
26
health component.Health
27
}
28
29
// Exports wraps the values exported by the running module. It is the value
// handed to the component's OnStateChange callback when the module's exports
// change.
type Exports struct {
	// Exports holds the running module's exported values, keyed by string.
	Exports map[string]any `river:"exports,block"`
}
34
35
// NewModuleComponent initializes a new ModuleComponent.
36
func NewModuleComponent(o component.Options) *ModuleComponent {
37
// TODO(rfratto): replace these with a tracer/registry which properly
38
// propagates data back to the parent.
39
flowTracer, _ := tracing.New(tracing.DefaultOptions)
40
flowRegistry := prometheus.NewRegistry()
41
42
return &ModuleComponent{
43
opts: o,
44
ctrl: flow.New(flow.Options{
45
ControllerID: o.ID,
46
LogSink: logging.LoggerSink(o.Logger),
47
Tracer: flowTracer,
48
Reg: flowRegistry,
49
Clusterer: o.Clusterer,
50
51
DataPath: o.DataPath,
52
HTTPPathPrefix: o.HTTPPath,
53
HTTPListenAddr: o.HTTPListenAddr,
54
55
OnExportsChange: func(exports map[string]any) {
56
o.OnStateChange(Exports{Exports: exports})
57
},
58
}),
59
}
60
}
61
62
// LoadFlowContent loads the flow controller with the current component content. It
63
// will set the component health in addition to return the error so that the consumer
64
// can rely on either or both.
65
func (c *ModuleComponent) LoadFlowContent(args map[string]any, contentValue string) error {
66
f, err := flow.ReadFile(c.opts.ID, []byte(contentValue))
67
if err != nil {
68
c.setHealth(component.Health{
69
Health: component.HealthTypeUnhealthy,
70
Message: fmt.Sprintf("failed to parse module content: %s", err),
71
UpdateTime: time.Now(),
72
})
73
74
return err
75
}
76
77
err = c.ctrl.LoadFile(f, args)
78
if err != nil {
79
c.setHealth(component.Health{
80
Health: component.HealthTypeUnhealthy,
81
Message: fmt.Sprintf("failed to load module content: %s", err),
82
UpdateTime: time.Now(),
83
})
84
85
return err
86
}
87
88
c.setHealth(component.Health{
89
Health: component.HealthTypeHealthy,
90
Message: "module content loaded",
91
UpdateTime: time.Now(),
92
})
93
return nil
94
}
95
96
// RunFlowController runs the flow controller that all module components start.
97
func (c *ModuleComponent) RunFlowController(ctx context.Context) {
98
c.ctrl.Run(ctx)
99
}
100
101
// CurrentHealth contains the implementation details for CurrentHealth in a module component.
102
func (c *ModuleComponent) CurrentHealth() component.Health {
103
c.mut.RLock()
104
defer c.mut.RUnlock()
105
return c.health
106
}
107
108
// SetHealth contains the implementation details for setHealth in a module component.
109
func (c *ModuleComponent) setHealth(h component.Health) {
110
c.mut.Lock()
111
defer c.mut.Unlock()
112
c.health = h
113
}
114
115
// Handler contains the implementation details for Handler in a module component.
116
func (c *ModuleComponent) Handler() http.Handler {
117
r := mux.NewRouter()
118
119
fa := api.NewFlowAPI(c.ctrl, r)
120
fa.RegisterRoutes("/", r)
121
122
r.PathPrefix("/{id}/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
123
// Re-add the full path to ensure that nested controllers propagate
124
// requests properly.
125
r.URL.Path = path.Join(c.opts.HTTPPath, r.URL.Path)
126
127
c.ctrl.ComponentHandler().ServeHTTP(w, r)
128
})
129
130
return r
131
}
132
133