GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/util/k8s/k8s.go

// Package k8s spins up a Kubernetes cluster for testing.
package k8s

import (
	"context"
	"fmt"
	"os"
	"sort"
	"time"

	docker_types "github.com/docker/docker/api/types"
	docker_nat "github.com/docker/go-connections/nat"
	gragent "github.com/grafana/agent/pkg/operator/apis/monitoring/v1alpha1"
	k3d_client "github.com/k3d-io/k3d/v5/pkg/client"
	config "github.com/k3d-io/k3d/v5/pkg/config"
	k3d_cfgtypes "github.com/k3d-io/k3d/v5/pkg/config/types"
	k3d_config "github.com/k3d-io/k3d/v5/pkg/config/v1alpha4"
	k3d_log "github.com/k3d-io/k3d/v5/pkg/logger"
	k3d_runtime "github.com/k3d-io/k3d/v5/pkg/runtimes"
	k3d_docker "github.com/k3d-io/k3d/v5/pkg/runtimes/docker"
	k3d_types "github.com/k3d-io/k3d/v5/pkg/types"
	k3d_version "github.com/k3d-io/k3d/v5/version"
	promop_v1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
	apiextensions_v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	k8s_clientcmd "k8s.io/client-go/tools/clientcmd"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Cluster is a Kubernetes cluster that runs inside of a k3s Docker container.
// Call GetConfig to retrieve a Kubernetes *rest.Config to use to connect to
// the cluster.
//
// Cluster also runs an NGINX ingress controller which is exposed to the host.
// Call GetHTTPAddr to get the address for making requests against the server.
//
// Set K8S_USE_DOCKER_NETWORK in your environment variables if you are
// running tests from inside of a Docker container. This environment variable
// configures the k3s Docker container to join the same network as the
// container tests are running in. When this environment variable isn't set,
// the exposed ports on the Docker host are used for cluster communication.
//
// Note that k3s uses containerd as its runtime, which means local Docker
// images are not immediately available for use. To push local images into the
// cluster, call PushImages. It's recommended that tests use image names that
// are not available on Docker Hub to avoid accidentally testing against the
// wrong image.
//
// Cluster should be stopped by calling Stop, otherwise running Docker
// containers will leak.
type Cluster struct {
	runtime    k3d_runtime.Runtime
	k3dCluster k3d_types.Cluster
	restConfig *rest.Config
	kubeClient client.Client
	nginxAddr  string
}
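
// A minimal usage sketch, not part of the original file, illustrating the
// lifecycle described above from a test that imports this package as k8s. It
// assumes a *testing.T named t, a context.Context named ctx, and a
// hypothetical locally built image "example.local/agent-test:dev":
//
//	cluster, err := k8s.NewCluster(ctx, k8s.Options{})
//	if err != nil {
//		t.Fatalf("failed to create cluster: %s", err)
//	}
//	defer cluster.Stop()
//
//	if err := cluster.PushImages("example.local/agent-test:dev"); err != nil {
//		t.Fatalf("failed to push test image: %s", err)
//	}
//
//	// The ingress controller may still be starting; retry requests against
//	// this address with a backoff instead of assuming readiness.
//	httpAddr := cluster.GetHTTPAddr()
//	_ = httpAddr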

// Options control creation of a cluster.
type Options struct {
	// Scheme is the Kubernetes scheme used for the generated Kubernetes client.
	// If nil, a scheme is generated that registers the built-in client-go types
	// along with the apiextensions, Grafana Agent, and Prometheus Operator API
	// types used by these tests.
	Scheme *runtime.Scheme
}

func (o *Options) applyDefaults() error {
	if o.Scheme == nil {
		o.Scheme = runtime.NewScheme()

		for _, add := range []func(*runtime.Scheme) error{
			scheme.AddToScheme,
			apiextensions_v1.AddToScheme,
			gragent.AddToScheme,
			promop_v1.AddToScheme,
		} {
			if err := add(o.Scheme); err != nil {
				return fmt.Errorf("unable to register scheme: %w", err)
			}
		}
	}
	return nil
}
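
// As a sketch, not part of the original file: a caller that only needs the
// built-in Kubernetes types could bypass the default registration above and
// supply its own scheme (ctx here is an assumed context.Context):
//
//	sc := runtime.NewScheme()
//	if err := scheme.AddToScheme(sc); err != nil {
//		return err
//	}
//	cluster, err := NewCluster(ctx, Options{Scheme: sc})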

// NewCluster creates a new Cluster. NewCluster won't return with success until
// the cluster is running, but things like the ingress controller might not be
// running right away. Never assume that any resource in the cluster is
// running; use exponential backoff to allow time for things to spin up.
func NewCluster(ctx context.Context, o Options) (cluster *Cluster, err error) {
	var (
		// We force the Docker runtime so we can create a Docker client for getting
		// the exposed ports for the API server and NGINX.
		runtime = k3d_runtime.Docker

		// Running in Docker indicates that we should configure k3s to connect to
		// the same Docker network as the current container.
		runningInDocker = os.Getenv("K8S_USE_DOCKER_NETWORK") == "1"
	)

	if err := o.applyDefaults(); err != nil {
		return nil, fmt.Errorf("failed to apply defaults to options: %w", err)
	}

	k3dConfig := k3d_config.SimpleConfig{
		TypeMeta: k3d_cfgtypes.TypeMeta{
			Kind:       "Simple",
			APIVersion: config.DefaultConfigApiVersion,
		},
		ObjectMeta: k3d_cfgtypes.ObjectMeta{
			Name: randomClusterName(),
		},
		Servers: 1,
		Ports: []k3d_config.PortWithNodeFilters{{
			// Bind NGINX (container port 80) to 127.0.0.1:0.
			Port:        "127.0.0.1:0:80",
			NodeFilters: []string{"loadbalancer"},
		}},
		ExposeAPI: k3d_config.SimpleExposureOpts{
			// Bind the API server to 127.0.0.1:0.
			Host:     "127.0.0.1",
			HostIP:   "127.0.0.1",
			HostPort: "0",
		},
		Image: fmt.Sprintf("%s:%s", k3d_types.DefaultK3sImageRepo, k3d_version.K3sVersion),
		Options: k3d_config.SimpleConfigOptions{
			K3dOptions: k3d_config.SimpleConfigOptionsK3d{
				Wait:    true,
				Timeout: time.Minute,
			},
		},
	}
	if runningInDocker {
		err := injectCurrentDockerNetwork(ctx, &k3dConfig)
		if err != nil {
			return nil, fmt.Errorf("could not connect k3d to current docker network: %w", err)
		}
	}

	clusterConfig, err := config.TransformSimpleToClusterConfig(ctx, runtime, k3dConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to generate cluster config: %w", err)
	}

	err = k3d_client.ClusterRun(ctx, runtime, clusterConfig)

	defer func() {
		// We don't want to leak the cluster here, and we can't really be sure how
		// many resources exist, even if ClusterRun fails. If we never set our
		// cluster return argument, we'll delete the k3d cluster. This also
		// gracefully handles panics.
		if cluster == nil {
			_ = k3d_client.ClusterDelete(ctx, runtime, &clusterConfig.Cluster, k3d_types.ClusterDeleteOpts{})
		}
	}()
	if err != nil {
		return nil, fmt.Errorf("failed to run cluster: %w", err)
	}

	var (
		httpAddr      string
		apiServerAddr string
	)

	// If we're currently running inside of Docker, we can connect directly to
	// our container. Otherwise, we have to find what the bound host IPs are.
	if runningInDocker {
		httpAddr, apiServerAddr, err = clusterInternalAddrs(ctx, clusterConfig.Cluster)
	} else {
		httpAddr, apiServerAddr, err = loadBalancerExposedAddrs(ctx, clusterConfig.Cluster)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to discover exposed cluster addresses: %w", err)
	}

	kubeconfig, err := k3d_client.KubeconfigGet(ctx, runtime, &clusterConfig.Cluster)
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve kubeconfig: %w", err)
	}
	if c, ok := kubeconfig.Clusters[kubeconfig.CurrentContext]; ok && c != nil {
		// The generated kubeconfig will set https://127.0.0.1:0 as the address. We
		// need to replace it with the actual exposed port that Docker generated
		// for us.
		c.Server = "https://" + apiServerAddr
	} else {
		return nil, fmt.Errorf("generated kubeconfig missing context set")
	}
	restCfg, err := k8s_clientcmd.NewDefaultClientConfig(*kubeconfig, nil).ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("could not generate k8s REST API config: %w", err)
	}

	kubeClient, err := client.New(restCfg, client.Options{
		Scheme: o.Scheme,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to generate client: %w", err)
	}

	return &Cluster{
		runtime:    runtime,
		k3dCluster: clusterConfig.Cluster,
		restConfig: restCfg,
		nginxAddr:  httpAddr,
		kubeClient: kubeClient,
	}, nil
}
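
// The doc comment above recommends exponential backoff when waiting on
// in-cluster resources. One illustrative sketch, not part of the original
// file, polls for a hypothetical Deployment named "agent" using the cluster
// client; it assumes wait is k8s.io/apimachinery/pkg/util/wait and apps_v1 is
// k8s.io/api/apps/v1:
//
//	backoff := wait.Backoff{Duration: time.Second, Factor: 2, Steps: 6}
//	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
//		var dep apps_v1.Deployment
//		key := client.ObjectKey{Namespace: "default", Name: "agent"}
//		if err := cluster.Client().Get(ctx, key, &dep); err != nil {
//			return false, nil // not found yet; keep retrying
//		}
//		return dep.Status.ReadyReplicas > 0, nil
//	})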

// injectCurrentDockerNetwork reconfigures config to join the Docker network of
// the current container. Fails if the function is not being called from inside
// of a Docker container.
func injectCurrentDockerNetwork(ctx context.Context, config *k3d_config.SimpleConfig) error {
	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("could not get hostname: %w", err)
	}

	cli, err := k3d_docker.GetDockerClient()
	if err != nil {
		return fmt.Errorf("failed to get docker client: %w", err)
	}
	info, err := cli.ContainerInspect(ctx, hostname)
	if err != nil {
		return fmt.Errorf("failed to find current docker container: %w", err)
	}

	networks := make([]string, 0, len(info.NetworkSettings.Networks))
	for nw := range info.NetworkSettings.Networks {
		networks = append(networks, nw)
	}
	sort.Strings(networks)

	if len(networks) == 0 {
		return fmt.Errorf("no networks")
	}
	config.Network = networks[0]
	return nil
}

func randomClusterName() string {
	return "grafana-agent-e2e-" + rand.String(5)
}

func clusterInternalAddrs(ctx context.Context, cluster k3d_types.Cluster) (httpAddr, serverAddr string, err error) {
	var lb, server *k3d_types.Node
	for _, n := range cluster.Nodes {
		switch n.Role {
		case k3d_types.LoadBalancerRole:
			if lb == nil {
				lb = n
			}
		case k3d_types.ServerRole:
			if server == nil {
				server = n
			}
		}
	}
	if lb == nil {
		return "", "", fmt.Errorf("no loadbalancer node")
	} else if server == nil {
		return "", "", fmt.Errorf("no server node")
	}

	cli, err := k3d_docker.GetDockerClient()
	if err != nil {
		return "", "", fmt.Errorf("failed to get docker client: %w", err)
	}

	lbInfo, err := cli.ContainerInspect(ctx, lb.Name)
	if err != nil {
		return "", "", fmt.Errorf("failed to inspect loadbalancer: %w", err)
	} else if nw, found := lbInfo.NetworkSettings.Networks[cluster.Network.Name]; !found {
		return "", "", fmt.Errorf("loadbalancer not connected to expected network %q", cluster.Network.Name)
	} else {
		httpAddr = fmt.Sprintf("%s:80", nw.IPAddress)
	}

	serverInfo, err := cli.ContainerInspect(ctx, server.Name)
	if err != nil {
		return "", "", fmt.Errorf("failed to inspect server node: %w", err)
	} else if nw, found := serverInfo.NetworkSettings.Networks[cluster.Network.Name]; !found {
		return "", "", fmt.Errorf("server node not connected to expected network %q", cluster.Network.Name)
	} else {
		serverAddr = fmt.Sprintf("%s:6443", nw.IPAddress)
	}

	return httpAddr, serverAddr, nil
}

func loadBalancerExposedAddrs(ctx context.Context, cluster k3d_types.Cluster) (httpAddr, apiServerAddr string, err error) {
	var lb *k3d_types.Node
	for _, n := range cluster.Nodes {
		if n.Role == k3d_types.LoadBalancerRole {
			lb = n
			break
		}
	}
	if lb == nil {
		return "", "", fmt.Errorf("no loadbalancer node")
	}

	cli, err := k3d_docker.GetDockerClient()
	if err != nil {
		return "", "", fmt.Errorf("failed to get docker client: %w", err)
	}
	info, err := cli.ContainerInspect(ctx, lb.Name)
	if err != nil {
		return "", "", fmt.Errorf("failed to inspect loadbalancer: %w", err)
	}

	httpAddr, err = hostBinding(info, 80)
	if err != nil {
		return "", "", fmt.Errorf("failed to discover NGINX HTTP addr: %w", err)
	}
	apiServerAddr, err = hostBinding(info, 6443)
	if err != nil {
		return "", "", fmt.Errorf("failed to discover API server addr: %w", err)
	}
	return httpAddr, apiServerAddr, nil
}

func hostBinding(containerInfo docker_types.ContainerJSON, containerPort int) (string, error) {
	for rawPort, bindings := range containerInfo.NetworkSettings.Ports {
		_, portString := docker_nat.SplitProtoPort(string(rawPort))
		port, _ := docker_nat.ParsePort(portString)
		if port != containerPort {
			continue
		}
		if len(bindings) == 0 {
			return "", fmt.Errorf("no exposed bindings for port %d", containerPort)
		}
		return fmt.Sprintf("%s:%s", bindings[0].HostIP, bindings[0].HostPort), nil
	}

	return "", fmt.Errorf("no container port %d exposed", containerPort)
}

// Client returns the Kubernetes client for this Cluster. The returned client
// handles objects registered in the Scheme passed to Options when the cluster
// was created.
func (c *Cluster) Client() client.Client {
	return c.kubeClient
}

// GetConfig returns a *rest.Config that can be used to connect to the
// Kubernetes cluster. The returned Config is a copy and is safe for
// modification.
func (c *Cluster) GetConfig() *rest.Config {
	return rest.CopyConfig(c.restConfig)
}
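
// Because GetConfig returns a copy, callers can tweak the config freely. As a
// sketch, not part of the original file, a standard client-go clientset could
// be built from it; kubernetes is assumed to be k8s.io/client-go/kubernetes
// and meta_v1 k8s.io/apimachinery/pkg/apis/meta/v1:
//
//	cfg := cluster.GetConfig()
//	cfg.Timeout = 30 * time.Second
//	clientset, err := kubernetes.NewForConfig(cfg)
//	if err != nil {
//		return err
//	}
//	_, err = clientset.CoreV1().Pods("default").List(ctx, meta_v1.ListOptions{})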

// GetHTTPAddr returns the host:port address that can be used to connect to the
// cluster's NGINX server.
func (c *Cluster) GetHTTPAddr() string {
	return c.nginxAddr
}

// PushImages pushes images from the local Docker host into the Cluster. If a
// specified image does not have a tag, `:latest` is assumed.
func (c *Cluster) PushImages(images ...string) error {
	return k3d_client.ImageImportIntoClusterMulti(
		context.Background(),
		c.runtime,
		images,
		&c.k3dCluster,
		k3d_types.ImageImportOpts{},
	)
}

// Stop shuts down and deletes the cluster. Stop must be called to clean up
// created Docker resources.
func (c *Cluster) Stop() {
	err := k3d_client.ClusterDelete(context.Background(), c.runtime, &c.k3dCluster, k3d_types.ClusterDeleteOpts{})
	if err != nil {
		k3d_log.Log().Errorf("failed to shut down cluster, docker containers may have leaked: %s", err)
	}
}