Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ee/agent-smith/pkg/detector/proc.go
2501 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package detector
6
7
import (
8
"bufio"
9
"bytes"
10
"context"
11
"encoding/binary"
12
"fmt"
13
"io"
14
"os"
15
"path/filepath"
16
"sort"
17
"strconv"
18
"strings"
19
"sync"
20
"time"
21
22
"github.com/cespare/xxhash/v2"
23
"github.com/gitpod-io/gitpod/agent-smith/pkg/common"
24
"github.com/gitpod-io/gitpod/common-go/log"
25
lru "github.com/hashicorp/golang-lru"
26
"github.com/prometheus/client_golang/prometheus"
27
"github.com/prometheus/procfs"
28
)
29
30
type discoverableProcFS interface {
31
Discover() map[int]*process
32
Environ(pid int) ([]string, error)
33
}
34
35
type realProcfs procfs.FS
36
37
var _ discoverableProcFS = realProcfs{}
38
39
// Discover scans procfs and returns every process it could read, keyed by PID
// and linked into a parent/child tree through Parent and Children.
//
// Processes whose command line or stat file cannot be read are skipped with a
// debug log entry. A parent that has not been seen yet is inserted as a
// placeholder carrying only its PID and is filled in if the scan reaches it
// later. If listing the processes fails, the error is logged and whatever list
// was returned is still indexed.
func (fs realProcfs) Discover() map[int]*process {
	procs, err := procfs.FS(fs).AllProcs()
	if err != nil {
		log.WithError(err).Error("cannot list processes")
	}
	sort.Sort(procs)

	idx := make(map[int]*process, len(procs))

	// digest is a scratch buffer reused for every process: PID, PPID and start
	// time, 8 bytes each, which together form the input of the process hash.
	digest := make([]byte, 24)
	for _, p := range procs {
		cmdline, err := p.CmdLine()
		if err != nil {
			log.WithField("pid", p.PID).WithError(err).Debug("cannot get commandline of process")
			continue
		}
		st, err := statProc(p.PID)
		if err != nil {
			log.WithField("pid", p.PID).WithError(err).Debug("cannot stat process")
			continue
		}
		// Note: don't use p.Executable() here because it resolves the exe symlink which yields
		// a path that doesn't make sense in this mount namespace. However, reading from this
		// file directly works.
		exePath := filepath.Join("proc", strconv.Itoa(p.PID), "exe")

		// Even though we loop through a sorted process list (lowest PID first), we cannot
		// assume that we've seen the parent already due to PID reuse.
		parent, ok := idx[st.PPID]
		if !ok {
			parent = &process{PID: st.PPID}
			idx[parent.PID] = parent
		}
		// The parent is about to gain a child, so it is no longer a leaf. Without
		// this, a parent indexed before its children kept Leaf set to true.
		// NOTE(review): Leaf is not read in this file; confirm this matches the
		// intended meaning of the field.
		parent.Leaf = false

		entry, ok := idx[p.PID]
		if !ok {
			entry = &process{PID: p.PID, Leaf: true}
		}
		entry.Cmdline = cmdline
		entry.Parent = parent
		entry.Kind = ProcessUnknown
		entry.Path = exePath
		parent.Children = append(parent.Children, entry)

		binary.LittleEndian.PutUint64(digest[0:8], uint64(p.PID))
		binary.LittleEndian.PutUint64(digest[8:16], uint64(st.PPID))
		binary.LittleEndian.PutUint64(digest[16:24], st.Starttime)
		entry.Hash = xxhash.Sum64(digest)

		idx[p.PID] = entry
	}
	return idx
}
92
93
// stat holds the two values from /proc/<pid>/stat that the detector needs.
type stat struct {
	// PPID is the PID of the parent process.
	PPID int
	// Starttime is the start time value as it appears in the stat file.
	Starttime uint64
}
97
98
// statProc returns a limited set of /proc/<pid>/stat content.
99
func statProc(pid int) (*stat, error) {
100
f, err := os.Open(fmt.Sprintf("/proc/%d/stat", pid))
101
if err != nil {
102
return nil, err
103
}
104
defer f.Close()
105
106
return parseStat(f)
107
}
108
109
func parseStat(r io.Reader) (res *stat, err error) {
110
var (
111
ppid uint64
112
foundPPID bool
113
starttime uint64
114
i = -1
115
)
116
117
scan := bufio.NewScanner(r)
118
// We use a fixed buffer size assuming that none of the env vars we're interested in is any larger.
119
// This is part of the trick to keep allocs down.
120
scan.Buffer(make([]byte, 512), 512)
121
scan.Split(scanFixedSpace(512))
122
for scan.Scan() {
123
text := scan.Bytes()
124
if text[len(text)-1] == ')' {
125
i = 0
126
}
127
128
if i == 2 {
129
ppid, err = strconv.ParseUint(string(text), 10, 64)
130
foundPPID = true
131
}
132
if i == 20 {
133
starttime, err = strconv.ParseUint(string(text), 10, 64)
134
}
135
if err != nil {
136
return
137
}
138
139
if i >= 0 {
140
i++
141
}
142
}
143
if err != nil {
144
return nil, err
145
}
146
if err := scan.Err(); err != nil {
147
return nil, err
148
}
149
150
if !foundPPID || starttime == 0 {
151
return nil, fmt.Errorf("cannot parse stat")
152
}
153
154
return &stat{
155
PPID: int(ppid),
156
Starttime: starttime,
157
}, nil
158
}
159
160
// Environ opens /proc/<pid>/environ for the given PID and returns the entries
// selected by parseGitpodEnviron.
//
// The file is read directly instead of going through procfs.Environ; the
// original note on this function calls that too expensive because it uses
// io.ReadAll, which leaks memory over time.
func (fs realProcfs) Environ(pid int) ([]string, error) {
	environPath := fmt.Sprintf("/proc/%d/environ", pid)

	environFile, err := os.Open(environPath)
	if err != nil {
		return nil, err
	}
	defer environFile.Close()

	return parseGitpodEnviron(environFile)
}
172
173
// parseGitpodEnviron reads a NUL-separated environment block (the format of
// /proc/<pid>/environ) from r and returns the entries starting with "GITPOD_".
//
// Note: this function is benchmarked in BenchmarkParseGitpodEnviron.
// At the time of the original writing it consumed 3+N allocs where N is the
// number of env vars starting with GITPOD_.
//
// When making changes to this function, ensure you're not causing more allocs
// which could have a too drastic resource usage effect in prod.
//
// The scan buffer has a fixed size, so an entry longer than the buffer arrives
// in several chunks. Only the first chunk can be the beginning of a variable;
// the remaining chunks are dropped so that the tail of a long value is never
// mistaken for a separate GITPOD_ variable. A GITPOD_ entry longer than the
// buffer is returned truncated to the buffer size. A read error is returned
// instead of a partial result.
func parseGitpodEnviron(r io.Reader) ([]string, error) {
	// We use a fixed buffer size assuming that none of the env vars we're
	// interested in is any larger. This is part of the trick to keep allocs down.
	const bufferSize = 512

	scan := bufio.NewScanner(r)
	scan.Buffer(make([]byte, bufferSize), bufferSize)
	scan.Split(scanNullTerminatedLines(bufferSize))

	prefix := []byte("GITPOD_")

	// we expect at least 10 relevant env vars
	res := make([]string, 0, 10)
	// continuation is true when the previous token filled the whole buffer,
	// i.e. it was cut off and the current token is the rest of the same entry.
	continuation := false
	for scan.Scan() {
		text := scan.Bytes()

		isContinuation := continuation
		// A NUL-terminated token is always shorter than the buffer, so a token
		// of full buffer size can only be a cut-off chunk.
		continuation = len(text) >= bufferSize
		if isContinuation {
			continue
		}

		// we only keep GITPOD_ variables for optimisation
		if !bytes.HasPrefix(text, prefix) {
			continue
		}
		res = append(res, string(text))
	}
	if err := scan.Err(); err != nil {
		return nil, err
	}
	return res, nil
}

// scanNullTerminatedLines returns a bufio.SplitFunc that splits its input on
// NUL bytes. It never asks for more than fixedBufferSize bytes: once that many
// bytes have accumulated without a NUL, they are emitted as a token as they are.
// Data left over at EOF without a trailing NUL is emitted as a final token.
func scanNullTerminatedLines(fixedBufferSize int) func(data []byte, atEOF bool) (advance int, token []byte, err error) {
	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		if len(data) == 0 {
			// Nothing buffered: either we are done (EOF) or need more data.
			return 0, nil, nil
		}
		if i := bytes.IndexByte(data, 0); i >= 0 {
			// We have a full NUL-terminated entry.
			return i + 1, data[:i], nil
		}
		// At EOF this is the final, non-terminated entry. Otherwise, if the
		// buffer is full, emit what we have instead of asking the scanner to grow.
		if atEOF || len(data) >= fixedBufferSize {
			return len(data), data, nil
		}
		// Request more data.
		return 0, nil, nil
	}
}
221
222
// scanFixedSpace returns a bufio.SplitFunc that splits its input on single
// space characters. It never asks for more than fixedBufferSize bytes: once
// that many bytes have accumulated without a space, they are emitted as a
// token as they are. Data left over at EOF without a trailing space is emitted
// as a final token. Consecutive spaces produce empty tokens.
func scanFixedSpace(fixedBufferSize int) func(data []byte, atEOF bool) (advance int, token []byte, err error) {
	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		if len(data) == 0 {
			// Nothing buffered: either we are done (EOF) or need more data.
			return 0, nil, nil
		}
		if i := bytes.IndexByte(data, ' '); i >= 0 {
			// We have a full space-terminated token.
			return i + 1, data[:i], nil
		}
		// At EOF this is the final, non-terminated token. Otherwise, if the
		// buffer is full, emit what we have instead of asking the scanner to grow.
		if atEOF || len(data) >= fixedBufferSize {
			return len(data), data, nil
		}
		// Request more data.
		return 0, nil, nil
	}
}
244
245
var _ ProcessDetector = &ProcfsDetector{}
246
247
// ProcfsDetector detects processes and workspaces on this node by scanning procfs
248
type ProcfsDetector struct {
249
mu sync.RWMutex
250
ps chan Process
251
252
indexSizeGuage prometheus.Gauge
253
cacheUseCounterVec *prometheus.CounterVec
254
workspaceGauge prometheus.Gauge
255
256
startOnce sync.Once
257
258
proc discoverableProcFS
259
cache *lru.Cache
260
}
261
262
func NewProcfsDetector() (*ProcfsDetector, error) {
263
p, err := procfs.NewFS("/proc")
264
if err != nil {
265
return nil, err
266
}
267
268
cache, err := lru.New(2000)
269
if err != nil {
270
return nil, err
271
}
272
273
return &ProcfsDetector{
274
indexSizeGuage: prometheus.NewGauge(prometheus.GaugeOpts{
275
Namespace: "gitpod",
276
Subsystem: "agent_smith_procfs_detector",
277
Name: "index_size",
278
Help: "number of entries in the last procfs scan index",
279
}),
280
cacheUseCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
281
Namespace: "gitpod",
282
Subsystem: "agent_smith_procfs_detector",
283
Name: "cache_use_total",
284
Help: "process cache statistics",
285
}, []string{"use"}),
286
workspaceGauge: prometheus.NewGauge(prometheus.GaugeOpts{
287
Namespace: "gitpod",
288
Subsystem: "agent_smith_procfs_detector",
289
Name: "workspace_count",
290
Help: "number of detected workspaces",
291
}),
292
proc: realProcfs(p),
293
cache: cache,
294
}, nil
295
}
296
297
// Describe sends the descriptors of the detector's metrics to d: index size,
// cache use and workspace count, in that order.
func (det *ProcfsDetector) Describe(d chan<- *prometheus.Desc) {
	metrics := []prometheus.Collector{
		det.indexSizeGuage,
		det.cacheUseCounterVec,
		det.workspaceGauge,
	}
	for _, metric := range metrics {
		metric.Describe(d)
	}
}
302
303
// Collect sends the current values of the detector's metrics to m: index size,
// cache use and workspace count, in that order.
func (det *ProcfsDetector) Collect(m chan<- prometheus.Metric) {
	metrics := []prometheus.Collector{
		det.indexSizeGuage,
		det.cacheUseCounterVec,
		det.workspaceGauge,
	}
	for _, metric := range metrics {
		metric.Collect(m)
	}
}
308
309
// start launches the two background goroutines of the detector: one scans
// procfs immediately and then again on every 30 second tick, the other
// forwards every process found by a scan to det.ps. Neither goroutine is ever
// stopped.
func (det *ProcfsDetector) start() {
	found := make(chan Process, 100)

	// Scanner: run once right away, then once per tick.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for ; ; <-ticker.C {
			det.run(found)
		}
	}()

	// Forwarder: hand scan results over to the consumer-facing channel.
	go func() {
		for {
			p, open := <-found
			if !open {
				return
			}
			det.ps <- p
		}
	}()

	log.Info("procfs detector started")
}
327
328
type process struct {
329
PID int
330
Depth int
331
Path string
332
Kind ProcessKind
333
Parent *process
334
Children []*process
335
Leaf bool
336
Cmdline []string
337
Workspace *common.Workspace
338
Hash uint64
339
}
340
341
func (det *ProcfsDetector) run(processes chan<- Process) {
342
log.Debug("procfs detector run")
343
idx := det.proc.Discover()
344
345
// We now have a complete view of the process table. Let's calculate the depths
346
root, ok := idx[1]
347
if !ok {
348
log.Error("cannot find pid 1")
349
return
350
}
351
det.indexSizeGuage.Set(float64(len(idx)))
352
353
// let's find all workspaces, from the root down
354
findWorkspaces(det.proc, root, 0, nil)
355
356
workspaces := 0
357
for _, p := range idx {
358
if p.Workspace == nil {
359
continue
360
}
361
362
if p.Kind == ProcessSandbox {
363
workspaces = workspaces + 1
364
}
365
366
if p.Kind != ProcessUserWorkload {
367
continue
368
}
369
370
if _, ok := det.cache.Get(p.Hash); ok {
371
det.cacheUseCounterVec.WithLabelValues("hit").Inc()
372
continue
373
}
374
det.cacheUseCounterVec.WithLabelValues("miss").Inc()
375
det.cache.Add(p.Hash, struct{}{})
376
377
proc := Process{
378
Path: p.Path,
379
CommandLine: p.Cmdline,
380
Kind: p.Kind,
381
Workspace: p.Workspace,
382
}
383
log.WithField("proc", proc).Debug("found process")
384
processes <- proc
385
}
386
387
det.workspaceGauge.Set(float64(workspaces))
388
}
389
390
// findWorkspaces walks the process tree below p depth-first. It records the
// depth d on every node and classifies the nodes.
//
// Outside of a workspace (ws == nil) a process is ProcessUnknown, unless it is
// a workspacekit ring1 process whose first child is a supervisor and whose
// environment yields a workspace. In that case the process becomes the
// ProcessSandbox of that workspace and all its descendants inherit the
// workspace. Inside a workspace, supervisor processes are ProcessSupervisor
// and everything else is ProcessUserWorkload.
func findWorkspaces(proc discoverableProcFS, p *process, d int, ws *common.Workspace) {
	p.Depth = d
	p.Workspace = ws

	switch {
	case ws != nil && isSupervisor(p.Cmdline):
		p.Kind = ProcessSupervisor
	case ws != nil:
		p.Kind = ProcessUserWorkload
	default:
		p.Kind = ProcessUnknown

		// A workspacekit process is expected to have a supervisor as its first child.
		isRing1 := len(p.Cmdline) >= 2 && p.Cmdline[0] == "/proc/self/exe" && p.Cmdline[1] == "ring1"
		if isRing1 && len(p.Children) > 0 && isSupervisor(p.Children[0].Cmdline) {
			supervisor := p.Children[0]

			// The supervisor child confirms that p is a workspace; its
			// environment tells us which one.
			if detected := extractWorkspaceFromWorkspacekit(proc, p.PID); detected != nil {
				// extractWorkspaceFromWorkspacekit stores the PID it read the
				// data from, i.e. workspacekit. The workspace PID should point
				// to supervisor instead, so that killing that PID hits
				// supervisor, not workspacekit.
				detected.PID = supervisor.PID
				p.Workspace = detected
				p.Kind = ProcessSandbox
				supervisor.Kind = ProcessSupervisor
			}
		}
	}

	for _, child := range p.Children {
		findWorkspaces(proc, child, d+1, p.Workspace)
	}
}
426
427
// isSupervisor reports whether cmdline is exactly the two arguments
// "supervisor" and "init".
func isSupervisor(cmdline []string) bool {
	if len(cmdline) != 2 {
		return false
	}
	command, arg := cmdline[0], cmdline[1]
	return command == "supervisor" && arg == "init"
}
430
431
func extractWorkspaceFromWorkspacekit(proc discoverableProcFS, pid int) *common.Workspace {
432
env, err := proc.Environ(pid)
433
if err != nil {
434
log.WithError(err).Debug("cannot get environment from process - might have missed a workspace")
435
return nil
436
}
437
var (
438
ownerID, workspaceID, instanceID string
439
gitURL string
440
)
441
for _, e := range env {
442
if strings.HasPrefix(e, "GITPOD_OWNER_ID=") {
443
ownerID = strings.TrimPrefix(e, "GITPOD_OWNER_ID=")
444
continue
445
}
446
if strings.HasPrefix(e, "GITPOD_WORKSPACE_ID=") {
447
workspaceID = strings.TrimPrefix(e, "GITPOD_WORKSPACE_ID=")
448
continue
449
}
450
if strings.HasPrefix(e, "GITPOD_INSTANCE_ID=") {
451
instanceID = strings.TrimPrefix(e, "GITPOD_INSTANCE_ID=")
452
continue
453
}
454
if strings.HasPrefix(e, "GITPOD_WORKSPACE_CONTEXT_URL=") {
455
gitURL = strings.TrimPrefix(e, "GITPOD_WORKSPACE_CONTEXT_URL=")
456
continue
457
}
458
}
459
return &common.Workspace{
460
OwnerID: ownerID,
461
WorkspaceID: workspaceID,
462
InstanceID: instanceID,
463
GitURL: gitURL,
464
PID: pid,
465
}
466
}
467
468
// DiscoverProcesses starts process discovery. Must not be called more than once.
469
func (det *ProcfsDetector) DiscoverProcesses(ctx context.Context) (<-chan Process, error) {
470
det.mu.Lock()
471
defer det.mu.Unlock()
472
473
if det.ps != nil {
474
return nil, fmt.Errorf("already discovering processes")
475
}
476
res := make(chan Process, 100)
477
det.ps = res
478
det.startOnce.Do(det.start)
479
480
return res, nil
481
}
482
483