Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/supervisor/cmd/init.go
2498 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package cmd
6
7
import (
8
"bytes"
9
"context"
10
"encoding/json"
11
"fmt"
12
"io"
13
"os"
14
"os/exec"
15
"os/signal"
16
"strings"
17
"sync"
18
"sync/atomic"
19
"syscall"
20
"time"
21
22
"github.com/gitpod-io/gitpod/common-go/log"
23
"github.com/gitpod-io/gitpod/common-go/process"
24
"github.com/gitpod-io/gitpod/supervisor/pkg/shared"
25
"github.com/gitpod-io/gitpod/supervisor/pkg/supervisor"
26
"github.com/prometheus/procfs"
27
reaper "github.com/ramr/go-reaper"
28
"github.com/spf13/cobra"
29
)
30
31
// initCmd implements the `init` subcommand. It starts `supervisor run` as a
// child process, starts a reaper for zombie processes, and then blocks until
// either the child has ended or an interrupt/SIGTERM arrives. On a signal it
// terminates the child and every other process within the configured
// termination grace period.
var initCmd = &cobra.Command{
	Use:   "init",
	Short: "init the supervisor",
	Run: func(cmd *cobra.Command, args []string) {
		logFile := initLog(true)
		defer logFile.Close()

		// A config loading failure is only logged; execution continues.
		// NOTE(review): cfg is used further down (cfg.GetTerminationGracePeriod())
		// even if GetConfig returned an error - confirm that this is safe.
		cfg, err := supervisor.GetConfig()
		if err != nil {
			log.WithError(err).Info("cannnot load config")
		}
		var (
			sigInput = make(chan os.Signal, 1)
		)
		// subscribe to termination signals before any child process is started
		signal.Notify(sigInput, os.Interrupt, syscall.SIGTERM)

		// check if git executable exists, supervisor will fail if it doesn't
		// checking for it here allows to bubble up this error to the user
		_, err = exec.LookPath("git")
		if err != nil {
			log.WithError(err).Fatal("cannot find git executable, make sure it is installed as part of gitpod image")
		}

		// fall back to a fixed path if our own executable path cannot be determined
		supervisorPath, err := os.Executable()
		if err != nil {
			supervisorPath = "/.supervisor/supervisor"
		}

		// The debug proxy is only started when SUPERVISOR_DEBUG_WORKSPACE_TYPE is
		// set; it is bound to debugProxyCtx, which is cancelled when Run returns.
		debugProxyCtx, stopDebugProxy := context.WithCancel(context.Background())
		if os.Getenv("SUPERVISOR_DEBUG_WORKSPACE_TYPE") != "" {
			err = exec.CommandContext(debugProxyCtx, supervisorPath, "debug-proxy").Start()
			if err != nil {
				log.WithError(err).Fatal("cannot run debug workspace proxy")
			}
		}
		defer stopDebugProxy()

		// Start `supervisor run` with our stdio and environment; argv[0] of the
		// child is overridden to "supervisor".
		runCommand := exec.Command(supervisorPath, "run")
		runCommand.Args[0] = "supervisor"
		runCommand.Stdin = os.Stdin
		runCommand.Stdout = os.Stdout
		runCommand.Stderr = os.Stderr
		runCommand.Env = os.Environ()
		err = runCommand.Start()
		if err != nil {
			log.WithError(err).Error("supervisor run start error")
			return
		}

		// supervisorDone is closed once the goroutine waiting for the child returns.
		supervisorDone := make(chan struct{})
		// handledByReaper carries the child's exit code when the reaper goroutine
		// below observed the exit.
		// NOTE(review): the channel is unbuffered, so the sender blocks until
		// someone receives - confirm that this is acceptable.
		handledByReaper := make(chan int)
		// supervisor is expected to be killed when receiving signals
		ignoreUnexpectedExitCode := atomic.Bool{}
		// handleSupervisorExit reacts to the child's exit code. Exit code 0 is a
		// no-op. Otherwise the failure message is extracted from the run logs and
		// logged at Fatal level, except for unexpected exit codes after a
		// termination signal has been received, which are ignored.
		handleSupervisorExit := func(exitCode int) {
			if exitCode == 0 {
				return
			}
			logs := extractFailureFromRun()
			if shared.IsExpectedShutdown(exitCode) {
				log.Fatal(logs)
			} else {
				if ignoreUnexpectedExitCode.Load() {
					return
				}
				log.WithError(fmt.Errorf("%s", logs)).Fatal("supervisor run error with unexpected exit code")
			}
		}
		// wait for the child and evaluate how it ended
		go func() {
			defer close(supervisorDone)

			err := runCommand.Wait()
			if err == nil {
				return
			}
			// exited by reaper: Wait reports "no child processes", so the exit
			// code has to come from the reaper goroutine. Give it up to five
			// seconds to deliver the code, then give up silently.
			if strings.Contains(err.Error(), "no child processes") {
				ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
				defer cancel()
				select {
				case <-ctx.Done(): // timeout
				case exitCode := <-handledByReaper:
					handleSupervisorExit(exitCode)
				}
			} else if !(strings.Contains(err.Error(), "signal: ")) {
				// errors mentioning "signal: " are skipped (see the comment on
				// ignoreUnexpectedExitCode); everything else is reported
				if eerr, ok := err.(*exec.ExitError); ok && eerr.ExitCode() != 0 {
					handleSupervisorExit(eerr.ExitCode())
				}
				log.WithError(err).Error("supervisor run error")
				return
			}
		}()
		// start the reaper to clean up zombie processes
		reaperChan := make(chan reaper.Status, 10)
		reaper.Start(reaper.Config{
			Pid:              -1,
			Options:          0,
			DisablePid1Check: false,
			StatusChannel:    reaperChan,
		})
		// forward the exit code of the `supervisor run` child, ignoring the
		// status of every other reaped process
		go func() {
			for status := range reaperChan {
				if status.Pid != runCommand.Process.Pid {
					continue
				}
				exitCode := status.WaitStatus.ExitStatus()
				handledByReaper <- exitCode
			}
		}()

		select {
		case <-supervisorDone:
			// supervisor has ended - we're all done here
			defer log.Info("supervisor has ended (supervisorDone)")
			return
		case <-sigInput:
			// from here on unexpected exit codes of the child are ignored
			ignoreUnexpectedExitCode.Store(true)
			// we received a terminating signal - pass on to supervisor and wait for it to finish
			ctx, cancel := context.WithTimeout(context.Background(), cfg.GetTerminationGracePeriod())
			defer cancel()
			slog := newShutdownLogger()
			defer slog.Close()
			slog.write("Shutting down all processes")

			// terminate the child first, then every remaining process
			terminationDone := make(chan struct{})
			go func() {
				defer close(terminationDone)
				slog.TerminateSync(ctx, runCommand.Process.Pid)
				terminateAllProcesses(ctx, slog)
			}()
			// wait for either successful termination or the timeout
			select {
			case <-ctx.Done():
				// Time is up, but we give all the goroutines a bit more time to react to this.
				time.Sleep(time.Millisecond * 1000)
				defer log.Info("supervisor has ended (ctx.Done)")
			case <-terminationDone:
				defer log.Info("supervisor has ended (terminationDone)")
			}
			slog.write("Finished shutting down all processes.")
		}
	},
}
173
174
// terminateAllProcesses terminates all processes but ours until there are none anymore or the context is cancelled
175
// on context cancellation any still running processes receive a SIGKILL
176
func terminateAllProcesses(ctx context.Context, slog shutdownLogger) {
177
for {
178
processes, err := procfs.AllProcs()
179
if err != nil {
180
log.WithError(err).Error("Cannot list processes")
181
slog.write(fmt.Sprintf("Cannot list processes: %s", err))
182
return
183
}
184
// only one process (must be us)
185
if len(processes) == 1 {
186
return
187
}
188
// terminate all processes but ourself
189
var wg sync.WaitGroup
190
for _, proc := range processes {
191
if proc.PID == os.Getpid() {
192
continue
193
}
194
p := proc
195
wg.Add(1)
196
go func() {
197
defer wg.Done()
198
slog.TerminateSync(ctx, p.PID)
199
}()
200
}
201
wg.Wait()
202
}
203
}
204
205
func init() {
206
rootCmd.AddCommand(initCmd)
207
}
208
209
// shutdownLogger records the progress of the shutdown sequence and terminates
// processes while reporting what it does.
type shutdownLogger interface {
	// Close releases whatever the logger writes to.
	io.Closer

	// write records a single message.
	write(s string)

	// TerminateSync terminates the process with the given PID, bounded by ctx,
	// and records the outcome.
	TerminateSync(ctx context.Context, pid int)
}
214
215
func newShutdownLogger() shutdownLogger {
216
file := "/workspace/.gitpod/supervisor-termination.log"
217
f, err := os.Create(file)
218
if err != nil {
219
log.WithError(err).WithField("file", file).Error("Couldn't create shutdown log file")
220
}
221
result := shutdownLoggerImpl{
222
file: f,
223
startTime: time.Now(),
224
}
225
return &result
226
}
227
228
// shutdownLoggerImpl is the file-backed shutdownLogger returned by
// newShutdownLogger.
type shutdownLoggerImpl struct {
	// file receives the shutdown messages; nil if the file could not be created.
	file *os.File
	// startTime is when the logger was created; messages are prefixed with the
	// time elapsed since then.
	startTime time.Time
}
232
233
// write records s. Without a log file the message only goes to the debug log.
// Otherwise it is prefixed with the time elapsed since the logger was created,
// appended to the file, and echoed to the info log; a failed file write is
// logged as an error and does not prevent the echo.
func (l *shutdownLoggerImpl) write(s string) {
	if l.file == nil {
		log.Debug(s)
		return
	}

	line := fmt.Sprintf("[%s] %s \n", time.Since(l.startTime), s)
	if _, err := l.file.WriteString(line); err != nil {
		log.WithError(err).Error("couldn't write to log file")
	}
	log.Infof("slog: %s", line)
}
245
// Close closes the underlying log file and returns the result of that call.
// NOTE(review): l.file is nil when newShutdownLogger could not create the file;
// confirm callers are fine with whatever Close reports in that case.
func (l *shutdownLoggerImpl) Close() error {
	err := l.file.Close()
	return err
}
248
// TerminateSync terminates the process with the given PID via
// process.TerminateSync and records what happened in the shutdown log.
//
// Nothing is terminated when the process cannot be looked up or when it is a
// zombie (state "Z"). If only its stat information is unavailable, termination
// is still attempted. Every message carries the PID because
// terminateAllProcesses calls this method concurrently for many processes.
func (l *shutdownLoggerImpl) TerminateSync(ctx context.Context, pid int) {
	proc, err := procfs.NewProc(pid)
	if err != nil {
		l.write(fmt.Sprintf("Couldn't obtain process information for PID %d.", pid))
		return
	}

	stat, err := proc.Stat()
	switch {
	case err != nil:
		// best effort: without stat information we still try to terminate
		l.write(fmt.Sprintf("Couldn't obtain process information for PID %d.", pid))
	case stat.State == "Z":
		l.write(fmt.Sprintf("Process %s with PID %d is a zombie, skipping termination.", stat.Comm, pid))
		return
	default:
		// The command line is purely informational; if it cannot be read we
		// log a placeholder rather than the error value.
		cmdline := "unknown"
		if args, cerr := proc.CmdLine(); cerr == nil {
			cmdline = strings.Join(args, " ")
		}
		l.write(fmt.Sprintf("Terminating process %s with PID %d (state: %s, cmdline: %s).", stat.Comm, pid, stat.State, cmdline))
	}

	err = process.TerminateSync(ctx, pid)
	switch {
	case err == nil:
		// terminated cleanly, nothing to report
	case err == process.ErrForceKilled:
		l.write(fmt.Sprintf("Terminating process with PID %d didn't finish, but had to be force killed", pid))
	default:
		l.write(fmt.Sprintf("Terminating process with PID %d errored: %s", pid, err))
	}
}
272
273
// extractFailureFromRun attempts to extract the last error message from the
// `supervisor run` command by reading /dev/termination-log.
//
// It returns "" if the file cannot be read. Otherwise the result is what
// failureFromLogs derives from the file content.
func extractFailureFromRun() string {
	logs, err := os.ReadFile("/dev/termination-log")
	if err != nil {
		return ""
	}
	return failureFromLogs(logs)
}

// failureFromLogs scans newline-separated JSON log entries from last to first
// and returns the first one that has a non-empty "message", formatted as
// "message" or "message: error". Lines that are not valid JSON or carry no
// message are skipped. If no entry qualifies the raw logs are returned as is.
func failureFromLogs(logs []byte) string {
	lines := bytes.Split(logs, []byte("\n"))
	for i := len(lines) - 1; i >= 0; i-- {
		// Decode into a fresh value for every line: json.Unmarshal leaves fields
		// absent from the input untouched, so a shared value would leak the
		// "error" of one entry into another.
		var entry struct {
			Error   string `json:"error"`
			Message string `json:"message"`
		}
		if err := json.Unmarshal(lines[i], &entry); err != nil {
			continue
		}
		if entry.Message == "" {
			continue
		}
		if entry.Error == "" {
			return entry.Message
		}
		return entry.Message + ": " + entry.Error
	}
	return string(logs)
}
310
311