Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/remote/s3/watcher.go
4096 views
1
package s3
2
3
import (
4
"errors"
5
"io"
6
"sync"
7
"time"
8
9
"github.com/aws/aws-sdk-go/aws"
10
11
"context"
12
13
"github.com/aws/aws-sdk-go-v2/service/s3"
14
)
15
16
type watcher struct {
17
mut sync.Mutex
18
bucket string
19
file string
20
output chan result
21
dlTicker *time.Ticker
22
downloader *s3.Client
23
}
24
25
// result is the outcome of one download attempt as delivered on the
// watcher's output channel.
type result struct {
	// result holds the bytes returned by the fetch.
	result []byte

	// err is the error returned by the fetch, or nil when it succeeded.
	err error
}
29
30
func newWatcher(
31
bucket, file string,
32
out chan result,
33
frequency time.Duration,
34
downloader *s3.Client,
35
) *watcher {
36
37
return &watcher{
38
bucket: bucket,
39
file: file,
40
output: out,
41
dlTicker: time.NewTicker(frequency),
42
downloader: downloader,
43
}
44
}
45
46
// updateValues swaps in a new bucket, object key, polling frequency and S3
// client. It takes the watcher mutex, so it waits for any download that is
// currently holding it.
//
// The existing ticker is reset in place rather than replaced; Ticker.Reset
// panics if frequency is not positive.
func (w *watcher) updateValues(bucket, file string, frequency time.Duration, downloader *s3.Client) {
	w.mut.Lock()
	defer w.mut.Unlock()

	w.bucket, w.file = bucket, file
	w.dlTicker.Reset(frequency)
	w.downloader = downloader
}
54
55
func (w *watcher) run(ctx context.Context) {
56
w.download(ctx)
57
defer w.dlTicker.Stop()
58
for {
59
select {
60
case <-w.dlTicker.C:
61
w.download(ctx)
62
case <-ctx.Done():
63
return
64
}
65
}
66
}
67
68
// download actually downloads the file from s3
69
func (w *watcher) download(ctx context.Context) {
70
w.mut.Lock()
71
defer w.mut.Unlock()
72
buf, err := w.getObject(context.Background())
73
r := result{
74
result: buf,
75
err: err,
76
}
77
select {
78
case <-ctx.Done():
79
return
80
case w.output <- r:
81
}
82
}
83
84
// downloadSynchronously fetches the watched object once, under the watcher
// mutex, and returns its content as a string. Nothing is sent on the output
// channel. On failure the string is empty and the fetch error is returned.
//
// The method takes no context, so the request runs with context.Background()
// and cannot be cancelled by the caller.
func (w *watcher) downloadSynchronously() (string, error) {
	w.mut.Lock()
	defer w.mut.Unlock()

	content, err := w.getObject(context.Background())
	if err != nil {
		return "", err
	}
	return string(content), nil
}
93
94
// getObject ensure that the return []byte is never nil
95
func (w *watcher) getObject(ctx context.Context) ([]byte, error) {
96
output, err := w.downloader.GetObject(ctx, &s3.GetObjectInput{
97
Bucket: aws.String(w.bucket),
98
Key: aws.String(w.file),
99
})
100
if err != nil {
101
return []byte{}, err
102
}
103
buf := make([]byte, output.ContentLength)
104
_, err = output.Body.Read(buf)
105
if !errors.Is(err, io.EOF) {
106
return []byte{}, err
107
}
108
return buf, nil
109
}
110
111