Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/supervisor/pkg/serverapi/publicapi.go
2500 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package serverapi
6
7
import (
8
"context"
9
"crypto/tls"
10
"encoding/json"
11
"errors"
12
"fmt"
13
"io"
14
"sync"
15
"time"
16
17
backoff "github.com/cenkalti/backoff/v4"
18
"github.com/gitpod-io/gitpod/common-go/log"
19
v1 "github.com/gitpod-io/gitpod/components/public-api/go/experimental/v1"
20
gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
21
"github.com/gitpod-io/gitpod/supervisor/api"
22
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
23
"github.com/prometheus/client_golang/prometheus"
24
"google.golang.org/grpc"
25
"google.golang.org/grpc/codes"
26
"google.golang.org/grpc/credentials"
27
"google.golang.org/grpc/metadata"
28
"google.golang.org/grpc/status"
29
)
30
31
type APIInterface interface {
32
GetToken(ctx context.Context, query *gitpod.GetTokenSearchOptions) (res *gitpod.Token, err error)
33
OpenPort(ctx context.Context, port *gitpod.WorkspaceInstancePort) (res *gitpod.WorkspaceInstancePort, err error)
34
UpdateGitStatus(ctx context.Context, status *gitpod.WorkspaceInstanceRepoStatus) (err error)
35
WorkspaceUpdates(ctx context.Context) (<-chan *gitpod.WorkspaceInstance, error)
36
SendHeartbeat(ctx context.Context) (err error)
37
38
// Metrics
39
RegisterMetrics(registry *prometheus.Registry) error
40
}
41
42
const (
43
// KindGitpod marks tokens that provide access to the Gitpod server API.
44
KindGitpod = "gitpod"
45
)
46
47
var errNotConnected = errors.New("not connected to server/public api")
48
49
type ServiceConfig struct {
50
Host string
51
Endpoint string
52
InstanceID string
53
WorkspaceID string
54
OwnerID string
55
SupervisorVersion string
56
ConfigcatEnabled bool
57
}
58
59
type Service struct {
60
cfg *ServiceConfig
61
token string
62
63
// publicAPIConn public API publicAPIConn
64
publicAPIConn *grpc.ClientConn
65
66
// subs is the subscribers of workspaceUpdates
67
subs map[chan *gitpod.WorkspaceInstance]struct{}
68
subMutex sync.Mutex
69
70
apiMetrics *ClientMetrics
71
}
72
73
// SendHeartbeat implements APIInterface.
74
func (s *Service) SendHeartbeat(ctx context.Context) (err error) {
75
if s == nil {
76
return errNotConnected
77
}
78
startTime := time.Now()
79
defer func() {
80
s.apiMetrics.ProcessMetrics("SendHeartbeat", err, startTime)
81
}()
82
83
workspaceID := s.cfg.WorkspaceID
84
service := v1.NewIDEClientServiceClient(s.publicAPIConn)
85
86
payload := &v1.SendHeartbeatRequest{
87
WorkspaceId: workspaceID,
88
}
89
_, err = service.SendHeartbeat(ctx, payload)
90
if err != nil {
91
log.WithField("method", "SendHeartbeat").WithError(err).Error("failed to call PublicAPI")
92
}
93
return err
94
}
95
96
var _ APIInterface = (*Service)(nil)
97
98
func NewServerApiService(ctx context.Context, cfg *ServiceConfig, tknsrv api.TokenServiceServer) *Service {
99
tknres, err := tknsrv.GetToken(context.Background(), &api.GetTokenRequest{
100
Kind: KindGitpod,
101
Host: cfg.Host,
102
Scope: []string{
103
"function:getToken",
104
"function:openPort",
105
"function:trackEvent",
106
"function:getWorkspace",
107
"function:sendHeartBeat",
108
},
109
})
110
if err != nil {
111
log.WithError(err).Error("cannot get token for Gitpod API")
112
return nil
113
}
114
115
service := &Service{
116
token: tknres.Token,
117
cfg: cfg,
118
apiMetrics: NewClientMetrics(),
119
subs: make(map[chan *gitpod.WorkspaceInstance]struct{}),
120
}
121
122
// public api
123
service.tryConnToPublicAPI(ctx)
124
125
// start to listen on real instance updates
126
go service.onWorkspaceUpdates(ctx)
127
128
return service
129
}
130
131
func (s *Service) tryConnToPublicAPI(ctx context.Context) {
132
endpoint := fmt.Sprintf("api.%s:443", s.cfg.Host)
133
log.WithField("endpoint", endpoint).Info("connecting to PublicAPI...")
134
opts := []grpc.DialOption{
135
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})),
136
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient([]grpc.StreamClientInterceptor{
137
func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
138
withAuth := metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+s.token)
139
return streamer(withAuth, desc, cc, method, opts...)
140
},
141
}...)),
142
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient([]grpc.UnaryClientInterceptor{
143
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
144
withAuth := metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+s.token)
145
return invoker(withAuth, method, req, reply, cc, opts...)
146
},
147
}...)),
148
}
149
if conn, err := grpc.Dial(endpoint, opts...); err != nil {
150
log.WithError(err).Errorf("failed to dial public api %s", endpoint)
151
} else {
152
s.publicAPIConn = conn
153
go func() {
154
<-ctx.Done()
155
s.publicAPIConn.Close()
156
}()
157
}
158
}
159
160
func (s *Service) GetToken(ctx context.Context, query *gitpod.GetTokenSearchOptions) (res *gitpod.Token, err error) {
161
if s == nil {
162
return nil, errNotConnected
163
}
164
startTime := time.Now()
165
defer func() {
166
s.apiMetrics.ProcessMetrics("GetToken", err, startTime)
167
}()
168
169
service := v1.NewUserServiceClient(s.publicAPIConn)
170
resp, err := service.GetGitToken(ctx, &v1.GetGitTokenRequest{
171
Host: query.Host,
172
})
173
if err != nil {
174
log.WithField("method", "GetGitToken").WithError(err).Error("failed to call PublicAPI")
175
return nil, err
176
}
177
return &gitpod.Token{
178
ExpiryDate: resp.Token.ExpiryDate,
179
IDToken: resp.Token.IdToken,
180
RefreshToken: resp.Token.RefreshToken,
181
Scopes: resp.Token.Scopes,
182
UpdateDate: resp.Token.UpdateDate,
183
Username: resp.Token.Username,
184
Value: resp.Token.Value,
185
}, nil
186
}
187
188
func (s *Service) UpdateGitStatus(ctx context.Context, status *gitpod.WorkspaceInstanceRepoStatus) (err error) {
189
if s == nil {
190
return errNotConnected
191
}
192
startTime := time.Now()
193
defer func() {
194
s.apiMetrics.ProcessMetrics("UpdateGitStatus", err, startTime)
195
}()
196
workspaceID := s.cfg.WorkspaceID
197
service := v1.NewIDEClientServiceClient(s.publicAPIConn)
198
payload := &v1.UpdateGitStatusRequest{
199
WorkspaceId: workspaceID,
200
}
201
if status != nil {
202
payload.Status = capGitStatusLength(&v1.GitStatus{
203
Branch: status.Branch,
204
LatestCommit: status.LatestCommit,
205
TotalUncommitedFiles: int32(status.TotalUncommitedFiles),
206
TotalUnpushedCommits: int32(status.TotalUnpushedCommits),
207
TotalUntrackedFiles: int32(status.TotalUntrackedFiles),
208
UncommitedFiles: status.UncommitedFiles,
209
UnpushedCommits: status.UnpushedCommits,
210
UntrackedFiles: status.UntrackedFiles,
211
})
212
}
213
_, err = service.UpdateGitStatus(ctx, payload)
214
return
215
}
216
217
func (s *Service) OpenPort(ctx context.Context, port *gitpod.WorkspaceInstancePort) (res *gitpod.WorkspaceInstancePort, err error) {
218
if s == nil {
219
return nil, errNotConnected
220
}
221
startTime := time.Now()
222
defer func() {
223
s.apiMetrics.ProcessMetrics("OpenPort", err, startTime)
224
}()
225
workspaceID := s.cfg.WorkspaceID
226
service := v1.NewWorkspacesServiceClient(s.publicAPIConn)
227
228
payload := &v1.UpdatePortRequest{
229
WorkspaceId: workspaceID,
230
Port: &v1.PortSpec{
231
Port: uint64(port.Port),
232
},
233
}
234
if port.Visibility == gitpod.PortVisibilityPublic {
235
payload.Port.Policy = v1.PortPolicy_PORT_POLICY_PUBLIC
236
} else {
237
payload.Port.Policy = v1.PortPolicy_PORT_POLICY_PRIVATE
238
}
239
if port.Protocol == gitpod.PortProtocolHTTPS {
240
payload.Port.Protocol = v1.PortProtocol_PORT_PROTOCOL_HTTPS
241
} else {
242
payload.Port.Protocol = v1.PortProtocol_PORT_PROTOCOL_HTTP
243
}
244
_, err = service.UpdatePort(ctx, payload)
245
if err != nil {
246
log.WithField("method", "UpdatePort").WithError(err).Error("failed to call PublicAPI")
247
return nil, err
248
}
249
// server don't respond anything
250
// see https://github.com/gitpod-io/gitpod/blob/2967579c330de67090d975661a6e3e1cd970ab68/components/server/src/workspace/gitpod-server-impl.ts#L1521
251
return port, nil
252
}
253
254
// onWorkspaceUpdates listen to server and public API workspaceUpdates and publish to subscribers once Service created.
255
func (s *Service) onWorkspaceUpdates(ctx context.Context) {
256
errChan := make(chan error)
257
processUpdate := func() context.CancelFunc {
258
childCtx, cancel := context.WithCancel(ctx)
259
go s.publicAPIWorkspaceUpdate(childCtx, errChan)
260
return cancel
261
}
262
go func() {
263
cancel := processUpdate()
264
defer func() {
265
cancel()
266
}()
267
// force reconnect after 7m to avoid unexpected 10m reconnection (internal error)
268
ticker := time.NewTicker(7 * time.Minute)
269
for {
270
select {
271
case <-ctx.Done():
272
ticker.Stop()
273
return
274
case <-ticker.C:
275
cancel()
276
cancel = processUpdate()
277
case err := <-errChan:
278
if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {
279
continue
280
}
281
code := status.Code(err)
282
if code == codes.PermissionDenied {
283
log.WithError(err).Fatalf("failed to on instance update: have no permission")
284
}
285
log.WithField("method", "WorkspaceUpdates").WithError(err).Error("failed to listen")
286
cancel()
287
time.Sleep(time.Second * 2)
288
cancel = processUpdate()
289
}
290
}
291
}()
292
}
293
294
func (s *Service) WorkspaceUpdates(ctx context.Context) (<-chan *gitpod.WorkspaceInstance, error) {
295
if s == nil {
296
return nil, errNotConnected
297
}
298
ch := make(chan *gitpod.WorkspaceInstance)
299
s.subMutex.Lock()
300
s.subs[ch] = struct{}{}
301
s.subMutex.Unlock()
302
303
go func() {
304
defer func() {
305
close(ch)
306
}()
307
<-ctx.Done()
308
s.subMutex.Lock()
309
delete(s.subs, ch)
310
s.subMutex.Unlock()
311
}()
312
return ch, nil
313
}
314
315
func (s *Service) publicAPIWorkspaceUpdate(ctx context.Context, errChan chan error) {
316
workspaceID := s.cfg.WorkspaceID
317
resp, err := backoff.RetryWithData(func() (v1.WorkspacesService_StreamWorkspaceStatusClient, error) {
318
startTime := time.Now()
319
var err error
320
defer func() {
321
if err != nil {
322
s.apiMetrics.ProcessMetrics("WorkspaceUpdates", err, startTime)
323
}
324
}()
325
service := v1.NewWorkspacesServiceClient(s.publicAPIConn)
326
resp, err := service.StreamWorkspaceStatus(ctx, &v1.StreamWorkspaceStatusRequest{
327
WorkspaceId: workspaceID,
328
})
329
if err != nil {
330
log.WithError(err).Info("backoff failed to get workspace service client of PublicAPI, try again")
331
}
332
return resp, err
333
}, backoff.WithContext(ConnBackoff, ctx))
334
if err != nil {
335
// we don't care about ctx canceled
336
if ctx.Err() != nil {
337
return
338
}
339
log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to call PublicAPI")
340
errChan <- err
341
return
342
}
343
startTime := time.Now()
344
defer func() {
345
s.apiMetrics.ProcessMetrics("WorkspaceUpdates", err, startTime)
346
}()
347
var data *v1.StreamWorkspaceStatusResponse
348
for {
349
data, err = resp.Recv()
350
if err != nil {
351
code := status.Code(err)
352
if err != io.EOF && ctx.Err() == nil && code != codes.Canceled {
353
log.WithField("method", "StreamWorkspaceStatus").WithError(err).Error("failed to receive status update")
354
}
355
if ctx.Err() != nil || code == codes.Canceled {
356
return
357
}
358
errChan <- err
359
return
360
}
361
s.subMutex.Lock()
362
for sub := range s.subs {
363
sub <- workspaceStatusToWorkspaceInstance(data.Result)
364
}
365
s.subMutex.Unlock()
366
}
367
}
368
369
var ConnBackoff = &backoff.ExponentialBackOff{
370
InitialInterval: 2 * time.Second,
371
RandomizationFactor: 0.5,
372
Multiplier: 1.5,
373
MaxInterval: 30 * time.Second,
374
MaxElapsedTime: 0,
375
Stop: backoff.Stop,
376
Clock: backoff.SystemClock,
377
}
378
379
func (s *Service) RegisterMetrics(registry *prometheus.Registry) error {
380
if s == nil {
381
return errNotConnected
382
}
383
return registry.Register(s.apiMetrics)
384
}
385
386
func workspaceStatusToWorkspaceInstance(status *v1.WorkspaceStatus) *gitpod.WorkspaceInstance {
387
instance := &gitpod.WorkspaceInstance{
388
CreationTime: status.Instance.CreatedAt.String(),
389
ID: status.Instance.InstanceId,
390
Status: &gitpod.WorkspaceInstanceStatus{
391
ExposedPorts: []*gitpod.WorkspaceInstancePort{},
392
Message: status.Instance.Status.Message,
393
// OwnerToken: "", not used so ignore
394
Phase: status.Instance.Status.Phase.String(),
395
Timeout: status.Instance.Status.Conditions.Timeout,
396
Version: int(status.Instance.Status.StatusVersion),
397
},
398
WorkspaceID: status.Instance.WorkspaceId,
399
}
400
for _, port := range status.Instance.Status.Ports {
401
info := &gitpod.WorkspaceInstancePort{
402
Port: float64(port.Port),
403
URL: port.Url,
404
}
405
if port.Policy == v1.PortPolicy_PORT_POLICY_PUBLIC {
406
info.Visibility = gitpod.PortVisibilityPublic
407
} else {
408
info.Visibility = gitpod.PortVisibilityPrivate
409
}
410
if port.Protocol == v1.PortProtocol_PORT_PROTOCOL_HTTPS {
411
info.Protocol = gitpod.PortProtocolHTTPS
412
} else {
413
info.Protocol = gitpod.PortProtocolHTTP
414
}
415
instance.Status.ExposedPorts = append(instance.Status.ExposedPorts, info)
416
}
417
return instance
418
}
419
420
const GIT_STATUS_API_LIMIT_BYTES = 4096
421
422
func capGitStatusLength(s *v1.GitStatus) *v1.GitStatus {
423
const MARGIN = 200 // bytes (we account for differences in JSON formatting, as well JSON escape characters in the static part of the status)
424
const API_BUDGET = GIT_STATUS_API_LIMIT_BYTES - MARGIN // bytes
425
426
// calculate JSON length in bytes
427
bytes, err := json.Marshal(s)
428
if err != nil {
429
log.WithError(err).Warn("cannot marshal GitStatus to calculate byte length")
430
s.UncommitedFiles = nil
431
s.UnpushedCommits = nil
432
s.UntrackedFiles = nil
433
return s
434
}
435
if len(bytes) < API_BUDGET {
436
return s
437
}
438
439
// roughly estimate how many bytes we have left for the path arrays (containing long strings)
440
budget := API_BUDGET - len(s.Branch) - len(s.LatestCommit)
441
bytesUsed := 0
442
const PLACEHOLDER = "..."
443
capArrayAtByteLimit := func(arr []string) []string {
444
result := make([]string, 0, len(arr))
445
for _, s := range arr {
446
bytesRequired := len(s) + 4 // 4 bytes for the JSON encoding
447
if bytesUsed+bytesRequired+len(PLACEHOLDER) > budget {
448
result = append(result, PLACEHOLDER)
449
bytesUsed += len(PLACEHOLDER) + 4
450
break
451
}
452
result = append(result, s)
453
bytesUsed += bytesRequired
454
}
455
return result
456
}
457
s.UncommitedFiles = capArrayAtByteLimit(s.UncommitedFiles)
458
s.UnpushedCommits = capArrayAtByteLimit(s.UnpushedCommits)
459
s.UntrackedFiles = capArrayAtByteLimit(s.UntrackedFiles)
460
461
return s
462
}
463
464