Path: blob/main/components/ws-daemon/pkg/dispatch/dispatch.go
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package dispatch

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	"golang.org/x/xerrors"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"

	wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
	"github.com/gitpod-io/gitpod/common-go/log"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/container"
)

const (
	podInformerInitialSyncTimeout = 60 * time.Second
	podInformerResyncInterval     = 30 * time.Second
)

// Workspace represents all the info we have about a workspace
type Workspace struct {
	ContainerID container.ID
	WorkspaceID string
	InstanceID  string
	Pod         *corev1.Pod
}

// OWI returns the owner/workspace/instance triple used for logging
func (w Workspace) OWI() logrus.Fields {
	return log.OWI("", w.WorkspaceID, w.InstanceID)
}

// Listener gets called when a new workspace appears, or an existing one is updated.
type Listener interface {
	WorkspaceAdded(ctx context.Context, ws *Workspace) error
}

// UpdateListener gets called when a workspace pod is updated
type UpdateListener interface {
	WorkspaceUpdated(ctx context.Context, ws *Workspace) error
}
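// A minimal Listener/UpdateListener implementation might look like the sketch
// below. This is illustrative only and not part of this file; logListener is a
// hypothetical type:
//
//	type logListener struct{}
//
//	func (logListener) WorkspaceAdded(ctx context.Context, ws *Workspace) error {
//		log.WithFields(ws.OWI()).WithField("containerID", ws.ContainerID).Info("workspace container is running")
//		return nil
//	}
//
//	func (logListener) WorkspaceUpdated(ctx context.Context, ws *Workspace) error {
//		log.WithFields(ws.OWI()).Debug("workspace pod was updated")
//		return nil
//	}
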
// NewDispatch starts a new workspace dispatch
func NewDispatch(runtime container.Runtime, kubernetes kubernetes.Interface, k8sNamespace, nodename string, listener ...Listener) (*Dispatch, error) {
	d := &Dispatch{
		Runtime:             runtime,
		Kubernetes:          kubernetes,
		KubernetesNamespace: k8sNamespace,
		Listener:            listener,
		NodeName:            nodename,

		ctxs:         make(map[string]*workspaceState),
		disposedCtxs: make(map[string]struct{}),
	}

	return d, nil
}

// Dispatch starts tasks when a new workspace appears, and cancels the corresponding
// context when the workspace goes away. If the dispatch is closed, all active contexts
// will be canceled, too.
type Dispatch struct {
	Runtime             container.Runtime
	Kubernetes          kubernetes.Interface
	KubernetesNamespace string
	NodeName            string

	Listener []Listener

	stopchan     chan struct{}
	ctxs         map[string]*workspaceState
	disposedCtxs map[string]struct{}
	mu           sync.Mutex
}

type workspaceState struct {
	WorkspaceAdded bool
	Context        context.Context
	Cancel         context.CancelFunc
	Workspace      *Workspace

	// this WaitGroup keeps track of when each handler is finished. It's only relied
	// upon in DisposeWorkspace() to determine when work on a given instanceID has ceased.
	HandlerWaitGroup sync.WaitGroup
}

type contextKey struct{}

var (
	contextDispatch = contextKey{}
)

// GetFromContext retrieves the issuing dispatch from the listener context
func GetFromContext(ctx context.Context) *Dispatch {
	return ctx.Value(contextDispatch).(*Dispatch)
}

type dispatchHandlerWaitGroupKey struct{}

var (
	contextDispatchWaitGroup = dispatchHandlerWaitGroupKey{}
)

// GetDispatchWaitGroup retrieves the dispatch handler WaitGroup from the listener context
func GetDispatchWaitGroup(ctx context.Context) *sync.WaitGroup {
	return ctx.Value(contextDispatchWaitGroup).(*sync.WaitGroup)
}
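// Listeners that spawn long-running goroutines can register them with the
// dispatch's handler WaitGroup so that DisposeWorkspace blocks until that work
// has finished. A sketch of such a listener (illustrative only; myListener and
// watchWorkspace are hypothetical):
//
//	func (l *myListener) WorkspaceAdded(ctx context.Context, ws *Workspace) error {
//		wg := GetDispatchWaitGroup(ctx)
//		wg.Add(1)
//		go func() {
//			defer wg.Done()
//			// runs until the workspace context is canceled by the dispatch
//			l.watchWorkspace(ctx, ws)
//		}()
//		return nil
//	}
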
// Start starts the dispatch
func (d *Dispatch) Start() error {
	ifac := informers.NewSharedInformerFactoryWithOptions(d.Kubernetes, podInformerResyncInterval, informers.WithNamespace(d.KubernetesNamespace))
	podInformer := ifac.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPod, ok := oldObj.(*corev1.Pod)
			if !ok {
				return
			}
			newPod, ok := newObj.(*corev1.Pod)
			if !ok {
				return
			}

			d.handlePodUpdate(oldPod, newPod)
		},
		DeleteFunc: func(obj interface{}) {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				return
			}

			d.handlePodDeleted(pod)
		},
	})

	var synchan chan bool
	d.stopchan, synchan = make(chan struct{}), make(chan bool)
	go podInformer.Run(d.stopchan)
	go func() {
		synchan <- cache.WaitForCacheSync(d.stopchan, podInformer.HasSynced)
	}()
	select {
	case <-time.After(podInformerInitialSyncTimeout):
		return xerrors.Errorf("pod informer did not sync in time")
	case ok := <-synchan:
		if !ok {
			return xerrors.Errorf("pod informer did not sync")
		}
	}
	return nil
}

// Close stops the dispatch and cancels all previously started listeners
func (d *Dispatch) Close() error {
	d.mu.Lock()
	defer d.mu.Unlock()

	close(d.stopchan)
	for _, state := range d.ctxs {
		if state != nil && state.Cancel != nil {
			state.Cancel()
		}
	}

	d.ctxs = make(map[string]*workspaceState)

	return nil
}
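// Wiring the dispatch into a daemon might look like the following sketch
// (illustrative only; rt, clientset and nodeName are assumed to come from the
// daemon's setup code, and logListener is the hypothetical listener sketched above):
//
//	d, err := NewDispatch(rt, clientset, "default", nodeName, logListener{})
//	if err != nil {
//		return err
//	}
//	if err := d.Start(); err != nil {
//		return err
//	}
//	defer d.Close()
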
// WorkspaceExistsOnNode returns true if there is a workspace pod on this node and this
// dispatch knows about it.
func (d *Dispatch) WorkspaceExistsOnNode(instanceID string) (ok bool) {
	d.mu.Lock()
	defer d.mu.Unlock()

	_, ok = d.ctxs[instanceID]
	return
}

// DisposeWorkspace disposes the workspace incl. all running handler code for that pod
func (d *Dispatch) DisposeWorkspace(ctx context.Context, instanceID string) {
	d.mu.Lock()
	defer d.mu.Unlock()

	log.WithField("instanceID", instanceID).Debug("disposing workspace")
	defer log.WithField("instanceID", instanceID).Debug("disposing workspace done")

	// Make the runtime drop all state it might still have about this workspace
	d.Runtime.DisposeContainer(ctx, instanceID)

	// If we have that instanceID present, cancel its context
	state, present := d.ctxs[instanceID]
	if !present {
		return
	}
	if state.Cancel != nil {
		state.Cancel()
	}

	// ...and wait for all long-running/async processes/go-routines to finish
	state.HandlerWaitGroup.Wait()

	// Mark as disposed, so we do not handle any further updates for it (except deletion)
	d.disposedCtxs[disposedKey(instanceID, state.Workspace.Pod)] = struct{}{}

	delete(d.ctxs, instanceID)
}

func disposedKey(instanceID string, pod *corev1.Pod) string {
	return fmt.Sprintf("%s-%s", instanceID, pod.CreationTimestamp.String())
}

func (d *Dispatch) handlePodUpdate(oldPod, newPod *corev1.Pod) {
	workspaceID, ok := newPod.Labels[wsk8s.MetaIDLabel]
	if !ok {
		return
	}
	workspaceInstanceID, ok := newPod.Labels[wsk8s.WorkspaceIDLabel]
	if !ok {
		return
	}
	if d.NodeName != "" && newPod.Spec.NodeName != d.NodeName {
		return
	}
	disposedKey := disposedKey(workspaceInstanceID, newPod)
	if _, alreadyDisposed := d.disposedCtxs[disposedKey]; alreadyDisposed {
		log.WithField("disposedKey", disposedKey).Debug("dropping pod update for disposed pod")
		return
	}

	d.mu.Lock()
	defer d.mu.Unlock()

	state, ok := d.ctxs[workspaceInstanceID]
	if !ok {
		// we haven't seen this pod before - add it, and wait for the container
		owi := wsk8s.GetOWIFromObject(&newPod.ObjectMeta)
		s := &workspaceState{
			WorkspaceAdded: false,
			Workspace: &Workspace{
				InstanceID:  workspaceInstanceID,
				WorkspaceID: workspaceID,
				Pod:         newPod,
			},
		}
		d.ctxs[workspaceInstanceID] = s

		containerCtx, containerCtxCancel := context.WithCancel(context.Background())
		containerCtx = context.WithValue(containerCtx, contextDispatch, d)
		containerCtx = context.WithValue(containerCtx, contextDispatchWaitGroup, &s.HandlerWaitGroup)
		// Important: ideally this timeout must match the one used by ws-manager, see https://github.com/gitpod-io/gitpod/blob/main/components/ws-manager/pkg/manager/manager.go#L171
		waitForPodCtx, cancel := context.WithTimeout(containerCtx, 10*time.Minute)
		go func() {
			containerID, err := d.Runtime.WaitForContainer(waitForPodCtx, workspaceInstanceID)
			if err != nil && err != context.Canceled {
				log.WithError(err).WithFields(owi).Warn("cannot wait for container")
			}
			log.WithFields(owi).WithField("container", containerID).Debug("dispatch found new workspace container")

			d.mu.Lock()
			s := d.ctxs[workspaceInstanceID]
			if s == nil {
				log.WithFields(owi).Error("pod disappeared from dispatch state before container was ready")
				d.mu.Unlock()
				return
			}
			// Only register with the WaitGroup _after_ acquiring the lock to avoid deadlocks
			s.HandlerWaitGroup.Add(1)
			defer s.HandlerWaitGroup.Done()

			s.Context = containerCtx
			s.Cancel = sync.OnceFunc(containerCtxCancel)
			s.Workspace.ContainerID = containerID

			for _, l := range d.Listener {
				s.HandlerWaitGroup.Add(1)
				go func(listener Listener) {
					defer s.HandlerWaitGroup.Done()

					err := listener.WorkspaceAdded(containerCtx, s.Workspace)
					if err != nil {
						log.WithError(err).WithFields(owi).Error("dispatch listener failed")
					}
				}(l)
			}

			s.WorkspaceAdded = true
			d.mu.Unlock()
		}()
		go func() {
			// no matter if the container was deleted or not - we've lost our guard that was waiting for that to happen.
			// Hence, we must stop listening for it to come into existence and cancel the context.
			err := d.Runtime.WaitForContainerStop(context.Background(), workspaceInstanceID)
			if err != nil && !errors.Is(err, context.DeadlineExceeded) {
				log.WithError(err).WithFields(owi).Error("unexpected error while waiting for container to stop")
			}

			cancel()
		}()

		return
	}

	if !state.WorkspaceAdded {
		return
	}

	state.Workspace.Pod = newPod

	for _, l := range d.Listener {
		lu, ok := l.(UpdateListener)
		if !ok {
			continue
		}

		state.HandlerWaitGroup.Add(1)
		go func() {
			defer state.HandlerWaitGroup.Done()

			err := lu.WorkspaceUpdated(state.Context, state.Workspace)
			if err != nil {
				log.WithError(err).WithFields(wsk8s.GetOWIFromObject(&oldPod.ObjectMeta)).Error("dispatch listener failed")
			}
		}()
	}
}

func (d *Dispatch) handlePodDeleted(pod *corev1.Pod) {
	instanceID, ok := pod.Labels[wsk8s.WorkspaceIDLabel]
	if !ok {
		return
	}
	log.WithField("instanceID", instanceID).Debug("pod deleted")
	defer log.WithField("instanceID", instanceID).Debug("pod deleted done")

	d.mu.Lock()
	defer d.mu.Unlock()

	state, ok := d.ctxs[instanceID]
	if !ok {
		log.WithFields(wsk8s.GetOWIFromObject(&pod.ObjectMeta)).Debug("received pod deletion for a workspace, but have not seen it before. Probably another node. Ignoring update.")
		return
	}
	if state.Cancel != nil {
		state.Cancel()
	}

	delete(d.ctxs, instanceID)
}
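// When the daemon is asked to tear down a workspace, a caller might combine
// WorkspaceExistsOnNode and DisposeWorkspace like this (illustrative sketch only):
//
//	if d.WorkspaceExistsOnNode(instanceID) {
//		// cancels the workspace context and blocks until all handlers have finished
//		d.DisposeWorkspace(ctx, instanceID)
//	}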