Path: blob/main/components/supervisor/pkg/config/gitpod-config.go
2500 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package config56import (7"context"8"os"9"path/filepath"10"sync"11"time"1213"github.com/fsnotify/fsnotify"14"gopkg.in/yaml.v3"1516"github.com/gitpod-io/gitpod/common-go/log"17gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"18)1920// ConfigInterface provides access to the gitpod config file.21type ConfigInterface interface {22// Watch starts the config watching23Watch(ctx context.Context)24// Observe provides channels triggered whenever the config is changed25Observe(ctx context.Context) <-chan *gitpod.GitpodConfig26// Observe provides channels triggered whenever the image file is changed27ObserveImageFile(ctx context.Context) <-chan *struct{}28}2930// ConfigService provides access to the gitpod config file.31type ConfigService struct {32locationReady <-chan struct{}3334configLocation string35configWatcher *fileWatcher[gitpod.GitpodConfig]3637imageWatcher *fileWatcher[struct{}]38}3940// NewConfigService creates a new instance of ConfigService.41func NewConfigService(configLocation string, locationReady <-chan struct{}) *ConfigService {42return &ConfigService{43locationReady: locationReady,44configLocation: configLocation,45configWatcher: newFileWatcher(func(data []byte) (*gitpod.GitpodConfig, error) {46var config *gitpod.GitpodConfig47err := yaml.Unmarshal(data, &config)48return config, err49}),50imageWatcher: newFileWatcher(func(data []byte) (*struct{}, error) {51return &struct{}{}, nil52}),53}54}5556// Observe provides channels triggered whenever the config is changed.57func (service *ConfigService) Observe(ctx context.Context) <-chan *gitpod.GitpodConfig {58return service.configWatcher.observe(ctx)59}6061// Observe provides channels triggered whenever the image file is changed62func (service *ConfigService) ObserveImageFile(ctx context.Context) <-chan *struct{} {63return service.imageWatcher.observe(ctx)64}6566// Watch starts the config watching.67func (service *ConfigService) Watch(ctx context.Context) {68select {69case <-service.locationReady:70case <-ctx.Done():71return72}73go service.watchImageFile(ctx)74service.configWatcher.watch(ctx, service.configLocation)75}7677func (service *ConfigService) watchImageFile(ctx context.Context) {78var (79imageLocation string80cancelWatch func()81)82defer func() {83if cancelWatch != nil {84cancelWatch()85}86}()87cfgs := service.configWatcher.observe(ctx)88for {89select {90case cfg, ok := <-cfgs:91if !ok {92return93}94var currentImageLocation string95if cfg != nil {96switch img := cfg.Image.(type) {97case map[string]interface{}:98if file, ok := img["file"].(string); ok {99currentImageLocation = filepath.Join(filepath.Dir(service.configLocation), file)100}101}102}103if imageLocation == currentImageLocation {104continue105}106if cancelWatch != nil {107cancelWatch()108cancelWatch = nil109service.imageWatcher.reset()110}111imageLocation = currentImageLocation112if imageLocation == "" {113continue114}115watchCtx, cancel := context.WithCancel(ctx)116cancelWatch = cancel117go service.imageWatcher.watch(watchCtx, imageLocation)118case <-ctx.Done():119return120}121}122}123124type fileWatcher[T any] struct {125unmarshal func(data []byte) (*T, error)126127cond *sync.Cond128data *T129130pollTimer *time.Timer131132ready chan struct{}133readyOnce sync.Once134135debounceDuration time.Duration136}137138func newFileWatcher[T any](unmarshal func(data []byte) (*T, error)) *fileWatcher[T] {139return &fileWatcher[T]{140unmarshal: unmarshal,141cond: sync.NewCond(&sync.Mutex{}),142ready: make(chan struct{}),143debounceDuration: 100 * time.Millisecond,144}145}146147func (service *fileWatcher[T]) observe(ctx context.Context) <-chan *T {148results := make(chan *T)149go func() {150defer close(results)151152<-service.ready153154service.cond.L.Lock()155defer service.cond.L.Unlock()156for {157results <- service.data158159service.cond.Wait()160if ctx.Err() != nil {161return162}163}164}()165return results166}167168func (service *fileWatcher[T]) markReady() {169service.readyOnce.Do(func() {170close(service.ready)171})172}173174func (service *fileWatcher[T]) reset() {175service.cond.L.Lock()176defer service.cond.L.Unlock()177178if service.data != nil {179service.data = nil180service.cond.Broadcast()181}182}183184func (service *fileWatcher[T]) watch(ctx context.Context, location string) {185log.WithField("location", location).Info("file watcher: starting...")186187_, err := os.Stat(location)188if os.IsNotExist(err) {189service.poll(ctx, location)190} else {191service.doWatch(ctx, location)192}193}194195func (service *fileWatcher[T]) doWatch(ctx context.Context, location string) {196watcher, err := fsnotify.NewWatcher()197defer func() {198if err != nil {199log.WithField("location", location).WithError(err).Error("file watcher: failed to start")200return201}202203log.WithField("location", location).Info("file watcher: started")204}()205if err != nil {206return207}208209err = watcher.Add(location)210if err != nil {211watcher.Close()212return213}214215go func() {216defer log.WithField("location", location).Info("file watcher: stopped")217defer watcher.Close()218219polling := make(chan struct{}, 1)220service.scheduleUpdate(ctx, polling, location)221for {222select {223case <-polling:224return225case <-ctx.Done():226return227case err := <-watcher.Errors:228log.WithField("location", location).WithError(err).Error("file watcher: failed to watch")229case <-watcher.Events:230service.scheduleUpdate(ctx, polling, location)231}232}233}()234}235236func (service *fileWatcher[T]) scheduleUpdate(ctx context.Context, polling chan<- struct{}, location string) {237service.cond.L.Lock()238defer service.cond.L.Unlock()239if service.pollTimer != nil {240service.pollTimer.Stop()241}242service.pollTimer = time.AfterFunc(service.debounceDuration, func() {243err := service.update(location)244if os.IsNotExist(err) {245polling <- struct{}{}246go service.poll(ctx, location)247} else if err != nil {248log.WithField("location", location).WithError(err).Error("file watcher: failed to parse")249}250})251}252253func (service *fileWatcher[T]) poll(ctx context.Context, location string) {254service.markReady()255256timer := time.NewTicker(2 * time.Second)257defer timer.Stop()258259for {260select {261case <-ctx.Done():262return263case <-timer.C:264}265266if _, err := os.Stat(location); !os.IsNotExist(err) {267service.doWatch(ctx, location)268return269}270}271}272273func (service *fileWatcher[T]) update(location string) error {274service.cond.L.Lock()275defer service.cond.L.Unlock()276277data, err := service.parse(location)278if err == nil || os.IsNotExist(err) {279service.data = data280service.markReady()281service.cond.Broadcast()282283log.WithField("location", location).WithField("data", service.data).Debug("file watcher: updated")284}285286return err287}288289func (service *fileWatcher[T]) parse(location string) (*T, error) {290data, err := os.ReadFile(location)291if err != nil {292return nil, err293}294return service.unmarshal(data)295}296297298