Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/remote/s3/s3.go
4095 views
1
package s3
2
3
import (
4
"crypto/tls"
5
"fmt"
6
"net/http"
7
"strings"
8
"sync"
9
"time"
10
11
"context"
12
13
"github.com/aws/aws-sdk-go-v2/aws"
14
aws_config "github.com/aws/aws-sdk-go-v2/config"
15
"github.com/aws/aws-sdk-go-v2/service/s3"
16
"github.com/grafana/agent/component"
17
"github.com/grafana/agent/pkg/river/rivertypes"
18
"github.com/prometheus/client_golang/prometheus"
19
)
20
21
func init() {
22
component.Register(component.Registration{
23
Name: "remote.s3",
24
Args: Arguments{},
25
Exports: Exports{},
26
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
27
return New(opts, args.(Arguments))
28
},
29
})
30
}
31
32
// S3 handles reading content from a file located in an S3-compatible system.
33
type S3 struct {
34
mut sync.Mutex
35
opts component.Options
36
args Arguments
37
health component.Health
38
content string
39
40
watcher *watcher
41
updateChan chan result
42
s3Errors prometheus.Counter
43
lastAccessed prometheus.Gauge
44
}
45
46
var (
47
_ component.Component = (*S3)(nil)
48
_ component.HealthComponent = (*S3)(nil)
49
)
50
51
// New initializes the S3 component.
52
func New(o component.Options, args Arguments) (*S3, error) {
53
s3cfg, err := generateS3Config(args)
54
if err != nil {
55
return nil, err
56
}
57
58
s3Client := s3.NewFromConfig(*s3cfg, func(s3o *s3.Options) {
59
s3o.UsePathStyle = args.Options.UsePathStyle
60
})
61
62
bucket, file := getPathBucketAndFile(args.Path)
63
s := &S3{
64
opts: o,
65
args: args,
66
health: component.Health{},
67
updateChan: make(chan result),
68
s3Errors: prometheus.NewCounter(prometheus.CounterOpts{
69
Name: "agent_remote_s3_errors_total",
70
Help: "The number of errors while accessing s3",
71
}),
72
lastAccessed: prometheus.NewGauge(prometheus.GaugeOpts{
73
Name: "agent_remote_s3_timestamp_last_accessed_unix_seconds",
74
Help: "The last successful access in unix seconds",
75
}),
76
}
77
78
w := newWatcher(bucket, file, s.updateChan, args.PollFrequency, s3Client)
79
s.watcher = w
80
81
err = o.Registerer.Register(s.s3Errors)
82
if err != nil {
83
return nil, err
84
}
85
err = o.Registerer.Register(s.lastAccessed)
86
if err != nil {
87
return nil, err
88
}
89
90
content, err := w.downloadSynchronously()
91
s.handleContentPolling(content, err)
92
return s, nil
93
}
94
95
// Run activates the content handler and watcher.
96
func (s *S3) Run(ctx context.Context) error {
97
go s.handleContentUpdate(ctx)
98
go s.watcher.run(ctx)
99
<-ctx.Done()
100
101
return nil
102
}
103
104
// Update is called whenever the arguments have changed.
105
func (s *S3) Update(args component.Arguments) error {
106
newArgs := args.(Arguments)
107
108
s3cfg, err := generateS3Config(newArgs)
109
if err != nil {
110
return nil
111
}
112
s3Client := s3.NewFromConfig(*s3cfg, func(s3o *s3.Options) {
113
s3o.UsePathStyle = newArgs.Options.UsePathStyle
114
})
115
116
bucket, file := getPathBucketAndFile(newArgs.Path)
117
118
s.mut.Lock()
119
defer s.mut.Unlock()
120
s.args = newArgs
121
s.watcher.updateValues(bucket, file, newArgs.PollFrequency, s3Client)
122
123
return nil
124
}
125
126
// CurrentHealth returns the health of the component.
127
func (s *S3) CurrentHealth() component.Health {
128
s.mut.Lock()
129
defer s.mut.Unlock()
130
return s.health
131
}
132
133
func generateS3Config(args Arguments) (*aws.Config, error) {
134
configOptions := make([]func(*aws_config.LoadOptions) error, 0)
135
// Override the endpoint.
136
if args.Options.Endpoint != "" {
137
endFunc := aws.EndpointResolverWithOptionsFunc(func(service, region string, _ ...interface{}) (aws.Endpoint, error) {
138
return aws.Endpoint{URL: args.Options.Endpoint}, nil
139
})
140
endResolver := aws_config.WithEndpointResolverWithOptions(endFunc)
141
configOptions = append(configOptions, endResolver)
142
}
143
144
// This incredibly nested option turns off SSL.
145
if args.Options.DisableSSL {
146
httpOverride := aws_config.WithHTTPClient(
147
&http.Client{
148
Transport: &http.Transport{
149
TLSClientConfig: &tls.Config{
150
InsecureSkipVerify: args.Options.DisableSSL,
151
},
152
},
153
},
154
)
155
configOptions = append(configOptions, httpOverride)
156
}
157
158
// Check to see if we need to override the credentials, else it will use the default ones.
159
// https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html
160
if args.Options.AccessKey != "" {
161
if args.Options.Secret == "" {
162
return nil, fmt.Errorf("if accesskey or secret are specified then the other must also be specified")
163
}
164
credFunc := aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) {
165
return aws.Credentials{
166
AccessKeyID: args.Options.AccessKey,
167
SecretAccessKey: string(args.Options.Secret),
168
}, nil
169
})
170
credProvider := aws_config.WithCredentialsProvider(credFunc)
171
configOptions = append(configOptions, credProvider)
172
}
173
174
cfg, err := aws_config.LoadDefaultConfig(context.TODO(), configOptions...)
175
if err != nil {
176
return nil, err
177
}
178
// Set region.
179
if args.Options.Region != "" {
180
cfg.Region = args.Options.Region
181
}
182
183
return &cfg, nil
184
}
185
186
// handleContentUpdate reads from the update and error channels setting as appropriate
187
func (s *S3) handleContentUpdate(ctx context.Context) {
188
for {
189
select {
190
case r := <-s.updateChan:
191
// r.result will never be nil,
192
s.handleContentPolling(string(r.result), r.err)
193
case <-ctx.Done():
194
return
195
}
196
}
197
}
198
199
func (s *S3) handleContentPolling(newContent string, err error) {
200
s.mut.Lock()
201
defer s.mut.Unlock()
202
203
if err == nil {
204
s.opts.OnStateChange(Exports{
205
Content: rivertypes.OptionalSecret{
206
IsSecret: s.args.IsSecret,
207
Value: newContent,
208
},
209
})
210
s.lastAccessed.SetToCurrentTime()
211
s.content = newContent
212
s.health.Health = component.HealthTypeHealthy
213
s.health.Message = "s3 file updated"
214
} else {
215
s.s3Errors.Inc()
216
s.health.Health = component.HealthTypeUnhealthy
217
s.health.Message = err.Error()
218
}
219
s.health.UpdateTime = time.Now()
220
}
221
222
// getPathBucketAndFile takes the path and splits it into a bucket and file.
223
func getPathBucketAndFile(path string) (bucket, file string) {
224
parts := strings.Split(path, "/")
225
file = strings.Join(parts[3:], "/")
226
bucket = strings.Join(parts[:3], "/")
227
bucket = strings.ReplaceAll(bucket, "s3://", "")
228
return
229
}
230
231