Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/util/k8s/objects.go
4096 views
1
package k8s
2
3
import (
4
"context"
5
"encoding/json"
6
"errors"
7
"fmt"
8
"io"
9
"time"
10
11
"github.com/go-kit/log"
12
"github.com/go-kit/log/level"
13
"github.com/grafana/dskit/backoff"
14
"k8s.io/apimachinery/pkg/runtime/serializer"
15
"k8s.io/apimachinery/pkg/util/yaml"
16
"sigs.k8s.io/controller-runtime/pkg/client"
17
18
apps_v1 "k8s.io/api/apps/v1"
19
core_v1 "k8s.io/api/core/v1"
20
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
21
)
22
23
// CreateObjects will create the provided set of objects. If any object
24
// couldn't be created, an error will be returned and created objects will be
25
// deleted.
26
func CreateObjects(ctx context.Context, cli client.Client, objs ...client.Object) (err error) {
27
// Index offset into objs for objects we managed to create.
28
createdOffset := -1
29
30
defer func() {
31
if err == nil {
32
return
33
}
34
// Delete the subset of objs we managed to create
35
for i := 0; i <= createdOffset; i++ {
36
_ = cli.Delete(context.Background(), objs[i])
37
}
38
}()
39
40
for i, obj := range objs {
41
if err := cli.Create(ctx, obj); err != nil {
42
return fmt.Errorf("failed to create %s: %w", client.ObjectKeyFromObject(obj), err)
43
}
44
createdOffset = i
45
}
46
return nil
47
}
48
49
// ReadObjects will read the set of objects from r and convert them into
50
// client.Object based on the scheme of the provided Kubernetes client.
51
//
52
// The data of r may be YAML or JSON.
53
func ReadObjects(r io.Reader, cli client.Client) ([]client.Object, error) {
54
var (
55
objects []client.Object
56
57
scheme = cli.Scheme()
58
rawDecoder = yaml.NewYAMLOrJSONDecoder(r, 4096)
59
decoder = serializer.NewCodecFactory(scheme).UniversalDecoder(scheme.PrioritizedVersionsAllGroups()...)
60
)
61
62
NextObject:
63
for {
64
var raw json.RawMessage
65
66
err := rawDecoder.Decode(&raw)
67
switch {
68
case errors.Is(err, io.EOF):
69
break NextObject
70
case err != nil:
71
return nil, fmt.Errorf("error parsing object: %w", err)
72
case len(raw) == 0:
73
// Skip over empty objects. This can happen when --- is used at the top
74
// of YAML files.
75
continue NextObject
76
}
77
78
obj, _, err := decoder.Decode(raw, nil, nil)
79
if err != nil {
80
return nil, fmt.Errorf("failed to decode object: %w", err)
81
}
82
clientObj, ok := obj.(client.Object)
83
if !ok {
84
return nil, fmt.Errorf("decoded object %T is not a controller-runtime object", obj)
85
}
86
objects = append(objects, clientObj)
87
}
88
89
return objects, nil
90
}
91
92
// ReadUnstructuredObjects will read the set of objects from r as unstructured
93
// objects.
94
func ReadUnstructuredObjects(r io.Reader) ([]*unstructured.Unstructured, error) {
95
var (
96
objects []*unstructured.Unstructured
97
rawDecoder = yaml.NewYAMLOrJSONDecoder(r, 4096)
98
)
99
100
NextObject:
101
for {
102
var raw json.RawMessage
103
104
err := rawDecoder.Decode(&raw)
105
switch {
106
case errors.Is(err, io.EOF):
107
break NextObject
108
case err != nil:
109
return nil, fmt.Errorf("error parsing object: %w", err)
110
case len(raw) == 0:
111
// Skip over empty objects. This can happen when --- is used at the top
112
// of YAML files.
113
continue NextObject
114
}
115
116
var us unstructured.Unstructured
117
if err := json.Unmarshal(raw, &us); err != nil {
118
return nil, fmt.Errorf("failed to decode object: %w", err)
119
}
120
objects = append(objects, &us)
121
}
122
123
return objects, nil
124
}
125
126
// DefaultBackoff is a default backoff config that retries forever until ctx is
127
// canceled.
128
var DefaultBackoff = backoff.Config{
129
MinBackoff: 100 * time.Millisecond,
130
MaxBackoff: 1 * time.Second,
131
}
132
133
// WaitReady will return with no error if obj becomes ready before ctx cancels
134
// or the backoff fails.
135
//
136
// obj may be one of: DaemonSet, StatefulSet, Deployment, Pod. obj must have
137
// namespace and name set so it can be found. obj will be updated with the
138
// state of the object in the cluster as WaitReady runs.
139
//
140
// The final state of the object will be returned when it is ready.
141
func WaitReady(ctx context.Context, cli client.Client, obj client.Object, bc backoff.Config) error {
142
bo := backoff.New(ctx, bc)
143
144
key := client.ObjectKeyFromObject(obj)
145
146
var readyCheck func() bool
147
switch obj := obj.(type) {
148
case *apps_v1.DaemonSet:
149
readyCheck = func() bool {
150
return obj.Status.NumberReady >= obj.Status.UpdatedNumberScheduled
151
}
152
case *apps_v1.StatefulSet:
153
readyCheck = func() bool {
154
return obj.Status.ReadyReplicas >= obj.Status.UpdatedReplicas
155
}
156
case *apps_v1.Deployment:
157
readyCheck = func() bool {
158
return obj.Status.ReadyReplicas >= obj.Status.UpdatedReplicas
159
}
160
case *core_v1.Pod:
161
readyCheck = func() bool {
162
phase := obj.Status.Phase
163
return phase == core_v1.PodRunning || phase == core_v1.PodSucceeded
164
}
165
}
166
167
for bo.Ongoing() {
168
err := cli.Get(ctx, key, obj)
169
if err == nil && readyCheck() {
170
break
171
}
172
bo.Wait()
173
}
174
175
return bo.Err()
176
}
177
178
// Wait calls done until ctx is canceled or check returns nil. Returns an error
179
// if ctx is canceled.
180
func Wait(ctx context.Context, l log.Logger, check func() error) error {
181
bo := backoff.New(ctx, DefaultBackoff)
182
for bo.Ongoing() {
183
err := check()
184
if err == nil {
185
return nil
186
}
187
level.Error(l).Log("msg", "check failed", "err", err)
188
bo.Wait()
189
}
190
return bo.Err()
191
}
192
193