Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/elasticsearch_exporter/elasticsearch_exporter.go
5438 views
1
// Package elasticsearch_exporter instantiates the exporter from github.com/justwatchcom/elasticsearch_exporter - replaced for github.com/prometheus-community/elasticsearch_exporter
2
// Using the YAML config provided by the agent
3
package elasticsearch_exporter //nolint:golint
4
5
import (
6
"context"
7
"fmt"
8
"net/http"
9
"net/url"
10
"time"
11
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"github.com/grafana/agent/pkg/integrations"
15
integrations_v2 "github.com/grafana/agent/pkg/integrations/v2"
16
"github.com/grafana/agent/pkg/integrations/v2/metricsutils"
17
"github.com/prometheus/client_golang/prometheus"
18
19
"github.com/prometheus-community/elasticsearch_exporter/collector"
20
"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
21
)
22
23
// DefaultConfig holds the default settings for the elasticsearch_exporter
24
// integration.
25
var DefaultConfig = Config{
26
Address: "http://localhost:9200",
27
Timeout: 5 * time.Second,
28
Node: "_local",
29
ExportClusterInfoInterval: 5 * time.Minute,
30
IncludeAliases: true,
31
}
32
33
// Config controls the elasticsearch_exporter integration.
34
type Config struct {
35
// HTTP API address of an Elasticsearch node.
36
Address string `yaml:"address,omitempty"`
37
// Timeout for trying to get stats from Elasticsearch.
38
Timeout time.Duration `yaml:"timeout,omitempty"`
39
// Export stats for all nodes in the cluster. If used, this flag will override the flag es.node.
40
AllNodes bool `yaml:"all,omitempty"`
41
// Node's name of which metrics should be exposed.
42
Node string `yaml:"node,omitempty"`
43
// Export stats for indices in the cluster.
44
ExportIndices bool `yaml:"indices,omitempty"`
45
// Export stats for settings of all indices of the cluster.
46
ExportIndicesSettings bool `yaml:"indices_settings,omitempty"`
47
// Export stats for cluster settings.
48
ExportClusterSettings bool `yaml:"cluster_settings,omitempty"`
49
// Export stats for shards in the cluster (implies indices).
50
ExportShards bool `yaml:"shards,omitempty"`
51
// Include informational aliases metrics
52
IncludeAliases bool `yaml:"aliases,omitempty"`
53
// Export stats for the cluster snapshots.
54
ExportSnapshots bool `yaml:"snapshots,omitempty"`
55
// Cluster info update interval for the cluster label.
56
ExportClusterInfoInterval time.Duration `yaml:"clusterinfo_interval,omitempty"`
57
// Path to PEM file that contains trusted Certificate Authorities for the Elasticsearch connection.
58
CA string `yaml:"ca,omitempty"`
59
// Path to PEM file that contains the private key for client auth when connecting to Elasticsearch.
60
ClientPrivateKey string `yaml:"client_private_key,omitempty"`
61
// Path to PEM file that contains the corresponding cert for the private key to connect to Elasticsearch.
62
ClientCert string `yaml:"client_cert,omitempty"`
63
// Skip SSL verification when connecting to Elasticsearch.
64
InsecureSkipVerify bool `yaml:"ssl_skip_verify,omitempty"`
65
// Export stats for Data Streams
66
ExportDataStreams bool `yaml:"data_stream,omitempty"`
67
// Export stats for Snapshot Lifecycle Management
68
ExportSLM bool `yaml:"slm,omitempty"`
69
}
70
71
// UnmarshalYAML implements yaml.Unmarshaler for Config
72
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
73
*c = DefaultConfig
74
75
type plain Config
76
return unmarshal((*plain)(c))
77
}
78
79
// Name returns the name of the integration that this config represents.
80
func (c *Config) Name() string {
81
return "elasticsearch_exporter"
82
}
83
84
// InstanceKey returns the hostname:port of the elasticsearch node being queried.
85
func (c *Config) InstanceKey(agentKey string) (string, error) {
86
u, err := url.Parse(c.Address)
87
if err != nil {
88
return "", fmt.Errorf("could not parse url: %w", err)
89
}
90
return u.Host, nil
91
}
92
93
// NewIntegration creates a new elasticsearch_exporter
94
func (c *Config) NewIntegration(logger log.Logger) (integrations.Integration, error) {
95
return New(logger, c)
96
}
97
98
func init() {
99
integrations.RegisterIntegration(&Config{})
100
integrations_v2.RegisterLegacy(&Config{}, integrations_v2.TypeMultiplex, metricsutils.NewNamedShim("elasticsearch"))
101
}
102
103
// New creates a new elasticsearch_exporter
104
// This function replicates the main() function of github.com/justwatchcom/elasticsearch_exporter
105
// but uses yaml configuration instead of kingpin flags.
106
func New(logger log.Logger, c *Config) (integrations.Integration, error) {
107
if c.Address == "" {
108
return nil, fmt.Errorf("empty elasticsearch_address provided")
109
}
110
esURL, err := url.Parse(c.Address)
111
if err != nil {
112
return nil, fmt.Errorf("failed to parse elasticsearch_address: %w", err)
113
}
114
115
// returns nil if not provided and falls back to simple TCP.
116
tlsConfig := createTLSConfig(c.CA, c.ClientCert, c.ClientPrivateKey, c.InsecureSkipVerify)
117
118
httpClient := &http.Client{
119
Timeout: c.Timeout,
120
Transport: &http.Transport{
121
TLSClientConfig: tlsConfig,
122
Proxy: http.ProxyFromEnvironment,
123
},
124
}
125
126
clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, c.ExportClusterInfoInterval)
127
128
collectors := []prometheus.Collector{
129
clusterInfoRetriever,
130
collector.NewClusterHealth(logger, httpClient, esURL),
131
collector.NewNodes(logger, httpClient, esURL, c.AllNodes, c.Node),
132
}
133
134
if c.ExportIndices || c.ExportShards {
135
iC := collector.NewIndices(logger, httpClient, esURL, c.ExportShards, c.IncludeAliases)
136
collectors = append(collectors, iC)
137
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
138
return nil, fmt.Errorf("failed to register indices collector in cluster info: %w", err)
139
}
140
}
141
142
if c.ExportSnapshots {
143
collectors = append(collectors, collector.NewSnapshots(logger, httpClient, esURL))
144
}
145
146
if c.ExportClusterSettings {
147
collectors = append(collectors, collector.NewClusterSettings(logger, httpClient, esURL))
148
}
149
150
if c.ExportDataStreams {
151
collectors = append(collectors, collector.NewDataStream(logger, httpClient, esURL))
152
}
153
154
if c.ExportIndicesSettings {
155
collectors = append(collectors, collector.NewIndicesSettings(logger, httpClient, esURL))
156
}
157
158
if c.ExportSLM {
159
collectors = append(collectors, collector.NewSLM(logger, httpClient, esURL))
160
}
161
162
start := func(ctx context.Context) error {
163
// start the cluster info retriever
164
switch runErr := clusterInfoRetriever.Run(ctx); runErr {
165
case nil:
166
level.Info(logger).Log(
167
"msg", "started cluster info retriever",
168
"interval", c.ExportClusterInfoInterval.String(),
169
)
170
case clusterinfo.ErrInitialCallTimeout:
171
level.Info(logger).Log("msg", "initial cluster info call timed out")
172
default:
173
level.Error(logger).Log("msg", "failed to run cluster info retriever", "err", err)
174
return err
175
}
176
177
// Wait until we're done
178
<-ctx.Done()
179
return ctx.Err()
180
}
181
182
return integrations.NewCollectorIntegration(c.Name(),
183
integrations.WithCollectors(collectors...),
184
integrations.WithRunner(start),
185
), nil
186
}
187
188