GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-daemon/pkg/dispatch/dispatch.go
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package dispatch

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	"golang.org/x/xerrors"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"

	wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
	"github.com/gitpod-io/gitpod/common-go/log"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/container"
)

const (
	podInformerInitialSyncTimeout = 60 * time.Second
	podInformerResyncInterval     = 30 * time.Second
)

// Workspace represents all the info we have about a workspace
type Workspace struct {
	ContainerID container.ID
	WorkspaceID string
	InstanceID  string
	Pod         *corev1.Pod
}

// OWI returns the owner/workspace/instance triple used for logging
func (w Workspace) OWI() logrus.Fields {
	return log.OWI("", w.WorkspaceID, w.InstanceID)
}

// Listener gets called when a new workspace appears, or an existing one is updated.
type Listener interface {
	WorkspaceAdded(ctx context.Context, ws *Workspace) error
}

// UpdateListener gets called when a workspace pod is updated
type UpdateListener interface {
	WorkspaceUpdated(ctx context.Context, ws *Workspace) error
}

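// Illustrative only: a minimal sketch of a type that satisfies both Listener and
// UpdateListener. The type name and its log messages are hypothetical and not part
// of this package; real listeners are passed to NewDispatch by its callers.
//
//	type debugListener struct{}
//
//	func (debugListener) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) error {
//		log.WithFields(ws.OWI()).WithField("containerID", ws.ContainerID).Debug("workspace added")
//		return nil
//	}
//
//	func (debugListener) WorkspaceUpdated(ctx context.Context, ws *dispatch.Workspace) error {
//		log.WithFields(ws.OWI()).Debug("workspace pod updated")
//		return nil
//	}
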
// NewDispatch starts a new workspace dispatch
func NewDispatch(runtime container.Runtime, kubernetes kubernetes.Interface, k8sNamespace, nodename string, listener ...Listener) (*Dispatch, error) {
	d := &Dispatch{
		Runtime:             runtime,
		Kubernetes:          kubernetes,
		KubernetesNamespace: k8sNamespace,
		Listener:            listener,
		NodeName:            nodename,

		ctxs:         make(map[string]*workspaceState),
		disposedCtxs: make(map[string]struct{}),
	}

	return d, nil
}

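// A minimal usage sketch, assuming a container.Runtime (rt), a Kubernetes client
// (clientset), a namespace, and the daemon's node name are already available, and
// using the hypothetical debugListener from the sketch above; error handling is
// abbreviated:
//
//	d, err := dispatch.NewDispatch(rt, clientset, namespace, nodeName, debugListener{})
//	if err != nil {
//		return err
//	}
//	if err := d.Start(); err != nil {
//		return err
//	}
//	defer d.Close()
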
// Dispatch starts tasks when a new workspace appears, and cancels the corresponding
// context when the workspace goes away. If the dispatch is closed, all active contexts
// will be canceled, too.
type Dispatch struct {
	Runtime             container.Runtime
	Kubernetes          kubernetes.Interface
	KubernetesNamespace string
	NodeName            string

	Listener []Listener

	stopchan     chan struct{}
	ctxs         map[string]*workspaceState
	disposedCtxs map[string]struct{}
	mu           sync.Mutex
}

type workspaceState struct {
	WorkspaceAdded bool
	Context        context.Context
	Cancel         context.CancelFunc
	Workspace      *Workspace

	// HandlerWaitGroup keeps track of when each handler is finished. It is only relied
	// upon in DisposeWorkspace() to determine when work on a given instanceID has ceased.
	HandlerWaitGroup sync.WaitGroup
}

type contextKey struct{}

var (
	contextDispatch = contextKey{}
)

// GetFromContext retrieves the issuing dispatch from the listener context
func GetFromContext(ctx context.Context) *Dispatch {
	return ctx.Value(contextDispatch).(*Dispatch)
}

type dispatchHandlerWaitGroupKey struct{}

var (
	contextDispatchWaitGroup = dispatchHandlerWaitGroupKey{}
)

// GetDispatchWaitGroup retrieves the per-workspace handler WaitGroup from the listener context
func GetDispatchWaitGroup(ctx context.Context) *sync.WaitGroup {
	return ctx.Value(contextDispatchWaitGroup).(*sync.WaitGroup)
}

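// A sketch of how a listener might use these helpers inside its handler. The listener
// type is hypothetical; the goroutine merely waits for the workspace context to end.
// GetFromContext can be used the same way to reach the issuing Dispatch.
//
//	func (l myListener) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) error {
//		wg := dispatch.GetDispatchWaitGroup(ctx)
//
//		wg.Add(1)
//		go func() {
//			defer wg.Done()
//			// Long-running, per-workspace work goes here. ctx is canceled when the
//			// workspace is disposed or its pod is deleted, and DisposeWorkspace waits
//			// on this WaitGroup before returning.
//			<-ctx.Done()
//		}()
//		return nil
//	}
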
// Start starts the dispatch
func (d *Dispatch) Start() error {
	ifac := informers.NewSharedInformerFactoryWithOptions(d.Kubernetes, podInformerResyncInterval, informers.WithNamespace(d.KubernetesNamespace))
	podInformer := ifac.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPod, ok := oldObj.(*corev1.Pod)
			if !ok {
				return
			}
			newPod, ok := newObj.(*corev1.Pod)
			if !ok {
				return
			}

			d.handlePodUpdate(oldPod, newPod)
		},
		DeleteFunc: func(obj interface{}) {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				return
			}

			d.handlePodDeleted(pod)
		},
	})

	var synchan chan bool
	d.stopchan, synchan = make(chan struct{}), make(chan bool)
	go podInformer.Run(d.stopchan)
	go func() {
		synchan <- cache.WaitForCacheSync(d.stopchan, podInformer.HasSynced)
	}()
	select {
	case <-time.After(podInformerInitialSyncTimeout):
		return xerrors.Errorf("pod informer did not sync in time")
	case ok := <-synchan:
		if !ok {
			return xerrors.Errorf("pod informer did not sync")
		}
	}
	return nil
}

// Close stops the dispatch and cancels all previously started listeners
func (d *Dispatch) Close() error {
	d.mu.Lock()
	defer d.mu.Unlock()

	close(d.stopchan)
	for _, state := range d.ctxs {
		if state != nil && state.Cancel != nil {
			state.Cancel()
		}
	}

	d.ctxs = make(map[string]*workspaceState)

	return nil
}

// WorkspaceExistsOnNode returns true if there is a workspace pod on this node and this
// dispatch knows about it.
func (d *Dispatch) WorkspaceExistsOnNode(instanceID string) (ok bool) {
	d.mu.Lock()
	defer d.mu.Unlock()

	_, ok = d.ctxs[instanceID]
	return
}

// DisposeWorkspace disposes the workspace incl. all running handler code for that pod
func (d *Dispatch) DisposeWorkspace(ctx context.Context, instanceID string) {
	d.mu.Lock()
	defer d.mu.Unlock()

	log.WithField("instanceID", instanceID).Debug("disposing workspace")
	defer log.WithField("instanceID", instanceID).Debug("disposing workspace done")

	// Make the runtime drop all state it might still have about this workspace
	d.Runtime.DisposeContainer(ctx, instanceID)

	// If we have that instanceID present, cancel its context
	state, present := d.ctxs[instanceID]
	if !present {
		return
	}
	if state.Cancel != nil {
		state.Cancel()
	}

	// ...and wait for all long-running/async processes/go-routines to finish
	state.HandlerWaitGroup.Wait()

	// Mark as disposed, so we do not handle any further updates for it (except deletion)
	d.disposedCtxs[disposedKey(instanceID, state.Workspace.Pod)] = struct{}{}

	delete(d.ctxs, instanceID)
}

func disposedKey(instanceID string, pod *corev1.Pod) string {
	return fmt.Sprintf("%s-%s", instanceID, pod.CreationTimestamp.String())
}

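// For illustration (hypothetical values): a pod with instance ID "1234" created at
// 2020-01-01 00:00:00 UTC yields a key like "1234-2020-01-01 00:00:00 +0000 UTC".
// Because the key includes the pod's creation timestamp, a pod recreated under the
// same instance ID is tracked separately from its disposed predecessor.
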
func (d *Dispatch) handlePodUpdate(oldPod, newPod *corev1.Pod) {
	workspaceID, ok := newPod.Labels[wsk8s.MetaIDLabel]
	if !ok {
		return
	}
	workspaceInstanceID, ok := newPod.Labels[wsk8s.WorkspaceIDLabel]
	if !ok {
		return
	}
	if d.NodeName != "" && newPod.Spec.NodeName != d.NodeName {
		return
	}
	disposedKey := disposedKey(workspaceInstanceID, newPod)
	if _, alreadyDisposed := d.disposedCtxs[disposedKey]; alreadyDisposed {
		log.WithField("disposedKey", disposedKey).Debug("dropping pod update for disposed pod")
		return
	}

	d.mu.Lock()
	defer d.mu.Unlock()

	state, ok := d.ctxs[workspaceInstanceID]
	if !ok {
		// we haven't seen this pod before - add it, and wait for the container
		owi := wsk8s.GetOWIFromObject(&newPod.ObjectMeta)
		s := &workspaceState{
			WorkspaceAdded: false,
			Workspace: &Workspace{
				InstanceID:  workspaceInstanceID,
				WorkspaceID: workspaceID,
				Pod:         newPod,
			},
		}
		d.ctxs[workspaceInstanceID] = s

		containerCtx, containerCtxCancel := context.WithCancel(context.Background())
		containerCtx = context.WithValue(containerCtx, contextDispatch, d)
		containerCtx = context.WithValue(containerCtx, contextDispatchWaitGroup, &s.HandlerWaitGroup)
		// Important: ideally this timeout must match the one in ws-manager, see https://github.com/gitpod-io/gitpod/blob/main/components/ws-manager/pkg/manager/manager.go#L171
		waitForPodCtx, cancel := context.WithTimeout(containerCtx, 10*time.Minute)
		go func() {
			containerID, err := d.Runtime.WaitForContainer(waitForPodCtx, workspaceInstanceID)
			if err != nil && err != context.Canceled {
				log.WithError(err).WithFields(owi).Warn("cannot wait for container")
			}
			log.WithFields(owi).WithField("container", containerID).Debug("dispatch found new workspace container")

			d.mu.Lock()
			s := d.ctxs[workspaceInstanceID]
			if s == nil {
				log.WithFields(owi).Error("pod disappeared from dispatch state before container was ready")
				d.mu.Unlock()
				return
			}
			// Only register with the WaitGroup _after_ acquiring the lock to avoid deadlocks
			s.HandlerWaitGroup.Add(1)
			defer s.HandlerWaitGroup.Done()

			s.Context = containerCtx
			s.Cancel = sync.OnceFunc(containerCtxCancel)
			s.Workspace.ContainerID = containerID

			for _, l := range d.Listener {
				s.HandlerWaitGroup.Add(1)
				go func(listener Listener) {
					defer s.HandlerWaitGroup.Done()

					err := listener.WorkspaceAdded(containerCtx, s.Workspace)
					if err != nil {
						log.WithError(err).WithFields(owi).Error("dispatch listener failed")
					}
				}(l)
			}

			s.WorkspaceAdded = true
			d.mu.Unlock()
		}()
		go func() {
			// no matter if the container was deleted or not - we've lost our guard that was waiting for that to happen.
			// Hence, we must stop listening for it to come into existence and cancel the context.
			err := d.Runtime.WaitForContainerStop(context.Background(), workspaceInstanceID)
			if err != nil && !errors.Is(err, context.DeadlineExceeded) {
				log.WithError(err).WithFields(owi).Error("unexpected error while waiting for container to stop")
			}

			cancel()
		}()

		return
	}

	if !state.WorkspaceAdded {
		return
	}

	state.Workspace.Pod = newPod

	for _, l := range d.Listener {
		lu, ok := l.(UpdateListener)
		if !ok {
			continue
		}

		state.HandlerWaitGroup.Add(1)
		go func() {
			defer state.HandlerWaitGroup.Done()

			err := lu.WorkspaceUpdated(state.Context, state.Workspace)
			if err != nil {
				log.WithError(err).WithFields(wsk8s.GetOWIFromObject(&oldPod.ObjectMeta)).Error("dispatch listener failed")
			}
		}()
	}
}

func (d *Dispatch) handlePodDeleted(pod *corev1.Pod) {
	instanceID, ok := pod.Labels[wsk8s.WorkspaceIDLabel]
	if !ok {
		return
	}
	log.WithField("instanceID", instanceID).Debug("pod deleted")
	defer log.WithField("instanceID", instanceID).Debug("pod deleted done")

	d.mu.Lock()
	defer d.mu.Unlock()

	state, ok := d.ctxs[instanceID]
	if !ok {
		log.WithFields(wsk8s.GetOWIFromObject(&pod.ObjectMeta)).Debug("received pod deletion for a workspace, but have not seen it before. Probably another node. Ignoring update.")
		return
	}
	if state.Cancel != nil {
		state.Cancel()
	}

	delete(d.ctxs, instanceID)
}