Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/remote/http/http.go
4095 views
1
// Package http implements the remote.http component.
2
package http
3
4
import (
5
"context"
6
"fmt"
7
"io"
8
"net/http"
9
"strings"
10
"sync"
11
"time"
12
13
"github.com/go-kit/log"
14
"github.com/grafana/agent/component"
15
common_config "github.com/grafana/agent/component/common/config"
16
"github.com/grafana/agent/pkg/build"
17
"github.com/grafana/agent/pkg/river"
18
"github.com/grafana/agent/pkg/river/rivertypes"
19
prom_config "github.com/prometheus/common/config"
20
)
21
22
// userAgent is the User-Agent value handed to the HTTP client that Update
// builds; it embeds the agent's build version.
var userAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
23
24
func init() {
25
component.Register(component.Registration{
26
Name: "remote.http",
27
Args: Arguments{},
28
Exports: Exports{},
29
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
30
return New(opts, args.(Arguments))
31
},
32
})
33
}
34
35
// Arguments control the remote.http component.
36
type Arguments struct {
37
URL string `river:"url,attr"`
38
PollFrequency time.Duration `river:"poll_frequency,attr,optional"`
39
PollTimeout time.Duration `river:"poll_timeout,attr,optional"`
40
IsSecret bool `river:"is_secret,attr,optional"`
41
42
Method string `river:"method,attr,optional"`
43
Headers map[string]string `river:"headers,attr,optional"`
44
45
Client common_config.HTTPClientConfig `river:"client,block,optional"`
46
}
47
48
// DefaultArguments holds default settings for Arguments.
49
var DefaultArguments = Arguments{
50
PollFrequency: 1 * time.Minute,
51
PollTimeout: 10 * time.Second,
52
Client: common_config.DefaultHTTPClientConfig,
53
Method: http.MethodGet,
54
}
55
56
// Compile-time check that *Arguments implements river.Unmarshaler.
var _ river.Unmarshaler = (*Arguments)(nil)
57
58
// UnmarshalRiver implements river.Unmarshaler.
59
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
60
*args = DefaultArguments
61
62
type arguments Arguments
63
if err := f((*arguments)(args)); err != nil {
64
return err
65
}
66
67
if args.PollFrequency <= 0 {
68
return fmt.Errorf("poll_frequency must be greater than 0")
69
}
70
if args.PollTimeout <= 0 {
71
return fmt.Errorf("poll_timeout must be greater than 0")
72
}
73
if args.PollTimeout >= args.PollFrequency {
74
return fmt.Errorf("poll_timeout must be less than poll_frequency")
75
}
76
77
if _, err := http.NewRequest(args.Method, args.URL, nil); err != nil {
78
return err
79
}
80
81
return nil
82
}
83
84
// Exports holds settings exported by remote.http.
85
type Exports struct {
86
Content rivertypes.OptionalSecret `river:"content,attr"`
87
}
88
89
// Component implements the remote.http component.
90
type Component struct {
91
log log.Logger
92
opts component.Options
93
94
mut sync.Mutex
95
args Arguments
96
cli *http.Client
97
lastPoll time.Time
98
lastExports Exports // Used for determining whether exports should be updated
99
100
// Updated is written to whenever args updates.
101
updated chan struct{}
102
103
healthMut sync.RWMutex
104
health component.Health
105
}
106
107
var (
108
_ component.Component = (*Component)(nil)
109
_ component.HealthComponent = (*Component)(nil)
110
)
111
112
// New returns a new, unstarted, remote.http component.
113
func New(opts component.Options, args Arguments) (*Component, error) {
114
c := &Component{
115
log: opts.Logger,
116
opts: opts,
117
118
updated: make(chan struct{}, 1),
119
120
health: component.Health{
121
Health: component.HealthTypeUnknown,
122
Message: "component started",
123
UpdateTime: time.Now(),
124
},
125
}
126
127
if err := c.Update(args); err != nil {
128
return nil, err
129
}
130
return c, nil
131
}
132
133
// Run starts the remote.http component.
134
func (c *Component) Run(ctx context.Context) error {
135
for {
136
select {
137
case <-ctx.Done():
138
return nil
139
case <-time.After(c.nextPoll()):
140
c.poll()
141
case <-c.updated:
142
// no-op; force the next wait to be reread.
143
}
144
}
145
}
146
147
// nextPoll returns how long to wait to poll given the last time a
148
// poll occurred. nextPoll returns 0 if a poll should occur immediately.
149
func (c *Component) nextPoll() time.Duration {
150
c.mut.Lock()
151
defer c.mut.Unlock()
152
153
nextPoll := c.lastPoll.Add(c.args.PollFrequency)
154
now := time.Now()
155
156
if now.After(nextPoll) {
157
// Poll immediately; next poll period was in the past.
158
return 0
159
}
160
return nextPoll.Sub(now)
161
}
162
163
// poll performs a HTTP GET for the component's configured URL. c.mut must
164
// not be held when calling. After polling, the component's health is updated
165
// with the success or failure status.
166
func (c *Component) poll() {
167
startTime := time.Now()
168
err := c.pollError()
169
170
// NOTE(rfratto): to prevent the health from being inaccessible for longer
171
// than is needed, only update the health after the poll finished.
172
c.healthMut.Lock()
173
defer c.healthMut.Unlock()
174
175
if err == nil {
176
c.health = component.Health{
177
Health: component.HealthTypeHealthy,
178
Message: "polled endpoint",
179
UpdateTime: startTime,
180
}
181
} else {
182
c.health = component.Health{
183
Health: component.HealthTypeUnhealthy,
184
Message: fmt.Sprintf("polling failed: %s", err),
185
UpdateTime: startTime,
186
}
187
}
188
}
189
190
// pollError is like poll but returns an error if one occurred.
191
func (c *Component) pollError() error {
192
c.mut.Lock()
193
defer c.mut.Unlock()
194
195
c.lastPoll = time.Now()
196
197
ctx, cancel := context.WithTimeout(context.Background(), c.args.PollTimeout)
198
defer cancel()
199
200
req, err := http.NewRequest(c.args.Method, c.args.URL, nil)
201
if err != nil {
202
return fmt.Errorf("building request: %w", err)
203
}
204
for name, value := range c.args.Headers {
205
req.Header.Set(name, value)
206
}
207
req = req.WithContext(ctx)
208
209
resp, err := c.cli.Do(req)
210
if err != nil {
211
return fmt.Errorf("performing request: %w", err)
212
}
213
214
bb, err := io.ReadAll(resp.Body)
215
if err != nil {
216
return fmt.Errorf("reading response: %w", err)
217
}
218
219
if resp.StatusCode != http.StatusOK {
220
return fmt.Errorf("unexpected status code %s", resp.Status)
221
}
222
223
stringContent := strings.TrimSpace(string(bb))
224
225
newExports := Exports{
226
Content: rivertypes.OptionalSecret{
227
IsSecret: c.args.IsSecret,
228
Value: stringContent,
229
},
230
}
231
232
// Only send a state change event if the exports have changed from the
233
// previous poll.
234
if c.lastExports != newExports {
235
c.opts.OnStateChange(newExports)
236
}
237
c.lastExports = newExports
238
return nil
239
}
240
241
// Update updates the remote.http component. After the update completes, a
242
// poll is forced.
243
func (c *Component) Update(args component.Arguments) (err error) {
244
// poll after updating. If an error occurred during Update, we don't bother
245
// to do anything.
246
defer func() {
247
if err != nil {
248
return
249
}
250
c.poll()
251
}()
252
253
c.mut.Lock()
254
defer c.mut.Unlock()
255
256
newArgs := args.(Arguments)
257
c.args = newArgs
258
259
cli, err := prom_config.NewClientFromConfig(
260
*newArgs.Client.Convert(),
261
c.opts.ID,
262
prom_config.WithUserAgent(userAgent),
263
)
264
if err != nil {
265
return err
266
}
267
c.cli = cli
268
269
// Send an updated event if one wasn't already read.
270
select {
271
case c.updated <- struct{}{}:
272
default:
273
}
274
return nil
275
}
276
277
// CurrentHealth returns the current health of the component.
278
func (c *Component) CurrentHealth() component.Health {
279
c.healthMut.RLock()
280
defer c.healthMut.RUnlock()
281
return c.health
282
}
283
284