Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/supervisor/pkg/config/gitpod-config.go
2500 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package config
6
7
import (
8
"context"
9
"os"
10
"path/filepath"
11
"sync"
12
"time"
13
14
"github.com/fsnotify/fsnotify"
15
"gopkg.in/yaml.v3"
16
17
"github.com/gitpod-io/gitpod/common-go/log"
18
gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
19
)
20
21
// ConfigInterface provides access to the gitpod config file.
22
type ConfigInterface interface {
23
// Watch starts the config watching
24
Watch(ctx context.Context)
25
// Observe provides channels triggered whenever the config is changed
26
Observe(ctx context.Context) <-chan *gitpod.GitpodConfig
27
// Observe provides channels triggered whenever the image file is changed
28
ObserveImageFile(ctx context.Context) <-chan *struct{}
29
}
30
31
// ConfigService provides access to the gitpod config file.
32
type ConfigService struct {
33
locationReady <-chan struct{}
34
35
configLocation string
36
configWatcher *fileWatcher[gitpod.GitpodConfig]
37
38
imageWatcher *fileWatcher[struct{}]
39
}
40
41
// NewConfigService creates a new instance of ConfigService.
42
func NewConfigService(configLocation string, locationReady <-chan struct{}) *ConfigService {
43
return &ConfigService{
44
locationReady: locationReady,
45
configLocation: configLocation,
46
configWatcher: newFileWatcher(func(data []byte) (*gitpod.GitpodConfig, error) {
47
var config *gitpod.GitpodConfig
48
err := yaml.Unmarshal(data, &config)
49
return config, err
50
}),
51
imageWatcher: newFileWatcher(func(data []byte) (*struct{}, error) {
52
return &struct{}{}, nil
53
}),
54
}
55
}
56
57
// Observe provides channels triggered whenever the config is changed.
58
func (service *ConfigService) Observe(ctx context.Context) <-chan *gitpod.GitpodConfig {
59
return service.configWatcher.observe(ctx)
60
}
61
62
// Observe provides channels triggered whenever the image file is changed
63
func (service *ConfigService) ObserveImageFile(ctx context.Context) <-chan *struct{} {
64
return service.imageWatcher.observe(ctx)
65
}
66
67
// Watch starts the config watching.
68
func (service *ConfigService) Watch(ctx context.Context) {
69
select {
70
case <-service.locationReady:
71
case <-ctx.Done():
72
return
73
}
74
go service.watchImageFile(ctx)
75
service.configWatcher.watch(ctx, service.configLocation)
76
}
77
78
func (service *ConfigService) watchImageFile(ctx context.Context) {
79
var (
80
imageLocation string
81
cancelWatch func()
82
)
83
defer func() {
84
if cancelWatch != nil {
85
cancelWatch()
86
}
87
}()
88
cfgs := service.configWatcher.observe(ctx)
89
for {
90
select {
91
case cfg, ok := <-cfgs:
92
if !ok {
93
return
94
}
95
var currentImageLocation string
96
if cfg != nil {
97
switch img := cfg.Image.(type) {
98
case map[string]interface{}:
99
if file, ok := img["file"].(string); ok {
100
currentImageLocation = filepath.Join(filepath.Dir(service.configLocation), file)
101
}
102
}
103
}
104
if imageLocation == currentImageLocation {
105
continue
106
}
107
if cancelWatch != nil {
108
cancelWatch()
109
cancelWatch = nil
110
service.imageWatcher.reset()
111
}
112
imageLocation = currentImageLocation
113
if imageLocation == "" {
114
continue
115
}
116
watchCtx, cancel := context.WithCancel(ctx)
117
cancelWatch = cancel
118
go service.imageWatcher.watch(watchCtx, imageLocation)
119
case <-ctx.Done():
120
return
121
}
122
}
123
}
124
125
// fileWatcher tracks a single file and publishes its parsed content to
// observers.
type fileWatcher[T any] struct {
	// unmarshal turns raw file content into the published value.
	unmarshal func(data []byte) (*T, error)

	// cond is broadcast whenever data is replaced; its lock guards data and
	// pollTimer.
	cond *sync.Cond
	data *T

	// pollTimer is the most recently scheduled debounced update, if any.
	pollTimer *time.Timer

	// ready is closed (once) by markReady; observers do not publish anything
	// before that.
	ready     chan struct{}
	readyOnce sync.Once

	// debounceDuration is the delay between a file event and the re-read of
	// the file.
	debounceDuration time.Duration
}

// newFileWatcher creates a watcher that is not ready yet, holds no data and
// debounces file events by 100ms. unmarshal is used to parse the file content.
func newFileWatcher[T any](unmarshal func(data []byte) (*T, error)) *fileWatcher[T] {
	return &fileWatcher[T]{
		unmarshal:        unmarshal,
		cond:             sync.NewCond(&sync.Mutex{}),
		ready:            make(chan struct{}),
		debounceDuration: 100 * time.Millisecond,
	}
}

// observe returns a channel that receives the current data once the watcher is
// ready, and the data again after every broadcast. The channel is closed when
// the observer stops, which happens once ctx is cancelled.
func (service *fileWatcher[T]) observe(ctx context.Context) <-chan *T {
	results := make(chan *T)
	go func() {
		defer close(results)

		// Do not wait forever for a watcher that never becomes ready.
		select {
		case <-service.ready:
		case <-ctx.Done():
			return
		}

		service.cond.L.Lock()
		defer service.cond.L.Unlock()
		for {
			// The send happens while holding the lock. Without the ctx case an
			// observer whose consumer went away would keep the lock forever and
			// block every update and every other observer.
			select {
			case results <- service.data:
			case <-ctx.Done():
				return
			}

			// Wait only returns on a broadcast, so a cancellation that happens
			// while parked here is noticed on the next data change.
			service.cond.Wait()
			if ctx.Err() != nil {
				return
			}
		}
	}()
	return results
}

// markReady unblocks observers. It is safe to call more than once.
func (service *fileWatcher[T]) markReady() {
	service.readyOnce.Do(func() {
		close(service.ready)
	})
}

// reset drops the current data. Observers are only notified if there was data
// to drop.
func (service *fileWatcher[T]) reset() {
	service.cond.L.Lock()
	defer service.cond.L.Unlock()

	if service.data != nil {
		service.data = nil
		service.cond.Broadcast()
	}
}
184
185
// watch starts tracking the file at location. If the file does not exist it
// polls until it does; otherwise a file system watch is set up right away.
func (service *fileWatcher[T]) watch(ctx context.Context, location string) {
	log.WithField("location", location).Info("file watcher: starting...")

	if _, err := os.Stat(location); os.IsNotExist(err) {
		service.poll(ctx, location)
		return
	}
	service.doWatch(ctx, location)
}
195
196
func (service *fileWatcher[T]) doWatch(ctx context.Context, location string) {
197
watcher, err := fsnotify.NewWatcher()
198
defer func() {
199
if err != nil {
200
log.WithField("location", location).WithError(err).Error("file watcher: failed to start")
201
return
202
}
203
204
log.WithField("location", location).Info("file watcher: started")
205
}()
206
if err != nil {
207
return
208
}
209
210
err = watcher.Add(location)
211
if err != nil {
212
watcher.Close()
213
return
214
}
215
216
go func() {
217
defer log.WithField("location", location).Info("file watcher: stopped")
218
defer watcher.Close()
219
220
polling := make(chan struct{}, 1)
221
service.scheduleUpdate(ctx, polling, location)
222
for {
223
select {
224
case <-polling:
225
return
226
case <-ctx.Done():
227
return
228
case err := <-watcher.Errors:
229
log.WithField("location", location).WithError(err).Error("file watcher: failed to watch")
230
case <-watcher.Events:
231
service.scheduleUpdate(ctx, polling, location)
232
}
233
}
234
}()
235
}
236
237
// scheduleUpdate (re)arms the debounce timer: a previously scheduled update is
// stopped and a new one is scheduled debounceDuration from now. When the
// update finds the file missing, it signals polling and falls back to polling
// for the file; any other failure is logged.
func (service *fileWatcher[T]) scheduleUpdate(ctx context.Context, polling chan<- struct{}, location string) {
	refresh := func() {
		err := service.update(location)
		switch {
		case os.IsNotExist(err):
			polling <- struct{}{}
			go service.poll(ctx, location)
		case err != nil:
			log.WithField("location", location).WithError(err).Error("file watcher: failed to parse")
		}
	}

	service.cond.L.Lock()
	defer service.cond.L.Unlock()

	if pending := service.pollTimer; pending != nil {
		pending.Stop()
	}
	service.pollTimer = time.AfterFunc(service.debounceDuration, refresh)
}
253
254
func (service *fileWatcher[T]) poll(ctx context.Context, location string) {
255
service.markReady()
256
257
timer := time.NewTicker(2 * time.Second)
258
defer timer.Stop()
259
260
for {
261
select {
262
case <-ctx.Done():
263
return
264
case <-timer.C:
265
}
266
267
if _, err := os.Stat(location); !os.IsNotExist(err) {
268
service.doWatch(ctx, location)
269
return
270
}
271
}
272
}
273
274
// update re-reads the file at location and publishes the result to observers.
// A missing file is published as well (with whatever parse returned for it),
// and the not-exist error is still returned to the caller. Any other error
// leaves the current data untouched and is returned.
func (service *fileWatcher[T]) update(location string) error {
	service.cond.L.Lock()
	defer service.cond.L.Unlock()

	parsed, err := service.parse(location)
	if err != nil && !os.IsNotExist(err) {
		return err
	}

	service.data = parsed
	service.markReady()
	service.cond.Broadcast()
	log.WithField("location", location).WithField("data", service.data).Debug("file watcher: updated")

	return err
}
289
290
// parse reads the file at location and decodes it with the watcher's unmarshal
// function. A read error is returned as is, together with a nil value.
func (service *fileWatcher[T]) parse(location string) (*T, error) {
	raw, readErr := os.ReadFile(location)
	if readErr == nil {
		return service.unmarshal(raw)
	}
	return nil, readErr
}
297
298