Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-proxy/pkg/proxy/infoprovider.go
2500 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package proxy
6
7
import (
8
"context"
9
"net/url"
10
"sort"
11
"strconv"
12
13
"golang.org/x/xerrors"
14
"k8s.io/apimachinery/pkg/api/errors"
15
"k8s.io/apimachinery/pkg/runtime"
16
"k8s.io/apimachinery/pkg/util/uuid"
17
"k8s.io/client-go/tools/cache"
18
ctrl "sigs.k8s.io/controller-runtime"
19
"sigs.k8s.io/controller-runtime/pkg/client"
20
"sigs.k8s.io/controller-runtime/pkg/predicate"
21
"sigs.k8s.io/controller-runtime/pkg/reconcile"
22
23
wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
24
"github.com/gitpod-io/gitpod/common-go/log"
25
"github.com/gitpod-io/gitpod/ws-manager/api"
26
wsapi "github.com/gitpod-io/gitpod/ws-manager/api"
27
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
28
"github.com/gitpod-io/gitpod/ws-proxy/pkg/common"
29
)
30
31
// Names of the indexes registered on the provider's stores.
const (
	// workspaceIndex is the index keyed by workspace ID.
	workspaceIndex = "workspaceIndex"
	// ipAddressIndex is the index keyed by workspace IP address.
	ipAddressIndex = "ipAddressIndex"
)
35
36
// getPortStr extracts the port part from a given URL string. Returns "" if parsing fails or port is not specified.
37
func getPortStr(urlStr string) string {
38
portURL, err := url.Parse(urlStr)
39
if err != nil {
40
log.WithField("url", urlStr).WithError(err).Error("error parsing URL while getting URL port")
41
return ""
42
}
43
if portURL.Port() == "" {
44
switch scheme := portURL.Scheme; scheme {
45
case "http":
46
return "80"
47
case "https":
48
return "443"
49
}
50
}
51
52
return portURL.Port()
53
}
54
55
// ConnectionContext is the bookkeeping entry kept for a context handed out by
// AcquireContext.
type ConnectionContext struct {
	// WorkspaceID is the ID of the workspace the context was acquired for.
	WorkspaceID string
	// Port is the port the context was acquired for, in string form.
	Port string
	// UUID is the key under which this entry is kept in the context store.
	UUID string
	// CancelFunc cancels the context returned by AcquireContext with a cause.
	CancelFunc context.CancelCauseFunc
}
61
62
type CRDWorkspaceInfoProvider struct {
63
client.Client
64
Scheme *runtime.Scheme
65
66
store cache.ThreadSafeStore
67
contextStore cache.ThreadSafeStore
68
}
69
70
// NewCRDWorkspaceInfoProvider creates a fresh WorkspaceInfoProvider.
71
func NewCRDWorkspaceInfoProvider(client client.Client, scheme *runtime.Scheme) (*CRDWorkspaceInfoProvider, error) {
72
// create custom indexer for searches
73
indexers := cache.Indexers{
74
workspaceIndex: func(obj interface{}) ([]string, error) {
75
if workspaceInfo, ok := obj.(*common.WorkspaceInfo); ok {
76
return []string{workspaceInfo.WorkspaceID}, nil
77
}
78
79
return nil, xerrors.Errorf("object is not a WorkspaceInfo")
80
},
81
ipAddressIndex: func(obj interface{}) ([]string, error) {
82
if workspaceInfo, ok := obj.(*common.WorkspaceInfo); ok {
83
if workspaceInfo.IPAddress == "" {
84
return nil, nil
85
}
86
return []string{workspaceInfo.IPAddress}, nil
87
}
88
return nil, xerrors.Errorf("object is not a WorkspaceInfo")
89
},
90
}
91
contextIndexers := cache.Indexers{
92
workspaceIndex: func(obj interface{}) ([]string, error) {
93
if connCtx, ok := obj.(*ConnectionContext); ok {
94
return []string{connCtx.WorkspaceID}, nil
95
}
96
return nil, xerrors.Errorf("object is not a ConnectionContext")
97
},
98
}
99
100
return &CRDWorkspaceInfoProvider{
101
Client: client,
102
Scheme: scheme,
103
104
store: cache.NewThreadSafeStore(indexers, cache.Indices{}),
105
contextStore: cache.NewThreadSafeStore(contextIndexers, cache.Indices{}),
106
}, nil
107
}
108
109
// WorkspaceInfo returns the WorkspaceInfo for the given workspaceID.
110
// It performs validation to ensure the workspace is unique and properly associated with its IP address.
111
func (r *CRDWorkspaceInfoProvider) WorkspaceInfo(workspaceID string) *common.WorkspaceInfo {
112
workspaces, err := r.store.ByIndex(workspaceIndex, workspaceID)
113
if err != nil {
114
return nil
115
}
116
117
if len(workspaces) == 0 {
118
return nil
119
}
120
121
if len(workspaces) > 1 {
122
log.WithField("workspaceID", workspaceID).WithField("instanceCount", len(workspaces)).Warn("multiple workspace instances found")
123
}
124
125
sort.Slice(workspaces, func(i, j int) bool {
126
a := workspaces[i].(*common.WorkspaceInfo)
127
b := workspaces[j].(*common.WorkspaceInfo)
128
return a.StartedAt.After(b.StartedAt)
129
})
130
131
wsInfo := workspaces[0].(*common.WorkspaceInfo)
132
133
if wsInfo.IPAddress == "" {
134
return wsInfo
135
}
136
137
if conflict, err := r.validateIPAddressConflict(workspaceID, wsInfo.IPAddress); conflict || err != nil {
138
return nil
139
}
140
141
return wsInfo
142
}
143
144
// validateIPAddressConflict reports whether ipAddress cannot be attributed
// to workspaceID alone.
//
// It returns true when more than one stored workspace uses ipAddress, or when
// the single stored workspace using it has a different workspace ID. Both
// cases are logged as warnings. If the lookup itself fails, the error is
// logged and returned together with true.
func (r *CRDWorkspaceInfoProvider) validateIPAddressConflict(workspaceID, ipAddress string) (bool, error) {
	owners, err := r.workspacesInfoByIPAddress(ipAddress)
	if err != nil {
		log.WithError(err).WithField("workspaceID", workspaceID).WithField("ipAddress", ipAddress).Error("failed to get workspaces by IP address")
		return true, err
	}

	switch {
	case len(owners) > 1:
		log.WithField("workspaceID", workspaceID).WithField("ipAddress", ipAddress).WithField("workspaceCount", len(owners)).Warn("multiple workspaces found for IP address")
		return true, nil
	case len(owners) == 1 && owners[0].WorkspaceID != workspaceID:
		log.WithField("workspaceID", workspaceID).WithField("ipAddress", ipAddress).WithField("foundWorkspaceID", owners[0].WorkspaceID).Warn("workspace IP address conflict detected")
		return true, nil
	default:
		return false, nil
	}
}
163
164
// workspacesInfoByIPAddress returns every stored WorkspaceInfo indexed under
// ipAddress. The slice is empty, not nil, when nothing matches. A failing
// index lookup yields a nil slice and the error.
func (r *CRDWorkspaceInfoProvider) workspacesInfoByIPAddress(ipAddress string) ([]*common.WorkspaceInfo, error) {
	entries, err := r.store.ByIndex(ipAddressIndex, ipAddress)
	if err != nil {
		return nil, err
	}

	infos := make([]*common.WorkspaceInfo, len(entries))
	for i, entry := range entries {
		infos[i] = entry.(*common.WorkspaceInfo)
	}
	return infos, nil
}
177
178
func (r *CRDWorkspaceInfoProvider) AcquireContext(ctx context.Context, workspaceID string, port string) (context.Context, string, error) {
179
ws := r.WorkspaceInfo(workspaceID)
180
if ws == nil {
181
return ctx, "", xerrors.Errorf("workspace %s not found", workspaceID)
182
}
183
id := string(uuid.NewUUID())
184
ctx, cancel := context.WithCancelCause(ctx)
185
connCtx := &ConnectionContext{
186
WorkspaceID: workspaceID,
187
Port: port,
188
CancelFunc: cancel,
189
UUID: id,
190
}
191
192
r.contextStore.Add(id, connCtx)
193
return ctx, id, nil
194
}
195
196
// ReleaseContext removes the connection context stored under id from the
// context store. It does not invoke the entry's cancel function.
func (r *CRDWorkspaceInfoProvider) ReleaseContext(id string) {
	r.contextStore.Delete(id)
}
199
200
func (r *CRDWorkspaceInfoProvider) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
201
var ws workspacev1.Workspace
202
err := r.Client.Get(context.Background(), req.NamespacedName, &ws)
203
if errors.IsNotFound(err) {
204
// workspace is gone - that's ok
205
r.store.Delete(req.Name)
206
log.WithField("workspacepod", req.Name).Debug("removing workspace from store")
207
208
return reconcile.Result{}, nil
209
}
210
211
var podIP string
212
if ws.Status.Runtime != nil {
213
podIP = ws.Status.Runtime.PodIP
214
}
215
216
ports := make([]*wsapi.PortSpec, 0, len(ws.Spec.Ports))
217
for _, p := range ws.Spec.Ports {
218
v := wsapi.PortVisibility_PORT_VISIBILITY_PRIVATE
219
protocol := wsapi.PortProtocol_PORT_PROTOCOL_HTTP
220
if p.Visibility == workspacev1.AdmissionLevelEveryone {
221
v = wsapi.PortVisibility_PORT_VISIBILITY_PUBLIC
222
}
223
if p.Protocol == workspacev1.PortProtocolHttps {
224
protocol = wsapi.PortProtocol_PORT_PROTOCOL_HTTPS
225
}
226
ports = append(ports, &wsapi.PortSpec{
227
Port: p.Port,
228
Visibility: v,
229
Protocol: protocol,
230
})
231
}
232
233
admission := wsapi.AdmissionLevel_ADMIT_OWNER_ONLY
234
if ws.Spec.Admission.Level == workspacev1.AdmissionLevelEveryone {
235
admission = wsapi.AdmissionLevel_ADMIT_EVERYONE
236
}
237
managedByMk2 := true
238
if managedBy, ok := ws.Labels[wsk8s.WorkspaceManagedByLabel]; ok && managedBy != "ws-manager-mk2" {
239
managedByMk2 = false
240
}
241
242
wsinfo := &common.WorkspaceInfo{
243
WorkspaceID: ws.Spec.Ownership.WorkspaceID,
244
InstanceID: ws.Name,
245
URL: ws.Status.URL,
246
IDEImage: ws.Spec.Image.IDE.Web,
247
SupervisorImage: ws.Spec.Image.IDE.Supervisor,
248
IDEPublicPort: getPortStr(ws.Status.URL),
249
IPAddress: podIP,
250
Ports: ports,
251
Auth: &wsapi.WorkspaceAuthentication{Admission: admission, OwnerToken: ws.Status.OwnerToken},
252
StartedAt: ws.CreationTimestamp.Time,
253
OwnerUserId: ws.Spec.Ownership.Owner,
254
SSHPublicKeys: ws.Spec.SshPublicKeys,
255
IsRunning: ws.Status.Phase == workspacev1.WorkspacePhaseRunning,
256
IsEnabledSSHCA: ws.Spec.SSHGatewayCAPublicKey != "",
257
IsManagedByMk2: managedByMk2,
258
}
259
260
r.store.Update(req.Name, wsinfo)
261
r.invalidateConnectionContext(wsinfo)
262
log.WithField("workspace", req.Name).WithField("details", wsinfo).Debug("adding/updating workspace details")
263
264
return ctrl.Result{}, nil
265
}
266
267
// invalidateConnectionContext cancels and removes the stored connection
// contexts of ws whose port is not listed as public in ws.Ports.
//
// Nothing happens when the lookup fails, when the workspace has no stored
// connection contexts, or when the workspace admits everyone.
func (r *CRDWorkspaceInfoProvider) invalidateConnectionContext(ws *common.WorkspaceInfo) {
	entries, err := r.contextStore.ByIndex(workspaceIndex, ws.WorkspaceID)
	if err != nil || len(entries) == 0 {
		return
	}

	if auth := ws.Auth; auth != nil && auth.Admission == wsapi.AdmissionLevel_ADMIT_EVERYONE {
		return
	}

	// Set of public ports in the same string form used by ConnectionContext.Port.
	publicPorts := make(map[string]struct{})
	for _, spec := range ws.Ports {
		if spec.Visibility != api.PortVisibility_PORT_VISIBILITY_PUBLIC {
			continue
		}
		publicPorts[strconv.FormatUint(uint64(spec.Port), 10)] = struct{}{}
	}

	for _, entry := range entries {
		connCtx, ok := entry.(*ConnectionContext)
		if !ok {
			continue
		}
		if _, isPublic := publicPorts[connCtx.Port]; isPublic {
			continue
		}
		connCtx.CancelFunc(xerrors.Errorf("workspace %s is no longer public", ws.WorkspaceID))
		r.contextStore.Delete(connCtx.UUID)
	}
}
298
299
// SetupWithManager sets up the controller with the Manager.
300
func (r *CRDWorkspaceInfoProvider) SetupWithManager(mgr ctrl.Manager) error {
301
return ctrl.NewControllerManagedBy(mgr).
302
Named("workspacecrd").
303
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
304
For(
305
&workspacev1.Workspace{},
306
).
307
Complete(r)
308
}
309
310