Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-manager-mk2/main.go
2492 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License-AGPL.txt in the project root for license information.
4
5
package main
6
7
import (
8
"bytes"
9
"encoding/json"
10
"flag"
11
"fmt"
12
"net"
13
"os"
14
15
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
16
// to ensure that exec-entrypoint and run can make use of them.
17
"google.golang.org/grpc"
18
"google.golang.org/grpc/credentials"
19
"google.golang.org/grpc/credentials/insecure"
20
_ "k8s.io/client-go/plugin/pkg/client/auth"
21
"k8s.io/client-go/rest"
22
23
"github.com/bombsimon/logrusr/v4"
24
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
25
"github.com/prometheus/client_golang/prometheus"
26
"github.com/sirupsen/logrus"
27
"k8s.io/apimachinery/pkg/runtime"
28
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
30
"k8s.io/klog/v2"
31
ctrl "sigs.k8s.io/controller-runtime"
32
"sigs.k8s.io/controller-runtime/pkg/cache"
33
"sigs.k8s.io/controller-runtime/pkg/client"
34
"sigs.k8s.io/controller-runtime/pkg/healthz"
35
"sigs.k8s.io/controller-runtime/pkg/metrics"
36
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
37
"sigs.k8s.io/controller-runtime/pkg/webhook"
38
39
common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"
40
"github.com/gitpod-io/gitpod/common-go/log"
41
"github.com/gitpod-io/gitpod/common-go/pprof"
42
"github.com/gitpod-io/gitpod/common-go/tracing"
43
"github.com/gitpod-io/gitpod/components/scrubber"
44
imgbldr "github.com/gitpod-io/gitpod/image-builder/api"
45
regapi "github.com/gitpod-io/gitpod/registry-facade/api"
46
"github.com/gitpod-io/gitpod/ws-manager-mk2/controllers"
47
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/maintenance"
48
imgproxy "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/proxy"
49
"github.com/gitpod-io/gitpod/ws-manager-mk2/service"
50
wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
51
config "github.com/gitpod-io/gitpod/ws-manager/api/config"
52
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
53
//+kubebuilder:scaffold:imports
54
)
55
56
// PROXIED_GRPC_SERVICE_PREFIX is prepended to the names of gRPC services that
// this process merely proxies. The prefix keeps their metrics from colliding
// with the metrics of the original services and makes it possible to filter
// proxied services out of SLIs and similar aggregations.
const PROXIED_GRPC_SERVICE_PREFIX = "proxied"
59
60
var (
61
// ServiceName is the name we use for tracing/logging
62
ServiceName = "ws-manager-mk2"
63
// Version of this service - set during build
64
Version = ""
65
66
scheme = runtime.NewScheme()
67
setupLog = ctrl.Log.WithName("setup")
68
)
69
70
func init() {
71
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
72
73
utilruntime.Must(workspacev1.AddToScheme(scheme))
74
//+kubebuilder:scaffold:scheme
75
}
76
77
func main() {
78
var configFN string
79
var jsonLog bool
80
var verbose bool
81
flag.StringVar(&configFN, "config", "", "Path to the config file")
82
flag.BoolVar(&jsonLog, "json-log", true, "produce JSON log output on verbose level")
83
flag.BoolVar(&verbose, "verbose", false, "Enable verbose logging")
84
flag.Parse()
85
86
log.Init(ServiceName, Version, jsonLog, verbose)
87
88
l := log.WithFields(logrus.Fields{})
89
l.Logger.SetReportCaller(false)
90
baseLogger := logrusr.New(l, logrusr.WithFormatter(func(i interface{}) interface{} {
91
return &log.TrustedValueWrap{Value: scrubber.Default.DeepCopyStruct(i)}
92
}))
93
ctrl.SetLogger(baseLogger)
94
// Set the logger used by k8s (e.g. client-go).
95
klog.SetLogger(baseLogger)
96
promrep := &tracing.PromReporter{
97
Operations: map[string]tracing.SpanMetricMapping{
98
"StartWorkspace": {
99
Name: "wsman_start_workspace",
100
Help: "time it takes to service a StartWorkspace request",
101
Buckets: prometheus.LinearBuckets(0, 500, 10), // 10 buckets, each 500ms wide
102
},
103
},
104
}
105
closer := tracing.Init(ServiceName, tracing.WithPrometheusReporter(promrep))
106
if closer != nil {
107
defer closer.Close()
108
}
109
110
cfg, err := getConfig(configFN)
111
if err != nil {
112
setupLog.Error(err, "unable to read config")
113
os.Exit(1)
114
}
115
116
if cfg.PProf.Addr != "" {
117
go pprof.Serve(cfg.PProf.Addr)
118
}
119
120
// Check that namespace config values are set. Empty namespaces default to a cluster-scoped cache,
121
// which the controller doesn't have the right RBAC for.
122
if cfg.Manager.Namespace == "" {
123
setupLog.Error(nil, "namespace cannot be empty")
124
os.Exit(1)
125
}
126
127
if cfg.Manager.SecretsNamespace == "" {
128
setupLog.Error(nil, "secretsNamespace cannot be empty")
129
os.Exit(1)
130
}
131
132
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
133
Scheme: scheme,
134
Metrics: metricsserver.Options{BindAddress: cfg.Prometheus.Addr},
135
Cache: cache.Options{
136
DefaultNamespaces: map[string]cache.Config{
137
cfg.Manager.Namespace: {},
138
cfg.Manager.SecretsNamespace: {},
139
},
140
},
141
WebhookServer: webhook.NewServer(webhook.Options{
142
Port: 9443,
143
}),
144
HealthProbeBindAddress: cfg.Health.Addr,
145
LeaderElection: true,
146
LeaderElectionID: "ws-manager-mk2-leader.gitpod.io",
147
LeaderElectionReleaseOnCancel: true,
148
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
149
config.QPS = 100
150
config.Burst = 150
151
152
c, err := client.New(config, options)
153
if err != nil {
154
return nil, err
155
}
156
157
return c, nil
158
},
159
})
160
if err != nil {
161
setupLog.Error(err, "unable to start manager")
162
os.Exit(1)
163
}
164
165
mgrCtx := ctrl.SetupSignalHandler()
166
167
maintenanceReconciler, err := controllers.NewMaintenanceReconciler(mgr.GetClient(), metrics.Registry)
168
if err != nil {
169
setupLog.Error(err, "unable to create maintenance controller", "controller", "Maintenance")
170
os.Exit(1)
171
}
172
173
timeoutReconciler, err := controllers.NewTimeoutReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("workspace"), cfg.Manager, maintenanceReconciler)
174
if err != nil {
175
setupLog.Error(err, "unable to create timeout controller", "controller", "Timeout")
176
os.Exit(1)
177
}
178
179
wsmanService, err := setupGRPCService(cfg, mgr.GetClient(), maintenanceReconciler)
180
if err != nil {
181
setupLog.Error(err, "unable to start manager service")
182
os.Exit(1)
183
}
184
185
subscriberReconciler, err := controllers.NewSubscriberReconciler(mgr.GetClient(), &cfg.Manager)
186
if err != nil {
187
setupLog.Error(err, "unable to create subscriber controller", "controller", "Subscribers")
188
os.Exit(1)
189
}
190
191
subscriberReconciler.OnReconcile = wsmanService.OnWorkspaceReconcile
192
193
if err = subscriberReconciler.SetupWithManager(mgrCtx, mgr); err != nil {
194
setupLog.Error(err, "unable to setup workspace controller with manager", "controller", "Subscribers")
195
os.Exit(1)
196
}
197
198
err = controllers.SetupIndexer(mgr)
199
if err != nil {
200
setupLog.Error(err, "unable to configure field indexer")
201
os.Exit(1)
202
}
203
204
go func() {
205
<-mgr.Elected()
206
207
workspaceReconciler, err := controllers.NewWorkspaceReconciler(
208
mgr.GetClient(), mgr.GetConfig(), mgr.GetScheme(), mgr.GetEventRecorderFor("workspace"), &cfg.Manager, metrics.Registry, maintenanceReconciler)
209
if err != nil {
210
setupLog.Error(err, "unable to create controller", "controller", "Workspace")
211
os.Exit(1)
212
}
213
214
if err = workspaceReconciler.SetupWithManager(mgr); err != nil {
215
setupLog.Error(err, "unable to setup workspace controller with manager", "controller", "Workspace")
216
os.Exit(1)
217
}
218
}()
219
220
if err = timeoutReconciler.SetupWithManager(mgr); err != nil {
221
setupLog.Error(err, "unable to setup timeout controller with manager", "controller", "Timeout")
222
os.Exit(1)
223
}
224
225
if err = maintenanceReconciler.SetupWithManager(mgrCtx, mgr); err != nil {
226
setupLog.Error(err, "unable to setup maintenance controller with manager", "controller", "Maintenance")
227
os.Exit(1)
228
}
229
230
// if err = (&workspacev1.Workspace{}).SetupWebhookWithManager(mgr); err != nil {
231
// setupLog.Error(err, "unable to create webhook", "webhook", "Workspace")
232
// os.Exit(1)
233
// }
234
235
//+kubebuilder:scaffold:builder
236
237
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
238
setupLog.Error(err, "unable to set up health check")
239
os.Exit(1)
240
}
241
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
242
setupLog.Error(err, "unable to set up ready check")
243
os.Exit(1)
244
}
245
246
setupLog.Info("starting manager")
247
if err := mgr.Start(mgrCtx); err != nil {
248
setupLog.Error(err, "problem running manager")
249
os.Exit(1)
250
}
251
}
252
253
func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client, maintenance maintenance.Maintenance) (*service.WorkspaceManagerServer, error) {
254
// TODO(cw): remove use of common-go/log
255
256
if len(cfg.RPCServer.RateLimits) > 0 {
257
log.WithField("ratelimits", cfg.RPCServer.RateLimits).Info("imposing rate limits on the gRPC interface")
258
}
259
ratelimits := common_grpc.NewRatelimitingInterceptor(cfg.RPCServer.RateLimits)
260
261
grpcMetrics := grpc_prometheus.NewServerMetrics()
262
grpcMetrics.EnableHandlingTimeHistogram()
263
metrics.Registry.MustRegister(grpcMetrics)
264
265
// Create interceptor for prefixing the proxied gRPC service names, to avoid conflicts with the original service names in metrics
266
grpcServicePrefixer := &imgproxy.ServiceNamePrefixerInterceptor{Prefix: PROXIED_GRPC_SERVICE_PREFIX}
267
268
grpcOpts := common_grpc.ServerOptionsWithInterceptors(
269
[]grpc.StreamServerInterceptor{grpcServicePrefixer.StreamServerInterceptor(), grpcMetrics.StreamServerInterceptor()},
270
[]grpc.UnaryServerInterceptor{grpcServicePrefixer.UnaryServerInterceptor(), grpcMetrics.UnaryServerInterceptor(), ratelimits.UnaryInterceptor()},
271
)
272
if cfg.RPCServer.TLS.CA != "" && cfg.RPCServer.TLS.Certificate != "" && cfg.RPCServer.TLS.PrivateKey != "" {
273
tlsConfig, err := common_grpc.ClientAuthTLSConfig(
274
cfg.RPCServer.TLS.CA, cfg.RPCServer.TLS.Certificate, cfg.RPCServer.TLS.PrivateKey,
275
common_grpc.WithSetClientCAs(true),
276
common_grpc.WithServerName("ws-manager"),
277
)
278
if err != nil {
279
log.WithError(err).Fatal("cannot load ws-manager certs")
280
}
281
282
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
283
} else {
284
log.Warn("no TLS configured - gRPC server will be unsecured")
285
}
286
287
grpcServer := grpc.NewServer(grpcOpts...)
288
289
if cfg.ImageBuilderProxy.TargetAddr != "" {
290
creds := insecure.NewCredentials()
291
if cfg.ImageBuilderProxy.TLS.CA != "" && cfg.ImageBuilderProxy.TLS.Certificate != "" && cfg.ImageBuilderProxy.TLS.PrivateKey != "" {
292
tlsConfig, err := common_grpc.ClientAuthTLSConfig(
293
cfg.ImageBuilderProxy.TLS.CA, cfg.ImageBuilderProxy.TLS.Certificate, cfg.ImageBuilderProxy.TLS.PrivateKey,
294
common_grpc.WithSetRootCAs(true),
295
common_grpc.WithServerName("image-builder-mk3"),
296
)
297
if err != nil {
298
log.WithError(err).Fatal("cannot load image-builder-mk3 TLS certs")
299
}
300
log.Info("Loaded TLS for image builder")
301
creds = credentials.NewTLS(tlsConfig)
302
}
303
// Note: never use block here, because image-builder connects to ws-manager,
304
// and if we blocked here, ws-manager wouldn't come up, hence we couldn't connect to ws-manager.
305
conn, err := grpc.Dial(cfg.ImageBuilderProxy.TargetAddr, grpc.WithTransportCredentials(creds))
306
if err != nil {
307
log.WithError(err).Fatal("failed to connect to image builder")
308
}
309
imgbldr.RegisterImageBuilderServer(grpcServer, imgproxy.ImageBuilder{D: imgbldr.NewImageBuilderClient(conn)})
310
}
311
312
srv := service.NewWorkspaceManagerServer(k8s, &cfg.Manager, metrics.Registry, maintenance)
313
314
grpc_prometheus.Register(grpcServer)
315
wsmanapi.RegisterWorkspaceManagerServer(grpcServer, srv)
316
regapi.RegisterSpecProviderServer(grpcServer, &service.WorkspaceImageSpecProvider{
317
Client: k8s,
318
Namespace: cfg.Manager.Namespace,
319
})
320
321
lis, err := net.Listen("tcp", cfg.RPCServer.Addr)
322
if err != nil {
323
log.WithError(err).WithField("addr", cfg.RPCServer.Addr).Fatal("cannot start RPC server")
324
}
325
go func() {
326
err := grpcServer.Serve(lis)
327
if err != nil {
328
log.WithError(err).Error("gRPC service failed")
329
}
330
}()
331
log.WithField("addr", cfg.RPCServer.Addr).Info("started gRPC server")
332
333
return srv, nil
334
}
335
336
// getConfig loads the service configuration from the JSON file at fn.
//
// Decoding is strict: a field in the file that is unknown to
// config.ServiceConfiguration is reported as an error. When
// Manager.SSHGatewayCAPublicKeyFile is set, the content of that file is
// stored in Manager.SSHGatewayCAPublicKey; failing to read it is only logged
// and the configuration is still returned without the key.
func getConfig(fn string) (*config.ServiceConfiguration, error) {
	raw, err := os.ReadFile(fn)
	if err != nil {
		return nil, fmt.Errorf("cannot read configuration. Maybe missing --config?: %w", err)
	}

	cfg := new(config.ServiceConfiguration)
	decoder := json.NewDecoder(bytes.NewReader(raw))
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(cfg); err != nil {
		return nil, fmt.Errorf("cannot decode configuration from %s: %w", fn, err)
	}

	keyFile := cfg.Manager.SSHGatewayCAPublicKeyFile
	if keyFile == "" {
		return cfg, nil
	}

	key, err := os.ReadFile(keyFile)
	if err != nil {
		// Best effort: a missing CA key does not invalidate the configuration.
		log.WithError(err).Error("cannot read SSH Gateway CA public key")
		return cfg, nil
	}
	cfg.Manager.SSHGatewayCAPublicKey = string(key)
	return cfg, nil
}
360
361