Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/common-go/baseserver/server.go
2498 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package baseserver
6
7
import (
8
"context"
9
"fmt"
10
"io"
11
"net"
12
"net/http"
13
"os"
14
"os/signal"
15
"sync"
16
"syscall"
17
18
common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"
19
"github.com/gitpod-io/gitpod/common-go/log"
20
"github.com/gitpod-io/gitpod/common-go/pprof"
21
"github.com/gitpod-io/gitpod/common-go/tracing"
22
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
23
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
24
"github.com/opentracing/opentracing-go"
25
"github.com/prometheus/client_golang/prometheus"
26
"github.com/prometheus/client_golang/prometheus/collectors"
27
"github.com/prometheus/client_golang/prometheus/promhttp"
28
"github.com/sirupsen/logrus"
29
http_metrics "github.com/slok/go-http-metrics/metrics/prometheus"
30
"github.com/slok/go-http-metrics/middleware"
31
"github.com/slok/go-http-metrics/middleware/std"
32
"golang.org/x/sync/errgroup"
33
"google.golang.org/grpc"
34
"google.golang.org/grpc/credentials"
35
"google.golang.org/grpc/health/grpc_health_v1"
36
"google.golang.org/grpc/reflection"
37
)
38
39
func New(name string, opts ...Option) (*Server, error) {
40
options, err := evaluateOptions(defaultOptions(), opts...)
41
if err != nil {
42
return nil, fmt.Errorf("invalid config: %w", err)
43
}
44
45
server := &Server{
46
Name: name,
47
options: options,
48
}
49
server.builtinServices = newBuiltinServices(server)
50
server.tracingCloser = tracing.Init(name)
51
52
server.httpMux = http.NewServeMux()
53
server.http = &http.Server{Handler: std.Handler("", middleware.New(middleware.Config{
54
Recorder: http_metrics.NewRecorder(http_metrics.Config{
55
Prefix: "gitpod",
56
Registry: server.MetricsRegistry(),
57
}),
58
}), server.httpMux)}
59
60
err = server.initializeMetrics()
61
if err != nil {
62
return nil, fmt.Errorf("failed to initialize metrics: %w", err)
63
}
64
65
err = server.initializeGRPC()
66
if err != nil {
67
return nil, fmt.Errorf("failed to initialize gRPC server: %w", err)
68
}
69
70
return server, nil
71
}
72
73
// Server is a packaged server with batteries included. It is designed to be standard across components where it makes sense.
74
// Server implements graceful shutdown making it suitable for usage in integration tests. See server_test.go.
75
//
76
// Server is composed of the following:
77
// - Debug server which serves observability and debug endpoints
78
// - /metrics for Prometheus metrics
79
// - /pprof for Golang profiler
80
// - /ready for kubernetes readiness check
81
// - /live for kubernetes liveness check
82
// - (optional) gRPC server with standard interceptors and configuration
83
// - Started when baseserver is configured WithGRPCPort (port is non-negative)
84
// - Use Server.GRPC() to get access to the underlying grpc.Server and register services
85
// - (optional) HTTP server
86
// - Currently does not come with any standard HTTP middlewares
87
// - Started when baseserver is configured WithHTTPPort (port is non-negative)
88
// - Use Server.HTTPMux() to get access to the root handler and register your endpoints
89
type Server struct {
90
// Name is the name of this server, used for logging context
91
Name string
92
93
options *options
94
95
builtinServices *builtinServices
96
97
// http is an http Server, only used when port is specified in cfg
98
http *http.Server
99
httpMux *http.ServeMux
100
httpListener net.Listener
101
102
// grpc is a grpc Server, only used when port is specified in cfg
103
grpc *grpc.Server
104
grpcListener net.Listener
105
106
tracingCloser io.Closer
107
108
// listening indicates the server is serving. When closed, the server is in the process of graceful termination.
109
listening chan struct{}
110
closeOnce sync.Once
111
}
112
113
// serveHTTP serves srv on the listener l and blocks until srv stops serving.
// When cfg has a TLS section, the server speaks TLS using the configured
// certificate and key paths; otherwise it serves plain HTTP.
func serveHTTP(cfg *ServerConfiguration, srv *http.Server, l net.Listener) (err error) {
	if tlsCfg := cfg.TLS; tlsCfg != nil {
		return srv.ServeTLS(l, tlsCfg.CertPath, tlsCfg.KeyPath)
	}
	return srv.Serve(l)
}
121
122
// ListenAndServe starts the builtin services and every configured server (HTTP, gRPC),
// then blocks until one of the following happens:
//   - the process receives SIGINT or SIGTERM (returns nil),
//   - the server is closed through Close (returns nil),
//   - one of the services fails (returns that failure).
//
// An error is also returned when a configured listener cannot be opened.
// In every case the server is closed before ListenAndServe returns.
func (s *Server) ListenAndServe() error {
	s.listening = make(chan struct{})
	defer func() {
		if err := s.Close(); err != nil {
			s.Logger().WithError(err).Errorf("cannot close gracefully")
		}
	}()

	// failed carries service failures to the select below. It has room for one error
	// per service goroutine, so a goroutine never blocks on it after we returned.
	failed := make(chan error, 3)
	report := func(service string, err error) {
		// http.ErrServerClosed is what the HTTP servers return after a regular shutdown.
		if err == nil || err == http.ErrServerClosed {
			return
		}
		if s.isClosing() {
			// Somebody else already initiated the shutdown, this is not a new failure.
			s.Logger().WithError(err).Debugf("%s stopped during shutdown.", service)
			return
		}
		s.Logger().WithError(err).Errorf("%s encountered an error - closing remaining servers.", service)
		failed <- fmt.Errorf("%s failed: %w", service, err)
	}

	go func() {
		report("builtin services", s.builtinServices.ListenAndServe())
	}()

	if srv := s.options.config.Services.HTTP; srv != nil {
		l, err := net.Listen("tcp", srv.Address)
		if err != nil {
			return fmt.Errorf("failed to start HTTP server: %w", err)
		}
		s.httpListener = l
		s.http.Addr = srv.Address

		// The goroutine uses the local listener: the struct field is reset during close.
		go func() {
			report("HTTP server", serveHTTP(srv, s.http, l))
		}()
	}

	if srv := s.options.config.Services.GRPC; srv != nil {
		l, err := net.Listen("tcp", srv.Address)
		if err != nil {
			return fmt.Errorf("failed to start gRPC server: %w", err)
		}
		s.grpcListener = l

		go func() {
			report("gRPC server", s.grpc.Serve(l))
		}()
	}

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(signals)

	// Await operating system signals, server errors, or an explicit Close.
	select {
	case sig := <-signals:
		s.Logger().Infof("Received system signal %s, closing server.", sig.String())
		return nil
	case err := <-failed:
		return err
	case <-s.listening:
		return nil
	}
}
180
181
// Close shuts the server down. The shutdown sequence runs at most once and is bounded by
// the configured close timeout; only the call which actually runs it can return its
// error, every other call returns nil.
func (s *Server) Close() error {
	var closeErr error
	s.closeOnce.Do(func() {
		ctx, cancel := context.WithTimeout(context.Background(), s.options.closeTimeout)
		defer cancel()

		closeErr = s.close(ctx)
	})
	return closeErr
}
191
192
// Logger returns the log entry this server was configured with.
func (s *Server) Logger() *logrus.Entry {
	logger := s.options.logger
	return logger
}
195
196
// HTTPMux returns the root handler of the HTTP server. Register HTTP endpoints on it.
func (s *Server) HTTPMux() *http.ServeMux {
	mux := s.httpMux
	return mux
}
199
200
// GRPC returns the underlying gRPC server. Register gRPC services on it.
func (s *Server) GRPC() *grpc.Server {
	grpcServer := s.grpc
	return grpcServer
}
203
204
// MetricsRegistry returns the Prometheus registry this server was configured with.
// It is the registry exposed by the builtin /metrics endpoint.
func (s *Server) MetricsRegistry() *prometheus.Registry {
	registry := s.options.metricsRegistry
	return registry
}
207
208
// Tracer returns the global opentracing tracer. It does not depend on any state of s.
func (s *Server) Tracer() opentracing.Tracer {
	tracer := opentracing.GlobalTracer()
	return tracer
}
211
212
// close runs the shutdown sequence: gRPC server, HTTP server, builtin services and
// finally tracing. ctx bounds the HTTP server shutdown.
//
// It fails right away when the server never started listening, and does nothing when a
// shutdown is already in progress. Once the sequence has started, every step is attempted
// even if an earlier one failed, so that no component is left running. The first failure
// is returned; later ones are logged.
func (s *Server) close(ctx context.Context) error {
	if s.listening == nil {
		return fmt.Errorf("server is not running, invalid close operation")
	}

	if s.isClosing() {
		s.Logger().Debug("Server is already closing.")
		return nil
	}

	s.Logger().Info("Received graceful shutdown request.")
	close(s.listening)

	var firstErr error
	record := func(err error) {
		if firstErr == nil {
			firstErr = err
			return
		}
		s.Logger().WithError(err).Error("additional error during shutdown")
	}

	if s.grpc != nil {
		s.grpc.GracefulStop()
		// s.grpc.GracefulStop() also closes the underlying net.Listener, we just release the reference.
		s.grpcListener = nil
		s.Logger().Info("GRPC server terminated.")
	}

	if s.http != nil {
		if err := s.http.Shutdown(ctx); err != nil {
			record(fmt.Errorf("failed to close http server: %w", err))
		} else {
			// s.http.Shutdown() also closes the underlying net.Listener, we just release the reference.
			s.httpListener = nil
			s.Logger().Info("HTTP server terminated.")
		}
	}

	// Always terminate builtin server last, we want to keep it running for as long as possible
	if err := s.builtinServices.Close(); err != nil {
		record(fmt.Errorf("failed to close debug server: %w", err))
	} else {
		s.Logger().Info("Debug server terminated.")
	}

	if err := s.tracingCloser.Close(); err != nil {
		record(fmt.Errorf("failed to close tracing: %w", err))
	}

	return firstErr
}
256
257
func (s *Server) isClosing() bool {
258
select {
259
case <-s.listening:
260
// listening channel is closed, we're in graceful shutdown mode
261
return true
262
default:
263
return false
264
}
265
}
266
267
// healthEndpoint builds the handler of the health service: /ready and /live are
// answered by the configured health handler.
func (s *Server) healthEndpoint() http.Handler {
	health := s.options.healthHandler

	routes := http.NewServeMux()
	routes.HandleFunc("/ready", health.ReadyEndpoint)
	routes.HandleFunc("/live", health.LiveEndpoint)
	return routes
}
273
274
func (s *Server) metricsEndpoint() http.Handler {
275
mux := http.NewServeMux()
276
mux.Handle("/metrics", promhttp.InstrumentMetricHandler(
277
s.options.metricsRegistry, promhttp.HandlerFor(s.options.metricsRegistry, promhttp.HandlerOpts{}),
278
))
279
return mux
280
}
281
282
func (s *Server) initializeGRPC() error {
283
common_grpc.SetupLogging()
284
285
grpcMetrics := grpc_prometheus.NewServerMetrics()
286
grpcMetrics.EnableHandlingTimeHistogram(
287
grpc_prometheus.WithHistogramBuckets([]float64{.005, .025, .05, .1, .5, 1, 2.5, 5, 30, 60, 120, 240, 600}),
288
)
289
if err := s.MetricsRegistry().Register(grpcMetrics); err != nil {
290
return fmt.Errorf("failed to register grpc metrics: %w", err)
291
}
292
293
unary := []grpc.UnaryServerInterceptor{
294
grpc_logrus.UnaryServerInterceptor(s.Logger(),
295
grpc_logrus.WithDecider(func(fullMethodName string, err error) bool {
296
// Skip logs for anything that does not contain an error.
297
if err == nil {
298
return false
299
}
300
// Skip gRPC healthcheck logs, they are frequent and pollute our logging infra
301
return fullMethodName != "/grpc.health.v1.Health/Check"
302
}),
303
),
304
grpcMetrics.UnaryServerInterceptor(),
305
}
306
stream := []grpc.StreamServerInterceptor{
307
grpc_logrus.StreamServerInterceptor(s.Logger()),
308
grpcMetrics.StreamServerInterceptor(),
309
}
310
311
opts := common_grpc.ServerOptionsWithInterceptors(stream, unary)
312
if cfg := s.options.config.Services.GRPC; cfg != nil && cfg.TLS != nil {
313
tlsConfig, err := common_grpc.ClientAuthTLSConfig(
314
cfg.TLS.CAPath, cfg.TLS.CertPath, cfg.TLS.KeyPath,
315
common_grpc.WithSetClientCAs(true),
316
common_grpc.WithServerName(s.Name),
317
)
318
if err != nil {
319
return fmt.Errorf("failed to load certificates: %w", err)
320
}
321
322
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig)))
323
}
324
325
opts = append(opts, grpc.MaxRecvMsgSize(100*1024*1024))
326
s.grpc = grpc.NewServer(opts...)
327
328
reflection.Register(s.grpc)
329
330
// Register health service by default
331
grpc_health_v1.RegisterHealthServer(s.grpc, s.options.grpcHealthCheck)
332
333
return nil
334
}
335
336
// initializeMetrics registers the standard collectors with the server's metrics
// registry: Go runtime, process, baseserver and log metrics. Afterwards it reports
// the configured server version.
//
// It returns an error as soon as one of the registrations fails.
func (s *Server) initializeMetrics() error {
	err := s.MetricsRegistry().Register(collectors.NewGoCollector())
	if err != nil {
		return fmt.Errorf("failed to register go collectors: %w", err)
	}

	err = s.MetricsRegistry().Register(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
	if err != nil {
		return fmt.Errorf("failed to register process collectors: %w", err)
	}

	err = registerMetrics(s.MetricsRegistry())
	if err != nil {
		return fmt.Errorf("failed to register baseserver metrics: %w", err)
	}

	if err := s.MetricsRegistry().Register(log.DefaultMetrics); err != nil {
		return fmt.Errorf("failed to register log metrics: %w", err)
	}

	reportServerVersion(s.options.version)

	return nil
}
360
361
// DebugAddress returns the http:// URL of the builtin debug service, or an empty
// string when the server has no builtin services.
func (s *Server) DebugAddress() string {
	if builtin := s.builtinServices; builtin != nil {
		return "http://" + builtin.Debug.Addr
	}
	return ""
}
367
// HealthAddr returns the http:// URL of the builtin health service, or an empty
// string when the server has no builtin services.
func (s *Server) HealthAddr() string {
	if builtin := s.builtinServices; builtin != nil {
		return "http://" + builtin.Health.Addr
	}
	return ""
}
373
// HTTPAddress returns the URL of the HTTP server, including the http or https scheme.
// It is empty as long as the HTTP server has no listener.
func (s *Server) HTTPAddress() string {
	httpCfg := s.options.config.Services.HTTP
	return httpAddress(httpCfg, s.httpListener)
}
376
// GRPCAddress returns the address the gRPC server listens on, without any scheme.
// If the server hasn't started, it won't have a listener yet and the result is empty.
func (s *Server) GRPCAddress() string {
	if l := s.grpcListener; l != nil {
		return l.Addr().String()
	}
	return ""
}
384
385
// Ports of the builtin services.
const (
	// BuiltinDebugPort is the port of the builtin debug (profiler) service.
	BuiltinDebugPort = 6060
	// BuiltinMetricsPort is the port of the builtin Prometheus metrics service.
	BuiltinMetricsPort = 9500
	// BuiltinHealthPort is the port of the builtin health service.
	BuiltinHealthPort = 9501
)

// BuiltinMetricsPortName is the name given to the builtin metrics port.
const BuiltinMetricsPortName = "metrics"
392
393
// builtinServices bundles the HTTP servers which every Server runs next to its own
// HTTP and gRPC servers.
type builtinServices struct {
	// underTest makes ListenAndServe start the health service only.
	underTest bool

	// Debug serves the profiler handlers.
	Debug *http.Server
	// Health serves the readiness and liveness endpoints.
	Health *http.Server
	// Metrics serves the Prometheus metrics endpoint.
	Metrics *http.Server
}
400
401
func newBuiltinServices(server *Server) *builtinServices {
402
healthAddr := fmt.Sprintf(":%d", BuiltinHealthPort)
403
if server.options.underTest {
404
healthAddr = ":0"
405
}
406
407
return &builtinServices{
408
underTest: server.options.underTest,
409
Debug: &http.Server{
410
Addr: fmt.Sprintf(":%d", BuiltinDebugPort),
411
Handler: pprof.Handler(),
412
},
413
Health: &http.Server{
414
Addr: healthAddr,
415
Handler: server.healthEndpoint(),
416
},
417
Metrics: &http.Server{
418
Addr: fmt.Sprintf("127.0.0.1:%d", BuiltinMetricsPort),
419
Handler: server.metricsEndpoint(),
420
},
421
}
422
}
423
424
// ListenAndServe starts the builtin services and blocks until all of them stopped.
// Under test only the health service is started. A nil receiver is a no-op.
//
// It returns the first error of a service which failed. A service which stopped
// because it was closed (http.ErrServerClosed) is not considered a failure.
func (s *builtinServices) ListenAndServe() error {
	if s == nil {
		return nil
	}

	// ignoreClosed turns the error of a regular shutdown into success.
	ignoreClosed := func(err error) error {
		if err == http.ErrServerClosed {
			return nil
		}
		return err
	}

	var eg errgroup.Group
	if !s.underTest {
		eg.Go(func() error { return ignoreClosed(s.Debug.ListenAndServe()) })
		eg.Go(func() error { return ignoreClosed(s.Metrics.ListenAndServe()) })
	}
	eg.Go(func() error {
		// health is the only service which has a variable address,
		// because we need the health service to figure out if the
		// server started at all
		l, err := net.Listen("tcp", s.Health.Addr)
		if err != nil {
			return err
		}
		// Publish the address we actually listen on, the configured port may have been 0.
		s.Health.Addr = l.Addr().String()
		return ignoreClosed(s.Health.Serve(l))
	})
	return eg.Wait()
}
451
452
// Close closes the debug, metrics and health servers concurrently and waits for all
// of them. It returns the first error encountered. A nil receiver is a no-op, in line
// with ListenAndServe.
func (s *builtinServices) Close() error {
	if s == nil {
		return nil
	}

	var eg errgroup.Group
	eg.Go(func() error { return s.Debug.Close() })
	eg.Go(func() error { return s.Metrics.Close() })
	eg.Go(func() error { return s.Health.Close() })
	return eg.Wait()
}
459
460
// httpAddress formats the address of listener l as a URL. The scheme is https when
// cfg has a TLS section and http otherwise. Without a listener the result is empty.
func httpAddress(cfg *ServerConfiguration, l net.Listener) string {
	if l == nil {
		return ""
	}

	scheme := "https"
	if cfg == nil || cfg.TLS == nil {
		scheme = "http"
	}
	return scheme + "://" + l.Addr().String()
}
470
471