package http
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/go-kit/log"
"github.com/grafana/agent/component"
common_config "github.com/grafana/agent/component/common/config"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/river/rivertypes"
prom_config "github.com/prometheus/common/config"
)
// userAgent is the User-Agent value handed to the HTTP client built in Update.
// It embeds the agent's build version.
var userAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
// init registers this component under the name "remote.http", together with
// zero values of its argument and export types and a factory that builds it.
func init() {
	// The factory asserts the generic arguments to this package's Arguments
	// type before delegating to New.
	factory := func(opts component.Options, args component.Arguments) (component.Component, error) {
		return New(opts, args.(Arguments))
	}

	registration := component.Registration{
		Name:    "remote.http",
		Args:    Arguments{},
		Exports: Exports{},
		Build:   factory,
	}
	component.Register(registration)
}
// Arguments holds the settings of the remote.http component.
type Arguments struct {
// URL is the address each poll sends its request to.
URL string `river:"url,attr"`
// PollFrequency is the interval added to the last poll time to decide when
// the next poll is due.
PollFrequency time.Duration `river:"poll_frequency,attr,optional"`
// PollTimeout bounds a single poll; it is used as the request context's
// timeout.
PollTimeout time.Duration `river:"poll_timeout,attr,optional"`
// IsSecret is copied into the exported content's IsSecret flag.
IsSecret bool `river:"is_secret,attr,optional"`
// Method is the HTTP method used for the poll request.
Method string `river:"method,attr,optional"`
// Headers are set on every poll request, one header per map entry.
Headers map[string]string `river:"headers,attr,optional"`
// Client is converted into the configuration the HTTP client is built from.
Client common_config.HTTPClientConfig `river:"client,block,optional"`
}
// DefaultArguments holds the values UnmarshalRiver starts from before decoding
// user-supplied settings: poll every minute, time out after ten seconds, use
// the default HTTP client configuration, and issue GET requests.
var DefaultArguments = Arguments{
PollFrequency: 1 * time.Minute,
PollTimeout: 10 * time.Second,
Client: common_config.DefaultHTTPClientConfig,
Method: http.MethodGet,
}
// Compile-time check that *Arguments implements river.Unmarshaler.
var _ river.Unmarshaler = (*Arguments)(nil)
// UnmarshalRiver implements river.Unmarshaler. It resets args to
// DefaultArguments, lets f decode over those defaults, and then validates the
// result.
//
// It returns the decoding error from f, an error when poll_frequency or
// poll_timeout is not positive, an error when poll_timeout is not strictly
// smaller than poll_frequency, or the error from building a request with the
// configured method and URL.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
	*args = DefaultArguments

	// Decode through a local type with the same fields as Arguments but none
	// of its methods (presumably so the decoder does not call UnmarshalRiver
	// again).
	type arguments Arguments
	err := f((*arguments)(args))
	if err != nil {
		return err
	}

	// The cases are evaluated top to bottom, so the first violated rule wins.
	switch {
	case args.PollFrequency <= 0:
		return fmt.Errorf("poll_frequency must be greater than 0")
	case args.PollTimeout <= 0:
		return fmt.Errorf("poll_timeout must be greater than 0")
	case args.PollTimeout >= args.PollFrequency:
		return fmt.Errorf("poll_timeout must be less than poll_frequency")
	}

	// Building a throwaway request surfaces an invalid method or URL at
	// configuration time; the request itself is discarded.
	_, err = http.NewRequest(args.Method, args.URL, nil)
	return err
}
// Exports holds the values the remote.http component exposes.
type Exports struct {
// Content is the whitespace-trimmed response body of the last successful
// poll, flagged as secret when the is_secret argument is set.
Content rivertypes.OptionalSecret `river:"content,attr"`
}
// Component implements the remote.http component: it polls a URL and exports
// the response body.
type Component struct {
log log.Logger
opts component.Options
// mut is held while args, cli, lastPoll and lastExports are read or written
// (see nextPoll, pollError and Update).
mut sync.Mutex
args Arguments
cli *http.Client
// lastPoll is the time the most recent poll started.
lastPoll time.Time
// lastExports is the value of the last successful poll; it is compared with
// new content to decide whether to report a state change.
lastExports Exports
// updated is signalled by Update so that Run recomputes how long to wait
// before the next poll.
updated chan struct{}
// healthMut guards health.
healthMut sync.RWMutex
health component.Health
}
// Compile-time checks that *Component implements the component interfaces.
var (
_ component.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
)
// New builds a remote.http component from opts and args.
//
// The component starts with an "unknown" health and then applies args through
// Update; if Update fails, its error is returned and no component is produced.
func New(opts component.Options, args Arguments) (*Component, error) {
	initialHealth := component.Health{
		Health:     component.HealthTypeUnknown,
		Message:    "component started",
		UpdateTime: time.Now(),
	}

	c := &Component{
		log:  opts.Logger,
		opts: opts,
		// Buffered by one so that Update can signal without blocking.
		updated: make(chan struct{}, 1),
		health:  initialHealth,
	}

	err := c.Update(args)
	if err != nil {
		return nil, err
	}
	return c, nil
}
// Run polls the endpoint each time the poll interval elapses, until ctx is
// cancelled. It always returns nil.
//
// A signal on c.updated makes the loop recompute the wait from the current
// arguments and last poll time without polling.
func (c *Component) Run(ctx context.Context) error {
	for {
		// Use an explicit timer rather than time.After so that it can be
		// stopped when another case wins the select, instead of lingering
		// until the full poll interval has elapsed.
		timer := time.NewTimer(c.nextPoll())

		select {
		case <-ctx.Done():
			timer.Stop()
			return nil
		case <-timer.C:
			c.poll()
		case <-c.updated:
			// Arguments changed; loop around to recompute the wait.
			timer.Stop()
		}
	}
}
// nextPoll reports how long to wait until the next poll is due, which is the
// last poll time plus the configured poll frequency. It returns 0 when that
// moment has already passed.
func (c *Component) nextPoll() time.Duration {
	c.mut.Lock()
	defer c.mut.Unlock()

	due := c.lastPoll.Add(c.args.PollFrequency)
	current := time.Now()

	if !current.After(due) {
		return due.Sub(current)
	}
	// Already overdue: poll immediately.
	return 0
}
// poll performs one poll through pollError and records the outcome as the
// component's health, stamped with the time the poll started.
func (c *Component) poll() {
	startTime := time.Now()

	// Assume success and downgrade if the poll reports an error.
	outcome := component.Health{
		Health:     component.HealthTypeHealthy,
		Message:    "polled endpoint",
		UpdateTime: startTime,
	}
	if err := c.pollError(); err != nil {
		outcome.Health = component.HealthTypeUnhealthy
		outcome.Message = fmt.Sprintf("polling failed: %s", err)
	}

	c.healthMut.Lock()
	c.health = outcome
	c.healthMut.Unlock()
}
// pollError performs a single poll of the configured URL and, on success,
// publishes the whitespace-trimmed response body as the component's exports.
//
// The request uses the configured method and headers and is bounded by the
// configured poll timeout. OnStateChange is only invoked when the new exports
// differ from the previously stored ones.
//
// It returns an error when the request cannot be built or performed, when the
// body cannot be read, or when the response status is not 200 OK. The
// component mutex is held for the whole call.
func (c *Component) pollError() error {
	c.mut.Lock()
	defer c.mut.Unlock()

	// Record the attempt up front so failed polls also push back the next one.
	c.lastPoll = time.Now()

	ctx, cancel := context.WithTimeout(context.Background(), c.args.PollTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, c.args.Method, c.args.URL, nil)
	if err != nil {
		return fmt.Errorf("building request: %w", err)
	}
	for name, value := range c.args.Headers {
		req.Header.Set(name, value)
	}

	resp, err := c.cli.Do(req)
	if err != nil {
		return fmt.Errorf("performing request: %w", err)
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked on each poll.
	defer resp.Body.Close()

	bb, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("reading response: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code %s", resp.Status)
	}

	stringContent := strings.TrimSpace(string(bb))

	newExports := Exports{
		Content: rivertypes.OptionalSecret{
			IsSecret: c.args.IsSecret,
			Value:    stringContent,
		},
	}

	// Only notify when the exported value actually changed.
	if c.lastExports != newExports {
		c.opts.OnStateChange(newExports)
	}
	c.lastExports = newExports
	return nil
}
// Update applies new arguments to the component. args must hold an Arguments
// value.
//
// A new HTTP client is built from the arguments' client configuration. If that
// fails, the error is returned and the component keeps its previous arguments
// and client. On success the new arguments and client are stored, Run is
// signalled to recompute its wait, and one poll is performed before Update
// returns.
func (c *Component) Update(args component.Arguments) (err error) {
	// Poll once after a successful update. Deferred calls run last-in
	// first-out, so this runs after the mutex below has been released; poll
	// takes the same mutex.
	defer func() {
		if err != nil {
			return
		}
		c.poll()
	}()

	c.mut.Lock()
	defer c.mut.Unlock()

	newArgs := args.(Arguments)

	cli, err := prom_config.NewClientFromConfig(
		*newArgs.Client.Convert(),
		c.opts.ID,
		prom_config.WithUserAgent(userAgent),
	)
	if err != nil {
		return err
	}

	// Commit arguments and client together, only once the client was built,
	// so a failed update never leaves new arguments paired with an old client.
	c.args = newArgs
	c.cli = cli

	// Non-blocking send: if a signal is already pending, Run will recompute
	// its wait anyway.
	select {
	case c.updated <- struct{}{}:
	default:
	}
	return nil
}
// CurrentHealth returns the health recorded by the most recent poll, or the
// initial health if no poll has completed yet.
func (c *Component) CurrentHealth() component.Health {
	c.healthMut.RLock()
	snapshot := c.health
	c.healthMut.RUnlock()

	return snapshot
}