Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/util/k8s/resources.go
4096 views
1
package k8s
2
3
import (
4
"context"
5
"fmt"
6
"io"
7
"os"
8
"time"
9
10
"github.com/go-kit/log"
11
"github.com/go-kit/log/level"
12
"github.com/grafana/dskit/backoff"
13
"sigs.k8s.io/controller-runtime/pkg/client"
14
)
15
16
// ResourceSet deploys a set of temporary objects to a k8s test cluster and
17
// deletes them when Stop is called.
18
type ResourceSet struct {
19
log log.Logger
20
kubeClient client.Client
21
objects []client.Object
22
}
23
24
// NewResourceSet returns a new resource set.
25
func NewResourceSet(l log.Logger, cluster *Cluster) *ResourceSet {
26
return &ResourceSet{
27
log: l,
28
kubeClient: cluster.Client(),
29
}
30
}
31
32
// Add will read from r and deploy the resources into the cluster.
33
func (rs *ResourceSet) Add(ctx context.Context, r io.Reader) error {
34
readObjects, err := ReadObjects(r, rs.kubeClient)
35
if err != nil {
36
return fmt.Errorf("error reading fixture: %w", err)
37
}
38
err = CreateObjects(ctx, rs.kubeClient, readObjects...)
39
if err != nil {
40
return err
41
}
42
43
rs.objects = append(rs.objects, readObjects...)
44
return nil
45
}
46
47
// AddFile will open filename and deploy it into the cluster.
48
func (rs *ResourceSet) AddFile(ctx context.Context, filename string) error {
49
f, err := os.Open(filename)
50
if err != nil {
51
return fmt.Errorf("failed to open %q: %w", filename, err)
52
}
53
defer f.Close()
54
return rs.Add(ctx, f)
55
}
56
57
// Wait waits until all of the ResourceSet's objects can be found.
58
func (rs *ResourceSet) Wait(ctx context.Context) error {
59
bo := backoff.New(ctx, backoff.Config{
60
MinBackoff: 10 * time.Millisecond,
61
MaxBackoff: 100 * time.Second,
62
})
63
64
check := func() error {
65
for _, obj := range rs.objects {
66
key := client.ObjectKeyFromObject(obj)
67
68
clone := obj.DeepCopyObject().(client.Object)
69
if err := rs.kubeClient.Get(ctx, key, clone); err != nil {
70
return fmt.Errorf("failed to get %s: %w", key, err)
71
}
72
}
73
74
return nil
75
}
76
77
for bo.Ongoing() {
78
err := check()
79
if err == nil {
80
return nil
81
}
82
83
level.Debug(rs.log).Log("msg", "not all resources are available; waiting", "err", err)
84
bo.Wait()
85
}
86
87
return bo.Err()
88
}
89
90
// Stop removes deployed resources from the cluster.
91
func (rs *ResourceSet) Stop() {
92
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
93
defer cancel()
94
95
for _, obj := range rs.objects {
96
err := rs.kubeClient.Delete(ctx, obj)
97
if err != nil {
98
level.Error(rs.log).Log("msg", "failed to delete object", "obj", client.ObjectKeyFromObject(obj), "err", err)
99
}
100
}
101
}
102
103