package module
import (
"context"
"fmt"
"net/http"
"path"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/flow"
"github.com/grafana/agent/pkg/flow/logging"
"github.com/grafana/agent/pkg/flow/tracing"
"github.com/grafana/agent/web/api"
"github.com/prometheus/client_golang/prometheus"
)
type ModuleComponent struct {
opts component.Options
ctrl *flow.Flow
mut sync.RWMutex
health component.Health
}
type Exports struct {
Exports map[string]any `river:"exports,block"`
}
func NewModuleComponent(o component.Options) *ModuleComponent {
flowTracer, _ := tracing.New(tracing.DefaultOptions)
flowRegistry := prometheus.NewRegistry()
return &ModuleComponent{
opts: o,
ctrl: flow.New(flow.Options{
ControllerID: o.ID,
LogSink: logging.LoggerSink(o.Logger),
Tracer: flowTracer,
Reg: flowRegistry,
Clusterer: o.Clusterer,
DataPath: o.DataPath,
HTTPPathPrefix: o.HTTPPath,
HTTPListenAddr: o.HTTPListenAddr,
OnExportsChange: func(exports map[string]any) {
o.OnStateChange(Exports{Exports: exports})
},
}),
}
}
func (c *ModuleComponent) LoadFlowContent(args map[string]any, contentValue string) error {
f, err := flow.ReadFile(c.opts.ID, []byte(contentValue))
if err != nil {
c.setHealth(component.Health{
Health: component.HealthTypeUnhealthy,
Message: fmt.Sprintf("failed to parse module content: %s", err),
UpdateTime: time.Now(),
})
return err
}
err = c.ctrl.LoadFile(f, args)
if err != nil {
c.setHealth(component.Health{
Health: component.HealthTypeUnhealthy,
Message: fmt.Sprintf("failed to load module content: %s", err),
UpdateTime: time.Now(),
})
return err
}
c.setHealth(component.Health{
Health: component.HealthTypeHealthy,
Message: "module content loaded",
UpdateTime: time.Now(),
})
return nil
}
func (c *ModuleComponent) RunFlowController(ctx context.Context) {
c.ctrl.Run(ctx)
}
func (c *ModuleComponent) CurrentHealth() component.Health {
c.mut.RLock()
defer c.mut.RUnlock()
return c.health
}
func (c *ModuleComponent) setHealth(h component.Health) {
c.mut.Lock()
defer c.mut.Unlock()
c.health = h
}
func (c *ModuleComponent) Handler() http.Handler {
r := mux.NewRouter()
fa := api.NewFlowAPI(c.ctrl, r)
fa.RegisterRoutes("/", r)
r.PathPrefix("/{id}/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = path.Join(c.opts.HTTPPath, r.URL.Path)
c.ctrl.ComponentHandler().ServeHTTP(w, r)
})
return r
}