Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/test/pkg/integration/apis.go
2498 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package integration
6
7
import (
8
"bytes"
9
"context"
10
"crypto/aes"
11
"crypto/cipher"
12
"crypto/sha256"
13
"crypto/tls"
14
"crypto/x509"
15
"database/sql"
16
"encoding/base64"
17
"encoding/json"
18
"errors"
19
"fmt"
20
"io"
21
"net"
22
"net/http"
23
"net/url"
24
"strconv"
25
"strings"
26
"sync"
27
"testing"
28
"time"
29
30
"github.com/google/uuid"
31
"golang.org/x/xerrors"
32
"google.golang.org/grpc"
33
"google.golang.org/grpc/codes"
34
"google.golang.org/grpc/credentials"
35
"google.golang.org/grpc/credentials/insecure"
36
"google.golang.org/grpc/status"
37
appsv1 "k8s.io/api/apps/v1"
38
corev1 "k8s.io/api/core/v1"
39
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
40
"k8s.io/apimachinery/pkg/labels"
41
"k8s.io/apimachinery/pkg/types"
42
"sigs.k8s.io/e2e-framework/klient"
43
"sigs.k8s.io/e2e-framework/klient/k8s"
44
45
// Gitpod uses mysql, so it makes sense to make this DB driver available
46
// by default.
47
_ "github.com/go-sql-driver/mysql"
48
49
"github.com/gitpod-io/gitpod/common-go/log"
50
csapi "github.com/gitpod-io/gitpod/content-service/api"
51
gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
52
imgbldr "github.com/gitpod-io/gitpod/image-builder/api"
53
"github.com/gitpod-io/gitpod/test/pkg/integration/common"
54
wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
55
)
56
57
// API provides access to the individual component's API
58
func NewComponentAPI(ctx context.Context, namespace string, kubeconfig string, client klient.Client) *ComponentAPI {
59
return &ComponentAPI{
60
namespace: namespace,
61
kubeconfig: kubeconfig,
62
client: client,
63
64
closerMutex: sync.Mutex{},
65
66
wsmanStatusMu: sync.Mutex{},
67
contentServiceStatusMu: sync.Mutex{},
68
imgbldStatusMu: sync.Mutex{},
69
70
serverStatus: &serverStatus{
71
Client: make(map[string]*gitpod.APIoverJSONRPC),
72
Token: make(map[string]string),
73
PAPIClient: make(map[string]*PAPIClient),
74
},
75
}
76
}
77
78
type serverStatus struct {
79
Token map[string]string
80
Client map[string]*gitpod.APIoverJSONRPC
81
PAPIClient map[string]*PAPIClient
82
}
83
84
// ComponentAPI provides access to the individual component's API
85
type ComponentAPI struct {
86
namespace string
87
kubeconfig string
88
client klient.Client
89
90
closer []func() error
91
closerMutex sync.Mutex
92
93
serverStatus *serverStatus
94
95
wsmanStatus struct {
96
Port int
97
Client wsmanapi.WorkspaceManagerClient
98
}
99
contentServiceStatus struct {
100
Port int
101
BlobServiceClient csapi.BlobServiceClient
102
ContentService ContentService
103
}
104
imgbldStatus struct {
105
Port int
106
Client imgbldr.ImageBuilderClient
107
}
108
109
wsmanStatusMu sync.Mutex
110
contentServiceStatusMu sync.Mutex
111
imgbldStatusMu sync.Mutex
112
serverStatusMu sync.Mutex
113
}
114
115
// EncryptionKeyMetadata identifies a DB encryption key by name and version.
type EncryptionKeyMetadata struct {
	// Name is the key's name.
	Name string
	// Version is the key's version number.
	Version int
}
119
120
type EncryptionKey struct {
121
Metadata EncryptionKeyMetadata
122
Material []byte
123
}
124
125
type DBConfig struct {
126
Host string
127
Port int32
128
ForwardPort *ForwardPort
129
Password string
130
EncryptionKeys EncryptionKey
131
}
132
133
// ForwardPort is the target of a port-forward: a pod and the port on it.
type ForwardPort struct {
	// PodName is the name of the pod to forward to.
	PodName string
	// RemotePort is the port on that pod.
	RemotePort int32
}
137
138
// EncriptedDBData is the JSON envelope in which an encrypted value is stored:
// the ciphertext, the IV used, and the name/version of the encryption key.
type EncriptedDBData struct {
	// Data is the ciphertext.
	Data string `json:"data"`
	// KeyParams carries the IV that was used for encryption.
	KeyParams struct {
		Iv string `json:"iv"`
	} `json:"keyParams"`
	// KeyMetadata identifies the key that was used for encryption.
	KeyMetadata struct {
		Name    string `json:"name"`
		Version int    `json:"version"`
	} `json:"keyMetadata"`
}
148
149
// EncryptValue encrypts value with AES in CBC mode under key and returns the
// ciphertext and the IV, both base64 encoded (standard encoding).
//
// The plaintext is padded PKCS#7-style to a multiple of aes.BlockSize; input
// that is already block aligned receives one full block of padding. value is
// copied before padding, so the caller's slice and its backing array are
// never modified, even when the slice has spare capacity.
//
// The IV is a fixed constant, which makes the output deterministic for a
// given value and key.
//
// key must be a valid AES key (16, 24 or 32 bytes). Because the signature has
// no error result, an invalid key panics with a descriptive message instead
// of failing later with a nil pointer dereference.
func EncryptValue(value []byte, key []byte) (data string, iv string) {
	ivData := []byte("1234567890123456")

	block, err := aes.NewCipher(key)
	if err != nil {
		panic("EncryptValue: " + err.Error())
	}

	// Pad into a fresh buffer rather than appending to the caller's slice.
	padLen := aes.BlockSize - len(value)%aes.BlockSize
	buf := make([]byte, len(value)+padLen)
	copy(buf, value)
	for i := len(value); i < len(buf); i++ {
		buf[i] = byte(padLen)
	}

	// CryptBlocks permits dst and src to be the same slice.
	cipher.NewCBCEncrypter(block, ivData).CryptBlocks(buf, buf)

	data = base64.StdEncoding.EncodeToString(buf)
	iv = base64.StdEncoding.EncodeToString(ivData)
	return data, iv
}
170
171
// Storage provides a url of the storage provider
172
// it takes a url as input and creates a port forward if required
173
// e.g. when minio running in gitpod cluster
174
// and modifies the url to refer to the localhost instead of dns name
175
func (c *ComponentAPI) Storage(connUrl string) (string, error) {
176
u, err := url.Parse(connUrl)
177
if err != nil {
178
return "", err
179
}
180
host, port, _ := net.SplitHostPort(u.Host)
181
if !strings.HasSuffix(host, ".svc.cluster.local") {
182
return connUrl, nil
183
}
184
serviceName := strings.Split(host, ".")[0]
185
186
localPort, err := getFreePort()
187
if err != nil {
188
return "", err
189
}
190
191
targetPort, err := strconv.Atoi(port)
192
if err != nil {
193
return "", err
194
}
195
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
196
err = c.portFwdWithRetry(ctx, common.ForwardPortOfSvc, serviceName, localPort, targetPort)
197
if err != nil {
198
cancel()
199
return "", err
200
}
201
202
c.appendCloser(func() error { cancel(); return nil })
203
204
return strings.Replace(connUrl, u.Host, fmt.Sprintf("localhost:%d", localPort), 1), nil
205
}
206
207
// Supervisor provides a gRPC connection to a workspace's supervisor
208
func (c *ComponentAPI) Supervisor(instanceID string) (grpc.ClientConnInterface, error) {
209
pod, _, err := selectPod(ComponentWorkspace, selectPodOptions{
210
InstanceID: instanceID,
211
}, c.namespace, c.client)
212
if err != nil {
213
return nil, err
214
}
215
216
localPort, err := getFreePort()
217
if err != nil {
218
return nil, err
219
}
220
221
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
222
err = c.portFwdWithRetry(ctx, common.ForwardPortOfPod, pod, localPort, 8080)
223
if err != nil {
224
cancel()
225
return nil, err
226
}
227
c.appendCloser(func() error { cancel(); return nil })
228
229
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", localPort), grpc.WithTransportCredentials(insecure.NewCredentials()))
230
if err != nil {
231
return nil, err
232
}
233
234
c.appendCloser(conn.Close)
235
return conn, nil
236
}
237
238
// gitpodServerOpts collects the settings applied by GitpodServerOpt functions.
type gitpodServerOpts struct {
	// User is the name of the user as which the server API is accessed.
	User string
}
241
242
// GitpodServerOpt specificies Gitpod server access
243
type GitpodServerOpt func(*gitpodServerOpts) error
244
245
// WithGitpodUser specifies the user as which we want to access the API.
246
func WithGitpodUser(name string) GitpodServerOpt {
247
return func(o *gitpodServerOpts) error {
248
o.User = name
249
return nil
250
}
251
}
252
253
// CreateOAuth2Token creates a Gitpod token with the given scopes for user and
// returns it. It is the exported entry point to createGitpodToken; on failure
// the returned token is empty.
func (c *ComponentAPI) CreateOAuth2Token(user string, scopes []string) (string, error) {
	token, err := c.createGitpodToken(user, scopes)
	if err == nil {
		return token, nil
	}
	return "", err
}
260
261
// ClearGitpodServerClientCache discards all cached Gitpod server clients, so
// that the next GitpodServer call establishes a new connection. Cached tokens
// are kept, and the discarded clients are not closed here.
//
// The map is replaced under serverStatusMu, the same mutex GitpodServer holds
// when it stores a client.
func (c *ComponentAPI) ClearGitpodServerClientCache() {
	c.serverStatusMu.Lock()
	defer c.serverStatusMu.Unlock()

	c.serverStatus.Client = map[string]*gitpod.APIoverJSONRPC{}
}
264
265
// GitpodServer provides access to the Gitpod server API
266
func (c *ComponentAPI) GitpodServer(opts ...GitpodServerOpt) (gitpod.APIInterface, error) {
267
var options gitpodServerOpts
268
for _, o := range opts {
269
err := o(&options)
270
if err != nil {
271
return nil, xerrors.Errorf("cannot access Gitpod server API: %q", err)
272
}
273
}
274
275
if cl, ok := c.serverStatus.Client[options.User]; ok {
276
return cl, nil
277
}
278
279
var res gitpod.APIInterface
280
err := func() error {
281
tkn := c.serverStatus.Token[options.User]
282
if tkn == "" {
283
var err error
284
tkn, err = c.createGitpodToken(options.User, []string{
285
"resource:default",
286
"function:*",
287
})
288
if err != nil {
289
return err
290
}
291
func() {
292
c.serverStatusMu.Lock()
293
defer c.serverStatusMu.Unlock()
294
c.serverStatus.Token[options.User] = tkn
295
}()
296
}
297
298
var pods corev1.PodList
299
err := c.client.Resources(c.namespace).List(context.Background(), &pods, func(opts *metav1.ListOptions) {
300
opts.LabelSelector = "component=server"
301
})
302
if err != nil {
303
return err
304
}
305
306
config, err := GetServerConfig(c.namespace, c.client)
307
if err != nil {
308
return err
309
}
310
311
hostURL := config.HostURL
312
if hostURL == "" {
313
return xerrors.Errorf("server config: empty HostURL")
314
}
315
316
hostURL = strings.ReplaceAll(hostURL, "http://", "ws://")
317
hostURL = strings.ReplaceAll(hostURL, "https://", "wss://")
318
endpoint, err := url.Parse(hostURL)
319
if err != nil {
320
return err
321
}
322
endpoint.Path = "/api/v1"
323
324
cl, err := gitpod.ConnectToServer(endpoint.String(), gitpod.ConnectToServerOpts{
325
Token: tkn,
326
Log: log.Log,
327
})
328
if err != nil {
329
return err
330
}
331
332
func() {
333
c.serverStatusMu.Lock()
334
defer c.serverStatusMu.Unlock()
335
c.serverStatus.Client[options.User] = cl
336
}()
337
338
res = cl
339
c.appendCloser(cl.Close)
340
341
return nil
342
}()
343
if err != nil {
344
return nil, xerrors.Errorf("cannot access Gitpod server API: %q", err)
345
}
346
347
return res, nil
348
}
349
350
// GetServerEndpoint returns the server's base URL in the form
// "https://<hostname>/", where the hostname is taken from the HostURL of the
// server config. The scheme is always https; any port or path in HostURL is
// dropped.
func (c *ComponentAPI) GetServerEndpoint() (string, error) {
	cfg, err := GetServerConfig(c.namespace, c.client)
	if err != nil {
		return "", err
	}
	if cfg.HostURL == "" {
		return "", xerrors.Errorf("server config: empty HostURL")
	}

	parsed, err := url.Parse(cfg.HostURL)
	if err != nil {
		return "", err
	}

	return "https://" + parsed.Hostname() + "/", nil
}
368
369
func (c *ComponentAPI) GitpodSessionCookie(userId string, secretKey string) (*http.Cookie, error) {
370
var res *http.Cookie
371
err := func() error {
372
config, err := GetServerConfig(c.namespace, c.client)
373
if err != nil {
374
return err
375
}
376
377
hostURL := config.HostURL
378
if hostURL == "" {
379
return xerrors.Errorf("server config: empty HostURL")
380
}
381
382
endpoint, err := url.Parse(hostURL)
383
if err != nil {
384
return err
385
}
386
387
origin := fmt.Sprintf("%s://%s/", "https", endpoint.Hostname())
388
389
client := &http.Client{
390
CheckRedirect: func(req *http.Request, via []*http.Request) error {
391
return http.ErrUseLastResponse
392
},
393
}
394
395
req, _ := http.NewRequest("GET", hostURL+fmt.Sprintf("/api/login/ots/%s/%s", userId, secretKey), nil)
396
req.Header.Set("Origin", origin)
397
req.Header.Set("Cache-Control", "no-store")
398
399
httpresp, err := client.Do(req)
400
if err != nil {
401
return err
402
}
403
404
cookies := httpresp.Cookies()
405
if len(cookies) > 0 {
406
res = cookies[0]
407
}
408
409
return nil
410
}()
411
if err != nil {
412
return nil, err
413
}
414
if res == nil {
415
return nil, xerrors.Errorf("Server did not provide a session cookie")
416
}
417
418
return res, nil
419
}
420
421
// GetUserId looks up the ID of a user in the d_b_user table.
//
// With a non-empty user, the row with that name which is neither blocked nor
// marked deleted is used. With an empty user, any row other than the builtin
// user that is neither blocked nor marked deleted is used. If several rows
// match, the first one returned by the database wins.
//
// An error is returned when no row matches or the query fails.
func (c *ComponentAPI) GetUserId(user string) (userId string, err error) {
	db, err := c.DB()
	if err != nil {
		return "", err
	}

	var row *sql.Row
	if user == "" {
		// The builtin user's ID is bound as a parameter rather than spliced
		// into the statement as a double-quoted literal.
		row = db.QueryRow("SELECT id FROM d_b_user WHERE NOT id = ? AND blocked = FALSE AND markedDeleted = FALSE", gitpodBuiltinUserID)
	} else {
		row = db.QueryRow("SELECT id FROM d_b_user WHERE name = ? AND blocked != 1 and markedDeleted != 1", user)
	}

	var id string
	err = row.Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		return "", xerrors.Errorf("no suitable user found: make sure there's at least one non-builtin user in the database (e.g. login)")
	}
	if err != nil {
		return "", xerrors.Errorf("cannot look for users: %w", err)
	}

	return id, nil
}
445
446
// UpdateUserFeatureFlag replaces the feature flags of the user with the given
// ID by {"permanentWSFeatureFlags":[featureFlag]}.
//
// The user must exist: the row is looked up first and an error is returned
// when it is missing, instead of silently updating nothing. The flag document
// is produced with encoding/json so that the flag is always escaped as valid
// JSON.
func (c *ComponentAPI) UpdateUserFeatureFlag(userId, featureFlag string) error {
	db, err := c.DB()
	if err != nil {
		return err
	}

	var found string
	err = db.QueryRow("SELECT id FROM d_b_user WHERE id = ?", userId).Scan(&found)
	if errors.Is(err, sql.ErrNoRows) {
		return xerrors.Errorf("cannot update feature flag: user %q not found", userId)
	}
	if err != nil {
		return err
	}

	flags, err := json.Marshal(struct {
		PermanentWSFeatureFlags []string `json:"permanentWSFeatureFlags"`
	}{
		PermanentWSFeatureFlags: []string{featureFlag},
	})
	if err != nil {
		return err
	}

	_, err = db.Exec("UPDATE d_b_user SET featureFlags=? WHERE id = ?", string(flags), userId)
	return err
}
461
462
func (c *ComponentAPI) CreateUser(username string, token string) (string, error) {
463
dbConfig, err := FindDBConfigFromPodEnv("server", c.namespace, c.client)
464
if err != nil {
465
return "", err
466
}
467
468
db, err := c.DB()
469
if err != nil {
470
return "", err
471
}
472
473
var userId string
474
err = db.QueryRow(`SELECT id FROM d_b_user WHERE name = ? and markedDeleted != 1 and blocked != 1`, username).Scan(&userId)
475
if err != nil && !errors.Is(err, sql.ErrNoRows) {
476
return "", err
477
}
478
479
if userId == "" {
480
userUuid, err := uuid.NewRandom()
481
if err != nil {
482
return "", err
483
}
484
485
userId = userUuid.String()
486
_, err = db.Exec(`INSERT IGNORE INTO d_b_user (id, creationDate, avatarUrl, name, fullName, featureFlags, lastVerificationTime) VALUES (?, ?, ?, ?, ?, ?, ?)`,
487
userId,
488
time.Now().Format(time.RFC3339),
489
"",
490
username,
491
username,
492
"{\"permanentWSFeatureFlags\":[]}",
493
time.Now().Format(time.RFC3339),
494
)
495
if err != nil {
496
return "", err
497
}
498
}
499
500
var authId string
501
err = db.QueryRow(`SELECT authId FROM d_b_identity WHERE userId = ?`, userId).Scan(&authId)
502
if err != nil && !errors.Is(err, sql.ErrNoRows) {
503
return "", err
504
}
505
if authId == "" {
506
authId = strconv.FormatInt(time.Now().UnixMilli(), 10)
507
_, err = db.Exec(`INSERT IGNORE INTO d_b_identity (authProviderId, authId, authName, userId) VALUES (?, ?, ?, ?)`,
508
"Public-GitHub",
509
authId,
510
username,
511
userId,
512
)
513
if err != nil {
514
return "", err
515
}
516
}
517
518
var cnt int
519
err = db.QueryRow(`SELECT COUNT(1) AS cnt FROM d_b_token_entry WHERE authId = ?`, authId).Scan(&cnt)
520
if err != nil && !errors.Is(err, sql.ErrNoRows) {
521
return "", err
522
}
523
if cnt == 0 {
524
uid, err := uuid.NewRandom()
525
if err != nil {
526
return "", err
527
}
528
529
// Double Marshalling to be compatible with EncryptionServiceImpl
530
value := struct {
531
Value string `json:"value"`
532
Scopes []string `json:"scopes"`
533
}{
534
Value: token,
535
Scopes: []string{"user:email", "read:user", "public_repo"},
536
}
537
valueBytes, err := json.Marshal(value)
538
if err != nil {
539
return "", err
540
}
541
valueBytes2, err := json.Marshal(string(valueBytes))
542
if err != nil {
543
return "", err
544
}
545
546
encryptedData, iv := EncryptValue(valueBytes2, dbConfig.EncryptionKeys.Material)
547
encrypted := EncriptedDBData{}
548
encrypted.Data = encryptedData
549
encrypted.KeyParams.Iv = iv
550
encrypted.KeyMetadata.Name = dbConfig.EncryptionKeys.Metadata.Name
551
encrypted.KeyMetadata.Version = dbConfig.EncryptionKeys.Metadata.Version
552
encryptedJson, err := json.Marshal(encrypted)
553
if err != nil {
554
return "", err
555
}
556
557
_, err = db.Exec(`INSERT IGNORE INTO d_b_token_entry (authProviderId, authId, token, uid) VALUES (?, ?, ?, ?)`,
558
"Public-GitHub",
559
authId,
560
encryptedJson,
561
uid.String(),
562
)
563
if err != nil {
564
return "", err
565
}
566
}
567
568
return userId, nil
569
}
570
571
func (c *ComponentAPI) createGitpodToken(user string, scopes []string) (tkn string, err error) {
572
id, err := c.GetUserId(user)
573
if err != nil {
574
return "", err
575
}
576
577
rawTkn, err := uuid.NewRandom()
578
if err != nil {
579
return "", err
580
}
581
tkn = rawTkn.String()
582
583
hash := sha256.New()
584
hash.Write([]byte(tkn))
585
hashVal := fmt.Sprintf("%x", hash.Sum(nil))
586
587
// see https://github.com/gitpod-io/gitpod/blob/master/components/gitpod-protocol/src/protocol.ts#L274
588
const tokenTypeMachineAuthToken = 1
589
590
db, err := c.DB()
591
if err != nil {
592
return "", err
593
}
594
_, err = db.Exec("INSERT INTO d_b_gitpod_token (tokenHash, name, type, userId, scopes, created) VALUES (?, ?, ?, ?, ?, ?)",
595
hashVal,
596
fmt.Sprintf("integration-test-%d", time.Now().UnixNano()),
597
tokenTypeMachineAuthToken,
598
id,
599
strings.Join(scopes, ","),
600
time.Now().Format(time.RFC3339),
601
)
602
if err != nil {
603
return "", err
604
}
605
606
c.appendCloser(func() error {
607
_, err := db.Exec("DELETE FROM d_b_gitpod_token WHERE tokenHash = ?", hashVal)
608
return err
609
})
610
611
return tkn, nil
612
}
613
614
func (c *ComponentAPI) CreateGitpodOneTimeSecret(value string) (id string, err error) {
615
dbConfig, err := FindDBConfigFromPodEnv("server", c.namespace, c.client)
616
if err != nil {
617
return "", err
618
}
619
620
db, err := c.DB()
621
if err != nil {
622
return "", err
623
}
624
625
rawUuid, err := uuid.NewRandom()
626
if err != nil {
627
return "", err
628
}
629
id = rawUuid.String()
630
631
// Double Marshalling to be compatible with EncryptionServiceImpl
632
valueBytes, err := json.Marshal(value)
633
if err != nil {
634
return "", err
635
}
636
valueBytes2, err := json.Marshal(string(valueBytes))
637
if err != nil {
638
return "", err
639
}
640
641
encryptedData, iv := EncryptValue(valueBytes2, dbConfig.EncryptionKeys.Material)
642
encrypted := EncriptedDBData{}
643
encrypted.Data = encryptedData
644
encrypted.KeyParams.Iv = iv
645
encrypted.KeyMetadata.Name = dbConfig.EncryptionKeys.Metadata.Name
646
encrypted.KeyMetadata.Version = dbConfig.EncryptionKeys.Metadata.Version
647
encryptedJson, err := json.Marshal(encrypted)
648
if err != nil {
649
return "", err
650
}
651
652
_, err = db.Exec("INSERT INTO d_b_one_time_secret (id, value, expirationTime, deleted) VALUES (?, ?, ?, ?)",
653
id,
654
string(encryptedJson),
655
time.Now().Add(30*time.Minute).UTC().Format("2006-01-02 15:04:05.999999"),
656
false,
657
)
658
if err != nil {
659
return "", err
660
}
661
662
c.appendCloser(func() error {
663
_, err := db.Exec("DELETE FROM d_b_one_time_secret WHERE id = ?", id)
664
return err
665
})
666
667
return id, nil
668
}
669
670
// WorkspaceManager provides access to ws-manager
671
func (c *ComponentAPI) WorkspaceManager() (wsmanapi.WorkspaceManagerClient, error) {
672
if c.wsmanStatus.Client != nil {
673
return c.wsmanStatus.Client, nil
674
}
675
676
var wsman = ComponentWorkspaceManagerMK2
677
if c.wsmanStatus.Port == 0 {
678
c.wsmanStatusMu.Lock()
679
defer c.wsmanStatusMu.Unlock()
680
681
pod, _, err := selectPod(wsman, selectPodOptions{}, c.namespace, c.client)
682
if err != nil {
683
return nil, err
684
}
685
686
localPort, err := getFreePort()
687
if err != nil {
688
return nil, err
689
}
690
691
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
692
err = c.portFwdWithRetry(ctx, common.ForwardPortOfPod, pod, localPort, 8080)
693
if err != nil {
694
cancel()
695
return nil, err
696
}
697
698
c.appendCloser(func() error { cancel(); return nil })
699
c.wsmanStatus.Port = localPort
700
}
701
702
secretName := fmt.Sprintf("%s-client-tls", wsman)
703
ctx, cancel := context.WithCancel(context.Background())
704
705
c.appendCloser(func() error { cancel(); return nil })
706
707
var secret corev1.Secret
708
err := c.client.Resources().Get(ctx, secretName, c.namespace, &secret)
709
if err != nil {
710
return nil, err
711
}
712
713
caCrt := secret.Data["ca.crt"]
714
tlsCrt := secret.Data["tls.crt"]
715
tlsKey := secret.Data["tls.key"]
716
717
certPool := x509.NewCertPool()
718
if !certPool.AppendCertsFromPEM(caCrt) {
719
return nil, xerrors.Errorf("failed appending CA cert")
720
}
721
cert, err := tls.X509KeyPair(tlsCrt, tlsKey)
722
if err != nil {
723
return nil, err
724
}
725
creds := credentials.NewTLS(&tls.Config{
726
Certificates: []tls.Certificate{cert},
727
RootCAs: certPool,
728
ServerName: string(wsman),
729
})
730
dialOption := grpc.WithTransportCredentials(creds)
731
732
wsport := fmt.Sprintf("localhost:%d", c.wsmanStatus.Port)
733
conn, err := grpc.Dial(wsport, dialOption)
734
if err != nil {
735
return nil, err
736
}
737
c.appendCloser(conn.Close)
738
739
c.wsmanStatus.Client = wsmanapi.NewWorkspaceManagerClient(conn)
740
return c.wsmanStatus.Client, nil
741
}
742
743
// ClearWorkspaceManagerClientCache forgets the cached ws-manager client and
// the recorded local port, so that the next WorkspaceManager call sets up a
// new port-forward and connection. Nothing is closed here.
//
// The state is reset under wsmanStatusMu, the mutex WorkspaceManager takes
// when it records the port.
func (c *ComponentAPI) ClearWorkspaceManagerClientCache() {
	c.wsmanStatusMu.Lock()
	defer c.wsmanStatusMu.Unlock()

	c.wsmanStatus.Client = nil
	c.wsmanStatus.Port = 0
}
747
748
// BlobService provides access to the blob service of the content service
749
func (c *ComponentAPI) BlobService() (csapi.BlobServiceClient, error) {
750
if c.contentServiceStatus.BlobServiceClient != nil {
751
return c.contentServiceStatus.BlobServiceClient, nil
752
}
753
754
if c.contentServiceStatus.Port == 0 {
755
c.contentServiceStatusMu.Lock()
756
defer c.contentServiceStatusMu.Unlock()
757
758
pod, _, err := selectPod(ComponentContentService, selectPodOptions{}, c.namespace, c.client)
759
if err != nil {
760
return nil, err
761
}
762
763
localPort, err := getFreePort()
764
if err != nil {
765
return nil, err
766
}
767
768
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
769
err = c.portFwdWithRetry(ctx, common.ForwardPortOfPod, pod, localPort, 8080)
770
if err != nil {
771
cancel()
772
return nil, err
773
}
774
c.appendCloser(func() error { cancel(); return nil })
775
c.contentServiceStatus.Port = localPort
776
}
777
778
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", c.contentServiceStatus.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
779
if err != nil {
780
return nil, err
781
}
782
c.appendCloser(conn.Close)
783
784
c.contentServiceStatus.BlobServiceClient = csapi.NewBlobServiceClient(conn)
785
return c.contentServiceStatus.BlobServiceClient, nil
786
}
787
788
// ClearBlobServiceClientCache forgets the cached blob service client and the
// recorded content-service port, so that the next BlobService call sets up a
// new port-forward and connection. Nothing is closed here.
//
// The state is reset under contentServiceStatusMu, the mutex BlobService
// takes when it records the port.
func (c *ComponentAPI) ClearBlobServiceClientCache() {
	c.contentServiceStatusMu.Lock()
	defer c.contentServiceStatusMu.Unlock()

	c.contentServiceStatus.BlobServiceClient = nil
	c.contentServiceStatus.Port = 0
}
792
793
// dbOpts collects the settings applied by DBOpt functions.
type dbOpts struct {
	// Database is the name of the database to connect to.
	Database string
}
796
797
// DNOpt configures DB access
798
type DBOpt func(*dbOpts)
799
800
// DBName forces a particular database
801
func DBName(name string) DBOpt {
802
return func(o *dbOpts) {
803
o.Database = name
804
}
805
}
806
807
// cachedDBs caches DB connections per database name, so that connections do
// not have to be re-established all the time, which saves a lot of time in
// integration tests. An entry is removed again by the closer that DB
// registers for it.
var cachedDBs sync.Map
813
814
// DB provides access to the Gitpod database.
815
// Callers must never close the DB.
816
func (c *ComponentAPI) DB(options ...DBOpt) (*sql.DB, error) {
817
opts := dbOpts{
818
Database: "gitpod",
819
}
820
for _, o := range options {
821
o(&opts)
822
}
823
824
if db, ok := cachedDBs.Load(opts.Database); ok {
825
actualDb := db.(*sql.DB)
826
return actualDb, nil
827
}
828
829
config, err := c.findDBConfig()
830
if err != nil {
831
return nil, err
832
}
833
834
// if configured: setup local port-forward to DB pod
835
if config.ForwardPort != nil {
836
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
837
err = c.portFwdWithRetry(ctx, common.ForwardPortOfPod, config.ForwardPort.PodName, int(config.Port), int(config.ForwardPort.RemotePort))
838
if err != nil {
839
cancel()
840
return nil, err
841
}
842
c.appendCloser(func() error { cancel(); return nil })
843
}
844
845
db, err := sql.Open("mysql", fmt.Sprintf("gitpod:%s@tcp(%s:%d)/%s", config.Password, config.Host, config.Port, opts.Database))
846
if err != nil {
847
return nil, err
848
}
849
// Hack: to fix new DB connections occasionally failing with `[mysql] packets.go:33: unexpected EOF` due
850
// to getting an idle connection from the pool which has for some reason been closed.
851
db.SetMaxIdleConns(0)
852
853
cachedDBs.Store(opts.Database, db)
854
c.appendCloser(func() error {
855
cachedDBs.Delete(opts.Database)
856
return db.Close()
857
})
858
return db, nil
859
}
860
861
func (c *ComponentAPI) findDBConfig() (*DBConfig, error) {
862
config, err := FindDBConfigFromPodEnv("server", c.namespace, c.client)
863
if err != nil {
864
return nil, err
865
}
866
867
// here we _assume_ that "config" points to a service: find us a concrete DB pod to forward to
868
var svc corev1.Service
869
err = c.client.Resources(c.namespace).Get(context.Background(), config.Host, c.namespace, &svc)
870
if err != nil {
871
return nil, err
872
}
873
874
// find remotePort
875
var remotePort int32
876
for _, p := range svc.Spec.Ports {
877
if p.Port == config.Port {
878
remotePort = p.TargetPort.IntVal
879
if remotePort == 0 {
880
remotePort = p.Port
881
}
882
break
883
}
884
}
885
if remotePort == 0 {
886
return nil, xerrors.Errorf("no ports found on service: %s", svc.Name)
887
}
888
889
// find pod to forward to
890
var pods corev1.PodList
891
err = c.client.Resources(c.namespace).List(context.Background(), &pods, func(opts *metav1.ListOptions) {
892
opts.LabelSelector = labels.SelectorFromSet(svc.Spec.Selector).String()
893
})
894
if err != nil {
895
return nil, err
896
}
897
if len(pods.Items) == 0 {
898
return nil, xerrors.Errorf("no pods for service %s found", svc.Name)
899
}
900
var pod *corev1.Pod
901
for _, p := range pods.Items {
902
if p.Spec.NodeName == "" {
903
// no node means the pod can't be ready
904
continue
905
}
906
var isReady bool
907
for _, cond := range p.Status.Conditions {
908
if cond.Type == corev1.PodReady {
909
isReady = cond.Status == corev1.ConditionTrue
910
break
911
}
912
}
913
if !isReady {
914
continue
915
}
916
917
pod = &p
918
break
919
}
920
if pod == nil {
921
return nil, xerrors.Errorf("no active pod for service %s found", svc.Name)
922
}
923
924
localPort, err := getFreePort()
925
if err != nil {
926
return nil, err
927
}
928
config.Port = int32(localPort)
929
config.ForwardPort = &ForwardPort{
930
RemotePort: remotePort,
931
PodName: pod.Name,
932
}
933
config.Host = "127.0.0.1"
934
935
return config, nil
936
}
937
938
func FindDBConfigFromPodEnv(componentName string, namespace string, client klient.Client) (*DBConfig, error) {
939
lblSelector := fmt.Sprintf("component=%s", componentName)
940
var list corev1.PodList
941
err := client.Resources(namespace).List(context.Background(), &list, func(opts *metav1.ListOptions) {
942
opts.LabelSelector = lblSelector
943
})
944
if err != nil {
945
return nil, err
946
}
947
if len(list.Items) == 0 {
948
return nil, xerrors.Errorf("no pods found for: %s", lblSelector)
949
}
950
pod := list.Items[0]
951
952
var password, host string
953
var dbEncryptionKeys *EncryptionKey
954
var port int32
955
OuterLoop:
956
for _, c := range pod.Spec.Containers {
957
for _, v := range c.Env {
958
var findErr error
959
if v.Name == "DB_PASSWORD" {
960
password, findErr = FindValueFromEnvVar(v, client, namespace)
961
if findErr != nil {
962
return nil, findErr
963
}
964
} else if v.Name == "DB_ENCRYPTION_KEYS" {
965
raw, findErr := FindValueFromEnvVar(v, client, namespace)
966
if findErr != nil {
967
return nil, findErr
968
}
969
970
var k []struct {
971
Name string `json:"name"`
972
Version int `json:"version"`
973
Material []byte `json:"material"`
974
}
975
err = json.Unmarshal([]byte(raw), &k)
976
if err != nil {
977
return nil, err
978
}
979
if len(k) > 0 {
980
dbEncryptionKeys = &EncryptionKey{
981
Metadata: EncryptionKeyMetadata{
982
Name: k[0].Name,
983
Version: k[0].Version,
984
},
985
Material: k[0].Material,
986
}
987
}
988
} else if v.Name == "DB_PORT" {
989
var portStr string
990
portStr, findErr = FindValueFromEnvVar(v, client, namespace)
991
if findErr != nil {
992
return nil, findErr
993
}
994
pPort, err := strconv.ParseUint(portStr, 10, 16)
995
if err != nil {
996
return nil, xerrors.Errorf("error parsing DB_PORT '%s' on pod %s!", v.Value, pod.Name)
997
}
998
port = int32(pPort)
999
} else if v.Name == "DB_HOST" {
1000
host, findErr = FindValueFromEnvVar(v, client, namespace)
1001
if findErr != nil {
1002
return nil, findErr
1003
}
1004
}
1005
if password != "" && port != 0 && host != "" && dbEncryptionKeys != nil {
1006
break OuterLoop
1007
}
1008
}
1009
}
1010
if password == "" || port == 0 || host == "" || dbEncryptionKeys == nil {
1011
return nil, xerrors.Errorf("could not find complete DBConfig on pod %s!", pod.Name)
1012
}
1013
config := DBConfig{
1014
Host: host,
1015
Port: port,
1016
Password: password,
1017
EncryptionKeys: *dbEncryptionKeys,
1018
}
1019
return &config, nil
1020
}
1021
1022
// FindValueFromEnvVar resolves the value of a pod environment variable.
//
// A literal Value is returned as is. Otherwise the variable must reference a
// secret through ValueFrom.SecretKeyRef, e.g.
//
//	{"name":"DB_PORT","valueFrom":{"secretKeyRef":{"name":"mysql","key":"port"}}}
//
// in which case the secret is fetched from namespace and the referenced key
// is returned; a key missing from the secret yields the empty string. Any
// other kind of ValueFrom source is an error.
func FindValueFromEnvVar(ev corev1.EnvVar, client klient.Client, namespace string) (string, error) {
	// we have a value, just return it
	if ev.Value != "" {
		return ev.Value, nil
	}

	if ev.ValueFrom == nil {
		return "", xerrors.Errorf("Neither Value or ValueFrom exist for %s", ev.Name)
	}

	// Env vars set from a config or secret carry no value, only a reference
	// to the backing object. Only secret references are supported.
	ref := ev.ValueFrom.SecretKeyRef
	if ref == nil {
		return "", xerrors.Errorf("A secret reference was expected for %s", ev.Name)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var secret corev1.Secret
	if err := client.Resources().Get(ctx, ref.Name, namespace, &secret); err != nil {
		return "", err
	}

	return string(secret.Data[ref.Key]), nil
}
1053
1054
// APIImageBuilderOpt configures the image builder API access
1055
type APIImageBuilderOpt func(*apiImageBuilderOpts)
1056
1057
// apiImageBuilderOpts collects the settings applied by APIImageBuilderOpt
// functions.
type apiImageBuilderOpts struct {
	// SelectMK3 is a boolean switch; presumably it selects the MK3 image
	// builder (TODO confirm against its users).
	SelectMK3 bool
}
1060
1061
// ImageBuilder provides access to the image builder service.
1062
func (c *ComponentAPI) ImageBuilder(opts ...APIImageBuilderOpt) (imgbldr.ImageBuilderClient, error) {
1063
var cfg apiImageBuilderOpts
1064
for _, o := range opts {
1065
o(&cfg)
1066
}
1067
1068
if c.imgbldStatus.Client != nil {
1069
return c.imgbldStatus.Client, nil
1070
}
1071
1072
imgbuilder := ComponentImageBuilderMK3
1073
err := func() error {
1074
if c.imgbldStatus.Port == 0 {
1075
c.imgbldStatusMu.Lock()
1076
defer c.imgbldStatusMu.Unlock()
1077
1078
pod, _, err := selectPod(imgbuilder, selectPodOptions{}, c.namespace, c.client)
1079
if err != nil {
1080
return err
1081
}
1082
1083
localPort, err := getFreePort()
1084
if err != nil {
1085
return err
1086
}
1087
1088
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1089
err = c.portFwdWithRetry(ctx, common.ForwardPortOfPod, pod, localPort, 8080)
1090
if err != nil {
1091
cancel()
1092
return err
1093
}
1094
c.appendCloser(func() error { cancel(); return nil })
1095
c.imgbldStatus.Port = localPort
1096
}
1097
1098
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", c.imgbldStatus.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
1099
if err != nil {
1100
return err
1101
}
1102
c.appendCloser(conn.Close)
1103
1104
c.imgbldStatus.Client = imgbldr.NewImageBuilderClient(conn)
1105
return nil
1106
}()
1107
if err != nil {
1108
return nil, err
1109
}
1110
1111
return c.imgbldStatus.Client, nil
1112
}
1113
1114
// ClearImageBuilderClientCache forgets the cached image builder client and
// the recorded local port, so that the next ImageBuilder call sets up a new
// port-forward and connection. Nothing is closed here.
//
// The state is reset under imgbldStatusMu, the mutex ImageBuilder takes when
// it records the port.
func (c *ComponentAPI) ClearImageBuilderClientCache() {
	c.imgbldStatusMu.Lock()
	defer c.imgbldStatusMu.Unlock()

	c.imgbldStatus.Client = nil
	c.imgbldStatus.Port = 0
}
1118
1119
// ContentService groups content service interfaces for convenience
1120
type ContentService interface {
1121
csapi.ContentServiceClient
1122
csapi.WorkspaceServiceClient
1123
csapi.HeadlessLogServiceClient
1124
}
1125
1126
func (c *ComponentAPI) ContentService() (ContentService, error) {
1127
if c.contentServiceStatus.ContentService != nil {
1128
return c.contentServiceStatus.ContentService, nil
1129
}
1130
if c.contentServiceStatus.Port == 0 {
1131
pod, _, err := selectPod(ComponentContentService, selectPodOptions{}, c.namespace, c.client)
1132
if err != nil {
1133
return nil, err
1134
}
1135
1136
localPort, err := getFreePort()
1137
if err != nil {
1138
return nil, err
1139
}
1140
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
1141
err = c.portFwdWithRetry(ctx, common.ForwardPortOfPod, pod, localPort, 8080)
1142
if err != nil {
1143
cancel()
1144
return nil, err
1145
}
1146
c.appendCloser(func() error { cancel(); return nil })
1147
c.contentServiceStatus.Port = localPort
1148
}
1149
1150
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", c.contentServiceStatus.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
1151
if err != nil {
1152
return nil, err
1153
}
1154
c.appendCloser(conn.Close)
1155
1156
type cs struct {
1157
csapi.ContentServiceClient
1158
csapi.WorkspaceServiceClient
1159
csapi.HeadlessLogServiceClient
1160
}
1161
1162
c.contentServiceStatus.ContentService = cs{
1163
ContentServiceClient: csapi.NewContentServiceClient(conn),
1164
WorkspaceServiceClient: csapi.NewWorkspaceServiceClient(conn),
1165
HeadlessLogServiceClient: csapi.NewHeadlessLogServiceClient(conn),
1166
}
1167
1168
return c.contentServiceStatus.ContentService, nil
1169
}
1170
1171
// ClearContentServiceClientCache forgets the cached content service clients
// and the forwarded local port, so the next ContentService call selects a pod
// and sets up a new port-forward and connection. Closers that were already
// registered for the previous connection are left untouched.
func (c *ComponentAPI) ClearContentServiceClientCache() {
	c.contentServiceStatus.Port = 0
	c.contentServiceStatus.ContentService = nil
}
1175
1176
// Done runs every registered closer and, when the test has failed, logs
// diagnostic information about the environment.
//
// Closer errors are logged on t and do not stop the remaining closers.
func (c *ComponentAPI) Done(t *testing.T) {
	// Closers run last-in-first-out, just like "defer": resources can be
	// appended in the order they are created and are still torn down in the
	// correct order.
	for remaining := len(c.closer); remaining > 0; remaining-- {
		if err := c.closer[remaining-1](); err != nil {
			t.Logf("cleanup failed: %q", err)
		}
	}

	if !t.Failed() {
		return
	}

	// The test failed: log the preview environment status to help debugging.
	ready, reason, err := isPreviewReady(c.client, c.namespace)
	if err == nil {
		t.Logf("preview status: ready=%v, reason=%s", ready, reason)
	} else {
		t.Logf("failed to check preview status: %q", err)
	}
	logGitpodStatus(t, c.client, c.namespace)
}
1198
1199
// appendCloser registers a cleanup function at the end of the closer list.
// The list is guarded by closerMutex while it is being extended.
func (c *ComponentAPI) appendCloser(closer func() error) {
	c.closerMutex.Lock()
	c.closer = append(c.closer, closer)
	c.closerMutex.Unlock()
}
1204
1205
// portFwdFunc is the signature of a function that starts a port-forward.
// It is called with the kubeconfig, the namespace, the name of the forwarding
// target and a "local:target" port pair. The first returned channel signals
// that the forward is ready, the second one delivers errors.
type portFwdFunc = func(ctx context.Context, kubeconfig, namespace, name, port string) (ready chan struct{}, errc chan error)
1206
1207
func (c *ComponentAPI) portFwdWithRetry(ctx context.Context, portFwdF portFwdFunc, serviceName string, localPort int, targetPort int) error {
1208
for {
1209
ready, errc := portFwdF(ctx, c.kubeconfig, c.namespace, serviceName, fmt.Sprintf("%d:%d", localPort, targetPort))
1210
select {
1211
case err := <-errc:
1212
if err == io.EOF {
1213
time.Sleep(10 * time.Second)
1214
} else if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
1215
time.Sleep(10 * time.Second)
1216
} else {
1217
return err
1218
}
1219
case <-ready:
1220
return nil
1221
}
1222
}
1223
}
1224
1225
// RestartDeployment rollout restart the deployment by updating the
1226
// spec.template.metadata.annotations["kubectl.kubernetes.io/restartedAt"] = time.Now()
1227
func (c *ComponentAPI) RestartDeployment(deployName, namespace string, wait bool) error {
1228
var deploy appsv1.Deployment
1229
if err := c.client.Resources().WithNamespace(namespace).Get(context.Background(), deployName, namespace, &deploy); err != nil {
1230
return err
1231
}
1232
1233
patchData := map[string]interface{}{
1234
"spec": map[string]interface{}{
1235
"template": map[string]interface{}{
1236
"metadata": map[string]interface{}{
1237
"annotations": map[string]interface{}{
1238
"kubectl.kubernetes.io/restartedAt": time.Now().Format(time.Stamp),
1239
},
1240
},
1241
},
1242
},
1243
}
1244
1245
encodedPatchData, err := json.Marshal(patchData)
1246
if err != nil {
1247
return err
1248
}
1249
1250
if err := c.client.Resources().WithNamespace(namespace).Patch(context.Background(), &deploy, k8s.Patch{PatchType: types.MergePatchType, Data: encodedPatchData}); err != nil {
1251
return err
1252
}
1253
1254
if !wait {
1255
return nil
1256
}
1257
1258
// waits for the deployment rollout status, maximum to one minute
1259
for i := 0; i < 10; i++ {
1260
if err := c.client.Resources().WithNamespace(namespace).Get(context.Background(), deployName, namespace, &deploy); err != nil {
1261
return err
1262
}
1263
if deploy.Status.UnavailableReplicas == 0 {
1264
break
1265
}
1266
time.Sleep(6 * time.Second)
1267
}
1268
return nil
1269
}
1270
1271