Path: blob/main/components/supervisor/pkg/serverapi/publicapi.go
2500 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package serverapi56import (7"context"8"crypto/tls"9"encoding/json"10"errors"11"fmt"12"io"13"sync"14"time"1516backoff "github.com/cenkalti/backoff/v4"17"github.com/gitpod-io/gitpod/common-go/log"18v1 "github.com/gitpod-io/gitpod/components/public-api/go/experimental/v1"19gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"20"github.com/gitpod-io/gitpod/supervisor/api"21grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"22"github.com/prometheus/client_golang/prometheus"23"google.golang.org/grpc"24"google.golang.org/grpc/codes"25"google.golang.org/grpc/credentials"26"google.golang.org/grpc/metadata"27"google.golang.org/grpc/status"28)2930type APIInterface interface {31GetToken(ctx context.Context, query *gitpod.GetTokenSearchOptions) (res *gitpod.Token, err error)32OpenPort(ctx context.Context, port *gitpod.WorkspaceInstancePort) (res *gitpod.WorkspaceInstancePort, err error)33UpdateGitStatus(ctx context.Context, status *gitpod.WorkspaceInstanceRepoStatus) (err error)34WorkspaceUpdates(ctx context.Context) (<-chan *gitpod.WorkspaceInstance, error)35SendHeartbeat(ctx context.Context) (err error)3637// Metrics38RegisterMetrics(registry *prometheus.Registry) error39}4041const (42// KindGitpod marks tokens that provide access to the Gitpod server API.43KindGitpod = "gitpod"44)4546var errNotConnected = errors.New("not connected to server/public api")4748type ServiceConfig struct {49Host string50Endpoint string51InstanceID string52WorkspaceID string53OwnerID string54SupervisorVersion string55ConfigcatEnabled bool56}5758type Service struct {59cfg *ServiceConfig60token string6162// publicAPIConn public API publicAPIConn63publicAPIConn *grpc.ClientConn6465// subs is the subscribers of workspaceUpdates66subs map[chan *gitpod.WorkspaceInstance]struct{}67subMutex sync.Mutex6869apiMetrics *ClientMetrics70}7172// SendHeartbeat implements APIInterface.73func (s *Service) SendHeartbeat(ctx context.Context) (err error) {74if s == nil {75return errNotConnected76}77startTime := time.Now()78defer func() {79s.apiMetrics.ProcessMetrics("SendHeartbeat", err, startTime)80}()8182workspaceID := s.cfg.WorkspaceID83service := v1.NewIDEClientServiceClient(s.publicAPIConn)8485payload := &v1.SendHeartbeatRequest{86WorkspaceId: workspaceID,87}88_, err = service.SendHeartbeat(ctx, payload)89if err != nil {90log.WithField("method", "SendHeartbeat").WithError(err).Error("failed to call PublicAPI")91}92return err93}9495var _ APIInterface = (*Service)(nil)9697func NewServerApiService(ctx context.Context, cfg *ServiceConfig, tknsrv api.TokenServiceServer) *Service {98tknres, err := tknsrv.GetToken(context.Background(), &api.GetTokenRequest{99Kind: KindGitpod,100Host: cfg.Host,101Scope: []string{102"function:getToken",103"function:openPort",104"function:trackEvent",105"function:getWorkspace",106"function:sendHeartBeat",107},108})109if err != nil {110log.WithError(err).Error("cannot get token for Gitpod API")111return nil112}113114service := &Service{115token: tknres.Token,116cfg: cfg,117apiMetrics: NewClientMetrics(),118subs: make(map[chan *gitpod.WorkspaceInstance]struct{}),119}120121// public api122service.tryConnToPublicAPI(ctx)123124// start to listen on real instance updates125go service.onWorkspaceUpdates(ctx)126127return service128}129130func (s *Service) tryConnToPublicAPI(ctx context.Context) {131endpoint := fmt.Sprintf("api.%s:443", s.cfg.Host)132log.WithField("endpoint", endpoint).Info("connecting to PublicAPI...")133opts := []grpc.DialOption{134grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})),135grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient([]grpc.StreamClientInterceptor{136func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {137withAuth := metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+s.token)138return streamer(withAuth, desc, cc, method, opts...)139},140}...)),141grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient([]grpc.UnaryClientInterceptor{142func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {143withAuth := metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+s.token)144return invoker(withAuth, method, req, reply, cc, opts...)145},146}...)),147}148if conn, err := grpc.Dial(endpoint, opts...); err != nil {149log.WithError(err).Errorf("failed to dial public api %s", endpoint)150} else {151s.publicAPIConn = conn152go func() {153<-ctx.Done()154s.publicAPIConn.Close()155}()156}157}158159func (s *Service) GetToken(ctx context.Context, query *gitpod.GetTokenSearchOptions) (res *gitpod.Token, err error) {160if s == nil {161return nil, errNotConnected162}163startTime := time.Now()164defer func() {165s.apiMetrics.ProcessMetrics("GetToken", err, startTime)166}()167168service := v1.NewUserServiceClient(s.publicAPIConn)169resp, err := service.GetGitToken(ctx, &v1.GetGitTokenRequest{170Host: query.Host,171})172if err != nil {173log.WithField("method", "GetGitToken").WithError(err).Error("failed to call PublicAPI")174return nil, err175}176return &gitpod.Token{177ExpiryDate: resp.Token.ExpiryDate,178IDToken: resp.Token.IdToken,179RefreshToken: resp.Token.RefreshToken,180Scopes: resp.Token.Scopes,181UpdateDate: resp.Token.UpdateDate,182Username: resp.Token.Username,183Value: resp.Token.Value,184}, nil185}186187func (s *Service) UpdateGitStatus(ctx context.Context, status *gitpod.WorkspaceInstanceRepoStatus) (err error) {188if s == nil {189return errNotConnected190}191startTime := time.Now()192defer func() {193s.apiMetrics.ProcessMetrics("UpdateGitStatus", err, startTime)194}()195workspaceID := s.cfg.WorkspaceID196service := v1.NewIDEClientServiceClient(s.publicAPIConn)197payload := &v1.UpdateGitStatusRequest{198WorkspaceId: workspaceID,199}200if status != nil {201payload.Status = capGitStatusLength(&v1.GitStatus{202Branch: status.Branch,203LatestCommit: status.LatestCommit,204TotalUncommitedFiles: int32(status.TotalUncommitedFiles),205TotalUnpushedCommits: int32(status.TotalUnpushedCommits),206TotalUntrackedFiles: int32(status.TotalUntrackedFiles),207UncommitedFiles: status.UncommitedFiles,208UnpushedCommits: status.UnpushedCommits,209UntrackedFiles: status.UntrackedFiles,210})211}212_, err = service.UpdateGitStatus(ctx, payload)213return214}215216func (s *Service) OpenPort(ctx context.Context, port *gitpod.WorkspaceInstancePort) (res *gitpod.WorkspaceInstancePort, err error) {217if s == nil {218return nil, errNotConnected219}220startTime := time.Now()221defer func() {222s.apiMetrics.ProcessMetrics("OpenPort", err, startTime)223}()224workspaceID := s.cfg.WorkspaceID225service := v1.NewWorkspacesServiceClient(s.publicAPIConn)226227payload := &v1.UpdatePortRequest{228WorkspaceId: workspaceID,229Port: &v1.PortSpec{230Port: uint64(port.Port),231},232}233if port.Visibility == gitpod.PortVisibilityPublic {234payload.Port.Policy = v1.PortPolicy_PORT_POLICY_PUBLIC235} else {236payload.Port.Policy = v1.PortPolicy_PORT_POLICY_PRIVATE237}238if port.Protocol == gitpod.PortProtocolHTTPS {239payload.Port.Protocol = v1.PortProtocol_PORT_PROTOCOL_HTTPS240} else {241payload.Port.Protocol = v1.PortProtocol_PORT_PROTOCOL_HTTP242}243_, err = service.UpdatePort(ctx, payload)244if err != nil {245log.WithField("method", "UpdatePort").WithError(err).Error("failed to call PublicAPI")246return nil, err247}248// server don't respond anything249// see https://github.com/gitpod-io/gitpod/blob/2967579c330de67090d975661a6e3e1cd970ab68/components/server/src/workspace/gitpod-server-impl.ts#L1521250return port, nil251}252253// onWorkspaceUpdates listen to server and public API workspaceUpdates and publish to subscribers once Service created.254func (s *Service) onWorkspaceUpdates(ctx context.Context) {255errChan := make(chan error)256processUpdate := func() context.CancelFunc {257childCtx, cancel := context.WithCancel(ctx)258go s.publicAPIWorkspaceUpdate(childCtx, errChan)259return cancel260}261go func() {262cancel := processUpdate()263defer func() {264cancel()265}()266// force reconnect after 7m to avoid unexpected 10m reconnection (internal error)267ticker := time.NewTicker(7 * time.Minute)268for {269select {270case <-ctx.Done():271ticker.Stop()272return273case <-ticker.C:274cancel()275cancel = processUpdate()276case err := <-errChan:277if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {278continue279}280code := status.Code(err)281if code == codes.PermissionDenied {282log.WithError(err).Fatalf("failed to on instance update: have no permission")283}284log.WithField("method", "WorkspaceUpdates").WithError(err).Error("failed to listen")285cancel()286time.Sleep(time.Second * 2)287cancel = processUpdate()288}289}290}()291}292293func (s *Service) WorkspaceUpdates(ctx context.Context) (<-chan *gitpod.WorkspaceInstance, error) {294if s == nil {295return nil, errNotConnected296}297ch := make(chan *gitpod.WorkspaceInstance)298s.subMutex.Lock()299s.subs[ch] = struct{}{}300s.subMutex.Unlock()301302go func() {303defer func() {304close(ch)305}()306<-ctx.Done()307s.subMutex.Lock()308delete(s.subs, ch)309s.subMutex.Unlock()310}()311return ch, nil312}313314func (s *Service) publicAPIWorkspaceUpdate(ctx context.Context, errChan chan error) {315workspaceID := s.cfg.WorkspaceID316resp, err := backoff.RetryWithData(func() (v1.WorkspacesService_StreamWorkspaceStatusClient, error) {317startTime := time.Now()318var err error319defer func() {320if err != nil {321s.apiMetrics.ProcessMetrics("WorkspaceUpdates", err, startTime)322}323}()324service := v1.NewWorkspacesServiceClient(s.publicAPIConn)325resp, err := service.StreamWorkspaceStatus(ctx, &v1.StreamWorkspaceStatusRequest{326WorkspaceId: workspaceID,327})328if err != nil {329log.WithError(err).Info("backoff failed to get workspace service client of PublicAPI, try again")330}331return resp, err332}, backoff.WithContext(ConnBackoff, ctx))333if err != nil {334// we don't care about ctx canceled335if ctx.Err() != nil {336return337}338log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to call PublicAPI")339errChan <- err340return341}342startTime := time.Now()343defer func() {344s.apiMetrics.ProcessMetrics("WorkspaceUpdates", err, startTime)345}()346var data *v1.StreamWorkspaceStatusResponse347for {348data, err = resp.Recv()349if err != nil {350code := status.Code(err)351if err != io.EOF && ctx.Err() == nil && code != codes.Canceled {352log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to receive status update")353}354if ctx.Err() != nil || code == codes.Canceled {355return356}357errChan <- err358return359}360s.subMutex.Lock()361for sub := range s.subs {362sub <- workspaceStatusToWorkspaceInstance(data.Result)363}364s.subMutex.Unlock()365}366}367368var ConnBackoff = &backoff.ExponentialBackOff{369InitialInterval: 2 * time.Second,370RandomizationFactor: 0.5,371Multiplier: 1.5,372MaxInterval: 30 * time.Second,373MaxElapsedTime: 0,374Stop: backoff.Stop,375Clock: backoff.SystemClock,376}377378func (s *Service) RegisterMetrics(registry *prometheus.Registry) error {379if s == nil {380return errNotConnected381}382return registry.Register(s.apiMetrics)383}384385func workspaceStatusToWorkspaceInstance(status *v1.WorkspaceStatus) *gitpod.WorkspaceInstance {386instance := &gitpod.WorkspaceInstance{387CreationTime: status.Instance.CreatedAt.String(),388ID: status.Instance.InstanceId,389Status: &gitpod.WorkspaceInstanceStatus{390ExposedPorts: []*gitpod.WorkspaceInstancePort{},391Message: status.Instance.Status.Message,392// OwnerToken: "", not used so ignore393Phase: status.Instance.Status.Phase.String(),394Timeout: status.Instance.Status.Conditions.Timeout,395Version: int(status.Instance.Status.StatusVersion),396},397WorkspaceID: status.Instance.WorkspaceId,398}399for _, port := range status.Instance.Status.Ports {400info := &gitpod.WorkspaceInstancePort{401Port: float64(port.Port),402URL: port.Url,403}404if port.Policy == v1.PortPolicy_PORT_POLICY_PUBLIC {405info.Visibility = gitpod.PortVisibilityPublic406} else {407info.Visibility = gitpod.PortVisibilityPrivate408}409if port.Protocol == v1.PortProtocol_PORT_PROTOCOL_HTTPS {410info.Protocol = gitpod.PortProtocolHTTPS411} else {412info.Protocol = gitpod.PortProtocolHTTP413}414instance.Status.ExposedPorts = append(instance.Status.ExposedPorts, info)415}416return instance417}418419const GIT_STATUS_API_LIMIT_BYTES = 4096420421func capGitStatusLength(s *v1.GitStatus) *v1.GitStatus {422const MARGIN = 200 // bytes (we account for differences in JSON formatting, as well JSON escape characters in the static part of the status)423const API_BUDGET = GIT_STATUS_API_LIMIT_BYTES - MARGIN // bytes424425// calculate JSON length in bytes426bytes, err := json.Marshal(s)427if err != nil {428log.WithError(err).Warn("cannot marshal GitStatus to calculate byte length")429s.UncommitedFiles = nil430s.UnpushedCommits = nil431s.UntrackedFiles = nil432return s433}434if len(bytes) < API_BUDGET {435return s436}437438// roughly estimate how many bytes we have left for the path arrays (containing long strings)439budget := API_BUDGET - len(s.Branch) - len(s.LatestCommit)440bytesUsed := 0441const PLACEHOLDER = "..."442capArrayAtByteLimit := func(arr []string) []string {443result := make([]string, 0, len(arr))444for _, s := range arr {445bytesRequired := len(s) + 4 // 4 bytes for the JSON encoding446if bytesUsed+bytesRequired+len(PLACEHOLDER) > budget {447result = append(result, PLACEHOLDER)448bytesUsed += len(PLACEHOLDER) + 4449break450}451result = append(result, s)452bytesUsed += bytesRequired453}454return result455}456s.UncommitedFiles = capArrayAtByteLimit(s.UncommitedFiles)457s.UnpushedCommits = capArrayAtByteLimit(s.UnpushedCommits)458s.UntrackedFiles = capArrayAtByteLimit(s.UntrackedFiles)459460return s461}462463464