Path: blob/main/component/prometheus/receive_http/receive_http.go
4094 views
package receive_http12import (3"context"4"fmt"5"net/http"6"reflect"7"sync"89"github.com/go-kit/log/level"10"github.com/gorilla/mux"11"github.com/grafana/agent/component"12fnet "github.com/grafana/agent/component/common/net"13agentprom "github.com/grafana/agent/component/prometheus"14"github.com/grafana/agent/pkg/util"15"github.com/prometheus/client_golang/prometheus"16"github.com/prometheus/prometheus/storage"17"github.com/prometheus/prometheus/storage/remote"18)1920func init() {21component.Register(component.Registration{22Name: "prometheus.receive_http",23Args: Arguments{},24Build: func(opts component.Options, args component.Arguments) (component.Component, error) {25return New(opts, args.(Arguments))26},27})28}2930type Arguments struct {31Server *fnet.ServerConfig `river:",squash"`32ForwardTo []storage.Appendable `river:"forward_to,attr"`33}3435type Component struct {36opts component.Options37handler http.Handler38fanout *agentprom.Fanout39uncheckedCollector *util.UncheckedCollector4041updateMut sync.RWMutex42args Arguments43server *fnet.TargetServer44}4546func New(opts component.Options, args Arguments) (component.Component, error) {47fanout := agentprom.NewFanout(args.ForwardTo, opts.ID, opts.Registerer)4849uncheckedCollector := util.NewUncheckedCollector(nil)50opts.Registerer.MustRegister(uncheckedCollector)5152c := &Component{53opts: opts,54handler: remote.NewWriteHandler(opts.Logger, fanout),55fanout: fanout,56uncheckedCollector: uncheckedCollector,57}5859if err := c.Update(args); err != nil {60return nil, err61}62return c, nil63}6465// Run satisfies the Component interface.66func (c *Component) Run(ctx context.Context) error {67defer func() {68c.updateMut.Lock()69defer c.updateMut.Unlock()70c.shutdownServer()71}()7273<-ctx.Done()74level.Info(c.opts.Logger).Log("msg", "terminating due to context done")75return nil76}7778// Update satisfies the Component interface.79func (c *Component) Update(args component.Arguments) error {80newArgs := args.(Arguments)81c.fanout.UpdateChildren(newArgs.ForwardTo)8283c.updateMut.Lock()84defer c.updateMut.Unlock()8586serverNeedsUpdate := !reflect.DeepEqual(c.args.Server, newArgs.Server)87if !serverNeedsUpdate {88c.args = newArgs89return nil90}91c.shutdownServer()9293err, s := c.createNewServer(newArgs)94if err != nil {95return err96}97c.server = s9899err = c.server.MountAndRun(func(router *mux.Router) {100router.Path("/api/v1/metrics/write").Methods("POST").Handler(c.handler)101})102if err != nil {103return err104}105106c.args = newArgs107return nil108}109110func (c *Component) createNewServer(args Arguments) (error, *fnet.TargetServer) {111// [server.Server] registers new metrics every time it is created. To112// avoid issues with re-registering metrics with the same name, we create a113// new registry for the server every time we create one, and pass it to an114// unchecked collector to bypass uniqueness checking.115serverRegistry := prometheus.NewRegistry()116c.uncheckedCollector.SetCollector(serverRegistry)117118s, err := fnet.NewTargetServer(119c.opts.Logger,120"prometheus_receive_http",121serverRegistry,122args.Server,123)124if err != nil {125return fmt.Errorf("failed to create server: %v", err), nil126}127128return nil, s129}130131// shutdownServer will shut down the currently used server.132// It is not goroutine-safe and an updateMut write lock must be held when it's called.133func (c *Component) shutdownServer() {134if c.server != nil {135c.server.StopAndShutdown()136c.server = nil137}138}139140141