Path: blob/main/pkg/integrations/elasticsearch_exporter/elasticsearch_exporter.go
5438 views
// Package elasticsearch_exporter instantiates the exporter from github.com/justwatchcom/elasticsearch_exporter - replaced for github.com/prometheus-community/elasticsearch_exporter1// Using the YAML config provided by the agent2package elasticsearch_exporter //nolint:golint34import (5"context"6"fmt"7"net/http"8"net/url"9"time"1011"github.com/go-kit/log"12"github.com/go-kit/log/level"13"github.com/grafana/agent/pkg/integrations"14integrations_v2 "github.com/grafana/agent/pkg/integrations/v2"15"github.com/grafana/agent/pkg/integrations/v2/metricsutils"16"github.com/prometheus/client_golang/prometheus"1718"github.com/prometheus-community/elasticsearch_exporter/collector"19"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"20)2122// DefaultConfig holds the default settings for the elasticsearch_exporter23// integration.24var DefaultConfig = Config{25Address: "http://localhost:9200",26Timeout: 5 * time.Second,27Node: "_local",28ExportClusterInfoInterval: 5 * time.Minute,29IncludeAliases: true,30}3132// Config controls the elasticsearch_exporter integration.33type Config struct {34// HTTP API address of an Elasticsearch node.35Address string `yaml:"address,omitempty"`36// Timeout for trying to get stats from Elasticsearch.37Timeout time.Duration `yaml:"timeout,omitempty"`38// Export stats for all nodes in the cluster. If used, this flag will override the flag es.node.39AllNodes bool `yaml:"all,omitempty"`40// Node's name of which metrics should be exposed.41Node string `yaml:"node,omitempty"`42// Export stats for indices in the cluster.43ExportIndices bool `yaml:"indices,omitempty"`44// Export stats for settings of all indices of the cluster.45ExportIndicesSettings bool `yaml:"indices_settings,omitempty"`46// Export stats for cluster settings.47ExportClusterSettings bool `yaml:"cluster_settings,omitempty"`48// Export stats for shards in the cluster (implies indices).49ExportShards bool `yaml:"shards,omitempty"`50// Include informational aliases metrics51IncludeAliases bool `yaml:"aliases,omitempty"`52// Export stats for the cluster snapshots.53ExportSnapshots bool `yaml:"snapshots,omitempty"`54// Cluster info update interval for the cluster label.55ExportClusterInfoInterval time.Duration `yaml:"clusterinfo_interval,omitempty"`56// Path to PEM file that contains trusted Certificate Authorities for the Elasticsearch connection.57CA string `yaml:"ca,omitempty"`58// Path to PEM file that contains the private key for client auth when connecting to Elasticsearch.59ClientPrivateKey string `yaml:"client_private_key,omitempty"`60// Path to PEM file that contains the corresponding cert for the private key to connect to Elasticsearch.61ClientCert string `yaml:"client_cert,omitempty"`62// Skip SSL verification when connecting to Elasticsearch.63InsecureSkipVerify bool `yaml:"ssl_skip_verify,omitempty"`64// Export stats for Data Streams65ExportDataStreams bool `yaml:"data_stream,omitempty"`66// Export stats for Snapshot Lifecycle Management67ExportSLM bool `yaml:"slm,omitempty"`68}6970// UnmarshalYAML implements yaml.Unmarshaler for Config71func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {72*c = DefaultConfig7374type plain Config75return unmarshal((*plain)(c))76}7778// Name returns the name of the integration that this config represents.79func (c *Config) Name() string {80return "elasticsearch_exporter"81}8283// InstanceKey returns the hostname:port of the elasticsearch node being queried.84func (c *Config) InstanceKey(agentKey string) (string, error) {85u, err := url.Parse(c.Address)86if err != nil {87return "", fmt.Errorf("could not parse url: %w", err)88}89return u.Host, nil90}9192// NewIntegration creates a new elasticsearch_exporter93func (c *Config) NewIntegration(logger log.Logger) (integrations.Integration, error) {94return New(logger, c)95}9697func init() {98integrations.RegisterIntegration(&Config{})99integrations_v2.RegisterLegacy(&Config{}, integrations_v2.TypeMultiplex, metricsutils.NewNamedShim("elasticsearch"))100}101102// New creates a new elasticsearch_exporter103// This function replicates the main() function of github.com/justwatchcom/elasticsearch_exporter104// but uses yaml configuration instead of kingpin flags.105func New(logger log.Logger, c *Config) (integrations.Integration, error) {106if c.Address == "" {107return nil, fmt.Errorf("empty elasticsearch_address provided")108}109esURL, err := url.Parse(c.Address)110if err != nil {111return nil, fmt.Errorf("failed to parse elasticsearch_address: %w", err)112}113114// returns nil if not provided and falls back to simple TCP.115tlsConfig := createTLSConfig(c.CA, c.ClientCert, c.ClientPrivateKey, c.InsecureSkipVerify)116117httpClient := &http.Client{118Timeout: c.Timeout,119Transport: &http.Transport{120TLSClientConfig: tlsConfig,121Proxy: http.ProxyFromEnvironment,122},123}124125clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, c.ExportClusterInfoInterval)126127collectors := []prometheus.Collector{128clusterInfoRetriever,129collector.NewClusterHealth(logger, httpClient, esURL),130collector.NewNodes(logger, httpClient, esURL, c.AllNodes, c.Node),131}132133if c.ExportIndices || c.ExportShards {134iC := collector.NewIndices(logger, httpClient, esURL, c.ExportShards, c.IncludeAliases)135collectors = append(collectors, iC)136if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {137return nil, fmt.Errorf("failed to register indices collector in cluster info: %w", err)138}139}140141if c.ExportSnapshots {142collectors = append(collectors, collector.NewSnapshots(logger, httpClient, esURL))143}144145if c.ExportClusterSettings {146collectors = append(collectors, collector.NewClusterSettings(logger, httpClient, esURL))147}148149if c.ExportDataStreams {150collectors = append(collectors, collector.NewDataStream(logger, httpClient, esURL))151}152153if c.ExportIndicesSettings {154collectors = append(collectors, collector.NewIndicesSettings(logger, httpClient, esURL))155}156157if c.ExportSLM {158collectors = append(collectors, collector.NewSLM(logger, httpClient, esURL))159}160161start := func(ctx context.Context) error {162// start the cluster info retriever163switch runErr := clusterInfoRetriever.Run(ctx); runErr {164case nil:165level.Info(logger).Log(166"msg", "started cluster info retriever",167"interval", c.ExportClusterInfoInterval.String(),168)169case clusterinfo.ErrInitialCallTimeout:170level.Info(logger).Log("msg", "initial cluster info call timed out")171default:172level.Error(logger).Log("msg", "failed to run cluster info retriever", "err", err)173return err174}175176// Wait until we're done177<-ctx.Done()178return ctx.Err()179}180181return integrations.NewCollectorIntegration(c.Name(),182integrations.WithCollectors(collectors...),183integrations.WithRunner(start),184), nil185}186187188