Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/receive_http/receive_http.go
4094 views
1
package receive_http
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"reflect"
8
"sync"
9
10
"github.com/go-kit/log/level"
11
"github.com/gorilla/mux"
12
"github.com/grafana/agent/component"
13
fnet "github.com/grafana/agent/component/common/net"
14
agentprom "github.com/grafana/agent/component/prometheus"
15
"github.com/grafana/agent/pkg/util"
16
"github.com/prometheus/client_golang/prometheus"
17
"github.com/prometheus/prometheus/storage"
18
"github.com/prometheus/prometheus/storage/remote"
19
)
20
21
func init() {
22
component.Register(component.Registration{
23
Name: "prometheus.receive_http",
24
Args: Arguments{},
25
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
26
return New(opts, args.(Arguments))
27
},
28
})
29
}
30
31
type Arguments struct {
32
Server *fnet.ServerConfig `river:",squash"`
33
ForwardTo []storage.Appendable `river:"forward_to,attr"`
34
}
35
36
type Component struct {
37
opts component.Options
38
handler http.Handler
39
fanout *agentprom.Fanout
40
uncheckedCollector *util.UncheckedCollector
41
42
updateMut sync.RWMutex
43
args Arguments
44
server *fnet.TargetServer
45
}
46
47
func New(opts component.Options, args Arguments) (component.Component, error) {
48
fanout := agentprom.NewFanout(args.ForwardTo, opts.ID, opts.Registerer)
49
50
uncheckedCollector := util.NewUncheckedCollector(nil)
51
opts.Registerer.MustRegister(uncheckedCollector)
52
53
c := &Component{
54
opts: opts,
55
handler: remote.NewWriteHandler(opts.Logger, fanout),
56
fanout: fanout,
57
uncheckedCollector: uncheckedCollector,
58
}
59
60
if err := c.Update(args); err != nil {
61
return nil, err
62
}
63
return c, nil
64
}
65
66
// Run satisfies the Component interface.
67
func (c *Component) Run(ctx context.Context) error {
68
defer func() {
69
c.updateMut.Lock()
70
defer c.updateMut.Unlock()
71
c.shutdownServer()
72
}()
73
74
<-ctx.Done()
75
level.Info(c.opts.Logger).Log("msg", "terminating due to context done")
76
return nil
77
}
78
79
// Update satisfies the Component interface.
80
func (c *Component) Update(args component.Arguments) error {
81
newArgs := args.(Arguments)
82
c.fanout.UpdateChildren(newArgs.ForwardTo)
83
84
c.updateMut.Lock()
85
defer c.updateMut.Unlock()
86
87
serverNeedsUpdate := !reflect.DeepEqual(c.args.Server, newArgs.Server)
88
if !serverNeedsUpdate {
89
c.args = newArgs
90
return nil
91
}
92
c.shutdownServer()
93
94
err, s := c.createNewServer(newArgs)
95
if err != nil {
96
return err
97
}
98
c.server = s
99
100
err = c.server.MountAndRun(func(router *mux.Router) {
101
router.Path("/api/v1/metrics/write").Methods("POST").Handler(c.handler)
102
})
103
if err != nil {
104
return err
105
}
106
107
c.args = newArgs
108
return nil
109
}
110
111
func (c *Component) createNewServer(args Arguments) (error, *fnet.TargetServer) {
112
// [server.Server] registers new metrics every time it is created. To
113
// avoid issues with re-registering metrics with the same name, we create a
114
// new registry for the server every time we create one, and pass it to an
115
// unchecked collector to bypass uniqueness checking.
116
serverRegistry := prometheus.NewRegistry()
117
c.uncheckedCollector.SetCollector(serverRegistry)
118
119
s, err := fnet.NewTargetServer(
120
c.opts.Logger,
121
"prometheus_receive_http",
122
serverRegistry,
123
args.Server,
124
)
125
if err != nil {
126
return fmt.Errorf("failed to create server: %v", err), nil
127
}
128
129
return nil, s
130
}
131
132
// shutdownServer will shut down the currently used server.
133
// It is not goroutine-safe and an updateMut write lock must be held when it's called.
134
func (c *Component) shutdownServer() {
135
if c.server != nil {
136
c.server.StopAndShutdown()
137
c.server = nil
138
}
139
}
140
141