Path: blob/main/components/common-go/baseserver/server.go
2498 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package baseserver56import (7"context"8"fmt"9"io"10"net"11"net/http"12"os"13"os/signal"14"sync"15"syscall"1617common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"18"github.com/gitpod-io/gitpod/common-go/log"19"github.com/gitpod-io/gitpod/common-go/pprof"20"github.com/gitpod-io/gitpod/common-go/tracing"21grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"22grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"23"github.com/opentracing/opentracing-go"24"github.com/prometheus/client_golang/prometheus"25"github.com/prometheus/client_golang/prometheus/collectors"26"github.com/prometheus/client_golang/prometheus/promhttp"27"github.com/sirupsen/logrus"28http_metrics "github.com/slok/go-http-metrics/metrics/prometheus"29"github.com/slok/go-http-metrics/middleware"30"github.com/slok/go-http-metrics/middleware/std"31"golang.org/x/sync/errgroup"32"google.golang.org/grpc"33"google.golang.org/grpc/credentials"34"google.golang.org/grpc/health/grpc_health_v1"35"google.golang.org/grpc/reflection"36)3738func New(name string, opts ...Option) (*Server, error) {39options, err := evaluateOptions(defaultOptions(), opts...)40if err != nil {41return nil, fmt.Errorf("invalid config: %w", err)42}4344server := &Server{45Name: name,46options: options,47}48server.builtinServices = newBuiltinServices(server)49server.tracingCloser = tracing.Init(name)5051server.httpMux = http.NewServeMux()52server.http = &http.Server{Handler: std.Handler("", middleware.New(middleware.Config{53Recorder: http_metrics.NewRecorder(http_metrics.Config{54Prefix: "gitpod",55Registry: server.MetricsRegistry(),56}),57}), server.httpMux)}5859err = server.initializeMetrics()60if err != nil {61return nil, fmt.Errorf("failed to initialize metrics: %w", err)62}6364err = server.initializeGRPC()65if err != nil {66return nil, fmt.Errorf("failed to initialize gRPC server: %w", err)67}6869return server, nil70}7172// Server is a packaged server with batteries included. It is designed to be standard across components where it makes sense.73// Server implements graceful shutdown making it suitable for usage in integration tests. See server_test.go.74//75// Server is composed of the following:76// - Debug server which serves observability and debug endpoints77// - /metrics for Prometheus metrics78// - /pprof for Golang profiler79// - /ready for kubernetes readiness check80// - /live for kubernetes liveness check81// - (optional) gRPC server with standard interceptors and configuration82// - Started when baseserver is configured WithGRPCPort (port is non-negative)83// - Use Server.GRPC() to get access to the underlying grpc.Server and register services84// - (optional) HTTP server85// - Currently does not come with any standard HTTP middlewares86// - Started when baseserver is configured WithHTTPPort (port is non-negative)87// - Use Server.HTTPMux() to get access to the root handler and register your endpoints88type Server struct {89// Name is the name of this server, used for logging context90Name string9192options *options9394builtinServices *builtinServices9596// http is an http Server, only used when port is specified in cfg97http *http.Server98httpMux *http.ServeMux99httpListener net.Listener100101// grpc is a grpc Server, only used when port is specified in cfg102grpc *grpc.Server103grpcListener net.Listener104105tracingCloser io.Closer106107// listening indicates the server is serving. When closed, the server is in the process of graceful termination.108listening chan struct{}109closeOnce sync.Once110}111112func serveHTTP(cfg *ServerConfiguration, srv *http.Server, l net.Listener) (err error) {113if cfg.TLS == nil {114err = srv.Serve(l)115} else {116err = srv.ServeTLS(l, cfg.TLS.CertPath, cfg.TLS.KeyPath)117}118return119}120121func (s *Server) ListenAndServe() error {122var err error123124s.listening = make(chan struct{})125defer func() {126err := s.Close()127if err != nil {128s.Logger().WithError(err).Errorf("cannot close gracefully")129}130}()131132go func() {133err := s.builtinServices.ListenAndServe()134if err != nil {135s.Logger().WithError(err).Errorf("builtin services encountered an error - closing remaining servers.")136s.Close()137}138}()139140if srv := s.options.config.Services.HTTP; srv != nil {141s.httpListener, err = net.Listen("tcp", srv.Address)142if err != nil {143return fmt.Errorf("failed to start HTTP server: %w", err)144}145s.http.Addr = srv.Address146147go func() {148err := serveHTTP(srv, s.http, s.httpListener)149if err != nil {150s.Logger().WithError(err).Errorf("HTTP server encountered an error - closing remaining servers.")151s.Close()152}153}()154}155156if srv := s.options.config.Services.GRPC; srv != nil {157s.grpcListener, err = net.Listen("tcp", srv.Address)158if err != nil {159return fmt.Errorf("failed to start gRPC server: %w", err)160}161162go func() {163err := s.grpc.Serve(s.grpcListener)164if err != nil {165s.Logger().WithError(err).Errorf("gRPC server encountered an error - closing remaining servers.")166s.Close()167}168}()169}170171signals := make(chan os.Signal, 1)172signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)173174// Await operating system signals, or server errors.175sig := <-signals176s.Logger().Infof("Received system signal %s, closing server.", sig.String())177return nil178}179180func (s *Server) Close() error {181ctx, cancel := context.WithTimeout(context.Background(), s.options.closeTimeout)182defer cancel()183184var err error185s.closeOnce.Do(func() {186err = s.close(ctx)187})188return err189}190191func (s *Server) Logger() *logrus.Entry {192return s.options.logger193}194195func (s *Server) HTTPMux() *http.ServeMux {196return s.httpMux197}198199func (s *Server) GRPC() *grpc.Server {200return s.grpc201}202203func (s *Server) MetricsRegistry() *prometheus.Registry {204return s.options.metricsRegistry205}206207func (s *Server) Tracer() opentracing.Tracer {208return opentracing.GlobalTracer()209}210211func (s *Server) close(ctx context.Context) error {212if s.listening == nil {213return fmt.Errorf("server is not running, invalid close operation")214}215216if s.isClosing() {217s.Logger().Debug("Server is already closing.")218return nil219}220221s.Logger().Info("Received graceful shutdown request.")222close(s.listening)223224if s.grpc != nil {225s.grpc.GracefulStop()226// s.grpc.GracefulStop() also closes the underlying net.Listener, we just release the reference.227s.grpcListener = nil228s.Logger().Info("GRPC server terminated.")229}230231if s.http != nil {232err := s.http.Shutdown(ctx)233if err != nil {234return fmt.Errorf("failed to close http server: %w", err)235}236// s.http.Shutdown() also closes the underlying net.Listener, we just release the reference.237s.httpListener = nil238s.Logger().Info("HTTP server terminated.")239}240241// Always terminate builtin server last, we want to keep it running for as long as possible242err := s.builtinServices.Close()243if err != nil {244return fmt.Errorf("failed to close debug server: %w", err)245}246s.Logger().Info("Debug server terminated.")247248err = s.tracingCloser.Close()249if err != nil {250return fmt.Errorf("failed to close tracing: %w", err)251}252253return nil254}255256func (s *Server) isClosing() bool {257select {258case <-s.listening:259// listening channel is closed, we're in graceful shutdown mode260return true261default:262return false263}264}265266func (s *Server) healthEndpoint() http.Handler {267mux := http.NewServeMux()268mux.HandleFunc("/ready", s.options.healthHandler.ReadyEndpoint)269mux.HandleFunc("/live", s.options.healthHandler.LiveEndpoint)270return mux271}272273func (s *Server) metricsEndpoint() http.Handler {274mux := http.NewServeMux()275mux.Handle("/metrics", promhttp.InstrumentMetricHandler(276s.options.metricsRegistry, promhttp.HandlerFor(s.options.metricsRegistry, promhttp.HandlerOpts{}),277))278return mux279}280281func (s *Server) initializeGRPC() error {282common_grpc.SetupLogging()283284grpcMetrics := grpc_prometheus.NewServerMetrics()285grpcMetrics.EnableHandlingTimeHistogram(286grpc_prometheus.WithHistogramBuckets([]float64{.005, .025, .05, .1, .5, 1, 2.5, 5, 30, 60, 120, 240, 600}),287)288if err := s.MetricsRegistry().Register(grpcMetrics); err != nil {289return fmt.Errorf("failed to register grpc metrics: %w", err)290}291292unary := []grpc.UnaryServerInterceptor{293grpc_logrus.UnaryServerInterceptor(s.Logger(),294grpc_logrus.WithDecider(func(fullMethodName string, err error) bool {295// Skip logs for anything that does not contain an error.296if err == nil {297return false298}299// Skip gRPC healthcheck logs, they are frequent and pollute our logging infra300return fullMethodName != "/grpc.health.v1.Health/Check"301}),302),303grpcMetrics.UnaryServerInterceptor(),304}305stream := []grpc.StreamServerInterceptor{306grpc_logrus.StreamServerInterceptor(s.Logger()),307grpcMetrics.StreamServerInterceptor(),308}309310opts := common_grpc.ServerOptionsWithInterceptors(stream, unary)311if cfg := s.options.config.Services.GRPC; cfg != nil && cfg.TLS != nil {312tlsConfig, err := common_grpc.ClientAuthTLSConfig(313cfg.TLS.CAPath, cfg.TLS.CertPath, cfg.TLS.KeyPath,314common_grpc.WithSetClientCAs(true),315common_grpc.WithServerName(s.Name),316)317if err != nil {318return fmt.Errorf("failed to load certificates: %w", err)319}320321opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig)))322}323324opts = append(opts, grpc.MaxRecvMsgSize(100*1024*1024))325s.grpc = grpc.NewServer(opts...)326327reflection.Register(s.grpc)328329// Register health service by default330grpc_health_v1.RegisterHealthServer(s.grpc, s.options.grpcHealthCheck)331332return nil333}334335func (s *Server) initializeMetrics() error {336err := s.MetricsRegistry().Register(collectors.NewGoCollector())337if err != nil {338return fmt.Errorf("faile to register go collectors: %w", err)339}340341err = s.MetricsRegistry().Register(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))342if err != nil {343return fmt.Errorf("failed to register process collectors: %w", err)344}345346err = registerMetrics(s.MetricsRegistry())347if err != nil {348return fmt.Errorf("failed to register baseserver metrics: %w", err)349}350351if err := s.MetricsRegistry().Register(log.DefaultMetrics); err != nil {352return fmt.Errorf("failed to register log metrics: %w", err)353}354355reportServerVersion(s.options.version)356357return nil358}359360func (s *Server) DebugAddress() string {361if s.builtinServices == nil {362return ""363}364return "http://" + s.builtinServices.Debug.Addr365}366func (s *Server) HealthAddr() string {367if s.builtinServices == nil {368return ""369}370return "http://" + s.builtinServices.Health.Addr371}372func (s *Server) HTTPAddress() string {373return httpAddress(s.options.config.Services.HTTP, s.httpListener)374}375func (s *Server) GRPCAddress() string {376// If the server hasn't started, it won't have a listener yet377if s.grpcListener == nil {378return ""379}380381return s.grpcListener.Addr().String()382}383384const (385BuiltinDebugPort = 6060386BuiltinMetricsPort = 9500387BuiltinHealthPort = 9501388389BuiltinMetricsPortName = "metrics"390)391392type builtinServices struct {393underTest bool394395Debug *http.Server396Health *http.Server397Metrics *http.Server398}399400func newBuiltinServices(server *Server) *builtinServices {401healthAddr := fmt.Sprintf(":%d", BuiltinHealthPort)402if server.options.underTest {403healthAddr = ":0"404}405406return &builtinServices{407underTest: server.options.underTest,408Debug: &http.Server{409Addr: fmt.Sprintf(":%d", BuiltinDebugPort),410Handler: pprof.Handler(),411},412Health: &http.Server{413Addr: healthAddr,414Handler: server.healthEndpoint(),415},416Metrics: &http.Server{417Addr: fmt.Sprintf("127.0.0.1:%d", BuiltinMetricsPort),418Handler: server.metricsEndpoint(),419},420}421}422423func (s *builtinServices) ListenAndServe() error {424if s == nil {425return nil426}427428var eg errgroup.Group429if !s.underTest {430eg.Go(func() error { return s.Debug.ListenAndServe() })431eg.Go(func() error { return s.Metrics.ListenAndServe() })432}433eg.Go(func() error {434// health is the only service which has a variable address,435// because we need the health service to figure out if the436// server started at all437l, err := net.Listen("tcp", s.Health.Addr)438if err != nil {439return err440}441s.Health.Addr = l.Addr().String()442err = s.Health.Serve(l)443if err == http.ErrServerClosed {444return nil445}446return err447})448return eg.Wait()449}450451func (s *builtinServices) Close() error {452var eg errgroup.Group453eg.Go(func() error { return s.Debug.Close() })454eg.Go(func() error { return s.Metrics.Close() })455eg.Go(func() error { return s.Health.Close() })456return eg.Wait()457}458459func httpAddress(cfg *ServerConfiguration, l net.Listener) string {460if l == nil {461return ""462}463protocol := "http"464if cfg != nil && cfg.TLS != nil {465protocol = "https"466}467return fmt.Sprintf("%s://%s", protocol, l.Addr().String())468}469470471